From cc3f045efa91653bc913a937a9a42ed09793d24e Mon Sep 17 00:00:00 2001 From: Ricardo Pedroso Date: Mon, 13 May 2013 23:06:24 +0100 Subject: [PATCH 1/3] get rid of val == dict() --- gluon/contrib/redis_session.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/gluon/contrib/redis_session.py b/gluon/contrib/redis_session.py index cadd2db1..cf5c73a5 100644 --- a/gluon/contrib/redis_session.py +++ b/gluon/contrib/redis_session.py @@ -155,11 +155,7 @@ class MockQuery(object): if self.op == 'eq' and self.field == 'id' and self.value: #means that someone wants to retrieve the key self.value rtn = self.db.hgetall("%s:%s" % (self.keyprefix, self.value)) - if rtn == dict(): - #return an empty resultset for non existing key - return [] - else: - return [Storage(rtn)] + return [Storage(rtn)] if rtn else [] elif self.op == 'ge' and self.field == 'id' and self.value == 0: #means that someone wants the complete list rtn = [] @@ -168,13 +164,11 @@ class MockQuery(object): allkeys = self.db.smembers(id_idx) for sess in allkeys: val = self.db.hgetall(sess) - if val == dict(): + if not val: if self.session_expiry: #clean up the idx, because the key expired self.db.srem(id_idx, sess) - continue - else: - continue + continue val = Storage(val) #add a delete_record method (necessary for sessions2trash.py) val.delete_record = RecordDeleter( From 693c5cd5e493bb92521ef1a375d42aed6f4d710a Mon Sep 17 00:00:00 2001 From: Ricardo Pedroso Date: Sat, 18 May 2013 20:26:34 +0100 Subject: [PATCH 2/3] add an optional lock mechanism to redis sessions --- gluon/contrib/redis_session.py | 85 +++++++++++++++++++++++++--------- 1 file changed, 64 insertions(+), 21 deletions(-) diff --git a/gluon/contrib/redis_session.py b/gluon/contrib/redis_session.py index cf5c73a5..0b05a723 100644 --- a/gluon/contrib/redis_session.py +++ b/gluon/contrib/redis_session.py @@ -42,8 +42,10 @@ class RedisClient(object): meta_storage = {} MAX_RETRIES = 5 RETRIES = 0 + _release_script = None - def __init__(self, server='localhost:6379', db=None, debug=False, session_expiry=False): + def __init__(self, server='localhost:6379', db=None, debug=False, + session_expiry=False, with_lock=False): """session_expiry can be an integer, in seconds, to set the default expiration of sessions. The corresponding record will be deleted from the redis instance, and there's virtually no need to run sessions2trash.py @@ -58,8 +60,12 @@ class RedisClient(object): else: self.app = '' self.r_server = redis.Redis(host=host, port=port, db=self.db) + if with_lock: + RedisClient._release_script = \ + self.r_server.register_script(_LUA_RELEASE_LOCK) self.tablename = None self.session_expiry = session_expiry + self.with_lock = with_lock def get(self, what, default): return self.tablename @@ -71,7 +77,8 @@ class RedisClient(object): def define_table(self, tablename, *fields, **args): if not self.tablename: self.tablename = MockTable( - self, self.r_server, tablename, self.session_expiry) + self, self.r_server, tablename, self.session_expiry, + self.with_lock) return self.tablename def __getitem__(self, key): @@ -88,7 +95,7 @@ class RedisClient(object): class MockTable(object): - def __init__(self, db, r_server, tablename, session_expiry): + def __init__(self, db, r_server, tablename, session_expiry, with_lock=False): self.db = db self.r_server = r_server self.tablename = tablename @@ -101,15 +108,14 @@ class MockTable(object): self.id_idx = "%s:id_idx" % self.keyprefix #remember the session_expiry setting self.session_expiry = session_expiry - - def getserial(self): - #return an auto-increment id - return "%s" % self.r_server.incr(self.serial, 1) + self.with_lock = with_lock def __getattr__(self, key): if key == 'id': #return a fake query. We need to query it just by id for normal operations - self.query = MockQuery(field='id', db=self.r_server, prefix=self.keyprefix, session_expiry=self.session_expiry) + self.query = MockQuery(field='id', db=self.r_server, + prefix=self.keyprefix, session_expiry=self.session_expiry, + with_lock=self.with_lock) return self.query elif key == '_db': #needed because of the calls in sessions2trash.py and globals.py @@ -120,14 +126,21 @@ class MockTable(object): #'locked', 'client_ip','created_datetime','modified_datetime' #'unique_key', 'session_data' #retrieve a new key - newid = self.getserial() - key = "%s:%s" % (self.keyprefix, newid) - #add it to the index - self.r_server.sadd(self.id_idx, key) - #set a hash key with the Storage - self.r_server.hmset(key, kwargs) - if self.session_expiry: - self.r_server.expire(key, self.session_expiry) + newid = str(self.r_server.incr(self.serial)) + key = self.keyprefix + ':' + newid + if self.with_lock: + key_lock = key + ':lock' + acquire_lock(self.r_server, key_lock, newid) + with self.r_server.pipeline() as pipe: + #add it to the index + pipe.sadd(self.id_idx, key) + #set a hash key with the Storage + pipe.hmset(key, kwargs) + if self.session_expiry: + pipe.expire(key, self.session_expiry) + pipe.execute() + if self.with_lock: + release_lock(self.r_server, key_lock, newid) return newid @@ -135,13 +148,15 @@ class MockQuery(object): """a fake Query object that supports querying by id and listing all keys. No other operation is supported """ - def __init__(self, field=None, db=None, prefix=None, session_expiry=False): + def __init__(self, field=None, db=None, prefix=None, session_expiry=False, + with_lock=False): self.field = field self.value = None self.db = db self.keyprefix = prefix self.op = None self.session_expiry = session_expiry + self.with_lock = with_lock def __eq__(self, value, op='eq'): self.value = value @@ -154,7 +169,10 @@ class MockQuery(object): def select(self): if self.op == 'eq' and self.field == 'id' and self.value: #means that someone wants to retrieve the key self.value - rtn = self.db.hgetall("%s:%s" % (self.keyprefix, self.value)) + key = self.keyprefix + ':' + self.value + if self.with_lock: + acquire_lock(self.db, key + ':lock', self.value) + rtn = self.db.hgetall(key) return [Storage(rtn)] if rtn else [] elif self.op == 'ge' and self.field == 'id' and self.value == 0: #means that someone wants the complete list @@ -182,9 +200,13 @@ class MockQuery(object): #means that the session has been found and needs an update if self.op == 'eq' and self.field == 'id' and self.value: key = "%s:%s" % (self.keyprefix, self.value) - rtn = self.db.hmset(key, kwargs) - if self.session_expiry: - self.db.expire(key, self.session_expiry) + with self.db.pipeline() as pipe: + pipe.hmset(key, kwargs) + if self.session_expiry: + pipe.expire(key, self.session_expiry) + rtn = pipe.execute()[0] + if self.with_lock: + release_lock(self.db, key + ':lock', self.value) return rtn @@ -200,3 +222,24 @@ class RecordDeleter(object): self.db.srem(id_idx, self.key) #remove the key itself self.db.delete(self.key) + + +def acquire_lock(conn, lockname, identifier, ltime=10): + while True: + if conn.set(lockname, identifier, ex=ltime, nx=True): + return identifier + time.sleep(.01) + + +_LUA_RELEASE_LOCK = """ +if redis.call("get", KEYS[1]) == ARGV[1] +then + return redis.call("del", KEYS[1]) +else + return 0 +end +""" + + +def release_lock(conn, lockname, identifier): + return RedisClient._release_script(keys=[lockname], args=[identifier]) From 15263ea1d6b361b75307e9ec33e905a1c50f040e Mon Sep 17 00:00:00 2001 From: Ricardo Pedroso Date: Sat, 18 May 2013 22:26:35 +0100 Subject: [PATCH 3/3] add missing client parameter to lua script --- gluon/contrib/redis_session.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gluon/contrib/redis_session.py b/gluon/contrib/redis_session.py index 0b05a723..14ce094c 100644 --- a/gluon/contrib/redis_session.py +++ b/gluon/contrib/redis_session.py @@ -242,4 +242,5 @@ end def release_lock(conn, lockname, identifier): - return RedisClient._release_script(keys=[lockname], args=[identifier]) + return RedisClient._release_script(keys=[lockname], args=[identifier], + client=conn)