fixed import_from_csv_file and new tests, thanks Jonathan

This commit is contained in:
mdipierro
2012-07-31 19:24:26 -05:00
parent ece313efe1
commit 41c3cdeebe
3 changed files with 67 additions and 23 deletions
+1 -1
View File
@@ -1 +1 @@
Version 2.00.0 (2012-07-31 15:06:22) dev
Version 2.00.0 (2012-07-31 19:24:21) dev
+19 -22
View File
@@ -7566,24 +7566,24 @@ class Table(dict):
id_map=None,
null='<NULL>',
unique='uuid',
id_offset={}, # id_offset only used when id_map is None
id_offset=None, # id_offset used only when id_map is None
*args, **kwargs
):
"""
import records from csv file. Column headers must have same names as
table fields. field 'id' is ignored. If column names read 'table.file'
the 'table.' prefix is ignored.
'unique' argument is a field which must be unique
(typically a uuid field)
'restore' argument is default False.
If set True will remove old values
in table first.
'id_map' If set to None will not map id.
Import records from csv file.
Column headers must have same names as table fields.
Field 'id' is ignored.
If column names read 'table.file' the 'table.' prefix is ignored.
'unique' argument is a field which must be unique
(typically a uuid field)
'restore' argument is default False;
if set True will remove old values in table first.
'id_map' if set to None will not map ids.
The import will keep the id numbers in the restored table.
This assumes that there is a field of type id that
is integer and in incrementing order.
Will keep the id numbers in restored table.
"""
"""
delimiter = kwargs.get('delimiter', ',')
quotechar = kwargs.get('quotechar', '"')
@@ -7643,12 +7643,13 @@ class Table(dict):
return False
first = True
unique_idx = None
for line in reader:
if not line:
break
if not colnames:
colnames = [x.split('.',1)[-1] for x in line][:len(line)]
cols, cid = [], []
cols, cid = [], None
for i,colname in enumerate(colnames):
if is_id(colname):
cid = i
@@ -7659,10 +7660,10 @@ class Table(dict):
else:
items = [fix(self[colnames[i]], line[i], id_map, id_offset) \
for i in cols if colnames[i] in self.fields]
if not id_map and cid:
if not id_map and cid is not None and id_offset is not None and not unique_idx:
csv_id = int(line[cid])
curr_id = self.insert(**dict(items))
del_id = curr_id
if first:
first = False
# First curr_id is bigger than csv_id,
@@ -7672,17 +7673,13 @@ class Table(dict):
id_offset[self._tablename] = curr_id-csv_id
else:
id_offset[self._tablename] = 0
# create new id until we get the same as old_id
# create new id until we get the same as old_id+offset
while curr_id<csv_id+id_offset[self._tablename]:
self._db(self._db[self][colnames[cid]] == curr_id).delete()
curr_id = self.insert(**dict(items))
# remove ids that are not used
while del_id<csv_id:
query = self._db[self][colnames[cid]] == del_id
self._db(query).delete()
del_id += 1
# Validation. Check for duplicate of 'unique' &,
# if present, update instead of insert.
elif not unique or unique not in colnames:
elif not unique_idx:
new_id = self.insert(**dict(items))
else:
unique_value = line[unique_idx]
@@ -7693,7 +7690,7 @@ class Table(dict):
new_id = record[self._id.name]
else:
new_id = self.insert(**dict(items))
if id_map and cid != []:
if id_map and cid is not None:
id_map_self[int(line[cid])] = new_id
def with_alias(self, alias):
+47
View File
@@ -13,6 +13,7 @@ else:
import unittest
import datetime
import cStringIO
from dal import DAL, Field, Table, SQLALL
ALLOWED_DATATYPES = [
@@ -483,6 +484,52 @@ class TestVirtualFields(unittest.TestCase):
db.t.drop()
db.commit()
class TestImportExportFields(unittest.TestCase):
    """Round-trip two linked tables through CSV export and import.

    Populates person/pet, exports to an in-memory CSV stream, wipes the
    tables, re-imports, and checks the foreign-key relationship survived.
    """

    def testRun(self):
        db = DAL('sqlite:memory:')
        db.define_table('person', Field('name'))
        db.define_table('pet', Field('friend', db.person), Field('name'))
        # Populate twice to make sure repeated delete/insert cycles
        # leave a clean, consistent data set of 10 person/pet pairs.
        for _ in range(2):
            db(db.pet).delete()
            db(db.person).delete()
            for k in range(10):
                person_id = db.person.insert(name=str(k))
                db.pet.insert(friend=person_id, name=str(k))
            db.commit()
        # Export everything, empty the tables, then restore from CSV.
        buffer = cStringIO.StringIO()
        db.export_to_csv_file(buffer)
        db(db.pet).delete()
        db(db.person).delete()
        db.import_from_csv_file(cStringIO.StringIO(buffer.getvalue()))
        # Each pet must still reference the person with the matching name.
        matched = db(db.person.id == db.pet.friend)(db.person.name == db.pet.name)
        assert matched.count() == 10
        db.pet.drop()
        db.person.drop()
        db.commit()
class TestImportExportUuidFields(unittest.TestCase):
    """Verify CSV import de-duplicates rows on a uuid field.

    Imports on top of existing data: persons carry a uuid so duplicates
    are skipped (count stays 10), while pets have no uuid and double up.
    """

    def testRun(self):
        db = DAL('sqlite:memory:')
        db.define_table('person', Field('name'), Field('uuid'))
        db.define_table('pet', Field('friend', db.person), Field('name'))
        # Populate twice; each pass rebuilds 10 person/pet pairs.
        for _ in range(2):
            db(db.pet).delete()
            db(db.person).delete()
            for k in range(10):
                person_id = db.person.insert(name=str(k), uuid=str(k))
                db.pet.insert(friend=person_id, name=str(k))
            db.commit()
        # Re-import WITHOUT clearing the tables first.
        buffer = cStringIO.StringIO()
        db.export_to_csv_file(buffer)
        db.import_from_csv_file(cStringIO.StringIO(buffer.getvalue()))
        # uuid uniqueness keeps persons at 10; pets were duplicated to 20.
        assert db(db.person).count() == 10
        joined = db(db.person.id == db.pet.friend)(db.person.name == db.pet.name)
        assert joined.count() == 20
        db.pet.drop()
        db.person.drop()
        db.commit()
# Allow running this test module directly from the command line.
if __name__ == '__main__':
    unittest.main()