allow a dict as uri, issue 1316, thanks Alan

This commit is contained in:
mdipierro
2013-02-13 09:36:07 -06:00
parent 098da4124f
commit 7ef19ecf47
3 changed files with 162 additions and 20 deletions
+1 -1
View File
@@ -1 +1 @@
Version 2.4.1-alpha.2+timestamp.2013.02.13.09.30.42
Version 2.4.1-alpha.2+timestamp.2013.02.13.09.35.21
+71 -19
View File
@@ -7005,6 +7005,13 @@ class DAL(object):
db = DAL('sqlite://test.db')
db.define_table('tablename', Field('fieldname1'),
Field('fieldname2'))
(experimental)
you can pass a dict object as uri with the uri string
and table/field definitions. For an example of valid data check
the output of:
>>> db.as_dict(flat=True, sanitize=False)
"""
def __new__(cls, uri='sqlite://dummy.db', *args, **kwargs):
@@ -7109,7 +7116,7 @@ class DAL(object):
db._adapter.commit_prepared(keys[i])
return
def __init__(self, uri='sqlite://dummy.db',
def __init__(self, uri=None,
pool_size=0, folder=None,
db_codec='UTF-8', check_reserved=None,
migrate=True, fake_migrate=False,
@@ -7145,16 +7152,26 @@ class DAL(object):
:attempts (defaults to 5). Number of times to attempt connecting
"""
dbdict = None
DEFAULT_URI = 'sqlite://dummy.db'
items = None
if isinstance(uri, dict):
if "items" in uri:
items = uri.pop("items")
try:
newuri = uri.pop("uri")
except KeyError:
newuri = DEFAULT_URI
locals().update(uri)
uri = newuri
elif not uri:
uri = DEFAULT_URI
if uri == '<zombie>' and db_uid is not None: return
elif isinstance(uri, dict):
dbdict = uri
uri = dbdict["uri"]
codec = dbdict["codec"] or codec
if not decode_credentials:
credential_decoder = lambda cred: cred
else:
credential_decoder = lambda cred: urllib.unquote(cred)
self._folder = folder
if folder:
self.set_folder(folder)
self._uri = uri
@@ -7172,6 +7189,13 @@ class DAL(object):
self._LAZY_TABLES = {}
self._lazy_tables = lazy_tables
self._tables = SQLCallableList()
self._driver_args = driver_args
self._adapter_args = adapter_args
self._check_reserved = check_reserved
self._decode_credentials = decode_credentials
self._attempts = attempts
self._do_connect = do_connect
if not str(attempts).isdigit() or attempts < 0:
attempts = 5
if uri:
@@ -7232,9 +7256,9 @@ class DAL(object):
self._fake_migrate = fake_migrate
self._migrate_enabled = migrate_enabled
self._fake_migrate_all = fake_migrate_all
if auto_import or dbdict:
if auto_import or items:
self.import_table_definitions(adapter.folder,
items=dbdict["items"])
items=items)
@property
def tables(self):
@@ -7247,10 +7271,16 @@ class DAL(object):
for tablename, table in items.iteritems():
# TODO: read all field/table options
fields = []
for fieldname, field in table["items"].iteritems():
type = field["type"]
fields.append(Field(fieldname, type))
self.define_table(tablename, *fields)
# remove unsupported/illegal Table arguments
[table.pop(name) for name in ("name", "fields") if
name in table]
if "items" in table:
for fieldname, field in table.pop("items").iteritems():
# remove unsupported/illegal Field arguments
[field.pop(key) for key in ("requires", "name",
"compute", "colname") if key in field]
fields.append(Field(str(fieldname), **field))
self.define_table(str(tablename), *fields, **table)
else:
for filename in glob.glob(pattern):
tfile = self._adapter.file_open(filename, 'r')
@@ -7611,12 +7641,20 @@ def index():
return table
def as_dict(self, flat=False, sanitize=True):
dbname = codec = uid = uri = None
dbname = db_uid = uri = None
if not sanitize:
uri, dbname, codec, uid = (self._uri, self._dbname,
self._db_codec, self._db_uid)
uri, dbname, db_uid = (self._uri, self._dbname, self._db_uid)
db_as_dict = dict(items={}, tables=[], uri=uri, dbname=dbname,
codec=codec, uid=uid)
db_uid=db_uid,
**dict([(k, getattr(self, "_" + k)) for
k in 'pool_size','folder','db_codec',
'check_reserved','migrate','fake_migrate',
'migrate_enabled','fake_migrate_all',
'decode_credentials','driver_args',
'adapter_args', 'attempts',
'bigint_id','debug','lazy_tables',
'do_connect']))
for table in self:
tablename = str(table)
db_as_dict["tables"].append(tablename)
@@ -8533,7 +8571,12 @@ class Table(object):
def as_dict(self, flat=False, sanitize=True):
tablename = str(self)
table_as_dict = dict(name=tablename, items={}, fields=[])
table_as_dict = dict(name=tablename, items={}, fields=[],
sequence_name=self._sequence_name,
trigger_name=self._trigger_name,
common_filter=self._common_filter, format=self._format,
singular=self._singular, plural=self._plural)
for field in self:
if (field.readable or field.writable) or (not sanitize):
table_as_dict["fields"].append(field.name)
@@ -9213,8 +9256,17 @@ class Field(Expression):
return Expression(self.db, self.db._adapter.COUNT, self, distinct, 'integer')
def as_dict(self, flat=False, sanitize=True):
attrs = ("readable", "writable", "label", "default", "name",
"type", "represent", "compute")
attrs = ('type', 'length', 'default', 'required',
'ondelete', 'notnull', 'unique', 'uploadfield',
'widget', 'label', 'comment', 'writable', 'readable',
'update', 'authorize', 'autodelete', 'represent',
'uploadfolder', 'uploadseparate', 'uploadfs',
'compute', 'custom_store', 'custom_retrieve',
'custom_retrieve_file_properties', 'custom_delete',
'filter_in', 'filter_out', 'custom_qualifier',
'map_none', 'name')
SERIALIZABLE_TYPES = (int, long, basestring, dict, list,
float, tuple, bool, type(None))
def flatten(obj):
+90
View File
@@ -597,6 +597,96 @@ class TestImportExportUuidFields(unittest.TestCase):
db.person.drop()
db.commit()
class TestDALDictImportExport(unittest.TestCase):
def testRun(self):
db = DAL('sqlite:memory:')
db.define_table('person', Field('name', default="Michael"),Field('uuid'))
db.define_table('pet',Field('friend',db.person),Field('name'))
dbdict = db.as_dict(flat=True, sanitize=False)
assert isinstance(dbdict, dict)
uri = dbdict["uri"]
assert isinstance(uri, basestring) and uri
assert len(dbdict["items"]) == 2
assert len(dbdict["items"]["person"]["items"]) == 3
assert dbdict["items"]["person"]["items"]["name"]["type"] == db.person.name.type
assert dbdict["items"]["person"]["items"]["name"]["default"] == db.person.name.default
assert dbdict
db2 = DAL(dbdict)
assert len(db.tables) == len(db2.tables)
assert hasattr(db2, "pet") and isinstance(db2.pet, Table)
assert hasattr(db2.pet, "friend") and isinstance(db2.pet.friend, Field)
have_serializers = True
try:
import serializers
dbjson = db.as_json(sanitize=False)
assert isinstance(dbjson, basestring) and len(dbjson) > 0
db3 = DAL(serializers.loads_json(dbjson))
assert hasattr(db3, "person") and hasattr(db3.person, "uuid") and\
db3.person.uuid.type == db.person.uuid.type
db3.pet.drop()
db3.person.drop()
db3.commit()
except ImportError:
pass
mpfc = "Monty Python's Flying Circus"
dbdict4 = {"uri": 'sqlite:memory:',
"items":{"staff":{"items": {"name":
{"default":"Michael"},
"food":
{"default":"Spam"}}},
"show":{"items": {"name":
{"default":mpfc},
"rating":
{"type":"double"}}}}}
db4 = DAL(dbdict4)
assert "staff" in db4.tables
assert "name" in db4.staff
assert db4.show.rating.type == "double"
assert db4.show.insert() is not None
assert db4(db4.show).select().first().id == 1
assert db4(db4.show).select().first().name == mpfc
dbdict5 = {"uri": 'sqlite:memory:'}
db5 = DAL(dbdict5)
assert db5.tables in ([], None)
dbdict6 = {"items":{"staff":{},
"show":{"items": {"name": {},
"rating":
{"type":"double"}}}}}
db6 = DAL(dbdict6)
assert len(db6["staff"].fields) == 1
assert "name" in db6["show"].fields
# the following would fail (see issue 1332)
# assert db6.staff.insert() is not None
# assert db6(db6.staff).select().first().id == 1
dbdict7 = {}
db7 = DAL(dbdict7)
db7.tables() in (None, [])
assert not str(db7) in ("", None)
db6.staff.drop()
db6.show.drop()
db6.commit()
db4.staff.drop()
db4.show.drop()
db4.commit()
db2.pet.drop()
db2.person.drop()
db2.commit()
db.pet.drop()
db.person.drop()
db.commit()
if __name__ == '__main__':
unittest.main()
tearDownModule()