many scheduler improvements, thanks Niphlod

This commit is contained in:
mdipierro
2012-11-01 21:35:53 -05:00
parent 3173d75dad
commit 755a860011
3 changed files with 106 additions and 62 deletions
+1 -1
View File
@@ -1 +1 @@
Version 2.2.1 (2012-11-01 21:33:00) stable
Version 2.2.1 (2012-11-01 21:35:48) stable
+103 -61
View File
@@ -40,8 +40,8 @@ http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_run.id>0
## view workers
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_worker.id>0
## To install the scheduler as a permanent daemon on Linux (w/ Upstart), put the
## following into /etc/init/web2py-scheduler.conf:
## To install the scheduler as a permanent daemon on Linux (w/ Upstart), put
## the following into /etc/init/web2py-scheduler.conf:
## (This assumes your web2py instance is installed in <user>'s home directory,
## running as <user>, with app <myapp>, on network interface eth0.)
@@ -116,7 +116,7 @@ CALLABLETYPES = (types.LambdaType, types.FunctionType,
class Task(object):
def __init__(self, app, function, timeout, args='[]', vars='{}', **kwargs):
logger.debug(' new task allocated: %s.%s' % (app, function))
logger.debug(' new task allocated: %s.%s', app, function)
self.app = app
self.function = function
self.timeout = timeout
@@ -130,11 +130,11 @@ class Task(object):
class TaskReport(object):
def __init__(self, status, result=None, output=None, tb=None):
logger.debug(' new task report: %s' % status)
logger.debug(' new task report: %s', status)
if tb:
logger.debug(' traceback: %s' % tb)
logger.debug(' traceback: %s', tb)
else:
logger.debug(' result: %s' % result)
logger.debug(' result: %s', result)
self.status = status
self.result = result
self.output = output
@@ -213,7 +213,6 @@ def executor(queue, task, out):
(a, c, f) = parse_path_info(task.app)
_env = env(a=a, c=c, import_models=True)
logging.getLogger().setLevel(level)
scheduler = current._scheduler
f = task.function
functions = current._scheduler.tasks
if not functions:
@@ -434,14 +433,14 @@ class Scheduler(MetaScheduler):
self.heartbeat = heartbeat
self.worker_name = worker_name or socket.gethostname(
) + '#' + str(os.getpid())
self.worker_status = RUNNING, 1 # tuple containing status as recorded in
#the table, plus a boost parameter for
#hibernation (i.e. when someone stop the
#worker acting on the scheduler_worker table)
#list containing status as recorded in the table plus a boost parameter
#for hibernation (i.e. when someone stop the worker acting on the worker table)
self.worker_status = [RUNNING, 1]
self.max_empty_runs = max_empty_runs
self.discard_results = discard_results
self.is_a_ticker = False
self.do_assign_tasks = False
self.greedy = False
self.utc_time = utc_time
from gluon import current
@@ -461,7 +460,7 @@ class Scheduler(MetaScheduler):
def define_tables(self, db, migrate):
from gluon.dal import DEFAULT
logger.debug('defining tables (migrate=%s)' % migrate)
logger.debug('defining tables (migrate=%s)', migrate)
now = self.now
db.define_table(
'scheduler_task',
@@ -531,14 +530,16 @@ class Scheduler(MetaScheduler):
self.start_heartbeats()
while True and self.have_heartbeat:
if self.worker_status[0] == DISABLED:
logger.debug('Someone stopped me, sleeping until better times come (%s)' % self.worker_status[1])
logger.debug('Someone stopped me, sleeping until better times come (%s)', self.worker_status[1])
self.sleep()
continue
logger.debug('looping...')
task = self.pop_task()
if task:
self.empty_runs = 0
self.worker_status[0] = RUNNING
self.report_task(task, self.async(task))
self.worker_status[0] = ACTIVE
else:
self.empty_runs += 1
logger.debug('sleeping...')
@@ -554,23 +555,29 @@ class Scheduler(MetaScheduler):
logger.info('catched')
self.die()
def wrapped_assign_tasks(self, db):
db.commit() # ?don't know if it's useful, let's be completely sure
x = 0
while x < 10:
try:
self.assign_tasks(db)
db.commit()
break
except:
db.rollback()
logger.error('TICKER(%s): error assigning tasks', self.worker_name)
x += 1
time.sleep(0.5)
def pop_task(self):
now = self.now()
db, st = self.db, self.db.scheduler_task
if self.is_a_ticker and self.do_assign_tasks:
#I'm a ticker, and 5 loops passed without reassigning tasks, let's do
#that and loop again
db.commit() # ?don't know if it's useful, let's be completely sure
while True:
try:
self.assign_tasks()
db.commit()
break
except:
db.rollback()
logger.error('TICKER: error assigning tasks')
self.wrapped_assign_tasks(db)
return None
db.commit()
#ready to process something
grabbed = db(st.assigned_worker_name == self.worker_name)(
st.status == ASSIGNED)
@@ -579,16 +586,23 @@ class Scheduler(MetaScheduler):
task.update_record(status=RUNNING, last_run_time=now)
#noone will touch my task!
db.commit()
logger.debug(' work to do %s' % task.id)
logger.debug(' work to do %s', task.id)
else:
logger.debug('nothing to do')
if self.greedy and self.is_a_ticker:
#there are other tasks ready to be assigned
logger.info('TICKER (%s): greedy loop', self.worker_name)
self.wrapped_assign_tasks(db)
else:
logger.info('nothing to do')
return None
next_run_time = task.last_run_time + datetime.timedelta(
seconds=task.period)
times_run = task.times_run + 1
if times_run < task.repeats or task.repeats == 0:
#need to run (repeating task)
run_again = True
else:
#no need to run again
run_again = False
run_id = 0
while True and not self.discard_results:
@@ -602,6 +616,7 @@ class Scheduler(MetaScheduler):
db.commit()
break
except:
time.sleep(0.5)
db.rollback()
logger.info('new task %(id)s "%(task_name)s" %(application_name)s.%(function_name)s' % task)
return Task(
@@ -630,7 +645,7 @@ class Scheduler(MetaScheduler):
#result is 'null' as a string if task completed
#if it's stopped it's None as NoneType, so we record
#the STOPPED "run" anyway
logger.debug(' recording task report in db (%s)' %
logger.debug(' recording task report in db (%s)',
task_report.status)
db(db.scheduler_run.id == task.run_id).update(
status=task_report.status,
@@ -641,6 +656,7 @@ class Scheduler(MetaScheduler):
else:
logger.debug(' deleting task report in db because of no result')
db(db.scheduler_run.id == task.run_id).delete()
#if there is a stop_time and the following run would exceed it
is_expired = (task.stop_time
and task.next_run_time > task.stop_time
and True or False)
@@ -663,21 +679,26 @@ class Scheduler(MetaScheduler):
and task.times_failed < task.retry_failed
and QUEUED or task.retry_failed == -1
and QUEUED or st_mapping)
db(db.scheduler_task.id == task.task_id)(db.scheduler_task.status == RUNNING).update(
db(
(db.scheduler_task.id == task.task_id) &
(db.scheduler_task.status == RUNNING)
).update(
times_failed=db.scheduler_task.times_failed + 1,
next_run_time=task.next_run_time,
status=status)
status=status
)
db.commit()
logger.info('task completed (%s)' % task_report.status)
logger.info('task completed (%s)', task_report.status)
break
except:
db.rollback()
time.sleep(0.5)
def adj_hibernation(self):
if self.worker_status[0] == DISABLED:
hibernation = self.worker_status[1] + 1 if self.worker_status[
1] < MAXHIBERNATION else MAXHIBERNATION
self.worker_status = DISABLED, hibernation
wk_st = self.worker_status[1]
hibernation = wk_st + 1 if wk_st < MAXHIBERNATION else MAXHIBERNATION
self.worker_status[1] = hibernation
def send_heartbeat(self, counter):
if not self.db_thread:
@@ -689,9 +710,6 @@ class Scheduler(MetaScheduler):
db = self.db_thread
sw, st = db.scheduler_worker, db.scheduler_task
now = self.now()
expiration = now - datetime.timedelta(seconds=self.heartbeat * 3)
departure = now - datetime.timedelta(
seconds=self.heartbeat * 3 * MAXHIBERNATION)
# record heartbeat
mybackedstatus = db(
sw.worker_name == self.worker_name).select().first()
@@ -699,35 +717,37 @@ class Scheduler(MetaScheduler):
sw.insert(status=ACTIVE, worker_name=self.worker_name,
first_heartbeat=now, last_heartbeat=now,
group_names=self.group_names)
self.worker_status = ACTIVE, 1 # activating the process
self.worker_status = [ACTIVE, 1] # activating the process
else:
if mybackedstatus.status == DISABLED:
self.worker_status = DISABLED, self.worker_status[
1] # keep sleeping
# keep sleeping
self.worker_status[0] = DISABLED
if self.worker_status[1] == MAXHIBERNATION:
logger.debug('........recording heartbeat')
db(sw.worker_name == self.worker_name).update(
last_heartbeat=now)
elif mybackedstatus.status == TERMINATE:
self.worker_status = TERMINATE, self.worker_status[1]
self.worker_status[0] = TERMINATE
logger.debug("Waiting to terminate the current task")
self.give_up()
return
elif mybackedstatus.status == KILL:
self.worker_status = KILL, self.worker_status[1]
self.worker_status[0] = KILL
self.die()
else:
logger.debug('........recording heartbeat')
logger.debug('........recording heartbeat (%s)', self.worker_status[0])
db(sw.worker_name == self.worker_name).update(
last_heartbeat=now, status=ACTIVE)
self.worker_status = ACTIVE, 1 # re-activating the process
self.worker_status[1] = 1 # re-activating the process
self.do_assign_tasks = False
if counter % 5 == 0:
try:
# delete inactive workers
expiration = now - datetime.timedelta(seconds=self.heartbeat * 3)
departure = now - datetime.timedelta(
seconds=self.heartbeat * 3 * MAXHIBERNATION)
logger.debug(
' freeing workers that have not sent heartbeat')
inactive_workers = db(
@@ -753,21 +773,31 @@ class Scheduler(MetaScheduler):
def being_a_ticker(self):
db = self.db_thread
sw = db.scheduler_worker
ticker = db((sw.worker_name != self.worker_name) & (
sw.is_ticker == True) & (sw.status == ACTIVE)).select().first()
all_active = db(
(sw.worker_name != self.worker_name) & (sw.status == ACTIVE)
).select()
ticker = all_active.find(lambda row: row.is_ticker is True).first()
not_busy = self.worker_status[0] == ACTIVE
if not ticker:
db(sw.worker_name == self.worker_name).update(is_ticker=True)
db(sw.worker_name != self.worker_name).update(is_ticker=False)
logger.info("TICKER: I'm a ticker (%s)" % self.worker_name)
if not_busy:
#only if this worker isn't busy, otherwise wait for a free one
db(sw.worker_name == self.worker_name).update(is_ticker=True)
db(sw.worker_name != self.worker_name).update(is_ticker=False)
logger.info("TICKER(%s): I'm a ticker", self.worker_name)
else:
#giving up, only if I'm not alone
if len(all_active) > 1:
db(sw.worker_name == self.worker_name).update(is_ticker=False)
else:
not_busy = True
db.commit()
return True
return not_busy
else:
logger.info(
"%s is a ticker, I'm a poor worker" % ticker.worker_name)
return False
def assign_tasks(self):
db = self.db
def assign_tasks(self, db):
sw, st = db.scheduler_worker, db.scheduler_task
now = self.now()
all_workers = db(sw.status == ACTIVE).select()
@@ -786,8 +816,15 @@ class Scheduler(MetaScheduler):
#the scheduler): then it wasn't expired, but now it is
db(st.status.belongs(
(QUEUED, ASSIGNED)))(st.stop_time < now).update(status=EXPIRED)
all_available = db(st.status.belongs((QUEUED, ASSIGNED)))((st.times_run < st.repeats) | (st.repeats == 0))(st.start_time <= now)((st.stop_time == None) | (st.stop_time > now))(st.next_run_time <= now)(st.enabled == True)
all_available = db(
(st.status.belongs((QUEUED, ASSIGNED))) &
((st.times_run < st.repeats) | (st.repeats == 0)) &
(st.start_time <= now) &
((st.stop_time == None) | (st.stop_time > now)) &
(st.next_run_time <= now) &
(st.enabled == True)
)
limit = len(all_workers) * (50 / (len(wkgroups) or 1))
#if there are a moltitude of tasks, let's figure out a maximum of tasks per worker.
#this can be adjusted with some added intelligence (like esteeming how many tasks will
@@ -804,8 +841,8 @@ class Scheduler(MetaScheduler):
db.commit()
x = 0
for group in wkgroups.keys():
tasks = all_available(st.group_name==group).select(
limitby=(0, limit), orderby=st.next_run_time)
tasks = all_available(st.group_name == group).select(
limitby=(0, limit), orderby = st.next_run_time)
#let's break up the queue evenly among workers
for task in tasks:
x += 1
@@ -818,8 +855,10 @@ class Scheduler(MetaScheduler):
if w['c'] < counter:
myw = i
counter = w['c']
d = dict(status=ASSIGNED,
assigned_worker_name=wkgroups[gname]['workers'][myw]['name'])
d = dict(
status=ASSIGNED,
assigned_worker_name=wkgroups[gname]['workers'][myw]['name']
)
if not task.task_name:
d['task_name'] = task.function_name
task.update_record(**d)
@@ -829,12 +868,15 @@ class Scheduler(MetaScheduler):
#I didn't report tasks but I'm working nonetheless!!!!
if x > 0:
self.empty_runs = 0
logger.info('TICKER: workers are %s' % len(all_workers))
logger.info('TICKER: tasks are %s' % x)
#I'll be greedy only if tasks assigned are equal to the limit
# (meaning there could be others ready to be assigned)
self.greedy = x >= limit and True or False
logger.info('TICKER(%s): workers are %s', self.worker_name, len(all_workers))
logger.info('TICKER(%s): tasks are %s', self.worker_name, x)
def sleep(self):
time.sleep(self.heartbeat * self.worker_status[1])
# should only sleep until next available task
# should only sleep until next available task
def queue_task(self, function, pargs=[], pvars={}, **kwargs):
"""
+2
View File
@@ -1007,6 +1007,8 @@ def start_schedulers(options):
processes.append(p)
print "Currently running %s scheduler processes" % (len(processes))
p.start()
##to avoid bashing the db at the same time
time.sleep(0.7)
print "Processes started"
for p in processes:
try: