diff --git a/VERSION b/VERSION index 6e59ed7b..8c27771a 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -Version 2.00.0 (2012-06-25 16:36:15) dev +Version 2.00.0 (2012-06-25 16:48:59) dev diff --git a/gluon/scheduler.py b/gluon/scheduler.py index ed4d8879..205d39c5 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -1,5 +1,3 @@ -#### WORK IN PROGRESS... NOT SUPPOSED TO WORK YET - USAGE = """ ## Example @@ -19,6 +17,8 @@ scheduler = Scheduler(db,dict(demo1=demo1,demo2=demo2)) ## run worker nodes with: cd web2py + python web2py.py -K myapp +or python gluon/scheduler.py -u sqlite://storage.sqlite \ -f applications/myapp/databases/ \ -t mytasks.py @@ -37,7 +37,23 @@ http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_run.id>0 ## view workers http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_worker.id>0 -## Comments +## To install the scheduler as a permanent daemon on Linux (w/ Upstart), put the +## following into /etc/init/web2py-scheduler.conf: +## (This assumes your web2py instance is installed in <user>'s home directory, +## running as <user>, with app <myapp>, on network interface eth0.) + +description "web2py task scheduler" +start on (local-filesystems and net-device-up IFACE=eth0) +stop on shutdown +respawn limit 8 60 # Give up if restart occurs 8 times in 60 seconds. 
+exec sudo -u <user> python /home/<user>/web2py/web2py.py -K <myapp> +respawn + +## You can then start/stop/restart/check status of the daemon with: +sudo start web2py-scheduler +sudo stop web2py-scheduler +sudo restart web2py-scheduler +sudo status web2py-scheduler """ import os @@ -318,6 +334,8 @@ class Scheduler(MetaScheduler): self.heartbeat = heartbeat self.worker_name = worker_name or socket.gethostname()+'#'+str(web2py_uuid()) self.max_empty_runs = max_empty_runs + self.is_a_ticker = False + self.do_assign_tasks = False from gluon import current current._scheduler = self @@ -343,7 +361,7 @@ class Scheduler(MetaScheduler): Field('enabled','boolean',default=True), Field('start_time','datetime',default=now), Field('next_run_time','datetime',default=now), - Field('stop_time','datetime',default=now+datetime.timedelta(days=1)), + Field('stop_time','datetime',default=None), Field('repeats','integer',default=1,comment="0=unlimted"), Field('period','integer',default=60,comment='seconds'), Field('timeout','integer',default=60,comment='seconds'), @@ -372,6 +390,7 @@ class Scheduler(MetaScheduler): Field('first_heartbeat','datetime'), Field('last_heartbeat','datetime'), Field('status',requires=IS_IN_SET(WORKER_STATUS)), + Field('is_ticker', 'boolean', default=False), migrate=migrate) db.commit() @@ -381,34 +400,30 @@ class Scheduler(MetaScheduler): def pop_task(self): now = datetime.datetime.now() db, ts = self.db, self.db.scheduler_task - try: - logging.debug(' grabbing all queued tasks') - all_available = db(ts.status.belongs((QUEUED,RUNNING)))\ - ((ts.times_runnow)\ - (ts.next_run_time<=now)\ - (ts.enabled==True)\ - (ts.group_name.belongs(self.group_names))\ - (ts.assigned_worker_name.belongs((None,'',self.worker_name))) #None? 
- number_grabbed = all_available.update( - assigned_worker_name=self.worker_name,status=ASSIGNED) - db.commit() - except: - number_grabbed = None - db.rollback() - if number_grabbed: - logging.debug(' grabbed %s tasks' % number_grabbed) - grabbed = db(ts.assigned_worker_name==self.worker_name)\ - (ts.status==ASSIGNED) - task = grabbed.select(limitby=(0,1), orderby=ts.next_run_time).first() + if self.is_a_ticker and self.do_assign_tasks: + #I'm a ticker, and 5 loops passed without reassigning tasks, let's do + #that and loop again + db.commit() #?don't know if it's useful, let's be completely sure + while True: + try: + self.assign_tasks() + db.commit() + break + except: + db.rollback() + logging.error('TICKER: error assigning tasks') + return None + grabbed = db(ts.assigned_worker_name==self.worker_name)\ + (ts.status==ASSIGNED) - logging.debug(' releasing all but one (running)') - if task: - task.update_record(status=RUNNING,last_run_time=now) - grabbed.update(assigned_worker_name='',status=QUEUED) - db.commit() + task = grabbed.select(limitby=(0,1), orderby=ts.next_run_time).first() + if task: + task.update_record(status=RUNNING,last_run_time=now) + #noone will touch my task! + db.commit() + logging.debug(' work to do %s' % task.id) else: + logging.debug('nothing to do') return None next_run_time = task.last_run_time + datetime.timedelta(seconds=task.period) times_run = task.times_run + 1 @@ -453,11 +468,13 @@ class Scheduler(MetaScheduler): if task_report.status == COMPLETED: d = dict(status = task.run_again and QUEUED or COMPLETED, next_run_time = task.next_run_time, - times_run = task.times_run, - assigned_worker_name = '') + times_run = task.times_run) + #I'd like to know who worked my task, reviewing some logs... + #,assigned_worker_name = '') else: d = dict( - assigned_worker_name = '', + #same as before... 
+ #assigned_worker_name = '', status = {'FAILED':'FAILED', 'TIMEOUT':'TIMEOUT', 'STOPPED':'QUEUED'}[task_report.status]) @@ -482,20 +499,82 @@ class Scheduler(MetaScheduler): .update(last_heartbeat = now, status = ACTIVE): sw.insert(status = ACTIVE,worker_name = self.worker_name, first_heartbeat = now,last_heartbeat = now) - if counter % 10 == 0: - # deallocate jobs assigned to inactive workers and requeue them - logging.debug(' freeing workers that have not sent heartbeat') - inactive_workers = db(sw.last_heartbeat self.worker_name) & (sw.is_ticker == True)).select().first() + if not ticker: + db(sw.worker_name == self.worker_name).update(is_ticker = True) + logging.info("TICKER: I'm a ticker (%s)" % self.worker_name) + return True + else: + logging.info("%s is a ticker, I'm a poor worker" % ticker.worker_name) + return False + + def assign_tasks(self): + db = self.db + sw, ts = db.scheduler_worker, db.scheduler_task + now = datetime.datetime.now() + all_workers = db(sw.id>0).select() + workers = [row.worker_name for row in all_workers] + all_available = db(ts.status.belongs((QUEUED,ASSIGNED)))\ + ((ts.times_runnow))\ + (ts.next_run_time<=now)\ + (ts.enabled==True)\ + (ts.group_name.belongs(self.group_names)) #\ + #(ts.assigned_worker_name <> self.worker_name) + limit = len(workers) * 50 + #if there are a moltitude of tasks, let's assign a maximum of 50 tasks per worker. + #this can be adjusted with some added intelligence (like esteeming how many tasks will + #a worker complete before the ticker reassign them around, but the gain is quite small + #50 is quite a sweet spot also for fast tasks, with sane heartbeat values + #NB: ticker reassign tasks every 5 cycles, so if a worker completes his 50 tasks in less + #than heartbeat*5 seconds, it won't pick new tasks until heartbeat*5 seconds pass. + tasks = all_available.select(limitby=(0,limit), orderby=ts.next_run_time) + #everything until now is going fine. 
If a worker is currently elaborating a long task, + #all other tasks assigned to him needs to be reassigned "freely" to other workers, that may be free. + #this shuffles up things a bit, in order to maintain the idea of a semi-linear scalability + #let's freeze it up + db.commit() + #it's useful to reduce computation times of reassigning tasks if there is only one worker around + if len(workers) == 1: + all_available.update(status=ASSIGNED, assigned_worker_name=workers[0]) + #let's break up the queue evenly among workers + else: + for i, task in enumerate(tasks): + worker = workers[i % len(workers)] + task.update_record(status=ASSIGNED, assigned_worker_name=workers[i % len(workers)]) + db.commit() + #I didn't report tasks but I'm working nonetheless!!!! + if len(tasks) > 0: + self.empty_runs = 0 + logging.info('TICKER: workers are %s' % len(workers)) + logging.info('TICKER: tasks are %s' % len(tasks)) + def sleep(self): time.sleep(self.heartbeat) # should only sleep until next available task @@ -570,5 +649,3 @@ def main(): if __name__=='__main__': main() - -