From a53f689a6d28a810aac436bc4f537ca460f9d1db Mon Sep 17 00:00:00 2001 From: mdipierro Date: Thu, 28 Jun 2012 22:50:03 -0500 Subject: [PATCH] better scheduler, thanks Niphlod --- VERSION | 2 +- gluon/scheduler.py | 139 +++++++++++++++++++++++++++++++++++---------- 2 files changed, 110 insertions(+), 31 deletions(-) diff --git a/VERSION b/VERSION index 89b0ad2f..67ef7eae 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -Version 2.00.0 (2012-06-28 09:56:50) dev +Version 2.00.0 (2012-06-28 22:49:58) dev diff --git a/gluon/scheduler.py b/gluon/scheduler.py index 205d39c5..401cff6c 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + USAGE = """ ## Example @@ -96,8 +99,10 @@ STOPPED = 'STOPPED' ACTIVE = 'ACTIVE' INACTIVE = 'INACTIVE' DISABLED = 'DISABLED' +EXPIRED = 'EXPIRED' SECONDS = 1 HEARTBEAT = 3*SECONDS +MAXHIBERNATION = 10 class Task(object): def __init__(self,app,function,timeout,args='[]',vars='{}',**kwargs): @@ -293,7 +298,7 @@ class MetaScheduler(threading.Thread): self.die() -TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED) +TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED, EXPIRED) RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED) WORKER_STATUS = (ACTIVE,INACTIVE,DISABLED) @@ -333,6 +338,10 @@ class Scheduler(MetaScheduler): self.group_names = group_names or ['main'] self.heartbeat = heartbeat self.worker_name = worker_name or socket.gethostname()+'#'+str(web2py_uuid()) + self.worker_status = RUNNING, 1 #tuple containing status as recorded in + #the table, plus a boost parameter for + #hibernation (i.e. when someone stop the + #worker acting on the scheduler_worker table) self.max_empty_runs = max_empty_runs self.is_a_ticker = False self.do_assign_tasks = False @@ -361,7 +370,7 @@ class Scheduler(MetaScheduler): Field('enabled','boolean',default=True), Field('start_time','datetime',default=now), Field('next_run_time','datetime',default=now), - Field('stop_time','datetime',default=None), + Field('stop_time','datetime'), Field('repeats','integer',default=1,comment="0=unlimted"), Field('period','integer',default=60,comment='seconds'), Field('timeout','integer',default=60,comment='seconds'), @@ -390,12 +399,35 @@ class Scheduler(MetaScheduler): Field('first_heartbeat','datetime'), Field('last_heartbeat','datetime'), Field('status',requires=IS_IN_SET(WORKER_STATUS)), - Field('is_ticker', 'boolean', default=False), + Field('is_ticker', 'boolean', default=False, writable=False), + Field('group_names', 'list:string', default=self.group_names), migrate=migrate) db.commit() def loop(self,worker_name=None): - MetaScheduler.loop(self) + try: + self.start_heartbeats() + while True and self.have_heartbeat: + if self.worker_status[0] == DISABLED: + logging.debug('Someone stopped me, sleeping until better times come (%s)' % self.worker_status[1]) + self.sleep() + continue + logging.debug('looping...') + task = self.pop_task() + if task: + self.empty_runs = 0 + self.report_task(task,self.async(task)) + else: + self.empty_runs += 1 + logging.debug('sleeping...') + if self.max_empty_runs <> 0: + logging.debug('empty runs %s/%s', self.empty_runs, self.max_empty_runs) + if self.empty_runs >= self.max_empty_runs: + logging.info('empty runs limit reached, killing myself') + self.die() + self.sleep() + except KeyboardInterrupt: + self.die() def pop_task(self): now = datetime.datetime.now() @@ -454,7 +486,8 @@ class Scheduler(MetaScheduler): run_id = run_id, run_again = run_again, next_run_time=next_run_time, - times_run = times_run) + times_run = times_run, + stop_time = task.stop_time) def report_task(self,task,task_report): logging.debug(' recording task report in db (%s)' % task_report.status) @@ -465,8 +498,11 @@ class Scheduler(MetaScheduler): result = task_report.result, output = task_report.output, traceback = task_report.tb) + is_expired = task.stop_time and task.next_run_time > task.stop_time and True or False + status = (task.run_again and is_expired and EXPIRED + or task.run_again and not is_expired and QUEUED or COMPLETED) if task_report.status == COMPLETED: - d = dict(status = task.run_again and QUEUED or COMPLETED, + d = dict(status = status, next_run_time = task.next_run_time, times_run = task.times_run) #I'd like to know who worked my task, reviewing some logs... @@ -483,6 +519,11 @@ class Scheduler(MetaScheduler): db.commit() logging.info('task completed (%s)' % task_report.status) + def adj_hibernation(self): + if self.worker_status[0] == DISABLED: + hibernation = self.worker_status[1] + 1 if self.worker_status[1] < MAXHIBERNATION else MAXHIBERNATION + self.worker_status = DISABLED, hibernation + def send_heartbeat(self,counter): if not self.db_thread: logging.debug('thread building own DAL object') @@ -493,40 +534,59 @@ class Scheduler(MetaScheduler): sw, st = db.scheduler_worker, db.scheduler_task now = datetime.datetime.now() expiration = now-datetime.timedelta(seconds=self.heartbeat*3) + departure = now-datetime.timedelta(seconds=self.heartbeat*3*MAXHIBERNATION) # record heartbeat - logging.debug('........recording heartbeat') - if not db(sw.worker_name==self.worker_name)\ - .update(last_heartbeat = now, status = ACTIVE): + mybackedstatus = db(sw.worker_name==self.worker_name).select().first() + if not mybackedstatus: sw.insert(status = ACTIVE,worker_name = self.worker_name, - first_heartbeat = now,last_heartbeat = now) + first_heartbeat = now,last_heartbeat = now, + group_names = self.group_names) + self.worker_status = ACTIVE, 1 #activating the process + else: + if mybackedstatus.status == DISABLED: + self.worker_status = DISABLED, self.worker_status[1]#keep sleeping + if self.worker_status[1] == MAXHIBERNATION: + logging.debug('........recording heartbeat') + db(sw.worker_name==self.worker_name).update( + last_heartbeat = now) + else: + logging.debug('........recording heartbeat') + db(sw.worker_name==self.worker_name).update( + last_heartbeat = now, status = ACTIVE) + self.worker_status = ACTIVE, 1 #re-activating the process self.do_assign_tasks = False if counter % 5 == 0: try: # delete inactive workers logging.debug(' freeing workers that have not sent heartbeat') - inactive_workers = db(sw.last_heartbeat DISABLED: + self.do_assign_tasks = True except: pass db.commit() - except: db.rollback() - time.sleep(self.heartbeat) + self.adj_hibernation() + self.sleep() def being_a_ticker(self): db = self.db_thread sw = db.scheduler_worker - ticker = db((sw.worker_name <> self.worker_name) & (sw.is_ticker == True)).select().first() + ticker = db((sw.worker_name <> self.worker_name) & (sw.is_ticker == True) & (sw.status == ACTIVE)).select().first() if not ticker: db(sw.worker_name == self.worker_name).update(is_ticker = True) + db(sw.worker_name <> self.worker_name).update(is_ticker = False) logging.info("TICKER: I'm a ticker (%s)" % self.worker_name) return True else: @@ -537,17 +597,28 @@ class Scheduler(MetaScheduler): db = self.db sw, ts = db.scheduler_worker, db.scheduler_task now = datetime.datetime.now() - all_workers = db(sw.id>0).select() - workers = [row.worker_name for row in all_workers] + all_workers = db(sw.status <> DISABLED).select() + #build workers as dict of groups + wkgroups = {} + for w in all_workers: + group_names = w.group_names + for gname in group_names: + if gname not in wkgroups: + wkgroups[gname] = dict(workers=[{'name' : w.worker_name, 'c' : 0}]) + else: + wkgroups[gname]['workers'].append({'name' : w.worker_name, 'c' : 0}) + #set queued tasks that expired between "runs" (i.e., you turned off) + #the scheduler and then it wasn't expired, but now it is + db(ts.status.belongs((QUEUED,ASSIGNED)))(ts.stop_timenow))\ (ts.next_run_time<=now)\ - (ts.enabled==True)\ - (ts.group_name.belongs(self.group_names)) #\ - #(ts.assigned_worker_name <> self.worker_name) - limit = len(workers) * 50 + (ts.enabled==True) + + limit = len(all_workers) * 50 #if there are a moltitude of tasks, let's assign a maximum of 50 tasks per worker. #this can be adjusted with some added intelligence (like esteeming how many tasks will #a worker complete before the ticker reassign them around, but the gain is quite small @@ -560,23 +631,31 @@ class Scheduler(MetaScheduler): #this shuffles up things a bit, in order to maintain the idea of a semi-linear scalability #let's freeze it up db.commit() - #it's useful to reduce computation times of reassigning tasks if there is only one worker around - if len(workers) == 1: - all_available.update(status=ASSIGNED, assigned_worker_name=workers[0]) #let's break up the queue evenly among workers - else: - for i, task in enumerate(tasks): - worker = workers[i % len(workers)] - task.update_record(status=ASSIGNED, assigned_worker_name=workers[i % len(workers)]) + + for task in tasks: + gname = task.group_name + ws = wkgroups.get(gname) + if ws: + counter = 0 + myw = 0 + for i, w in enumerate(ws['workers']): + if w['c'] < counter: + myw = i + counter = w['c'] + task.update_record(status=ASSIGNED, + assigned_worker_name=wkgroups[gname]['workers'][myw]['name']) + wkgroups[gname]['workers'][myw]['c'] += 1 + db.commit() #I didn't report tasks but I'm working nonetheless!!!! if len(tasks) > 0: self.empty_runs = 0 - logging.info('TICKER: workers are %s' % len(workers)) + logging.info('TICKER: workers are %s' % len(all_workers)) logging.info('TICKER: tasks are %s' % len(tasks)) def sleep(self): - time.sleep(self.heartbeat) # should only sleep until next available task + time.sleep(self.heartbeat*self.worker_status[1]) # should only sleep until next available task def main(): """