Merge pull request #1158 from niphlod/enhancement/redis_toolset

new redis toolset to use with web2py
This commit is contained in:
mdipierro
2016-01-04 09:43:58 -06:00
5 changed files with 980 additions and 128 deletions
+30 -1
View File
@@ -1,9 +1,38 @@
## trunk
- new JWT implementation (experimental)
- new gluon.contrib.redis_scheduler
- BREAKING: changes to gluon.contrib.redis_cache
BEFORE:
from gluon.contrib.redis_cache import RedisCache
cache.redis = RedisCache('localhost:6379',db=None, debug=True)
NOW:
from gluon.contrib.redis_utils import RConn
from gluon.contrib.redis_cache import RedisCache
rconn = RConn()
# or RConn(host='localhost', port=6379,
# db=0, password=None, socket_timeout=None,
# socket_connect_timeout=None, .....)
# exactly as a redis.StrictRedis instance
cache.redis = RedisCache(redis_conn=rconn, debug=True)
- BREAKING: changes to gluon.contrib.redis_session
BEFORE:
from gluon.contrib.redis_session import RedisSession
sessiondb = RedisSession('localhost:6379',db=0, session_expiry=False)
session.connect(request, response, db = sessiondb)
NOW:
from gluon.contrib.redis_utils import RConn
from gluon.contrib.redis_session import RedisSession
rconn = RConn()
sessiondb = RedisSession(redis_conn=rconn, session_expiry=False)
session.connect(request, response, db = sessiondb)
## 2.13.1-2
- fixed a security issue in request_reset_password
- added fabfile.py
- fixed oauth2 renew token, thanks dokime7
- fixed add_membership, del_membership, add_membership IntegrityError (when auth.enable_record_versioning)
- fixed add_membership, del_membership, add_membership IntegrityError (when auth.enable_record_versioning)
- allow passing unicode to template render
- allow IS_NOT_IN_DB to work with custom primarykey, thanks timmyborg
- allow HttpOnly cookies
+52 -54
View File
@@ -2,20 +2,20 @@
Developed by niphlod@gmail.com
Released under web2py license because includes gluon/cache.py source code
"""
import redis
from redis.exceptions import ConnectionError
from gluon import current
from gluon.cache import CacheAbstract
try:
import cPickle as pickle
import cPickle as pickle
except:
import pickle
import pickle
import time
import re
import logging
import thread
import random
from gluon import current
from gluon.cache import CacheAbstract
from gluon.contrib.redis_utils import acquire_lock, release_lock
from gluon.contrib.redis_utils import register_release_lock, RConnectionError
logger = logging.getLogger("web2py.cache.redis")
@@ -24,17 +24,22 @@ locker = thread.allocate_lock()
def RedisCache(*args, **vars):
"""
Usage example: put in models
Usage example: put in models::
from gluon.contrib.redis_cache import RedisCache
cache.redis = RedisCache('localhost:6379',db=None, debug=True, with_lock=True, password=None)
from gluon.contrib.redis_utils import RConn
rconn = RConn()
from gluon.contrib.redis_cache import RedisCache
cache.redis = RedisCache(redis_conn=rconn, debug=True, with_lock=True)
:param db: redis db to use (0..16)
:param debug: if True adds to stats() the total_hits and misses
:param with_lock: sets the default locking mode for creating new keys.
Args:
redis_conn: a redis-like connection object
debug: if True adds to stats() the total_hits and misses
with_lock: sets the default locking mode for creating new keys.
False by default (usually when you choose Redis you do it
for performance reasons)
When True, only one thread/process can set a value concurrently
fail_gracefully: if redis is unavailable, returns the value computing it
instead of raising an exception
When you use cache.redis directly you can use
value = cache.redis('mykey', lambda: time.time(), with_lock=True)
@@ -81,22 +86,19 @@ class RedisClient(object):
MAX_RETRIES = 5
RETRIES = 0
def __init__(self, server='localhost:6379', db=None, debug=False, with_lock=False, password=None):
self.server = server
self.password = password
self.db = db or 0
host, port = (self.server.split(':') + ['6379'])[:2]
port = int(port)
def __init__(self, redis_conn=None, debug=False,
with_lock=False, fail_gracefully=False):
self.request = current.request
self.debug = debug
self.with_lock = with_lock
self.prefix = "w2p:%s:" % (self.request.application)
self.fail_gracefully = fail_gracefully
self.prefix = "w2p:cache:%s:" % (self.request.application)
if self.request:
app = self.request.application
else:
app = ''
if not app in self.meta_storage:
if app not in self.meta_storage:
self.storage = self.meta_storage[app] = {
CacheAbstract.cache_stats_name: {
'hit_total': 0,
@@ -107,7 +109,8 @@ class RedisClient(object):
self.cache_set_key = 'w2p:%s:___cache_set' % (self.request.application)
self.r_server = redis.Redis(host=host, port=port, db=self.db, password=self.password)
self.r_server = redis_conn
self._release_script = register_release_lock(self.r_server)
def initialize(self):
pass
@@ -140,26 +143,20 @@ class RedisClient(object):
#naive distributed locking
if with_lock:
lock_key = '%s:__lock' % newKey
try:
while True:
lock = self.r_server.setnx(lock_key, 1)
if lock:
value = self.cache_it(newKey, f, time_expire)
break
else:
time.sleep(0.2)
#did someone else create it in the meanwhile ?
obj = self.r_server.get(newKey)
if obj:
value = pickle.loads(obj)
break
finally:
self.r_server.delete(lock_key)
randomvalue = time.time()
al = acquire_lock(self.r_server, lock_key, randomvalue)
#someone may have computed it
obj = self.r_server.get(newKey)
if obj is None:
value = self.cache_it(newKey, f, time_expire)
else:
value = pickle.loads(obj)
release_lock(self, lock_key, al)
else:
#without distributed locking
value = self.cache_it(newKey, f, time_expire)
return value
except ConnectionError:
except RConnectionError:
return self.retry_call(key, f, time_expire, with_lock)
def cache_it(self, key, f, time_expire):
@@ -172,39 +169,42 @@ class RedisClient(object):
value_ = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
if time_expire == 0:
time_expire = 1
self.r_server.setex(key, value_, time_expire)
self.r_server.setex(key, time_expire, value_)
#print '%s will expire on %s: it goes in bucket %s' % (key, time.ctime(expireat))
#print 'that will expire on %s' % (bucket_key, time.ctime(((expireat/60) + 1)*60))
p = self.r_server.pipeline()
#add bucket to the fixed set
p.sadd(cache_set_key, bucket_key)
#sets the key
p.setex(key, value_, time_expire)
p.setex(key, time_expire, value_)
#add the key to the bucket
p.sadd(bucket_key, key)
#expire the bucket properly
p.expireat(bucket_key, ((expireat/60) + 1)*60)
p.expireat(bucket_key, ((expireat / 60) + 1) * 60)
p.execute()
return value
def retry_call(self, key, f, time_expire, with_lock):
    """Handle a redis connection error raised while serving a cache call.

    Args:
        key: cache key that was being served
        f: callable that computes the value
        time_expire: expiration passed back to ``__call__`` on retry
        with_lock: locking mode passed back to ``__call__`` on retry

    Returns:
        the value for ``key``: either the result of a new ``__call__``
        or, when ``fail_gracefully`` is set, the result of ``f()``

    Raises:
        RConnectionError: when MAX_RETRIES is exceeded and
            ``fail_gracefully`` is not set
    """
    self.RETRIES += 1
    if self.RETRIES <= self.MAX_RETRIES:
        logger.error("sleeping %s seconds before reconnecting" %
                     (2 * self.RETRIES))
        time.sleep(2 * self.RETRIES)
        if self.fail_gracefully:
            self.RETRIES = 0
            return f()
        return self.__call__(key, f, time_expire, with_lock)
    self.RETRIES = 0
    if self.fail_gracefully:
        # hand back the computed value, not the callable itself
        return f()
    raise RConnectionError('Redis instance is unavailable')
def increment(self, key, value=1):
try:
newKey = self.__keyFormat__(key)
return self.r_server.incr(newKey, value)
except ConnectionError:
except RConnectionError:
return self.retry_increment(key, value)
def retry_increment(self, key, value):
@@ -212,12 +212,10 @@ class RedisClient(object):
if self.RETRIES <= self.MAX_RETRIES:
logger.error("sleeping some seconds before reconnecting")
time.sleep(2 * self.RETRIES)
self.__init__(self.server, self.db, self.debug, self.with_lock)
return self.increment(key, value)
else:
self.RETRIES = 0
raise ConnectionError('Redis instance is unavailable at %s' % (
self.server))
raise RConnectionError('Redis instance is unavailable')
def clear(self, regex):
"""
@@ -225,9 +223,9 @@ class RedisClient(object):
clear cache entries
"""
r = re.compile(regex)
#get all buckets
# get all buckets
buckets = self.r_server.smembers(self.cache_set_key)
#get all keys in buckets
# get all keys in buckets
if buckets:
keys = self.r_server.sunion(buckets)
else:
@@ -237,8 +235,8 @@ class RedisClient(object):
for a in keys:
if r.match(str(a).replace(prefix, '', 1)):
pipe.delete(a)
if random.randrange(0,100) < 10:
#do this just once in a while (10% chance)
if random.randrange(0, 100) < 10:
# do this just once in a while (10% chance)
self.clear_buckets(buckets)
pipe.execute()
+785
View File
@@ -0,0 +1,785 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
| This file is part of the web2py Web Framework
| Created by niphlod@gmail.com
| License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html)
Scheduler with redis backend
---------------------------------
"""
USAGE = """
## Example
For any existing app
Create File: app/models/scheduler.py ======
from gluon.contrib.redis_utils import RConn
from gluon.contrib.redis_scheduler import RScheduler
def demo1(*args,**vars):
print 'you passed args=%s and vars=%s' % (args, vars)
return 'done!'
def demo2():
1/0
rconn = RConn()
mysched = RScheduler(db, dict(demo1=demo1,demo2=demo2), ...., redis_conn=rconn)
## run worker nodes with:
cd web2py
python web2py.py -K app
"""
import os
import time
import socket
import datetime
import logging
path = os.getcwd()
if 'WEB2PY_PATH' not in os.environ:
os.environ['WEB2PY_PATH'] = path
try:
from gluon.contrib.simplejson import loads, dumps
except:
from simplejson import loads, dumps
IDENTIFIER = "%s#%s" % (socket.gethostname(), os.getpid())
logger = logging.getLogger('web2py.rscheduler.%s' % IDENTIFIER)
from gluon.utils import web2py_uuid
from gluon.storage import Storage
from gluon.scheduler import *
from gluon.scheduler import _decode_dict
from gluon.contrib.redis_utils import RWatchError
POLLING = 'POLLING'
class RScheduler(Scheduler):
def __init__(self, db, tasks=None, migrate=True,
             worker_name=None, group_names=None, heartbeat=HEARTBEAT,
             max_empty_runs=0, discard_results=False, utc_time=False,
             redis_conn=None, mode=1):
    """
    Highly-experimental coordination of workers through redis.

    Takes every argument of Scheduler (forwarded untouched), plus:

    Args:
        redis_conn: must be something close to a StrictRedis instance;
            it is stored as ``self.r_server``
        mode: reserved for future usage patterns (not read here)

    My only regret - and the reason why I kept this under the hood for a
    while - is that it's hard to hook up in web2py to something happening
    right after the commit to a table, which would enable this version of
    the scheduler to process "immediate" tasks right away instead of
    waiting a few seconds (see FIXME in queue_task())

    Right now the coordination of workers (the most intensive routine in
    the scheduler in matters of IPC) is moved to redis.
    I'd like to have incrementally redis-backed modes of operations,
    such as e.g.:
        - 1: IPC through redis (which is the current implementation)
        - 2: Store task results in redis (which will relieve further
             pressure from the db leaving the scheduler_run table empty
             and possibly keep things smooth as tasks results can be set
             to expire after a bit of time)
        - 3: Move all the logic for storing and queueing tasks to redis
             itself - which means no scheduler_task usage too - and use
             the database only as an historical record-bookkeeping
             (e.g. for reporting)
    As usual, I'm eager to see your comments.
    """
    scheduler_options = dict(
        tasks=tasks,
        migrate=migrate,
        worker_name=worker_name,
        group_names=group_names,
        heartbeat=heartbeat,
        max_empty_runs=max_empty_runs,
        discard_results=discard_results,
        utc_time=utc_time,
    )
    Scheduler.__init__(self, db, **scheduler_options)
    self.r_server = redis_conn
    from gluon import current
    # the application name namespaces every redis key (see _nkey)
    app_name = current.request.application
    self._application = app_name or 'appname'
def _nkey(self, key):
"""Helper to restrict all keys to a namespace
and track them"""
prefix = 'w2p:rsched:%s' % self._application
allkeys = '%s:allkeys' % prefix
newkey = "%s:%s" % (prefix, key)
self.r_server.sadd(allkeys, newkey)
return newkey
def prune_all(self):
    """
    Housekeeping: pop every key out of the tracking set and delete it.

    The whole operation is retried (after 0.1 seconds) whenever the
    pipeline raises RWatchError.
    """
    tracked = self._nkey('allkeys')
    with self.r_server.pipeline() as pipe:
        pruned = False
        while not pruned:
            try:
                pipe.watch('PRUNE_ALL')
                key = pipe.spop(tracked)
                while key is not None:
                    pipe.delete(key)
                    key = pipe.spop(tracked)
                pipe.execute()
                pruned = True
            except RWatchError:
                time.sleep(0.1)
def dt2str(self, value):
    """Format a datetime as 'YYYY-MM-DD HH:MM:SS' (the form stored in redis)."""
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    return value.strftime(timestamp_format)
def str2date(self, value):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string (see dt2str) into a datetime."""
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    return datetime.datetime.strptime(value, timestamp_format)
def send_heartbeat(self, counter):
    """
    Workers coordination has evolved into something that is not that
    easy. Here we try to do what we need in a single transaction,
    and retry that transaction (after 0.1 seconds) if the pipeline
    raises RWatchError.

    Args:
        counter: heartbeat counter, handed to inner_send_heartbeat()
    """
    with self.r_server.pipeline() as pipe:
        sent = False
        while not sent:
            try:
                pipe.watch('SEND_HEARTBEAT')
                self.inner_send_heartbeat(counter, pipe)
                pipe.execute()
                self.adj_hibernation()
                self.sleep()
                sent = True
            except RWatchError:
                time.sleep(0.1)
def inner_send_heartbeat(self, counter, pipe):
"""
Does a few things:
- registers the workers
- accepts commands sent to workers (KILL, TERMINATE, PICK, DISABLED, etc)
- adjusts sleep
- saves stats
- elects master
- does "housecleaning" for dead workers
- triggers tasks assignment

Args:
counter: heartbeat counter; housecleaning, ticker election and task
assignment run when it is a multiple of 5 (or the stored status is PICK)
pipe: redis pipeline every command of this method is issued on

NOTE(review): indentation was lost in this view, so the nesting of the
statements below must be checked against the real file.
"""
r_server = pipe
status_keyset = self._nkey('worker_statuses')
status_key = self._nkey('worker_status:%s' % (self.worker_name))
now = self.now()
# an empty hash means no status is stored for this worker yet
mybackedstatus = r_server.hgetall(status_key)
if not mybackedstatus:
# first heartbeat: store the worker hash and register its key
r_server.hmset(
status_key,
dict(
status=ACTIVE, worker_name=self.worker_name,
first_heartbeat=self.dt2str(now),
last_heartbeat=self.dt2str(now),
group_names=dumps(self.group_names), is_ticker=False,
worker_stats=dumps(self.w_stats))
)
r_server.sadd(status_keyset, status_key)
if not self.w_stats.status == POLLING:
self.w_stats.status = ACTIVE
self.w_stats.sleep = self.heartbeat
mybackedstatus = ACTIVE
else:
# from here on mybackedstatus is the stored status string
mybackedstatus = mybackedstatus['status']
if mybackedstatus == DISABLED:
# keep sleeping
self.w_stats.status = DISABLED
r_server.hmset(
status_key,
dict(last_heartbeat=self.dt2str(now),
worker_stats=dumps(self.w_stats))
)
elif mybackedstatus == TERMINATE:
self.w_stats.status = TERMINATE
logger.debug("Waiting to terminate the current task")
self.give_up()
elif mybackedstatus == KILL:
self.w_stats.status = KILL
self.die()
else:
if mybackedstatus == STOP_TASK:
logger.info('Asked to kill the current task')
self.terminate_process()
logger.info('........recording heartbeat (%s)',
self.w_stats.status)
# recording the heartbeat also resets the stored status to ACTIVE
r_server.hmset(
status_key,
dict(
last_heartbeat=self.dt2str(now), status=ACTIVE,
worker_stats=dumps(self.w_stats)
)
)
# newroutine
# the status hash gets a time-to-live of heartbeat * 45
r_server.expire(status_key, self.heartbeat * 3 * 15)
self.w_stats.sleep = self.heartbeat # re-activating the process
if self.w_stats.status not in (RUNNING, POLLING):
self.w_stats.status = ACTIVE
self.do_assign_tasks = False
if counter % 5 == 0 or mybackedstatus == PICK:
try:
logger.info(
' freeing workers that have not sent heartbeat')
registered_workers = r_server.smembers(status_keyset)
allkeys = self._nkey('allkeys')
for worker in registered_workers:
w = r_server.hgetall(worker)
w = Storage(w)
if not w:
# the hash is gone: drop the key from both tracking sets
r_server.srem(status_keyset, worker)
logger.info('removing %s from %s', worker, allkeys)
r_server.srem(allkeys, worker)
continue
try:
self.is_a_ticker = self.being_a_ticker(pipe)
except:
# any error in the election leaves is_a_ticker untouched
pass
if self.w_stats.status in (ACTIVE, POLLING):
self.do_assign_tasks = True
if self.is_a_ticker and self.do_assign_tasks:
# I'm a ticker, and 5 loops passed without reassigning tasks,
# let's do that and loop again
if not self.db_thread:
logger.debug('thread building own DAL object')
self.db_thread = DAL(
self.db._uri, folder=self.db._adapter.folder)
self.define_tables(self.db_thread, migrate=False)
db = self.db_thread
self.wrapped_assign_tasks(db)
return None
except:
# NOTE(review): a bare except, it also hides whatever error
# (redis or db) was raised inside the block above
logger.error('Error assigning tasks')
def being_a_ticker(self, pipe):
"""
This is slightly more convoluted than the original
but is far more efficient

Args:
pipe: redis pipeline used to read and update the worker hashes

Returns:
False when another worker is found flagged as ticker; otherwise
the value of not_busy computed below

NOTE(review): indentation was lost in this view. Whether the
"is_ticker" check in the first loop is nested inside the previous
"if" (ticker searched only among the other active workers) or is a
sibling of it (any worker, this one included) changes the outcome:
confirm against the real file.
"""
r_server = pipe
status_keyset = self._nkey('worker_statuses')
registered_workers = r_server.smembers(status_keyset)
ticker = None
all_active = []
all_workers = []
for worker in registered_workers:
# NOTE(review): an empty hash (expired worker) would raise KeyError here
w = r_server.hgetall(worker)
if w['worker_name'] != self.worker_name and w['status'] == ACTIVE:
all_active.append(w)
# hash values come back as strings, hence the comparison with 'True'
if w['is_ticker'] == 'True' and ticker is None:
ticker = w
all_workers.append(w)
not_busy = self.w_stats.status in (ACTIVE, POLLING)
if not ticker:
if not_busy:
# only if this worker isn't busy, otherwise wait for a free one
for worker in all_workers:
key = self._nkey('worker_status:%s' % worker['worker_name'])
if worker['worker_name'] == self.worker_name:
r_server.hset(key, 'is_ticker', True)
else:
r_server.hset(key, 'is_ticker', False)
logger.info("TICKER: I'm a ticker")
else:
# giving up, only if I'm not alone
# (all_active holds the *other* ACTIVE workers)
if len(all_active) > 1:
key = self._nkey('worker_status:%s' % (self.worker_name))
r_server.hset(key, 'is_ticker', False)
else:
not_busy = True
return not_busy
else:
logger.info(
"%s is a ticker, I'm a poor worker" % ticker['worker_name'])
return False
def assign_tasks(self, db):
"""
The real beauty. We don't need to ASSIGN tasks, we just put
them into the relevant queue

Args:
db: DAL connection used to read and update scheduler_task

NOTE(review): indentation was lost in this view; in particular the
nesting level of the final db.commit() (per task, per group or once)
must be checked against the real file.
"""
st, sd = db.scheduler_task, db.scheduler_task_deps
r_server = self.r_server
now = self.now()
status_keyset = self._nkey('worker_statuses')
with r_server.pipeline() as pipe:
while 1:
try:
# making sure we're the only one doing the job
pipe.watch('ASSIGN_TASKS')
registered_workers = pipe.smembers(status_keyset)
all_workers = []
for worker in registered_workers:
# NOTE(review): an empty hash (expired worker) would raise KeyError here
w = pipe.hgetall(worker)
if w['status'] == ACTIVE:
all_workers.append(Storage(w))
pipe.execute()
break
except RWatchError:
time.sleep(0.1)
continue
# build workers as dict of groups
wkgroups = {}
for w in all_workers:
group_names = loads(w.group_names)
for gname in group_names:
if gname not in wkgroups:
wkgroups[gname] = dict(
workers=[{'name': w.worker_name, 'c': 0}])
else:
wkgroups[gname]['workers'].append(
{'name': w.worker_name, 'c': 0})
# set queued tasks that expired between "runs" (i.e., you turned off
# the scheduler): then it wasn't expired, but now it is
db(
(st.status.belongs((QUEUED, ASSIGNED))) &
(st.stop_time < now)
).update(status=EXPIRED)
# calculate dependencies
deps_with_no_deps = db(
(sd.can_visit == False) &
(~sd.task_child.belongs(
db(sd.can_visit == False)._select(sd.task_parent)
)
)
)._select(sd.task_child)
no_deps = db(
(st.status.belongs((QUEUED, ASSIGNED))) &
(
(sd.id == None) | (st.id.belongs(deps_with_no_deps))
)
)._select(st.id, distinct=True, left=sd.on(
(st.id == sd.task_parent) &
(sd.can_visit == False)
)
)
all_available = db(
(st.status.belongs((QUEUED, ASSIGNED))) &
((st.times_run < st.repeats) | (st.repeats == 0)) &
(st.start_time <= now) &
((st.stop_time == None) | (st.stop_time > now)) &
(st.next_run_time <= now) &
(st.enabled == True) &
(st.id.belongs(no_deps))
)
# NOTE(review): "/" is an integer division on Python 2 only
limit = len(all_workers) * (50 / (len(wkgroups) or 1))
# let's freeze it up
db.commit()
x = 0
r_server = self.r_server
for group in wkgroups.keys():
queued_list = self._nkey('queued:%s' % group)
queued_set = self._nkey('queued_set:%s' % group)
# if are running, let's don't assign them again
running_list = self._nkey('running:%s' % group)
while True:
# the joys for rpoplpush!
# every entry of running_list is moved onto queued_list
# and recorded in queued_set
t = r_server.rpoplpush(running_list, queued_list)
if not t:
# no more
break
r_server.sadd(queued_set, t)
tasks = all_available(st.group_name == group).select(
limitby=(0, limit), orderby = st.next_run_time)
# put tasks in the processing list
for task in tasks:
x += 1
# gname is not read again in this method
gname = task.group_name
if r_server.sismember(queued_set, task.id):
# already queued, we don't put on the list
continue
r_server.sadd(queued_set, task.id)
r_server.lpush(queued_list, task.id)
d = dict(status=QUEUED)
if not task.task_name:
d['task_name'] = task.function_name
db(
(st.id == task.id) &
(st.status.belongs((QUEUED, ASSIGNED)))
).update(**d)
db.commit()
# I didn't report tasks but I'm working nonetheless!!!!
if x > 0:
self.w_stats.empty_runs = 0
self.w_stats.queue = x
self.w_stats.distribution = wkgroups
self.w_stats.workers = len(all_workers)
# I'll be greedy only if tasks queued are equal to the limit
# (meaning there could be others ready to be queued)
self.greedy = x >= limit
logger.info('TICKER: workers are %s', len(all_workers))
logger.info('TICKER: tasks are %s', x)
def pop_task(self, db):
"""
Poll the queues of this worker's groups and grab one task to run.

Args:
db: DAL connection used to fetch and update the task

Returns:
a Task built from the scheduler_task record, or None when nothing
was popped or the popped id has no QUEUED record

NOTE(review): indentation was lost in this view; check the nesting
against the real file.
"""
r_server = self.r_server
st = self.db.scheduler_task
task = None
# ready to process something
for group in self.group_names:
queued_set = self._nkey('queued_set:%s' % group)
queued_list = self._nkey('queued:%s' % group)
running_list = self._nkey('running:%s' % group)
running_dict = self._nkey('running_dict:%s' % group)
self.w_stats.status = POLLING
# polling for 1 minute in total: the 60 seconds are split
# among the groups this worker listens to
logger.debug(' polling on %s' , group)
# blocking pop: the task id moves from the queued to the running list
task_id = r_server.brpoplpush(queued_list, running_list, timeout=60/len(self.group_names))
logger.debug(' finished polling')
self.w_stats.status = ACTIVE
if task_id:
# record which worker holds the task
r_server.hset(running_dict, task_id, self.worker_name)
r_server.srem(queued_set, task_id)
task = db(
(st.id == task_id) &
(st.status == QUEUED)
).select().first()
if not task:
# no QUEUED record for this id: undo the redis bookkeeping
r_server.lrem(running_list, 0, task_id)
r_server.hdel(running_dict, task_id)
r_server.lrem(queued_list, 0, task_id)
logger.error("we received a task that isn't there (%s)" % task_id)
return None
break
now = self.now()
if task:
task.update_record(status=RUNNING, last_run_time=now)
# no one will touch my task!
db.commit()
logger.debug(' work to do %s', task.id)
else:
logger.info('nothing to do (%s)' % self.w_stats.status)
return None
times_run = task.times_run + 1
if not task.prevent_drift:
next_run_time = task.last_run_time + datetime.timedelta(
seconds=task.period
)
else:
next_run_time = task.start_time + datetime.timedelta(
seconds=task.period * times_run
)
if times_run < task.repeats or task.repeats == 0:
# need to run (repeating task)
run_again = True
else:
# no need to run again
run_again = False
run_id = 0
# the insert is retried every 0.5 seconds until it succeeds; with
# discard_results set the loop is skipped and run_id stays 0
while True and not self.discard_results:
logger.debug(' new scheduler_run record')
try:
run_id = db.scheduler_run.insert(
task_id=task.id,
status=RUNNING,
start_time=now,
worker_name=self.worker_name)
db.commit()
break
except:
time.sleep(0.5)
db.rollback()
logger.info('new task %(id)s "%(task_name)s"'
' %(application_name)s.%(function_name)s' % task)
return Task(
app=task.application_name,
function=task.function_name,
timeout=task.timeout,
args=task.args, # in json
vars=task.vars, # in json
task_id=task.id,
run_id=run_id,
run_again=run_again,
next_run_time=next_run_time,
times_run=times_run,
stop_time=task.stop_time,
retry_failed=task.retry_failed,
times_failed=task.times_failed,
sync_output=task.sync_output,
uuid=task.uuid,
group_name=task.group_name)
def report_task(self, task, task_report):
"""
Needs overwriting only because we need to pop from the
running tasks

Args:
task: the Task that was executed (see pop_task)
task_report: outcome of the run; status, result, output and tb
are read from it

NOTE(review): indentation was lost in this view; check the nesting
against the real file.
"""
r_server = self.r_server
db = self.db
now = self.now()
st = db.scheduler_task
sr = db.scheduler_run
if not self.discard_results:
if task_report.result != 'null' or task_report.tb:
# result is 'null' as a string if task completed
# if it's stopped it's None as NoneType, so we record
# the STOPPED "run" anyway
logger.debug(' recording task report in db (%s)',
task_report.status)
db(sr.id == task.run_id).update(
status=task_report.status,
stop_time=now,
run_result=task_report.result,
run_output=task_report.output,
traceback=task_report.tb)
else:
logger.debug(' deleting task report in db because of no result')
db(sr.id == task.run_id).delete()
# if there is a stop_time and the following run would exceed it
is_expired = (task.stop_time
and task.next_run_time > task.stop_time
and True or False)
# EXPIRED or QUEUED for a task that has to run again, COMPLETED otherwise
status = (task.run_again and is_expired and EXPIRED
or task.run_again and not is_expired
and QUEUED or COMPLETED)
if task_report.status == COMPLETED:
# assigned calculations
d = dict(status=status,
next_run_time=task.next_run_time,
times_run=task.times_run,
times_failed=0,
assigned_worker_name=self.worker_name
)
db(st.id == task.task_id).update(**d)
if status == COMPLETED:
self.update_dependencies(db, task.task_id)
else:
# a STOPPED run is stored as FAILED on the task
st_mapping = {'FAILED': 'FAILED',
'TIMEOUT': 'TIMEOUT',
'STOPPED': 'FAILED'}[task_report.status]
# requeue while retries are left (retry_failed == -1: always requeue)
status = (task.retry_failed
and task.times_failed < task.retry_failed
and QUEUED or task.retry_failed == -1
and QUEUED or st_mapping)
db(st.id == task.task_id).update(
times_failed=db.scheduler_task.times_failed + 1,
next_run_time=task.next_run_time,
status=status,
assigned_worker_name=self.worker_name
)
logger.info('task completed (%s)', task_report.status)
# the task is not running anymore: remove it from the redis structures
running_list = self._nkey('running:%s' % task.group_name)
running_dict = self._nkey('running_dict:%s' % task.group_name)
r_server.lrem(running_list, 0, task.task_id)
r_server.hdel(running_dict, task.task_id)
def wrapped_pop_task(self):
    """Commodity function to call `pop_task` and trap exceptions.

    If an exception is raised, assume it happened because of database
    contention and retry `pop_task` after 0.5 seconds, up to 10 times.

    Returns:
        whatever `pop_task` returns, or None when the polling was
        interrupted or every attempt failed
    """
    db = self.db
    db.commit()  # another nifty db.commit() only for Mysql
    attempts = 0
    while attempts < 10:
        try:
            return self.pop_task(db)
        # this is here to "interrupt" any brpoplpush op easily
        except KeyboardInterrupt:
            self.give_up()
            break
        except Exception:
            # SystemExit is deliberately left alone so that the
            # process can still be shut down while polling
            self.w_stats.errors += 1
            db.rollback()
            logger.error(' error popping tasks')
            attempts += 1
            time.sleep(0.5)
def get_workers(self, only_ticker=False):
    """Return a dict holding worker_name : {**columns}
    representing all "registered" workers.

    Args:
        only_ticker: when True, return only the worker flagged as
            TICKER (an empty dict if there is none)
    """
    r_server = self.r_server
    keyset = self._nkey('worker_statuses')
    found = {}
    for worker_key in r_server.smembers(keyset):
        info = Storage(r_server.hgetall(worker_key))
        if not info:
            # registered key whose hash is gone
            continue
        found[info.worker_name] = Storage(
            status=info.status,
            first_heartbeat=self.str2date(info.first_heartbeat),
            last_heartbeat=self.str2date(info.last_heartbeat),
            group_names=loads(info.group_names, object_hook=_decode_dict),
            # hash values come back as strings
            is_ticker=info.is_ticker == 'True',
            worker_stats=loads(info.worker_stats, object_hook=_decode_dict)
        )
    if not only_ticker:
        return found
    for name, details in found.items():
        if details['is_ticker']:
            return {name: details}
    return {}
def set_worker_status(self, group_names=None, action=ACTIVE,
                      exclude=None, limit=None, worker_name=None):
    """Internal function to set worker's status.

    Args:
        group_names: group (or list of groups) whose workers are
            targeted; defaults to this scheduler's own groups
        action: the status to store
        exclude: statuses that must not be overwritten; workers already
            in `action` are always skipped
        limit: touch at most this many workers (ignored when
            worker_name is given)
        worker_name: target this single worker, whatever its status
            and groups
    """
    r_server = self.r_server
    all_workers = self.get_workers()
    if not group_names:
        group_names = self.group_names
    elif isinstance(group_names, str):
        group_names = [group_names]
    # list.append() returns None, so the former
    # `exclude and exclude.append(action) or [action]` always evaluated
    # to [action] (and mutated the caller's list): build a new list
    exclusion = list(exclude or []) + [action]
    workers = []
    if worker_name is not None:
        if worker_name in all_workers:
            workers = [worker_name]
    else:
        wanted_groups = set(group_names)
        workers = [
            name for name, details in all_workers.items()
            if details.status not in exclusion
            and wanted_groups & set(details.group_names)
        ]
    if limit and worker_name is None:
        workers = workers[:limit]
    if workers:
        with r_server.pipeline() as pipe:
            while True:
                try:
                    pipe.watch('SET_WORKER_STATUS')
                    for w in workers:
                        worker_key = self._nkey('worker_status:%s' % w)
                        pipe.hset(worker_key, 'status', action)
                    pipe.execute()
                    break
                except RWatchError:
                    time.sleep(0.1)
                    continue
def queue_task(self, function, pargs=[], pvars={}, **kwargs):
    """
    FIXME: immediate should put item in queue. The hard part is
    that currently there are no hooks happening at post-commit time

    Queue tasks. This takes care of handling the validation of all
    parameters

    Args:
        function: the function (anything callable with a __name__)
        pargs: "raw" args to be passed to the function. Automatically
            jsonified.
        pvars: "raw" kwargs to be passed to the function. Automatically
            jsonified
        kwargs: all the parameters available (basically, every
            `scheduler_task` column). If args and vars are here, they
            should be jsonified already, and they will override pargs
            and pvars. `immediate` asks the ticker (if any) to pick
            the task up on its next heartbeat.

    Returns:
        a dict just as a normal validate_and_insert(), plus a uuid key
        holding the uuid of the queued task. If validation is not passed
        (i.e. some parameters are invalid) both id and uuid will be None,
        and you'll get an "error" dict holding the errors found.
    """
    if hasattr(function, '__name__'):
        function = function.__name__
    # values passed explicitly win; the defaults are computed lazily
    task_args = kwargs.pop('args', None) or dumps(pargs)
    task_vars = kwargs.pop('vars', None) or dumps(pvars)
    task_uuid = kwargs.pop('uuid', None) or web2py_uuid()
    task_name = kwargs.pop('task_name', None) or function
    immediate = kwargs.pop('immediate', None) or None
    rtn = self.db.scheduler_task.validate_and_insert(
        function_name=function,
        task_name=task_name,
        args=task_args,
        vars=task_vars,
        uuid=task_uuid,
        **kwargs)
    if rtn.errors:
        rtn.uuid = None
        return rtn
    rtn.uuid = task_uuid
    if not immediate:
        return rtn
    r_server = self.r_server
    ticker_names = list(self.get_workers(only_ticker=True).keys())
    if ticker_names:
        ticker = ticker_names[0]
        with r_server.pipeline() as pipe:
            flagged = False
            while not flagged:
                try:
                    pipe.watch('SET_WORKER_STATUS')
                    worker_key = self._nkey('worker_status:%s' % ticker)
                    pipe.hset(worker_key, 'status', 'PICK')
                    pipe.execute()
                    flagged = True
                except RWatchError:
                    time.sleep(0.1)
    return rtn
def stop_task(self, ref):
    """Shortcut for task termination.

    If the task is RUNNING it will terminate it, meaning that status
    will be set as FAILED.

    If the task is QUEUED, its stop_time will be set as to "now",
    the enabled flag will be set to False, and the status to STOPPED

    Args:
        ref: can be

          - an integer : lookup will be done by scheduler_task.id
          - a string : lookup will be done by scheduler_task.uuid

    Returns:
        - 1 if task was stopped (meaning an update has been done)
        - None if task was not found, or if task was not RUNNING or QUEUED

    Raises:
        SyntaxError: if ref is neither an integer nor a string

    Note:
        Experimental
    """
    r_server = self.r_server
    st = self.db.scheduler_task
    if isinstance(ref, int):
        q = st.id == ref
    elif isinstance(ref, str):
        q = st.uuid == ref
    else:
        raise SyntaxError(
            "You can retrieve results only by id or uuid")
    task = self.db(q).select(st.id, st.status, st.group_name)
    task = task.first()
    rtn = None
    if not task:
        return rtn
    running_dict = self._nkey('running_dict:%s' % task.group_name)
    if task.status == 'RUNNING':
        worker_name = r_server.hget(running_dict, task.id)
        # without a recorded worker there is nobody to notify: do not
        # write a status on a bogus 'worker_status:None' key
        if worker_name is not None:
            worker_key = self._nkey('worker_status:%s' % (worker_name))
            r_server.hset(worker_key, 'status', STOP_TASK)
            # report the stop request, as documented above
            rtn = 1
    elif task.status == 'QUEUED':
        rtn = self.db(q).update(
            stop_time=self.now(),
            enabled=False,
            status=STOPPED)
    return rtn
+43 -73
View File
@@ -1,13 +1,18 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Developed by niphlod@gmail.com
License MIT/BSD/GPL
Redis-backed sessions
"""
import redis
from gluon import current
from gluon.storage import Storage
import time
import logging
import thread
from gluon import current
from gluon.storage import Storage
from gluon.contrib.redis_utils import acquire_lock, release_lock
from gluon.contrib.redis_utils import register_release_lock
logger = logging.getLogger("web2py.session.redis")
@@ -16,10 +21,20 @@ locker = thread.allocate_lock()
def RedisSession(*args, **vars):
"""
Usage example: put in models
from gluon.contrib.redis_session import RedisSession
sessiondb = RedisSession('localhost:6379',db=0, session_expiry=False, password=None)
session.connect(request, response, db = sessiondb)
Usage example: put in models::
from gluon.contrib.redis_utils import RConn
rconn = RConn()
from gluon.contrib.redis_session import RedisSession
sessiondb = RedisSession(redis_conn=rconn, with_lock=True, session_expiry=False)
session.connect(request, response, db = sessiondb)
Args:
redis_conn: a redis-like connection object
with_lock: prevent concurrent modifications to the same session
session_expiry: delete automatically sessions after n seconds
(still need to run sessions2trash.py every 1M sessions
or so)
Simple slip-in storage for session
"""
@@ -36,30 +51,9 @@ def RedisSession(*args, **vars):
class RedisClient(object):
meta_storage = {}
MAX_RETRIES = 5
RETRIES = 0
_release_script = None
def __init__(self, server='localhost:6379', db=None, debug=False,
session_expiry=False, with_lock=False, password=None):
"""session_expiry can be an integer, in seconds, to set the default expiration
of sessions. The corresponding record will be deleted from the redis instance,
and there's virtually no need to run sessions2trash.py
"""
self.server = server
self.password = password
self.db = db or 0
host, port = (self.server.split(':') + ['6379'])[:2]
port = int(port)
self.debug = debug
if current and current.request:
self.app = current.request.application
else:
self.app = ''
self.r_server = redis.Redis(host=host, port=port, db=self.db, password=self.password)
if with_lock:
RedisClient._release_script = self.r_server.register_script(_LUA_RELEASE_LOCK)
def __init__(self, redis_conn, session_expiry=False, with_lock=False):
self.r_server = redis_conn
self._release_script = register_release_lock(self.r_server)
self.tablename = None
self.session_expiry = session_expiry
self.with_lock = with_lock
@@ -93,12 +87,11 @@ class RedisClient(object):
class MockTable(object):
def __init__(self, db, r_server, tablename, session_expiry, with_lock=False):
# here self.db is the RedisClient instance
self.db = db
self.r_server = r_server
self.tablename = tablename
# set the namespace for sessions of this app
self.keyprefix = 'w2p:sess:%s' % tablename.replace(
'web2py_session_', '')
self.keyprefix = 'w2p:sess:%s' % tablename.replace('web2py_session_', '')
# fast auto-increment id (needed for session handling)
self.serial = "%s:serial" % self.keyprefix
# index of all the session keys of this app
@@ -126,7 +119,7 @@ class MockTable(object):
if key == 'id':
# return a fake query. We need to query it just by id for normal operations
self.query = MockQuery(
field='id', db=self.r_server,
field='id', db=self.db,
prefix=self.keyprefix, session_expiry=self.session_expiry,
with_lock=self.with_lock, unique_key=self.unique_key
)
@@ -140,12 +133,12 @@ class MockTable(object):
# 'locked', 'client_ip','created_datetime','modified_datetime'
# 'unique_key', 'session_data'
# retrieve a new key
newid = str(self.r_server.incr(self.serial))
newid = str(self.db.r_server.incr(self.serial))
key = self.keyprefix + ':' + newid
if self.with_lock:
key_lock = key + ':lock'
acquire_lock(self.r_server, key_lock, newid)
with self.r_server.pipeline() as pipe:
acquire_lock(self.db.r_server, key_lock, newid)
with self.db.r_server.pipeline() as pipe:
# add it to the index
pipe.sadd(self.id_idx, key)
# set a hash key with the Storage
@@ -154,7 +147,7 @@ class MockTable(object):
pipe.expire(key, self.session_expiry)
pipe.execute()
if self.with_lock:
release_lock(self.r_server, key_lock, newid)
release_lock(self.db, key_lock, newid)
return newid
@@ -186,8 +179,8 @@ class MockQuery(object):
# means that someone wants to retrieve the key self.value
key = self.keyprefix + ':' + str(self.value)
if self.with_lock:
acquire_lock(self.db, key + ':lock', self.value)
rtn = self.db.hgetall(key)
acquire_lock(self.db.r_server, key + ':lock', self.value, 2)
rtn = self.db.r_server.hgetall(key)
if rtn:
if self.unique_key:
# make sure the id and unique_key are correct
@@ -201,13 +194,13 @@ class MockQuery(object):
rtn = []
id_idx = "%s:id_idx" % self.keyprefix
# find all session keys of this app
allkeys = self.db.smembers(id_idx)
allkeys = self.db.r_server.smembers(id_idx)
for sess in allkeys:
val = self.db.hgetall(sess)
val = self.db.r_server.hgetall(sess)
if not val:
if self.session_expiry:
# clean up the idx, because the key expired
self.db.srem(id_idx, sess)
self.db.r_server.srem(id_idx, sess)
continue
val = Storage(val)
# add a delete_record method (necessary for sessions2trash.py)
@@ -222,9 +215,9 @@ class MockQuery(object):
# means that the session has been found and needs an update
if self.op == 'eq' and self.field == 'id' and self.value:
key = self.keyprefix + ':' + str(self.value)
if not self.db.exists(key):
if not self.db.r_server.exists(key):
return None
with self.db.pipeline() as pipe:
with self.db.r_server.pipeline() as pipe:
pipe.hmset(key, kwargs)
if self.session_expiry:
pipe.expire(key, self.session_expiry)
@@ -238,7 +231,7 @@ class MockQuery(object):
if self.op == 'eq' and self.field == 'id' and self.value:
id_idx = "%s:id_idx" % self.keyprefix
key = self.keyprefix + ':' + str(self.value)
with self.db.pipeline() as pipe:
with self.db.r_server.pipeline() as pipe:
pipe.delete(key)
pipe.srem(id_idx, key)
rtn = pipe.execute()
@@ -254,29 +247,6 @@ class RecordDeleter(object):
def __call__(self):
id_idx = "%s:id_idx" % self.keyprefix
# remove from the index
self.db.srem(id_idx, self.key)
self.db.r_server.srem(id_idx, self.key)
# remove the key itself
self.db.delete(self.key)
def acquire_lock(conn, lockname, identifier, ltime=10):
while True:
if conn.set(lockname, identifier, ex=ltime, nx=True):
return identifier
time.sleep(.01)
_LUA_RELEASE_LOCK = """
if redis.call("get", KEYS[1]) == ARGV[1]
then
return redis.call("del", KEYS[1])
else
return 0
end
"""
def release_lock(conn, lockname, identifier):
return RedisClient._release_script(
keys=[lockname], args=[identifier],
client=conn)
self.db.r_server.delete(self.key)
+70
View File
@@ -0,0 +1,70 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Developed by niphlod@gmail.com
License MIT/BSD/GPL
Serves as base to implement Redis connection object and various utils
for redis_cache, redis_session and redis_scheduler in the future
Can be overridden in case the redis library doesn't keep up (e.g. cluster
support), to ensure compatibility with another, similar library
"""
import logging
import thread
import time
from gluon import current
logger = logging.getLogger("web2py.redis_utils")
try:
import redis
from redis.exceptions import WatchError as RWatchError
from redis.exceptions import ConnectionError as RConnectionError
except ImportError:
logger.error("Needs redis library to work")
raise RuntimeError('Needs redis library to work')
# module-level lock; RConn() holds it while it looks up or creates the
# connection object, so that step is executed by one thread at a time
locker = thread.allocate_lock()
def RConn(*args, **vars):
    """
    Return the StrictRedis connection of the current application.

    The connection is instantiated with the given parameters only the first
    time this function is called for an application
    (``current.request.application``). It is then stored as an attribute of
    this function, and every following call returns that stored object
    without using the parameters.

    Args:
        *args, **vars: passed as they are to ``redis.StrictRedis``
    """
    # the lock is held for the whole lookup/creation and released on exit
    with locker:
        slot = 'redis_conn_' + current.request.application
        if hasattr(RConn, slot):
            return getattr(RConn, slot)
        connection = redis.StrictRedis(*args, **vars)
        setattr(RConn, slot, connection)
        return connection
def acquire_lock(conn, lockname, identifier, ltime=10):
    """
    Wait until the key ``lockname`` can be set on ``conn``.

    Args:
        conn: a redis-like connection object exposing ``set``
        lockname: name of the key used as lock
        identifier: value stored in the lock key
        ltime: passed to ``set`` as ``ex`` (expiration of the lock key)

    Returns:
        ``identifier``, once ``set`` returned a truthy value
    """
    # nx=True: the key is set only if it does not exist yet. Retry every
    # 10 ms until it succeeds; there is no upper bound on the waiting time
    while not conn.set(lockname, identifier, ex=ltime, nx=True):
        time.sleep(.01)
    return identifier
# Lua script used to release a lock: the key KEYS[1] is deleted only when the
# value it currently holds equals ARGV[1]; otherwise nothing is deleted and
# the script returns 0
_LUA_RELEASE_LOCK = """
if redis.call("get", KEYS[1]) == ARGV[1]
then
return redis.call("del", KEYS[1])
else
return 0
end
"""
def release_lock(instance, lockname, identifier):
    """
    Call the release script held by ``instance`` for the given lock.

    Args:
        instance: object exposing a ``_release_script`` callable
        lockname: name of the lock key, passed to the script as ``keys``
        identifier: value passed to the script as ``args``

    Returns:
        whatever the ``_release_script`` call returns
    """
    script = instance._release_script
    return script(keys=[lockname], args=[identifier])
def register_release_lock(conn):
    """
    Register the lock-release Lua script on ``conn``.

    Args:
        conn: a redis-like connection object exposing ``register_script``

    Returns:
        the object returned by ``conn.register_script``
    """
    return conn.register_script(_LUA_RELEASE_LOCK)