Merge pull request #365 from michele-comitini/entity_quoting_parameter

added entity_quoting (False by default) parameter to DAL
This commit is contained in:
mdipierro
2014-01-28 13:42:37 -08:00
2 changed files with 54 additions and 44 deletions
+30 -11
View File
@@ -675,15 +675,31 @@ class ConnectionPool(object):
# metaclass to prepare adapter classes static values
###################################################################################
class AdapterMeta(type):
def __new__(cls, clsname, bases, dct):
classobj = super(AdapterMeta, cls).__new__(cls, clsname, bases, dct)
classobj.REGEX_TABLE_DOT_FIELD = re.compile(r'^' + \
classobj.QUOTE_TEMPLATE % REGEX_NO_GREEDY_ENTITY_NAME + \
r'\.' + \
classobj.QUOTE_TEMPLATE % REGEX_NO_GREEDY_ENTITY_NAME + \
r'$')
return classobj
"""Metaclass to support manipulation of adapter classes.
At the moment is used to intercept entity_quoting argument passed to DAL.
"""
def __call__(cls, *args, **kwargs):
entity_quoting = kwargs.get('entity_quoting', False)
if 'entity_quoting' in kwargs:
del kwargs['entity_quoting']
obj = super(AdapterMeta, cls).__call__(*args, **kwargs)
if not entity_quoting:
quot = obj.QUOTE_TEMPLATE = '%s'
regex_ent = r'(\w+)'
else:
quot = obj.QUOTE_TEMPLATE
regex_ent = REGEX_NO_GREEDY_ENTITY_NAME
obj.REGEX_TABLE_DOT_FIELD = re.compile(r'^' + \
quot % regex_ent + \
r'\.' + \
quot % regex_ent + \
r'$')
return obj
###################################################################################
# this is a generic adapter that does nothing; all others are derived from this one
###################################################################################
@@ -7709,7 +7725,8 @@ class DAL(object):
adapter_args=None, attempts=5, auto_import=False,
bigint_id=False, debug=False, lazy_tables=False,
db_uid=None, do_connect=True,
after_connection=None, tables=None, ignore_field_case=True):
after_connection=None, tables=None, ignore_field_case=True,
entity_quoting=False):
"""
Creates a new Database Abstraction Layer instance.
@@ -7820,7 +7837,8 @@ class DAL(object):
driver_args=driver_args or {},
adapter_args=adapter_args or {},
do_connect=do_connect,
after_connection=after_connection)
after_connection=after_connection,
entity_quoting=entity_quoting)
self._adapter = ADAPTERS[self._dbname](**kwargs)
types = ADAPTERS[self._dbname].types
# copy so multiple DAL() possible
@@ -7847,7 +7865,8 @@ class DAL(object):
else:
self._adapter = BaseAdapter(db=self,pool_size=0,
uri='None',folder=folder,
db_codec=db_codec, after_connection=after_connection)
db_codec=db_codec, after_connection=after_connection,
entity_quoting=entity_quoting)
migrate = fake_migrate = False
adapter = self._adapter
self._uri_hash = hashlib_md5(adapter.uri).hexdigest()
+24 -33
View File
@@ -882,7 +882,7 @@ class TestRNameTable(unittest.TestCase):
def testSelect(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
rname = db._adapter.QUOTE_TEMPLATE % 'a very complicated tablename'
rname = db._adapter.__class__.QUOTE_TEMPLATE % 'a very complicated tablename'
db.define_table(
'easy_name',
Field('a_field'),
@@ -911,14 +911,14 @@ class TestRNameTable(unittest.TestCase):
avg = db.easy_name.id.avg()
rtn = db(db.easy_name.id > 0).select(avg)
self.assertEqual(rtn[0][avg], 3)
rname = db._adapter.QUOTE_TEMPLATE % 'this is the person table'
rname = db._adapter.__class__.QUOTE_TEMPLATE % 'this is the person table'
db.define_table(
'person',
Field('name', default="Michael"),
Field('uuid'),
rname=rname
)
rname = db._adapter.QUOTE_TEMPLATE % 'this is the pet table'
rname = db._adapter.__class__.QUOTE_TEMPLATE % 'this is the pet table'
db.define_table(
'pet',
Field('friend','reference person'),
@@ -991,7 +991,7 @@ class TestRNameTable(unittest.TestCase):
for key in ['reference','reference FK']:
db._adapter.types[key]=db._adapter.types[key].replace(
'%(on_delete_action)s','NO ACTION')
rname = db._adapter.QUOTE_TEMPLATE % 'the cubs'
rname = db._adapter.__class__.QUOTE_TEMPLATE % 'the cubs'
db.define_table('pet_farm',
Field('name'),
Field('father','reference pet_farm'),
@@ -1034,8 +1034,8 @@ class TestRNameTable(unittest.TestCase):
def testJoin(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
rname = db._adapter.QUOTE_TEMPLATE % 'this is table t1'
rname2 = db._adapter.QUOTE_TEMPLATE % 'this is table t2'
rname = db._adapter.__class__.QUOTE_TEMPLATE % 'this is table t1'
rname2 = db._adapter.__class__.QUOTE_TEMPLATE % 'this is table t2'
db.define_table('t1', Field('aa'), rname=rname)
db.define_table('t2', Field('aa'), Field('b', db.t1), rname=rname2)
i1 = db.t1.insert(aa='1')
@@ -1105,8 +1105,8 @@ class TestRNameFields(unittest.TestCase):
# tests for highly experimental rname attribute
def testSelect(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
rname = db._adapter.QUOTE_TEMPLATE % 'a very complicated fieldname'
rname2 = db._adapter.QUOTE_TEMPLATE % 'rrating from 1 to 10'
rname = db._adapter.__class__.QUOTE_TEMPLATE % 'a very complicated fieldname'
rname2 = db._adapter.__class__.QUOTE_TEMPLATE % 'rrating from 1 to 10'
db.define_table(
'easy_name',
Field('a_field', rname=rname),
@@ -1140,13 +1140,13 @@ class TestRNameFields(unittest.TestCase):
rtn = db(db.easy_name.id > 0).select(avg)
self.assertEqual(rtn[0][avg], 2)
rname = db._adapter.QUOTE_TEMPLATE % 'this is the person name'
rname = db._adapter.__class__.QUOTE_TEMPLATE % 'this is the person name'
db.define_table(
'person',
Field('name', default="Michael", rname=rname),
Field('uuid')
)
rname = db._adapter.QUOTE_TEMPLATE % 'this is the pet name'
rname = db._adapter.__class__.QUOTE_TEMPLATE % 'this is the pet name'
db.define_table(
'pet',
Field('friend','reference person'),
@@ -1213,7 +1213,7 @@ class TestRNameFields(unittest.TestCase):
self.assertEqual(rtn[2].pet.name, 'Gertie')
#aliases
rname = db._adapter.QUOTE_TEMPLATE % 'the cub name'
rname = db._adapter.__class__.QUOTE_TEMPLATE % 'the cub name'
if DEFAULT_URI.startswith('mssql'):
#multiple cascade gotcha
for key in ['reference','reference FK']:
@@ -1260,7 +1260,7 @@ class TestRNameFields(unittest.TestCase):
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
rname = db._adapter.QUOTE_TEMPLATE % 'a very complicated fieldname'
rname = db._adapter.__class__.QUOTE_TEMPLATE % 'a very complicated fieldname'
for ft in ['string', 'text', 'password', 'upload', 'blob']:
db.define_table('tt', Field('aa', ft, default='', rname=rname))
self.assertEqual(db.tt.insert(aa='x'), 1)
@@ -1330,7 +1330,7 @@ class TestRNameFields(unittest.TestCase):
def testInsert(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
rname = db._adapter.QUOTE_TEMPLATE % 'a very complicated fieldname'
rname = db._adapter.__class__.QUOTE_TEMPLATE % 'a very complicated fieldname'
db.define_table('tt', Field('aa', rname=rname))
self.assertEqual(db.tt.insert(aa='1'), 1)
self.assertEqual(db.tt.insert(aa='1'), 2)
@@ -1346,8 +1346,8 @@ class TestRNameFields(unittest.TestCase):
def testJoin(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
rname = db._adapter.QUOTE_TEMPLATE % 'this is field aa'
rname2 = db._adapter.QUOTE_TEMPLATE % 'this is field b'
rname = db._adapter.__class__.QUOTE_TEMPLATE % 'this is field aa'
rname2 = db._adapter.__class__.QUOTE_TEMPLATE % 'this is field b'
db.define_table('t1', Field('aa', rname=rname))
db.define_table('t2', Field('aa', rname=rname), Field('b', db.t1, rname=rname2))
i1 = db.t1.insert(aa='1')
@@ -1416,35 +1416,26 @@ class TestQuoting(unittest.TestCase):
# tests for case sensitivity
def testCase(self):
return
db = DAL(DEFAULT_URI, check_reserved=['all'], ignore_field_case=False)
db = DAL(DEFAULT_URI, check_reserved=['all'], ignore_field_case=False, entity_quoting=True)
if DEFAULT_URI.startswith('mssql'):
#multiple cascade gotcha
for key in ['reference','reference FK']:
db._adapter.types[key]=db._adapter.types[key].replace(
'%(on_delete_action)s','NO ACTION')
# test table case
t0 = db.define_table('B',
t0 = db.define_table('t0',
Field('f', 'string'))
try:
t1 = db.define_table('b',
Field('B', t0),
Field('words', 'text'))
except Exception, e:
# An error is expected when database does not support case
# sensitive entity names.
if DEFAULT_URI.startswith('sqlite:'):
self.assertTrue(isinstance(e, db._adapter.driver.OperationalError))
return
raise e
t1 = db.define_table('b',
Field('B', t0),
Field('words', 'text'))
blather = 'blah blah and so'
t0[0] = {'f': 'content'}
t1[0] = {'B': int(t0[1]['id']),
'words': blather}
r = db(db.B.id==db.b.B).select()
r = db(db.t0.id==db.b.B).select()
self.assertEqual(r[0].b.words, blather)
@@ -1453,12 +1444,12 @@ class TestQuoting(unittest.TestCase):
# test field case
try:
t0 = db.define_table('table is a test',
t0 = db.define_table('table_is_a_test',
Field('a_a'),
Field('a_A'))
except Exception, e:
# some db does not support case sensitive field names mysql is one of them.
if DEFAULT_URI.startswith('mysql:'):
if DEFAULT_URI.startswith('mysql:') or DEFAULT_URI.startswith('sqlite:'):
db.rollback()
return
raise e