From 71fae9efce193ece77697e2e7d2fa03edc42346c Mon Sep 17 00:00:00 2001 From: mdipierro Date: Mon, 25 Jun 2012 16:36:20 -0500 Subject: [PATCH] improved scheduler, thanks Niphlod --- VERSION | 2 +- gluon/scheduler.py | 161 ++++++++++++--------------------------------- 2 files changed, 43 insertions(+), 120 deletions(-) diff --git a/VERSION b/VERSION index db185464..6e59ed7b 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -Version 2.00.0 (2012-06-25 14:40:17) dev +Version 2.00.0 (2012-06-25 16:36:15) dev diff --git a/gluon/scheduler.py b/gluon/scheduler.py index 205d39c5..ed4d8879 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -1,3 +1,5 @@ +#### WORK IN PROGRESS... NOT SUPPOSED TO WORK YET + USAGE = """ ## Example @@ -17,8 +19,6 @@ scheduler = Scheduler(db,dict(demo1=demo1,demo2=demo2)) ## run worker nodes with: cd web2py - python web2py.py -K myapp -or python gluon/scheduler.py -u sqlite://storage.sqlite \ -f applications/myapp/databases/ \ -t mytasks.py @@ -37,23 +37,7 @@ http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_run.id>0 ## view workers http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_worker.id>0 -## To install the scheduler as a permanent daemon on Linux (w/ Upstart), put the -## following into /etc/init/web2py-scheduler.conf: -## (This assumes your web2py instance is installed in 's home directory, -## running as , with app , on network interface eth0.) - -description "web2py task scheduler" -start on (local-filesystems and net-device-up IFACE=eth0) -stop on shutdown -respawn limit 8 60 # Give up if restart occurs 8 times in 60 seconds. -exec sudo -u python /home//web2py/web2py.py -K -respawn - -## You can then start/stop/restart/check status of the daemon with: -sudo start web2py-scheduler -sudo stop web2py-scheduler -sudo restart web2py-scheduler -sudo status web2py-scheduler +## Comments """ import os @@ -334,8 +318,6 @@ class Scheduler(MetaScheduler): self.heartbeat = heartbeat self.worker_name = worker_name or socket.gethostname()+'#'+str(web2py_uuid()) self.max_empty_runs = max_empty_runs - self.is_a_ticker = False - self.do_assign_tasks = False from gluon import current current._scheduler = self @@ -361,7 +343,7 @@ class Scheduler(MetaScheduler): Field('enabled','boolean',default=True), Field('start_time','datetime',default=now), Field('next_run_time','datetime',default=now), - Field('stop_time','datetime',default=None), + Field('stop_time','datetime',default=now+datetime.timedelta(days=1)), Field('repeats','integer',default=1,comment="0=unlimted"), Field('period','integer',default=60,comment='seconds'), Field('timeout','integer',default=60,comment='seconds'), @@ -390,7 +372,6 @@ class Scheduler(MetaScheduler): Field('first_heartbeat','datetime'), Field('last_heartbeat','datetime'), Field('status',requires=IS_IN_SET(WORKER_STATUS)), - Field('is_ticker', 'boolean', default=False), migrate=migrate) db.commit() @@ -400,30 +381,34 @@ class Scheduler(MetaScheduler): def pop_task(self): now = datetime.datetime.now() db, ts = self.db, self.db.scheduler_task - if self.is_a_ticker and self.do_assign_tasks: - #I'm a ticker, and 5 loops passed without reassigning tasks, let's do - #that and loop again - db.commit() #?don't know if it's useful, let's be completely sure - while True: - try: - self.assign_tasks() - db.commit() - break - except: - db.rollback() - logging.error('TICKER: error assigning tasks') - return None - grabbed = db(ts.assigned_worker_name==self.worker_name)\ - (ts.status==ASSIGNED) - - task = grabbed.select(limitby=(0,1), orderby=ts.next_run_time).first() - if task: - task.update_record(status=RUNNING,last_run_time=now) - #noone will touch my task! + try: + logging.debug(' grabbing all queued tasks') + all_available = db(ts.status.belongs((QUEUED,RUNNING)))\ + ((ts.times_runnow)\ + (ts.next_run_time<=now)\ + (ts.enabled==True)\ + (ts.group_name.belongs(self.group_names))\ + (ts.assigned_worker_name.belongs((None,'',self.worker_name))) #None? + number_grabbed = all_available.update( + assigned_worker_name=self.worker_name,status=ASSIGNED) db.commit() - logging.debug(' work to do %s' % task.id) + except: + number_grabbed = None + db.rollback() + if number_grabbed: + logging.debug(' grabbed %s tasks' % number_grabbed) + grabbed = db(ts.assigned_worker_name==self.worker_name)\ + (ts.status==ASSIGNED) + task = grabbed.select(limitby=(0,1), orderby=ts.next_run_time).first() + + logging.debug(' releasing all but one (running)') + if task: + task.update_record(status=RUNNING,last_run_time=now) + grabbed.update(assigned_worker_name='',status=QUEUED) + db.commit() else: - logging.debug('nothing to do') return None next_run_time = task.last_run_time + datetime.timedelta(seconds=task.period) times_run = task.times_run + 1 @@ -468,13 +453,11 @@ class Scheduler(MetaScheduler): if task_report.status == COMPLETED: d = dict(status = task.run_again and QUEUED or COMPLETED, next_run_time = task.next_run_time, - times_run = task.times_run) - #I'd like to know who worked my task, reviewing some logs... - #,assigned_worker_name = '') + times_run = task.times_run, + assigned_worker_name = '') else: d = dict( - #same as before... - #assigned_worker_name = '', + assigned_worker_name = '', status = {'FAILED':'FAILED', 'TIMEOUT':'TIMEOUT', 'STOPPED':'QUEUED'}[task_report.status]) @@ -499,82 +482,20 @@ class Scheduler(MetaScheduler): .update(last_heartbeat = now, status = ACTIVE): sw.insert(status = ACTIVE,worker_name = self.worker_name, first_heartbeat = now,last_heartbeat = now) - - self.do_assign_tasks = False - if counter % 5 == 0: - try: - # delete inactive workers - logging.debug(' freeing workers that have not sent heartbeat') - inactive_workers = db(sw.last_heartbeat self.worker_name) & (sw.is_ticker == True)).select().first() - if not ticker: - db(sw.worker_name == self.worker_name).update(is_ticker = True) - logging.info("TICKER: I'm a ticker (%s)" % self.worker_name) - return True - else: - logging.info("%s is a ticker, I'm a poor worker" % ticker.worker_name) - return False - - def assign_tasks(self): - db = self.db - sw, ts = db.scheduler_worker, db.scheduler_task - now = datetime.datetime.now() - all_workers = db(sw.id>0).select() - workers = [row.worker_name for row in all_workers] - all_available = db(ts.status.belongs((QUEUED,ASSIGNED)))\ - ((ts.times_runnow))\ - (ts.next_run_time<=now)\ - (ts.enabled==True)\ - (ts.group_name.belongs(self.group_names)) #\ - #(ts.assigned_worker_name <> self.worker_name) - limit = len(workers) * 50 - #if there are a moltitude of tasks, let's assign a maximum of 50 tasks per worker. - #this can be adjusted with some added intelligence (like esteeming how many tasks will - #a worker complete before the ticker reassign them around, but the gain is quite small - #50 is quite a sweet spot also for fast tasks, with sane heartbeat values - #NB: ticker reassign tasks every 5 cycles, so if a worker completes his 50 tasks in less - #than heartbeat*5 seconds, it won't pick new tasks until heartbeat*5 seconds pass. - tasks = all_available.select(limitby=(0,limit), orderby=ts.next_run_time) - #everything until now is going fine. If a worker is currently elaborating a long task, - #all other tasks assigned to him needs to be reassigned "freely" to other workers, that may be free. - #this shuffles up things a bit, in order to maintain the idea of a semi-linear scalability - #let's freeze it up - db.commit() - #it's useful to reduce computation times of reassigning tasks if there is only one worker around - if len(workers) == 1: - all_available.update(status=ASSIGNED, assigned_worker_name=workers[0]) - #let's break up the queue evenly among workers - else: - for i, task in enumerate(tasks): - worker = workers[i % len(workers)] - task.update_record(status=ASSIGNED, assigned_worker_name=workers[i % len(workers)]) - db.commit() - #I didn't report tasks but I'm working nonetheless!!!! - if len(tasks) > 0: - self.empty_runs = 0 - logging.info('TICKER: workers are %s' % len(workers)) - logging.info('TICKER: tasks are %s' % len(tasks)) - def sleep(self): time.sleep(self.heartbeat) # should only sleep until next available task @@ -649,3 +570,5 @@ def main(): if __name__=='__main__': main() + +