fixed convertions for compatibility with Python3. Add further testing of redis sessions functions.

This commit is contained in:
2019-12-12 14:37:41 +00:00
parent 8f84b5df34
commit 022ddd49c4
2 changed files with 25 additions and 7 deletions

View File

@@ -90,12 +90,16 @@ class RedisClient(object):
fields = self.tablename.fields
typed_dict = dict()
converters = {
'boolean': lambda x: 1 if x == '1' else 0,
'boolean': lambda x: 1 if x.decode() == '1' else 0,
'blob': lambda x: x,
}
for field, ftype in fields:
if field not in dict_string:
continue
typed_dict[field] = converters[ftype](dict_string[field]) if ftype in converters else dict_string[field]
if ftype in converters:
typed_dict[field] = converters[ftype](dict_string[field])
else:
typed_dict[field] = dict_string[field].decode()
return typed_dict
@@ -186,7 +190,11 @@ class MockQuery(object):
self.value = value
self.op = op
def __gt__(self, value, op='ge'):
def __ge__(self, value, op='ge'):
self.value = value
self.op = op
def __gt__(self, value, op='gt'):
self.value = value
self.op = op
@@ -205,7 +213,7 @@ class MockQuery(object):
else:
rtn = None
return [Storage(self.db.convert_dict_string(rtn))] if rtn else []
elif self.op == 'ge' and self.field == 'id' and self.value == 0:
elif self.op in ('ge', 'gt') and self.field == 'id' and self.value == 0:
# means that someone wants the complete list
rtn = []
id_idx = "%s:id_idx" % self.keyprefix
@@ -218,7 +226,7 @@ class MockQuery(object):
# clean up the idx, because the key expired
self.db.r_server.srem(id_idx, sess)
continue
val = Storage(val)
val = Storage(self.db.convert_dict_string(val))
# add a delete_record method (necessary for sessions2trash.py)
val.delete_record = RecordDeleter(
self.db, sess, self.keyprefix)

View File

@@ -74,9 +74,19 @@ class TestRedis(unittest.TestCase):
)
record_id = table.insert(**dd)
data_from_db = db(table.id == record_id).select()[0]
self.assertDictEqual(Storage(dd), data_from_db)
self.assertDictEqual(Storage(dd), data_from_db, 'get inserted dict')
dd['locked'] = 1
table._db(table.id == record_id).update(**dd)
data_from_db = db(table.id == record_id).select()[0]
self.assertDictEqual(Storage(dd), data_from_db)
self.assertDictEqual(Storage(dd), data_from_db, 'get the updated value')
all_sessions = db(table.id > 0).select()
self.assertIsNotNone(all_sessions, 'we must have some keys in db')
for entry in all_sessions:
res = entry.delete_record()
self.assertIsNone(res, 'delete should return None')
empty_sessions = db(table.id > 0).select()
self.assertEqual(empty_sessions, [], 'no sessions left')