From 755a860011ffaf28fa2793df59958af65d0f2f16 Mon Sep 17 00:00:00 2001 From: mdipierro Date: Thu, 1 Nov 2012 21:35:53 -0500 Subject: [PATCH] many scheduler improvements, thanks Niphlod --- VERSION | 2 +- gluon/scheduler.py | 164 ++++++++++++++++++++++++++++----------------- gluon/widget.py | 2 + 3 files changed, 106 insertions(+), 62 deletions(-) diff --git a/VERSION b/VERSION index a6b02268..8a06df38 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -Version 2.2.1 (2012-11-01 21:33:00) stable +Version 2.2.1 (2012-11-01 21:35:48) stable diff --git a/gluon/scheduler.py b/gluon/scheduler.py index 2cc7e1c5..7b441749 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -40,8 +40,8 @@ http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_run.id>0 ## view workers http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_worker.id>0 -## To install the scheduler as a permanent daemon on Linux (w/ Upstart), put the -## following into /etc/init/web2py-scheduler.conf: +## To install the scheduler as a permanent daemon on Linux (w/ Upstart), put +## the following into /etc/init/web2py-scheduler.conf: ## (This assumes your web2py instance is installed in 's home directory, ## running as , with app , on network interface eth0.) @@ -116,7 +116,7 @@ CALLABLETYPES = (types.LambdaType, types.FunctionType, class Task(object): def __init__(self, app, function, timeout, args='[]', vars='{}', **kwargs): - logger.debug(' new task allocated: %s.%s' % (app, function)) + logger.debug(' new task allocated: %s.%s', app, function) self.app = app self.function = function self.timeout = timeout @@ -130,11 +130,11 @@ class Task(object): class TaskReport(object): def __init__(self, status, result=None, output=None, tb=None): - logger.debug(' new task report: %s' % status) + logger.debug(' new task report: %s', status) if tb: - logger.debug(' traceback: %s' % tb) + logger.debug(' traceback: %s', tb) else: - logger.debug(' result: %s' % result) + logger.debug(' result: %s', result) self.status = status self.result = result self.output = output @@ -213,7 +213,6 @@ def executor(queue, task, out): (a, c, f) = parse_path_info(task.app) _env = env(a=a, c=c, import_models=True) logging.getLogger().setLevel(level) - scheduler = current._scheduler f = task.function functions = current._scheduler.tasks if not functions: @@ -434,14 +433,14 @@ class Scheduler(MetaScheduler): self.heartbeat = heartbeat self.worker_name = worker_name or socket.gethostname( ) + '#' + str(os.getpid()) - self.worker_status = RUNNING, 1 # tuple containing status as recorded in - #the table, plus a boost parameter for - #hibernation (i.e. when someone stop the - #worker acting on the scheduler_worker table) + #list containing status as recorded in the table plus a boost parameter + #for hibernation (i.e. when someone stop the worker acting on the worker table) + self.worker_status = [RUNNING, 1] self.max_empty_runs = max_empty_runs self.discard_results = discard_results self.is_a_ticker = False self.do_assign_tasks = False + self.greedy = False self.utc_time = utc_time from gluon import current @@ -461,7 +460,7 @@ class Scheduler(MetaScheduler): def define_tables(self, db, migrate): from gluon.dal import DEFAULT - logger.debug('defining tables (migrate=%s)' % migrate) + logger.debug('defining tables (migrate=%s)', migrate) now = self.now db.define_table( 'scheduler_task', @@ -531,14 +530,16 @@ class Scheduler(MetaScheduler): self.start_heartbeats() while True and self.have_heartbeat: if self.worker_status[0] == DISABLED: - logger.debug('Someone stopped me, sleeping until better times come (%s)' % self.worker_status[1]) + logger.debug('Someone stopped me, sleeping until better times come (%s)', self.worker_status[1]) self.sleep() continue logger.debug('looping...') task = self.pop_task() if task: self.empty_runs = 0 + self.worker_status[0] = RUNNING self.report_task(task, self.async(task)) + self.worker_status[0] = ACTIVE else: self.empty_runs += 1 logger.debug('sleeping...') @@ -554,23 +555,29 @@ class Scheduler(MetaScheduler): logger.info('catched') self.die() + def wrapped_assign_tasks(self, db): + db.commit() # ?don't know if it's useful, let's be completely sure + x = 0 + while x < 10: + try: + self.assign_tasks(db) + db.commit() + break + except: + db.rollback() + logger.error('TICKER(%s): error assigning tasks', self.worker_name) + x += 1 + time.sleep(0.5) + def pop_task(self): now = self.now() db, st = self.db, self.db.scheduler_task if self.is_a_ticker and self.do_assign_tasks: #I'm a ticker, and 5 loops passed without reassigning tasks, let's do #that and loop again - db.commit() # ?don't know if it's useful, let's be completely sure - while True: - try: - self.assign_tasks() - db.commit() - break - except: - db.rollback() - logger.error('TICKER: error assigning tasks') + self.wrapped_assign_tasks(db) return None - db.commit() + #ready to process something grabbed = db(st.assigned_worker_name == self.worker_name)( st.status == ASSIGNED) @@ -579,16 +586,23 @@ class Scheduler(MetaScheduler): task.update_record(status=RUNNING, last_run_time=now) #noone will touch my task! db.commit() - logger.debug(' work to do %s' % task.id) + logger.debug(' work to do %s', task.id) else: - logger.debug('nothing to do') + if self.greedy and self.is_a_ticker: + #there are other tasks ready to be assigned + logger.info('TICKER (%s): greedy loop', self.worker_name) + self.wrapped_assign_tasks(db) + else: + logger.info('nothing to do') return None next_run_time = task.last_run_time + datetime.timedelta( seconds=task.period) times_run = task.times_run + 1 if times_run < task.repeats or task.repeats == 0: + #need to run (repeating task) run_again = True else: + #no need to run again run_again = False run_id = 0 while True and not self.discard_results: @@ -602,6 +616,7 @@ class Scheduler(MetaScheduler): db.commit() break except: + time.sleep(0.5) db.rollback() logger.info('new task %(id)s "%(task_name)s" %(application_name)s.%(function_name)s' % task) return Task( @@ -630,7 +645,7 @@ class Scheduler(MetaScheduler): #result is 'null' as a string if task completed #if it's stopped it's None as NoneType, so we record #the STOPPED "run" anyway - logger.debug(' recording task report in db (%s)' % + logger.debug(' recording task report in db (%s)', task_report.status) db(db.scheduler_run.id == task.run_id).update( status=task_report.status, @@ -641,6 +656,7 @@ class Scheduler(MetaScheduler): else: logger.debug(' deleting task report in db because of no result') db(db.scheduler_run.id == task.run_id).delete() + #if there is a stop_time and the following run would exceed it is_expired = (task.stop_time and task.next_run_time > task.stop_time and True or False) @@ -663,21 +679,26 @@ class Scheduler(MetaScheduler): and task.times_failed < task.retry_failed and QUEUED or task.retry_failed == -1 and QUEUED or st_mapping) - db(db.scheduler_task.id == task.task_id)(db.scheduler_task.status == RUNNING).update( + db( + (db.scheduler_task.id == task.task_id) & + (db.scheduler_task.status == RUNNING) + ).update( times_failed=db.scheduler_task.times_failed + 1, next_run_time=task.next_run_time, - status=status) + status=status + ) db.commit() - logger.info('task completed (%s)' % task_report.status) + logger.info('task completed (%s)', task_report.status) break except: db.rollback() + time.sleep(0.5) def adj_hibernation(self): if self.worker_status[0] == DISABLED: - hibernation = self.worker_status[1] + 1 if self.worker_status[ - 1] < MAXHIBERNATION else MAXHIBERNATION - self.worker_status = DISABLED, hibernation + wk_st = self.worker_status[1] + hibernation = wk_st + 1 if wk_st < MAXHIBERNATION else MAXHIBERNATION + self.worker_status[1] = hibernation def send_heartbeat(self, counter): if not self.db_thread: @@ -689,9 +710,6 @@ class Scheduler(MetaScheduler): db = self.db_thread sw, st = db.scheduler_worker, db.scheduler_task now = self.now() - expiration = now - datetime.timedelta(seconds=self.heartbeat * 3) - departure = now - datetime.timedelta( - seconds=self.heartbeat * 3 * MAXHIBERNATION) # record heartbeat mybackedstatus = db( sw.worker_name == self.worker_name).select().first() @@ -699,35 +717,37 @@ class Scheduler(MetaScheduler): sw.insert(status=ACTIVE, worker_name=self.worker_name, first_heartbeat=now, last_heartbeat=now, group_names=self.group_names) - self.worker_status = ACTIVE, 1 # activating the process + self.worker_status = [ACTIVE, 1] # activating the process else: if mybackedstatus.status == DISABLED: - self.worker_status = DISABLED, self.worker_status[ - 1] # keep sleeping + # keep sleeping + self.worker_status[0] = DISABLED if self.worker_status[1] == MAXHIBERNATION: logger.debug('........recording heartbeat') db(sw.worker_name == self.worker_name).update( last_heartbeat=now) - elif mybackedstatus.status == TERMINATE: - self.worker_status = TERMINATE, self.worker_status[1] + self.worker_status[0] = TERMINATE logger.debug("Waiting to terminate the current task") self.give_up() return elif mybackedstatus.status == KILL: - self.worker_status = KILL, self.worker_status[1] + self.worker_status[0] = KILL self.die() - else: - logger.debug('........recording heartbeat') + logger.debug('........recording heartbeat (%s)', self.worker_status[0]) db(sw.worker_name == self.worker_name).update( last_heartbeat=now, status=ACTIVE) - self.worker_status = ACTIVE, 1 # re-activating the process + self.worker_status[1] = 1 # re-activating the process self.do_assign_tasks = False + if counter % 5 == 0: try: # delete inactive workers + expiration = now - datetime.timedelta(seconds=self.heartbeat * 3) + departure = now - datetime.timedelta( + seconds=self.heartbeat * 3 * MAXHIBERNATION) logger.debug( ' freeing workers that have not sent heartbeat') inactive_workers = db( @@ -753,21 +773,31 @@ class Scheduler(MetaScheduler): def being_a_ticker(self): db = self.db_thread sw = db.scheduler_worker - ticker = db((sw.worker_name != self.worker_name) & ( - sw.is_ticker == True) & (sw.status == ACTIVE)).select().first() + all_active = db( + (sw.worker_name != self.worker_name) & (sw.status == ACTIVE) + ).select() + ticker = all_active.find(lambda row: row.is_ticker is True).first() + not_busy = self.worker_status[0] == ACTIVE if not ticker: - db(sw.worker_name == self.worker_name).update(is_ticker=True) - db(sw.worker_name != self.worker_name).update(is_ticker=False) - logger.info("TICKER: I'm a ticker (%s)" % self.worker_name) + if not_busy: + #only if this worker isn't busy, otherwise wait for a free one + db(sw.worker_name == self.worker_name).update(is_ticker=True) + db(sw.worker_name != self.worker_name).update(is_ticker=False) + logger.info("TICKER(%s): I'm a ticker", self.worker_name) + else: + #giving up, only if I'm not alone + if len(all_active) > 1: + db(sw.worker_name == self.worker_name).update(is_ticker=False) + else: + not_busy = True db.commit() - return True + return not_busy else: logger.info( "%s is a ticker, I'm a poor worker" % ticker.worker_name) return False - def assign_tasks(self): - db = self.db + def assign_tasks(self, db): sw, st = db.scheduler_worker, db.scheduler_task now = self.now() all_workers = db(sw.status == ACTIVE).select() @@ -786,8 +816,15 @@ class Scheduler(MetaScheduler): #the scheduler): then it wasn't expired, but now it is db(st.status.belongs( (QUEUED, ASSIGNED)))(st.stop_time < now).update(status=EXPIRED) - - all_available = db(st.status.belongs((QUEUED, ASSIGNED)))((st.times_run < st.repeats) | (st.repeats == 0))(st.start_time <= now)((st.stop_time == None) | (st.stop_time > now))(st.next_run_time <= now)(st.enabled == True) + + all_available = db( + (st.status.belongs((QUEUED, ASSIGNED))) & + ((st.times_run < st.repeats) | (st.repeats == 0)) & + (st.start_time <= now) & + ((st.stop_time == None) | (st.stop_time > now)) & + (st.next_run_time <= now) & + (st.enabled == True) + ) limit = len(all_workers) * (50 / (len(wkgroups) or 1)) #if there are a moltitude of tasks, let's figure out a maximum of tasks per worker. #this can be adjusted with some added intelligence (like esteeming how many tasks will @@ -804,8 +841,8 @@ class Scheduler(MetaScheduler): db.commit() x = 0 for group in wkgroups.keys(): - tasks = all_available(st.group_name==group).select( - limitby=(0, limit), orderby=st.next_run_time) + tasks = all_available(st.group_name == group).select( + limitby=(0, limit), orderby = st.next_run_time) #let's break up the queue evenly among workers for task in tasks: x += 1 @@ -818,8 +855,10 @@ class Scheduler(MetaScheduler): if w['c'] < counter: myw = i counter = w['c'] - d = dict(status=ASSIGNED, - assigned_worker_name=wkgroups[gname]['workers'][myw]['name']) + d = dict( + status=ASSIGNED, + assigned_worker_name=wkgroups[gname]['workers'][myw]['name'] + ) if not task.task_name: d['task_name'] = task.function_name task.update_record(**d) @@ -829,12 +868,15 @@ class Scheduler(MetaScheduler): #I didn't report tasks but I'm working nonetheless!!!! if x > 0: self.empty_runs = 0 - logger.info('TICKER: workers are %s' % len(all_workers)) - logger.info('TICKER: tasks are %s' % x) + #I'll be greedy only if tasks assigned are equal to the limit + # (meaning there could be others ready to be assigned) + self.greedy = x >= limit and True or False + logger.info('TICKER(%s): workers are %s', self.worker_name, len(all_workers)) + logger.info('TICKER(%s): tasks are %s', self.worker_name, x) def sleep(self): time.sleep(self.heartbeat * self.worker_status[1]) - # should only sleep until next available task + # should only sleep until next available task def queue_task(self, function, pargs=[], pvars={}, **kwargs): """ diff --git a/gluon/widget.py b/gluon/widget.py index d4a38d95..b754e037 100644 --- a/gluon/widget.py +++ b/gluon/widget.py @@ -1007,6 +1007,8 @@ def start_schedulers(options): processes.append(p) print "Currently running %s scheduler processes" % (len(processes)) p.start() + ##to avoid bashing the db at the same time + time.sleep(0.7) print "Processes started" for p in processes: try: