better scheduler, thanks Niphlod

This commit is contained in:
mdipierro
2012-06-28 22:50:03 -05:00
parent ffae332ad3
commit a53f689a6d
2 changed files with 110 additions and 31 deletions
+1 -1
View File
@@ -1 +1 @@
Version 2.00.0 (2012-06-28 09:56:50) dev
Version 2.00.0 (2012-06-28 22:49:58) dev
+109 -30
View File
@@ -1,3 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
USAGE = """
## Example
@@ -96,8 +99,10 @@ STOPPED = 'STOPPED'
ACTIVE = 'ACTIVE'
INACTIVE = 'INACTIVE'
DISABLED = 'DISABLED'
EXPIRED = 'EXPIRED'
SECONDS = 1
HEARTBEAT = 3*SECONDS
MAXHIBERNATION = 10
class Task(object):
def __init__(self,app,function,timeout,args='[]',vars='{}',**kwargs):
@@ -293,7 +298,7 @@ class MetaScheduler(threading.Thread):
self.die()
TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED, EXPIRED)
RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
WORKER_STATUS = (ACTIVE,INACTIVE,DISABLED)
@@ -333,6 +338,10 @@ class Scheduler(MetaScheduler):
self.group_names = group_names or ['main']
self.heartbeat = heartbeat
self.worker_name = worker_name or socket.gethostname()+'#'+str(web2py_uuid())
self.worker_status = RUNNING, 1 #tuple containing status as recorded in
#the table, plus a boost parameter for
#hibernation (i.e. when someone stop the
#worker acting on the scheduler_worker table)
self.max_empty_runs = max_empty_runs
self.is_a_ticker = False
self.do_assign_tasks = False
@@ -361,7 +370,7 @@ class Scheduler(MetaScheduler):
Field('enabled','boolean',default=True),
Field('start_time','datetime',default=now),
Field('next_run_time','datetime',default=now),
Field('stop_time','datetime',default=None),
Field('stop_time','datetime'),
Field('repeats','integer',default=1,comment="0=unlimted"),
Field('period','integer',default=60,comment='seconds'),
Field('timeout','integer',default=60,comment='seconds'),
@@ -390,12 +399,35 @@ class Scheduler(MetaScheduler):
Field('first_heartbeat','datetime'),
Field('last_heartbeat','datetime'),
Field('status',requires=IS_IN_SET(WORKER_STATUS)),
Field('is_ticker', 'boolean', default=False),
Field('is_ticker', 'boolean', default=False, writable=False),
Field('group_names', 'list:string', default=self.group_names),
migrate=migrate)
db.commit()
def loop(self,worker_name=None):
MetaScheduler.loop(self)
try:
self.start_heartbeats()
while True and self.have_heartbeat:
if self.worker_status[0] == DISABLED:
logging.debug('Someone stopped me, sleeping until better times come (%s)' % self.worker_status[1])
self.sleep()
continue
logging.debug('looping...')
task = self.pop_task()
if task:
self.empty_runs = 0
self.report_task(task,self.async(task))
else:
self.empty_runs += 1
logging.debug('sleeping...')
if self.max_empty_runs <> 0:
logging.debug('empty runs %s/%s', self.empty_runs, self.max_empty_runs)
if self.empty_runs >= self.max_empty_runs:
logging.info('empty runs limit reached, killing myself')
self.die()
self.sleep()
except KeyboardInterrupt:
self.die()
def pop_task(self):
now = datetime.datetime.now()
@@ -454,7 +486,8 @@ class Scheduler(MetaScheduler):
run_id = run_id,
run_again = run_again,
next_run_time=next_run_time,
times_run = times_run)
times_run = times_run,
stop_time = task.stop_time)
def report_task(self,task,task_report):
logging.debug(' recording task report in db (%s)' % task_report.status)
@@ -465,8 +498,11 @@ class Scheduler(MetaScheduler):
result = task_report.result,
output = task_report.output,
traceback = task_report.tb)
is_expired = task.stop_time and task.next_run_time > task.stop_time and True or False
status = (task.run_again and is_expired and EXPIRED
or task.run_again and not is_expired and QUEUED or COMPLETED)
if task_report.status == COMPLETED:
d = dict(status = task.run_again and QUEUED or COMPLETED,
d = dict(status = status,
next_run_time = task.next_run_time,
times_run = task.times_run)
#I'd like to know who worked my task, reviewing some logs...
@@ -483,6 +519,11 @@ class Scheduler(MetaScheduler):
db.commit()
logging.info('task completed (%s)' % task_report.status)
def adj_hibernation(self):
if self.worker_status[0] == DISABLED:
hibernation = self.worker_status[1] + 1 if self.worker_status[1] < MAXHIBERNATION else MAXHIBERNATION
self.worker_status = DISABLED, hibernation
def send_heartbeat(self,counter):
if not self.db_thread:
logging.debug('thread building own DAL object')
@@ -493,40 +534,59 @@ class Scheduler(MetaScheduler):
sw, st = db.scheduler_worker, db.scheduler_task
now = datetime.datetime.now()
expiration = now-datetime.timedelta(seconds=self.heartbeat*3)
departure = now-datetime.timedelta(seconds=self.heartbeat*3*MAXHIBERNATION)
# record heartbeat
logging.debug('........recording heartbeat')
if not db(sw.worker_name==self.worker_name)\
.update(last_heartbeat = now, status = ACTIVE):
mybackedstatus = db(sw.worker_name==self.worker_name).select().first()
if not mybackedstatus:
sw.insert(status = ACTIVE,worker_name = self.worker_name,
first_heartbeat = now,last_heartbeat = now)
first_heartbeat = now,last_heartbeat = now,
group_names = self.group_names)
self.worker_status = ACTIVE, 1 #activating the process
else:
if mybackedstatus.status == DISABLED:
self.worker_status = DISABLED, self.worker_status[1]#keep sleeping
if self.worker_status[1] == MAXHIBERNATION:
logging.debug('........recording heartbeat')
db(sw.worker_name==self.worker_name).update(
last_heartbeat = now)
else:
logging.debug('........recording heartbeat')
db(sw.worker_name==self.worker_name).update(
last_heartbeat = now, status = ACTIVE)
self.worker_status = ACTIVE, 1 #re-activating the process
self.do_assign_tasks = False
if counter % 5 == 0:
try:
# delete inactive workers
logging.debug(' freeing workers that have not sent heartbeat')
inactive_workers = db(sw.last_heartbeat<expiration)
inactive_workers = db(
((sw.last_heartbeat<expiration) & (sw.status == ACTIVE)) |
((sw.last_heartbeat<departure) & (sw.status == DISABLED))
)
db(st.assigned_worker_name.belongs(
inactive_workers._select(sw.worker_name)))\
(st.status == RUNNING)\
.update(assigned_worker_name='',status=QUEUED)
inactive_workers.delete()
self.is_a_ticker = self.being_a_ticker()
self.do_assign_tasks = True
if self.worker_status[0] <> DISABLED:
self.do_assign_tasks = True
except:
pass
db.commit()
except:
db.rollback()
time.sleep(self.heartbeat)
self.adj_hibernation()
self.sleep()
def being_a_ticker(self):
db = self.db_thread
sw = db.scheduler_worker
ticker = db((sw.worker_name <> self.worker_name) & (sw.is_ticker == True)).select().first()
ticker = db((sw.worker_name <> self.worker_name) & (sw.is_ticker == True) & (sw.status == ACTIVE)).select().first()
if not ticker:
db(sw.worker_name == self.worker_name).update(is_ticker = True)
db(sw.worker_name <> self.worker_name).update(is_ticker = False)
logging.info("TICKER: I'm a ticker (%s)" % self.worker_name)
return True
else:
@@ -537,17 +597,28 @@ class Scheduler(MetaScheduler):
db = self.db
sw, ts = db.scheduler_worker, db.scheduler_task
now = datetime.datetime.now()
all_workers = db(sw.id>0).select()
workers = [row.worker_name for row in all_workers]
all_workers = db(sw.status <> DISABLED).select()
#build workers as dict of groups
wkgroups = {}
for w in all_workers:
group_names = w.group_names
for gname in group_names:
if gname not in wkgroups:
wkgroups[gname] = dict(workers=[{'name' : w.worker_name, 'c' : 0}])
else:
wkgroups[gname]['workers'].append({'name' : w.worker_name, 'c' : 0})
#set queued tasks that expired between "runs" (i.e., you turned off)
#the scheduler and then it wasn't expired, but now it is
db(ts.status.belongs((QUEUED,ASSIGNED)))(ts.stop_time<now).update(status=EXPIRED)
all_available = db(ts.status.belongs((QUEUED,ASSIGNED)))\
((ts.times_run<ts.repeats)|(ts.repeats==0))\
(ts.start_time<=now)\
((ts.stop_time==None) | (ts.stop_time>now))\
(ts.next_run_time<=now)\
(ts.enabled==True)\
(ts.group_name.belongs(self.group_names)) #\
#(ts.assigned_worker_name <> self.worker_name)
limit = len(workers) * 50
(ts.enabled==True)
limit = len(all_workers) * 50
#if there are a moltitude of tasks, let's assign a maximum of 50 tasks per worker.
#this can be adjusted with some added intelligence (like esteeming how many tasks will
#a worker complete before the ticker reassign them around, but the gain is quite small
@@ -560,23 +631,31 @@ class Scheduler(MetaScheduler):
#this shuffles up things a bit, in order to maintain the idea of a semi-linear scalability
#let's freeze it up
db.commit()
#it's useful to reduce computation times of reassigning tasks if there is only one worker around
if len(workers) == 1:
all_available.update(status=ASSIGNED, assigned_worker_name=workers[0])
#let's break up the queue evenly among workers
else:
for i, task in enumerate(tasks):
worker = workers[i % len(workers)]
task.update_record(status=ASSIGNED, assigned_worker_name=workers[i % len(workers)])
for task in tasks:
gname = task.group_name
ws = wkgroups.get(gname)
if ws:
counter = 0
myw = 0
for i, w in enumerate(ws['workers']):
if w['c'] < counter:
myw = i
counter = w['c']
task.update_record(status=ASSIGNED,
assigned_worker_name=wkgroups[gname]['workers'][myw]['name'])
wkgroups[gname]['workers'][myw]['c'] += 1
db.commit()
#I didn't report tasks but I'm working nonetheless!!!!
if len(tasks) > 0:
self.empty_runs = 0
logging.info('TICKER: workers are %s' % len(workers))
logging.info('TICKER: workers are %s' % len(all_workers))
logging.info('TICKER: tasks are %s' % len(tasks))
def sleep(self):
time.sleep(self.heartbeat) # should only sleep until next available task
time.sleep(self.heartbeat*self.worker_status[1]) # should only sleep until next available task
def main():
"""