diff --git a/gluon/scheduler.py b/gluon/scheduler.py index da6cc7db..4ca449c1 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -1,5 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +# vim: set ts=4 sw=4 et ai: """ | This file is part of the web2py Web Framework | Copyrighted by Massimo Di Pierro @@ -9,22 +10,24 @@ Background processes made simple --------------------------------- """ -import os -import re -import time -import multiprocessing -import sys -import threading -import traceback -import signal +from __future__ import print_function + import socket -import datetime +import os import logging -import optparse -import tempfile import types from functools import reduce +import datetime +import re +import sys from json import loads, dumps +import tempfile +import traceback +import threading +import multiprocessing +import time +import signal + from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET, IS_NOT_IN_DB, IS_EMPTY_OR from gluon import IS_INT_IN_RANGE, IS_DATETIME, IS_IN_DB from gluon.utils import web2py_uuid @@ -34,19 +37,19 @@ from gluon.storage import Storage USAGE = """ ## Example -For any existing app +For any existing application myapp -Create File: app/models/scheduler.py ====== +Create File: myapp/models/scheduler.py ====== from gluon.scheduler import Scheduler -def demo1(*args,**vars): +def demo1(*args, **vars): print('you passed args=%s and vars=%s' % (args, vars)) return 'done!' def demo2(): 1/0 -scheduler = Scheduler(db,dict(demo1=demo1,demo2=demo2)) +scheduler = Scheduler(db, dict(demo1=demo1, demo2=demo2)) ## run worker nodes with: cd web2py @@ -62,21 +65,16 @@ python scheduler.py -h http://127.0.0.1:8000/myapp/appadmin/insert/db/scheduler_task ## monitor scheduled jobs -http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_task.id>0 +http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_task.id ## view completed jobs -http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_run.id>0 +http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_run.id ## view workers -http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_worker.id>0 +http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_worker.id """ -path = os.getcwd() - -if 'WEB2PY_PATH' not in os.environ: - os.environ['WEB2PY_PATH'] = path - IDENTIFIER = "%s#%s" % (socket.gethostname(), os.getpid()) logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER) @@ -99,6 +97,7 @@ SECONDS = 1 HEARTBEAT = 3 * SECONDS MAXHIBERNATION = 10 CLEAROUT = '!clear!' +RESULTINFILE = 'result_in_file:' CALLABLETYPES = (types.LambdaType, types.FunctionType, types.BuiltinFunctionType, @@ -166,7 +165,7 @@ class JobGraph(object): if job_name: q = sd.job_name == job_name else: - q = sd.id > 0 + q = sd.id edges = db(q).select() nested_dict = {} @@ -194,7 +193,7 @@ class JobGraph(object): assert not nested_dict, "A cyclic dependency exists amongst %r" % nested_dict db.commit() return rtn - except: + except Exception: db.rollback() return None @@ -208,7 +207,6 @@ class CronParser(object): @staticmethod def _rangetolist(s, period='min'): - retval = [] if s.startswith('*'): if period == 'min': s = s.replace('*', '0-59', 1) @@ -220,21 +218,21 @@ class CronParser(object): s = s.replace('*', '1-12', 1) elif period == 'dow': s = s.replace('*', '0-6', 1) - m = re.compile(r'(\d+)-(\d+)/(\d+)') - match = m.match(s) + match = re.match(r'(\d+)-(\d+)/(\d+)', s) if match: - min_, max_ = int(match.group(1)), int(match.group(2)) + 1 + max_ = int(match.group(2)) + 1 step_ = int(match.group(3)) else: - m = re.compile(r'(\d+)/(\d+)') - ranges_max = {'min': 59, 'hr': 23, 'mon': 12, 'dom': 31, 'dow': 7} - match = m.match(s) + match = re.match(r'(\d+)/(\d+)', s) if match: - min_, max_ = int(match.group(1)), ranges_max[period] + 1 + ranges_max = dict(min=59, hr=23, mon=12, dom=31, dow=7) + max_ = ranges_max[period] + 1 step_ = int(match.group(2)) if match: - for i in range(min_, max_, step_): - retval.append(i) + min_ = int(match.group(1)) + retval = list(range(min_, max_, step_)) + else: + retval = [] return retval @staticmethod @@ -278,9 +276,9 @@ class CronParser(object): 'fri': 5, 'sat': 6} monthsofyear = {'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6, 'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, - 'nov': 11, 'dec': 12, 'l': 'l'} - for (s, i) in zip(params[:5], ['min', 'hr', 'dom', 'mon', 'dow']): - if s not in [None, '*']: + 'nov': 11, 'dec': 12} + for (s, i) in zip(params, ('min', 'hr', 'dom', 'mon', 'dow')): + if s != '*': task[i] = [] vals = s.split(',') for val in vals: @@ -293,13 +291,13 @@ class CronParser(object): if isnum: val = '%s/1' % val else: - val = '-'.join([str(refdict[v]) + val = '-'.join([str(refdict.get(v, '')) for v in val.split('-')]) - if val != '-1' and '-' in val and '/' not in val: + if '-' in val and '/' not in val: val = '%s/1' % val if '/' in val: task[i] += self._rangetolist(val, i) - elif val.isdigit() or val == '-1': + elif val.isdigit(): task[i].append(int(val)) elif i in ('dow', 'mon'): if val in refdict: @@ -323,7 +321,12 @@ class CronParser(object): @staticmethod def _get_next_dom(sched, task): if task['dom'] == ['l']: - last_feb = 29 if sched.year % 4 == 0 else 28 + # instead of calendar.isleap + try: + last_feb = 29 + datetime.date(sched.year, 2, last_feb) + except ValueError: + last_feb = 28 lastdayofmonth = [ 31, last_feb, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31 ] @@ -368,7 +371,7 @@ class CronParser(object): sched = self._get_next_mon(sched, task) return sched.replace(hour=0, minute=0) - def get_next(self): + def next(self): """Get next date according to specs.""" if not self.task: self._parse() @@ -404,13 +407,14 @@ class CronParser(object): """Support iteration.""" return self - __next__ = next = get_next + __next__ = next -# the two functions below deal with simplejson decoding as unicode, esp for the dict decode -# and subsequent usage as function Keyword arguments unicode variable names won't work! + +# the two functions below deal with simplejson decoding as unicode, +# esp for the dict decode and subsequent usage as function Keyword arguments +# unicode variable names won't work! # borrowed from http://stackoverflow.com/questions/956867/ - def _decode_list(lst): if not PY2: return lst @@ -423,7 +427,6 @@ def _decode_list(lst): newlist.append(i) return newlist - def _decode_dict(dct): if not PY2: return dct @@ -438,7 +441,7 @@ def _decode_dict(dct): return newdict -def executor(queue, task, out): +def executor(retq, task, outq): """The function used to execute tasks in the background process.""" logger.debug(' task started') @@ -448,50 +451,55 @@ def executor(queue, task, out): def __init__(self, out_queue): self.out_queue = out_queue self.stdout = sys.stdout + self.written = False sys.stdout = self - def __del__(self): + def close(self): sys.stdout = self.stdout + if self.written: + # see "Joining processes that use queues" section in + # https://docs.python.org/2/library/multiprocessing.html#programming-guidelines + # https://docs.python.org/3/library/multiprocessing.html#programming-guidelines + self.out_queue.cancel_join_thread() def flush(self): pass def write(self, data): self.out_queue.put(data) + self.written = True W2P_TASK = Storage({ 'id': task.task_id, 'uuid': task.uuid, 'run_id': task.run_id }) - stdout = LogOutput(out) + stdout = LogOutput(outq) try: if task.app: - os.chdir(os.environ['WEB2PY_PATH']) from gluon.shell import env, parse_path_info from gluon import current - level = logging.getLogger().getEffectiveLevel() - logging.getLogger().setLevel(logging.WARN) - # Get controller-specific subdirectory if task.app is of - # form 'app/controller' + ## FIXME: why temporarily change the log level of the root logger? + #level = logging.getLogger().getEffectiveLevel() + #logging.getLogger().setLevel(logging.WARN) + # support for task.app like 'app/controller' (a, c, f) = parse_path_info(task.app) _env = env(a=a, c=c, import_models=True, extra_request={'is_scheduler': True}) - logging.getLogger().setLevel(level) + #logging.getLogger().setLevel(level) f = task.function functions = current._scheduler.tasks - if not functions: + if functions: + _function = functions.get(f) + else: # look into env _function = _env.get(f) - else: - _function = functions.get(f) if not isinstance(_function, CALLABLETYPES): raise NameError( "name '%s' not found in scheduler's environment" % f) # Inject W2P_TASK into environment _env.update({'W2P_TASK': W2P_TASK}) # Inject W2P_TASK into current - from gluon import current current.W2P_TASK = W2P_TASK globals().update(_env) args = _decode_list(loads(task.args)) @@ -506,162 +514,15 @@ def executor(queue, task, out): fd, temp_path = tempfile.mkstemp(suffix='.w2p_sched') with os.fdopen(fd, 'w') as f: f.write(result) - result = 'w2p_special:%s' % temp_path - queue.put(TaskReport('COMPLETED', result=result)) - except BaseException as e: + result = RESULTINFILE + temp_path + retq.put(TaskReport('COMPLETED', result=result)) + except: tb = traceback.format_exc() - queue.put(TaskReport('FAILED', tb=tb)) - del stdout + retq.put(TaskReport('FAILED', tb=tb)) + finally: + stdout.close() -class MetaScheduler(threading.Thread): - """Base class documenting scheduler's base methods.""" - - def __init__(self): - threading.Thread.__init__(self) - self.process = None # the background process - self.have_heartbeat = True # set to False to kill - self.empty_runs = 0 - - def local_async(self, task): - """Start the background process. - - Args: - task : a `Task` object - - Returns: - tuple: containing:: - - ('ok',result,output) - ('error',exception,None) - ('timeout',None,None) - ('terminated',None,None) - - """ - db = self.db - sr = db.scheduler_run - out = multiprocessing.Queue() - queue = multiprocessing.Queue(maxsize=1) - p = multiprocessing.Process(target=executor, args=(queue, task, out)) - self.process = p - logger.debug(' task starting') - p.start() - - task_output = "" - tout = "" - - try: - if task.sync_output > 0: - run_timeout = task.sync_output - else: - run_timeout = task.timeout - - start = time.time() - - while p.is_alive() and (not task.timeout or time.time() - start < task.timeout): - if tout: - try: - logger.debug(' partial output saved') - db(sr.id == task.run_id).update(run_output=task_output) - db.commit() - except: - pass - p.join(timeout=run_timeout) - tout = "" - while not out.empty(): - tout += out.get() - if tout: - logger.debug(' partial output: "%s"', str(tout)) - if CLEAROUT in tout: - task_output = tout[ - tout.rfind(CLEAROUT) + len(CLEAROUT):] - else: - task_output += tout - except: - p.terminate() - p.join() - logger.debug(' task stopped by general exception') - tr = TaskReport(STOPPED) - else: - if p.is_alive(): - p.terminate() - logger.debug(' task timeout') - try: - # we try to get a traceback here - tr = queue.get(timeout=2) - tr.status = TIMEOUT - tr.output = task_output - except Queue.Empty: - tr = TaskReport(TIMEOUT) - elif queue.empty(): - logger.debug(' task stopped') - tr = TaskReport(STOPPED) - else: - logger.debug(' task completed or failed') - tr = queue.get() - result = tr.result - if result and result.startswith('w2p_special'): - temp_path = result.replace('w2p_special:', '', 1) - with open(temp_path) as f: - tr.result = f.read() - os.unlink(temp_path) - tr.output = task_output - return tr - - def die(self): - """Forces termination of the worker process along with any running - task""" - logger.info('die!') - self.have_heartbeat = False - self.terminate_process() - - def give_up(self): - """Waits for any running task to be executed, then exits the worker - process""" - logger.info('Giving up as soon as possible!') - self.have_heartbeat = False - - def terminate_process(self): - """Terminate any running tasks (internal use only)""" - try: - self.process.terminate() - except: - pass # no process to terminate - - def run(self): - """This is executed by the main thread to send heartbeats""" - counter = 0 - while self.have_heartbeat: - self.send_heartbeat(counter) - counter += 1 - - def start_heartbeats(self): - self.start() - - def send_heartbeat(self, counter): - raise NotImplementedError - - def pop_task(self): - """Fetches a task ready to be executed""" - raise NotImplementedError - - def report_task(self, task, task_report): - """Creates a task report""" - raise NotImplementedError - - def sleep(self): - raise NotImplementedError - - def loop(self): - """Main loop, fetching tasks and starting executor's background - processes""" - raise NotImplementedError - - -TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED, EXPIRED) -RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED) -WORKER_STATUS = (ACTIVE, PICK, DISABLED, TERMINATE, KILL, STOP_TASK) - class IS_CRONLINE(object): """ Validates cronline @@ -672,9 +533,9 @@ class IS_CRONLINE(object): def __call__(self, value): recur = CronParser(value, datetime.datetime.now()) try: - recur.get_next() + recur.next() return (value, None) - except (KeyError, ValueError) as e: + except ValueError as e: if not self.error_message: return (value, e) return (value, self.error_message) @@ -705,7 +566,12 @@ class TYPE(object): return (value, current.T('Not of type: %s') % self.myclass) -class Scheduler(MetaScheduler): +TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED, EXPIRED) +RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED) +WORKER_STATUS = (ACTIVE, PICK, DISABLED, TERMINATE, KILL, STOP_TASK) + + +class Scheduler(threading.Thread): """Scheduler object Args: @@ -737,7 +603,12 @@ class Scheduler(MetaScheduler): worker_name=None, group_names=None, heartbeat=HEARTBEAT, max_empty_runs=0, discard_results=False, utc_time=False): - MetaScheduler.__init__(self) + threading.Thread.__init__(self) + self.setDaemon(True) + self.process = None # the background process + self.process_queues = (None, None) + self.have_heartbeat = True # set to False to kill + self.empty_runs = 0 self.db = db self.db_thread = None @@ -751,6 +622,7 @@ class Scheduler(MetaScheduler): self.do_assign_tasks = False self.greedy = False self.utc_time = utc_time + self.w_stats_lock = threading.RLock() self.w_stats = Storage( dict( status=RUNNING, @@ -768,6 +640,151 @@ class Scheduler(MetaScheduler): self.define_tables(db, migrate=migrate) + def execute(self, task): + """Start the background process. + + Args: + task : a `Task` object + + Returns: + a `TaskReport` object + """ + outq = multiprocessing.Queue() + retq = multiprocessing.Queue(maxsize=1) + self.process = p = \ + multiprocessing.Process(target=executor, args=(retq, task, outq)) + self.process_queues = (retq, outq) + + logger.debug(' task starting') + p.start() + start = time.time() + + if task.sync_output > 0: + run_timeout = task.sync_output + else: + run_timeout = task.timeout + task_output = tout = '' + try: + while p.is_alive() and (not task.timeout or + time.time() - start < task.timeout): + # NOTE: try always to empty the out queue before + # the child process is joined, + # see "Joining processes that use queues" section in + # https://docs.python.org/2/library/multiprocessing.html#programming-guidelines + # https://docs.python.org/3/library/multiprocessing.html#programming-guidelines + while True: + try: + tout += outq.get(timeout=2) + except Queue.Empty: + break + if tout: + logger.debug(' partial output: "%s"', tout) + if CLEAROUT in tout: + task_output = tout[ + tout.rfind(CLEAROUT) + len(CLEAROUT):] + else: + task_output += tout + try: + db = self.db + db(db.scheduler_run.id == task.run_id).update(run_output=task_output) + db.commit() + tout = '' + logger.debug(' partial output saved') + except Exception: + logger.exception(' error while saving partial output') + task_output = task_output[:-len(tout)] + p.join(timeout=run_timeout) + except: + logger.exception(' task stopped by general exception') + self.terminate_process() + tr = TaskReport(STOPPED) + else: + if p.is_alive(): + logger.debug(' task timeout') + self.terminate_process(flush_ret=False) + try: + # we try to get a traceback here + tr = retq.get(timeout=2) # NOTE: risky after terminate + tr.status = TIMEOUT + tr.output = task_output + except Queue.Empty: + tr = TaskReport(TIMEOUT) + else: + try: + tr = retq.get_nowait() + except Queue.Empty: + logger.debug(' task stopped') + tr = TaskReport(STOPPED) + else: + logger.debug(' task completed or failed') + result = tr.result + if result and result.startswith(RESULTINFILE): + temp_path = result.replace(RESULTINFILE, '', 1) + with open(temp_path) as f: + tr.result = f.read() + os.unlink(temp_path) + tr.output = task_output + return tr + + _terminate_process_lock = threading.RLock() + + def terminate_process(self, flush_out=True, flush_ret=True): + """Terminate any running tasks (internal use only)""" + if self.process is not None: + # must synchronize since we are called by main and heartbeat thread + with self._terminate_process_lock: + if flush_out: + queue = self.process_queues[1] + while not queue.empty(): # NOTE: empty() is not reliable + try: + queue.get_nowait() + except Queue.Empty: + pass + if flush_ret: + queue = self.process_queues[0] + while not queue.empty(): + try: + queue.get_nowait() + except Queue.Empty: + pass + logger.debug('terminating process') + try: + # NOTE: terminate should not be called when using shared + # resources, see "Avoid terminating processes" + # section in + # https://docs.python.org/2/library/multiprocessing.html#programming-guidelines + # https://docs.python.org/3/library/multiprocessing.html#programming-guidelines + self.process.terminate() + # NOTE: calling join after a terminate is risky, + # as explained in "Avoid terminating processes" + # section this can lead to a deadlock + self.process.join() + finally: + self.process = None + + def die(self): + """Forces termination of the worker process along with any running + task""" + logger.info('die!') + self.have_heartbeat = False + self.terminate_process() + + def give_up(self): + """Waits for any running task to be executed, then exits the worker + process""" + logger.info('Giving up as soon as possible!') + self.have_heartbeat = False + + def run(self): + """This is executed by the heartbeat thread""" + counter = 0 + while self.have_heartbeat: + self.send_heartbeat(counter) + counter += 1 + + def start_heartbeats(self): + self.start() + def __get_migrate(self, tablename, migrate=True): if migrate is False: return False @@ -901,58 +918,49 @@ class Scheduler(MetaScheduler): try: self.start_heartbeats() while self.have_heartbeat: - if self.w_stats.status == DISABLED: + with self.w_stats_lock: + is_disabled = self.w_stats.status == DISABLED + if is_disabled: logger.debug('Someone stopped me, sleeping until better' ' times come (%s)', self.w_stats.sleep) self.sleep() continue logger.debug('looping...') + if self.is_a_ticker and self.do_assign_tasks: + # I'm a ticker, and 5 loops passed without + # reassigning tasks, let's do that + self.wrapped_assign_tasks() task = self.wrapped_pop_task() if task: - self.w_stats.empty_runs = 0 - self.w_stats.status = RUNNING - self.w_stats.total += 1 - self.wrapped_report_task(task, self.local_async(task)) - if not self.w_stats.status == DISABLED: - self.w_stats.status = ACTIVE + with self.w_stats_lock: + self.w_stats.empty_runs = 0 + self.w_stats.status = RUNNING + self.w_stats.total += 1 + self.wrapped_report_task(task, self.execute(task)) + with self.w_stats_lock: + if not self.w_stats.status == DISABLED: + self.w_stats.status = ACTIVE else: - self.w_stats.empty_runs += 1 + with self.w_stats_lock: + self.w_stats.empty_runs += 1 + if self.max_empty_runs != 0: + logger.debug('empty runs %s/%s', + self.w_stats.empty_runs, + self.max_empty_runs) + if self.w_stats.empty_runs >= self.max_empty_runs: + logger.info( + 'empty runs limit reached, killing myself') + self.die() + if self.is_a_ticker and self.greedy: + # there could be other tasks ready to be assigned + logger.info('TICKER: greedy loop') + self.wrapped_assign_tasks() logger.debug('sleeping...') - if self.max_empty_runs != 0: - logger.debug('empty runs %s/%s', - self.w_stats.empty_runs, - self.max_empty_runs) - if self.w_stats.empty_runs >= self.max_empty_runs: - logger.info( - 'empty runs limit reached, killing myself') - self.die() self.sleep() except (KeyboardInterrupt, SystemExit): logger.info('catched') self.die() - def wrapped_assign_tasks(self, db): - """Commodity function to call `assign_tasks` and trap exceptions. - - If an exception is raised, assume it happened because of database - contention and retries `assign_task` after 0.5 seconds - """ - logger.debug('Assigning tasks...') - db.commit() # db.commit() only for Mysql - x = 0 - while x < 10: - try: - self.assign_tasks(db) - db.commit() - logger.debug('Tasks assigned...') - break - except: - self.w_stats.errors += 1 - db.rollback() - logger.error('TICKER: error assigning tasks (%s)', x) - x += 1 - time.sleep(0.5) - def wrapped_pop_task(self): """Commodity function to call `pop_task` and trap exceptions. @@ -960,30 +968,21 @@ class Scheduler(MetaScheduler): contention and retries `pop_task` after 0.5 seconds """ db = self.db - db.commit() # another nifty db.commit() only for Mysql - x = 0 - while x < 10: + db.commit() # for MySQL only; FIXME: Niphlod, still needed? could avoid when not MySQL? + for x in range(10): try: - rtn = self.pop_task(db) - return rtn - break - except: + return self.pop_task() + except Exception: + logger.exception(' error popping tasks') self.w_stats.errors += 1 db.rollback() - logger.error(' error popping tasks') - x += 1 time.sleep(0.5) - def pop_task(self, db): + def pop_task(self): """Grab a task ready to be executed from the queue.""" now = self.now() - st = self.db.scheduler_task - if self.is_a_ticker and self.do_assign_tasks: - # I'm a ticker, and 5 loops passed without reassigning tasks, - # let's do that and loop again - self.wrapped_assign_tasks(db) - return None - # ready to process something + db = self.db + st = db.scheduler_task grabbed = db( (st.assigned_worker_name == self.worker_name) & (st.status == ASSIGNED) @@ -991,22 +990,18 @@ class Scheduler(MetaScheduler): task = grabbed.select(limitby=(0, 1), orderby=st.next_run_time).first() if task: + # none will touch my task! task.update_record(status=RUNNING, last_run_time=now) - # noone will touch my task! db.commit() logger.debug(' work to do %s', task.id) else: - if self.is_a_ticker and self.greedy: - # there are other tasks ready to be assigned - logger.info('TICKER: greedy loop') - self.wrapped_assign_tasks(db) - else: - logger.info('nothing to do') + logger.info('nothing to do') return None - times_run = task.times_run + 1 + if task.cronline: - cron_recur = CronParser(task.cronline, now.replace(second=0)) - next_run_time = cron_recur.get_next() + cron_recur = CronParser(task.cronline, + now.replace(second=0, microsecond=0)) + next_run_time = cron_recur.next() elif not task.prevent_drift: next_run_time = task.last_run_time + datetime.timedelta( seconds=task.period @@ -1016,9 +1011,10 @@ class Scheduler(MetaScheduler): # see #1191 next_run_time = task.start_time secondspassed = (now - next_run_time).total_seconds() - steps = secondspassed // task.period + 1 - next_run_time += datetime.timedelta(seconds=task.period * steps) + times = secondspassed // task.period + 1 + next_run_time += datetime.timedelta(seconds=task.period * times) + times_run = task.times_run + 1 if times_run < task.repeats or task.repeats == 0: # need to run (repeating task) run_again = True @@ -1026,7 +1022,7 @@ class Scheduler(MetaScheduler): # no need to run again run_again = False run_id = 0 - while True and not self.discard_results: + while not self.discard_results: # FIXME: forever? logger.debug(' new scheduler_run record') try: run_id = db.scheduler_run.insert( @@ -1036,9 +1032,10 @@ class Scheduler(MetaScheduler): worker_name=self.worker_name) db.commit() break - except: - time.sleep(0.5) + except Exception: + logger.exception(' error inserting scheduler_run') db.rollback() + time.sleep(0.5) logger.info('new task %(id)s "%(task_name)s"' ' %(application_name)s.%(function_name)s' % task) return Task( @@ -1065,15 +1062,14 @@ class Scheduler(MetaScheduler): contention and retries `pop_task` after 0.5 seconds """ db = self.db - while True: + while True: # FIXME: forever? try: self.report_task(task, task_report) db.commit() break - except: - self.w_stats.errors += 1 + except Exception: + logger.exception(' error storing result') db.rollback() - logger.error(' error storing result') time.sleep(0.5) def report_task(self, task, task_report): @@ -1081,8 +1077,8 @@ class Scheduler(MetaScheduler): Deals with logic for repeating tasks. """ - db = self.db now = self.now() + db = self.db st = db.scheduler_task sr = db.scheduler_run if not self.discard_results: @@ -1103,11 +1099,11 @@ class Scheduler(MetaScheduler): db(sr.id == task.run_id).delete() # if there is a stop_time and the following run would exceed it is_expired = (task.stop_time and - task.next_run_time > task.stop_time and - True or False) + task.next_run_time > task.stop_time or + False) status = (task.run_again and is_expired and EXPIRED or - task.run_again and not is_expired and - QUEUED or COMPLETED) + task.run_again and not is_expired and QUEUED or + COMPLETED) if task_report.status == COMPLETED: d = dict(status=status, next_run_time=task.next_run_time, @@ -1116,7 +1112,7 @@ class Scheduler(MetaScheduler): ) db(st.id == task.task_id).update(**d) if status == COMPLETED: - self.update_dependencies(db, task.task_id) + self.update_dependencies(task.task_id) else: st_mapping = {'FAILED': 'FAILED', 'TIMEOUT': 'TIMEOUT', @@ -1132,16 +1128,18 @@ class Scheduler(MetaScheduler): ) logger.info('task completed (%s)', task_report.status) - def update_dependencies(self, db, task_id): + def update_dependencies(self, task_id): """Unblock execution paths for Jobs.""" + db = self.db db(db.scheduler_task_deps.task_child == task_id).update(can_visit=True) def adj_hibernation(self): """Used to increase the "sleep" interval for DISABLED workers.""" - if self.w_stats.status == DISABLED: - wk_st = self.w_stats.sleep - hibernation = wk_st + HEARTBEAT if wk_st < MAXHIBERNATION else MAXHIBERNATION - self.w_stats.sleep = hibernation + with self.w_stats_lock: + if self.w_stats.status == DISABLED: + wk_st = self.w_stats.sleep + hibernation = wk_st + HEARTBEAT if wk_st < MAXHIBERNATION else MAXHIBERNATION + self.w_stats.sleep = hibernation def send_heartbeat(self, counter): """Coordination among available workers. @@ -1157,8 +1155,7 @@ class Scheduler(MetaScheduler): if self.db_thread: # BKR 20180612 check if connection still works try: - query = self.db_thread.scheduler_worker.worker_name == self.worker_name - self.db_thread(query).count() + self.db_thread(self.db_thread.scheduler_worker).count() except self.db_thread._adapter.connection.OperationalError: # if not -> throw away self.db_thread and force reconnect self.db_thread = None @@ -1168,53 +1165,55 @@ class Scheduler(MetaScheduler): self.db_thread = DAL( self.db._uri, folder=self.db._adapter.folder, decode_credentials=True) self.define_tables(self.db_thread, migrate=False) + try: - db = self.db_thread - sw, st = db.scheduler_worker, db.scheduler_task now = self.now() + db = self.db_thread + sw = db.scheduler_worker + st = db.scheduler_task # record heartbeat - mybackedstatus = db(sw.worker_name == self.worker_name).select().first() - if not mybackedstatus: - sw.insert(status=ACTIVE, worker_name=self.worker_name, - first_heartbeat=now, last_heartbeat=now, - group_names=self.group_names, - worker_stats=self.w_stats) - self.w_stats.status = ACTIVE - self.w_stats.sleep = self.heartbeat - mybackedstatus = ACTIVE - else: - mybackedstatus = mybackedstatus.status - if mybackedstatus == DISABLED: - # keep sleeping - self.w_stats.status = DISABLED - logger.debug('........recording heartbeat (%s)', - self.w_stats.status) - db(sw.worker_name == self.worker_name).update( - last_heartbeat=now, - worker_stats=self.w_stats) - elif mybackedstatus == TERMINATE: - self.w_stats.status = TERMINATE - logger.debug("Waiting to terminate the current task") - self.give_up() - elif mybackedstatus == KILL: - self.w_stats.status = KILL - self.die() - return + row = db(sw.worker_name == self.worker_name).select().first() + with self.w_stats_lock: + if not row: + sw.insert(status=ACTIVE, worker_name=self.worker_name, + first_heartbeat=now, last_heartbeat=now, + group_names=self.group_names, + worker_stats=self.w_stats) + self.w_stats.status = ACTIVE + self.w_stats.sleep = self.heartbeat + backed_status = ACTIVE else: - if mybackedstatus == STOP_TASK: - logger.info('Asked to kill the current task') - self.terminate_process() - logger.debug('........recording heartbeat (%s)', - self.w_stats.status) - db(sw.worker_name == self.worker_name).update( - last_heartbeat=now, status=ACTIVE, - worker_stats=self.w_stats) - self.w_stats.sleep = self.heartbeat # re-activating the process - if self.w_stats.status != RUNNING: - self.w_stats.status = ACTIVE + backed_status = row.status + if backed_status == DISABLED: + # keep sleeping + self.w_stats.status = DISABLED + logger.debug('........recording heartbeat (DISABLED)') + db(sw.worker_name == self.worker_name).update( + last_heartbeat=now, + worker_stats=self.w_stats) + elif backed_status == TERMINATE: + self.w_stats.status = TERMINATE + logger.debug("Waiting to terminate the current task") + self.give_up() + elif backed_status == KILL: + self.w_stats.status = KILL + self.die() + return + else: + if backed_status == STOP_TASK: + logger.info('Asked to kill the current task') + self.terminate_process() + logger.debug('........recording heartbeat (%s)', + self.w_stats.status) + db(sw.worker_name == self.worker_name).update( + last_heartbeat=now, status=ACTIVE, + worker_stats=self.w_stats) + self.w_stats.sleep = self.heartbeat # re-activating the process + if self.w_stats.status != RUNNING: + self.w_stats.status = ACTIVE self.do_assign_tasks = False - if counter % 5 == 0 or mybackedstatus == PICK: + if counter % 5 == 0 or backed_status == PICK: try: # delete dead workers expiration = now - datetime.timedelta( @@ -1236,14 +1235,16 @@ class Scheduler(MetaScheduler): try: self.is_a_ticker = self.being_a_ticker() except: - logger.error('Error coordinating TICKER') - if self.w_stats.status == ACTIVE: - self.do_assign_tasks = True + logger.exception('Error coordinating TICKER') + with self.w_stats_lock: + if self.w_stats.status == ACTIVE: + self.do_assign_tasks = True except: - logger.error('Error cleaning up') + logger.exception('Error cleaning up') + db.commit() except: - logger.error('Error retrieving status') + logger.exception('Error retrieving status') db.rollback() self.adj_hibernation() self.sleep() @@ -1261,7 +1262,8 @@ class Scheduler(MetaScheduler): (sw.worker_name != my_name) & (sw.status == ACTIVE) ).select(sw.is_ticker, sw.worker_name) ticker = all_active.find(lambda row: row.is_ticker is True).first() - not_busy = self.w_stats.status == ACTIVE + with self.w_stats_lock: + not_busy = self.w_stats.status == ACTIVE if not ticker: # if no other tickers are around if not_busy: @@ -1283,14 +1285,38 @@ class Scheduler(MetaScheduler): "%s is a ticker, I'm a poor worker" % ticker.worker_name) return False - def assign_tasks(self, db): + def wrapped_assign_tasks(self): + """Commodity function to call `assign_tasks` and trap exceptions. + + If an exception is raised, assume it happened because of database + contention and retries `assign_task` after 0.5 seconds + """ + logger.debug('Assigning tasks...') + db = self.db + db.commit() # for MySQL only; FIXME: Niphlod, still needed? could avoid when not MySQL? + for x in range(10): + try: + self.assign_tasks() + db.commit() + logger.debug('Tasks assigned...') + break + except Exception: + logger.exception('TICKER: error assigning tasks') + self.w_stats.errors += 1 + db.rollback() + time.sleep(0.5) + + def assign_tasks(self): """Assign task to workers, that can then pop them from the queue. Deals with group_name(s) logic, in order to assign linearly tasks to available workers for those groups """ - sw, st, sd = db.scheduler_worker, db.scheduler_task, db.scheduler_task_deps now = self.now() + db = self.db + sw = db.scheduler_worker + st = db.scheduler_task + sd = db.scheduler_task_deps all_workers = db(sw.status == ACTIVE).select() # build workers as dict of groups wkgroups = {} @@ -1324,7 +1350,6 @@ class Scheduler(MetaScheduler): (st.status.belongs((QUEUED, ASSIGNED))) & ( (sd.id == None) | (st.id.belongs(deps_with_no_deps)) - ) )._select(st.id, distinct=True, left=sd.on( (st.id == sd.task_parent) & @@ -1356,13 +1381,13 @@ class Scheduler(MetaScheduler): # let's freeze it up db.commit() - x = 0 + tnum = 0 for group in wkgroups.keys(): tasks = all_available(st.group_name == group).select( limitby=(0, limit), orderby=st.next_run_time) # let's break up the queue evenly among workers for task in tasks: - x += 1 + tnum += 1 gname = task.group_name ws = wkgroups.get(gname) if ws: @@ -1385,7 +1410,9 @@ class Scheduler(MetaScheduler): next_run_time = now+datetime.timedelta(seconds=task.period) else: # must be cronline - raise NotImplementedError + cron_recur = CronParser(task.cronline, + now.replace(second=0, microsecond=0)) + next_run_time = cron_recur.next() db(st.id == task.id).update(times_run=task.times_run+1, next_run_time=next_run_time, last_run_time=now) @@ -1409,16 +1436,17 @@ class Scheduler(MetaScheduler): wkgroups[gname]['workers'][myw]['c'] += 1 db.commit() # I didn't report tasks but I'm working nonetheless!!!! - if x > 0: - self.w_stats.empty_runs = 0 - self.w_stats.queue = x - self.w_stats.distribution = wkgroups - self.w_stats.workers = len(all_workers) + with self.w_stats_lock: + if tnum > 0: + self.w_stats.empty_runs = 0 + self.w_stats.queue = tnum + self.w_stats.distribution = wkgroups + self.w_stats.workers = len(all_workers) # I'll be greedy only if tasks assigned are equal to the limit # (meaning there could be others ready to be assigned) - self.greedy = x >= limit + self.greedy = tnum >= limit logger.info('TICKER: workers are %s', len(all_workers)) - logger.info('TICKER: tasks are %s', x) + logger.info('TICKER: tasks are %s', tnum) def sleep(self): """Calculate the number of seconds to sleep.""" @@ -1428,27 +1456,28 @@ class Scheduler(MetaScheduler): def set_worker_status(self, group_names=None, action=ACTIVE, exclude=None, limit=None, worker_name=None): """Internal function to set worker's status.""" - ws = self.db.scheduler_worker + db = self.db + ws = db.scheduler_worker if not group_names: group_names = self.group_names elif isinstance(group_names, str): group_names = [group_names] if worker_name: - self.db(ws.worker_name == worker_name).update(status=action) + db(ws.worker_name == worker_name).update(status=action) return exclusion = exclude and exclude.append(action) or [action] if not limit: for group in group_names: - self.db( + db( (ws.group_names.contains(group)) & (~ws.status.belongs(exclusion)) ).update(status=action) else: for group in group_names: - workers = self.db((ws.group_names.contains(group)) & - (~ws.status.belongs(exclusion)) - )._select(ws.id, limitby=(0, limit)) - self.db(ws.id.belongs(workers)).update(status=action) + workers = db((ws.group_names.contains(group)) & + (~ws.status.belongs(exclusion)) + )._select(ws.id, limitby=(0, limit)) + db(ws.id.belongs(workers)).update(status=action) def disable(self, group_names=None, limit=None, worker_name=None): """Set DISABLED on the workers processing `group_names` tasks. @@ -1529,18 +1558,19 @@ class Scheduler(MetaScheduler): if cronline: try: start_time = kwargs.get('start_time', self.now) - next_run_time = CronParser(cronline, start_time).get_next() + next_run_time = CronParser(cronline, start_time).next() kwargs.update(start_time=start_time, next_run_time=next_run_time) - except: + except Exception: pass if 'start_time' in kwargs and 'next_run_time' not in kwargs: kwargs.update(next_run_time=kwargs['start_time']) - rtn = self.db.scheduler_task.validate_and_insert(**kwargs) + db = self.db + rtn = db.scheduler_task.validate_and_insert(**kwargs) if not rtn.errors: rtn.uuid = tuuid if immediate: - self.db( - (self.db.scheduler_worker.is_ticker == True) + db( + (db.scheduler_worker.is_ticker == True) ).update(status=PICK) else: rtn.uuid = None @@ -1569,7 +1599,9 @@ class Scheduler(MetaScheduler): """ from pydal.objects import Query - sr, st = self.db.scheduler_run, self.db.scheduler_task + db = self.db + sr = db.scheduler_run + st = db.scheduler_task if isinstance(ref, integer_types): q = st.id == ref elif isinstance(ref, str): @@ -1586,7 +1618,7 @@ class Scheduler(MetaScheduler): fields = st.ALL, sr.ALL left = sr.on(sr.task_id == st.id) orderby = ~st.id | ~sr.id - row = self.db(q).select( + row = db(q).select( *fields, **dict(orderby=orderby, left=left, @@ -1620,7 +1652,9 @@ class Scheduler(MetaScheduler): Note: Experimental """ - st, sw = self.db.scheduler_task, self.db.scheduler_worker + db = self.db + st = db.scheduler_task + sw = db.scheduler_worker if isinstance(ref, integer_types): q = st.id == ref elif isinstance(ref, str): @@ -1628,16 +1662,16 @@ class Scheduler(MetaScheduler): else: raise SyntaxError( "You can retrieve results only by id or uuid") - task = self.db(q).select(st.id, st.status, st.assigned_worker_name) + task = db(q).select(st.id, st.status, st.assigned_worker_name) task = task.first() rtn = None if not task: return rtn if task.status == 'RUNNING': q = sw.worker_name == task.assigned_worker_name - rtn = self.db(q).update(status=STOP_TASK) + rtn = db(q).update(status=STOP_TASK) elif task.status == 'QUEUED': - rtn = self.db(q).update( + rtn = db(q).update( stop_time=self.now(), enabled=False, status=STOPPED) @@ -1653,7 +1687,7 @@ class Scheduler(MetaScheduler): if only_ticker: workers = db(db.scheduler_worker.is_ticker == True).select() else: - workers = db(db.scheduler_worker.id > 0).select() + workers = db(db.scheduler_worker.id).select() all_workers = {} for row in workers: all_workers[row.worker_name] = Storage( @@ -1674,6 +1708,7 @@ def main(): python gluon/scheduler.py """ + import optparse parser = optparse.OptionParser() parser.add_option( "-w", "--worker_name", dest="worker_name", default=None, diff --git a/gluon/tests/test_scheduler.py b/gluon/tests/test_scheduler.py index aecbcb5a..393abdab 100644 --- a/gluon/tests/test_scheduler.py +++ b/gluon/tests/test_scheduler.py @@ -8,43 +8,46 @@ import unittest import glob import datetime import sys +import shutil from gluon.storage import Storage from gluon.languages import TranslatorFactory from gluon.scheduler import JobGraph, Scheduler, CronParser from gluon.dal import DAL +from gluon.fileutils import create_app +test_app_name = '_test_scheduler' + class BaseTestScheduler(unittest.TestCase): + def setUp(self): + self.tearDown() + appdir = os.path.join('applications', test_app_name) + os.mkdir(appdir) + create_app(appdir) + self.db = None - self.cleanfolder() from gluon.globals import current - s = Storage({'application': 'welcome', - 'folder': 'applications/welcome', + s = Storage({'application': test_app_name, + 'folder': "applications/%s" % test_app_name, 'controller': 'default'}) current.request = s T = TranslatorFactory('', 'en') current.T = T - self.db = DAL('sqlite://dummy2.db', check_reserved=['all']) - - def cleanfolder(self): - if self.db: - self.db.close() - try: - os.unlink('dummy2.db') - except: - pass - tfiles = glob.glob('*_scheduler*.table') - for a in tfiles: - os.unlink(a) + self.db = DAL('sqlite://dummy2.db', + folder="applications/%s/databases" % test_app_name, + check_reserved=['all']) def tearDown(self): - self.cleanfolder() try: self.inner_teardown() except: pass + appdir = os.path.join('applications', test_app_name) + if os.path.exists(appdir): + shutil.rmtree(appdir) + class CronParserTest(unittest.TestCase): @@ -53,105 +56,105 @@ class CronParserTest(unittest.TestCase): # minute asterisk base = datetime.datetime(2010, 1, 23, 12, 18) itr = CronParser('*/1 * * * *', base) - n1 = itr.get_next() # 19 + n1 = itr.next() # 19 self.assertEqual(base.year, n1.year) self.assertEqual(base.month, n1.month) self.assertEqual(base.day, n1.day) self.assertEqual(base.hour, n1.hour) self.assertEqual(base.minute, n1.minute - 1) for i in range(39): # ~ 58 - itr.get_next() - n2 = itr.get_next() + itr.next() + n2 = itr.next() self.assertEqual(n2.minute, 59) - n3 = itr.get_next() + n3 = itr.next() self.assertEqual(n3.minute, 0) self.assertEqual(n3.hour, 13) itr = CronParser('*/5 * * * *', base) - n4 = itr.get_next() + n4 = itr.next() self.assertEqual(n4.minute, 20) for i in range(6): - itr.get_next() - n5 = itr.get_next() + itr.next() + n5 = itr.next() self.assertEqual(n5.minute, 55) - n6 = itr.get_next() + n6 = itr.next() self.assertEqual(n6.minute, 0) self.assertEqual(n6.hour, 13) base = datetime.datetime(2010, 1, 23, 12, 18) itr = CronParser('4/34 * * * *', base) - n7 = itr.get_next() + n7 = itr.next() self.assertEqual(n7.minute, 38) self.assertEqual(n7.hour, 12) - n8 = itr.get_next() + n8 = itr.next() self.assertEqual(n8.minute, 4) self.assertEqual(n8.hour, 13) def testHour(self): base = datetime.datetime(2010, 1, 24, 12, 2) itr = CronParser('0 */3 * * *', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.hour, 15) self.assertEqual(n1.minute, 0) for i in range(2): - itr.get_next() - n2 = itr.get_next() + itr.next() + n2 = itr.next() self.assertEqual(n2.hour, 0) self.assertEqual(n2.day, 25) def testDay(self): base = datetime.datetime(2010, 2, 24, 12, 9) itr = CronParser('0 0 */3 * *', base) - n1 = itr.get_next() + n1 = itr.next() # 1 4 7 10 13 16 19 22 25 28 self.assertEqual(n1.day, 25) - n2 = itr.get_next() + n2 = itr.next() self.assertEqual(n2.day, 28) - n3 = itr.get_next() + n3 = itr.next() self.assertEqual(n3.day, 1) self.assertEqual(n3.month, 3) # test leap year base = datetime.datetime(1996, 2, 27) itr = CronParser('0 0 * * *', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.day, 28) self.assertEqual(n1.month, 2) - n2 = itr.get_next() + n2 = itr.next() self.assertEqual(n2.day, 29) self.assertEqual(n2.month, 2) base2 = datetime.datetime(2000, 2, 27) itr2 = CronParser('0 0 * * *', base2) - n3 = itr2.get_next() + n3 = itr2.next() self.assertEqual(n3.day, 28) self.assertEqual(n3.month, 2) - n4 = itr2.get_next() + n4 = itr2.next() self.assertEqual(n4.day, 29) self.assertEqual(n4.month, 2) def testWeekDay(self): base = datetime.datetime(2010, 2, 25) itr = CronParser('0 0 * * sat', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.isoweekday(), 6) self.assertEqual(n1.day, 27) - n2 = itr.get_next() + n2 = itr.next() self.assertEqual(n2.isoweekday(), 6) self.assertEqual(n2.day, 6) self.assertEqual(n2.month, 3) base = datetime.datetime(2010, 1, 25) itr = CronParser('0 0 1 * wed', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.month, 1) self.assertEqual(n1.day, 27) self.assertEqual(n1.year, 2010) - n2 = itr.get_next() + n2 = itr.next() self.assertEqual(n2.month, 2) self.assertEqual(n2.day, 1) self.assertEqual(n2.year, 2010) - n3 = itr.get_next() + n3 = itr.next() self.assertEqual(n3.month, 2) self.assertEqual(n3.day, 3) self.assertEqual(n3.year, 2010) @@ -159,43 +162,43 @@ class CronParserTest(unittest.TestCase): def testMonth(self): base = datetime.datetime(2010, 1, 25) itr = CronParser('0 0 1 * *', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.month, 2) self.assertEqual(n1.day, 1) - n2 = itr.get_next() + n2 = itr.next() self.assertEqual(n2.month, 3) self.assertEqual(n2.day, 1) for i in range(8): - itr.get_next() - n3 = itr.get_next() + itr.next() + n3 = itr.next() self.assertEqual(n3.month, 12) self.assertEqual(n3.year, 2010) - n4 = itr.get_next() + n4 = itr.next() self.assertEqual(n4.month, 1) self.assertEqual(n4.year, 2011) base = datetime.datetime(2010, 1, 25) itr = CronParser('0 0 1 */4 *', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.month, 5) self.assertEqual(n1.day, 1) base = datetime.datetime(2010, 1, 25) itr = CronParser('0 0 1 1-3 *', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.month, 2) self.assertEqual(n1.day, 1) - n2 = itr.get_next() + n2 = itr.next() self.assertEqual(n2.month, 3) self.assertEqual(n2.day, 1) - n3 = itr.get_next() + n3 = itr.next() self.assertEqual(n3.month, 1) self.assertEqual(n3.day, 1) def testSundayToThursdayWithAlphaConversion(self): base = datetime.datetime(2010, 8, 25, 15, 56) itr = CronParser("30 22 * * sun-thu", base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(base.year, n1.year) self.assertEqual(base.month, n1.month) self.assertEqual(base.day, n1.day) @@ -205,19 +208,19 @@ class CronParserTest(unittest.TestCase): def testISOWeekday(self): base = datetime.datetime(2010, 2, 25) itr = CronParser('0 0 * * 7', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.isoweekday(), 7) self.assertEqual(n1.day, 28) - n2 = itr.get_next() + n2 = itr.next() self.assertEqual(n2.isoweekday(), 7) self.assertEqual(n2.day, 7) self.assertEqual(n2.month, 3) base = datetime.datetime(2010, 2, 22) itr = CronParser('0 0 * * */2', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.isoweekday(), 2) self.assertEqual(n1.day, 23) - n2 = itr.get_next() + n2 = itr.next() self.assertEqual(n2.isoweekday(), 4) self.assertEqual(n2.day, 25) @@ -225,21 +228,21 @@ class CronParserTest(unittest.TestCase): base = datetime.datetime(2012, 1, 1, 0, 0) itr = CronParser('0 * * 3 *', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.year, base.year) self.assertEqual(n1.month, 3) self.assertEqual(n1.day, base.day) self.assertEqual(n1.hour, base.hour) self.assertEqual(n1.minute, base.minute) - n2 = itr.get_next() + n2 = itr.next() self.assertEqual(n2.year, base.year) self.assertEqual(n2.month, 3) self.assertEqual(n2.day, base.day) self.assertEqual(n2.hour, base.hour + 1) self.assertEqual(n2.minute, base.minute) - n3 = itr.get_next() + n3 = itr.next() self.assertEqual(n3.year, base.year) self.assertEqual(n3.month, 3) self.assertEqual(n3.day, base.day) @@ -250,26 +253,26 @@ class CronParserTest(unittest.TestCase): base = datetime.datetime(2013, 3, 1, 12, 17, 34, 257877) c = CronParser('00 03 16,30 * *', base) - n1 = c.get_next() + n1 = c.next() self.assertEqual(n1.month, 3) self.assertEqual(n1.day, 16) - n2 = c.get_next() + n2 = c.next() self.assertEqual(n2.month, 3) self.assertEqual(n2.day, 30) - n3 = c.get_next() + n3 = c.next() self.assertEqual(n3.month, 4) self.assertEqual(n3.day, 16) def test_rangeGenerator(self): base = datetime.datetime(2013, 3, 4, 0, 0) itr = CronParser('1-9/2 0 1 * *', base) - n1 = itr.get_next() - n2 = itr.get_next() - n3 = itr.get_next() - n4 = itr.get_next() - n5 = itr.get_next() + n1 = itr.next() + n2 = itr.next() + n3 = itr.next() + n4 = itr.next() + n5 = itr.next() self.assertEqual(n1.minute, 1) self.assertEqual(n2.minute, 3) self.assertEqual(n3.minute, 5) @@ -289,45 +292,49 @@ class CronParserTest(unittest.TestCase): def test_invalidcron(self): base = datetime.datetime(2013, 3, 4, 0, 0) itr = CronParser('5 4 31 2 *', base) - self.assertRaises(ValueError, itr.get_next) + self.assertRaises(ValueError, itr.next) + itr = CronParser('1- * * * *', base) + self.assertRaises(ValueError, itr.next) + itr = CronParser('-1 * * * *', base) + self.assertRaises(ValueError, itr.next) itr = CronParser('* * 5-1 * *', base) - self.assertRaises(ValueError, itr.get_next) + self.assertRaises(ValueError, itr.next) itr = CronParser('* * * janu-jun *', base) - self.assertRaises(KeyError, itr.get_next) + self.assertRaises(ValueError, itr.next) itr = CronParser('* * * * * *', base) - self.assertRaises(ValueError, itr.get_next) + self.assertRaises(ValueError, itr.next) itr = CronParser('* * * *', base) - self.assertRaises(ValueError, itr.get_next) + self.assertRaises(ValueError, itr.next) def testLastDayOfMonth(self): base = datetime.datetime(2015, 9, 4) itr = CronParser('0 0 L * *', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.month, 9) self.assertEqual(n1.day, 30) - n2 = itr.get_next() + n2 = itr.next() self.assertEqual(n2.month, 10) self.assertEqual(n2.day, 31) - n3 = itr.get_next() + n3 = itr.next() self.assertEqual(n3.month, 11) self.assertEqual(n3.day, 30) - n4 = itr.get_next() + n4 = itr.next() self.assertEqual(n4.month, 12) self.assertEqual(n4.day, 31) base = datetime.datetime(1996, 2, 27) itr = CronParser('0 0 L * *', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.day, 29) self.assertEqual(n1.month, 2) - n2 = itr.get_next() + n2 = itr.next() self.assertEqual(n2.day, 31) self.assertEqual(n2.month, 3) def testSpecialExpr(self): base = datetime.datetime(2000, 1, 1) itr = CronParser('@yearly', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.day, 1) self.assertEqual(n1.month, 1) self.assertEqual(n1.year, base.year + 1) @@ -335,7 +342,7 @@ class CronParserTest(unittest.TestCase): self.assertEqual(n1.minute, 0) itr = CronParser('@annually', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.day, 1) self.assertEqual(n1.month, 1) self.assertEqual(n1.year, base.year + 1) @@ -343,7 +350,7 @@ class CronParserTest(unittest.TestCase): self.assertEqual(n1.minute, 0) itr = CronParser('@monthly', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.day, 1) self.assertEqual(n1.month, base.month + 1) self.assertEqual(n1.year, base.year) @@ -351,19 +358,19 @@ class CronParserTest(unittest.TestCase): self.assertEqual(n1.minute, 0) itr = CronParser('@weekly', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.day, 2) self.assertEqual(n1.month, base.month) self.assertEqual(n1.year, base.year) self.assertEqual(n1.hour, 0) self.assertEqual(n1.minute, 0) - n2 = itr.get_next() + n2 = itr.next() self.assertEqual(n2.day, 9) self.assertEqual(n2.month, base.month) self.assertEqual(n2.year, base.year) self.assertEqual(n2.hour, 0) self.assertEqual(n2.minute, 0) - n3 = itr.get_next() + n3 = itr.next() self.assertEqual(n3.day, 16) self.assertEqual(n3.month, base.month) self.assertEqual(n3.year, base.year) @@ -371,7 +378,7 @@ class CronParserTest(unittest.TestCase): self.assertEqual(n3.minute, 0) itr = CronParser('@daily', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.day, 2) self.assertEqual(n1.month, base.month) self.assertEqual(n1.year, base.year) @@ -379,7 +386,7 @@ class CronParserTest(unittest.TestCase): self.assertEqual(n1.minute, 0) itr = CronParser('@midnight', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.day, 2) self.assertEqual(n1.month, base.month) self.assertEqual(n1.year, base.year) @@ -387,7 +394,7 @@ class CronParserTest(unittest.TestCase): self.assertEqual(n1.minute, 0) itr = CronParser('@hourly', base) - n1 = itr.get_next() + n1 = itr.next() self.assertEqual(n1.day, 1) self.assertEqual(n1.month, base.month) self.assertEqual(n1.year, base.year) @@ -623,7 +630,7 @@ class testForSchedulerRunnerBase(BaseTestScheduler): import os import time from gluon.scheduler import Scheduler -db_dal = os.path.abspath(os.path.join(request.folder, '..', '..', 'dummy2.db')) +db_dal = os.path.abspath(os.path.join(request.folder, 'databases', 'dummy2.db')) sched_dal = DAL('sqlite://%s' % db_dal, folder=os.path.dirname(db_dal)) sched = Scheduler(sched_dal, max_empty_runs=15, migrate=False, heartbeat=1) def termination(): @@ -635,7 +642,7 @@ def termination(): def exec_sched(self): import subprocess - call_args = [sys.executable, 'web2py.py', '--no_banner', '-D', 'INFO','-K', 'welcome'] + call_args = [sys.executable, 'web2py.py', '--no_banner', '-D', 'INFO','-K', test_app_name] ret = subprocess.call(call_args, env=dict(os.environ)) return ret @@ -678,7 +685,7 @@ def demo4(): ("task times_run is 2", task.times_run == 2), ("task ran 2 times only", len(task_run) == 2), ("scheduler_run records are COMPLETED ", (task_run[0].status == task_run[1].status == 'COMPLETED')), - ("period is respected", (task_run[1].start_time > task_run[0].start_time + datetime.timedelta(seconds=task.period))) + ("period is respected", (task_run[1].start_time >= task_run[0].start_time + datetime.timedelta(seconds=task.period))) ] self.exec_asserts(res, 'REPEATS') @@ -838,7 +845,7 @@ def demo8(): ("task times_failed is 2", task.times_failed == 2), ("task ran 2 times only", len(task_run) == 2), ("scheduler_run records are FAILED", (task_run[0].status == task_run[1].status == 'FAILED')), - ("period is respected", (task_run[1].start_time > task_run[0].start_time + datetime.timedelta(seconds=task.period))) + ("period is respected", (task_run[1].start_time >= task_run[0].start_time + datetime.timedelta(seconds=task.period))) ] self.exec_asserts(res, 'FAILED')