diff --git a/gluon/contrib/redis_scheduler.py b/gluon/contrib/redis_scheduler.py index 51c836e0..412cb61c 100644 --- a/gluon/contrib/redis_scheduler.py +++ b/gluon/contrib/redis_scheduler.py @@ -9,6 +9,17 @@ Scheduler with redis backend --------------------------------- """ +import os +import time +import socket +import datetime +import logging +from gluon.utils import web2py_uuid +from gluon.storage import Storage +from gluon.scheduler import * +from gluon.scheduler import _decode_dict +from gluon.contrib.redis_utils import RWatchError + USAGE = """ ## Example @@ -35,11 +46,6 @@ mysched = RScheduler(db, dict(demo1=demo1,demo2=demo2), ...., redis_conn=rconn) """ -import os -import time -import socket -import datetime -import logging path = os.getcwd() @@ -47,20 +53,19 @@ if 'WEB2PY_PATH' not in os.environ: os.environ['WEB2PY_PATH'] = path try: - from gluon.contrib.simplejson import loads, dumps -except: + # try external module from simplejson import loads, dumps +except ImportError: + try: + # try stdlib (Python >= 2.6) + from json import loads, dumps + except: + # fallback to pure-Python module + from gluon.contrib.simplejson import loads, dumps IDENTIFIER = "%s#%s" % (socket.gethostname(), os.getpid()) -logger = logging.getLogger('web2py.rscheduler.%s' % IDENTIFIER) - -from gluon.utils import web2py_uuid -from gluon.storage import Storage -from gluon.scheduler import * -from gluon.scheduler import _decode_dict -from gluon.contrib.redis_utils import RWatchError - +logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER) POLLING = 'POLLING' @@ -111,8 +116,7 @@ class RScheduler(Scheduler): self._application = current.request.application or 'appname' def _nkey(self, key): - """Helper to restrict all keys to a namespace - and track them""" + """Helper to restrict all keys to a namespace and track them.""" prefix = 'w2p:rsched:%s' % self._application allkeys = '%s:allkeys' % prefix newkey = "%s:%s" % (prefix, key) @@ -120,10 +124,7 @@ class RScheduler(Scheduler): return newkey def prune_all(self): - """ - Just to be fair and implement a method - that does housekeeping - """ + """Global housekeeping.""" all_keys = self._nkey('allkeys') with self.r_server.pipeline() as pipe: while True: @@ -148,8 +149,9 @@ class RScheduler(Scheduler): def send_heartbeat(self, counter): """ - workers coordination has evolved into something is not that - easy. Here we try to do what we need in a single transaction, + Workers coordination in redis. + It has evolved into something is not that easy. + Here we try to do what we need in a single transaction, and retry that transaction if something goes wrong """ with self.r_server.pipeline() as pipe: @@ -167,7 +169,9 @@ class RScheduler(Scheduler): def inner_send_heartbeat(self, counter, pipe): """ - Does a few things: + Do a few things in the "maintenance" thread. + + Specifically: - registers the workers - accepts commands sent to workers (KILL, TERMINATE, PICK, DISABLED, etc) - adjusts sleep @@ -269,6 +273,8 @@ class RScheduler(Scheduler): def being_a_ticker(self, pipe): """ + Elects a ticker. + This is slightly more convoluted than the original but if far more efficient """ @@ -311,7 +317,9 @@ class RScheduler(Scheduler): def assign_tasks(self, db): """ - The real beauty. We don't need to ASSIGN tasks, we just put + The real beauty. + + We don't need to ASSIGN tasks, we just put them into the relevant queue """ st, sd = db.scheduler_task, db.scheduler_task_deps @@ -375,9 +383,6 @@ class RScheduler(Scheduler): all_available = db( (st.status.belongs((QUEUED, ASSIGNED))) & - ((st.times_run < st.repeats) | (st.repeats == 0)) & - (st.start_time <= now) & - ((st.stop_time == None) | (st.stop_time > now)) & (st.next_run_time <= now) & (st.enabled == True) & (st.id.belongs(no_deps)) @@ -437,6 +442,7 @@ class RScheduler(Scheduler): logger.info('TICKER: tasks are %s', x) def pop_task(self, db): + """Lift a task off a queue.""" r_server = self.r_server st = self.db.scheduler_task task = None @@ -533,7 +539,9 @@ class RScheduler(Scheduler): def report_task(self, task, task_report): """ - Needs overwriting only because we need to pop from the + Override. + + Needs it only because we need to pop from the running tasks """ r_server = self.r_server @@ -558,12 +566,12 @@ class RScheduler(Scheduler): logger.debug(' deleting task report in db because of no result') db(sr.id == task.run_id).delete() # if there is a stop_time and the following run would exceed it - is_expired = (task.stop_time - and task.next_run_time > task.stop_time - and True or False) - status = (task.run_again and is_expired and EXPIRED - or task.run_again and not is_expired - and QUEUED or COMPLETED) + is_expired = (task.stop_time and + task.next_run_time > task.stop_time and + True or False) + status = (task.run_again and is_expired and EXPIRED or + task.run_again and not is_expired and + QUEUED or COMPLETED) if task_report.status == COMPLETED: # assigned calculations d = dict(status=status, @@ -579,10 +587,10 @@ class RScheduler(Scheduler): st_mapping = {'FAILED': 'FAILED', 'TIMEOUT': 'TIMEOUT', 'STOPPED': 'FAILED'}[task_report.status] - status = (task.retry_failed - and task.times_failed < task.retry_failed - and QUEUED or task.retry_failed == -1 - and QUEUED or st_mapping) + status = (task.retry_failed and + task.times_failed < task.retry_failed and + QUEUED or task.retry_failed == -1 and + QUEUED or st_mapping) db(st.id == task.task_id).update( times_failed=st.times_failed + 1, next_run_time=task.next_run_time, @@ -596,7 +604,7 @@ class RScheduler(Scheduler): r_server.hdel(running_dict, task.task_id) def wrapped_pop_task(self): - """Commodity function to call `pop_task` and trap exceptions + """Commodity function to call `pop_task` and trap exceptions. If an exception is raised, assume it happened because of database contention and retries `pop_task` after 0.5 seconds """ @@ -620,8 +628,8 @@ class RScheduler(Scheduler): time.sleep(0.5) def get_workers(self, only_ticker=False): - """ Returns a dict holding worker_name : {**columns} - representing all "registered" workers + """Return a dict holding worker_name : {**columns} + representing all "registered" workers. only_ticker returns only the worker running as a TICKER, if there is any """