Changed CacheOnDisk to use fine grained locking instead of atomic replacement

This commit is contained in:
Leonel Câmara
2015-03-27 01:49:48 +00:00
parent bd899ea304
commit 9050840962

View File

@@ -30,8 +30,10 @@ import hashlib
import datetime
import tempfile
from gluon import recfile
from gluon import portalocker
from collections import defaultdict
try:
from collections import OrderdDict
from collections import OrderedDict
except ImportError:
from gluon.contrib.ordereddict import OrderedDict
try:
@@ -282,49 +284,17 @@ class CacheOnDisk(CacheAbstract):
class PersistentStorage(object):
"""
Implements a key based storage in disk.
Implements a key based thread/process-safe safe storage in disk.
"""
def __init__(self, folder):
def __init__(self, folder, file_lock_time_wait=0.1):
self.folder = folder
self.key_filter_in = lambda key: key
self.key_filter_out = lambda key: key
# Check the best way to do atomic file replacement.
if sys.version_info >= (3, 3):
self.replace = os.replace
elif sys.platform == "win32":
import ctypes
from ctypes import wintypes
ReplaceFile = ctypes.windll.kernel32.ReplaceFileW
ReplaceFile.restype = wintypes.BOOL
ReplaceFile.argtypes = [
wintypes.LPWSTR,
wintypes.LPWSTR,
wintypes.LPWSTR,
wintypes.DWORD,
wintypes.LPVOID,
wintypes.LPVOID,
]
self.file_lock_time_wait = file_lock_time_wait # How long we should wait before retrying to lock a file held by another process
# We still need a mutex for each file as portalocker only blocks other processes
self.file_locks = defaultdict(thread.allocate_lock)
def replace_windows(src, dst):
"""
The Windows filesystem has a 256 character limit for the filename.
To use filenames longer than that, the '\\?\' prefix needs to be used.
By default, this prefix is added to all windows filenames,
when accessing it.
View this for details: http://stackoverflow.com/a/23230380/348142
"""
windows_prefix = "\\\\?\\"
dst = windows_prefix + dst
src = windows_prefix + src
if not ReplaceFile(dst, src, None, 0, 0, 0):
os.rename(src, dst)
self.replace = replace_windows
else:
# POSIX rename() is always atomic
self.replace = os.rename
# Make sure we use valid filenames.
if sys.platform == "win32":
@@ -346,37 +316,56 @@ class CacheOnDisk(CacheAbstract):
self.key_filter_out = key_filter_out_windows
def wait_portalock(self, val_file):
"""
Wait for the process file lock.
"""
while True:
try:
portalocker.lock(val_file, portalocker.LOCK_EX)
break
except:
time.sleep(self.file_lock_time_wait)
def __setitem__(self, key, value):
tmp_name, tmp_path = tempfile.mkstemp(dir=self.folder)
tmp = os.fdopen(tmp_name, 'wb')
try:
pickle.dump((time.time(), value), tmp, pickle.HIGHEST_PROTOCOL)
finally:
tmp.close()
key = self.key_filter_in(key)
fullfilename = os.path.join(self.folder, recfile.generate(key))
if not os.path.exists(os.path.dirname(fullfilename)):
os.makedirs(os.path.dirname(fullfilename))
self.replace(tmp_path, fullfilename)
self.file_locks[key].acquire()
val_file = recfile.open(key, mode='wb', path=self.folder)
self.wait_portalock(val_file)
pickle.dump(value, val_file, pickle.HIGHEST_PROTOCOL)
val_file.close()
portalocker.unlock(val_file)
self.file_locks[key].release()
def __getitem__(self, key):
key = self.key_filter_in(key)
if recfile.exists(key, path=self.folder):
timestamp, value = pickle.load(recfile.open(key, 'rb', path=self.folder))
return value
else:
self.file_locks[key].acquire()
try:
val_file = recfile.open(key, mode='rb', path=self.folder)
except IOError:
self.file_locks[key].release()
raise KeyError
self.wait_portalock(val_file)
value = pickle.load(recfile.open(key, 'rb', path=self.folder))
val_file.close()
portalocker.unlock(val_file)
self.file_locks[key].release()
return value
def __contains__(self, key):
key = self.key_filter_in(key)
return recfile.exists(key, path=self.folder)
return (key in self.file_locks) or recfile.exists(key, path=self.folder)
def __delitem__(self, key):
key = self.key_filter_in(key)
self.file_locks[key].acquire()
recfile.remove(key, path=self.folder)
del self.file_locks[key]
def __iter__(self):
@@ -385,6 +374,36 @@ class CacheOnDisk(CacheAbstract):
yield self.key_filter_out(filename)
def safe_apply(self, key, function, default_value=None):
"""
Safely apply a function to the value of a key in storage and set
the return value of the function to it.
Return the result of applying the function.
"""
key = self.key_filter_in(key)
self.file_locks[key].acquire()
exists = True
try:
val_file = recfile.open(key, mode='r+b', path=self.folder)
except IOError:
exists = False
val_file = recfile.open(key, mode='wb', path=self.folder)
self.wait_portalock(val_file)
if exists:
timestamp, value = pickle.load(val_file)
else:
value = default_value
new_value = function(value)
val_file.seek(0)
pickle.dump((time.time(), new_value), val_file, pickle.HIGHEST_PROTOCOL)
val_file.truncate()
val_file.close()
portalocker.unlock(val_file)
self.file_locks[key].release()
return new_value
def keys(self):
return list(self.__iter__())
@@ -395,10 +414,12 @@ class CacheOnDisk(CacheAbstract):
except KeyError:
return default
def clear(self):
for key in self:
del self[key]
try:
del self[key]
except KeyError:
pass
def __init__(self, request=None, folder=None):
@@ -426,17 +447,23 @@ class CacheOnDisk(CacheAbstract):
self.storage = CacheOnDisk.PersistentStorage(folder)
if not CacheAbstract.cache_stats_name in self.storage:
self.storage[CacheAbstract.cache_stats_name] = {'hit_total': 0, 'misses': 0}
def __call__(self, key, f,
time_expire=DEFAULT_TIME_EXPIRE):
self.initialize()
def inc_hit_total(v):
v['hit_total'] += 1
return v
def inc_misses(v):
v['misses'] += 1
return v
dt = time_expire
item = self.storage.get(key)
self.storage[CacheAbstract.cache_stats_name]['hit_total'] += 1
self.storage.safe_apply(CacheAbstract.cache_stats_name, inc_hit_total,
default_value={'hit_total': 0, 'misses': 0})
if item and f is None:
del self.storage[key]
@@ -451,7 +478,8 @@ class CacheOnDisk(CacheAbstract):
else:
value = f()
self.storage[key] = (now, value)
self.storage[CacheAbstract.cache_stats_name]['misses'] += 1
self.storage.safe_apply(CacheAbstract.cache_stats_name, inc_misses,
default_value={'hit_total': 0, 'misses': 0})
return value
@@ -464,21 +492,10 @@ class CacheOnDisk(CacheAbstract):
else:
self._clear(storage, regex)
if not CacheAbstract.cache_stats_name in storage:
storage[CacheAbstract.cache_stats_name] = {
'hit_total': 0, 'misses': 0}
def increment(self, key, value=1):
self.initialize()
storage = self.storage
try:
if key in storage:
value = storage[key][1] + value
storage[key] = (time.time(), value)
except:
pass
return value
return self.storage.safe_apply(key, lambda x: x + value, default_value=0)