scheduler.queue_task, thank Niphlod

This commit is contained in:
mdipierro
2012-10-18 18:20:51 -05:00
parent e707cfc67c
commit de2337dfe3
3 changed files with 38 additions and 7 deletions
+1 -1
View File
@@ -1 +1 @@
Version 2.1.1 (2012-10-18 13:42:45) dev
Version 2.1.1 (2012-10-18 18:20:45) dev
+1 -1
View File
@@ -7815,7 +7815,7 @@ class Table(object):
for key,value in fields.iteritems():
value,error = self[key].validate(value)
if error:
response.errors[key] = error
response.errors[key] = "%s" % error
else:
new_fields[key] = value
if not response.errors:
+36 -5
View File
@@ -386,7 +386,7 @@ class TYPE(object):
def __init__(self,myclass=list,parse=False):
self.myclass = myclass
self.parse=parse
self.parse = parse
def __call__(self,value):
from gluon import current
@@ -805,13 +805,29 @@ class Scheduler(MetaScheduler):
def sleep(self):
time.sleep(self.heartbeat*self.worker_status[1]) # should only sleep until next available task
def queue_task(self, function, args=[], vars={}, **kwargs):
def queue_task(self, function, pargs=[], pvars={}, **kwargs):
"""
Queue tasks. This takes care of handling the validation of all
values.
:param function: the function (anything callable with a __name__)
:param pargs: "raw" args to be passed to the function. Automatically
jsonified.
:param pvars: "raw" kwargs to be passed to the function. Automatically
jsonified
:param kwargs: all the scheduler_task columns. args and vars here should be
in json format already, they will override pargs and pvars
returns a dict just as a normal validate_and_insert, plus a uuid key holding
the uuid of the queued task. If validation is not passed, both id and uuid
will be None, and you'll get an "error" dict holding the errors found.
"""
if hasattr(function, '__name__'):
function = function.__name__
targs = 'args' in kwargs and kwargs.pop('args') or dumps(args)
tvars = 'vars' in kwargs and kwargs.pop('vars') or dumps(vars)
targs = 'args' in kwargs and kwargs.pop('args') or dumps(pargs)
tvars = 'vars' in kwargs and kwargs.pop('vars') or dumps(pvars)
tuuid = 'uuid' in kwargs and kwargs.pop('uuid') or web2py_uuid()
tname = 'task_name' in kwargs and kwargs.pop('task_name') or function
print 'a', targs
rtn = self.db.scheduler_task.validate_and_insert(
function_name=function,
task_name=tname,
@@ -826,6 +842,21 @@ class Scheduler(MetaScheduler):
return rtn
def task_status(self, ref, output=False):
"""
Shortcut for task status retrieval
:param ref: can be
- integer --> lookup will be done by scheduler_task.id
- string --> lookup will be done by scheduler_task.uuid
- query --> lookup as you wish (as in db.scheduler_task.task_name == 'test1')
:param output: fetch also the scheduler_run record
Returns a single Row object, for the last queued task
If output == True, returns also the last scheduler_run record
scheduler_run record is fetched by a left join, so it can
have all fields == None
"""
from gluon.dal import Query
sr, st = self.db.scheduler_run, self.db.scheduler_task
if isinstance(ref, int):
@@ -910,7 +941,7 @@ def main():
tasks = {}
group_names = [x.strip() for x in options.group_names.split(',')]
logger.getLogger().setLevel(options.logger_level)
logging.getLogger().setLevel(options.logger_level)
print 'groups for this worker: '+', '.join(group_names)
print 'connecting to database in folder: ' + options.db_folder or './'