From 1621825166e26d4aee64bf77bfc5152012f7f709 Mon Sep 17 00:00:00 2001 From: mdipierro Date: Fri, 26 Oct 2012 15:49:47 -0500 Subject: [PATCH] fixed scheduler bug, thanks Niphlod --- VERSION | 2 +- gluon/scheduler.py | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/VERSION b/VERSION index 07725f11..27056127 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -Version 2.2.1 (2012-10-25 17:22:57) stable +Version 2.2.1 (2012-10-26 15:49:42) stable diff --git a/gluon/scheduler.py b/gluon/scheduler.py index fb6d5841..2cc7e1c5 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -63,7 +63,6 @@ import os import time import multiprocessing import sys -import cStringIO import threading import traceback import signal @@ -206,7 +205,6 @@ def executor(queue, task, out): if task.app: os.chdir(os.environ['WEB2PY_PATH']) from gluon.shell import env, parse_path_info - from gluon.dal import BaseAdapter from gluon import current level = logging.getLogger().getEffectiveLevel() logging.getLogger().setLevel(logging.WARN) @@ -788,9 +786,9 @@ class Scheduler(MetaScheduler): #the scheduler): then it wasn't expired, but now it is db(st.status.belongs( (QUEUED, ASSIGNED)))(st.stop_time < now).update(status=EXPIRED) - + all_available = db(st.status.belongs((QUEUED, ASSIGNED)))((st.times_run < st.repeats) | (st.repeats == 0))(st.start_time <= now)((st.stop_time == None) | (st.stop_time > now))(st.next_run_time <= now)(st.enabled == True) - limit = len(all_workers) * (50 / len(wkgroups)) + limit = len(all_workers) * (50 / (len(wkgroups) or 1)) #if there are a moltitude of tasks, let's figure out a maximum of tasks per worker. #this can be adjusted with some added intelligence (like esteeming how many tasks will #a worker complete before the ticker reassign them around, but the gain is quite small @@ -804,11 +802,13 @@ class Scheduler(MetaScheduler): #let's freeze it up db.commit() + x = 0 for group in wkgroups.keys(): tasks = all_available(st.group_name==group).select( limitby=(0, limit), orderby=st.next_run_time) #let's break up the queue evenly among workers for task in tasks: + x += 1 gname = task.group_name ws = wkgroups.get(gname) if ws: @@ -827,10 +827,10 @@ class Scheduler(MetaScheduler): db.commit() #I didn't report tasks but I'm working nonetheless!!!! - if len(tasks) > 0: + if x > 0: self.empty_runs = 0 logger.info('TICKER: workers are %s' % len(all_workers)) - logger.info('TICKER: tasks are %s' % len(tasks)) + logger.info('TICKER: tasks are %s' % x) def sleep(self): time.sleep(self.heartbeat * self.worker_status[1])