Merge pull request #565 from niphlod/scheduler/enhancement

better assignment (tasks go to not running workers only). Thanks @PengfeiYu
This commit is contained in:
mdipierro
2014-12-22 22:28:26 -06:00

View File

@@ -87,11 +87,17 @@ if 'WEB2PY_PATH' not in os.environ:
os.environ['WEB2PY_PATH'] = path
try:
from gluon.contrib.simplejson import loads, dumps
except:
# try external module
from simplejson import loads, dumps
except ImportError:
try:
# try stdlib (Python >= 2.6)
from json import loads, dumps
except:
# fallback to pure-Python module
from gluon.contrib.simplejson import loads, dumps
IDENTIFIER = "%s#%s" % (socket.gethostname(),os.getpid())
IDENTIFIER = "%s#%s" % (socket.gethostname(), os.getpid())
logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER)
@@ -160,6 +166,7 @@ class TaskReport(object):
def __str__(self):
return '<TaskReport: %s>' % self.status
class JobGraph(object):
"""Experimental: with JobGraph you can specify
dependencies amongs tasks"""
@@ -170,7 +177,9 @@ class JobGraph(object):
def add_deps(self, task_parent, task_child):
"""Creates a dependency between task_parent and task_child"""
self.db.scheduler_task_deps.insert(task_parent=task_parent, task_child=task_child, job_name=self.job_name)
self.db.scheduler_task_deps.insert(task_parent=task_parent,
task_child=task_child,
job_name=self.job_name)
def validate(self, job_name):
"""Validates if all tasks job_name can be completed, i.e. there
@@ -195,16 +204,18 @@ class JobGraph(object):
try:
rtn = []
for k, v in nested_dict.items():
v.discard(k) # Ignore self dependencies
v.discard(k) # Ignore self dependencies
extra_items_in_deps = reduce(set.union, nested_dict.values()) - set(nested_dict.keys())
nested_dict.update(dict((item, set()) for item in extra_items_in_deps))
while True:
ordered = set(item for item,dep in nested_dict.items() if not dep)
ordered = set(item for item, dep in nested_dict.items() if not dep)
if not ordered:
break
rtn.append(ordered)
nested_dict = dict((item, (dep - ordered)) for item, dep in nested_dict.items()
if item not in ordered)
nested_dict = dict(
(item, (dep - ordered)) for item, dep in nested_dict.items()
if item not in ordered
)
assert not nested_dict, "A cyclic dependency exists amongst %r" % nested_dict
db.commit()
return rtn
@@ -212,6 +223,7 @@ class JobGraph(object):
db.rollback()
return None
def demo_function(*argv, **kwargs):
""" test function """
for i in range(argv[0]):
@@ -268,7 +280,7 @@ def executor(queue, task, out):
def write(self, data):
self.out_queue.put(data)
W2P_TASK = Storage({'id' : task.task_id, 'uuid' : task.uuid})
W2P_TASK = Storage({'id': task.task_id, 'uuid': task.uuid})
stdout = LogOutput(out)
try:
if task.app:
@@ -293,7 +305,7 @@ def executor(queue, task, out):
raise NameError(
"name '%s' not found in scheduler's environment" % f)
#Inject W2P_TASK into environment
_env.update({'W2P_TASK' : W2P_TASK})
_env.update({'W2P_TASK': W2P_TASK})
#Inject W2P_TASK into current
from gluon import current
current.W2P_TASK = W2P_TASK
@@ -357,8 +369,7 @@ class MetaScheduler(threading.Thread):
start = time.time()
while p.is_alive() and (
not task.timeout or time.time() - start < task.timeout):
while p.is_alive() and (not task.timeout or time.time() - start < task.timeout):
if tout:
try:
logger.debug(' partial output saved')
@@ -568,7 +579,7 @@ class Scheduler(MetaScheduler):
queue=0,
distribution=None,
workers=0)
) #dict holding statistics
) # dict holding statistics
from gluon import current
current._scheduler = self
@@ -740,7 +751,7 @@ class Scheduler(MetaScheduler):
contention and retries `assign_task` after 0.5 seconds
"""
logger.debug('Assigning tasks...')
db.commit() #db.commit() only for Mysql
db.commit() # db.commit() only for Mysql
x = 0
while x < 10:
try:
@@ -761,7 +772,7 @@ class Scheduler(MetaScheduler):
contention and retries `pop_task` after 0.5 seconds
"""
db = self.db
db.commit() #another nifty db.commit() only for Mysql
db.commit() # another nifty db.commit() only for Mysql
x = 0
while x < 10:
try:
@@ -1077,6 +1088,8 @@ class Scheduler(MetaScheduler):
#build workers as dict of groups
wkgroups = {}
for w in all_workers:
if w.worker_stats['status'] == 'RUNNING':
continue
group_names = w.group_names
for gname in group_names:
if gname not in wkgroups:
@@ -1090,8 +1103,9 @@ class Scheduler(MetaScheduler):
db(
(st.status.belongs((QUEUED, ASSIGNED))) &
(st.stop_time < now)
).update(status=EXPIRED)
).update(status=EXPIRED)
#calculate dependencies
deps_with_no_deps = db(
(sd.can_visit == False) &
(~sd.task_child.belongs(
@@ -1163,7 +1177,7 @@ class Scheduler(MetaScheduler):
if not task.task_name:
d['task_name'] = task.function_name
db(
(st.id==task.id) &
(st.id == task.id) &
(st.status.belongs((QUEUED, ASSIGNED)))
).update(**d)
wkgroups[gname]['workers'][myw]['c'] += 1
@@ -1207,8 +1221,8 @@ class Scheduler(MetaScheduler):
else:
for group in group_names:
workers = self.db(
(ws.group_names.contains(group)) &
(~ws.status.belongs(exclusion))
(ws.group_names.contains(group)) &
(~ws.status.belongs(exclusion))
)._select(ws.id, limitby=(0,limit))
self.db(ws.id.belongs(workers)).update(status=action)
@@ -1339,7 +1353,7 @@ class Scheduler(MetaScheduler):
**dict(orderby=orderby,
left=left,
limitby=(0, 1))
).first()
).first()
if row and output:
row.result = row.scheduler_run.run_result and \
loads(row.scheduler_run.run_result,