fixed issue with ordereddict in python < 2.7, added rname support also for fields

This commit is contained in:
niphlod
2013-10-27 13:37:59 +01:00
parent d960513ef2
commit 0f2b2daeab
3 changed files with 558 additions and 25 deletions
+127
View File
@@ -0,0 +1,127 @@
# Copyright (c) 2009 Raymond Hettinger
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
from UserDict import DictMixin
class OrderedDict(dict, DictMixin):
def __init__(self, *args, **kwds):
if len(args) > 1:
raise TypeError('expected at most 1 arguments, got %d' % len(args))
try:
self.__end
except AttributeError:
self.clear()
self.update(*args, **kwds)
def clear(self):
self.__end = end = []
end += [None, end, end] # sentinel node for doubly linked list
self.__map = {} # key --> [key, prev, next]
dict.clear(self)
def __setitem__(self, key, value):
if key not in self:
end = self.__end
curr = end[1]
curr[2] = end[1] = self.__map[key] = [key, curr, end]
dict.__setitem__(self, key, value)
def __delitem__(self, key):
dict.__delitem__(self, key)
key, prev, next = self.__map.pop(key)
prev[2] = next
next[1] = prev
def __iter__(self):
end = self.__end
curr = end[2]
while curr is not end:
yield curr[0]
curr = curr[2]
def __reversed__(self):
end = self.__end
curr = end[1]
while curr is not end:
yield curr[0]
curr = curr[1]
def popitem(self, last=True):
if not self:
raise KeyError('dictionary is empty')
if last:
key = reversed(self).next()
else:
key = iter(self).next()
value = self.pop(key)
return key, value
def __reduce__(self):
items = [[k, self[k]] for k in self]
tmp = self.__map, self.__end
del self.__map, self.__end
inst_dict = vars(self).copy()
self.__map, self.__end = tmp
if inst_dict:
return (self.__class__, (items,), inst_dict)
return self.__class__, (items,)
def keys(self):
return list(self)
setdefault = DictMixin.setdefault
update = DictMixin.update
pop = DictMixin.pop
values = DictMixin.values
items = DictMixin.items
iterkeys = DictMixin.iterkeys
itervalues = DictMixin.itervalues
iteritems = DictMixin.iteritems
def __repr__(self):
if not self:
return '%s()' % (self.__class__.__name__,)
return '%s(%r)' % (self.__class__.__name__, self.items())
def copy(self):
return self.__class__(self)
@classmethod
def fromkeys(cls, iterable, value=None):
d = cls()
for key in iterable:
d[key] = value
return d
def __eq__(self, other):
if isinstance(other, OrderedDict):
if len(self) != len(other):
return False
for p, q in zip(self.items(), other.items()):
if p != q:
return False
return True
return dict.__eq__(self, other)
def __ne__(self, other):
return not self == other
+30 -19
View File
@@ -172,10 +172,9 @@ import uuid
import glob
import traceback
import platform
import collections
PYTHON_VERSION = sys.version_info[0]
if PYTHON_VERSION == 2:
PYTHON_VERSION = sys.version_info[:3]
if PYTHON_VERSION[0] == 2:
import cPickle as pickle
import cStringIO as StringIO
import copy_reg as copyreg
@@ -189,6 +188,12 @@ else:
hashlib_md5 = lambda s: hashlib.md5(bytes(s,'utf8'))
bytes, unicode = bytes, str
if PYTHON_VERSION[:2] < (2, 7):
from gluon.contrib.ordereddict import OrderedDict
else:
from collections import OrderedDict
CALLABLETYPES = (types.LambdaType, types.FunctionType,
types.BuiltinFunctionType,
types.MethodType, types.BuiltinMethodType)
@@ -855,7 +860,7 @@ class BaseAdapter(ConnectionPool):
foreign_key = '%s (%s)' % (rtablename,
rfieldname),
table_name = tablename,
field_name = field_name,
field_name = field._rname or field.name,
on_delete_action=field.ondelete)
else:
# make a guess here for circular references
@@ -881,7 +886,7 @@ class BaseAdapter(ConnectionPool):
)
ftype = types[field_type[:9]] % dict(
index_name = field_name+'__idx',
field_name = field_name,
field_name = field._rname or field.name,
constraint_name = constraint_name,
foreign_key = '%s (%s)' % (real_referenced,
id_fieldname),
@@ -958,13 +963,15 @@ class BaseAdapter(ConnectionPool):
# geometry fields are added after the table has been created, not now
if not (self.dbengine == 'postgres' and \
field_type.startswith('geom')):
fields.append('%s %s' % (field_name, ftype))
#fetch the rname if it's there
field_rname = "%s" % (field._rname or field_name)
fields.append('%s %s' % (field_rname, ftype))
other = ';'
# backend-specific extensions to fields
if self.dbengine == 'mysql':
if not hasattr(table, "_primarykey"):
fields.append('PRIMARY KEY(%s)' % table._id.name)
fields.append('PRIMARY KEY(%s)' % (table._id.name or table._id._rname))
other = ' ENGINE=InnoDB CHARACTER SET utf8;'
fields = ',\n '.join(fields)
@@ -979,7 +986,6 @@ class BaseAdapter(ConnectionPool):
foreign_table = rtablename,
foreign_key = ', '.join(pkeys),
on_delete_action = field.ondelete)
#if there's a _rname, let's use that instead
table_rname = table._rname or tablename
@@ -1252,7 +1258,7 @@ class BaseAdapter(ConnectionPool):
def _insert(self, table, fields):
table_rname = table._rname or table
if fields:
keys = ','.join(f.name for f, v in fields)
keys = ','.join(f._rname or f.name for f, v in fields)
values = ','.join(self.expand(v, f.type) for f, v in fields)
return 'INSERT INTO %s(%s) VALUES (%s);' % (table_rname, keys, values)
else:
@@ -1437,7 +1443,10 @@ class BaseAdapter(ConnectionPool):
table_rname = et._ot and et._tablename or et._rname or et._tablename
else:
table_rname = et._tablename
out = '%s.%s' % (table_rname, expression.name)
if not colnames:
out = '%s.%s' % (table_rname, expression._rname or expression.name)
else:
out = '%s.%s' % (table_rname, expression.name)
if field_type == 'string' and not expression.type in (
'string','text','json','password'):
out = self.CAST(out, self.types['text'])
@@ -1513,7 +1522,7 @@ class BaseAdapter(ConnectionPool):
sql_w = ' WHERE ' + self.expand(query)
else:
sql_w = ''
sql_v = ','.join(['%s=%s' % (field.name,
sql_v = ','.join(['%s=%s' % (field._rname or field.name,
self.expand(value, field.type)) \
for (field, value) in fields])
tablename = "%s" % (self.db[tablename]._rname or tablename)
@@ -1729,7 +1738,7 @@ class BaseAdapter(ConnectionPool):
sql_o += ' ORDER BY %s' % ', '.join(
['%s.%s'%(t,x) for t in tablenames for x in (
hasattr(self.db[t],'_primarykey') and self.db[t]._primarykey
or [self.db[t]._id.name]
or [self.db[t]._id._rname or self.db[t]._id.name]
)
]
)
@@ -2302,7 +2311,7 @@ class SQLiteAdapter(BaseAdapter):
else:
self.dbpath = uri.split('://',1)[1]
if self.dbpath[0] != '/':
if PYTHON_VERSION == 2:
if PYTHON_VERSION[0] == 2:
self.dbpath = pjoin(
self.folder.decode(path_encoding).encode('utf8'), self.dbpath)
else:
@@ -2312,7 +2321,7 @@ class SQLiteAdapter(BaseAdapter):
if not 'detect_types' in driver_args and do_connect:
driver_args['detect_types'] = self.driver.PARSE_DECLTYPES
def connector(dbpath=self.dbpath, driver_args=driver_args):
return self.driver.Connection(dbpath, **driver_args)
return self.driver.Connection(dbpath, **driver_args)
self.connector = connector
if do_connect: self.reconnect()
@@ -2321,7 +2330,7 @@ class SQLiteAdapter(BaseAdapter):
SQLiteAdapter.web2py_extract)
self.connection.create_function("REGEXP", 2,
SQLiteAdapter.web2py_regexp)
if self.adapter_args.get('foreign_keys',True):
self.execute('PRAGMA foreign_keys=ON;')
@@ -8141,7 +8150,7 @@ def index():
if not db_group:
del THREAD_LOCAL.db_instances[self._db_uid]
def executesql(self, query, placeholders=None, as_dict=False,
def executesql(self, query, placeholders=None, as_dict=False,
fields=None, colnames=None, as_ordered_dict=False):
"""
placeholders is optional and will always be None.
@@ -8214,7 +8223,7 @@ def index():
# convert the list for each row into a dictionary so it's
# easier to work with. row['field_name'] rather than row[0]
if as_ordered_dict:
_dict = collections.OrderedDict
_dict = OrderedDict
else:
_dict = dict
return [_dict(zip(fields,row)) for row in data]
@@ -8636,7 +8645,7 @@ class Table(object):
referees = pr.pop(self._tablename)
for referee in referees:
self._referenced_by.append(referee)
def _filter_fields(self, record, id=False):
return dict([(k, v) for (k, v) in record.iteritems() if k
@@ -9576,6 +9585,7 @@ class Field(Expression):
filter_out = None,
custom_qualifier = None,
map_none = None,
rname = None
):
self._db = self.db = None # both for backward compatibility
self.op = None
@@ -9622,6 +9632,7 @@ class Field(Expression):
self.label = label if label!=None else fieldname.replace('_',' ').title()
self.requires = requires if requires!=None else []
self.map_none = map_none
self._rname = rname
def set_attributes(self,*args,**attributes):
self.__dict__.update(*args,**attributes)
@@ -9788,7 +9799,7 @@ class Field(Expression):
return Expression(self.db, self.db._adapter.COUNT, self, distinct, 'integer')
def as_dict(self, flat=False, sanitize=True):
attrs = ("name", 'authorize', 'represent', 'ondelete',
attrs = ('name', 'authorize', 'represent', 'ondelete',
'custom_store', 'autodelete', 'custom_retrieve',
'filter_out', 'uploadseparate', 'widget', 'uploadfs',
'update', 'custom_delete', 'uploadfield', 'uploadfolder',
+401 -6
View File
@@ -559,7 +559,7 @@ class TestMigrations(unittest.TestCase):
class TestReference(unittest.TestCase):
def testRun(self):
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
if DEFAULT_URI.startswith('mssql'):
#multiple cascade gotcha
@@ -840,11 +840,27 @@ class TestValidateAndInsert(unittest.TestCase):
#cleanup table
db.val_and_insert.drop()
class TestSelectAsDict(unittest.TestCase):
def testSelect(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table(
'a_table',
Field('b_field'),
Field('a_field'),
)
db.a_table.insert(a_field="aa1", b_field="bb1")
rtn = db.executesql("SELECT id, b_field, a_field FROM a_table", as_dict=True)
self.assertEqual(rtn[0]['b_field'], 'bb1')
rtn = db.executesql("SELECT id, b_field, a_field FROM a_table", as_ordered_dict=True)
self.assertEqual(rtn[0]['b_field'], 'bb1')
self.assertEqual(rtn[0].keys(), ['id', 'b_field', 'a_field'])
db.a_table.drop()
class TestRNameTable(unittest.TestCase):
#tests for highly experimental rname attribute
class TestRName(unittest.TestCase):
"""
tests for highly experimental rname attribute
"""
def testSelect(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
rname = db._adapter.QUOTE_TEMPLATE % 'a very complicated tablename'
@@ -997,8 +1013,387 @@ class TestRName(unittest.TestCase):
db.person.drop()
db.easy_name.drop()
def testJoin(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
rname = db._adapter.QUOTE_TEMPLATE % 'this is table t1'
rname2 = db._adapter.QUOTE_TEMPLATE % 'this is table t2'
db.define_table('t1', Field('aa'), rname=rname)
db.define_table('t2', Field('aa'), Field('b', db.t1), rname=rname2)
i1 = db.t1.insert(aa='1')
i2 = db.t1.insert(aa='2')
i3 = db.t1.insert(aa='3')
db.t2.insert(aa='4', b=i1)
db.t2.insert(aa='5', b=i2)
db.t2.insert(aa='6', b=i2)
self.assertEqual(len(db(db.t1.id
== db.t2.b).select(orderby=db.t1.aa
| db.t2.aa)), 3)
self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa
| db.t2.aa)[2].t1.aa, '2')
self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa
| db.t2.aa)[2].t2.aa, '6')
self.assertEqual(len(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)), 4)
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[2].t1.aa, '2')
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[2].t2.aa, '6')
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[3].t1.aa, '3')
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[3].t2.aa, None)
self.assertEqual(len(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa, groupby=db.t1.aa)),
3)
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa,
groupby=db.t1.aa)[0]._extra[db.t2.id.count()],
1)
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa,
groupby=db.t1.aa)[1]._extra[db.t2.id.count()],
2)
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa,
groupby=db.t1.aa)[2]._extra[db.t2.id.count()],
0)
db.t2.drop()
db.t1.drop()
db.define_table('person',Field('name'), rname=rname)
id = db.person.insert(name="max")
self.assertEqual(id.name,'max')
db.define_table('dog',Field('name'),Field('ownerperson','reference person'), rname=rname2)
db.dog.insert(name='skipper',ownerperson=1)
row = db(db.person.id==db.dog.ownerperson).select().first()
self.assertEqual(row[db.person.name],'max')
self.assertEqual(row['person.name'],'max')
db.dog.drop()
self.assertEqual(len(db.person._referenced_by),0)
db.person.drop()
class TestRNameFields(unittest.TestCase):
# tests for highly experimental rname attribute
def testSelect(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
rname = db._adapter.QUOTE_TEMPLATE % 'a very complicated fieldname'
rname2 = db._adapter.QUOTE_TEMPLATE % 'rrating from 1 to 10'
db.define_table(
'easy_name',
Field('a_field', rname=rname),
Field('rating', 'integer', rname=rname2, default=2)
)
rtn = db.easy_name.insert(a_field='a')
self.assertEqual(rtn.id, 1)
rtn = db(db.easy_name.a_field == 'a').select()
self.assertEqual(len(rtn), 1)
self.assertEqual(rtn[0].id, 1)
self.assertEqual(rtn[0].a_field, 'a')
db.easy_name.insert(a_field='b')
rtn = db(db.easy_name.id > 0).delete()
self.assertEqual(rtn, 2)
rtn = db(db.easy_name.id > 0).count()
self.assertEqual(rtn, 0)
db.easy_name.insert(a_field='a')
db.easy_name.insert(a_field='b')
rtn = db(db.easy_name.id > 0).count()
self.assertEqual(rtn, 2)
rtn = db(db.easy_name.a_field == 'a').update(a_field='c')
rtn = db(db.easy_name.a_field == 'c').count()
self.assertEqual(rtn, 1)
rtn = db(db.easy_name.a_field != 'c').count()
self.assertEqual(rtn, 1)
avg = db.easy_name.id.avg()
rtn = db(db.easy_name.id > 0).select(avg)
self.assertEqual(rtn[0][avg], 3)
avg = db.easy_name.rating.avg()
rtn = db(db.easy_name.id > 0).select(avg)
self.assertEqual(rtn[0][avg], 2)
rname = db._adapter.QUOTE_TEMPLATE % 'this is the person name'
db.define_table(
'person',
Field('name', default="Michael", rname=rname),
Field('uuid')
)
rname = db._adapter.QUOTE_TEMPLATE % 'this is the pet name'
db.define_table(
'pet',
Field('friend','reference person'),
Field('name', rname=rname)
)
michael = db.person.insert() #default insert
john = db.person.insert(name='John')
luke = db.person.insert(name='Luke')
#michael owns Phippo
phippo = db.pet.insert(friend=michael, name="Phippo")
#john owns Dunstin and Gertie
dunstin = db.pet.insert(friend=john, name="Dunstin")
gertie = db.pet.insert(friend=john, name="Gertie")
rtn = db(db.person.id == db.pet.friend).select(orderby=db.person.id|db.pet.id)
self.assertEqual(len(rtn), 3)
self.assertEqual(rtn[0].person.id, michael)
self.assertEqual(rtn[0].person.name, 'Michael')
self.assertEqual(rtn[0].pet.id, phippo)
self.assertEqual(rtn[0].pet.name, 'Phippo')
self.assertEqual(rtn[1].person.id, john)
self.assertEqual(rtn[1].person.name, 'John')
self.assertEqual(rtn[1].pet.name, 'Dunstin')
self.assertEqual(rtn[2].pet.name, 'Gertie')
#fetch owners, eventually with pet
#main point is retrieving Luke with no pets
rtn = db(db.person.id > 0).select(
orderby=db.person.id|db.pet.id,
left=db.pet.on(db.person.id == db.pet.friend)
)
self.assertEqual(rtn[0].person.id, michael)
self.assertEqual(rtn[0].person.name, 'Michael')
self.assertEqual(rtn[0].pet.id, phippo)
self.assertEqual(rtn[0].pet.name, 'Phippo')
self.assertEqual(rtn[3].person.name, 'Luke')
self.assertEqual(rtn[3].person.id, luke)
self.assertEqual(rtn[3].pet.name, None)
#lets test a subquery
subq = db(db.pet.name == "Gertie")._select(db.pet.friend)
rtn = db(db.person.id.belongs(subq)).select()
self.assertEqual(rtn[0].id, 2)
self.assertEqual(rtn[0]('person.name'), 'John')
#as dict
rtn = db(db.person.id > 0).select().as_dict()
self.assertEqual(rtn[1]['name'], 'Michael')
#as list
rtn = db(db.person.id > 0).select().as_list()
self.assertEqual(rtn[0]['name'], 'Michael')
#isempty
rtn = db(db.person.id > 0).isempty()
self.assertEqual(rtn, False)
#join argument
rtn = db(db.person).select(orderby=db.person.id|db.pet.id,
join=db.pet.on(db.person.id==db.pet.friend))
self.assertEqual(len(rtn), 3)
self.assertEqual(rtn[0].person.id, michael)
self.assertEqual(rtn[0].person.name, 'Michael')
self.assertEqual(rtn[0].pet.id, phippo)
self.assertEqual(rtn[0].pet.name, 'Phippo')
self.assertEqual(rtn[1].person.id, john)
self.assertEqual(rtn[1].person.name, 'John')
self.assertEqual(rtn[1].pet.name, 'Dunstin')
self.assertEqual(rtn[2].pet.name, 'Gertie')
#aliases
rname = db._adapter.QUOTE_TEMPLATE % 'the cub name'
if DEFAULT_URI.startswith('mssql'):
#multiple cascade gotcha
for key in ['reference','reference FK']:
db._adapter.types[key]=db._adapter.types[key].replace(
'%(on_delete_action)s','NO ACTION')
db.define_table('pet_farm',
Field('name', rname=rname),
Field('father','reference pet_farm'),
Field('mother','reference pet_farm'),
)
minali = db.pet_farm.insert(name='Minali')
osbert = db.pet_farm.insert(name='Osbert')
#they had a cub
selina = db.pet_farm.insert(name='Selina', father=osbert, mother=minali)
father = db.pet_farm.with_alias('father')
mother = db.pet_farm.with_alias('mother')
#fetch pets with relatives
rtn = db().select(
db.pet_farm.name, father.name, mother.name,
left=[
father.on(father.id == db.pet_farm.father),
mother.on(mother.id == db.pet_farm.mother)
],
orderby=db.pet_farm.id
)
self.assertEqual(len(rtn), 3)
self.assertEqual(rtn[0].pet_farm.name, 'Minali')
self.assertEqual(rtn[0].father.name, None)
self.assertEqual(rtn[0].mother.name, None)
self.assertEqual(rtn[1].pet_farm.name, 'Osbert')
self.assertEqual(rtn[2].pet_farm.name, 'Selina')
self.assertEqual(rtn[2].father.name, 'Osbert')
self.assertEqual(rtn[2].mother.name, 'Minali')
#clean up
db.pet_farm.drop()
db.pet.drop()
db.person.drop()
db.easy_name.drop()
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
rname = db._adapter.QUOTE_TEMPLATE % 'a very complicated fieldname'
for ft in ['string', 'text', 'password', 'upload', 'blob']:
db.define_table('tt', Field('aa', ft, default='', rname=rname))
self.assertEqual(db.tt.insert(aa='x'), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, 'x')
db.tt.drop()
db.define_table('tt', Field('aa', 'integer', default=1, rname=rname))
self.assertEqual(db.tt.insert(aa=3), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, 3)
db.tt.drop()
db.define_table('tt', Field('aa', 'double', default=1, rname=rname))
self.assertEqual(db.tt.insert(aa=3.1), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, 3.1)
db.tt.drop()
db.define_table('tt', Field('aa', 'boolean', default=True, rname=rname))
self.assertEqual(db.tt.insert(aa=True), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, True)
db.tt.drop()
db.define_table('tt', Field('aa', 'json', default={}, rname=rname))
self.assertEqual(db.tt.insert(aa={}), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, {})
db.tt.drop()
db.define_table('tt', Field('aa', 'date',
default=datetime.date.today(), rname=rname))
t0 = datetime.date.today()
self.assertEqual(db.tt.insert(aa=t0), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
db.tt.drop()
db.define_table('tt', Field('aa', 'datetime',
default=datetime.datetime.today(), rname=rname))
t0 = datetime.datetime(
1971,
12,
21,
10,
30,
55,
0,
)
self.assertEqual(db.tt.insert(aa=t0), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
## Row APIs
row = db().select(db.tt.aa)[0]
self.assertEqual(db.tt[1].aa,t0)
self.assertEqual(db.tt['aa'],db.tt.aa)
self.assertEqual(db.tt(1).aa,t0)
self.assertTrue(db.tt(1,aa=None)==None)
self.assertFalse(db.tt(1,aa=t0)==None)
self.assertEqual(row.aa,t0)
self.assertEqual(row['aa'],t0)
self.assertEqual(row['tt.aa'],t0)
self.assertEqual(row('tt.aa'),t0)
## Lazy and Virtual fields
db.tt.b = Field.Virtual(lambda row: row.tt.aa)
db.tt.c = Field.Lazy(lambda row: row.tt.aa)
row = db().select(db.tt.aa)[0]
self.assertEqual(row.b,t0)
self.assertEqual(row.c(),t0)
db.tt.drop()
db.define_table('tt', Field('aa', 'time', default='11:30', rname=rname))
t0 = datetime.time(10, 30, 55)
self.assertEqual(db.tt.insert(aa=t0), 1)
self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
db.tt.drop()
def testInsert(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
rname = db._adapter.QUOTE_TEMPLATE % 'a very complicated fieldname'
db.define_table('tt', Field('aa', rname=rname))
self.assertEqual(db.tt.insert(aa='1'), 1)
self.assertEqual(db.tt.insert(aa='1'), 2)
self.assertEqual(db.tt.insert(aa='1'), 3)
self.assertEqual(db(db.tt.aa == '1').count(), 3)
self.assertEqual(db(db.tt.aa == '2').isempty(), True)
self.assertEqual(db(db.tt.aa == '1').update(aa='2'), 3)
self.assertEqual(db(db.tt.aa == '2').count(), 3)
self.assertEqual(db(db.tt.aa == '2').isempty(), False)
self.assertEqual(db(db.tt.aa == '2').delete(), 3)
self.assertEqual(db(db.tt.aa == '2').isempty(), True)
db.tt.drop()
def testJoin(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
rname = db._adapter.QUOTE_TEMPLATE % 'this is field aa'
rname2 = db._adapter.QUOTE_TEMPLATE % 'this is field b'
db.define_table('t1', Field('aa', rname=rname))
db.define_table('t2', Field('aa', rname=rname), Field('b', db.t1, rname=rname2))
i1 = db.t1.insert(aa='1')
i2 = db.t1.insert(aa='2')
i3 = db.t1.insert(aa='3')
db.t2.insert(aa='4', b=i1)
db.t2.insert(aa='5', b=i2)
db.t2.insert(aa='6', b=i2)
self.assertEqual(len(db(db.t1.id
== db.t2.b).select(orderby=db.t1.aa
| db.t2.aa)), 3)
self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa
| db.t2.aa)[2].t1.aa, '2')
self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa
| db.t2.aa)[2].t2.aa, '6')
self.assertEqual(len(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)), 4)
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[2].t1.aa, '2')
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[2].t2.aa, '6')
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[3].t1.aa, '3')
self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa | db.t2.aa)[3].t2.aa, None)
self.assertEqual(len(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa, groupby=db.t1.aa)),
3)
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa,
groupby=db.t1.aa)[0]._extra[db.t2.id.count()],
1)
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa,
groupby=db.t1.aa)[1]._extra[db.t2.id.count()],
2)
self.assertEqual(db().select(db.t1.aa, db.t2.id.count(),
left=db.t2.on(db.t1.id == db.t2.b),
orderby=db.t1.aa,
groupby=db.t1.aa)[2]._extra[db.t2.id.count()],
0)
db.t2.drop()
db.t1.drop()
db.define_table('person',Field('name', rname=rname))
id = db.person.insert(name="max")
self.assertEqual(id.name,'max')
db.define_table('dog',Field('name', rname=rname),Field('ownerperson','reference person', rname=rname2))
db.dog.insert(name='skipper',ownerperson=1)
row = db(db.person.id==db.dog.ownerperson).select().first()
self.assertEqual(row[db.person.name],'max')
self.assertEqual(row['person.name'],'max')
db.dog.drop()
self.assertEqual(len(db.person._referenced_by),0)
db.person.drop()
if __name__ == '__main__':
unittest.main()
tearDownModule()
tearDownModule()