From 837ed7fb5ee71509b78c8790a5bcd0be92989ea3 Mon Sep 17 00:00:00 2001 From: Massimo Di Pierro Date: Thu, 24 May 2012 22:31:51 -0500 Subject: [PATCH] new scheduler (not tested yet), thanks Niphlod --- VERSION | 2 +- gluon/scheduler.py | 147 ++++++++++++++++++++++++++++++++------------- 2 files changed, 107 insertions(+), 42 deletions(-) diff --git a/VERSION b/VERSION index a4b941d0..bfd40dcc 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -Version 2.00.0 (2012-05-23 09:27:57) dev +Version 2.00.0 (2012-05-24 22:30:18) dev diff --git a/gluon/scheduler.py b/gluon/scheduler.py index e189c1d8..2f22127d 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -334,6 +334,8 @@ class Scheduler(MetaScheduler): self.heartbeat = heartbeat self.worker_name = worker_name or socket.gethostname()+'#'+str(web2py_uuid()) self.max_empty_runs = max_empty_runs + self.is_a_ticker = False + self.do_assign_tasks = False from gluon import current current._scheduler = self @@ -388,6 +390,7 @@ class Scheduler(MetaScheduler): Field('first_heartbeat','datetime'), Field('last_heartbeat','datetime'), Field('status',requires=IS_IN_SET(WORKER_STATUS)), + Field('is_ticker', 'boolean', default=False), migrate=migrate) db.commit() @@ -397,34 +400,41 @@ class Scheduler(MetaScheduler): def pop_task(self): now = datetime.datetime.now() db, ts = self.db, self.db.scheduler_task - try: - logging.debug(' grabbing all queued tasks') - all_available = db(ts.status.belongs((QUEUED,RUNNING)))\ - ((ts.times_runnow)\ - (ts.next_run_time<=now)\ - (ts.enabled==True)\ - (ts.group_name.belongs(self.group_names))\ - (ts.assigned_worker_name.belongs((None,'',self.worker_name))) #None? - number_grabbed = all_available.update( - assigned_worker_name=self.worker_name,status=ASSIGNED) - db.commit() - except: - number_grabbed = None - db.rollback() - if number_grabbed: - logging.debug(' grabbed %s tasks' % number_grabbed) - grabbed = db(ts.assigned_worker_name==self.worker_name)\ - (ts.status==ASSIGNED) - task = grabbed.select(limitby=(0,1), orderby=ts.next_run_time).first() - - logging.debug(' releasing all but one (running)') - if task: - task.update_record(status=RUNNING,last_run_time=now) - grabbed.update(assigned_worker_name='',status=QUEUED) - db.commit() + if self.is_a_ticker and self.do_assign_tasks: + #I'm a ticker, and 5 loops passed without reassigning tasks, let's do + #that and loop again + db.commit() #?don't know if it's useful, let's be completely sure + while True: + try: + self.assign_tasks() + db.commit() + break + except: + db.rollback() + logging.error('TICKER: error assigning tasks') + #I didn't report tasks but I'm working nonetheless!!!! + self.empty_runs = 0 + return None + grabbed = db(ts.assigned_worker_name==self.worker_name)\ + (ts.status==ASSIGNED) + task_id = grabbed._select(ts.id, limitby=(0,1), orderby=ts.next_run_time) + updated = db( + ts.id.belongs(task_id) + ).update(status=RUNNING,last_run_time=now) #reduces collisions? + #noone will touch my task! + db.commit() + if updated: + logging.debug(' work to do %s' % updated) + task = db(ts.assigned_worker_name==self.worker_name)\ + (ts.status==RUNNING).select().first() + if not task: + #it's very likely (almost impossible) that this will happen. + #please report any abnormal activity on web2py-users or file a bug + #about new scheduler on http://code.google.com/p/web2py/issues/list + logging.error('Something is not going on nicely, someone stealed my task!') + return None else: + logging.debug('nothing to do') return None next_run_time = task.last_run_time + datetime.timedelta(seconds=task.period) times_run = task.times_run + 1 @@ -469,11 +479,13 @@ class Scheduler(MetaScheduler): if task_report.status == COMPLETED: d = dict(status = task.run_again and QUEUED or COMPLETED, next_run_time = task.next_run_time, - times_run = task.times_run, - assigned_worker_name = '') + times_run = task.times_run) + #I'd like to know who worked my task, reviewing some logs... + #,assigned_worker_name = '') else: d = dict( - assigned_worker_name = '', + #same as before... + #assigned_worker_name = '', status = {'FAILED':'FAILED', 'TIMEOUT':'TIMEOUT', 'STOPPED':'QUEUED'}[task_report.status]) @@ -498,20 +510,75 @@ class Scheduler(MetaScheduler): .update(last_heartbeat = now, status = ACTIVE): sw.insert(status = ACTIVE,worker_name = self.worker_name, first_heartbeat = now,last_heartbeat = now) - if counter % 10 == 0: - # deallocate jobs assigned to inactive workers and requeue them - logging.debug(' freeing workers that have not sent heartbeat') - inactive_workers = db(sw.last_heartbeat self.worker_name) & (sw.is_ticker == True)).select().first() + if not ticker: + db(sw.worker_name == self.worker_name).update(is_ticker = True) + logging.info("TICKER: I'm a ticker (%s)" % self.worker_name) + return True + else: + logging.info("%s is a ticker, I'm a poor worker" % ticker.worker_name) + return False + + def assign_tasks(self): + db = self.db + sw, ts = db.scheduler_worker, db.scheduler_task + now = datetime.datetime.now() + all_workers = db(sw.id>0).select() + workers = [row.worker_name for row in all_workers] + all_available = db(ts.status.belongs((QUEUED,ASSIGNED)))\ + ((ts.times_runnow)\ + (ts.next_run_time<=now)\ + (ts.enabled==True)\ + (ts.group_name.belongs(self.group_names)) #\ + #(ts.assigned_worker_name <> self.worker_name) + limit = len(workers) * 50 + #if there are a moltitude of tasks, let's assign a maximum of 50 tasks per worker. + #this can be adjusted with some added intelligence (like esteeming how many tasks will + #a worker complete before the ticker reassign them around, but the gain is quite small + #50 is quite a sweet spot also for fast tasks, with sane heartbeat values + #NB: ticker reassign tasks every 5 cycles, so if a worker completes his 50 tasks in less + #than heartbeat*5 seconds, it won't pick new tasks until heartbeat*5 seconds pass. + tasks = all_available.select(limitby=(0,limit), orderby=ts.next_run_time) + #everything until now is going fine. If a worker is currently elaborating a long task, + #all other tasks assigned to him needs to be reassigned "freely" to other workers, that may be free. + #this shuffles up things a bit, in order to maintain the idea of a semi-linear scalability + #let's freeze it up + db.commit() + #it's useful to reduce computation times of reassigning tasks if there is only one worker around + if len(workers) == 1: + all_available.update(status=ASSIGNED, assigned_worker_name=workers[0]) + #let's break up the queue evenly among workers + else: + for i, task in enumerate(tasks): + worker = workers[i % len(workers)] + task.update_record(status=ASSIGNED, assigned_worker_name=workers[i % len(workers)]) + db.commit() + logging.debug('TICKER: workers are %s' % len(workers)) + logging.debug('TICKER: tasks are %s' % len(tasks)) + def sleep(self): time.sleep(self.heartbeat) # should only sleep until next available task @@ -586,5 +653,3 @@ def main(): if __name__=='__main__': main() - -