new scheduler (not tested yet), thanks Niphlod

This commit is contained in:
Massimo Di Pierro
2012-05-24 22:31:51 -05:00
parent 2fa604df3a
commit 837ed7fb5e
2 changed files with 107 additions and 42 deletions

View File

@@ -1 +1 @@
Version 2.00.0 (2012-05-23 09:27:57) dev
Version 2.00.0 (2012-05-24 22:30:18) dev

View File

@@ -334,6 +334,8 @@ class Scheduler(MetaScheduler):
self.heartbeat = heartbeat
self.worker_name = worker_name or socket.gethostname()+'#'+str(web2py_uuid())
self.max_empty_runs = max_empty_runs
self.is_a_ticker = False
self.do_assign_tasks = False
from gluon import current
current._scheduler = self
@@ -388,6 +390,7 @@ class Scheduler(MetaScheduler):
Field('first_heartbeat','datetime'),
Field('last_heartbeat','datetime'),
Field('status',requires=IS_IN_SET(WORKER_STATUS)),
Field('is_ticker', 'boolean', default=False),
migrate=migrate)
db.commit()
@@ -397,34 +400,41 @@ class Scheduler(MetaScheduler):
def pop_task(self):
now = datetime.datetime.now()
db, ts = self.db, self.db.scheduler_task
try:
logging.debug(' grabbing all queued tasks')
all_available = db(ts.status.belongs((QUEUED,RUNNING)))\
((ts.times_run<ts.repeats)|(ts.repeats==0))\
(ts.start_time<=now)\
(ts.stop_time>now)\
(ts.next_run_time<=now)\
(ts.enabled==True)\
(ts.group_name.belongs(self.group_names))\
(ts.assigned_worker_name.belongs((None,'',self.worker_name))) #None?
number_grabbed = all_available.update(
assigned_worker_name=self.worker_name,status=ASSIGNED)
db.commit()
except:
number_grabbed = None
db.rollback()
if number_grabbed:
logging.debug(' grabbed %s tasks' % number_grabbed)
grabbed = db(ts.assigned_worker_name==self.worker_name)\
(ts.status==ASSIGNED)
task = grabbed.select(limitby=(0,1), orderby=ts.next_run_time).first()
logging.debug(' releasing all but one (running)')
if task:
task.update_record(status=RUNNING,last_run_time=now)
grabbed.update(assigned_worker_name='',status=QUEUED)
db.commit()
if self.is_a_ticker and self.do_assign_tasks:
#I'm a ticker, and 5 loops passed without reassigning tasks, let's do
#that and loop again
db.commit() #?don't know if it's useful, let's be completely sure
while True:
try:
self.assign_tasks()
db.commit()
break
except:
db.rollback()
logging.error('TICKER: error assigning tasks')
#I didn't report tasks but I'm working nonetheless!!!!
self.empty_runs = 0
return None
grabbed = db(ts.assigned_worker_name==self.worker_name)\
(ts.status==ASSIGNED)
task_id = grabbed._select(ts.id, limitby=(0,1), orderby=ts.next_run_time)
updated = db(
ts.id.belongs(task_id)
).update(status=RUNNING,last_run_time=now) #reduces collisions?
#noone will touch my task!
db.commit()
if updated:
logging.debug(' work to do %s' % updated)
task = db(ts.assigned_worker_name==self.worker_name)\
(ts.status==RUNNING).select().first()
if not task:
#it's very likely (almost impossible) that this will happen.
#please report any abnormal activity on web2py-users or file a bug
#about new scheduler on http://code.google.com/p/web2py/issues/list
logging.error('Something is not going on nicely, someone stealed my task!')
return None
else:
logging.debug('nothing to do')
return None
next_run_time = task.last_run_time + datetime.timedelta(seconds=task.period)
times_run = task.times_run + 1
@@ -469,11 +479,13 @@ class Scheduler(MetaScheduler):
if task_report.status == COMPLETED:
d = dict(status = task.run_again and QUEUED or COMPLETED,
next_run_time = task.next_run_time,
times_run = task.times_run,
assigned_worker_name = '')
times_run = task.times_run)
#I'd like to know who worked my task, reviewing some logs...
#,assigned_worker_name = '')
else:
d = dict(
assigned_worker_name = '',
#same as before...
#assigned_worker_name = '',
status = {'FAILED':'FAILED',
'TIMEOUT':'TIMEOUT',
'STOPPED':'QUEUED'}[task_report.status])
@@ -498,20 +510,75 @@ class Scheduler(MetaScheduler):
.update(last_heartbeat = now, status = ACTIVE):
sw.insert(status = ACTIVE,worker_name = self.worker_name,
first_heartbeat = now,last_heartbeat = now)
if counter % 10 == 0:
# deallocate jobs assigned to inactive workers and requeue them
logging.debug(' freeing workers that have not sent heartbeat')
inactive_workers = db(sw.last_heartbeat<expiration)
db(st.assigned_worker_name.belongs(
inactive_workers._select(sw.worker_name)))\
(st.status.belongs((RUNNING,ASSIGNED,QUEUED)))\
.update(assigned_worker_name='',status=QUEUED)
inactive_workers.delete()
self.do_assign_tasks = False
if counter % 5 == 0:
try:
# delete inactive workers
logging.debug(' freeing workers that have not sent heartbeat')
inactive_workers = db(sw.last_heartbeat<expiration)
inactive_workers.delete()
self.is_a_ticker = self.being_a_ticker()
self.do_assign_tasks = True
except:
pass
db.commit()
except:
db.rollback()
time.sleep(self.heartbeat)
def being_a_ticker(self):
db = self.db_thread
sw = db.scheduler_worker
ticker = db((sw.worker_name <> self.worker_name) & (sw.is_ticker == True)).select().first()
if not ticker:
db(sw.worker_name == self.worker_name).update(is_ticker = True)
logging.info("TICKER: I'm a ticker (%s)" % self.worker_name)
return True
else:
logging.info("%s is a ticker, I'm a poor worker" % ticker.worker_name)
return False
def assign_tasks(self):
db = self.db
sw, ts = db.scheduler_worker, db.scheduler_task
now = datetime.datetime.now()
all_workers = db(sw.id>0).select()
workers = [row.worker_name for row in all_workers]
all_available = db(ts.status.belongs((QUEUED,ASSIGNED)))\
((ts.times_run<ts.repeats)|(ts.repeats==0))\
(ts.start_time<=now)\
(ts.stop_time>now)\
(ts.next_run_time<=now)\
(ts.enabled==True)\
(ts.group_name.belongs(self.group_names)) #\
#(ts.assigned_worker_name <> self.worker_name)
limit = len(workers) * 50
#if there are a moltitude of tasks, let's assign a maximum of 50 tasks per worker.
#this can be adjusted with some added intelligence (like esteeming how many tasks will
#a worker complete before the ticker reassign them around, but the gain is quite small
#50 is quite a sweet spot also for fast tasks, with sane heartbeat values
#NB: ticker reassign tasks every 5 cycles, so if a worker completes his 50 tasks in less
#than heartbeat*5 seconds, it won't pick new tasks until heartbeat*5 seconds pass.
tasks = all_available.select(limitby=(0,limit), orderby=ts.next_run_time)
#everything until now is going fine. If a worker is currently elaborating a long task,
#all other tasks assigned to him needs to be reassigned "freely" to other workers, that may be free.
#this shuffles up things a bit, in order to maintain the idea of a semi-linear scalability
#let's freeze it up
db.commit()
#it's useful to reduce computation times of reassigning tasks if there is only one worker around
if len(workers) == 1:
all_available.update(status=ASSIGNED, assigned_worker_name=workers[0])
#let's break up the queue evenly among workers
else:
for i, task in enumerate(tasks):
worker = workers[i % len(workers)]
task.update_record(status=ASSIGNED, assigned_worker_name=workers[i % len(workers)])
db.commit()
logging.debug('TICKER: workers are %s' % len(workers))
logging.debug('TICKER: tasks are %s' % len(tasks))
def sleep(self):
time.sleep(self.heartbeat) # should only sleep until next available task
@@ -586,5 +653,3 @@ def main():
if __name__=='__main__':
main()