fixes #1191
fix is in lines 828-835 . needed to backport total_seconds for py2.6 (694-701). everything else is just pep8.
This commit is contained in:
+46
-35
@@ -215,7 +215,7 @@ class JobGraph(object):
|
||||
nested_dict = dict(
|
||||
(item, (dep - ordered)) for item, dep in nested_dict.items()
|
||||
if item not in ordered
|
||||
)
|
||||
)
|
||||
assert not nested_dict, "A cyclic dependency exists amongst %r" % nested_dict
|
||||
db.commit()
|
||||
return rtn
|
||||
@@ -297,7 +297,7 @@ def executor(queue, task, out):
|
||||
f = task.function
|
||||
functions = current._scheduler.tasks
|
||||
if not functions:
|
||||
#look into env
|
||||
# look into env
|
||||
_function = _env.get(f)
|
||||
else:
|
||||
_function = functions.get(f)
|
||||
@@ -314,7 +314,7 @@ def executor(queue, task, out):
|
||||
vars = loads(task.vars, object_hook=_decode_dict)
|
||||
result = dumps(_function(*args, **vars))
|
||||
else:
|
||||
### for testing purpose only
|
||||
# for testing purpose only
|
||||
result = eval(task.function)(
|
||||
*loads(task.args, object_hook=_decode_dict),
|
||||
**loads(task.vars, object_hook=_decode_dict))
|
||||
@@ -663,7 +663,7 @@ class Scheduler(MetaScheduler):
|
||||
Field('traceback', 'text'),
|
||||
Field('worker_name', default=self.worker_name),
|
||||
migrate=self.__get_migrate('scheduler_run', migrate)
|
||||
)
|
||||
)
|
||||
|
||||
db.define_table(
|
||||
'scheduler_worker',
|
||||
@@ -675,23 +675,30 @@ class Scheduler(MetaScheduler):
|
||||
Field('group_names', 'list:string', default=self.group_names),
|
||||
Field('worker_stats', 'json'),
|
||||
migrate=self.__get_migrate('scheduler_worker', migrate)
|
||||
)
|
||||
)
|
||||
|
||||
db.define_table(
|
||||
'scheduler_task_deps',
|
||||
Field('job_name', default='job_0'),
|
||||
Field('task_parent', 'integer',
|
||||
requires=IS_IN_DB(db, 'scheduler_task.id',
|
||||
'%(task_name)s')
|
||||
),
|
||||
requires=IS_IN_DB(db, 'scheduler_task.id', '%(task_name)s')
|
||||
),
|
||||
Field('task_child', 'reference scheduler_task'),
|
||||
Field('can_visit', 'boolean', default=False),
|
||||
migrate=self.__get_migrate('scheduler_task_deps', migrate)
|
||||
)
|
||||
)
|
||||
|
||||
if migrate is not False:
|
||||
db.commit()
|
||||
|
||||
@staticmethod
|
||||
def total_seconds(td):
|
||||
# backport for py2.6
|
||||
if hasattr(td, 'total_seconds'):
|
||||
return td.total_seconds()
|
||||
else:
|
||||
return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10.0 ** 6
|
||||
|
||||
def loop(self, worker_name=None):
|
||||
"""Main loop
|
||||
|
||||
@@ -716,7 +723,7 @@ class Scheduler(MetaScheduler):
|
||||
while True and self.have_heartbeat:
|
||||
if self.w_stats.status == DISABLED:
|
||||
logger.debug('Someone stopped me, sleeping until better'
|
||||
' times come (%s)', self.w_stats.sleep)
|
||||
' times come (%s)', self.w_stats.sleep)
|
||||
self.sleep()
|
||||
continue
|
||||
logger.debug('looping...')
|
||||
@@ -733,7 +740,8 @@ class Scheduler(MetaScheduler):
|
||||
logger.debug('sleeping...')
|
||||
if self.max_empty_runs != 0:
|
||||
logger.debug('empty runs %s/%s',
|
||||
self.w_stats.empty_runs, self.max_empty_runs)
|
||||
self.w_stats.empty_runs,
|
||||
self.max_empty_runs)
|
||||
if self.w_stats.empty_runs >= self.max_empty_runs:
|
||||
logger.info(
|
||||
'empty runs limit reached, killing myself')
|
||||
@@ -789,15 +797,15 @@ class Scheduler(MetaScheduler):
|
||||
now = self.now()
|
||||
st = self.db.scheduler_task
|
||||
if self.is_a_ticker and self.do_assign_tasks:
|
||||
#I'm a ticker, and 5 loops passed without reassigning tasks,
|
||||
#let's do that and loop again
|
||||
# I'm a ticker, and 5 loops passed without reassigning tasks,
|
||||
# let's do that and loop again
|
||||
self.wrapped_assign_tasks(db)
|
||||
return None
|
||||
# ready to process something
|
||||
grabbed = db(
|
||||
(st.assigned_worker_name == self.worker_name) &
|
||||
(st.status == ASSIGNED)
|
||||
)
|
||||
)
|
||||
|
||||
task = grabbed.select(limitby=(0, 1), orderby=st.next_run_time).first()
|
||||
if task:
|
||||
@@ -817,11 +825,15 @@ class Scheduler(MetaScheduler):
|
||||
if not task.prevent_drift:
|
||||
next_run_time = task.last_run_time + datetime.timedelta(
|
||||
seconds=task.period
|
||||
)
|
||||
else:
|
||||
next_run_time = task.start_time + datetime.timedelta(
|
||||
seconds=task.period * times_run
|
||||
)
|
||||
else:
|
||||
# calc next_run_time based on available slots
|
||||
# see #1191
|
||||
next_run_time = task.start_time
|
||||
secondspassed = self.total_seconds(now - next_run_time)
|
||||
steps = secondspassed // task.period + 1
|
||||
next_run_time += datetime.timedelta(seconds=task.period * steps)
|
||||
|
||||
if times_run < task.repeats or task.repeats == 0:
|
||||
# need to run (repeating task)
|
||||
run_again = True
|
||||
@@ -843,7 +855,7 @@ class Scheduler(MetaScheduler):
|
||||
time.sleep(0.5)
|
||||
db.rollback()
|
||||
logger.info('new task %(id)s "%(task_name)s"'
|
||||
' %(application_name)s.%(function_name)s' % task)
|
||||
' %(application_name)s.%(function_name)s' % task)
|
||||
return Task(
|
||||
app=task.application_name,
|
||||
function=task.function_name,
|
||||
@@ -926,7 +938,7 @@ class Scheduler(MetaScheduler):
|
||||
and QUEUED or task.retry_failed == -1
|
||||
and QUEUED or st_mapping)
|
||||
db(st.id == task.task_id).update(
|
||||
times_failed=db.scheduler_task.times_failed + 1,
|
||||
times_failed=st.times_failed + 1,
|
||||
next_run_time=task.next_run_time,
|
||||
status=status
|
||||
)
|
||||
@@ -980,7 +992,7 @@ class Scheduler(MetaScheduler):
|
||||
# keep sleeping
|
||||
self.w_stats.status = DISABLED
|
||||
logger.debug('........recording heartbeat (%s)',
|
||||
self.w_stats.status)
|
||||
self.w_stats.status)
|
||||
db(sw.worker_name == self.worker_name).update(
|
||||
last_heartbeat=now,
|
||||
worker_stats=self.w_stats)
|
||||
@@ -997,7 +1009,7 @@ class Scheduler(MetaScheduler):
|
||||
logger.info('Asked to kill the current task')
|
||||
self.terminate_process()
|
||||
logger.debug('........recording heartbeat (%s)',
|
||||
self.w_stats.status)
|
||||
self.w_stats.status)
|
||||
db(sw.worker_name == self.worker_name).update(
|
||||
last_heartbeat=now, status=ACTIVE,
|
||||
worker_stats=self.w_stats)
|
||||
@@ -1023,7 +1035,7 @@ class Scheduler(MetaScheduler):
|
||||
db(
|
||||
(st.assigned_worker_name.belongs(dead_workers_name)) &
|
||||
(st.status == RUNNING)
|
||||
).update(assigned_worker_name='', status=QUEUED)
|
||||
).update(assigned_worker_name='', status=QUEUED)
|
||||
dead_workers.delete()
|
||||
try:
|
||||
self.is_a_ticker = self.being_a_ticker()
|
||||
@@ -1108,20 +1120,20 @@ class Scheduler(MetaScheduler):
|
||||
(sd.can_visit == False) &
|
||||
(~sd.task_child.belongs(
|
||||
db(sd.can_visit == False)._select(sd.task_parent)
|
||||
)
|
||||
)
|
||||
)._select(sd.task_child)
|
||||
)
|
||||
)._select(sd.task_child)
|
||||
no_deps = db(
|
||||
(st.status.belongs((QUEUED, ASSIGNED))) &
|
||||
(
|
||||
(sd.id == None) | (st.id.belongs(deps_with_no_deps))
|
||||
|
||||
)
|
||||
)._select(st.id, distinct=True, left=sd.on(
|
||||
(st.id == sd.task_parent) &
|
||||
(sd.can_visit == False)
|
||||
)
|
||||
)
|
||||
)._select(st.id, distinct=True, left=sd.on(
|
||||
(st.id == sd.task_parent) &
|
||||
(sd.can_visit == False)
|
||||
)
|
||||
)
|
||||
|
||||
all_available = db(
|
||||
(st.status.belongs((QUEUED, ASSIGNED))) &
|
||||
@@ -1133,7 +1145,6 @@ class Scheduler(MetaScheduler):
|
||||
(st.id.belongs(no_deps))
|
||||
)
|
||||
|
||||
|
||||
limit = len(all_workers) * (50 / (len(wkgroups) or 1))
|
||||
# if there are a moltitude of tasks, let's figure out a maximum of
|
||||
# tasks per worker. This can be further tuned with some added
|
||||
@@ -1177,7 +1188,7 @@ class Scheduler(MetaScheduler):
|
||||
db(
|
||||
(st.id == task.id) &
|
||||
(st.status.belongs((QUEUED, ASSIGNED)))
|
||||
).update(**d)
|
||||
).update(**d)
|
||||
wkgroups[gname]['workers'][myw]['c'] += 1
|
||||
db.commit()
|
||||
# I didn't report tasks but I'm working nonetheless!!!!
|
||||
@@ -1215,7 +1226,7 @@ class Scheduler(MetaScheduler):
|
||||
self.db(
|
||||
(ws.group_names.contains(group)) &
|
||||
(~ws.status.belongs(exclusion))
|
||||
).update(status=action)
|
||||
).update(status=action)
|
||||
else:
|
||||
for group in group_names:
|
||||
workers = self.db((ws.group_names.contains(group)) &
|
||||
@@ -1300,7 +1311,7 @@ class Scheduler(MetaScheduler):
|
||||
if immediate:
|
||||
self.db(
|
||||
(self.db.scheduler_worker.is_ticker == True)
|
||||
).update(status=PICK)
|
||||
).update(status=PICK)
|
||||
else:
|
||||
rtn.uuid = None
|
||||
return rtn
|
||||
@@ -1350,7 +1361,7 @@ class Scheduler(MetaScheduler):
|
||||
**dict(orderby=orderby,
|
||||
left=left,
|
||||
limitby=(0, 1))
|
||||
).first()
|
||||
).first()
|
||||
if row and output:
|
||||
row.result = row.scheduler_run.run_result and \
|
||||
loads(row.scheduler_run.run_result,
|
||||
|
||||
Reference in New Issue
Block a user