Merge pull request #867 from BuhtigithuB/pep8-scheduler-py

Improve PEP8 gluon/scheduler.py
This commit is contained in:
mdipierro
2015-03-22 19:13:16 -05:00
+43 -44
View File
@@ -231,9 +231,9 @@ def demo_function(*argv, **kwargs):
time.sleep(1)
return 'done'
#the two functions below deal with simplejson decoding as unicode, esp for the dict decode
#and subsequent usage as function Keyword arguments unicode variable names won't work!
#borrowed from http://stackoverflow.com/questions/956867/how-to-get-string-objects-instead-unicode-ones-from-json-in-python
# the two functions below deal with simplejson decoding as unicode, esp for the dict decode
# and subsequent usage as function Keyword arguments unicode variable names won't work!
# borrowed from http://stackoverflow.com/questions/956867/how-to-get-string-objects-instead-unicode-ones-from-json-in-python
def _decode_list(lst):
@@ -304,9 +304,9 @@ def executor(queue, task, out):
if not isinstance(_function, CALLABLETYPES):
raise NameError(
"name '%s' not found in scheduler's environment" % f)
#Inject W2P_TASK into environment
# Inject W2P_TASK into environment
_env.update({'W2P_TASK': W2P_TASK})
#Inject W2P_TASK into current
# Inject W2P_TASK into current
from gluon import current
current.W2P_TASK = W2P_TASK
globals().update(_env)
@@ -795,7 +795,7 @@ class Scheduler(MetaScheduler):
#let's do that and loop again
self.wrapped_assign_tasks(db)
return None
#ready to process something
# ready to process something
grabbed = db(
(st.assigned_worker_name == self.worker_name) &
(st.status == ASSIGNED)
@@ -804,12 +804,12 @@ class Scheduler(MetaScheduler):
task = grabbed.select(limitby=(0, 1), orderby=st.next_run_time).first()
if task:
task.update_record(status=RUNNING, last_run_time=now)
#noone will touch my task!
# noone will touch my task!
db.commit()
logger.debug(' work to do %s', task.id)
else:
if self.is_a_ticker and self.greedy:
#there are other tasks ready to be assigned
# there are other tasks ready to be assigned
logger.info('TICKER: greedy loop')
self.wrapped_assign_tasks(db)
else:
@@ -825,10 +825,10 @@ class Scheduler(MetaScheduler):
seconds=task.period * times_run
)
if times_run < task.repeats or task.repeats == 0:
#need to run (repeating task)
# need to run (repeating task)
run_again = True
else:
#no need to run again
# no need to run again
run_again = False
run_id = 0
while True and not self.discard_results:
@@ -889,9 +889,9 @@ class Scheduler(MetaScheduler):
sr = db.scheduler_run
if not self.discard_results:
if task_report.result != 'null' or task_report.tb:
#result is 'null' as a string if task completed
#if it's stopped it's None as NoneType, so we record
#the STOPPED "run" anyway
# result is 'null' as a string if task completed
# if it's stopped it's None as NoneType, so we record
# the STOPPED "run" anyway
logger.debug(' recording task report in db (%s)',
task_report.status)
db(sr.id == task.run_id).update(
@@ -903,7 +903,7 @@ class Scheduler(MetaScheduler):
else:
logger.debug(' deleting task report in db because of no result')
db(sr.id == task.run_id).delete()
#if there is a stop_time and the following run would exceed it
# if there is a stop_time and the following run would exceed it
is_expired = (task.stop_time
and task.next_run_time > task.stop_time
and True or False)
@@ -1056,16 +1056,16 @@ class Scheduler(MetaScheduler):
ticker = all_active.find(lambda row: row.is_ticker is True).first()
not_busy = self.w_stats.status == ACTIVE
if not ticker:
#if no other tickers are around
# if no other tickers are around
if not_busy:
#only if I'm not busy
# only if I'm not busy
db(sw.worker_name == my_name).update(is_ticker=True)
db(sw.worker_name != my_name).update(is_ticker=False)
logger.info("TICKER: I'm a ticker")
else:
#I'm busy
# I'm busy
if len(all_active) >= 1:
#so I'll "downgrade" myself to a "poor worker"
# so I'll "downgrade" myself to a "poor worker"
db(sw.worker_name == my_name).update(is_ticker=False)
else:
not_busy = True
@@ -1085,7 +1085,7 @@ class Scheduler(MetaScheduler):
sw, st, sd = db.scheduler_worker, db.scheduler_task, db.scheduler_task_deps
now = self.now()
all_workers = db(sw.status == ACTIVE).select()
#build workers as dict of groups
# build workers as dict of groups
wkgroups = {}
for w in all_workers:
if w.worker_stats['status'] == 'RUNNING':
@@ -1098,14 +1098,14 @@ class Scheduler(MetaScheduler):
else:
wkgroups[gname]['workers'].append(
{'name': w.worker_name, 'c': 0})
#set queued tasks that expired between "runs" (i.e., you turned off
#the scheduler): then it wasn't expired, but now it is
# set queued tasks that expired between "runs" (i.e., you turned off
# the scheduler): then it wasn't expired, but now it is
db(
(st.status.belongs((QUEUED, ASSIGNED))) &
(st.stop_time < now)
).update(status=EXPIRED)
#calculate dependencies
# calculate dependencies
deps_with_no_deps = db(
(sd.can_visit == False) &
(~sd.task_child.belongs(
@@ -1114,7 +1114,7 @@ class Scheduler(MetaScheduler):
)
)._select(sd.task_child)
no_deps = db(
(st.status.belongs((QUEUED,ASSIGNED))) &
(st.status.belongs((QUEUED, ASSIGNED))) &
(
(sd.id == None) | (st.id.belongs(deps_with_no_deps))
@@ -1137,27 +1137,27 @@ class Scheduler(MetaScheduler):
limit = len(all_workers) * (50 / (len(wkgroups) or 1))
#if there are a moltitude of tasks, let's figure out a maximum of
#tasks per worker. This can be further tuned with some added
#intelligence (like esteeming how many tasks will a worker complete
#before the ticker reassign them around, but the gain is quite small
#50 is a sweet spot also for fast tasks, with sane heartbeat values
#NB: ticker reassign tasks every 5 cycles, so if a worker completes its
#50 tasks in less than heartbeat*5 seconds,
#it won't pick new tasks until heartbeat*5 seconds pass.
# if there are a moltitude of tasks, let's figure out a maximum of
# tasks per worker. This can be further tuned with some added
# intelligence (like esteeming how many tasks will a worker complete
# before the ticker reassign them around, but the gain is quite small
# 50 is a sweet spot also for fast tasks, with sane heartbeat values
# NB: ticker reassign tasks every 5 cycles, so if a worker completes its
# 50 tasks in less than heartbeat*5 seconds,
# it won't pick new tasks until heartbeat*5 seconds pass.
#If a worker is currently elaborating a long task, its tasks needs to
#be reassigned to other workers
#this shuffles up things a bit, in order to give a task equal chances
#to be executed
# If a worker is currently elaborating a long task, its tasks needs to
# be reassigned to other workers
# this shuffles up things a bit, in order to give a task equal chances
# to be executed
#let's freeze it up
# let's freeze it up
db.commit()
x = 0
for group in wkgroups.keys():
tasks = all_available(st.group_name == group).select(
limitby=(0, limit), orderby = st.next_run_time)
#let's break up the queue evenly among workers
# let's break up the queue evenly among workers
for task in tasks:
x += 1
gname = task.group_name
@@ -1182,13 +1182,13 @@ class Scheduler(MetaScheduler):
).update(**d)
wkgroups[gname]['workers'][myw]['c'] += 1
db.commit()
#I didn't report tasks but I'm working nonetheless!!!!
# I didn't report tasks but I'm working nonetheless!!!!
if x > 0:
self.w_stats.empty_runs = 0
self.w_stats.queue = x
self.w_stats.distribution = wkgroups
self.w_stats.workers = len(all_workers)
#I'll be greedy only if tasks assigned are equal to the limit
# I'll be greedy only if tasks assigned are equal to the limit
# (meaning there could be others ready to be assigned)
self.greedy = x >= limit
logger.info('TICKER: workers are %s', len(all_workers))
@@ -1201,7 +1201,7 @@ class Scheduler(MetaScheduler):
# should only sleep until next available task
def set_worker_status(self, group_names=None, action=ACTIVE,
exclude=None, limit=None, worker_name=None):
exclude=None, limit=None, worker_name=None):
"""Internal function to set worker's status"""
ws = self.db.scheduler_worker
if not group_names:
@@ -1220,10 +1220,9 @@ class Scheduler(MetaScheduler):
).update(status=action)
else:
for group in group_names:
workers = self.db(
(ws.group_names.contains(group)) &
(~ws.status.belongs(exclusion))
)._select(ws.id, limitby=(0,limit))
workers = self.db((ws.group_names.contains(group)) &
(~ws.status.belongs(exclusion))
)._select(ws.id, limitby=(0, limit))
self.db(ws.id.belongs(workers)).update(status=action)
def disable(self, group_names=None, limit=None, worker_name=None):