diff --git a/VERSION b/VERSION index 291cb1bc..827cea5a 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -Version 2.1.1 (2012-10-20 10:12:01) dev +Version 2.1.1 (2012-10-20 15:27:09) dev diff --git a/gluon/scheduler.py b/gluon/scheduler.py index 1e986ad8..0495d01c 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -558,7 +558,7 @@ class Scheduler(MetaScheduler): def pop_task(self): now = self.now() - db, ts = self.db, self.db.scheduler_task + db, st = self.db, self.db.scheduler_task if self.is_a_ticker and self.do_assign_tasks: #I'm a ticker, and 5 loops passed without reassigning tasks, let's do #that and loop again @@ -573,10 +573,10 @@ class Scheduler(MetaScheduler): logger.error('TICKER: error assigning tasks') return None db.commit() - grabbed = db(ts.assigned_worker_name == self.worker_name)( - ts.status == ASSIGNED) + grabbed = db(st.assigned_worker_name == self.worker_name)( + st.status == ASSIGNED) - task = grabbed.select(limitby=(0, 1), orderby=ts.next_run_time).first() + task = grabbed.select(limitby=(0, 1), orderby=st.next_run_time).first() if task: task.update_record(status=RUNNING, last_run_time=now) #noone will touch my task! @@ -770,7 +770,7 @@ class Scheduler(MetaScheduler): def assign_tasks(self): db = self.db - sw, ts = db.scheduler_worker, db.scheduler_task + sw, st = db.scheduler_worker, db.scheduler_task now = self.now() all_workers = db(sw.status == ACTIVE).select() #build workers as dict of groups @@ -784,45 +784,46 @@ class Scheduler(MetaScheduler): else: wkgroups[gname]['workers'].append( {'name': w.worker_name, 'c': 0}) - #set queued tasks that expired between "runs" (i.e., you turned off) - #the scheduler and then it wasn't expired, but now it is - db(ts.status.belongs( - (QUEUED, ASSIGNED)))(ts.stop_time < now).update(status=EXPIRED) + #set queued tasks that expired between "runs" (i.e., you turned off + #the scheduler): then it wasn't expired, but now it is + db(st.status.belongs( + (QUEUED, ASSIGNED)))(st.stop_time < now).update(status=EXPIRED) - all_available = db(ts.status.belongs((QUEUED, ASSIGNED)))((ts.times_run < ts.repeats) | (ts.repeats == 0))(ts.start_time <= now)((ts.stop_time is None) | (ts.stop_time > now))(ts.next_run_time <= now)(ts.enabled == True) - - limit = len(all_workers) * 50 - #if there are a moltitude of tasks, let's assign a maximum of 50 tasks per worker. + all_available = db(st.status.belongs((QUEUED, ASSIGNED)))((st.times_run < st.repeats) | (st.repeats == 0))(st.start_time <= now)((st.stop_time is None) | (st.stop_time > now))(st.next_run_time <= now)(st.enabled == True) + limit = len(all_workers) * (50 / len(wkgroups)) + #if there are a moltitude of tasks, let's figure out a maximum of tasks per worker. #this can be adjusted with some added intelligence (like esteeming how many tasks will #a worker complete before the ticker reassign them around, but the gain is quite small #50 is quite a sweet spot also for fast tasks, with sane heartbeat values #NB: ticker reassign tasks every 5 cycles, so if a worker completes his 50 tasks in less #than heartbeat*5 seconds, it won't pick new tasks until heartbeat*5 seconds pass. - tasks = all_available.select( - limitby=(0, limit), orderby=ts.next_run_time) - #everything until now is going fine. If a worker is currently elaborating a long task, - #all other tasks assigned to him needs to be reassigned "freely" to other workers, that may be free. + + #If a worker is currently elaborating a long task, all other tasks assigned + #to him needs to be reassigned "freely" to other workers, that may be free. #this shuffles up things a bit, in order to maintain the idea of a semi-linear scalability + #let's freeze it up db.commit() - #let's break up the queue evenly among workers - - for task in tasks: - gname = task.group_name - ws = wkgroups.get(gname) - if ws: - counter = 0 - myw = 0 - for i, w in enumerate(ws['workers']): - if w['c'] < counter: - myw = i - counter = w['c'] - d = dict(status=ASSIGNED, - assigned_worker_name=wkgroups[gname]['workers'][myw]['name']) - if not task.task_name: - d['task_name'] = task.function_name - task.update_record(**d) - wkgroups[gname]['workers'][myw]['c'] += 1 + for group in wkgroups.keys(): + tasks = all_available(st.group_name == group).select( + limitby=(0, limit), orderby=st.next_run_time) + #let's break up the queue evenly among workers + for task in tasks: + gname = task.group_name + ws = wkgroups.get(gname) + if ws: + counter = 0 + myw = 0 + for i, w in enumerate(ws['workers']): + if w['c'] < counter: + myw = i + counter = w['c'] + d = dict(status=ASSIGNED, + assigned_worker_name=wkgroups[gname]['workers'][myw]['name']) + if not task.task_name: + d['task_name'] = task.function_name + task.update_record(**d) + wkgroups[gname]['workers'][myw]['c'] += 1 db.commit() #I didn't report tasks but I'm working nonetheless!!!!