From 7ec3fe3bfe9cc6c85b9b2fa1b4532426acaa11ef Mon Sep 17 00:00:00 2001 From: mdipierro Date: Sun, 30 Sep 2012 08:37:27 -0500 Subject: [PATCH] experimental DAL rewrite (work in progress) --- VERSION | 2 +- gluon/dal.py | 172 +++++++++++++++++++++++++-------------------------- 2 files changed, 85 insertions(+), 89 deletions(-) diff --git a/VERSION b/VERSION index b55d9ad4..82e36f39 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -Version 2.0.9 (2012-09-30 08:31:56) dev +Version 2.0.9 (2012-09-30 08:37:19) dev diff --git a/gluon/dal.py b/gluon/dal.py index e46a0fc0..39137bc0 100644 --- a/gluon/dal.py +++ b/gluon/dal.py @@ -218,11 +218,11 @@ try: except ImportError: have_validators = False -logger = logging.getLogger("web2py.dal") +LOGGER = logging.getLogger("web2py.dal") DEFAULT = lambda:0 -sql_locker = threading.RLock() -thread_local = threading.local() +GLOBAL_LOCKER = threading.RLock() +THREAD_LOCAL = threading.local() # internal representation of tables with field # ., tables and fields may only be [a-zA-Z0-9_] @@ -263,13 +263,13 @@ if not 'google' in DRIVERS: from pysqlite2 import dbapi2 as sqlite2 DRIVERS.append('SQLite(sqlite2)') except ImportError: - logger.debug('no SQLite drivers pysqlite2.dbapi2') + LOGGER.debug('no SQLite drivers pysqlite2.dbapi2') try: from sqlite3 import dbapi2 as sqlite3 DRIVERS.append('SQLite(sqlite3)') except ImportError: - logger.debug('no SQLite drivers sqlite3') + LOGGER.debug('no SQLite drivers sqlite3') try: # first try contrib driver, then from site-packages (if installed) @@ -284,13 +284,13 @@ if not 'google' in DRIVERS: import pymysql DRIVERS.append('MySQL(pymysql)') except ImportError: - logger.debug('no MySQL driver pymysql') + LOGGER.debug('no MySQL driver pymysql') try: import MySQLdb DRIVERS.append('MySQL(MySQLdb)') except ImportError: - logger.debug('no MySQL driver MySQLDB') + LOGGER.debug('no MySQL driver MySQLDB') try: @@ -298,7 +298,7 @@ if not 'google' in DRIVERS: from psycopg2.extensions import adapt as psycopg2_adapt DRIVERS.append('PostgreSQL(psycopg2)') except ImportError: - logger.debug('no PostgreSQL driver psycopg2') + LOGGER.debug('no PostgreSQL driver psycopg2') try: # first try contrib driver, then from site-packages (if installed) @@ -308,13 +308,13 @@ if not 'google' in DRIVERS: import pg8000.dbapi as pg8000 DRIVERS.append('PostgreSQL(pg8000)') except ImportError: - logger.debug('no PostgreSQL driver pg8000') + LOGGER.debug('no PostgreSQL driver pg8000') try: import cx_Oracle DRIVERS.append('Oracle(cx_Oracle)') except ImportError: - logger.debug('no Oracle driver cx_Oracle') + LOGGER.debug('no Oracle driver cx_Oracle') try: import pyodbc @@ -322,53 +322,53 @@ if not 'google' in DRIVERS: DRIVERS.append('DB2(pyodbc)') DRIVERS.append('Teradata(pyodbc)') except ImportError: - logger.debug('no MSSQL/DB2/Teradata driver pyodbc') + LOGGER.debug('no MSSQL/DB2/Teradata driver pyodbc') try: import Sybase DRIVERS.append('Sybase(Sybase)') except ImportError: - logger.debug('no Sybase driver') + LOGGER.debug('no Sybase driver') try: import kinterbasdb DRIVERS.append('Interbase(kinterbasdb)') DRIVERS.append('Firebird(kinterbasdb)') except ImportError: - logger.debug('no Firebird/Interbase driver kinterbasdb') + LOGGER.debug('no Firebird/Interbase driver kinterbasdb') try: import fdb DRIVERS.append('Firbird(fdb)') except ImportError: - logger.debug('no Firebird driver fdb') + LOGGER.debug('no Firebird driver fdb') ##### try: import firebirdsql DRIVERS.append('Firebird(firebirdsql)') except ImportError: - logger.debug('no Firebird driver firebirdsql') + LOGGER.debug('no Firebird driver firebirdsql') try: import informixdb DRIVERS.append('Informix(informixdb)') - logger.warning('Informix support is experimental') + LOGGER.warning('Informix support is experimental') except ImportError: - logger.debug('no Informix driver informixdb') + LOGGER.debug('no Informix driver informixdb') try: import sapdb DRIVERS.append('SQL(sapdb)') - logger.warning('SAPDB support is experimental') + LOGGER.warning('SAPDB support is experimental') except ImportError: - logger.debug('no SAP driver sapdb') + LOGGER.debug('no SAP driver sapdb') try: import cubriddb DRIVERS.append('Cubrid(cubriddb)') - logger.warning('Cubrid support is experimental') + LOGGER.warning('Cubrid support is experimental') except ImportError: - logger.debug('no Cubrid driver cubriddb') + LOGGER.debug('no Cubrid driver cubriddb') try: from com.ziclix.python.sql import zxJDBC @@ -378,36 +378,36 @@ if not 'google' in DRIVERS: zxJDBC_sqlite = java.sql.DriverManager DRIVERS.append('PostgreSQL(zxJDBC)') DRIVERS.append('SQLite(zxJDBC)') - logger.warning('zxJDBC support is experimental') + LOGGER.warning('zxJDBC support is experimental') is_jdbc = True except ImportError: - logger.debug('no SQLite/PostgreSQL driver zxJDBC') + LOGGER.debug('no SQLite/PostgreSQL driver zxJDBC') is_jdbc = False try: import ingresdbi DRIVERS.append('Ingres(ingresdbi)') except ImportError: - logger.debug('no Ingres driver ingresdbi') + LOGGER.debug('no Ingres driver ingresdbi') # NOTE could try JDBC....... try: import couchdb DRIVERS.append('CouchDB(couchdb)') except ImportError: - logger.debug('no Couchdb driver couchdb') + LOGGER.debug('no Couchdb driver couchdb') try: import pymongo DRIVERS.append('MongoDB(pymongo)') except: - logger.debug('no MongoDB driver pymongo') + LOGGER.debug('no MongoDB driver pymongo') try: import imaplib DRIVERS.append('IMAP(imaplib)') except: - logger.debug('no IMAP driver imaplib') + LOGGER.debug('no IMAP driver imaplib') PLURALIZE_RULES = [ (re.compile('child$'), re.compile('child$'), 'children'), @@ -493,42 +493,48 @@ class ConnectionPool(object): @staticmethod def set_folder(folder): - thread_local.folder = folder + THREAD_LOCAL.folder = folder # ## this allows gluon to commit/rollback all dbs in this thread + @staticmethod + def recycle_connection(adapter,action): + if action: + if callable(action): + action(adapter) + else: + getattr(adapter, action)() + # ## if you want pools, recycle this connection + really = True + if adapter.pool_size: + GLOBAL_LOCKER.acquire() + pool = ConnectionPool.pools[adapter.uri] + if len(pool) < adapter.pool_size: + pool.append(adapter.connection) + really = False + GLOBAL_LOCKER.release() + if really: + getattr(adapter, 'close')() + @staticmethod def close_all_instances(action): """ to close cleanly databases in a multithreaded environment """ - if hasattr(thread_local, 'instances'): - while thread_local.instances: - instance = thread_local.instances.pop() - if action: - if callable(action): - action(instance) - else: - getattr(instance, action)() - # ## if you want pools, recycle this connection - really = True - if instance.pool_size: - sql_locker.acquire() - pool = ConnectionPool.pools[instance.uri] - if len(pool) < instance.pool_size: - pool.append(instance.connection) - really = False - sql_locker.release() - if really: - getattr(instance, 'close')() - if instance.db._singleton_code in thread_local.db_instances: - del thread_local.db_instances[instance.db._singleton_code] - + dbs = getattr(THREAD_LOCAL,'db_instances',{}).items() + for singleton_code, db in dbs: + try: + adapter = db._adapter + except AttributeError: + pass + else: + ConnectionPool.recycle_connection(adapter,action) + del THREAD_LOCAL.db_instances[singleton_code] if callable(action): action(None) return def find_or_make_work_folder(self): """ this actually does not make the folder. it has to be there """ - self.folder = getattr(thread_local,'folder','') + self.folder = getattr(THREAD_LOCAL,'folder','') # Creating the folder if it does not exist if False and self.folder and not exists(self.folder): @@ -540,7 +546,7 @@ class ConnectionPool(object): def reconnect(self): """ allows a thread to re-connect to server or re-pool """ - self.close_all_instances(False) + ConnectionPool.recycle_connection(self,False) ### WHY? self.pool_connection(self._connection_function) self.after_connection() @@ -560,12 +566,12 @@ class ConnectionPool(object): else: uri = self.uri while True: - sql_locker.acquire() + GLOBAL_LOCKER.acquire() if not uri in pools: pools[uri] = [] if pools[uri]: self.connection = pools[uri].pop() - sql_locker.release() + GLOBAL_LOCKER.release() self.cursor = cursor and self.connection.cursor() try: if self.cursor and self.check_active_connection: @@ -574,14 +580,10 @@ class ConnectionPool(object): except: pass else: - sql_locker.release() + GLOBAL_LOCKER.release() self.connection = f() self.cursor = cursor and self.connection.cursor() break - if not hasattr(thread_local,'instances'): - thread_local.instances = [] - thread_local.instances.append(self) - ################################################################################### # this is a generic adapter that does nothing; all others are derived from this one @@ -1663,7 +1665,7 @@ class BaseAdapter(ConnectionPool): def log_execute(self, *a, **b): command = a[0] if self.db._debug: - logger.debug('SQL: %s' % command) + LOGGER.debug('SQL: %s' % command) self.db._lastsql = command t0 = time.time() ret = self.cursor.execute(*a, **b) @@ -2983,7 +2985,7 @@ class MSSQLAdapter(BaseAdapter): if not dsn: raise SyntaxError, 'DSN required' except SyntaxError, e: - logger.error('NdGpatch error') + LOGGER.error('NdGpatch error') raise e # was cnxn = 'DSN=%s' % dsn cnxn = dsn @@ -3181,7 +3183,7 @@ class SybaseAdapter(MSSQLAdapter): if not dsn: raise SyntaxError, 'DSN required' except SyntaxError, e: - logger.error('NdGpatch error') + LOGGER.error('NdGpatch error') raise e else: m = self.REGEX_URI.match(uri) @@ -4015,7 +4017,7 @@ class GoogleSQLAdapter(UseDatabaseStoredFile,MySQLAdapter): self.uri = uri self.pool_size = pool_size self.db_codec = db_codec - self.folder = folder or pjoin('$HOME',thread_local.folder.split( + self.folder = folder or pjoin('$HOME',THREAD_LOCAL.folder.split( os.sep+'applications'+os.sep,1)[1]) ruri = uri.split("://")[1] m = self.REGEX_URI.match(ruri) @@ -4585,7 +4587,7 @@ class GoogleDatastoreAdapter(NoSQLAdapter): setattr(item, field.name, self.represent(value,field.type)) item.put() counter += 1 - logger.info(str(counter)) + LOGGER.info(str(counter)) return counter def insert(self,table,fields): @@ -5531,12 +5533,12 @@ class IMAPAdapter(NoSQLAdapter): else: uri = self.uri while True: - sql_locker.acquire() + GLOBAL_LOCKER.acquire() if not uri in pools: pools[uri] = [] if pools[uri]: self.connection = pools[uri].pop() - sql_locker.release() + GLOBAL_LOCKER.release() self.cursor = cursor and self.connection.cursor() if self.cursor and self.check_active_connection: try: @@ -5548,15 +5550,11 @@ class IMAPAdapter(NoSQLAdapter): self.connection = f() break else: - sql_locker.release() + GLOBAL_LOCKER.release() self.connection = f() self.cursor = cursor and self.connection.cursor() break - if not hasattr(thread_local,'instances'): - thread_local.instances = [] - thread_local.instances.append(self) - def get_last_message(self, tablename): last_message = None # request mailbox list to the server @@ -5567,7 +5565,7 @@ class IMAPAdapter(NoSQLAdapter): result = self.connection.select(self.connection.mailbox_names[tablename]) last_message = int(result[1][0]) except (IndexError, ValueError, TypeError, KeyError), e: - logger.debug("Error retrieving the last mailbox sequence number. %s" % str(e)) + LOGGER.debug("Error retrieving the last mailbox sequence number. %s" % str(e)) return last_message def get_uid_bounds(self, tablename): @@ -5599,7 +5597,7 @@ class IMAPAdapter(NoSQLAdapter): try: dayname, datestring = date.split(",") except (ValueError): - logger.debug("Could not parse date text: %s" % date) + LOGGER.debug("Could not parse date text: %s" % date) return None date_list = datestring.strip().split() year = int(date_list[2]) @@ -5729,7 +5727,7 @@ class IMAPAdapter(NoSQLAdapter): def create_table(self, *args, **kwargs): # not implemented - logger.debug("Create table feature is not implemented for %s" % type(self)) + LOGGER.debug("Create table feature is not implemented for %s" % type(self)) def _select(self,query,fields,attributes): """ Search and Fetch records and return web2py @@ -6057,7 +6055,7 @@ class IMAPAdapter(NoSQLAdapter): try: pedestal, threshold = self.get_uid_bounds(first.tablename) except TypeError, e: - logger.debug("Error requesting uid bounds: %s", str(e)) + LOGGER.debug("Error requesting uid bounds: %s", str(e)) return "" try: lower_limit = int(self.expand(second)) + 1 @@ -6085,7 +6083,7 @@ class IMAPAdapter(NoSQLAdapter): try: pedestal, threshold = self.get_uid_bounds(first.tablename) except TypeError, e: - logger.debug("Error requesting uid bounds: %s", str(e)) + LOGGER.debug("Error requesting uid bounds: %s", str(e)) return "" lower_limit = self.expand(second) result = "UID %s:%s" % (lower_limit, threshold) @@ -6104,7 +6102,7 @@ class IMAPAdapter(NoSQLAdapter): try: pedestal, threshold = self.get_uid_bounds(first.tablename) except TypeError, e: - logger.debug("Error requesting uid bounds: %s", str(e)) + LOGGER.debug("Error requesting uid bounds: %s", str(e)) return "" try: upper_limit = int(self.expand(second)) - 1 @@ -6128,7 +6126,7 @@ class IMAPAdapter(NoSQLAdapter): try: pedestal, threshold = self.get_uid_bounds(first.tablename) except TypeError, e: - logger.debug("Error requesting uid bounds: %s", str(e)) + LOGGER.debug("Error requesting uid bounds: %s", str(e)) return "" upper_limit = int(self.expand(second)) result = "UID %s:%s" % (pedestal, upper_limit) @@ -6572,19 +6570,19 @@ class DAL(object): """ def __new__(cls, uri='sqlite://dummy.db', *args, **kwargs): - if not hasattr(thread_local,'db_instances'): - thread_local.db_instances = {} + if not hasattr(THREAD_LOCAL,'db_instances'): + THREAD_LOCAL.db_instances = {} if 'singleton_code' in kwargs: singleton_code = kwargs['singleton_code'] del kwargs['singleton_code'] singleton_code = hashlib.md5(repr(uri)).hexdigest() try: - db = thread_local.db_instances[singleton_code] + db = THREAD_LOCAL.db_instances[singleton_code] if args or kwargs: raise RuntimeError, 'Cannot duplicate a Singleton' except KeyError: db = super(DAL, cls).__new__(cls, uri, *args, **kwargs) - thread_local.db_instances[singleton_code] = db + THREAD_LOCAL.db_instances[singleton_code] = db db._singleton_code = singleton_code return db @@ -7062,12 +7060,12 @@ def index(): args_get('fake_migrate',self._fake_migrate) polymodel = args_get('polymodel',None) try: - sql_locker.acquire() + GLOBAL_LOCKER.acquire() self._adapter.create_table(table,migrate=migrate, fake_migrate=fake_migrate, polymodel=polymodel) finally: - sql_locker.release() + GLOBAL_LOCKER.release() else: table._dbt = None on_define = args_get('on_define',None) @@ -7132,10 +7130,8 @@ def index(): def close(self): adapter = self._adapter - if adapter in thread_local.instances: - thread_local.instances.remove(adapter) - if self._singleton_code in thread_local.db_instances: - del thread_local.db_instances[self._singleton_code] + if self._singleton_code in THREAD_LOCAL.db_instances: + del THREAD_LOCAL.db_instances[self._singleton_code] adapter.close() def executesql(self, query, placeholders=None, as_dict=False,