repeats via cronline expression

This commit is contained in:
niphlod
2016-06-15 20:26:09 +02:00
parent a18e0e489f
commit 6bb255286a
2 changed files with 603 additions and 41 deletions

View File

@@ -9,9 +9,9 @@ Background processes made simple
---------------------------------
"""
from __future__ import print_function
from gluon._compat import Queue, long, iteritems
import os
import re
import time
import multiprocessing
import sys
@@ -24,13 +24,13 @@ import logging
import optparse
import tempfile
import types
from functools import reduce
from json import loads, dumps
from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET, IS_NOT_IN_DB
from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET, IS_NOT_IN_DB, IS_EMPTY_OR
from gluon import IS_INT_IN_RANGE, IS_DATETIME, IS_IN_DB
from gluon.utils import web2py_uuid
from gluon._compat import Queue, long, iteritems
from gluon.storage import Storage
from functools import reduce
USAGE = """
## Example
@@ -71,23 +71,6 @@ http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_run.id>0
## view workers
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_worker.id>0
## To install the scheduler as a permanent daemon on Linux (w/ Upstart), put
## the following into /etc/init/web2py-scheduler.conf:
## (This assumes your web2py instance is installed in <user>'s home directory,
## running as <user>, with app <myapp>, on network interface eth0.)
description "web2py task scheduler"
start on (local-filesystems and net-device-up IFACE=eth0)
stop on shutdown
respawn limit 8 60 # Give up if restart occurs 8 times in 60 seconds.
exec sudo -u <user> python /home/<user>/web2py/web2py.py -K <myapp>
respawn
## You can then start/stop/restart/check status of the daemon with:
sudo start web2py-scheduler
sudo stop web2py-scheduler
sudo restart web2py-scheduler
sudo status web2py-scheduler
"""
path = os.getcwd()
@@ -160,7 +143,7 @@ class TaskReport(object):
class JobGraph(object):
"""Experimental: dependencies amongs tasks"""
"""Experimental: dependencies amongs tasks."""
def __init__(self, db, job_name):
self.job_name = job_name or 'job_0'
@@ -216,9 +199,216 @@ class JobGraph(object):
db.rollback()
return None
class CronParser(object):
def __init__(self, cronline, base=None):
self.cronline = cronline
self.sched = base or datetime.datetime.now()
self.task = None
@staticmethod
def _rangetolist(s, period='min'):
retval = []
if s.startswith('*'):
if period == 'min':
s = s.replace('*', '0-59', 1)
elif period == 'hr':
s = s.replace('*', '0-23', 1)
elif period == 'dom':
s = s.replace('*', '1-31', 1)
elif period == 'mon':
s = s.replace('*', '1-12', 1)
elif period == 'dow':
s = s.replace('*', '0-6', 1)
m = re.compile(r'(\d+)-(\d+)/(\d+)')
match = m.match(s)
if match:
min_, max_ = int(match.group(1)), int(match.group(2)) + 1
step_ = int(match.group(3))
else:
m = re.compile(r'(\d+)/(\d+)')
ranges_max = {'min': 59, 'hr': 23, 'mon': 12, 'dom': 31, 'dow': 7}
match = m.match(s)
if match:
min_, max_ = int(match.group(1)), ranges_max[period] + 1
step_ = int(match.group(2))
if match:
for i in range(min_, max_, step_):
retval.append(i)
return retval
@staticmethod
def _sanitycheck(values, period):
if period == 'min':
check = all(0 <= i <= 59 for i in values)
elif period == 'hr':
check = all(0 <= i <= 23 for i in values)
elif period == 'dom':
check = all(1 <= i <= 31 or i == 'l' for i in values)
elif period == 'mon':
check = all(1 <= i <= 12 for i in values)
elif period == 'dow':
check = all(0 <= i <= 7 for i in values)
return check
def _parse(self):
line = self.cronline.lower()
task = {}
if line.startswith('@yearly'):
line = line.replace('@yearly', '0 0 1 1 *')
elif line.startswith('@annually'):
line = line.replace('@annually', '0 0 1 1 *')
elif line.startswith('@monthly'):
line = line.replace('@monthly', '0 0 1 * *')
elif line.startswith('@weekly'):
line = line.replace('@weekly', '0 0 * * 0')
elif line.startswith('@daily'):
line = line.replace('@daily', '0 0 * * *')
elif line.startswith('@midnight'):
line = line.replace('@midnight', '0 0 * * *')
elif line.startswith('@hourly'):
line = line.replace('@hourly', '0 * * * *')
params = line.strip().split()
if len(params) < 5:
raise ValueError('Invalid cron line (too short)')
elif len(params) > 5:
raise ValueError('Invalid cron line (too long)')
daysofweek = {'sun': 0, 'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4,
'fri': 5, 'sat': 6}
monthsofyear = {'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5,
'jun': 6, 'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10,
'nov': 11, 'dec': 12, 'l': 'l'}
for (s, i) in zip(params[:5], ['min', 'hr', 'dom', 'mon', 'dow']):
if s not in [None, '*']:
task[i] = []
vals = s.split(',')
for val in vals:
if i == 'dow':
refdict = daysofweek
elif i == 'mon':
refdict = monthsofyear
if i in ('dow', 'mon') and '-' in val and '/' not in val:
isnum = val.split('-')[0].isdigit()
if isnum:
val = '%s/1' % val
else:
val = '-'.join([str(refdict[v])
for v in val.split('-')])
if val != '-1' and '-' in val and '/' not in val:
val = '%s/1' % val
if '/' in val:
task[i] += self._rangetolist(val, i)
elif val.isdigit() or val == '-1':
task[i].append(int(val))
elif i in ('dow', 'mon'):
if val in refdict:
task[i].append(refdict[val])
elif i == 'dom' and val == 'l':
task[i].append(val)
if not task[i]:
raise ValueError('Invalid cron value (%s)' % s)
if not self._sanitycheck(task[i], i):
raise ValueError('Invalid cron value (%s)' % s)
task[i] = sorted(task[i])
self.task = task
@staticmethod
def _get_next_dow(sched, task):
task_dow = [a % 7 for a in task['dow']]
while sched.isoweekday() % 7 not in task_dow:
sched += datetime.timedelta(days=1)
return sched
@staticmethod
def _get_next_dom(sched, task):
if task['dom'] == ['l']:
last_feb = 29 if sched.year % 4 == 0 else 28
lastdayofmonth = [
31, last_feb, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31
]
task_dom = [lastdayofmonth[sched.month - 1]]
else:
task_dom = task['dom']
while sched.day not in task_dom:
sched += datetime.timedelta(days=1)
return sched
@staticmethod
def _get_next_mon(sched, task):
while sched.month not in task['mon']:
if sched.month < 12:
sched = sched.replace(month=sched.month + 1)
else:
sched = sched.replace(month=1, year=sched.year + 1)
return sched
@staticmethod
def _getnext_hhmm(sched, task, add_to=True):
if add_to:
sched += datetime.timedelta(minutes=1)
if 'min' in task:
while sched.minute not in task['min']:
sched += datetime.timedelta(minutes=1)
if 'hr' in task and sched.hour not in task['hr']:
while sched.hour not in task['hr']:
sched += datetime.timedelta(hours=1)
return sched
def _getnext_date(self, sched, task):
if 'dow' in task and 'dom' in task:
dow = self._get_next_dow(sched, task)
dom = self._get_next_dom(sched, task)
sched = min(dow, dom)
elif 'dow' in task:
sched = self._get_next_dow(sched, task)
elif 'dom' in task:
sched = self._get_next_dom(sched, task)
if 'mon' in task:
sched = self._get_next_mon(sched, task)
return sched.replace(hour=0, minute=0)
def get_next(self):
"""Get next date according to specs."""
if not self.task:
self._parse()
task = self.task
sched = self.sched
x = 0
while x < 1000: # avoid potential max recursions
x += 1
try:
next_date = self._getnext_date(sched, task)
except (ValueError, OverflowError) as e:
raise ValueError('Invalid cron expression (%s)' % e)
if next_date.date() > self.sched.date():
# we rolled date, check for valid hhmm
sched = self._getnext_hhmm(next_date, task, False)
break
else:
# same date, get next hhmm
sched_time = self._getnext_hhmm(sched, task, True)
if sched_time.date() > sched.date():
# we rolled date again :(
sched = sched_time
else:
sched = sched_time
break
else:
raise ValueError('Potential bug found, please submit your '
'cron expression to the authors')
self.sched = sched
return sched
def __iter__(self):
"""Support iteration."""
return self
__next__ = next = get_next
# the two functions below deal with simplejson decoding as unicode, esp for the dict decode
# and subsequent usage as function Keyword arguments unicode variable names won't work!
# borrowed from http://stackoverflow.com/questions/956867/how-to-get-string-objects-instead-unicode-ones-from-json-in-python
# borrowed from http://stackoverflow.com/questions/956867/
def _decode_list(lst):
@@ -321,7 +511,7 @@ def executor(queue, task, out):
class MetaScheduler(threading.Thread):
"""Base class documenting scheduler's base methods"""
"""Base class documenting scheduler's base methods."""
def __init__(self):
threading.Thread.__init__(self)
@@ -377,7 +567,7 @@ class MetaScheduler(threading.Thread):
while not out.empty():
tout += out.get()
if tout:
logger.debug(' partial output: "%s"' % str(tout))
logger.debug(' partial output: "%s"', str(tout))
if CLEAROUT in tout:
task_output = tout[
tout.rfind(CLEAROUT) + len(CLEAROUT):]
@@ -428,7 +618,7 @@ class MetaScheduler(threading.Thread):
self.have_heartbeat = False
def terminate_process(self):
"""Terminates any running tasks (internal use only)"""
"""Terminate any running tasks (internal use only)"""
try:
self.process.terminate()
except:
@@ -468,6 +658,22 @@ TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED, EXPIRED)
RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
WORKER_STATUS = (ACTIVE, PICK, DISABLED, TERMINATE, KILL, STOP_TASK)
class IS_CRONLINE(object):
"""
Validates cronline
"""
def __init__(self, error_message=None):
self.error_message = error_message
def __call__(self, value):
recur = CronParser(value, datetime.datetime.now())
try:
recur.get_next()
return (value, None)
except (KeyError, ValueError) as e:
if not self.error_message:
return (value, e)
return (value, self.error_message)
class TYPE(object):
"""
@@ -612,7 +818,10 @@ class Scheduler(MetaScheduler):
Field('period', 'integer', default=60, comment='seconds',
requires=IS_INT_IN_RANGE(0, None)),
Field('prevent_drift', 'boolean', default=False,
comment='Cron-like start_times between runs'),
comment='Exact start_times between runs'),
Field('cronline', default=None,
comment='Discard "period", use this cron expr instead',
requires=IS_EMPTY_OR(IS_CRONLINE())),
Field('timeout', 'integer', default=60, comment='seconds',
requires=IS_INT_IN_RANGE(1, None)),
Field('sync_output', 'integer', default=0,
@@ -665,14 +874,6 @@ class Scheduler(MetaScheduler):
if migrate is not False:
db.commit()
@staticmethod
def total_seconds(td):
"""Backport for py2.6."""
if hasattr(td, 'total_seconds'):
return td.total_seconds()
else:
return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10.0 ** 6
def loop(self, worker_name=None):
"""Main loop.
@@ -694,7 +895,7 @@ class Scheduler(MetaScheduler):
signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
try:
self.start_heartbeats()
while True and self.have_heartbeat:
while self.have_heartbeat:
if self.w_stats.status == DISABLED:
logger.debug('Someone stopped me, sleeping until better'
' times come (%s)', self.w_stats.sleep)
@@ -798,7 +999,10 @@ class Scheduler(MetaScheduler):
logger.info('nothing to do')
return None
times_run = task.times_run + 1
if not task.prevent_drift:
if task.cronline:
cron_recur = CronParser(task.cronline, now.replace(second=0))
next_run_time = cron_recur.get_next()
elif not task.prevent_drift:
next_run_time = task.last_run_time + datetime.timedelta(
seconds=task.period
)
@@ -806,7 +1010,7 @@ class Scheduler(MetaScheduler):
# calc next_run_time based on available slots
# see #1191
next_run_time = task.start_time
secondspassed = self.total_seconds(now - next_run_time)
secondspassed = (now - next_run_time).total_seconds()
steps = secondspassed // task.period + 1
next_run_time += datetime.timedelta(seconds=task.period * steps)
@@ -1275,13 +1479,22 @@ class Scheduler(MetaScheduler):
tuuid = 'uuid' in kwargs and kwargs.pop('uuid') or web2py_uuid()
tname = 'task_name' in kwargs and kwargs.pop('task_name') or function
immediate = 'immediate' in kwargs and kwargs.pop('immediate') or None
rtn = self.db.scheduler_task.validate_and_insert(
cronline = kwargs.get('cronline')
kwargs.update(
function_name=function,
task_name=tname,
args=targs,
vars=tvars,
uuid=tuuid,
**kwargs)
)
if cronline:
try:
start_time = kwargs.get('start_time', self.now)
next_run_time = CronParser(cronline, start_time).get_next()
kwargs.update(start_time=start_time, next_run_time=next_run_time)
except:
pass
rtn = self.db.scheduler_task.validate_and_insert(**kwargs)
if not rtn.errors:
rtn.uuid = tuuid
if immediate:

View File

@@ -15,7 +15,7 @@ fix_sys_path(__file__)
from gluon.storage import Storage
from gluon.languages import translator
from gluon.scheduler import JobGraph, Scheduler
from gluon.scheduler import JobGraph, Scheduler, CronParser
from gluon.dal import DAL
@@ -51,6 +51,355 @@ class BaseTestScheduler(unittest.TestCase):
pass
class CronParserTest(unittest.TestCase):
def testMinute(self):
# minute asterisk
base = datetime.datetime(2010, 1, 23, 12, 18)
itr = CronParser('*/1 * * * *', base)
n1 = itr.get_next() # 19
self.assertEqual(base.year, n1.year)
self.assertEqual(base.month, n1.month)
self.assertEqual(base.day, n1.day)
self.assertEqual(base.hour, n1.hour)
self.assertEqual(base.minute, n1.minute - 1)
for i in range(39): # ~ 58
itr.get_next()
n2 = itr.get_next()
self.assertEqual(n2.minute, 59)
n3 = itr.get_next()
self.assertEqual(n3.minute, 0)
self.assertEqual(n3.hour, 13)
itr = CronParser('*/5 * * * *', base)
n4 = itr.get_next()
self.assertEqual(n4.minute, 20)
for i in range(6):
itr.get_next()
n5 = itr.get_next()
self.assertEqual(n5.minute, 55)
n6 = itr.get_next()
self.assertEqual(n6.minute, 0)
self.assertEqual(n6.hour, 13)
base = datetime.datetime(2010, 1, 23, 12, 18)
itr = CronParser('4/34 * * * *', base)
n7 = itr.get_next()
self.assertEqual(n7.minute, 38)
self.assertEqual(n7.hour, 12)
n8 = itr.get_next()
self.assertEqual(n8.minute, 4)
self.assertEqual(n8.hour, 13)
def testHour(self):
base = datetime.datetime(2010, 1, 24, 12, 2)
itr = CronParser('0 */3 * * *', base)
n1 = itr.get_next()
self.assertEqual(n1.hour, 15)
self.assertEqual(n1.minute, 0)
for i in range(2):
itr.get_next()
n2 = itr.get_next()
self.assertEqual(n2.hour, 0)
self.assertEqual(n2.day, 25)
def testDay(self):
base = datetime.datetime(2010, 2, 24, 12, 9)
itr = CronParser('0 0 */3 * *', base)
n1 = itr.get_next()
# 1 4 7 10 13 16 19 22 25 28
self.assertEqual(n1.day, 25)
n2 = itr.get_next()
self.assertEqual(n2.day, 28)
n3 = itr.get_next()
self.assertEqual(n3.day, 1)
self.assertEqual(n3.month, 3)
# test leap year
base = datetime.datetime(1996, 2, 27)
itr = CronParser('0 0 * * *', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 28)
self.assertEqual(n1.month, 2)
n2 = itr.get_next()
self.assertEqual(n2.day, 29)
self.assertEqual(n2.month, 2)
base2 = datetime.datetime(2000, 2, 27)
itr2 = CronParser('0 0 * * *', base2)
n3 = itr2.get_next()
self.assertEqual(n3.day, 28)
self.assertEqual(n3.month, 2)
n4 = itr2.get_next()
self.assertEqual(n4.day, 29)
self.assertEqual(n4.month, 2)
def testWeekDay(self):
base = datetime.datetime(2010, 2, 25)
itr = CronParser('0 0 * * sat', base)
n1 = itr.get_next()
self.assertEqual(n1.isoweekday(), 6)
self.assertEqual(n1.day, 27)
n2 = itr.get_next()
self.assertEqual(n2.isoweekday(), 6)
self.assertEqual(n2.day, 6)
self.assertEqual(n2.month, 3)
base = datetime.datetime(2010, 1, 25)
itr = CronParser('0 0 1 * wed', base)
n1 = itr.get_next()
self.assertEqual(n1.month, 1)
self.assertEqual(n1.day, 27)
self.assertEqual(n1.year, 2010)
n2 = itr.get_next()
self.assertEqual(n2.month, 2)
self.assertEqual(n2.day, 1)
self.assertEqual(n2.year, 2010)
n3 = itr.get_next()
self.assertEqual(n3.month, 2)
self.assertEqual(n3.day, 3)
self.assertEqual(n3.year, 2010)
def testMonth(self):
base = datetime.datetime(2010, 1, 25)
itr = CronParser('0 0 1 * *', base)
n1 = itr.get_next()
self.assertEqual(n1.month, 2)
self.assertEqual(n1.day, 1)
n2 = itr.get_next()
self.assertEqual(n2.month, 3)
self.assertEqual(n2.day, 1)
for i in range(8):
itr.get_next()
n3 = itr.get_next()
self.assertEqual(n3.month, 12)
self.assertEqual(n3.year, 2010)
n4 = itr.get_next()
self.assertEqual(n4.month, 1)
self.assertEqual(n4.year, 2011)
base = datetime.datetime(2010, 1, 25)
itr = CronParser('0 0 1 */4 *', base)
n1 = itr.get_next()
self.assertEqual(n1.month, 5)
self.assertEqual(n1.day, 1)
base = datetime.datetime(2010, 1, 25)
itr = CronParser('0 0 1 1-3 *', base)
n1 = itr.get_next()
self.assertEqual(n1.month, 2)
self.assertEqual(n1.day, 1)
n2 = itr.get_next()
self.assertEqual(n2.month, 3)
self.assertEqual(n2.day, 1)
n3 = itr.get_next()
self.assertEqual(n3.month, 1)
self.assertEqual(n3.day, 1)
def testSundayToThursdayWithAlphaConversion(self):
base = datetime.datetime(2010, 8, 25, 15, 56)
itr = CronParser("30 22 * * sun-thu", base)
n1 = itr.get_next()
self.assertEqual(base.year, n1.year)
self.assertEqual(base.month, n1.month)
self.assertEqual(base.day, n1.day)
self.assertEqual(22, n1.hour)
self.assertEqual(30, n1.minute)
def testISOWeekday(self):
base = datetime.datetime(2010, 2, 25)
itr = CronParser('0 0 * * 7', base)
n1 = itr.get_next()
self.assertEqual(n1.isoweekday(), 7)
self.assertEqual(n1.day, 28)
n2 = itr.get_next()
self.assertEqual(n2.isoweekday(), 7)
self.assertEqual(n2.day, 7)
self.assertEqual(n2.month, 3)
base = datetime.datetime(2010, 2, 22)
itr = CronParser('0 0 * * */2', base)
n1 = itr.get_next()
self.assertEqual(n1.isoweekday(), 2)
self.assertEqual(n1.day, 23)
n2 = itr.get_next()
self.assertEqual(n2.isoweekday(), 4)
self.assertEqual(n2.day, 25)
def testBug2(self):
base = datetime.datetime(2012, 1, 1, 0, 0)
itr = CronParser('0 * * 3 *', base)
n1 = itr.get_next()
self.assertEqual(n1.year, base.year)
self.assertEqual(n1.month, 3)
self.assertEqual(n1.day, base.day)
self.assertEqual(n1.hour, base.hour)
self.assertEqual(n1.minute, base.minute)
n2 = itr.get_next()
self.assertEqual(n2.year, base.year)
self.assertEqual(n2.month, 3)
self.assertEqual(n2.day, base.day)
self.assertEqual(n2.hour, base.hour + 1)
self.assertEqual(n2.minute, base.minute)
n3 = itr.get_next()
self.assertEqual(n3.year, base.year)
self.assertEqual(n3.month, 3)
self.assertEqual(n3.day, base.day)
self.assertEqual(n3.hour, base.hour + 2)
self.assertEqual(n3.minute, base.minute)
def testBug3(self):
base = datetime.datetime(2013, 3, 1, 12, 17, 34, 257877)
c = CronParser('00 03 16,30 * *', base)
n1 = c.get_next()
self.assertEqual(n1.month, 3)
self.assertEqual(n1.day, 16)
n2 = c.get_next()
self.assertEqual(n2.month, 3)
self.assertEqual(n2.day, 30)
n3 = c.get_next()
self.assertEqual(n3.month, 4)
self.assertEqual(n3.day, 16)
def test_rangeGenerator(self):
base = datetime.datetime(2013, 3, 4, 0, 0)
itr = CronParser('1-9/2 0 1 * *', base)
n1 = itr.get_next()
n2 = itr.get_next()
n3 = itr.get_next()
n4 = itr.get_next()
n5 = itr.get_next()
self.assertEqual(n1.minute, 1)
self.assertEqual(n2.minute, 3)
self.assertEqual(n3.minute, 5)
self.assertEqual(n4.minute, 7)
self.assertEqual(n5.minute, 9)
def test_iterGenerator(self):
base = datetime.datetime(2013, 3, 4, 0, 0)
itr = CronParser('1-9/2 0 1 * *', base)
x = 0
for n in itr:
x += 1
if x > 4:
break
self.assertEqual(n.minute, 9)
def test_invalidcron(self):
base = datetime.datetime(2013, 3, 4, 0, 0)
itr = CronParser('5 4 31 2 *', base)
self.assertRaises(ValueError, itr.get_next)
itr = CronParser('* * 5-1 * *', base)
self.assertRaises(ValueError, itr.get_next)
itr = CronParser('* * * janu-jun *', base)
self.assertRaises(KeyError, itr.get_next)
itr = CronParser('* * * * * *', base)
self.assertRaises(ValueError, itr.get_next)
itr = CronParser('* * * *', base)
self.assertRaises(ValueError, itr.get_next)
def testLastDayOfMonth(self):
base = datetime.datetime(2015, 9, 4)
itr = CronParser('0 0 L * *', base)
n1 = itr.get_next()
self.assertEqual(n1.month, 9)
self.assertEqual(n1.day, 30)
n2 = itr.get_next()
self.assertEqual(n2.month, 10)
self.assertEqual(n2.day, 31)
n3 = itr.get_next()
self.assertEqual(n3.month, 11)
self.assertEqual(n3.day, 30)
n4 = itr.get_next()
self.assertEqual(n4.month, 12)
self.assertEqual(n4.day, 31)
base = datetime.datetime(1996, 2, 27)
itr = CronParser('0 0 L * *', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 29)
self.assertEqual(n1.month, 2)
n2 = itr.get_next()
self.assertEqual(n2.day, 31)
self.assertEqual(n2.month, 3)
def testSpecialExpr(self):
base = datetime.datetime(2000, 1, 1)
itr = CronParser('@yearly', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 1)
self.assertEqual(n1.month, 1)
self.assertEqual(n1.year, base.year + 1)
self.assertEqual(n1.hour, 0)
self.assertEqual(n1.minute, 0)
itr = CronParser('@annually', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 1)
self.assertEqual(n1.month, 1)
self.assertEqual(n1.year, base.year + 1)
self.assertEqual(n1.hour, 0)
self.assertEqual(n1.minute, 0)
itr = CronParser('@monthly', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 1)
self.assertEqual(n1.month, base.month + 1)
self.assertEqual(n1.year, base.year)
self.assertEqual(n1.hour, 0)
self.assertEqual(n1.minute, 0)
itr = CronParser('@weekly', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 2)
self.assertEqual(n1.month, base.month)
self.assertEqual(n1.year, base.year)
self.assertEqual(n1.hour, 0)
self.assertEqual(n1.minute, 0)
n2 = itr.get_next()
self.assertEqual(n2.day, 9)
self.assertEqual(n2.month, base.month)
self.assertEqual(n2.year, base.year)
self.assertEqual(n2.hour, 0)
self.assertEqual(n2.minute, 0)
n3 = itr.get_next()
self.assertEqual(n3.day, 16)
self.assertEqual(n3.month, base.month)
self.assertEqual(n3.year, base.year)
self.assertEqual(n3.hour, 0)
self.assertEqual(n3.minute, 0)
itr = CronParser('@daily', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 2)
self.assertEqual(n1.month, base.month)
self.assertEqual(n1.year, base.year)
self.assertEqual(n1.hour, 0)
self.assertEqual(n1.minute, 0)
itr = CronParser('@midnight', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 2)
self.assertEqual(n1.month, base.month)
self.assertEqual(n1.year, base.year)
self.assertEqual(n1.hour, 0)
self.assertEqual(n1.minute, 0)
itr = CronParser('@hourly', base)
n1 = itr.get_next()
self.assertEqual(n1.day, 1)
self.assertEqual(n1.month, base.month)
self.assertEqual(n1.year, base.year)
self.assertEqual(n1.hour, 1)
self.assertEqual(n1.minute, 0)
class TestsForJobGraph(BaseTestScheduler):
def testJobGraph(self):