From 0f2b2daeab37a4037cd283a35febe66e143e3fe5 Mon Sep 17 00:00:00 2001 From: niphlod Date: Sun, 27 Oct 2013 13:37:59 +0100 Subject: [PATCH] fixed issue with ordereddict in python < 2.7, added rname support also for fields --- gluon/contrib/ordereddict.py | 127 +++++++++++ gluon/dal.py | 49 +++-- gluon/tests/test_dal.py | 407 ++++++++++++++++++++++++++++++++++- 3 files changed, 558 insertions(+), 25 deletions(-) create mode 100644 gluon/contrib/ordereddict.py diff --git a/gluon/contrib/ordereddict.py b/gluon/contrib/ordereddict.py new file mode 100644 index 00000000..7242b506 --- /dev/null +++ b/gluon/contrib/ordereddict.py @@ -0,0 +1,127 @@ +# Copyright (c) 2009 Raymond Hettinger +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation files +# (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, +# publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +from UserDict import DictMixin + +class OrderedDict(dict, DictMixin): + + def __init__(self, *args, **kwds): + if len(args) > 1: + raise TypeError('expected at most 1 arguments, got %d' % len(args)) + try: + self.__end + except AttributeError: + self.clear() + self.update(*args, **kwds) + + def clear(self): + self.__end = end = [] + end += [None, end, end] # sentinel node for doubly linked list + self.__map = {} # key --> [key, prev, next] + dict.clear(self) + + def __setitem__(self, key, value): + if key not in self: + end = self.__end + curr = end[1] + curr[2] = end[1] = self.__map[key] = [key, curr, end] + dict.__setitem__(self, key, value) + + def __delitem__(self, key): + dict.__delitem__(self, key) + key, prev, next = self.__map.pop(key) + prev[2] = next + next[1] = prev + + def __iter__(self): + end = self.__end + curr = end[2] + while curr is not end: + yield curr[0] + curr = curr[2] + + def __reversed__(self): + end = self.__end + curr = end[1] + while curr is not end: + yield curr[0] + curr = curr[1] + + def popitem(self, last=True): + if not self: + raise KeyError('dictionary is empty') + if last: + key = reversed(self).next() + else: + key = iter(self).next() + value = self.pop(key) + return key, value + + def __reduce__(self): + items = [[k, self[k]] for k in self] + tmp = self.__map, self.__end + del self.__map, self.__end + inst_dict = vars(self).copy() + self.__map, self.__end = tmp + if inst_dict: + return (self.__class__, (items,), inst_dict) + return self.__class__, (items,) + + def keys(self): + return list(self) + + setdefault = DictMixin.setdefault + update = DictMixin.update + pop = DictMixin.pop + values = DictMixin.values + items = DictMixin.items + iterkeys = DictMixin.iterkeys + itervalues = DictMixin.itervalues + iteritems = DictMixin.iteritems + + def __repr__(self): + if not self: + return '%s()' % (self.__class__.__name__,) + return '%s(%r)' % (self.__class__.__name__, self.items()) + + def copy(self): + return self.__class__(self) + + @classmethod + def fromkeys(cls, iterable, value=None): + d = cls() + for key in iterable: + d[key] = value + return d + + def __eq__(self, other): + if isinstance(other, OrderedDict): + if len(self) != len(other): + return False + for p, q in zip(self.items(), other.items()): + if p != q: + return False + return True + return dict.__eq__(self, other) + + def __ne__(self, other): + return not self == other diff --git a/gluon/dal.py b/gluon/dal.py index 41a1b29d..715b42e4 100644 --- a/gluon/dal.py +++ b/gluon/dal.py @@ -172,10 +172,9 @@ import uuid import glob import traceback import platform -import collections -PYTHON_VERSION = sys.version_info[0] -if PYTHON_VERSION == 2: +PYTHON_VERSION = sys.version_info[:3] +if PYTHON_VERSION[0] == 2: import cPickle as pickle import cStringIO as StringIO import copy_reg as copyreg @@ -189,6 +188,12 @@ else: hashlib_md5 = lambda s: hashlib.md5(bytes(s,'utf8')) bytes, unicode = bytes, str +if PYTHON_VERSION[:2] < (2, 7): + from gluon.contrib.ordereddict import OrderedDict +else: + from collections import OrderedDict + + CALLABLETYPES = (types.LambdaType, types.FunctionType, types.BuiltinFunctionType, types.MethodType, types.BuiltinMethodType) @@ -855,7 +860,7 @@ class BaseAdapter(ConnectionPool): foreign_key = '%s (%s)' % (rtablename, rfieldname), table_name = tablename, - field_name = field_name, + field_name = field._rname or field.name, on_delete_action=field.ondelete) else: # make a guess here for circular references @@ -881,7 +886,7 @@ class BaseAdapter(ConnectionPool): ) ftype = types[field_type[:9]] % dict( index_name = field_name+'__idx', - field_name = field_name, + field_name = field._rname or field.name, constraint_name = constraint_name, foreign_key = '%s (%s)' % (real_referenced, id_fieldname), @@ -958,13 +963,15 @@ class BaseAdapter(ConnectionPool): # geometry fields are added after the table has been created, not now if not (self.dbengine == 'postgres' and \ field_type.startswith('geom')): - fields.append('%s %s' % (field_name, ftype)) + #fetch the rname if it's there + field_rname = "%s" % (field._rname or field_name) + fields.append('%s %s' % (field_rname, ftype)) other = ';' # backend-specific extensions to fields if self.dbengine == 'mysql': if not hasattr(table, "_primarykey"): - fields.append('PRIMARY KEY(%s)' % table._id.name) + fields.append('PRIMARY KEY(%s)' % (table._id.name or table._id._rname)) other = ' ENGINE=InnoDB CHARACTER SET utf8;' fields = ',\n '.join(fields) @@ -979,7 +986,6 @@ class BaseAdapter(ConnectionPool): foreign_table = rtablename, foreign_key = ', '.join(pkeys), on_delete_action = field.ondelete) - #if there's a _rname, let's use that instead table_rname = table._rname or tablename @@ -1252,7 +1258,7 @@ class BaseAdapter(ConnectionPool): def _insert(self, table, fields): table_rname = table._rname or table if fields: - keys = ','.join(f.name for f, v in fields) + keys = ','.join(f._rname or f.name for f, v in fields) values = ','.join(self.expand(v, f.type) for f, v in fields) return 'INSERT INTO %s(%s) VALUES (%s);' % (table_rname, keys, values) else: @@ -1437,7 +1443,10 @@ class BaseAdapter(ConnectionPool): table_rname = et._ot and et._tablename or et._rname or et._tablename else: table_rname = et._tablename - out = '%s.%s' % (table_rname, expression.name) + if not colnames: + out = '%s.%s' % (table_rname, expression._rname or expression.name) + else: + out = '%s.%s' % (table_rname, expression.name) if field_type == 'string' and not expression.type in ( 'string','text','json','password'): out = self.CAST(out, self.types['text']) @@ -1513,7 +1522,7 @@ class BaseAdapter(ConnectionPool): sql_w = ' WHERE ' + self.expand(query) else: sql_w = '' - sql_v = ','.join(['%s=%s' % (field.name, + sql_v = ','.join(['%s=%s' % (field._rname or field.name, self.expand(value, field.type)) \ for (field, value) in fields]) tablename = "%s" % (self.db[tablename]._rname or tablename) @@ -1729,7 +1738,7 @@ class BaseAdapter(ConnectionPool): sql_o += ' ORDER BY %s' % ', '.join( ['%s.%s'%(t,x) for t in tablenames for x in ( hasattr(self.db[t],'_primarykey') and self.db[t]._primarykey - or [self.db[t]._id.name] + or [self.db[t]._id._rname or self.db[t]._id.name] ) ] ) @@ -2302,7 +2311,7 @@ class SQLiteAdapter(BaseAdapter): else: self.dbpath = uri.split('://',1)[1] if self.dbpath[0] != '/': - if PYTHON_VERSION == 2: + if PYTHON_VERSION[0] == 2: self.dbpath = pjoin( self.folder.decode(path_encoding).encode('utf8'), self.dbpath) else: @@ -2312,7 +2321,7 @@ class SQLiteAdapter(BaseAdapter): if not 'detect_types' in driver_args and do_connect: driver_args['detect_types'] = self.driver.PARSE_DECLTYPES def connector(dbpath=self.dbpath, driver_args=driver_args): - return self.driver.Connection(dbpath, **driver_args) + return self.driver.Connection(dbpath, **driver_args) self.connector = connector if do_connect: self.reconnect() @@ -2321,7 +2330,7 @@ class SQLiteAdapter(BaseAdapter): SQLiteAdapter.web2py_extract) self.connection.create_function("REGEXP", 2, SQLiteAdapter.web2py_regexp) - + if self.adapter_args.get('foreign_keys',True): self.execute('PRAGMA foreign_keys=ON;') @@ -8141,7 +8150,7 @@ def index(): if not db_group: del THREAD_LOCAL.db_instances[self._db_uid] - def executesql(self, query, placeholders=None, as_dict=False, + def executesql(self, query, placeholders=None, as_dict=False, fields=None, colnames=None, as_ordered_dict=False): """ placeholders is optional and will always be None. @@ -8214,7 +8223,7 @@ def index(): # convert the list for each row into a dictionary so it's # easier to work with. row['field_name'] rather than row[0] if as_ordered_dict: - _dict = collections.OrderedDict + _dict = OrderedDict else: _dict = dict return [_dict(zip(fields,row)) for row in data] @@ -8636,7 +8645,7 @@ class Table(object): referees = pr.pop(self._tablename) for referee in referees: self._referenced_by.append(referee) - + def _filter_fields(self, record, id=False): return dict([(k, v) for (k, v) in record.iteritems() if k @@ -9576,6 +9585,7 @@ class Field(Expression): filter_out = None, custom_qualifier = None, map_none = None, + rname = None ): self._db = self.db = None # both for backward compatibility self.op = None @@ -9622,6 +9632,7 @@ class Field(Expression): self.label = label if label!=None else fieldname.replace('_',' ').title() self.requires = requires if requires!=None else [] self.map_none = map_none + self._rname = rname def set_attributes(self,*args,**attributes): self.__dict__.update(*args,**attributes) @@ -9788,7 +9799,7 @@ class Field(Expression): return Expression(self.db, self.db._adapter.COUNT, self, distinct, 'integer') def as_dict(self, flat=False, sanitize=True): - attrs = ("name", 'authorize', 'represent', 'ondelete', + attrs = ('name', 'authorize', 'represent', 'ondelete', 'custom_store', 'autodelete', 'custom_retrieve', 'filter_out', 'uploadseparate', 'widget', 'uploadfs', 'update', 'custom_delete', 'uploadfield', 'uploadfolder', diff --git a/gluon/tests/test_dal.py b/gluon/tests/test_dal.py index d9bbbad0..1b4c5dcd 100644 --- a/gluon/tests/test_dal.py +++ b/gluon/tests/test_dal.py @@ -559,7 +559,7 @@ class TestMigrations(unittest.TestCase): class TestReference(unittest.TestCase): - def testRun(self): + def testRun(self): db = DAL(DEFAULT_URI, check_reserved=['all']) if DEFAULT_URI.startswith('mssql'): #multiple cascade gotcha @@ -840,11 +840,27 @@ class TestValidateAndInsert(unittest.TestCase): #cleanup table db.val_and_insert.drop() +class TestSelectAsDict(unittest.TestCase): + + def testSelect(self): + db = DAL(DEFAULT_URI, check_reserved=['all']) + db.define_table( + 'a_table', + Field('b_field'), + Field('a_field'), + ) + db.a_table.insert(a_field="aa1", b_field="bb1") + rtn = db.executesql("SELECT id, b_field, a_field FROM a_table", as_dict=True) + self.assertEqual(rtn[0]['b_field'], 'bb1') + rtn = db.executesql("SELECT id, b_field, a_field FROM a_table", as_ordered_dict=True) + self.assertEqual(rtn[0]['b_field'], 'bb1') + self.assertEqual(rtn[0].keys(), ['id', 'b_field', 'a_field']) + db.a_table.drop() + + +class TestRNameTable(unittest.TestCase): + #tests for highly experimental rname attribute -class TestRName(unittest.TestCase): - """ - tests for highly experimental rname attribute - """ def testSelect(self): db = DAL(DEFAULT_URI, check_reserved=['all']) rname = db._adapter.QUOTE_TEMPLATE % 'a very complicated tablename' @@ -997,8 +1013,387 @@ class TestRName(unittest.TestCase): db.person.drop() db.easy_name.drop() + def testJoin(self): + db = DAL(DEFAULT_URI, check_reserved=['all']) + rname = db._adapter.QUOTE_TEMPLATE % 'this is table t1' + rname2 = db._adapter.QUOTE_TEMPLATE % 'this is table t2' + db.define_table('t1', Field('aa'), rname=rname) + db.define_table('t2', Field('aa'), Field('b', db.t1), rname=rname2) + i1 = db.t1.insert(aa='1') + i2 = db.t1.insert(aa='2') + i3 = db.t1.insert(aa='3') + db.t2.insert(aa='4', b=i1) + db.t2.insert(aa='5', b=i2) + db.t2.insert(aa='6', b=i2) + self.assertEqual(len(db(db.t1.id + == db.t2.b).select(orderby=db.t1.aa + | db.t2.aa)), 3) + self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa + | db.t2.aa)[2].t1.aa, '2') + self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa + | db.t2.aa)[2].t2.aa, '6') + self.assertEqual(len(db().select(db.t1.ALL, db.t2.ALL, + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa | db.t2.aa)), 4) + self.assertEqual(db().select(db.t1.ALL, db.t2.ALL, + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa | db.t2.aa)[2].t1.aa, '2') + self.assertEqual(db().select(db.t1.ALL, db.t2.ALL, + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa | db.t2.aa)[2].t2.aa, '6') + self.assertEqual(db().select(db.t1.ALL, db.t2.ALL, + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa | db.t2.aa)[3].t1.aa, '3') + self.assertEqual(db().select(db.t1.ALL, db.t2.ALL, + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa | db.t2.aa)[3].t2.aa, None) + self.assertEqual(len(db().select(db.t1.aa, db.t2.id.count(), + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa, groupby=db.t1.aa)), + 3) + self.assertEqual(db().select(db.t1.aa, db.t2.id.count(), + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa, + groupby=db.t1.aa)[0]._extra[db.t2.id.count()], + 1) + self.assertEqual(db().select(db.t1.aa, db.t2.id.count(), + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa, + groupby=db.t1.aa)[1]._extra[db.t2.id.count()], + 2) + self.assertEqual(db().select(db.t1.aa, db.t2.id.count(), + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa, + groupby=db.t1.aa)[2]._extra[db.t2.id.count()], + 0) + db.t2.drop() + db.t1.drop() + + db.define_table('person',Field('name'), rname=rname) + id = db.person.insert(name="max") + self.assertEqual(id.name,'max') + db.define_table('dog',Field('name'),Field('ownerperson','reference person'), rname=rname2) + db.dog.insert(name='skipper',ownerperson=1) + row = db(db.person.id==db.dog.ownerperson).select().first() + self.assertEqual(row[db.person.name],'max') + self.assertEqual(row['person.name'],'max') + db.dog.drop() + self.assertEqual(len(db.person._referenced_by),0) + db.person.drop() + + +class TestRNameFields(unittest.TestCase): + # tests for highly experimental rname attribute + def testSelect(self): + db = DAL(DEFAULT_URI, check_reserved=['all']) + rname = db._adapter.QUOTE_TEMPLATE % 'a very complicated fieldname' + rname2 = db._adapter.QUOTE_TEMPLATE % 'rrating from 1 to 10' + db.define_table( + 'easy_name', + Field('a_field', rname=rname), + Field('rating', 'integer', rname=rname2, default=2) + ) + rtn = db.easy_name.insert(a_field='a') + self.assertEqual(rtn.id, 1) + rtn = db(db.easy_name.a_field == 'a').select() + self.assertEqual(len(rtn), 1) + self.assertEqual(rtn[0].id, 1) + self.assertEqual(rtn[0].a_field, 'a') + db.easy_name.insert(a_field='b') + rtn = db(db.easy_name.id > 0).delete() + self.assertEqual(rtn, 2) + rtn = db(db.easy_name.id > 0).count() + self.assertEqual(rtn, 0) + db.easy_name.insert(a_field='a') + db.easy_name.insert(a_field='b') + rtn = db(db.easy_name.id > 0).count() + self.assertEqual(rtn, 2) + rtn = db(db.easy_name.a_field == 'a').update(a_field='c') + rtn = db(db.easy_name.a_field == 'c').count() + self.assertEqual(rtn, 1) + rtn = db(db.easy_name.a_field != 'c').count() + self.assertEqual(rtn, 1) + avg = db.easy_name.id.avg() + rtn = db(db.easy_name.id > 0).select(avg) + self.assertEqual(rtn[0][avg], 3) + + avg = db.easy_name.rating.avg() + rtn = db(db.easy_name.id > 0).select(avg) + self.assertEqual(rtn[0][avg], 2) + + rname = db._adapter.QUOTE_TEMPLATE % 'this is the person name' + db.define_table( + 'person', + Field('name', default="Michael", rname=rname), + Field('uuid') + ) + rname = db._adapter.QUOTE_TEMPLATE % 'this is the pet name' + db.define_table( + 'pet', + Field('friend','reference person'), + Field('name', rname=rname) + ) + michael = db.person.insert() #default insert + john = db.person.insert(name='John') + luke = db.person.insert(name='Luke') + + #michael owns Phippo + phippo = db.pet.insert(friend=michael, name="Phippo") + #john owns Dunstin and Gertie + dunstin = db.pet.insert(friend=john, name="Dunstin") + gertie = db.pet.insert(friend=john, name="Gertie") + + rtn = db(db.person.id == db.pet.friend).select(orderby=db.person.id|db.pet.id) + self.assertEqual(len(rtn), 3) + self.assertEqual(rtn[0].person.id, michael) + self.assertEqual(rtn[0].person.name, 'Michael') + self.assertEqual(rtn[0].pet.id, phippo) + self.assertEqual(rtn[0].pet.name, 'Phippo') + self.assertEqual(rtn[1].person.id, john) + self.assertEqual(rtn[1].person.name, 'John') + self.assertEqual(rtn[1].pet.name, 'Dunstin') + self.assertEqual(rtn[2].pet.name, 'Gertie') + #fetch owners, eventually with pet + #main point is retrieving Luke with no pets + rtn = db(db.person.id > 0).select( + orderby=db.person.id|db.pet.id, + left=db.pet.on(db.person.id == db.pet.friend) + ) + self.assertEqual(rtn[0].person.id, michael) + self.assertEqual(rtn[0].person.name, 'Michael') + self.assertEqual(rtn[0].pet.id, phippo) + self.assertEqual(rtn[0].pet.name, 'Phippo') + self.assertEqual(rtn[3].person.name, 'Luke') + self.assertEqual(rtn[3].person.id, luke) + self.assertEqual(rtn[3].pet.name, None) + #lets test a subquery + subq = db(db.pet.name == "Gertie")._select(db.pet.friend) + rtn = db(db.person.id.belongs(subq)).select() + self.assertEqual(rtn[0].id, 2) + self.assertEqual(rtn[0]('person.name'), 'John') + #as dict + rtn = db(db.person.id > 0).select().as_dict() + self.assertEqual(rtn[1]['name'], 'Michael') + #as list + rtn = db(db.person.id > 0).select().as_list() + self.assertEqual(rtn[0]['name'], 'Michael') + #isempty + rtn = db(db.person.id > 0).isempty() + self.assertEqual(rtn, False) + #join argument + rtn = db(db.person).select(orderby=db.person.id|db.pet.id, + join=db.pet.on(db.person.id==db.pet.friend)) + self.assertEqual(len(rtn), 3) + self.assertEqual(rtn[0].person.id, michael) + self.assertEqual(rtn[0].person.name, 'Michael') + self.assertEqual(rtn[0].pet.id, phippo) + self.assertEqual(rtn[0].pet.name, 'Phippo') + self.assertEqual(rtn[1].person.id, john) + self.assertEqual(rtn[1].person.name, 'John') + self.assertEqual(rtn[1].pet.name, 'Dunstin') + self.assertEqual(rtn[2].pet.name, 'Gertie') + + #aliases + rname = db._adapter.QUOTE_TEMPLATE % 'the cub name' + if DEFAULT_URI.startswith('mssql'): + #multiple cascade gotcha + for key in ['reference','reference FK']: + db._adapter.types[key]=db._adapter.types[key].replace( + '%(on_delete_action)s','NO ACTION') + db.define_table('pet_farm', + Field('name', rname=rname), + Field('father','reference pet_farm'), + Field('mother','reference pet_farm'), + ) + + minali = db.pet_farm.insert(name='Minali') + osbert = db.pet_farm.insert(name='Osbert') + #they had a cub + selina = db.pet_farm.insert(name='Selina', father=osbert, mother=minali) + + father = db.pet_farm.with_alias('father') + mother = db.pet_farm.with_alias('mother') + + #fetch pets with relatives + rtn = db().select( + db.pet_farm.name, father.name, mother.name, + left=[ + father.on(father.id == db.pet_farm.father), + mother.on(mother.id == db.pet_farm.mother) + ], + orderby=db.pet_farm.id + ) + + self.assertEqual(len(rtn), 3) + self.assertEqual(rtn[0].pet_farm.name, 'Minali') + self.assertEqual(rtn[0].father.name, None) + self.assertEqual(rtn[0].mother.name, None) + self.assertEqual(rtn[1].pet_farm.name, 'Osbert') + self.assertEqual(rtn[2].pet_farm.name, 'Selina') + self.assertEqual(rtn[2].father.name, 'Osbert') + self.assertEqual(rtn[2].mother.name, 'Minali') + + #clean up + db.pet_farm.drop() + db.pet.drop() + db.person.drop() + db.easy_name.drop() + + def testRun(self): + db = DAL(DEFAULT_URI, check_reserved=['all']) + rname = db._adapter.QUOTE_TEMPLATE % 'a very complicated fieldname' + for ft in ['string', 'text', 'password', 'upload', 'blob']: + db.define_table('tt', Field('aa', ft, default='', rname=rname)) + self.assertEqual(db.tt.insert(aa='x'), 1) + self.assertEqual(db().select(db.tt.aa)[0].aa, 'x') + db.tt.drop() + db.define_table('tt', Field('aa', 'integer', default=1, rname=rname)) + self.assertEqual(db.tt.insert(aa=3), 1) + self.assertEqual(db().select(db.tt.aa)[0].aa, 3) + db.tt.drop() + db.define_table('tt', Field('aa', 'double', default=1, rname=rname)) + self.assertEqual(db.tt.insert(aa=3.1), 1) + self.assertEqual(db().select(db.tt.aa)[0].aa, 3.1) + db.tt.drop() + db.define_table('tt', Field('aa', 'boolean', default=True, rname=rname)) + self.assertEqual(db.tt.insert(aa=True), 1) + self.assertEqual(db().select(db.tt.aa)[0].aa, True) + db.tt.drop() + db.define_table('tt', Field('aa', 'json', default={}, rname=rname)) + self.assertEqual(db.tt.insert(aa={}), 1) + self.assertEqual(db().select(db.tt.aa)[0].aa, {}) + db.tt.drop() + db.define_table('tt', Field('aa', 'date', + default=datetime.date.today(), rname=rname)) + t0 = datetime.date.today() + self.assertEqual(db.tt.insert(aa=t0), 1) + self.assertEqual(db().select(db.tt.aa)[0].aa, t0) + db.tt.drop() + db.define_table('tt', Field('aa', 'datetime', + default=datetime.datetime.today(), rname=rname)) + t0 = datetime.datetime( + 1971, + 12, + 21, + 10, + 30, + 55, + 0, + ) + self.assertEqual(db.tt.insert(aa=t0), 1) + self.assertEqual(db().select(db.tt.aa)[0].aa, t0) + + ## Row APIs + row = db().select(db.tt.aa)[0] + self.assertEqual(db.tt[1].aa,t0) + self.assertEqual(db.tt['aa'],db.tt.aa) + self.assertEqual(db.tt(1).aa,t0) + self.assertTrue(db.tt(1,aa=None)==None) + self.assertFalse(db.tt(1,aa=t0)==None) + self.assertEqual(row.aa,t0) + self.assertEqual(row['aa'],t0) + self.assertEqual(row['tt.aa'],t0) + self.assertEqual(row('tt.aa'),t0) + + ## Lazy and Virtual fields + db.tt.b = Field.Virtual(lambda row: row.tt.aa) + db.tt.c = Field.Lazy(lambda row: row.tt.aa) + row = db().select(db.tt.aa)[0] + self.assertEqual(row.b,t0) + self.assertEqual(row.c(),t0) + + db.tt.drop() + db.define_table('tt', Field('aa', 'time', default='11:30', rname=rname)) + t0 = datetime.time(10, 30, 55) + self.assertEqual(db.tt.insert(aa=t0), 1) + self.assertEqual(db().select(db.tt.aa)[0].aa, t0) + db.tt.drop() + + def testInsert(self): + db = DAL(DEFAULT_URI, check_reserved=['all']) + rname = db._adapter.QUOTE_TEMPLATE % 'a very complicated fieldname' + db.define_table('tt', Field('aa', rname=rname)) + self.assertEqual(db.tt.insert(aa='1'), 1) + self.assertEqual(db.tt.insert(aa='1'), 2) + self.assertEqual(db.tt.insert(aa='1'), 3) + self.assertEqual(db(db.tt.aa == '1').count(), 3) + self.assertEqual(db(db.tt.aa == '2').isempty(), True) + self.assertEqual(db(db.tt.aa == '1').update(aa='2'), 3) + self.assertEqual(db(db.tt.aa == '2').count(), 3) + self.assertEqual(db(db.tt.aa == '2').isempty(), False) + self.assertEqual(db(db.tt.aa == '2').delete(), 3) + self.assertEqual(db(db.tt.aa == '2').isempty(), True) + db.tt.drop() + + def testJoin(self): + db = DAL(DEFAULT_URI, check_reserved=['all']) + rname = db._adapter.QUOTE_TEMPLATE % 'this is field aa' + rname2 = db._adapter.QUOTE_TEMPLATE % 'this is field b' + db.define_table('t1', Field('aa', rname=rname)) + db.define_table('t2', Field('aa', rname=rname), Field('b', db.t1, rname=rname2)) + i1 = db.t1.insert(aa='1') + i2 = db.t1.insert(aa='2') + i3 = db.t1.insert(aa='3') + db.t2.insert(aa='4', b=i1) + db.t2.insert(aa='5', b=i2) + db.t2.insert(aa='6', b=i2) + self.assertEqual(len(db(db.t1.id + == db.t2.b).select(orderby=db.t1.aa + | db.t2.aa)), 3) + self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa + | db.t2.aa)[2].t1.aa, '2') + self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa + | db.t2.aa)[2].t2.aa, '6') + self.assertEqual(len(db().select(db.t1.ALL, db.t2.ALL, + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa | db.t2.aa)), 4) + self.assertEqual(db().select(db.t1.ALL, db.t2.ALL, + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa | db.t2.aa)[2].t1.aa, '2') + self.assertEqual(db().select(db.t1.ALL, db.t2.ALL, + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa | db.t2.aa)[2].t2.aa, '6') + self.assertEqual(db().select(db.t1.ALL, db.t2.ALL, + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa | db.t2.aa)[3].t1.aa, '3') + self.assertEqual(db().select(db.t1.ALL, db.t2.ALL, + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa | db.t2.aa)[3].t2.aa, None) + self.assertEqual(len(db().select(db.t1.aa, db.t2.id.count(), + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa, groupby=db.t1.aa)), + 3) + self.assertEqual(db().select(db.t1.aa, db.t2.id.count(), + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa, + groupby=db.t1.aa)[0]._extra[db.t2.id.count()], + 1) + self.assertEqual(db().select(db.t1.aa, db.t2.id.count(), + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa, + groupby=db.t1.aa)[1]._extra[db.t2.id.count()], + 2) + self.assertEqual(db().select(db.t1.aa, db.t2.id.count(), + left=db.t2.on(db.t1.id == db.t2.b), + orderby=db.t1.aa, + groupby=db.t1.aa)[2]._extra[db.t2.id.count()], + 0) + db.t2.drop() + db.t1.drop() + + db.define_table('person',Field('name', rname=rname)) + id = db.person.insert(name="max") + self.assertEqual(id.name,'max') + db.define_table('dog',Field('name', rname=rname),Field('ownerperson','reference person', rname=rname2)) + db.dog.insert(name='skipper',ownerperson=1) + row = db(db.person.id==db.dog.ownerperson).select().first() + self.assertEqual(row[db.person.name],'max') + self.assertEqual(row['person.name'],'max') + db.dog.drop() + self.assertEqual(len(db.person._referenced_by),0) + db.person.drop() if __name__ == '__main__': unittest.main() - tearDownModule() + tearDownModule() \ No newline at end of file