latest scheduler

This commit is contained in:
mdipierro
2012-07-11 18:16:57 -05:00
parent 748028a7f4
commit e976bbe66b
2 changed files with 21 additions and 8 deletions

View File

@@ -1 +1 @@
Version 2.00.0 (2012-07-11 18:10:46) dev
Version 2.00.0 (2012-07-11 18:16:55) dev

View File

@@ -71,6 +71,7 @@ import socket
import datetime
import logging
import optparse
import types
if 'WEB2PY_PATH' in os.environ:
sys.path.append(os.environ['WEB2PY_PATH'])
@@ -105,6 +106,10 @@ SECONDS = 1
HEARTBEAT = 3*SECONDS
MAXHIBERNATION = 10
CALLABLETYPES = (types.LambdaType, types.FunctionType,
types.BuiltinFunctionType,
types.MethodType, types.BuiltinMethodType)
class Task(object):
def __init__(self,app,function,timeout,args='[]',vars='{}',**kwargs):
logging.debug(' new task allocated: %s.%s' % (app,function))
@@ -182,9 +187,14 @@ def executor(queue,task):
logging.getLogger().setLevel(level)
scheduler = current._scheduler
f = task.function
# First look for the func in tasks, else look in models
_function = current._scheduler.tasks.get(f) or _env.get(f)
assert _function, "Function %s not found in scheduler's environment"
functions = current._scheduler.tasks
if not functions:
#look into env
_function = _env.get(f)
else:
_function = functions.get(f)
if not isinstance(_function, CALLABLETYPES):
raise NameError("name '%s' not found in scheduler's environment" % f)
globals().update(_env)
args = loads(task.args)
vars = loads(task.vars, object_hook=_decode_dict)
@@ -337,7 +347,7 @@ class TYPE(object):
return (value,current.T('Not of type: %s') % self.myclass)
class Scheduler(MetaScheduler):
def __init__(self,db,tasks={},migrate=True,
def __init__(self,db,tasks=None,migrate=True,
worker_name=None,group_names=None,heartbeat=HEARTBEAT,
max_empty_runs=0, discard_results=False):
@@ -372,7 +382,7 @@ class Scheduler(MetaScheduler):
'scheduler_task',
Field('application_name',requires=IS_NOT_EMPTY(),
default=None,writable=False),
Field('task_name',requires=IS_NOT_EMPTY()),
Field('task_name',default=None),
Field('group_name',default='main',writable=False),
Field('status',requires=IS_IN_SET(TASK_STATUS),
default=QUEUED,writable=False),
@@ -689,8 +699,11 @@ class Scheduler(MetaScheduler):
if w['c'] < counter:
myw = i
counter = w['c']
task.update_record(status=ASSIGNED,
assigned_worker_name=wkgroups[gname]['workers'][myw]['name'])
d = dict(status=ASSIGNED,
assigned_worker_name=wkgroups[gname]['workers'][myw]['name'])
if not task.task_name:
d['task_name'] = task.function_name
task.update_record(**d)
wkgroups[gname]['workers'][myw]['c'] += 1
db.commit()