From de2337dfe3e6587e5f31b7082961b9fa117c2cd5 Mon Sep 17 00:00:00 2001 From: mdipierro Date: Thu, 18 Oct 2012 18:20:51 -0500 Subject: [PATCH] scheduler.queue_task, thank Niphlod --- VERSION | 2 +- gluon/dal.py | 2 +- gluon/scheduler.py | 41 ++++++++++++++++++++++++++++++++++++----- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/VERSION b/VERSION index 0d8e203a..53625f35 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -Version 2.1.1 (2012-10-18 13:42:45) dev +Version 2.1.1 (2012-10-18 18:20:45) dev diff --git a/gluon/dal.py b/gluon/dal.py index 087be33b..05d054ae 100644 --- a/gluon/dal.py +++ b/gluon/dal.py @@ -7815,7 +7815,7 @@ class Table(object): for key,value in fields.iteritems(): value,error = self[key].validate(value) if error: - response.errors[key] = error + response.errors[key] = "%s" % error else: new_fields[key] = value if not response.errors: diff --git a/gluon/scheduler.py b/gluon/scheduler.py index 6a9d3ce5..00bc2650 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -386,7 +386,7 @@ class TYPE(object): def __init__(self,myclass=list,parse=False): self.myclass = myclass - self.parse=parse + self.parse = parse def __call__(self,value): from gluon import current @@ -805,13 +805,29 @@ class Scheduler(MetaScheduler): def sleep(self): time.sleep(self.heartbeat*self.worker_status[1]) # should only sleep until next available task - def queue_task(self, function, args=[], vars={}, **kwargs): + def queue_task(self, function, pargs=[], pvars={}, **kwargs): + """ + Queue tasks. This takes care of handling the validation of all + values. + :param function: the function (anything callable with a __name__) + :param pargs: "raw" args to be passed to the function. Automatically + jsonified. + :param pvars: "raw" kwargs to be passed to the function. Automatically + jsonified + :param kwargs: all the scheduler_task columns. args and vars here should be + in json format already, they will override pargs and pvars + + returns a dict just as a normal validate_and_insert, plus a uuid key holding + the uuid of the queued task. If validation is not passed, both id and uuid + will be None, and you'll get an "error" dict holding the errors found. + """ if hasattr(function, '__name__'): function = function.__name__ - targs = 'args' in kwargs and kwargs.pop('args') or dumps(args) - tvars = 'vars' in kwargs and kwargs.pop('vars') or dumps(vars) + targs = 'args' in kwargs and kwargs.pop('args') or dumps(pargs) + tvars = 'vars' in kwargs and kwargs.pop('vars') or dumps(pvars) tuuid = 'uuid' in kwargs and kwargs.pop('uuid') or web2py_uuid() tname = 'task_name' in kwargs and kwargs.pop('task_name') or function + print 'a', targs rtn = self.db.scheduler_task.validate_and_insert( function_name=function, task_name=tname, @@ -826,6 +842,21 @@ class Scheduler(MetaScheduler): return rtn def task_status(self, ref, output=False): + """ + Shortcut for task status retrieval + + :param ref: can be + - integer --> lookup will be done by scheduler_task.id + - string --> lookup will be done by scheduler_task.uuid + - query --> lookup as you wish (as in db.scheduler_task.task_name == 'test1') + :param output: fetch also the scheduler_run record + + Returns a single Row object, for the last queued task + If output == True, returns also the last scheduler_run record + scheduler_run record is fetched by a left join, so it can + have all fields == None + + """ from gluon.dal import Query sr, st = self.db.scheduler_run, self.db.scheduler_task if isinstance(ref, int): @@ -910,7 +941,7 @@ def main(): tasks = {} group_names = [x.strip() for x in options.group_names.split(',')] - logger.getLogger().setLevel(options.logger_level) + logging.getLogger().setLevel(options.logger_level) print 'groups for this worker: '+', '.join(group_names) print 'connecting to database in folder: ' + options.db_folder or './'