assign_task takes better care for group_names, thanks Niphlod

This commit is contained in:
mdipierro
2012-10-20 15:27:13 -05:00
parent cb9c2a8ef9
commit 0f446833c0
2 changed files with 37 additions and 36 deletions

View File

@@ -1 +1 @@
Version 2.1.1 (2012-10-20 10:12:01) dev
Version 2.1.1 (2012-10-20 15:27:09) dev

View File

@@ -558,7 +558,7 @@ class Scheduler(MetaScheduler):
def pop_task(self):
now = self.now()
db, ts = self.db, self.db.scheduler_task
db, st = self.db, self.db.scheduler_task
if self.is_a_ticker and self.do_assign_tasks:
#I'm a ticker, and 5 loops passed without reassigning tasks, let's do
#that and loop again
@@ -573,10 +573,10 @@ class Scheduler(MetaScheduler):
logger.error('TICKER: error assigning tasks')
return None
db.commit()
grabbed = db(ts.assigned_worker_name == self.worker_name)(
ts.status == ASSIGNED)
grabbed = db(st.assigned_worker_name == self.worker_name)(
st.status == ASSIGNED)
task = grabbed.select(limitby=(0, 1), orderby=ts.next_run_time).first()
task = grabbed.select(limitby=(0, 1), orderby=st.next_run_time).first()
if task:
task.update_record(status=RUNNING, last_run_time=now)
#noone will touch my task!
@@ -770,7 +770,7 @@ class Scheduler(MetaScheduler):
def assign_tasks(self):
db = self.db
sw, ts = db.scheduler_worker, db.scheduler_task
sw, st = db.scheduler_worker, db.scheduler_task
now = self.now()
all_workers = db(sw.status == ACTIVE).select()
#build workers as dict of groups
@@ -784,45 +784,46 @@ class Scheduler(MetaScheduler):
else:
wkgroups[gname]['workers'].append(
{'name': w.worker_name, 'c': 0})
#set queued tasks that expired between "runs" (i.e., you turned off)
#the scheduler and then it wasn't expired, but now it is
db(ts.status.belongs(
(QUEUED, ASSIGNED)))(ts.stop_time < now).update(status=EXPIRED)
#set queued tasks that expired between "runs" (i.e., you turned off
#the scheduler): then it wasn't expired, but now it is
db(st.status.belongs(
(QUEUED, ASSIGNED)))(st.stop_time < now).update(status=EXPIRED)
all_available = db(ts.status.belongs((QUEUED, ASSIGNED)))((ts.times_run < ts.repeats) | (ts.repeats == 0))(ts.start_time <= now)((ts.stop_time is None) | (ts.stop_time > now))(ts.next_run_time <= now)(ts.enabled == True)
limit = len(all_workers) * 50
#if there are a moltitude of tasks, let's assign a maximum of 50 tasks per worker.
all_available = db(st.status.belongs((QUEUED, ASSIGNED)))((st.times_run < st.repeats) | (st.repeats == 0))(st.start_time <= now)((st.stop_time is None) | (st.stop_time > now))(st.next_run_time <= now)(st.enabled == True)
limit = len(all_workers) * (50 / len(wkgroups))
#if there are a moltitude of tasks, let's figure out a maximum of tasks per worker.
#this can be adjusted with some added intelligence (like esteeming how many tasks will
#a worker complete before the ticker reassign them around, but the gain is quite small
#50 is quite a sweet spot also for fast tasks, with sane heartbeat values
#NB: ticker reassign tasks every 5 cycles, so if a worker completes his 50 tasks in less
#than heartbeat*5 seconds, it won't pick new tasks until heartbeat*5 seconds pass.
tasks = all_available.select(
limitby=(0, limit), orderby=ts.next_run_time)
#everything until now is going fine. If a worker is currently elaborating a long task,
#all other tasks assigned to him needs to be reassigned "freely" to other workers, that may be free.
#If a worker is currently elaborating a long task, all other tasks assigned
#to him needs to be reassigned "freely" to other workers, that may be free.
#this shuffles up things a bit, in order to maintain the idea of a semi-linear scalability
#let's freeze it up
db.commit()
#let's break up the queue evenly among workers
for task in tasks:
gname = task.group_name
ws = wkgroups.get(gname)
if ws:
counter = 0
myw = 0
for i, w in enumerate(ws['workers']):
if w['c'] < counter:
myw = i
counter = w['c']
d = dict(status=ASSIGNED,
assigned_worker_name=wkgroups[gname]['workers'][myw]['name'])
if not task.task_name:
d['task_name'] = task.function_name
task.update_record(**d)
wkgroups[gname]['workers'][myw]['c'] += 1
for group in wkgroups.keys():
tasks = all_available(st.group_name == group).select(
limitby=(0, limit), orderby=st.next_run_time)
#let's break up the queue evenly among workers
for task in tasks:
gname = task.group_name
ws = wkgroups.get(gname)
if ws:
counter = 0
myw = 0
for i, w in enumerate(ws['workers']):
if w['c'] < counter:
myw = i
counter = w['c']
d = dict(status=ASSIGNED,
assigned_worker_name=wkgroups[gname]['workers'][myw]['name'])
if not task.task_name:
d['task_name'] = task.function_name
task.update_record(**d)
wkgroups[gname]['workers'][myw]['c'] += 1
db.commit()
#I didn't report tasks but I'm working nonetheless!!!!