experimental DAL rewrite (work in progress)

This commit is contained in:
mdipierro
2012-09-30 08:37:27 -05:00
parent 9ce1cde6ca
commit 7ec3fe3bfe
2 changed files with 85 additions and 89 deletions
+1 -1
View File
@@ -1 +1 @@
Version 2.0.9 (2012-09-30 08:31:56) dev
Version 2.0.9 (2012-09-30 08:37:19) dev
+84 -88
View File
@@ -218,11 +218,11 @@ try:
except ImportError:
have_validators = False
logger = logging.getLogger("web2py.dal")
LOGGER = logging.getLogger("web2py.dal")
DEFAULT = lambda:0
sql_locker = threading.RLock()
thread_local = threading.local()
GLOBAL_LOCKER = threading.RLock()
THREAD_LOCAL = threading.local()
# internal representation of tables with field
# <table>.<field>, tables and fields may only be [a-zA-Z0-9_]
@@ -263,13 +263,13 @@ if not 'google' in DRIVERS:
from pysqlite2 import dbapi2 as sqlite2
DRIVERS.append('SQLite(sqlite2)')
except ImportError:
logger.debug('no SQLite drivers pysqlite2.dbapi2')
LOGGER.debug('no SQLite drivers pysqlite2.dbapi2')
try:
from sqlite3 import dbapi2 as sqlite3
DRIVERS.append('SQLite(sqlite3)')
except ImportError:
logger.debug('no SQLite drivers sqlite3')
LOGGER.debug('no SQLite drivers sqlite3')
try:
# first try contrib driver, then from site-packages (if installed)
@@ -284,13 +284,13 @@ if not 'google' in DRIVERS:
import pymysql
DRIVERS.append('MySQL(pymysql)')
except ImportError:
logger.debug('no MySQL driver pymysql')
LOGGER.debug('no MySQL driver pymysql')
try:
import MySQLdb
DRIVERS.append('MySQL(MySQLdb)')
except ImportError:
logger.debug('no MySQL driver MySQLDB')
LOGGER.debug('no MySQL driver MySQLDB')
try:
@@ -298,7 +298,7 @@ if not 'google' in DRIVERS:
from psycopg2.extensions import adapt as psycopg2_adapt
DRIVERS.append('PostgreSQL(psycopg2)')
except ImportError:
logger.debug('no PostgreSQL driver psycopg2')
LOGGER.debug('no PostgreSQL driver psycopg2')
try:
# first try contrib driver, then from site-packages (if installed)
@@ -308,13 +308,13 @@ if not 'google' in DRIVERS:
import pg8000.dbapi as pg8000
DRIVERS.append('PostgreSQL(pg8000)')
except ImportError:
logger.debug('no PostgreSQL driver pg8000')
LOGGER.debug('no PostgreSQL driver pg8000')
try:
import cx_Oracle
DRIVERS.append('Oracle(cx_Oracle)')
except ImportError:
logger.debug('no Oracle driver cx_Oracle')
LOGGER.debug('no Oracle driver cx_Oracle')
try:
import pyodbc
@@ -322,53 +322,53 @@ if not 'google' in DRIVERS:
DRIVERS.append('DB2(pyodbc)')
DRIVERS.append('Teradata(pyodbc)')
except ImportError:
logger.debug('no MSSQL/DB2/Teradata driver pyodbc')
LOGGER.debug('no MSSQL/DB2/Teradata driver pyodbc')
try:
import Sybase
DRIVERS.append('Sybase(Sybase)')
except ImportError:
logger.debug('no Sybase driver')
LOGGER.debug('no Sybase driver')
try:
import kinterbasdb
DRIVERS.append('Interbase(kinterbasdb)')
DRIVERS.append('Firebird(kinterbasdb)')
except ImportError:
logger.debug('no Firebird/Interbase driver kinterbasdb')
LOGGER.debug('no Firebird/Interbase driver kinterbasdb')
try:
import fdb
DRIVERS.append('Firbird(fdb)')
except ImportError:
logger.debug('no Firebird driver fdb')
LOGGER.debug('no Firebird driver fdb')
#####
try:
import firebirdsql
DRIVERS.append('Firebird(firebirdsql)')
except ImportError:
logger.debug('no Firebird driver firebirdsql')
LOGGER.debug('no Firebird driver firebirdsql')
try:
import informixdb
DRIVERS.append('Informix(informixdb)')
logger.warning('Informix support is experimental')
LOGGER.warning('Informix support is experimental')
except ImportError:
logger.debug('no Informix driver informixdb')
LOGGER.debug('no Informix driver informixdb')
try:
import sapdb
DRIVERS.append('SQL(sapdb)')
logger.warning('SAPDB support is experimental')
LOGGER.warning('SAPDB support is experimental')
except ImportError:
logger.debug('no SAP driver sapdb')
LOGGER.debug('no SAP driver sapdb')
try:
import cubriddb
DRIVERS.append('Cubrid(cubriddb)')
logger.warning('Cubrid support is experimental')
LOGGER.warning('Cubrid support is experimental')
except ImportError:
logger.debug('no Cubrid driver cubriddb')
LOGGER.debug('no Cubrid driver cubriddb')
try:
from com.ziclix.python.sql import zxJDBC
@@ -378,36 +378,36 @@ if not 'google' in DRIVERS:
zxJDBC_sqlite = java.sql.DriverManager
DRIVERS.append('PostgreSQL(zxJDBC)')
DRIVERS.append('SQLite(zxJDBC)')
logger.warning('zxJDBC support is experimental')
LOGGER.warning('zxJDBC support is experimental')
is_jdbc = True
except ImportError:
logger.debug('no SQLite/PostgreSQL driver zxJDBC')
LOGGER.debug('no SQLite/PostgreSQL driver zxJDBC')
is_jdbc = False
try:
import ingresdbi
DRIVERS.append('Ingres(ingresdbi)')
except ImportError:
logger.debug('no Ingres driver ingresdbi')
LOGGER.debug('no Ingres driver ingresdbi')
# NOTE could try JDBC.......
try:
import couchdb
DRIVERS.append('CouchDB(couchdb)')
except ImportError:
logger.debug('no Couchdb driver couchdb')
LOGGER.debug('no Couchdb driver couchdb')
try:
import pymongo
DRIVERS.append('MongoDB(pymongo)')
except:
logger.debug('no MongoDB driver pymongo')
LOGGER.debug('no MongoDB driver pymongo')
try:
import imaplib
DRIVERS.append('IMAP(imaplib)')
except:
logger.debug('no IMAP driver imaplib')
LOGGER.debug('no IMAP driver imaplib')
PLURALIZE_RULES = [
(re.compile('child$'), re.compile('child$'), 'children'),
@@ -493,42 +493,48 @@ class ConnectionPool(object):
@staticmethod
def set_folder(folder):
thread_local.folder = folder
THREAD_LOCAL.folder = folder
# ## this allows gluon to commit/rollback all dbs in this thread
@staticmethod
def recycle_connection(adapter,action):
if action:
if callable(action):
action(adapter)
else:
getattr(adapter, action)()
# ## if you want pools, recycle this connection
really = True
if adapter.pool_size:
GLOBAL_LOCKER.acquire()
pool = ConnectionPool.pools[adapter.uri]
if len(pool) < adapter.pool_size:
pool.append(adapter.connection)
really = False
GLOBAL_LOCKER.release()
if really:
getattr(adapter, 'close')()
@staticmethod
def close_all_instances(action):
""" to close cleanly databases in a multithreaded environment """
if hasattr(thread_local, 'instances'):
while thread_local.instances:
instance = thread_local.instances.pop()
if action:
if callable(action):
action(instance)
else:
getattr(instance, action)()
# ## if you want pools, recycle this connection
really = True
if instance.pool_size:
sql_locker.acquire()
pool = ConnectionPool.pools[instance.uri]
if len(pool) < instance.pool_size:
pool.append(instance.connection)
really = False
sql_locker.release()
if really:
getattr(instance, 'close')()
if instance.db._singleton_code in thread_local.db_instances:
del thread_local.db_instances[instance.db._singleton_code]
dbs = getattr(THREAD_LOCAL,'db_instances',{}).items()
for singleton_code, db in dbs:
try:
adapter = db._adapter
except AttributeError:
pass
else:
ConnectionPool.recycle_connection(adapter,action)
del THREAD_LOCAL.db_instances[singleton_code]
if callable(action):
action(None)
return
def find_or_make_work_folder(self):
""" this actually does not make the folder. it has to be there """
self.folder = getattr(thread_local,'folder','')
self.folder = getattr(THREAD_LOCAL,'folder','')
# Creating the folder if it does not exist
if False and self.folder and not exists(self.folder):
@@ -540,7 +546,7 @@ class ConnectionPool(object):
def reconnect(self):
""" allows a thread to re-connect to server or re-pool """
self.close_all_instances(False)
ConnectionPool.recycle_connection(self,False) ### WHY?
self.pool_connection(self._connection_function)
self.after_connection()
@@ -560,12 +566,12 @@ class ConnectionPool(object):
else:
uri = self.uri
while True:
sql_locker.acquire()
GLOBAL_LOCKER.acquire()
if not uri in pools:
pools[uri] = []
if pools[uri]:
self.connection = pools[uri].pop()
sql_locker.release()
GLOBAL_LOCKER.release()
self.cursor = cursor and self.connection.cursor()
try:
if self.cursor and self.check_active_connection:
@@ -574,14 +580,10 @@ class ConnectionPool(object):
except:
pass
else:
sql_locker.release()
GLOBAL_LOCKER.release()
self.connection = f()
self.cursor = cursor and self.connection.cursor()
break
if not hasattr(thread_local,'instances'):
thread_local.instances = []
thread_local.instances.append(self)
###################################################################################
# this is a generic adapter that does nothing; all others are derived from this one
@@ -1663,7 +1665,7 @@ class BaseAdapter(ConnectionPool):
def log_execute(self, *a, **b):
command = a[0]
if self.db._debug:
logger.debug('SQL: %s' % command)
LOGGER.debug('SQL: %s' % command)
self.db._lastsql = command
t0 = time.time()
ret = self.cursor.execute(*a, **b)
@@ -2983,7 +2985,7 @@ class MSSQLAdapter(BaseAdapter):
if not dsn:
raise SyntaxError, 'DSN required'
except SyntaxError, e:
logger.error('NdGpatch error')
LOGGER.error('NdGpatch error')
raise e
# was cnxn = 'DSN=%s' % dsn
cnxn = dsn
@@ -3181,7 +3183,7 @@ class SybaseAdapter(MSSQLAdapter):
if not dsn:
raise SyntaxError, 'DSN required'
except SyntaxError, e:
logger.error('NdGpatch error')
LOGGER.error('NdGpatch error')
raise e
else:
m = self.REGEX_URI.match(uri)
@@ -4015,7 +4017,7 @@ class GoogleSQLAdapter(UseDatabaseStoredFile,MySQLAdapter):
self.uri = uri
self.pool_size = pool_size
self.db_codec = db_codec
self.folder = folder or pjoin('$HOME',thread_local.folder.split(
self.folder = folder or pjoin('$HOME',THREAD_LOCAL.folder.split(
os.sep+'applications'+os.sep,1)[1])
ruri = uri.split("://")[1]
m = self.REGEX_URI.match(ruri)
@@ -4585,7 +4587,7 @@ class GoogleDatastoreAdapter(NoSQLAdapter):
setattr(item, field.name, self.represent(value,field.type))
item.put()
counter += 1
logger.info(str(counter))
LOGGER.info(str(counter))
return counter
def insert(self,table,fields):
@@ -5531,12 +5533,12 @@ class IMAPAdapter(NoSQLAdapter):
else:
uri = self.uri
while True:
sql_locker.acquire()
GLOBAL_LOCKER.acquire()
if not uri in pools:
pools[uri] = []
if pools[uri]:
self.connection = pools[uri].pop()
sql_locker.release()
GLOBAL_LOCKER.release()
self.cursor = cursor and self.connection.cursor()
if self.cursor and self.check_active_connection:
try:
@@ -5548,15 +5550,11 @@ class IMAPAdapter(NoSQLAdapter):
self.connection = f()
break
else:
sql_locker.release()
GLOBAL_LOCKER.release()
self.connection = f()
self.cursor = cursor and self.connection.cursor()
break
if not hasattr(thread_local,'instances'):
thread_local.instances = []
thread_local.instances.append(self)
def get_last_message(self, tablename):
last_message = None
# request mailbox list to the server
@@ -5567,7 +5565,7 @@ class IMAPAdapter(NoSQLAdapter):
result = self.connection.select(self.connection.mailbox_names[tablename])
last_message = int(result[1][0])
except (IndexError, ValueError, TypeError, KeyError), e:
logger.debug("Error retrieving the last mailbox sequence number. %s" % str(e))
LOGGER.debug("Error retrieving the last mailbox sequence number. %s" % str(e))
return last_message
def get_uid_bounds(self, tablename):
@@ -5599,7 +5597,7 @@ class IMAPAdapter(NoSQLAdapter):
try:
dayname, datestring = date.split(",")
except (ValueError):
logger.debug("Could not parse date text: %s" % date)
LOGGER.debug("Could not parse date text: %s" % date)
return None
date_list = datestring.strip().split()
year = int(date_list[2])
@@ -5729,7 +5727,7 @@ class IMAPAdapter(NoSQLAdapter):
def create_table(self, *args, **kwargs):
# not implemented
logger.debug("Create table feature is not implemented for %s" % type(self))
LOGGER.debug("Create table feature is not implemented for %s" % type(self))
def _select(self,query,fields,attributes):
""" Search and Fetch records and return web2py
@@ -6057,7 +6055,7 @@ class IMAPAdapter(NoSQLAdapter):
try:
pedestal, threshold = self.get_uid_bounds(first.tablename)
except TypeError, e:
logger.debug("Error requesting uid bounds: %s", str(e))
LOGGER.debug("Error requesting uid bounds: %s", str(e))
return ""
try:
lower_limit = int(self.expand(second)) + 1
@@ -6085,7 +6083,7 @@ class IMAPAdapter(NoSQLAdapter):
try:
pedestal, threshold = self.get_uid_bounds(first.tablename)
except TypeError, e:
logger.debug("Error requesting uid bounds: %s", str(e))
LOGGER.debug("Error requesting uid bounds: %s", str(e))
return ""
lower_limit = self.expand(second)
result = "UID %s:%s" % (lower_limit, threshold)
@@ -6104,7 +6102,7 @@ class IMAPAdapter(NoSQLAdapter):
try:
pedestal, threshold = self.get_uid_bounds(first.tablename)
except TypeError, e:
logger.debug("Error requesting uid bounds: %s", str(e))
LOGGER.debug("Error requesting uid bounds: %s", str(e))
return ""
try:
upper_limit = int(self.expand(second)) - 1
@@ -6128,7 +6126,7 @@ class IMAPAdapter(NoSQLAdapter):
try:
pedestal, threshold = self.get_uid_bounds(first.tablename)
except TypeError, e:
logger.debug("Error requesting uid bounds: %s", str(e))
LOGGER.debug("Error requesting uid bounds: %s", str(e))
return ""
upper_limit = int(self.expand(second))
result = "UID %s:%s" % (pedestal, upper_limit)
@@ -6572,19 +6570,19 @@ class DAL(object):
"""
def __new__(cls, uri='sqlite://dummy.db', *args, **kwargs):
if not hasattr(thread_local,'db_instances'):
thread_local.db_instances = {}
if not hasattr(THREAD_LOCAL,'db_instances'):
THREAD_LOCAL.db_instances = {}
if 'singleton_code' in kwargs:
singleton_code = kwargs['singleton_code']
del kwargs['singleton_code']
singleton_code = hashlib.md5(repr(uri)).hexdigest()
try:
db = thread_local.db_instances[singleton_code]
db = THREAD_LOCAL.db_instances[singleton_code]
if args or kwargs:
raise RuntimeError, 'Cannot duplicate a Singleton'
except KeyError:
db = super(DAL, cls).__new__(cls, uri, *args, **kwargs)
thread_local.db_instances[singleton_code] = db
THREAD_LOCAL.db_instances[singleton_code] = db
db._singleton_code = singleton_code
return db
@@ -7062,12 +7060,12 @@ def index():
args_get('fake_migrate',self._fake_migrate)
polymodel = args_get('polymodel',None)
try:
sql_locker.acquire()
GLOBAL_LOCKER.acquire()
self._adapter.create_table(table,migrate=migrate,
fake_migrate=fake_migrate,
polymodel=polymodel)
finally:
sql_locker.release()
GLOBAL_LOCKER.release()
else:
table._dbt = None
on_define = args_get('on_define',None)
@@ -7132,10 +7130,8 @@ def index():
def close(self):
adapter = self._adapter
if adapter in thread_local.instances:
thread_local.instances.remove(adapter)
if self._singleton_code in thread_local.db_instances:
del thread_local.db_instances[self._singleton_code]
if self._singleton_code in THREAD_LOCAL.db_instances:
del THREAD_LOCAL.db_instances[self._singleton_code]
adapter.close()
def executesql(self, query, placeholders=None, as_dict=False,