From dc2fd49ecb584747fa7928b38b5594d652f2a51d Mon Sep 17 00:00:00 2001 From: niphlod Date: Thu, 16 May 2013 00:44:24 +0200 Subject: [PATCH] new and improved redis_cache. Got distributed locking and faster clear() function. also, the .increment() was a little bit bugged. --- gluon/contrib/redis_cache.py | 180 ++++++++++++++++++++++++++--------- 1 file changed, 137 insertions(+), 43 deletions(-) diff --git a/gluon/contrib/redis_cache.py b/gluon/contrib/redis_cache.py index 16454c1b..634b91f3 100644 --- a/gluon/contrib/redis_cache.py +++ b/gluon/contrib/redis_cache.py @@ -1,8 +1,7 @@ """ -Developed by 616d41631bff906704951934ffe4015e +Developed by niphlod@gmail.com Released under web2py license because includes gluon/cache.py source code """ - import redis from redis.exceptions import ConnectionError from gluon import current @@ -12,6 +11,8 @@ import time import re import logging import thread +import random + logger = logging.getLogger("web2py.cache.redis") @@ -23,15 +24,42 @@ def RedisCache(*args, **vars): Usage example: put in models from gluon.contrib.redis_cache import RedisCache - cache.redis = RedisCache('localhost:6379',db=None, debug=True) + cache.redis = RedisCache('localhost:6379',db=None, debug=True, with_lock=True) + + :param db: redis db to use (0..16) + :param debug: if True adds to stats() the total_hits and misses + :param with_lock: sets the default locking mode for creating new keys. + By default is False (usualy when you choose Redis you do it + for performances reason) + When True, only one thread/process can set a value concurrently + + When you use cache.redis directly you can use + value = cache.redis('mykey', lambda: time.time(), with_lock=True) + to enforce locking. The with_lock parameter overrides the one set in the + cache.redis instance creation cache.redis.stats() + returns a dictionary with statistics of Redis server + with one additional key ('w2p_keys') showing all keys currently set + from web2py with their TTL - return a dictionary with statistics of Redis server - with one additional key ('w2p_keys') showing all keys currently set - from web2py with their TTL - if debug=True additional tracking is activate and another key is added - ('w2p_stats') showing total_hits and misses + A little wording on how keys are stored (and why the cache_it() function + and the clear() one look a little bit convoluted): there are a lot of + libraries that just store values and then use the KEYS command to delete it. + Until recent releases of this module, that technique was used here too. + In the need of deleting specific keys in a database with zillions keys in it + (other web2py apps, other applications in the need of a Redis stack) the + KEYS command is slow (it needs to scan every key in the database). + So, we use Redis 'sets' to store keys in "buckets"... + - every key created gets "indexed" in a bucket + - all buckets are indexed in a fixed key that never expires + - all keys generated within the same minute go in the same bucket + - every bucket is then set to expire when every key within it is expired + When we need to clear() cached keys: + - we tell Redis to SUNION all buckets + - gives us just the keys that are not expired yet + - buckets that are expired are removed from the fixed set + - we scan the keys and then delete them """ locker.acquire() @@ -49,13 +77,15 @@ class RedisClient(object): MAX_RETRIES = 5 RETRIES = 0 - def __init__(self, server='localhost:6379', db=None, debug=False): + def __init__(self, server='localhost:6379', db=None, debug=False, with_lock=False): self.server = server self.db = db or 0 host, port = (self.server.split(':') + ['6379'])[:2] port = int(port) self.request = current.request self.debug = debug + self.with_lock = with_lock + self.prefix = "w2p:%s:" % (self.request.application) if self.request: app = self.request.application else: @@ -70,67 +100,112 @@ class RedisClient(object): else: self.storage = self.meta_storage[app] + self.cache_set_key = 'w2p:%s:___cache_set' % (self.request.application) + self.r_server = redis.Redis(host=host, port=port, db=self.db) - def __call__(self, key, f, time_expire=300): + def __call__(self, key, f, time_expire=300, with_lock=None): + if with_lock is None: + with_lock = self.with_lock + if time_expire is None: + time_expire = 24 * 60 * 60 + newKey = self.__keyFormat__(key) + value = None + ttl = 0 try: - if time_expire is None: - time_expire = 24 * 60 * 60 - newKey = self.__keyFormat__(key) - value = None + #is there a value obj = self.r_server.get(newKey) - ttl = self.r_server.ttl(newKey) or 0 + #what's its ttl + if obj: + ttl = self.r_server.ttl(newKey) if ttl > time_expire: obj = None if obj: + #was cached if self.debug: self.r_server.incr('web2py_cache_statistics:hit_total') value = pickle.loads(obj) elif f is None: + #delete and never look back self.r_server.delete(newKey) else: - if self.debug: - self.r_server.incr('web2py_cache_statistics:misses') - value = f() - if time_expire == 0: - time_expire = 1 - self.r_server.setex(newKey, pickle.dumps(value), time_expire) + #naive distributed locking + if with_lock: + lock_key = '%s:__lock' % newKey + try: + while True: + lock = self.r_server.setnx(lock_key, 1) + if lock: + value = self.cache_it(newKey, f, time_expire) + break + else: + time.sleep(0.2) + #did someone else create it in the meanwhile ? + obj = self.r_server.get(newKey) + if obj: + value = pickle.loads(obj) + break + finally: + self.r_server.delete(lock_key) + else: + #without distributed locking + value = self.cache_it(newKey, f, time_expire) return value except ConnectionError: - return self.retry_call(key, f, time_expire) + return self.retry_call(key, f, time_expire, with_lock) - def retry_call(self, key, f, time_expire): + def cache_it(self, key, f, time_expire): + if self.debug: + self.r_server.incr('web2py_cache_statistics:misses') + cache_set_key = self.cache_set_key + expireat = int(time.time() + time_expire) + 120 + bucket_key = "%s:%s" % (cache_set_key, expireat / 60) + value = f() + value_ = pickle.dumps(value) + if time_expire == 0: + time_expire = 1 + self.r_server.setex(key, value_, time_expire) + #print '%s will expire on %s: it goes in bucket %s' % (key, time.ctime(expireat)) + #print 'that will expire on %s' % (bucket_key, time.ctime(((expireat/60) + 1)*60)) + p = self.r_server.pipeline() + #add bucket to the fixed set + p.sadd(cache_set_key, bucket_key) + #sets the key + p.setex(key, value_, time_expire) + #add the key to the bucket + p.sadd(bucket_key, key) + #expire the bucket properly + p.expireat(bucket_key, ((expireat/60) + 1)*60) + p.execute() + return value + + def retry_call(self, key, f, time_expire, with_locking): self.RETRIES += 1 if self.RETRIES <= self.MAX_RETRIES: logger.error("sleeping %s seconds before reconnecting" % (2 * self.RETRIES)) time.sleep(2 * self.RETRIES) - self.__init__(self.server, self.db, self.debug) - return self.__call__(key, f, time_expire) + self.__init__(self.server, self.db, self.debug, self.with_lock) + return self.__call__(key, f, time_expire, with_locking) else: self.RETRIES = 0 raise ConnectionError('Redis instance is unavailable at %s' % ( self.server)) - def increment(self, key, value=1, time_expire=300): + def increment(self, key, value=1): try: newKey = self.__keyFormat__(key) - obj = self.r_server.get(newKey) - if obj: - return self.r_server.incr(newKey, value) - else: - self.r_server.setex(newKey, value, time_expire) - return value + return self.r_server.incr(newKey, value) except ConnectionError: - return self.retry_increment(key, value, time_expire) + return self.retry_increment(key, value) - def retry_increment(self, key, value, time_expire): + def retry_increment(self, key, value): self.RETRIES += 1 if self.RETRIES <= self.MAX_RETRIES: logger.error("sleeping some seconds before reconnecting") time.sleep(2 * self.RETRIES) - self.__init__(self.server, self.db, self.debug) - return self.increment(key, value, time_expire) + self.__init__(self.server, self.db, self.debug, self.with_lock) + return self.increment(key, value) else: self.RETRIES = 0 raise ConnectionError('Redis instance is unavailable at %s' % ( @@ -142,14 +217,34 @@ class RedisClient(object): clear cache entries """ r = re.compile(regex) - prefix = "w2p:%s:" % (self.request.application) + #get all buckets + buckets = self.r_server.smembers(self.cache_set_key) + #get all keys in buckets + if buckets: + keys = self.r_server.sunion(buckets) + else: + return + prefix = self.prefix pipe = self.r_server.pipeline() - for a in self.r_server.keys("%s*" % - (prefix)): + for a in keys: if r.match(str(a).replace(prefix, '', 1)): pipe.delete(a) + if random.randrange(0,100) < 10: + #do this just once in a while (10% chance) + self.clear_buckets(buckets) pipe.execute() + def clear_buckets(self, buckets): + p = self.r_server.pipeline() + for b in buckets: + if not self.r_server.exists(b): + p.srem(self.cache_set_key, b) + p.execute() + + def delete(self, key): + newKey = self.__keyFormat__(key) + return self.r_server.delete(newKey) + def stats(self): statscollector = self.r_server.info() if self.debug: @@ -159,12 +254,11 @@ class RedisClient(object): misses=self.r_server.get('web2py_cache_statistics:misses') ) statscollector['w2p_keys'] = dict() + for a in self.r_server.keys("w2p:%s:*" % ( self.request.application)): - statscollector['w2p_keys']["%s_expire_in_sec" % (a)] = \ - self.r_server.ttl(a) + statscollector['w2p_keys']["%s_expire_in_sec" % (a)] = self.r_server.ttl(a) return statscollector def __keyFormat__(self, key): - return 'w2p:%s:%s' % (self.request.application, - key.replace(' ', '_')) + return '%s%s' % (self.prefix, key.replace(' ', '_'))