From 12acdb51d74131a56126c8b4ca65bcfd3a81632f Mon Sep 17 00:00:00 2001 From: niphlod Date: Sat, 2 Jan 2016 23:15:00 +0100 Subject: [PATCH] new redis toolset to use with web2py This is a refactor of everything web2py uses with redis. Specifically: - a refactored redis_cache.py that fixes #958, allowing a "fail gracefully" behaviour in case redis is not available - a refactored redis_session.py that blocks less with with_lock=True (although still not optimal) - a new (and NEEDED) redis_utils.py that serves as the base for everything else, providing an RConn object that you can freely use as a redis.StrictRedis connection, and that you can override in case you're using a different library (or that web2py will use in case redis-py is no longer the de-facto standard) - a new - and much anticipated - redis_scheduler.py. It's a drop-in replacement for the standard scheduler that uses redis for worker coordination. Feel free to dig into the code and improve it. For redis_cache and redis_session the changes are BREAKING: users will need to change the import locations and tune their code a bit. Every module now depends on a gluon.contrib.redis_utils.RConn object (or similar), which in turn is very similar to a redis.StrictRedis one. 
The redis instance is EXTERNAL to the modules themselves (no more "host, port, db, password" parameters in RedisCache or RedisSession) See the relevant docstrings for usage examples --- CHANGELOG | 31 +- gluon/contrib/redis_cache.py | 106 ++--- gluon/contrib/redis_scheduler.py | 785 +++++++++++++++++++++++++++++++ gluon/contrib/redis_session.py | 116 ++--- gluon/contrib/redis_utils.py | 70 +++ 5 files changed, 980 insertions(+), 128 deletions(-) create mode 100644 gluon/contrib/redis_scheduler.py create mode 100644 gluon/contrib/redis_utils.py diff --git a/CHANGELOG b/CHANGELOG index 01e106f0..f684e667 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,9 +1,38 @@ +## trunk +- new JWT implementation (experimental) +- new gluon.contrib.redis_scheduler +- BREAKING: changes to gluon.contrib.redis_cache + BEFORE: + from gluon.contrib.redis_cache import RedisCache + cache.redis = RedisCache('localhost:6379',db=None, debug=True) + NOW: + from gluon.contrib.redis_utils import RConn + from gluon.contrib.redis_cache import RedisCache + rconn = RConn() + # or RConn(host='localhost', port=6379, + # db=0, password=None, socket_timeout=None, + # socket_connect_timeout=None, .....) 
+ # exactly as a redis.StrictRedis instance + cache.redis = RedisCache(redis_conn=rconn, debug=True) +- BREAKING: changes to gluon.contrib.redis_session + BEFORE: + from gluon.contrib.redis_session import RedisSession + sessiondb = RedisSession('localhost:6379',db=0, session_expiry=False) + session.connect(request, response, db = sessiondb) + NOW: + from gluon.contrib.redis_utils import RConn + from gluon.contrib.redis_session import RedisSession + rconn = RConn() + sessiondb = RedisSession(redis_conn=rconn, session_expiry=False) + session.connect(request, response, db = sessiondb) + + ## 2.13.1-2 - fixed a security issue in request_reset_password - added fabfile.py - fixed oauth2 renew token, thanks dokime7 -- fixed add_membership, del_membership, add_membership IntegrityError (when auth.enable_record_versioning) +- fixed add_membership, del_membership, add_membership IntegrityError (when auth.enable_record_versioning) - allow passing unicode to template render - allow IS_NOT_IN_DB to work with custom primarykey, thanks timmyborg - allow HttpOnly cookies diff --git a/gluon/contrib/redis_cache.py b/gluon/contrib/redis_cache.py index 0f35395a..47f30959 100644 --- a/gluon/contrib/redis_cache.py +++ b/gluon/contrib/redis_cache.py @@ -2,20 +2,20 @@ Developed by niphlod@gmail.com Released under web2py license because includes gluon/cache.py source code """ -import redis -from redis.exceptions import ConnectionError -from gluon import current -from gluon.cache import CacheAbstract + try: - import cPickle as pickle + import cPickle as pickle except: - import pickle + import pickle import time import re import logging import thread import random - +from gluon import current +from gluon.cache import CacheAbstract +from gluon.contrib.redis_utils import acquire_lock, release_lock +from gluon.contrib.redis_utils import register_release_lock, RConnectionError logger = logging.getLogger("web2py.cache.redis") @@ -24,17 +24,22 @@ locker = thread.allocate_lock() def 
RedisCache(*args, **vars): """ - Usage example: put in models + Usage example: put in models:: - from gluon.contrib.redis_cache import RedisCache - cache.redis = RedisCache('localhost:6379',db=None, debug=True, with_lock=True, password=None) + from gluon.contrib.redis_utils import RConn + rconn = RConn() + from gluon.contrib.redis_cache import RedisCache + cache.redis = RedisCache(redis_conn=rconn, debug=True, with_lock=True) - :param db: redis db to use (0..16) - :param debug: if True adds to stats() the total_hits and misses - :param with_lock: sets the default locking mode for creating new keys. + Args: + redis_conn: a redis-like connection object + debug: if True adds to stats() the total_hits and misses + with_lock: sets the default locking mode for creating new keys. By default is False (usualy when you choose Redis you do it for performances reason) When True, only one thread/process can set a value concurrently + fail_gracefully: if redis is unavailable, returns the value computing it + instead of raising an exception When you use cache.redis directly you can use value = cache.redis('mykey', lambda: time.time(), with_lock=True) @@ -81,22 +86,19 @@ class RedisClient(object): MAX_RETRIES = 5 RETRIES = 0 - def __init__(self, server='localhost:6379', db=None, debug=False, with_lock=False, password=None): - self.server = server - self.password = password - self.db = db or 0 - host, port = (self.server.split(':') + ['6379'])[:2] - port = int(port) + def __init__(self, redis_conn=None, debug=False, + with_lock=False, fail_gracefully=False): self.request = current.request self.debug = debug self.with_lock = with_lock - self.prefix = "w2p:%s:" % (self.request.application) + self.fail_gracefully = fail_gracefully + self.prefix = "w2p:cache:%s:" % (self.request.application) if self.request: app = self.request.application else: app = '' - if not app in self.meta_storage: + if app not in self.meta_storage: self.storage = self.meta_storage[app] = { 
CacheAbstract.cache_stats_name: { 'hit_total': 0, @@ -107,7 +109,8 @@ class RedisClient(object): self.cache_set_key = 'w2p:%s:___cache_set' % (self.request.application) - self.r_server = redis.Redis(host=host, port=port, db=self.db, password=self.password) + self.r_server = redis_conn + self._release_script = register_release_lock(self.r_server) def initialize(self): pass @@ -140,26 +143,20 @@ class RedisClient(object): #naive distributed locking if with_lock: lock_key = '%s:__lock' % newKey - try: - while True: - lock = self.r_server.setnx(lock_key, 1) - if lock: - value = self.cache_it(newKey, f, time_expire) - break - else: - time.sleep(0.2) - #did someone else create it in the meanwhile ? - obj = self.r_server.get(newKey) - if obj: - value = pickle.loads(obj) - break - finally: - self.r_server.delete(lock_key) + randomvalue = time.time() + al = acquire_lock(self.r_server, lock_key, randomvalue) + #someone may have computed it + obj = self.r_server.get(newKey) + if obj is None: + value = self.cache_it(newKey, f, time_expire) + else: + value = pickle.loads(obj) + release_lock(self, lock_key, al) else: #without distributed locking value = self.cache_it(newKey, f, time_expire) return value - except ConnectionError: + except RConnectionError: return self.retry_call(key, f, time_expire, with_lock) def cache_it(self, key, f, time_expire): @@ -172,39 +169,42 @@ class RedisClient(object): value_ = pickle.dumps(value, pickle.HIGHEST_PROTOCOL) if time_expire == 0: time_expire = 1 - self.r_server.setex(key, value_, time_expire) + self.r_server.setex(key, time_expire, value_) #print '%s will expire on %s: it goes in bucket %s' % (key, time.ctime(expireat)) #print 'that will expire on %s' % (bucket_key, time.ctime(((expireat/60) + 1)*60)) p = self.r_server.pipeline() #add bucket to the fixed set p.sadd(cache_set_key, bucket_key) #sets the key - p.setex(key, value_, time_expire) + p.setex(key, time_expire, value_) #add the key to the bucket p.sadd(bucket_key, key) #expire the 
bucket properly - p.expireat(bucket_key, ((expireat/60) + 1)*60) + p.expireat(bucket_key, ((expireat / 60) + 1) * 60) p.execute() return value - def retry_call(self, key, f, time_expire, with_locking): + def retry_call(self, key, f, time_expire, with_lock): self.RETRIES += 1 if self.RETRIES <= self.MAX_RETRIES: logger.error("sleeping %s seconds before reconnecting" % (2 * self.RETRIES)) time.sleep(2 * self.RETRIES) - self.__init__(self.server, self.db, self.debug, self.with_lock) - return self.__call__(key, f, time_expire, with_locking) + if self.fail_gracefully: + self.RETRIES = 0 + return f() + return self.__call__(key, f, time_expire, with_lock) else: self.RETRIES = 0 - raise ConnectionError('Redis instance is unavailable at %s' % ( - self.server)) + if self.fail_gracefully: + return f + raise RConnectionError('Redis instance is unavailable') def increment(self, key, value=1): try: newKey = self.__keyFormat__(key) return self.r_server.incr(newKey, value) - except ConnectionError: + except RConnectionError: return self.retry_increment(key, value) def retry_increment(self, key, value): @@ -212,12 +212,10 @@ class RedisClient(object): if self.RETRIES <= self.MAX_RETRIES: logger.error("sleeping some seconds before reconnecting") time.sleep(2 * self.RETRIES) - self.__init__(self.server, self.db, self.debug, self.with_lock) return self.increment(key, value) else: self.RETRIES = 0 - raise ConnectionError('Redis instance is unavailable at %s' % ( - self.server)) + raise RConnectionError('Redis instance is unavailable') def clear(self, regex): """ @@ -225,9 +223,9 @@ class RedisClient(object): clear cache entries """ r = re.compile(regex) - #get all buckets + # get all buckets buckets = self.r_server.smembers(self.cache_set_key) - #get all keys in buckets + # get all keys in buckets if buckets: keys = self.r_server.sunion(buckets) else: @@ -237,8 +235,8 @@ class RedisClient(object): for a in keys: if r.match(str(a).replace(prefix, '', 1)): pipe.delete(a) - if 
random.randrange(0,100) < 10: - #do this just once in a while (10% chance) + if random.randrange(0, 100) < 10: + # do this just once in a while (10% chance) self.clear_buckets(buckets) pipe.execute() diff --git a/gluon/contrib/redis_scheduler.py b/gluon/contrib/redis_scheduler.py new file mode 100644 index 00000000..109ff071 --- /dev/null +++ b/gluon/contrib/redis_scheduler.py @@ -0,0 +1,785 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +| This file is part of the web2py Web Framework +| Created by niphlod@gmail.com +| License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html) + +Scheduler with redis backend +--------------------------------- +""" + +USAGE = """ +## Example + +For any existing app + +Create File: app/models/scheduler.py ====== +from gluon.contrib.redis_utils import RConn +from gluon.contrib.redis_scheduler import RScheduler + +def demo1(*args,**vars): + print 'you passed args=%s and vars=%s' % (args, vars) + return 'done!' + +def demo2(): + 1/0 + +rconn = RConn() +mysched = RScheduler(db, dict(demo1=demo1,demo2=demo2), ...., redis_conn=rconn) + +## run worker nodes with: + + cd web2py + python web2py.py -K app + +""" + +import os +import time +import socket +import datetime +import logging + +path = os.getcwd() + +if 'WEB2PY_PATH' not in os.environ: + os.environ['WEB2PY_PATH'] = path + +try: + from gluon.contrib.simplejson import loads, dumps +except: + from simplejson import loads, dumps + +IDENTIFIER = "%s#%s" % (socket.gethostname(), os.getpid()) + +logger = logging.getLogger('web2py.rscheduler.%s' % IDENTIFIER) + +from gluon.utils import web2py_uuid +from gluon.storage import Storage +from gluon.scheduler import * +from gluon.scheduler import _decode_dict +from gluon.contrib.redis_utils import RWatchError + + +POLLING = 'POLLING' + + +class RScheduler(Scheduler): + + def __init__(self, db, tasks=None, migrate=True, + worker_name=None, group_names=None, heartbeat=HEARTBEAT, + max_empty_runs=0, discard_results=False, utc_time=False, + 
redis_conn=None, mode=1): + + """ + Highly-experimental coordination with redis + Takes all args from Scheduler except redis_conn which + must be something closer to a StrictRedis instance. + + My only regret - and the reason why I kept this under the hood for a + while - is that it's hard to hook up in web2py to something happening + right after the commit to a table, which will enable this version of the + scheduler to process "immediate" tasks right away instead of waiting a + few seconds (see FIXME in queue_task()) + + mode is reserved for future usage patterns. + Right now it moves the coordination (which is the most intensive + routine in the scheduler in matters of IPC) of workers to redis. + I'd like to have incrementally redis-backed modes of operations, + such as e.g.: + - 1: IPC through redis (which is the current implementation) + - 2: Store task results in redis (which will relieve further pressure + from the db leaving the scheduler_run table empty and possibly + keep things smooth as tasks results can be set to expire + after a bit of time) + - 3: Move all the logic for storing and queueing tasks to redis + itself - which means no scheduler_task usage too - and use + the database only as an historical record-bookkeeping + (e.g. for reporting) + + As usual, I'm eager to see your comments. 
+ """ + + Scheduler.__init__(self, db, tasks=tasks, migrate=migrate, + worker_name=worker_name, group_names=group_names, + heartbeat=heartbeat, max_empty_runs=max_empty_runs, + discard_results=discard_results, utc_time=utc_time) + + self.r_server = redis_conn + from gluon import current + self._application = current.request.application or 'appname' + + def _nkey(self, key): + """Helper to restrict all keys to a namespace + and track them""" + prefix = 'w2p:rsched:%s' % self._application + allkeys = '%s:allkeys' % prefix + newkey = "%s:%s" % (prefix, key) + self.r_server.sadd(allkeys, newkey) + return newkey + + def prune_all(self): + """ + Just to be fair and implement a method + that does housekeeping + """ + all_keys = self._nkey('allkeys') + with self.r_server.pipeline() as pipe: + while True: + try: + pipe.watch('PRUNE_ALL') + while True: + k = pipe.spop(all_keys) + if k is None: + break + pipe.delete(k) + pipe.execute() + break + except RWatchError: + time.sleep(0.1) + continue + + def dt2str(self, value): + return value.strftime('%Y-%m-%d %H:%M:%S') + + def str2date(self, value): + return datetime.datetime.strptime(value, '%Y-%m-%d %H:%M:%S') + + def send_heartbeat(self, counter): + """ + workers coordination has evolved into something is not that + easy. 
Here we try to do what we need in a single transaction, + and retry that transaction if something goes wrong + """ + with self.r_server.pipeline() as pipe: + while True: + try: + pipe.watch('SEND_HEARTBEAT') + self.inner_send_heartbeat(counter, pipe) + pipe.execute() + self.adj_hibernation() + self.sleep() + break + except RWatchError: + time.sleep(0.1) + continue + + def inner_send_heartbeat(self, counter, pipe): + """ + Does a few things: + - registers the workers + - accepts commands sent to workers (KILL, TERMINATE, PICK, DISABLED, etc) + - adjusts sleep + - saves stats + - elects master + - does "housecleaning" for dead workers + - triggers tasks assignment + """ + r_server = pipe + status_keyset = self._nkey('worker_statuses') + status_key = self._nkey('worker_status:%s' % (self.worker_name)) + now = self.now() + mybackedstatus = r_server.hgetall(status_key) + if not mybackedstatus: + r_server.hmset( + status_key, + dict( + status=ACTIVE, worker_name=self.worker_name, + first_heartbeat=self.dt2str(now), + last_heartbeat=self.dt2str(now), + group_names=dumps(self.group_names), is_ticker=False, + worker_stats=dumps(self.w_stats)) + ) + r_server.sadd(status_keyset, status_key) + if not self.w_stats.status == POLLING: + self.w_stats.status = ACTIVE + self.w_stats.sleep = self.heartbeat + mybackedstatus = ACTIVE + else: + mybackedstatus = mybackedstatus['status'] + if mybackedstatus == DISABLED: + # keep sleeping + self.w_stats.status = DISABLED + r_server.hmset( + status_key, + dict(last_heartbeat=self.dt2str(now), + worker_stats=dumps(self.w_stats)) + ) + elif mybackedstatus == TERMINATE: + self.w_stats.status = TERMINATE + logger.debug("Waiting to terminate the current task") + self.give_up() + elif mybackedstatus == KILL: + self.w_stats.status = KILL + self.die() + else: + if mybackedstatus == STOP_TASK: + logger.info('Asked to kill the current task') + self.terminate_process() + logger.info('........recording heartbeat (%s)', + self.w_stats.status) + 
r_server.hmset( + status_key, + dict( + last_heartbeat=self.dt2str(now), status=ACTIVE, + worker_stats=dumps(self.w_stats) + ) + ) + # newroutine + r_server.expire(status_key, self.heartbeat * 3 * 15) + self.w_stats.sleep = self.heartbeat # re-activating the process + if self.w_stats.status not in (RUNNING, POLLING): + self.w_stats.status = ACTIVE + + self.do_assign_tasks = False + if counter % 5 == 0 or mybackedstatus == PICK: + try: + logger.info( + ' freeing workers that have not sent heartbeat') + registered_workers = r_server.smembers(status_keyset) + allkeys = self._nkey('allkeys') + for worker in registered_workers: + w = r_server.hgetall(worker) + w = Storage(w) + if not w: + r_server.srem(status_keyset, worker) + logger.info('removing %s from %s', worker, allkeys) + r_server.srem(allkeys, worker) + continue + try: + self.is_a_ticker = self.being_a_ticker(pipe) + except: + pass + if self.w_stats.status in (ACTIVE, POLLING): + self.do_assign_tasks = True + if self.is_a_ticker and self.do_assign_tasks: + # I'm a ticker, and 5 loops passed without reassigning tasks, + # let's do that and loop again + if not self.db_thread: + logger.debug('thread building own DAL object') + self.db_thread = DAL( + self.db._uri, folder=self.db._adapter.folder) + self.define_tables(self.db_thread, migrate=False) + db = self.db_thread + self.wrapped_assign_tasks(db) + return None + except: + logger.error('Error assigning tasks') + + def being_a_ticker(self, pipe): + """ + This is slightly more convoluted than the original + but if far more efficient + """ + r_server = pipe + status_keyset = self._nkey('worker_statuses') + registered_workers = r_server.smembers(status_keyset) + ticker = None + all_active = [] + all_workers = [] + for worker in registered_workers: + w = r_server.hgetall(worker) + if w['worker_name'] != self.worker_name and w['status'] == ACTIVE: + all_active.append(w) + if w['is_ticker'] == 'True' and ticker is None: + ticker = w + all_workers.append(w) + not_busy = 
self.w_stats.status in (ACTIVE, POLLING) + if not ticker: + if not_busy: + # only if this worker isn't busy, otherwise wait for a free one + for worker in all_workers: + key = self._nkey('worker_status:%s' % worker['worker_name']) + if worker['worker_name'] == self.worker_name: + r_server.hset(key, 'is_ticker', True) + else: + r_server.hset(key, 'is_ticker', False) + logger.info("TICKER: I'm a ticker") + else: + # giving up, only if I'm not alone + if len(all_active) > 1: + key = self._nkey('worker_status:%s' % (self.worker_name)) + r_server.hset(key, 'is_ticker', False) + else: + not_busy = True + return not_busy + else: + logger.info( + "%s is a ticker, I'm a poor worker" % ticker['worker_name']) + return False + + def assign_tasks(self, db): + """ + The real beauty. We don't need to ASSIGN tasks, we just put + them into the relevant queue + """ + st, sd = db.scheduler_task, db.scheduler_task_deps + r_server = self.r_server + now = self.now() + status_keyset = self._nkey('worker_statuses') + with r_server.pipeline() as pipe: + while 1: + try: + # making sure we're the only one doing the job + pipe.watch('ASSIGN_TASKS') + registered_workers = pipe.smembers(status_keyset) + all_workers = [] + for worker in registered_workers: + w = pipe.hgetall(worker) + if w['status'] == ACTIVE: + all_workers.append(Storage(w)) + pipe.execute() + break + except RWatchError: + time.sleep(0.1) + continue + + # build workers as dict of groups + wkgroups = {} + for w in all_workers: + group_names = loads(w.group_names) + for gname in group_names: + if gname not in wkgroups: + wkgroups[gname] = dict( + workers=[{'name': w.worker_name, 'c': 0}]) + else: + wkgroups[gname]['workers'].append( + {'name': w.worker_name, 'c': 0}) + # set queued tasks that expired between "runs" (i.e., you turned off + # the scheduler): then it wasn't expired, but now it is + db( + (st.status.belongs((QUEUED, ASSIGNED))) & + (st.stop_time < now) + ).update(status=EXPIRED) + + # calculate dependencies + 
deps_with_no_deps = db( + (sd.can_visit == False) & + (~sd.task_child.belongs( + db(sd.can_visit == False)._select(sd.task_parent) + ) + ) + )._select(sd.task_child) + no_deps = db( + (st.status.belongs((QUEUED, ASSIGNED))) & + ( + (sd.id == None) | (st.id.belongs(deps_with_no_deps)) + + ) + )._select(st.id, distinct=True, left=sd.on( + (st.id == sd.task_parent) & + (sd.can_visit == False) + ) + ) + + all_available = db( + (st.status.belongs((QUEUED, ASSIGNED))) & + ((st.times_run < st.repeats) | (st.repeats == 0)) & + (st.start_time <= now) & + ((st.stop_time == None) | (st.stop_time > now)) & + (st.next_run_time <= now) & + (st.enabled == True) & + (st.id.belongs(no_deps)) + ) + + limit = len(all_workers) * (50 / (len(wkgroups) or 1)) + + # let's freeze it up + db.commit() + x = 0 + r_server = self.r_server + for group in wkgroups.keys(): + queued_list = self._nkey('queued:%s' % group) + queued_set = self._nkey('queued_set:%s' % group) + # if are running, let's don't assign them again + running_list = self._nkey('running:%s' % group) + while True: + # the joys for rpoplpush! + t = r_server.rpoplpush(running_list, queued_list) + if not t: + # no more + break + r_server.sadd(queued_set, t) + + tasks = all_available(st.group_name == group).select( + limitby=(0, limit), orderby = st.next_run_time) + + # put tasks in the processing list + + for task in tasks: + x += 1 + gname = task.group_name + + if r_server.sismember(queued_set, task.id): + # already queued, we don't put on the list + continue + r_server.sadd(queued_set, task.id) + r_server.lpush(queued_list, task.id) + d = dict(status=QUEUED) + if not task.task_name: + d['task_name'] = task.function_name + db( + (st.id == task.id) & + (st.status.belongs((QUEUED, ASSIGNED))) + ).update(**d) + db.commit() + # I didn't report tasks but I'm working nonetheless!!!! 
+ if x > 0: + self.w_stats.empty_runs = 0 + self.w_stats.queue = x + self.w_stats.distribution = wkgroups + self.w_stats.workers = len(all_workers) + # I'll be greedy only if tasks queued are equal to the limit + # (meaning there could be others ready to be queued) + self.greedy = x >= limit + logger.info('TICKER: workers are %s', len(all_workers)) + logger.info('TICKER: tasks are %s', x) + + def pop_task(self, db): + r_server = self.r_server + st = self.db.scheduler_task + task = None + # ready to process something + for group in self.group_names: + queued_set = self._nkey('queued_set:%s' % group) + queued_list = self._nkey('queued:%s' % group) + running_list = self._nkey('running:%s' % group) + running_dict = self._nkey('running_dict:%s' % group) + self.w_stats.status = POLLING + # polling for 1 minute in total. If more groups are in, + # polling is 1 minute in total + logger.debug(' polling on %s' , group) + task_id = r_server.brpoplpush(queued_list, running_list, timeout=60/len(self.group_names)) + logger.debug(' finished polling') + self.w_stats.status = ACTIVE + if task_id: + r_server.hset(running_dict, task_id, self.worker_name) + r_server.srem(queued_set, task_id) + task = db( + (st.id == task_id) & + (st.status == QUEUED) + ).select().first() + if not task: + r_server.lrem(running_list, 0, task_id) + r_server.hdel(running_dict, task_id) + r_server.lrem(queued_list, 0, task_id) + logger.error("we received a task that isn't there (%s)" % task_id) + return None + break + now = self.now() + if task: + task.update_record(status=RUNNING, last_run_time=now) + # noone will touch my task! 
+ db.commit() + logger.debug(' work to do %s', task.id) + else: + logger.info('nothing to do (%s)' % self.w_stats.status) + return None + times_run = task.times_run + 1 + if not task.prevent_drift: + next_run_time = task.last_run_time + datetime.timedelta( + seconds=task.period + ) + else: + next_run_time = task.start_time + datetime.timedelta( + seconds=task.period * times_run + ) + if times_run < task.repeats or task.repeats == 0: + # need to run (repeating task) + run_again = True + else: + # no need to run again + run_again = False + run_id = 0 + while True and not self.discard_results: + logger.debug(' new scheduler_run record') + try: + run_id = db.scheduler_run.insert( + task_id=task.id, + status=RUNNING, + start_time=now, + worker_name=self.worker_name) + db.commit() + break + except: + time.sleep(0.5) + db.rollback() + logger.info('new task %(id)s "%(task_name)s"' + ' %(application_name)s.%(function_name)s' % task) + return Task( + app=task.application_name, + function=task.function_name, + timeout=task.timeout, + args=task.args, # in json + vars=task.vars, # in json + task_id=task.id, + run_id=run_id, + run_again=run_again, + next_run_time=next_run_time, + times_run=times_run, + stop_time=task.stop_time, + retry_failed=task.retry_failed, + times_failed=task.times_failed, + sync_output=task.sync_output, + uuid=task.uuid, + group_name=task.group_name) + + def report_task(self, task, task_report): + """ + Needs overwriting only because we need to pop from the + running tasks + """ + r_server = self.r_server + db = self.db + now = self.now() + st = db.scheduler_task + sr = db.scheduler_run + if not self.discard_results: + if task_report.result != 'null' or task_report.tb: + # result is 'null' as a string if task completed + # if it's stopped it's None as NoneType, so we record + # the STOPPED "run" anyway + logger.debug(' recording task report in db (%s)', + task_report.status) + db(sr.id == task.run_id).update( + status=task_report.status, + stop_time=now, + 
run_result=task_report.result, + run_output=task_report.output, + traceback=task_report.tb) + else: + logger.debug(' deleting task report in db because of no result') + db(sr.id == task.run_id).delete() + # if there is a stop_time and the following run would exceed it + is_expired = (task.stop_time + and task.next_run_time > task.stop_time + and True or False) + status = (task.run_again and is_expired and EXPIRED + or task.run_again and not is_expired + and QUEUED or COMPLETED) + if task_report.status == COMPLETED: + # assigned calculations + d = dict(status=status, + next_run_time=task.next_run_time, + times_run=task.times_run, + times_failed=0, + assigned_worker_name=self.worker_name + ) + db(st.id == task.task_id).update(**d) + if status == COMPLETED: + self.update_dependencies(db, task.task_id) + else: + st_mapping = {'FAILED': 'FAILED', + 'TIMEOUT': 'TIMEOUT', + 'STOPPED': 'FAILED'}[task_report.status] + status = (task.retry_failed + and task.times_failed < task.retry_failed + and QUEUED or task.retry_failed == -1 + and QUEUED or st_mapping) + db(st.id == task.task_id).update( + times_failed=db.scheduler_task.times_failed + 1, + next_run_time=task.next_run_time, + status=status, + assigned_worker_name=self.worker_name + ) + logger.info('task completed (%s)', task_report.status) + running_list = self._nkey('running:%s' % task.group_name) + running_dict = self._nkey('running_dict:%s' % task.group_name) + r_server.lrem(running_list, 0, task.task_id) + r_server.hdel(running_dict, task.task_id) + + def wrapped_pop_task(self): + """Commodity function to call `pop_task` and trap exceptions + If an exception is raised, assume it happened because of database + contention and retries `pop_task` after 0.5 seconds + """ + db = self.db + db.commit() # another nifty db.commit() only for Mysql + x = 0 + while x < 10: + try: + rtn = self.pop_task(db) + return rtn + break + # this is here to "interrupt" any blrpoplpush op easily + except KeyboardInterrupt: + self.give_up() + 
break + except: + self.w_stats.errors += 1 + db.rollback() + logger.error(' error popping tasks') + x += 1 + time.sleep(0.5) + + def get_workers(self, only_ticker=False): + """ Returns a dict holding worker_name : {**columns} + representing all "registered" workers + only_ticker returns only the worker running as a TICKER, + if there is any + """ + r_server = self.r_server + status_keyset = self._nkey('worker_statuses') + registered_workers = r_server.smembers(status_keyset) + all_workers = {} + for worker in registered_workers: + w = r_server.hgetall(worker) + w = Storage(w) + if not w: + continue + all_workers[w.worker_name] = Storage( + status=w.status, + first_heartbeat=self.str2date(w.first_heartbeat), + last_heartbeat=self.str2date(w.last_heartbeat), + group_names=loads(w.group_names, object_hook=_decode_dict), + is_ticker=w.is_ticker == 'True' and True or False, + worker_stats=loads(w.worker_stats, object_hook=_decode_dict) + ) + if only_ticker: + for k, v in all_workers.iteritems(): + if v['is_ticker']: + return {k: v} + return {} + return all_workers + + def set_worker_status(self, group_names=None, action=ACTIVE, + exclude=None, limit=None, worker_name=None): + """Internal function to set worker's status""" + r_server = self.r_server + all_workers = self.get_workers() + if not group_names: + group_names = self.group_names + elif isinstance(group_names, str): + group_names = [group_names] + exclusion = exclude and exclude.append(action) or [action] + workers = [] + if worker_name is not None: + if worker_name in all_workers.keys(): + workers = [worker_name] + else: + for k, v in all_workers.iteritems(): + if v.status not in exclusion and set(group_names) & set(v.group_names): + workers.append(k) + if limit and worker_name is None: + workers = workers[:limit] + if workers: + with r_server.pipeline() as pipe: + while True: + try: + pipe.watch('SET_WORKER_STATUS') + for w in workers: + worker_key = self._nkey('worker_status:%s' % w) + pipe.hset(worker_key, 
'status', action) + pipe.execute() + break + except RWatchError: + time.sleep(0.1) + continue + + def queue_task(self, function, pargs=[], pvars={}, **kwargs): + """ + FIXME: immediate should put item in queue. The hard part is + that currently there are no hooks happening at post-commit time + Queue tasks. This takes care of handling the validation of all + parameters + + Args: + function: the function (anything callable with a __name__) + pargs: "raw" args to be passed to the function. Automatically + jsonified. + pvars: "raw" kwargs to be passed to the function. Automatically + jsonified + kwargs: all the parameters available (basically, every + `scheduler_task` column). If args and vars are here, they should + be jsonified already, and they will override pargs and pvars + + Returns: + a dict just as a normal validate_and_insert(), plus a uuid key + holding the uuid of the queued task. If validation is not passed + ( i.e. some parameters are invalid) both id and uuid will be None, + and you'll get an "error" dict holding the errors found. 
+ """ + if hasattr(function, '__name__'): + function = function.__name__ + targs = 'args' in kwargs and kwargs.pop('args') or dumps(pargs) + tvars = 'vars' in kwargs and kwargs.pop('vars') or dumps(pvars) + tuuid = 'uuid' in kwargs and kwargs.pop('uuid') or web2py_uuid() + tname = 'task_name' in kwargs and kwargs.pop('task_name') or function + immediate = 'immediate' in kwargs and kwargs.pop('immediate') or None + rtn = self.db.scheduler_task.validate_and_insert( + function_name=function, + task_name=tname, + args=targs, + vars=tvars, + uuid=tuuid, + **kwargs) + if not rtn.errors: + rtn.uuid = tuuid + if immediate: + r_server = self.r_server + ticker = self.get_workers(only_ticker=True) + if ticker.keys(): + ticker = ticker.keys()[0] + with r_server.pipeline() as pipe: + while True: + try: + pipe.watch('SET_WORKER_STATUS') + worker_key = self._nkey('worker_status:%s' % ticker) + pipe.hset(worker_key, 'status', 'PICK') + pipe.execute() + break + except RWatchError: + time.sleep(0.1) + continue + else: + rtn.uuid = None + return rtn + + def stop_task(self, ref): + """Shortcut for task termination. + + If the task is RUNNING it will terminate it, meaning that status + will be set as FAILED. 
+ + If the task is QUEUED, its stop_time will be set to "now", + the enabled flag will be set to False, and the status to STOPPED + + Args: + ref: can be + + - an integer : lookup will be done by scheduler_task.id + - a string : lookup will be done by scheduler_task.uuid + + Returns: + - 1 if task was stopped (meaning an update has been done) + - None if task was not found, or if task was not RUNNING or QUEUED + + Note: + Experimental + """ + r_server = self.r_server + st = self.db.scheduler_task + if isinstance(ref, int): + q = st.id == ref + elif isinstance(ref, str): + q = st.uuid == ref + else: + raise SyntaxError( + "You can retrieve results only by id or uuid") + task = self.db(q).select(st.id, st.status, st.group_name) + task = task.first() + rtn = None + if not task: + return rtn + running_dict = self._nkey('running_dict:%s' % task.group_name) + if task.status == 'RUNNING': + worker_key = r_server.hget(running_dict, task.id) + worker_key = self._nkey('worker_status:%s' % (worker_key)) + r_server.hset(worker_key, 'status', STOP_TASK) + elif task.status == 'QUEUED': + rtn = self.db(q).update( + stop_time=self.now(), + enabled=False, + status=STOPPED) + return rtn diff --git a/gluon/contrib/redis_session.py b/gluon/contrib/redis_session.py index f9e2fa29..c987e64e 100644 --- a/gluon/contrib/redis_session.py +++ b/gluon/contrib/redis_session.py @@ -1,13 +1,18 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- """ Developed by niphlod@gmail.com +License MIT/BSD/GPL + +Redis-backed sessions """ -import redis -from gluon import current -from gluon.storage import Storage -import time import logging import thread +from gluon import current +from gluon.storage import Storage +from gluon.contrib.redis_utils import acquire_lock, release_lock +from gluon.contrib.redis_utils import register_release_lock logger = logging.getLogger("web2py.session.redis") @@ -16,10 +21,20 @@ locker = thread.allocate_lock() def RedisSession(*args, **vars): """ - Usage example: put in 
models - from gluon.contrib.redis_session import RedisSession - sessiondb = RedisSession('localhost:6379',db=0, session_expiry=False, password=None) - session.connect(request, response, db = sessiondb) + Usage example: put in models:: + + from gluon.contrib.redis_utils import RConn + rconn = RConn() + from gluon.contrib.redis_session import RedisSession + sessiondb = RedisSession(redis_conn=rconn, with_lock=True, session_expiry=False) + session.connect(request, response, db = sessiondb) + + Args: + redis_conn: a redis-like connection object + with_lock: prevent concurrent modifications to the same session + session_expiry: delete automatically sessions after n seconds + (still need to run sessions2trash.py every 1M sessions + or so) Simple slip-in storage for session """ @@ -36,30 +51,9 @@ def RedisSession(*args, **vars): class RedisClient(object): - meta_storage = {} - MAX_RETRIES = 5 - RETRIES = 0 - _release_script = None - - def __init__(self, server='localhost:6379', db=None, debug=False, - session_expiry=False, with_lock=False, password=None): - """session_expiry can be an integer, in seconds, to set the default expiration - of sessions. 
The corresponding record will be deleted from the redis instance, - and there's virtually no need to run sessions2trash.py - """ - self.server = server - self.password = password - self.db = db or 0 - host, port = (self.server.split(':') + ['6379'])[:2] - port = int(port) - self.debug = debug - if current and current.request: - self.app = current.request.application - else: - self.app = '' - self.r_server = redis.Redis(host=host, port=port, db=self.db, password=self.password) - if with_lock: - RedisClient._release_script = self.r_server.register_script(_LUA_RELEASE_LOCK) + def __init__(self, redis_conn, session_expiry=False, with_lock=False): + self.r_server = redis_conn + self._release_script = register_release_lock(self.r_server) self.tablename = None self.session_expiry = session_expiry self.with_lock = with_lock @@ -93,12 +87,11 @@ class RedisClient(object): class MockTable(object): def __init__(self, db, r_server, tablename, session_expiry, with_lock=False): + # here self.db is the RedisClient instance self.db = db - self.r_server = r_server self.tablename = tablename # set the namespace for sessions of this app - self.keyprefix = 'w2p:sess:%s' % tablename.replace( - 'web2py_session_', '') + self.keyprefix = 'w2p:sess:%s' % tablename.replace('web2py_session_', '') # fast auto-increment id (needed for session handling) self.serial = "%s:serial" % self.keyprefix # index of all the session keys of this app @@ -126,7 +119,7 @@ class MockTable(object): if key == 'id': # return a fake query. 
We need to query it just by id for normal operations self.query = MockQuery( - field='id', db=self.r_server, + field='id', db=self.db, prefix=self.keyprefix, session_expiry=self.session_expiry, with_lock=self.with_lock, unique_key=self.unique_key ) @@ -140,12 +133,12 @@ class MockTable(object): # 'locked', 'client_ip','created_datetime','modified_datetime' # 'unique_key', 'session_data' # retrieve a new key - newid = str(self.r_server.incr(self.serial)) + newid = str(self.db.r_server.incr(self.serial)) key = self.keyprefix + ':' + newid if self.with_lock: key_lock = key + ':lock' - acquire_lock(self.r_server, key_lock, newid) - with self.r_server.pipeline() as pipe: + acquire_lock(self.db.r_server, key_lock, newid) + with self.db.r_server.pipeline() as pipe: # add it to the index pipe.sadd(self.id_idx, key) # set a hash key with the Storage @@ -154,7 +147,7 @@ class MockTable(object): pipe.expire(key, self.session_expiry) pipe.execute() if self.with_lock: - release_lock(self.r_server, key_lock, newid) + release_lock(self.db, key_lock, newid) return newid @@ -186,8 +179,8 @@ class MockQuery(object): # means that someone wants to retrieve the key self.value key = self.keyprefix + ':' + str(self.value) if self.with_lock: - acquire_lock(self.db, key + ':lock', self.value) - rtn = self.db.hgetall(key) + acquire_lock(self.db.r_server, key + ':lock', self.value, 2) + rtn = self.db.r_server.hgetall(key) if rtn: if self.unique_key: # make sure the id and unique_key are correct @@ -201,13 +194,13 @@ class MockQuery(object): rtn = [] id_idx = "%s:id_idx" % self.keyprefix # find all session keys of this app - allkeys = self.db.smembers(id_idx) + allkeys = self.db.r_server.smembers(id_idx) for sess in allkeys: - val = self.db.hgetall(sess) + val = self.db.r_server.hgetall(sess) if not val: if self.session_expiry: # clean up the idx, because the key expired - self.db.srem(id_idx, sess) + self.db.r_server.srem(id_idx, sess) continue val = Storage(val) # add a delete_record method 
(necessary for sessions2trash.py) @@ -222,9 +215,9 @@ class MockQuery(object): # means that the session has been found and needs an update if self.op == 'eq' and self.field == 'id' and self.value: key = self.keyprefix + ':' + str(self.value) - if not self.db.exists(key): + if not self.db.r_server.exists(key): return None - with self.db.pipeline() as pipe: + with self.db.r_server.pipeline() as pipe: pipe.hmset(key, kwargs) if self.session_expiry: pipe.expire(key, self.session_expiry) @@ -238,7 +231,7 @@ class MockQuery(object): if self.op == 'eq' and self.field == 'id' and self.value: id_idx = "%s:id_idx" % self.keyprefix key = self.keyprefix + ':' + str(self.value) - with self.db.pipeline() as pipe: + with self.db.r_server.pipeline() as pipe: pipe.delete(key) pipe.srem(id_idx, key) rtn = pipe.execute() @@ -254,29 +247,6 @@ class RecordDeleter(object): def __call__(self): id_idx = "%s:id_idx" % self.keyprefix # remove from the index - self.db.srem(id_idx, self.key) + self.db.r_server.srem(id_idx, self.key) # remove the key itself - self.db.delete(self.key) - - -def acquire_lock(conn, lockname, identifier, ltime=10): - while True: - if conn.set(lockname, identifier, ex=ltime, nx=True): - return identifier - time.sleep(.01) - - -_LUA_RELEASE_LOCK = """ -if redis.call("get", KEYS[1]) == ARGV[1] -then - return redis.call("del", KEYS[1]) -else - return 0 -end -""" - - -def release_lock(conn, lockname, identifier): - return RedisClient._release_script( - keys=[lockname], args=[identifier], - client=conn) + self.db.r_server.delete(self.key) diff --git a/gluon/contrib/redis_utils.py b/gluon/contrib/redis_utils.py new file mode 100644 index 00000000..217ca952 --- /dev/null +++ b/gluon/contrib/redis_utils.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Developed by niphlod@gmail.com +License MIT/BSD/GPL + +Serves as base to implement Redis connection object and various utils +for redis_cache, redis_session and redis_scheduler in the future 
+Should-could be overridden in case redis doesn't keep up (e.g. cluster support) +to ensure compatibility with another - similar - library +""" + +import logging +import thread +import time +from gluon import current + +logger = logging.getLogger("web2py.redis_utils") + +try: + import redis + from redis.exceptions import WatchError as RWatchError + from redis.exceptions import ConnectionError as RConnectionError +except ImportError: + logger.error("Needs redis library to work") + raise RuntimeError('Needs redis library to work') + + +locker = thread.allocate_lock() + + +def RConn(*args, **vars): + """ + Instantiates a StrictRedis connection with parameters, at the first time + only + """ + locker.acquire() + try: + instance_name = 'redis_conn_' + current.request.application + if not hasattr(RConn, instance_name): + setattr(RConn, instance_name, redis.StrictRedis(*args, **vars)) + return getattr(RConn, instance_name) + finally: + locker.release() + +def acquire_lock(conn, lockname, identifier, ltime=10): + while True: + if conn.set(lockname, identifier, ex=ltime, nx=True): + return identifier + time.sleep(.01) + + +_LUA_RELEASE_LOCK = """ +if redis.call("get", KEYS[1]) == ARGV[1] +then + return redis.call("del", KEYS[1]) +else + return 0 +end +""" + + +def release_lock(instance, lockname, identifier): + return instance._release_script( + keys=[lockname], args=[identifier]) + + +def register_release_lock(conn): + rtn = conn.register_script(_LUA_RELEASE_LOCK) + return rtn