new (better) scheduler, thanks niphlod and szimszon

This commit is contained in:
Massimo DiPierro
2012-06-04 18:49:35 -05:00
parent dbd7c8e065
commit efdabad37e
2 changed files with 17 additions and 21 deletions
+1 -1
View File
@@ -1 +1 @@
Version 2.00.0 (2012-06-04 18:45:13) dev
Version 2.00.0 (2012-06-04 18:49:33) dev
+16 -20
View File
@@ -412,27 +412,16 @@ class Scheduler(MetaScheduler):
except:
db.rollback()
logging.error('TICKER: error assigning tasks')
#I didn't report tasks but I'm working nonetheless!!!!
self.empty_runs = 0
return None
grabbed = db(ts.assigned_worker_name==self.worker_name)\
(ts.status==ASSIGNED)
task_id = grabbed._select(ts.id, limitby=(0,1), orderby=ts.next_run_time)
updated = db(
ts.id.belongs(task_id)
).update(status=RUNNING,last_run_time=now) #reduces collisions?
#noone will touch my task!
db.commit()
if updated:
logging.debug(' work to do %s' % updated)
task = db(ts.assigned_worker_name==self.worker_name)\
(ts.status==RUNNING).select().first()
if not task:
#it's very likely (almost impossible) that this will happen.
#please report any abnormal activity on web2py-users or file a bug
#about new scheduler on http://code.google.com/p/web2py/issues/list
logging.error('Something is not going on nicely, someone stealed my task!')
return None
task = grabbed.select(limitby=(0,1), orderby=ts.next_run_time).first()
if task:
task.update_record(status=RUNNING,last_run_time=now)
#noone will touch my task!
db.commit()
logging.debug(' work to do %s' % task.id)
else:
logging.debug('nothing to do')
return None
@@ -517,6 +506,10 @@ class Scheduler(MetaScheduler):
# delete inactive workers
logging.debug(' freeing workers that have not sent heartbeat')
inactive_workers = db(sw.last_heartbeat<expiration)
db(st.assigned_worker_name.belongs(
inactive_workers._select(sw.worker_name)))\
(st.status == RUNNING)\
.update(assigned_worker_name='',status=QUEUED)
inactive_workers.delete()
self.is_a_ticker = self.being_a_ticker()
self.do_assign_tasks = True
@@ -576,8 +569,11 @@ class Scheduler(MetaScheduler):
worker = workers[i % len(workers)]
task.update_record(status=ASSIGNED, assigned_worker_name=workers[i % len(workers)])
db.commit()
logging.debug('TICKER: workers are %s' % len(workers))
logging.debug('TICKER: tasks are %s' % len(tasks))
#I didn't report tasks but I'm working nonetheless!!!!
if len(tasks) > 0:
self.empty_runs = 0
logging.info('TICKER: workers are %s' % len(workers))
logging.info('TICKER: tasks are %s' % len(tasks))
def sleep(self):
time.sleep(self.heartbeat) # should only sleep until next available task