Merge pull request #1320 from niphlod/enhancement/redis_scheduler

sync with main scheduler
mdipierro
2016-05-09 00:03:42 -05:00


@@ -9,6 +9,17 @@ Scheduler with redis backend
---------------------------------
"""
import os
import time
import socket
import datetime
import logging
from gluon.utils import web2py_uuid
from gluon.storage import Storage
from gluon.scheduler import *
from gluon.scheduler import _decode_dict
from gluon.contrib.redis_utils import RWatchError
USAGE = """
## Example
@@ -35,11 +46,6 @@ mysched = RScheduler(db, dict(demo1=demo1,demo2=demo2), ...., redis_conn=rconn)
"""
import os
import time
import socket
import datetime
import logging
path = os.getcwd()
@@ -47,20 +53,19 @@ if 'WEB2PY_PATH' not in os.environ:
os.environ['WEB2PY_PATH'] = path
try:
from gluon.contrib.simplejson import loads, dumps
except:
# try external module
from simplejson import loads, dumps
except ImportError:
try:
# try stdlib (Python >= 2.6)
from json import loads, dumps
except:
# fallback to pure-Python module
from gluon.contrib.simplejson import loads, dumps
IDENTIFIER = "%s#%s" % (socket.gethostname(), os.getpid())
logger = logging.getLogger('web2py.rscheduler.%s' % IDENTIFIER)
from gluon.utils import web2py_uuid
from gluon.storage import Storage
from gluon.scheduler import *
from gluon.scheduler import _decode_dict
from gluon.contrib.redis_utils import RWatchError
logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER)
POLLING = 'POLLING'
@@ -111,8 +116,7 @@ class RScheduler(Scheduler):
self._application = current.request.application or 'appname'
def _nkey(self, key):
"""Helper to restrict all keys to a namespace
and track them"""
"""Helper to restrict all keys to a namespace and track them."""
prefix = 'w2p:rsched:%s' % self._application
allkeys = '%s:allkeys' % prefix
newkey = "%s:%s" % (prefix, key)
@@ -120,10 +124,7 @@ class RScheduler(Scheduler):
return newkey
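
In isolation, the helper above is just prefix-plus-bookkeeping. A minimal standalone sketch, assuming a redis-py client `r` and an application named 'appname' (both illustrative; the tracking `sadd` is inferred from the "track them" docstring and the `allkeys` name):

    import redis

    r = redis.StrictRedis()  # assumed local redis on the default port

    def nkey(application, key):
        # every key lives under a per-application prefix...
        prefix = 'w2p:rsched:%s' % application
        newkey = '%s:%s' % (prefix, key)
        # ...and is recorded in a tracking set so housekeeping can find it
        r.sadd('%s:allkeys' % prefix, newkey)
        return newkey
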
def prune_all(self):
"""
Just to be fair and implement a method
that does housekeeping
"""
"""Global housekeeping."""
all_keys = self._nkey('allkeys')
with self.r_server.pipeline() as pipe:
while True:
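
The `while True:` above is the start of redis-py's optimistic-locking idiom: WATCH the key, read, MULTI, write, EXEC, and retry on conflict. A self-contained sketch of that pattern applied to the tracked-keys set (key name assumed per `_nkey`; the module's own code catches `RWatchError` from gluon.contrib.redis_utils instead):

    import redis
    from redis import WatchError

    r = redis.StrictRedis()
    all_keys = 'w2p:rsched:appname:allkeys'  # assumed key name

    with r.pipeline() as pipe:
        while True:
            try:
                pipe.watch(all_keys)            # abort if someone else touches it
                keys = pipe.smembers(all_keys)  # immediate mode after watch()
                pipe.multi()                    # start buffering the transaction
                if keys:
                    pipe.delete(*keys)
                pipe.delete(all_keys)
                pipe.execute()                  # raises WatchError on conflict
                break
            except WatchError:
                continue                        # state changed under us: retry
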
@@ -148,8 +149,9 @@ class RScheduler(Scheduler):
def send_heartbeat(self, counter):
"""
workers coordination has evolved into something is not that
easy. Here we try to do what we need in a single transaction,
Workers coordination in redis.
It has evolved into something that is not that easy.
Here we try to do what we need in a single transaction,
and retry that transaction if something goes wrong
"""
with self.r_server.pipeline() as pipe:
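
Outside web2py, the "one transaction per heartbeat" idea can be as small as a pipelined hash update with a TTL, so a crashed worker's record simply expires. The key name and fields below are illustrative, not the module's actual schema:

    import datetime
    import redis

    r = redis.StrictRedis()
    worker_key = 'w2p:rsched:appname:worker:host#1234'  # assumed layout

    def heartbeat(status='ACTIVE', ttl=6):
        with r.pipeline() as pipe:  # one MULTI/EXEC round trip
            pipe.hset(worker_key, 'status', status)
            pipe.hset(worker_key, 'last_heartbeat',
                      datetime.datetime.utcnow().isoformat())
            pipe.expire(worker_key, ttl)  # dead workers just time out
            pipe.execute()
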
@@ -167,7 +169,9 @@ class RScheduler(Scheduler):
def inner_send_heartbeat(self, counter, pipe):
"""
Does a few things:
Do a few things in the "maintenance" thread.
Specifically:
- registers the workers
- accepts commands sent to workers (KILL, TERMINATE, PICK, DISABLED, etc)
- adjusts sleep
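
A rough illustration of the command-accepting step listed above, assuming commands are dropped into the worker's own hash (the storage layout is a guess; the command names come from the scheduler's constants):

    import sys
    import redis

    r = redis.StrictRedis()
    worker_key = 'w2p:rsched:appname:worker:host#1234'  # assumed layout

    def process_command():
        cmd = r.hget(worker_key, 'command')  # None if nothing was sent
        if cmd in (b'KILL', b'TERMINATE'):
            sys.exit(0)          # stop this worker
        elif cmd == b'DISABLED':
            return 'sleep'       # back off instead of polling for tasks
        elif cmd == b'PICK':
            return 'assign'      # wake the assignment logic early
        return None
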
@@ -269,6 +273,8 @@ class RScheduler(Scheduler):
def being_a_ticker(self, pipe):
"""
Elects a ticker.
This is slightly more convoluted than the original
but is far more efficient
"""
@@ -311,7 +317,9 @@ class RScheduler(Scheduler):
def assign_tasks(self, db):
"""
The real beauty. We don't need to ASSIGN tasks, we just put
The real beauty.
We don't need to ASSIGN tasks, we just put
them into the relevant queue
"""
st, sd = db.scheduler_task, db.scheduler_task_deps
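
The queueing idea in isolation: instead of writing an assigned worker name onto each task row, the ticker LPUSHes task ids onto a per-group list, and any worker serving that group can take them (key layout assumed):

    import redis

    r = redis.StrictRedis()

    def queue_task(group_name, task_id):
        # FIFO when paired with RPOP on the consumer side
        r.lpush('w2p:rsched:appname:queue:%s' % group_name, task_id)
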
@@ -375,9 +383,6 @@ class RScheduler(Scheduler):
all_available = db(
(st.status.belongs((QUEUED, ASSIGNED))) &
((st.times_run < st.repeats) | (st.repeats == 0)) &
(st.start_time <= now) &
((st.stop_time == None) | (st.stop_time > now)) &
(st.next_run_time <= now) &
(st.enabled == True) &
(st.id.belongs(no_deps))
@@ -437,6 +442,7 @@ class RScheduler(Scheduler):
logger.info('TICKER: tasks are %s', x)
def pop_task(self, db):
"""Lift a task off a queue."""
r_server = self.r_server
st = self.db.scheduler_task
task = None
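
The consumer side of the queueing sketch above, again with assumed key names:

    def pop_task_id(r, group_names):
        # RPOP each group queue until a task id turns up
        for group in group_names:
            task_id = r.rpop('w2p:rsched:appname:queue:%s' % group)
            if task_id is not None:
                return int(task_id)
        return None
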
@@ -533,7 +539,9 @@ class RScheduler(Scheduler):
def report_task(self, task, task_report):
"""
Needs overwriting only because we need to pop from the
Override.
Needs it only because we need to pop from the
running tasks
"""
r_server = self.r_server
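
The "running tasks" bookkeeping this override pops from is, in essence, a hash keyed by task id (names assumed; the `hdel` in the second sketch mirrors the one visible later in this diff):

    import redis

    r = redis.StrictRedis()
    RUNNING = 'w2p:rsched:appname:running'  # assumed key name

    def mark_running(task_id, worker_name):
        r.hset(RUNNING, task_id, worker_name)  # set when execution starts

    def mark_reported(task_id):
        r.hdel(RUNNING, task_id)               # popped when the report lands
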
@@ -558,12 +566,12 @@ class RScheduler(Scheduler):
logger.debug(' deleting task report in db because of no result')
db(sr.id == task.run_id).delete()
# if there is a stop_time and the following run would exceed it
is_expired = (task.stop_time
and task.next_run_time > task.stop_time
and True or False)
status = (task.run_again and is_expired and EXPIRED
or task.run_again and not is_expired
and QUEUED or COMPLETED)
is_expired = (task.stop_time and
task.next_run_time > task.stop_time and
True or False)
status = (task.run_again and is_expired and EXPIRED or
task.run_again and not is_expired and
QUEUED or COMPLETED)
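
The reflowed chain above is the pre-ternary and/or idiom; spelled out, it computes:

    # equivalent form, assuming EXPIRED/QUEUED/COMPLETED are the
    # scheduler's (truthy) status strings
    is_expired = bool(task.stop_time and task.next_run_time > task.stop_time)
    if task.run_again:
        status = EXPIRED if is_expired else QUEUED
    else:
        status = COMPLETED
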
if task_report.status == COMPLETED:
# assigned calculations
d = dict(status=status,
@@ -579,10 +587,10 @@ class RScheduler(Scheduler):
st_mapping = {'FAILED': 'FAILED',
'TIMEOUT': 'TIMEOUT',
'STOPPED': 'FAILED'}[task_report.status]
status = (task.retry_failed
and task.times_failed < task.retry_failed
and QUEUED or task.retry_failed == -1
and QUEUED or st_mapping)
status = (task.retry_failed and
task.times_failed < task.retry_failed and
QUEUED or task.retry_failed == -1 and
QUEUED or st_mapping)
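
Likewise for the retry chain: QUEUED while retries remain, QUEUED forever when retry_failed is -1, otherwise the mapped failure status:

    # equivalent spelled-out form
    if task.retry_failed and task.times_failed < task.retry_failed:
        status = QUEUED        # retries still available
    elif task.retry_failed == -1:
        status = QUEUED        # -1 means retry forever
    else:
        status = st_mapping    # FAILED or TIMEOUT
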
db(st.id == task.task_id).update(
times_failed=st.times_failed + 1,
next_run_time=task.next_run_time,
@@ -596,7 +604,7 @@ class RScheduler(Scheduler):
r_server.hdel(running_dict, task.task_id)
def wrapped_pop_task(self):
"""Commodity function to call `pop_task` and trap exceptions
"""Commodity function to call `pop_task` and trap exceptions.
If an exception is raised, assumes it happened because of database
contention and retries `pop_task` after 0.5 seconds
"""
@@ -620,8 +628,8 @@ class RScheduler(Scheduler):
time.sleep(0.5)
def get_workers(self, only_ticker=False):
""" Returns a dict holding worker_name : {**columns}
representing all "registered" workers
"""Return a dict holding worker_name : {**columns}
representing all "registered" workers.
only_ticker returns only the worker running as a TICKER,
if there is any
"""