diff --git a/VERSION b/VERSION index 3d66c2df..0c966fba 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -Version 2.00.0 (2012-07-11 18:10:46) dev +Version 2.00.0 (2012-07-11 18:16:55) dev diff --git a/gluon/scheduler.py b/gluon/scheduler.py index 80c8cda9..9e775d98 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -71,6 +71,7 @@ import socket import datetime import logging import optparse +import types if 'WEB2PY_PATH' in os.environ: sys.path.append(os.environ['WEB2PY_PATH']) @@ -105,6 +106,10 @@ SECONDS = 1 HEARTBEAT = 3*SECONDS MAXHIBERNATION = 10 +CALLABLETYPES = (types.LambdaType, types.FunctionType, + types.BuiltinFunctionType, + types.MethodType, types.BuiltinMethodType) + class Task(object): def __init__(self,app,function,timeout,args='[]',vars='{}',**kwargs): logging.debug(' new task allocated: %s.%s' % (app,function)) @@ -182,9 +187,14 @@ def executor(queue,task): logging.getLogger().setLevel(level) scheduler = current._scheduler f = task.function - # First look for the func in tasks, else look in models - _function = current._scheduler.tasks.get(f) or _env.get(f) - assert _function, "Function %s not found in scheduler's environment" + functions = current._scheduler.tasks + if not functions: + #look into env + _function = _env.get(f) + else: + _function = functions.get(f) + if not isinstance(_function, CALLABLETYPES): + raise NameError("name '%s' not found in scheduler's environment" % f) globals().update(_env) args = loads(task.args) vars = loads(task.vars, object_hook=_decode_dict) @@ -337,7 +347,7 @@ class TYPE(object): return (value,current.T('Not of type: %s') % self.myclass) class Scheduler(MetaScheduler): - def __init__(self,db,tasks={},migrate=True, + def __init__(self,db,tasks=None,migrate=True, worker_name=None,group_names=None,heartbeat=HEARTBEAT, max_empty_runs=0, discard_results=False): @@ -372,7 +382,7 @@ class Scheduler(MetaScheduler): 'scheduler_task', Field('application_name',requires=IS_NOT_EMPTY(), default=None,writable=False), - Field('task_name',requires=IS_NOT_EMPTY()), + Field('task_name',default=None), Field('group_name',default='main',writable=False), Field('status',requires=IS_IN_SET(TASK_STATUS), default=QUEUED,writable=False), @@ -689,8 +699,11 @@ class Scheduler(MetaScheduler): if w['c'] < counter: myw = i counter = w['c'] - task.update_record(status=ASSIGNED, - assigned_worker_name=wkgroups[gname]['workers'][myw]['name']) + d = dict(status=ASSIGNED, + assigned_worker_name=wkgroups[gname]['workers'][myw]['name']) + if not task.task_name: + d['task_name'] = task.function_name + task.update_record(**d) wkgroups[gname]['workers'][myw]['c'] += 1 db.commit()