added task broadcasting for workers within a group

This commit is contained in:
Francisco Ribeiro
2018-05-09 04:45:08 +01:00
committed by Francisco Ribeiro
parent 7aafd05cbb
commit 32eb1bc27d
+43 -17
View File
@@ -804,6 +804,7 @@ class Scheduler(MetaScheduler):
Field('group_name', default='main'),
Field('status', requires=IS_IN_SET(TASK_STATUS),
default=QUEUED, writable=False),
Field('broadcast', 'boolean', default=False),
Field('function_name',
requires=IS_IN_SET(sorted(self.tasks.keys()))
if self.tasks else DEFAULT),
@@ -1358,23 +1359,48 @@ class Scheduler(MetaScheduler):
gname = task.group_name
ws = wkgroups.get(gname)
if ws:
counter = 0
myw = 0
for i, w in enumerate(ws['workers']):
if w['c'] < counter:
myw = i
counter = w['c']
assigned_wn = wkgroups[gname]['workers'][myw]['name']
d = dict(
status=ASSIGNED,
assigned_worker_name=assigned_wn
)
db(
(st.id == task.id) &
(st.status.belongs((QUEUED, ASSIGNED)))
).update(**d)
wkgroups[gname]['workers'][myw]['c'] += 1
db.commit()
if task.broadcast:
for worker in ws['workers']:
new_task = db.scheduler_task.insert(
application_name = task.application_name,
task_name = task.task_name,
group_name = task.group_name,
status = ASSIGNED,
broadcast = False,
function_name = task.function_name,
args = task.args,
start_time = now,
repeats = 1,
retry_failed = task.retry_failed,
sync_output = task.sync_output,
assigned_worker_name = worker['name'])
if task.period:
next_run_time = now+datetime.timedelta(seconds=task.period)
else:
# must be cronline
raise NotImplementedError
db(st.id == task.id).update(times_run=task.times_run+1,
next_run_time=next_run_time,
last_run_time=now)
db.commit()
else:
counter = 0
myw = 0
for i, w in enumerate(ws['workers']):
if w['c'] < counter:
myw = i
counter = w['c']
assigned_wn = wkgroups[gname]['workers'][myw]['name']
d = dict(
status=ASSIGNED,
assigned_worker_name=assigned_wn
)
db(
(st.id == task.id) &
(st.status.belongs((QUEUED, ASSIGNED)))
).update(**d)
wkgroups[gname]['workers'][myw]['c'] += 1
db.commit()
# I didn't report tasks but I'm working nonetheless!!!!
if x > 0:
self.w_stats.empty_runs = 0