improved scheduler, thanks Niphlod

This commit is contained in:
mdipierro
2012-06-25 16:36:20 -05:00
parent 3a8a7c911e
commit 71fae9efce
2 changed files with 43 additions and 120 deletions

View File

@@ -1 +1 @@
Version 2.00.0 (2012-06-25 14:40:17) dev
Version 2.00.0 (2012-06-25 16:36:15) dev

View File

@@ -1,3 +1,5 @@
#### WORK IN PROGRESS... NOT SUPPOSED TO WORK YET
USAGE = """
## Example
@@ -17,8 +19,6 @@ scheduler = Scheduler(db,dict(demo1=demo1,demo2=demo2))
## run worker nodes with:
cd web2py
python web2py.py -K myapp
or
python gluon/scheduler.py -u sqlite://storage.sqlite \
-f applications/myapp/databases/ \
-t mytasks.py
@@ -37,23 +37,7 @@ http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_run.id>0
## view workers
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_worker.id>0
## To install the scheduler as a permanent daemon on Linux (w/ Upstart), put the
## following into /etc/init/web2py-scheduler.conf:
## (This assumes your web2py instance is installed in <user>'s home directory,
## running as <user>, with app <myapp>, on network interface eth0.)
description "web2py task scheduler"
start on (local-filesystems and net-device-up IFACE=eth0)
stop on shutdown
respawn limit 8 60 # Give up if restart occurs 8 times in 60 seconds.
exec sudo -u <user> python /home/<user>/web2py/web2py.py -K <myapp>
respawn
## You can then start/stop/restart/check status of the daemon with:
sudo start web2py-scheduler
sudo stop web2py-scheduler
sudo restart web2py-scheduler
sudo status web2py-scheduler
## Comments
"""
import os
@@ -334,8 +318,6 @@ class Scheduler(MetaScheduler):
self.heartbeat = heartbeat
self.worker_name = worker_name or socket.gethostname()+'#'+str(web2py_uuid())
self.max_empty_runs = max_empty_runs
self.is_a_ticker = False
self.do_assign_tasks = False
from gluon import current
current._scheduler = self
@@ -361,7 +343,7 @@ class Scheduler(MetaScheduler):
Field('enabled','boolean',default=True),
Field('start_time','datetime',default=now),
Field('next_run_time','datetime',default=now),
Field('stop_time','datetime',default=None),
Field('stop_time','datetime',default=now+datetime.timedelta(days=1)),
Field('repeats','integer',default=1,comment="0=unlimted"),
Field('period','integer',default=60,comment='seconds'),
Field('timeout','integer',default=60,comment='seconds'),
@@ -390,7 +372,6 @@ class Scheduler(MetaScheduler):
Field('first_heartbeat','datetime'),
Field('last_heartbeat','datetime'),
Field('status',requires=IS_IN_SET(WORKER_STATUS)),
Field('is_ticker', 'boolean', default=False),
migrate=migrate)
db.commit()
@@ -400,30 +381,34 @@ class Scheduler(MetaScheduler):
def pop_task(self):
now = datetime.datetime.now()
db, ts = self.db, self.db.scheduler_task
if self.is_a_ticker and self.do_assign_tasks:
#I'm a ticker, and 5 loops passed without reassigning tasks, let's do
#that and loop again
db.commit() #?don't know if it's useful, let's be completely sure
while True:
try:
self.assign_tasks()
db.commit()
break
except:
db.rollback()
logging.error('TICKER: error assigning tasks')
return None
grabbed = db(ts.assigned_worker_name==self.worker_name)\
(ts.status==ASSIGNED)
task = grabbed.select(limitby=(0,1), orderby=ts.next_run_time).first()
if task:
task.update_record(status=RUNNING,last_run_time=now)
#noone will touch my task!
try:
logging.debug(' grabbing all queued tasks')
all_available = db(ts.status.belongs((QUEUED,RUNNING)))\
((ts.times_run<ts.repeats)|(ts.repeats==0))\
(ts.start_time<=now)\
(ts.stop_time>now)\
(ts.next_run_time<=now)\
(ts.enabled==True)\
(ts.group_name.belongs(self.group_names))\
(ts.assigned_worker_name.belongs((None,'',self.worker_name))) #None?
number_grabbed = all_available.update(
assigned_worker_name=self.worker_name,status=ASSIGNED)
db.commit()
logging.debug(' work to do %s' % task.id)
except:
number_grabbed = None
db.rollback()
if number_grabbed:
logging.debug(' grabbed %s tasks' % number_grabbed)
grabbed = db(ts.assigned_worker_name==self.worker_name)\
(ts.status==ASSIGNED)
task = grabbed.select(limitby=(0,1), orderby=ts.next_run_time).first()
logging.debug(' releasing all but one (running)')
if task:
task.update_record(status=RUNNING,last_run_time=now)
grabbed.update(assigned_worker_name='',status=QUEUED)
db.commit()
else:
logging.debug('nothing to do')
return None
next_run_time = task.last_run_time + datetime.timedelta(seconds=task.period)
times_run = task.times_run + 1
@@ -468,13 +453,11 @@ class Scheduler(MetaScheduler):
if task_report.status == COMPLETED:
d = dict(status = task.run_again and QUEUED or COMPLETED,
next_run_time = task.next_run_time,
times_run = task.times_run)
#I'd like to know who worked my task, reviewing some logs...
#,assigned_worker_name = '')
times_run = task.times_run,
assigned_worker_name = '')
else:
d = dict(
#same as before...
#assigned_worker_name = '',
assigned_worker_name = '',
status = {'FAILED':'FAILED',
'TIMEOUT':'TIMEOUT',
'STOPPED':'QUEUED'}[task_report.status])
@@ -499,82 +482,20 @@ class Scheduler(MetaScheduler):
.update(last_heartbeat = now, status = ACTIVE):
sw.insert(status = ACTIVE,worker_name = self.worker_name,
first_heartbeat = now,last_heartbeat = now)
self.do_assign_tasks = False
if counter % 5 == 0:
try:
# delete inactive workers
logging.debug(' freeing workers that have not sent heartbeat')
inactive_workers = db(sw.last_heartbeat<expiration)
db(st.assigned_worker_name.belongs(
if counter % 10 == 0:
# deallocate jobs assigned to inactive workers and requeue them
logging.debug(' freeing workers that have not sent heartbeat')
inactive_workers = db(sw.last_heartbeat<expiration)
db(st.assigned_worker_name.belongs(
inactive_workers._select(sw.worker_name)))\
(st.status == RUNNING)\
(st.status.belongs((RUNNING,ASSIGNED,QUEUED)))\
.update(assigned_worker_name='',status=QUEUED)
inactive_workers.delete()
self.is_a_ticker = self.being_a_ticker()
self.do_assign_tasks = True
except:
pass
inactive_workers.delete()
db.commit()
except:
db.rollback()
time.sleep(self.heartbeat)
def being_a_ticker(self):
db = self.db_thread
sw = db.scheduler_worker
ticker = db((sw.worker_name <> self.worker_name) & (sw.is_ticker == True)).select().first()
if not ticker:
db(sw.worker_name == self.worker_name).update(is_ticker = True)
logging.info("TICKER: I'm a ticker (%s)" % self.worker_name)
return True
else:
logging.info("%s is a ticker, I'm a poor worker" % ticker.worker_name)
return False
def assign_tasks(self):
db = self.db
sw, ts = db.scheduler_worker, db.scheduler_task
now = datetime.datetime.now()
all_workers = db(sw.id>0).select()
workers = [row.worker_name for row in all_workers]
all_available = db(ts.status.belongs((QUEUED,ASSIGNED)))\
((ts.times_run<ts.repeats)|(ts.repeats==0))\
(ts.start_time<=now)\
((ts.stop_time==None) | (ts.stop_time>now))\
(ts.next_run_time<=now)\
(ts.enabled==True)\
(ts.group_name.belongs(self.group_names)) #\
#(ts.assigned_worker_name <> self.worker_name)
limit = len(workers) * 50
#if there are a moltitude of tasks, let's assign a maximum of 50 tasks per worker.
#this can be adjusted with some added intelligence (like esteeming how many tasks will
#a worker complete before the ticker reassign them around, but the gain is quite small
#50 is quite a sweet spot also for fast tasks, with sane heartbeat values
#NB: ticker reassign tasks every 5 cycles, so if a worker completes his 50 tasks in less
#than heartbeat*5 seconds, it won't pick new tasks until heartbeat*5 seconds pass.
tasks = all_available.select(limitby=(0,limit), orderby=ts.next_run_time)
#everything until now is going fine. If a worker is currently elaborating a long task,
#all other tasks assigned to him needs to be reassigned "freely" to other workers, that may be free.
#this shuffles up things a bit, in order to maintain the idea of a semi-linear scalability
#let's freeze it up
db.commit()
#it's useful to reduce computation times of reassigning tasks if there is only one worker around
if len(workers) == 1:
all_available.update(status=ASSIGNED, assigned_worker_name=workers[0])
#let's break up the queue evenly among workers
else:
for i, task in enumerate(tasks):
worker = workers[i % len(workers)]
task.update_record(status=ASSIGNED, assigned_worker_name=workers[i % len(workers)])
db.commit()
#I didn't report tasks but I'm working nonetheless!!!!
if len(tasks) > 0:
self.empty_runs = 0
logging.info('TICKER: workers are %s' % len(workers))
logging.info('TICKER: tasks are %s' % len(tasks))
def sleep(self):
time.sleep(self.heartbeat) # should only sleep until next available task
@@ -649,3 +570,5 @@ def main():
if __name__=='__main__':
main()