fixed scheduler bug, thanks Niphlod

This commit is contained in:
mdipierro
2012-10-26 15:49:47 -05:00
parent 6af2e859ac
commit 1621825166
2 changed files with 7 additions and 7 deletions
+1 -1
View File
@@ -1 +1 @@
Version 2.2.1 (2012-10-25 17:22:57) stable
Version 2.2.1 (2012-10-26 15:49:42) stable
+6 -6
View File
@@ -63,7 +63,6 @@ import os
import time
import multiprocessing
import sys
import cStringIO
import threading
import traceback
import signal
@@ -206,7 +205,6 @@ def executor(queue, task, out):
if task.app:
os.chdir(os.environ['WEB2PY_PATH'])
from gluon.shell import env, parse_path_info
from gluon.dal import BaseAdapter
from gluon import current
level = logging.getLogger().getEffectiveLevel()
logging.getLogger().setLevel(logging.WARN)
@@ -788,9 +786,9 @@ class Scheduler(MetaScheduler):
#the scheduler): then it wasn't expired, but now it is
db(st.status.belongs(
(QUEUED, ASSIGNED)))(st.stop_time < now).update(status=EXPIRED)
all_available = db(st.status.belongs((QUEUED, ASSIGNED)))((st.times_run < st.repeats) | (st.repeats == 0))(st.start_time <= now)((st.stop_time == None) | (st.stop_time > now))(st.next_run_time <= now)(st.enabled == True)
limit = len(all_workers) * (50 / len(wkgroups))
limit = len(all_workers) * (50 / (len(wkgroups) or 1))
#if there are a moltitude of tasks, let's figure out a maximum of tasks per worker.
#this can be adjusted with some added intelligence (like esteeming how many tasks will
#a worker complete before the ticker reassign them around, but the gain is quite small
@@ -804,11 +802,13 @@ class Scheduler(MetaScheduler):
#let's freeze it up
db.commit()
x = 0
for group in wkgroups.keys():
tasks = all_available(st.group_name==group).select(
limitby=(0, limit), orderby=st.next_run_time)
#let's break up the queue evenly among workers
for task in tasks:
x += 1
gname = task.group_name
ws = wkgroups.get(gname)
if ws:
@@ -827,10 +827,10 @@ class Scheduler(MetaScheduler):
db.commit()
#I didn't report tasks but I'm working nonetheless!!!!
if len(tasks) > 0:
if x > 0:
self.empty_runs = 0
logger.info('TICKER: workers are %s' % len(all_workers))
logger.info('TICKER: tasks are %s' % len(tasks))
logger.info('TICKER: tasks are %s' % x)
def sleep(self):
time.sleep(self.heartbeat * self.worker_status[1])