Merge pull request #882 from leonelcamara/lock_disk

Finely grained lock disk
This commit is contained in:
mdipierro
2015-03-30 01:25:55 -05:00
+103 -77
View File
@@ -30,8 +30,10 @@ import hashlib
import datetime
import tempfile
from gluon import recfile
from gluon import portalocker
from collections import defaultdict
try:
from collections import OrderdDict
from collections import OrderedDict
except ImportError:
from gluon.contrib.ordereddict import OrderedDict
try:
@@ -282,49 +284,17 @@ class CacheOnDisk(CacheAbstract):
class PersistentStorage(object):
"""
Implements a key based storage in disk.
Implements a key-based, thread/process-safe storage on disk.
"""
def __init__(self, folder):
def __init__(self, folder, file_lock_time_wait=0.1):
self.folder = folder
self.key_filter_in = lambda key: key
self.key_filter_out = lambda key: key
# Check the best way to do atomic file replacement.
if sys.version_info >= (3, 3):
self.replace = os.replace
elif sys.platform == "win32":
import ctypes
from ctypes import wintypes
ReplaceFile = ctypes.windll.kernel32.ReplaceFileW
ReplaceFile.restype = wintypes.BOOL
ReplaceFile.argtypes = [
wintypes.LPWSTR,
wintypes.LPWSTR,
wintypes.LPWSTR,
wintypes.DWORD,
wintypes.LPVOID,
wintypes.LPVOID,
]
self.file_lock_time_wait = file_lock_time_wait # How long we should wait before retrying to lock a file held by another process
# We still need a mutex for each file as portalocker only blocks other processes
self.file_locks = defaultdict(thread.allocate_lock)
def replace_windows(src, dst):
"""
The Windows filesystem has a 256 character limit for the filename.
To use filenames longer than that, the '\\?\' prefix needs to be used.
By default, this prefix is added to all windows filenames,
when accessing it.
View this for details: http://stackoverflow.com/a/23230380/348142
"""
windows_prefix = "\\\\?\\"
dst = windows_prefix + dst
src = windows_prefix + src
if not ReplaceFile(dst, src, None, 0, 0, 0):
os.rename(src, dst)
self.replace = replace_windows
else:
# POSIX rename() is always atomic
self.replace = os.rename
# Make sure we use valid filenames.
if sys.platform == "win32":
@@ -346,37 +316,58 @@ class CacheOnDisk(CacheAbstract):
self.key_filter_out = key_filter_out_windows
def wait_portalock(self, val_file):
    """
    Wait for the process file lock.

    Repeatedly tries to take an exclusive portalocker lock on
    ``val_file``; after every failed attempt it sleeps for
    ``self.file_lock_time_wait`` seconds and tries again. Returns
    ``None`` once the lock has been obtained.

    Only ``Exception`` subclasses are treated as "lock is busy". A bare
    ``except`` would also swallow KeyboardInterrupt and SystemExit and
    turn this loop into one that cannot be interrupted.
    """
    while True:
        try:
            portalocker.lock(val_file, portalocker.LOCK_EX)
        except Exception:
            time.sleep(self.file_lock_time_wait)
        else:
            return
def acquire(self, key):
    """
    Take the in-process mutex kept for ``key`` in ``self.file_locks``.
    """
    key_mutex = self.file_locks[key]
    key_mutex.acquire()
def release(self, key):
    """
    Give back the in-process mutex kept for ``key`` in ``self.file_locks``.
    """
    key_mutex = self.file_locks[key]
    key_mutex.release()
def __setitem__(self, key, value):
    """
    Pickle ``value`` into the file that backs ``key``.

    The key is first passed through ``self.key_filter_in``; the file is
    opened for binary writing through ``recfile.open`` and locked with
    ``self.wait_portalock`` before the value is written. The value is
    stored exactly as given (no timestamp is added here).

    The handle is closed in a ``finally`` clause so it is not leaked
    when locking or pickling fails.
    """
    key = self.key_filter_in(key)
    val_file = recfile.open(key, mode='wb', path=self.folder)
    try:
        self.wait_portalock(val_file)
        pickle.dump(value, val_file, pickle.HIGHEST_PROTOCOL)
    finally:
        # NOTE(review): closing presumably also drops the portalocker
        # lock taken above -- confirm for every supported platform.
        val_file.close()
def __getitem__(self, key):
    """
    Return the object pickled in the file that backs ``key``.

    The key is passed through ``self.key_filter_in`` and the file is
    opened for binary reading through ``recfile.open``. The value is
    unpickled from the same handle that was locked with
    ``self.wait_portalock`` (opening a second handle for the read would
    bypass the lock and leave that handle unclosed), and the handle is
    always closed afterwards.

    Raises ``KeyError`` when the backing file cannot be opened.
    """
    filtered_key = self.key_filter_in(key)
    try:
        val_file = recfile.open(filtered_key, mode='rb', path=self.folder)
    except IOError:
        raise KeyError(key)
    try:
        self.wait_portalock(val_file)
        return pickle.load(val_file)
    finally:
        val_file.close()
def __contains__(self, key):
    """
    Tell whether ``key`` is considered present in the storage.

    The key is passed through ``self.key_filter_in``; the result is true
    when the filtered key already has an entry in ``self.file_locks`` or
    when ``recfile.exists`` reports a backing file for it in
    ``self.folder``.
    """
    key = self.key_filter_in(key)
    # NOTE(review): an entry in file_locks only shows that a mutex was
    # requested for this key, not that a value was ever stored -- confirm
    # this is the intended meaning of membership.
    return (key in self.file_locks) or recfile.exists(key, path=self.folder)
def __delitem__(self, key):
    """
    Remove the file that backs ``key``.

    The key is passed through ``self.key_filter_in`` and removed with a
    single ``recfile.remove`` call. Raises ``KeyError`` when the removal
    fails with ``IOError`` or ``OSError`` (the two are distinct classes
    on Python 2, and low-level file removal reports ``OSError``).
    """
    filtered_key = self.key_filter_in(key)
    try:
        recfile.remove(filtered_key, path=self.folder)
    except (IOError, OSError):
        raise KeyError(key)
def __iter__(self):
@@ -385,6 +376,33 @@ class CacheOnDisk(CacheAbstract):
yield self.key_filter_out(filename)
def safe_apply(self, key, function, default_value=None):
    """
    Safely apply a function to the value of a key in storage and set
    the return value of the function to it.

    The backing file is opened in ``'r+b'`` mode; when that raises
    ``IOError`` it is created with ``'wb'`` and ``default_value`` is
    used as the current value. The file is locked with
    ``self.wait_portalock``, the stored ``(timestamp, value)`` pair is
    read, and ``(time.time(), function(value))`` is written back over it.

    The handle is closed in a ``finally`` clause so that an exception
    raised by ``function`` or by pickling does not leak it.

    Return the result of applying the function.
    """
    key = self.key_filter_in(key)
    try:
        val_file = recfile.open(key, mode='r+b', path=self.folder)
    except IOError:
        exists = False
        val_file = recfile.open(key, mode='wb', path=self.folder)
    else:
        exists = True
    try:
        self.wait_portalock(val_file)
        if exists:
            # NOTE(review): this expects a (timestamp, value) pair in the
            # file -- confirm every writer of these files stores that shape.
            _timestamp, value = pickle.load(val_file)
        else:
            value = default_value
        new_value = function(value)
        # Rewind and overwrite in place, then cut off whatever is left
        # of a longer previous pickle.
        val_file.seek(0)
        pickle.dump((time.time(), new_value), val_file, pickle.HIGHEST_PROTOCOL)
        val_file.truncate()
    finally:
        val_file.close()
    return new_value
def keys(self):
    """
    Return a list holding every key produced by ``self.__iter__()``.
    """
    key_iterator = self.__iter__()
    return list(key_iterator)
@@ -396,11 +414,6 @@ class CacheOnDisk(CacheAbstract):
return default
def clear(self):
    """
    Delete every key currently in the storage.
    """
    # Take a snapshot of the keys first: deleting entries while the live
    # iterator is still running changes what that iterator walks over.
    for key in list(self):
        del self[key]
def __init__(self, request=None, folder=None):
self.initialized = False
self.request = request
@@ -426,22 +439,32 @@ class CacheOnDisk(CacheAbstract):
self.storage = CacheOnDisk.PersistentStorage(folder)
if not CacheAbstract.cache_stats_name in self.storage:
self.storage[CacheAbstract.cache_stats_name] = {'hit_total': 0, 'misses': 0}
def __call__(self, key, f,
time_expire=DEFAULT_TIME_EXPIRE):
self.initialize()
def inc_hit_total(v):
v['hit_total'] += 1
return v
def inc_misses(v):
v['misses'] += 1
return v
dt = time_expire
self.storage.acquire(key)
self.storage.acquire(CacheAbstract.cache_stats_name)
item = self.storage.get(key)
self.storage[CacheAbstract.cache_stats_name]['hit_total'] += 1
self.storage.safe_apply(CacheAbstract.cache_stats_name, inc_hit_total,
default_value={'hit_total': 0, 'misses': 0})
if item and f is None:
del self.storage[key]
if f is None:
self.storage.release(CacheAbstract.cache_stats_name)
self.storage.release(key)
return None
now = time.time()
@@ -451,8 +474,11 @@ class CacheOnDisk(CacheAbstract):
else:
value = f()
self.storage[key] = (now, value)
self.storage[CacheAbstract.cache_stats_name]['misses'] += 1
self.storage.safe_apply(CacheAbstract.cache_stats_name, inc_misses,
default_value={'hit_total': 0, 'misses': 0})
self.storage.release(CacheAbstract.cache_stats_name)
self.storage.release(key)
return value
@@ -460,24 +486,24 @@ class CacheOnDisk(CacheAbstract):
self.initialize()
storage = self.storage
if regex is None:
storage.clear()
keys = storage
else:
self._clear(storage, regex)
if not CacheAbstract.cache_stats_name in storage:
storage[CacheAbstract.cache_stats_name] = {
'hit_total': 0, 'misses': 0}
r = re.compile(regex)
keys = (key for key in storage if r.match(key))
for key in keys:
storage.acquire(key)
try:
del storage[key]
except KeyError:
pass
storage.release(key)
def increment(self, key, value=1):
    """
    Add ``value`` to the number stored under ``key`` and return the sum.

    After ``self.initialize()``, the per-key mutex of ``self.storage``
    is held while ``safe_apply`` performs the read-modify-write; a key
    with no stored value starts from 0.

    The mutex is released in a ``finally`` clause: otherwise an
    exception from ``safe_apply`` (for example a stored value that
    cannot be added to) would leave the key locked for good.
    """
    self.initialize()
    storage = self.storage
    storage.acquire(key)
    try:
        return storage.safe_apply(key, lambda stored: stored + value,
                                  default_value=0)
    finally:
        storage.release(key)