diff --git a/gluon/scheduler.py b/gluon/scheduler.py index 79d22c0f..05567592 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -804,6 +804,7 @@ class Scheduler(MetaScheduler): Field('group_name', default='main'), Field('status', requires=IS_IN_SET(TASK_STATUS), default=QUEUED, writable=False), + Field('broadcast', 'boolean', default=False), Field('function_name', requires=IS_IN_SET(sorted(self.tasks.keys())) if self.tasks else DEFAULT), @@ -1358,23 +1359,48 @@ class Scheduler(MetaScheduler): gname = task.group_name ws = wkgroups.get(gname) if ws: - counter = 0 - myw = 0 - for i, w in enumerate(ws['workers']): - if w['c'] < counter: - myw = i - counter = w['c'] - assigned_wn = wkgroups[gname]['workers'][myw]['name'] - d = dict( - status=ASSIGNED, - assigned_worker_name=assigned_wn - ) - db( - (st.id == task.id) & - (st.status.belongs((QUEUED, ASSIGNED))) - ).update(**d) - wkgroups[gname]['workers'][myw]['c'] += 1 - db.commit() + if task.broadcast: + for worker in ws['workers']: + new_task = db.scheduler_task.insert( + application_name = task.application_name, + task_name = task.task_name, + group_name = task.group_name, + status = ASSIGNED, + broadcast = False, + function_name = task.function_name, + args = task.args, + start_time = now, + repeats = 1, + retry_failed = task.retry_failed, + sync_output = task.sync_output, + assigned_worker_name = worker['name']) + if task.period: + next_run_time = now+datetime.timedelta(seconds=task.period) + else: + # must be cronline + raise NotImplementedError + db(st.id == task.id).update(times_run=task.times_run+1, + next_run_time=next_run_time, + last_run_time=now) + db.commit() + else: + counter = 0 + myw = 0 + for i, w in enumerate(ws['workers']): + if w['c'] < counter: + myw = i + counter = w['c'] + assigned_wn = wkgroups[gname]['workers'][myw]['name'] + d = dict( + status=ASSIGNED, + assigned_worker_name=assigned_wn + ) + db( + (st.id == task.id) & + (st.status.belongs((QUEUED, ASSIGNED))) + ).update(**d) + wkgroups[gname]['workers'][myw]['c'] += 1 + db.commit() # I didn't report tasks but I'm working nonetheless!!!! if x > 0: self.w_stats.empty_runs = 0