diff --git a/gluon/cache.py b/gluon/cache.py index 2f030297..e5586d83 100644 --- a/gluon/cache.py +++ b/gluon/cache.py @@ -30,8 +30,10 @@ import hashlib import datetime import tempfile from gluon import recfile +from gluon import portalocker +from collections import defaultdict try: - from collections import OrderdDict + from collections import OrderedDict except ImportError: from gluon.contrib.ordereddict import OrderedDict try: @@ -282,49 +284,17 @@ class CacheOnDisk(CacheAbstract): class PersistentStorage(object): """ - Implements a key based storage in disk. + Implements a key based thread/process-safe safe storage in disk. """ - def __init__(self, folder): + def __init__(self, folder, file_lock_time_wait=0.1): self.folder = folder self.key_filter_in = lambda key: key self.key_filter_out = lambda key: key - # Check the best way to do atomic file replacement. - if sys.version_info >= (3, 3): - self.replace = os.replace - elif sys.platform == "win32": - import ctypes - from ctypes import wintypes - ReplaceFile = ctypes.windll.kernel32.ReplaceFileW - ReplaceFile.restype = wintypes.BOOL - ReplaceFile.argtypes = [ - wintypes.LPWSTR, - wintypes.LPWSTR, - wintypes.LPWSTR, - wintypes.DWORD, - wintypes.LPVOID, - wintypes.LPVOID, - ] + self.file_lock_time_wait = file_lock_time_wait # How long we should wait before retrying to lock a file held by another process + # We still need a mutex for each file as portalocker only blocks other processes + self.file_locks = defaultdict(thread.allocate_lock) - def replace_windows(src, dst): - """ - The Windows filesystem has a 256 character limit for the filename. - To use filenames longer than that, the '\\?\' prefix needs to be used. - By default, this prefix is added to all windows filenames, - when accessing it. - View this for details: http://stackoverflow.com/a/23230380/348142 - """ - windows_prefix = "\\\\?\\" - dst = windows_prefix + dst - src = windows_prefix + src - - if not ReplaceFile(dst, src, None, 0, 0, 0): - os.rename(src, dst) - - self.replace = replace_windows - else: - # POSIX rename() is always atomic - self.replace = os.rename # Make sure we use valid filenames. if sys.platform == "win32": @@ -346,37 +316,56 @@ class CacheOnDisk(CacheAbstract): self.key_filter_out = key_filter_out_windows + def wait_portalock(self, val_file): + """ + Wait for the process file lock. + """ + while True: + try: + portalocker.lock(val_file, portalocker.LOCK_EX) + break + except: + time.sleep(self.file_lock_time_wait) + + def __setitem__(self, key, value): - tmp_name, tmp_path = tempfile.mkstemp(dir=self.folder) - tmp = os.fdopen(tmp_name, 'wb') - try: - pickle.dump((time.time(), value), tmp, pickle.HIGHEST_PROTOCOL) - finally: - tmp.close() key = self.key_filter_in(key) - fullfilename = os.path.join(self.folder, recfile.generate(key)) - if not os.path.exists(os.path.dirname(fullfilename)): - os.makedirs(os.path.dirname(fullfilename)) - self.replace(tmp_path, fullfilename) + self.file_locks[key].acquire() + val_file = recfile.open(key, mode='wb', path=self.folder) + self.wait_portalock(val_file) + pickle.dump(value, val_file, pickle.HIGHEST_PROTOCOL) + val_file.close() + portalocker.unlock(val_file) + self.file_locks[key].release() def __getitem__(self, key): key = self.key_filter_in(key) - if recfile.exists(key, path=self.folder): - timestamp, value = pickle.load(recfile.open(key, 'rb', path=self.folder)) - return value - else: + self.file_locks[key].acquire() + try: + val_file = recfile.open(key, mode='rb', path=self.folder) + except IOError: + self.file_locks[key].release() raise KeyError + self.wait_portalock(val_file) + value = pickle.load(recfile.open(key, 'rb', path=self.folder)) + val_file.close() + portalocker.unlock(val_file) + self.file_locks[key].release() + return value + def __contains__(self, key): key = self.key_filter_in(key) - return recfile.exists(key, path=self.folder) + return (key in self.file_locks) or recfile.exists(key, path=self.folder) def __delitem__(self, key): key = self.key_filter_in(key) + self.file_locks[key].acquire() recfile.remove(key, path=self.folder) + del self.file_locks[key] def __iter__(self): @@ -385,6 +374,36 @@ class CacheOnDisk(CacheAbstract): yield self.key_filter_out(filename) + def safe_apply(self, key, function, default_value=None): + """ + Safely apply a function to the value of a key in storage and set + the return value of the function to it. + + Return the result of applying the function. + """ + key = self.key_filter_in(key) + self.file_locks[key].acquire() + exists = True + try: + val_file = recfile.open(key, mode='r+b', path=self.folder) + except IOError: + exists = False + val_file = recfile.open(key, mode='wb', path=self.folder) + self.wait_portalock(val_file) + if exists: + timestamp, value = pickle.load(val_file) + else: + value = default_value + new_value = function(value) + val_file.seek(0) + pickle.dump((time.time(), new_value), val_file, pickle.HIGHEST_PROTOCOL) + val_file.truncate() + val_file.close() + portalocker.unlock(val_file) + self.file_locks[key].release() + return new_value + + def keys(self): return list(self.__iter__()) @@ -395,10 +414,12 @@ class CacheOnDisk(CacheAbstract): except KeyError: return default - def clear(self): for key in self: - del self[key] + try: + del self[key] + except KeyError: + pass def __init__(self, request=None, folder=None): @@ -426,17 +447,23 @@ class CacheOnDisk(CacheAbstract): self.storage = CacheOnDisk.PersistentStorage(folder) - if not CacheAbstract.cache_stats_name in self.storage: - self.storage[CacheAbstract.cache_stats_name] = {'hit_total': 0, 'misses': 0} - def __call__(self, key, f, time_expire=DEFAULT_TIME_EXPIRE): self.initialize() + def inc_hit_total(v): + v['hit_total'] += 1 + return v + + def inc_misses(v): + v['misses'] += 1 + return v + dt = time_expire item = self.storage.get(key) - self.storage[CacheAbstract.cache_stats_name]['hit_total'] += 1 + self.storage.safe_apply(CacheAbstract.cache_stats_name, inc_hit_total, + default_value={'hit_total': 0, 'misses': 0}) if item and f is None: del self.storage[key] @@ -451,7 +478,8 @@ class CacheOnDisk(CacheAbstract): else: value = f() self.storage[key] = (now, value) - self.storage[CacheAbstract.cache_stats_name]['misses'] += 1 + self.storage.safe_apply(CacheAbstract.cache_stats_name, inc_misses, + default_value={'hit_total': 0, 'misses': 0}) return value @@ -464,21 +492,10 @@ class CacheOnDisk(CacheAbstract): else: self._clear(storage, regex) - if not CacheAbstract.cache_stats_name in storage: - storage[CacheAbstract.cache_stats_name] = { - 'hit_total': 0, 'misses': 0} - def increment(self, key, value=1): self.initialize() - storage = self.storage - try: - if key in storage: - value = storage[key][1] + value - storage[key] = (time.time(), value) - except: - pass - return value + return self.storage.safe_apply(key, lambda x: x + value, default_value=0)