Add mongodb and imap tests for dal

Added mongodb uri string for tests
This commit is contained in:
Alan Etkin
2014-01-26 14:13:37 -03:00
parent 2967969add
commit 35d56ec3f0
4 changed files with 344 additions and 49 deletions

View File

@@ -11,6 +11,8 @@ env:
- DB=mysql://root:@localhost/test_w2p
- DB=postgres://postgres:@localhost/test_w2p
- DB=google:datastore
- DB=mongodb://mongodb:mongodb@localhost/test_w2p
- DB=imap://imap:imap@localhost:993
before_script:
- if [[ $TRAVIS_PYTHON_VERSION != '2.7' ]]; then pip install unittest2; fi
- if [[ $TRAVIS_PYTHON_VERSION == '2.7' ]]; then pip install coverage; fi;
@@ -29,6 +31,9 @@ before_script:
- if [[ $DB == google* ]]; then unzip -q google_appengine_$GAERELEASE.zip; fi
- if [[ $DB == google* ]]; then mv -f ./google_appengine/google ./google; fi
- if [[ $DB == mongodb* ]]; then pip install pymongo; fi
- if [[ $DB == mongodb* ]]; then mongo test_w2p --eval 'db.addUser("mongodb", "mongodb");'; fi
#Temporal solution to travis issue #155
- sudo chmod 777 /dev/shm
- sudo rm -rf /dev/shm && sudo ln -s /run/shm /dev/shm
@@ -43,6 +48,7 @@ matrix:
- python: '2.6'
env: DB=google:datastore
script: export COVERAGE_PROCESS_START=gluon/tests/coverage.ini; ./web2py.py --run_system_tests --with_coverage
after_success:
- if [[ $TRAVIS_PYTHON_VERSION == '2.7' ]]; then coverage combine; fi
@@ -51,3 +57,5 @@ after_success:
notifications:
email: true
services: mongodb

View File

@@ -0,0 +1,255 @@
# -*- encoding: utf-8 -*-
from imaplib import ParseFlags
# mockimaplib: A very simple mock server module for imap client APIs
# Copyright (C) 2014 Alan Etkin <spametki@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or(at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program. If not, see
# <http://www.gnu.org/licenses/lgpl.html>
"""
mockimaplib allows you to test applications connecting to a dummy imap
service. For more details on the api subset implemented,
refer to the imaplib docs.
The client should configure a dictionary to map imap string queries to sets
of entries stored in a message dummy storage dictionary. The module includes
a small set of default message records (SPAM and MESSAGES), two mailboxes
(Draft and INBOX) and a list of query/resultset entries (RESULTS).
Usage:
>>> import mockimaplib
>>> connection = mockimaplib.IMAP4_SSL(<host>)
>>> connection.login(<user>, <password>)
None
>>> connection.select("INBOX")
("OK", ... <mailbox length>)
# fetch commands specifying single uid or message id
# will try to get messages recorded in SPAM
>>> connection.uid(...)
<search query or fetch result>
# returns a string list of matching message ids
>>> connection.search(<query>)
("OK", ... "1 2 ... n")
"""
MESSAGES = (
'MIME-Version: 1.0\r\nReceived: by 10.140.91.199 with HTTP; Mon, 27 Jan 2014 13:52:30 -0800 (PST)\r\nDate: Mon, 27 Jan 2014 19:52:30 -0200\r\nDelivered-To: nurse@example.com\r\nMessage-ID: <10101010101010010000010101010001010101001010010000001@mail.example.com>\r\nSubject: spam1\r\nFrom: Mr. Gumby <gumby@example.com>\r\nTo: The nurse <nurse@example.com>\r\nContent-Type: text/plain; charset=ISO-8859-1\r\n\r\nNurse!\r\n\r\n\r\n',
'MIME-Version: 1.0\r\nReceived: by 10.140.91.199 with HTTP; Mon, 27 Jan 2014 13:52:47 -0800 (PST)\r\nDate: Mon, 27 Jan 2014 19:52:47 -0200\r\nDelivered-To: nurse@example.com\r\nMessage-ID: <101010101010100100000101010100010101010010100100000010@mail.example.com>\r\nSubject: spam2\r\nFrom: Mr. Gumby <gumby@example.com>\r\nTo: The nurse <nurse@example.com>\r\nContent-Type: text/plain; charset=ISO-8859-1\r\n\r\nNurse, nurse!',
'MIME-Version: 1.0\r\nReceived: by 10.140.91.199 with HTTP; Mon, 27 Jan 2014 13:54:54 -0800 (PST)\r\nDate: Mon, 27 Jan 2014 19:54:54 -0200\r\nDelivered-To: nurse@example.com\r\nMessage-ID: <1010101010101001000001010101000101010100101001000000101@mail.example.com>\r\nSubject: spamalot1\r\nFrom: Mr. Gumby <gumby@example.com>\r\nTo: The nurse <nurse@example.com>\r\nContent-Type: text/plain; charset=ISO-8859-1\r\n\r\nNurse!\r\n\r\n\r\n',
'MIME-Version: 1.0\r\n\r\nReceived: by 10.140.91.199 with HTTP; Mon, 27 Jan 2014 13:54:54 -0800 (PST)\r\nDate: Mon, 27 Jan 2014 19:54:54 -0200\r\nDelivered-To: nurse@example.com\r\nMessage-ID: <101010101010100100000101010100010101010010100100000010101@mail.example.com>\r\nSubject: spamalot2\r\nFrom: Mr. Gumby <gumby@example.com>\r\nTo: The nurse <nurse@example.com>\r\nContent-Type: text/plain; charset=ISO-8859-1\r\n\r\nNurse! ... Nurse! ... Nurse!\r\n\r\n\r\n')
SPAM = {
"INBOX": [
{"uid": "483209",
"headers": MESSAGES[0],
"complete": MESSAGES[0],
"flags": ""},
{"uid": "483211",
"headers": MESSAGES[1],
"complete": MESSAGES[1],
"flags": ""},
{"uid": "483225",
"headers": MESSAGES[2],
"complete": MESSAGES[2],
"flags": ""}],
"Draft":[
{"uid": "483432",
"headers": MESSAGES[3],
"complete": MESSAGES[3],
"flags": ""},]
}
RESULTS = {
# <query string>: [<str uid> | <long id>, ...]
"INBOX": {
"(ALL)": (1, 2, 3),
"(1:3)": (1, 2, 3)},
"Draft": {
"(1:1)": (1,)},
}
class Connection(object):
"""Dummy connection object for the imap client.
By default, uses the module SPAM and RESULT
sets (use Connection.setup for custom values)"""
def login(self, user, password):
pass
def __init__(self):
self._readonly = False
self._mailbox = None
self.setup()
def list(self):
return ('OK', ['(\\HasNoChildren) "/" "%s"' % key for key in self.spam])
def select(self, tablename, readonly=False):
self._readonly = readonly
"""args: mailbox, boolean
result[1][0] -> int last message id / mailbox lenght
result[0] = 'OK'
"""
self._mailbox = tablename
return ('OK', (len(SPAM[self._mailbox]), None))
def uid(self, command, uid, arg):
""" args:
command: "search" | "fetch"
uid: None | uid
parts: "(ALL)" | "(RFC822 FLAGS)" | "(RFC822.HEADER FLAGS)"
"search", None, "(ALL)" -> ("OK", ("uid_1 uid_2 ... uid_<mailbox length>", None))
"search", None, "<query>" -> ("OK", ("uid_1 uid_2 ... uid_n", None))
"fetch", uid, parts -> ("OK", (("<id> ...", "<raw message as specified in parts>"), "<flags>")
[0] [1][0][0] [1][0][1] [1][1]
"""
if command == "search":
return self._search(arg)
elif command == "fetch":
return self._fetch(uid, arg)
def _search(self, query):
return ("OK", (" ".join([str(item["uid"]) for item in self._get_messages(query)]), None))
def _fetch(self, value, arg):
try:
message = self.spam[self._mailbox][value - 1]
message_id = value
except TypeError:
for x, item in enumerate(self.spam[self._mailbox]):
if item["uid"] == value:
message = item
message_id = x + 1
break
parts = "headers"
if arg in ("(ALL)", "(RFC822 FLAGS)"):
parts = "complete"
return ("OK", (("%s " % message_id, message[parts]), message["flags"]))
def _get_messages(self, query):
if query.strip().isdigit():
return [self.spam[self._mailbox][int(query.strip()) - 1],]
elif query[1:-1].strip().isdigit():
return [self.spam[self._mailbox][int(query[1:-1].strip()) -1],]
elif query[1:-1].replace("UID", "").strip().isdigit():
for item in self.spam[self._mailbox]:
if item["uid"] == query[1:-1].replace("UID", "").strip():
return [item,]
messages = []
try:
for m in self.results[self._mailbox][query]:
try:
self.spam[self._mailbox][m - 1]["id"] = m
messages.append(self.spam[self._mailbox][m - 1])
except TypeError:
for x, item in enumerate(self.spam[self._mailbox]):
if item["uid"] == m:
item["id"] = x + 1
messages.append(item)
break
except IndexError:
# message removed
pass
return messages
except KeyError:
raise ValueError("The client issued an unexpected query: %s" % query)
def setup(self, spam={}, results={}):
"""adds custom message and query databases or sets
the values to the module defaults.
"""
self.spam = spam
self.results = results
if not spam:
for key in SPAM:
self.spam[key] = []
for d in SPAM[key]:
self.spam[key].append(d.copy())
if not results:
for key in RESULTS:
self.results[key] = RESULTS[key].copy()
def search(self, first, query):
""" args:
first: None
query: string with mailbox query (flags, date, uid, id, ...)
example: '2:15723 BEFORE 27-Jan-2014 FROM "gumby"'
result[1][0] -> "id_1 id_2 ... id_n"
"""
messages = self._get_messages(query)
ids = " ".join([str(item["id"]) for item in messages])
return ("OK", (ids, None))
def append(self, mailbox, flags, struct_time, message):
"""
result, data = self.connection.append(mailbox, flags, struct_time, message)
if result == "OK":
uid = int(re.findall("\d+", str(data))[-1])
"""
last = self.spam[mailbox][-1]
try:
uid = int(last["uid"]) +1
except ValueError:
alluids = []
for _mailbox in self.spam.keys():
for item in self.spam[_mailbox]:
try:
alluids.append(int(item["uid"]))
except:
pass
if len(alluids) > 0:
uid = max(alluids) + 1
else:
uid = 1
flags = "FLAGS " + flags
item = {"uid": str(uid), "headers": message, "complete": message, "flags": flags}
self.spam[mailbox].append(item)
return ("OK", "spam spam %s spam" % uid)
def store(self, *args):
"""
implements some flag commands
args: ("<id>", "<+|->FLAGS", "(\\Flag1 \\Flag2 ... \\Flagn)")
"""
message = self.spam[self._mailbox][int(args[0] - 1)]
old_flags = ParseFlags(message["flags"])
flags = ParseFlags("FLAGS" + args[2])
if args[1].strip().startswith("+"):
message["flags"] = "FLAGS (%s)" % " ".join(set(flags + old_flags))
elif args[1].strip().startswith("-"):
message["flags"] = "FLAGS (%s)" % " ".join([flag for flag in old_flags if not flag in flags])
def expunge(self):
"""implements removal of deleted flag messages"""
for x, item in enumerate(self.spam[self._mailbox]):
if "\\Deleted" in item["flags"]:
self.spam[self._mailbox].pop(x)
class IMAP4(object):
""">>> connection = IMAP4() # creates the dummy imap4 client object"""
def __new__(self, *args, **kwargs):
# args: (server, port)
return Connection()
IMAP4_SSL = IMAP4

View File

@@ -3,7 +3,9 @@ import os, sys
from test_http import *
from test_cache import *
if "google" in (os.getenv("DB") or []):
NOSQL = any([name in (os.getenv("DB") or "")
for name in ("datastore", "mongodb", "imap")])
if NOSQL:
from test_dal_nosql import *
else:
from test_dal import *

View File

@@ -48,27 +48,36 @@ def fix_sys_path():
fix_sys_path()
from dal import DAL, Field, Table, SQLALL
#for travis-ci
DEFAULT_URI = os.environ.get('DB', 'sqlite:memory')
print 'Testing against %s engine (%s)' % (DEFAULT_URI.partition(':')[0], DEFAULT_URI)
IS_GAE = "datastore" in DEFAULT_URI
IS_MONGODB = "mongodb" in DEFAULT_URI
IS_IMAP = "imap" in DEFAULT_URI
if IS_IMAP:
from dal import IMAPAdapter
from contrib import mockimaplib
IMAPAdapter.driver = mockimaplib
from dal import DAL, Field, Table, SQLALL
def drop(table, cascade=None):
if not IS_GAE:
if cascade:
table.drop(cascade)
else:
table.drop()
else:
# mongodb implements drop()
# although it seems it does not work properly
if (IS_GAE or IS_MONGODB or IS_IMAP):
# GAE drop/cleanup is not implemented
db = table._db
db(table).delete()
del db[table._tablename]
del db.tables[db.tables.index(table._tablename)]
db._remove_references_to(table)
else:
if cascade:
table.drop(cascade)
else:
table.drop()
# setup GAE dummy database
@@ -104,7 +113,7 @@ def tearDownModule():
for a in glob.glob('*.table'):
os.unlink(a)
@unittest.skipIf(IS_GAE, 'TODO: Datastore throws "AssertionError: SyntaxError not raised"')
@unittest.skipIf(IS_GAE or IS_IMAP, 'TODO: Datastore throws "AssertionError: SyntaxError not raised"')
class TestFields(unittest.TestCase):
def testFieldName(self):
@@ -157,7 +166,7 @@ class TestFields(unittest.TestCase):
else:
isinstance(f.formatter(datetime.datetime.now()), str)
@unittest.skipIf(IS_GAE, 'TODO: Datastore does accept dict objects as json field input')
@unittest.skipIf(IS_GAE or IS_MONGODB, 'TODO: Datastore does accept dict objects as json field input. MongoDB assertion error Binary("x", 0) != "x"')
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
for ft in ['string', 'text', 'password', 'upload', 'blob']:
@@ -228,7 +237,7 @@ class TestFields(unittest.TestCase):
drop(db.tt)
@unittest.skipIf(IS_GAE, 'TODO: Datastore throws "AssertionError: SyntaxError not raised"')
@unittest.skipIf(IS_GAE or IS_IMAP, 'TODO: Datastore throws "AssertionError: SyntaxError not raised"')
class TestTables(unittest.TestCase):
def testTableNames(self):
@@ -247,7 +256,7 @@ class TestTables(unittest.TestCase):
self.assert_(Table(None, 'a_bc'),
"Table isn't allowing underscores in tablename. It should.")
@unittest.skipIf(IS_IMAP, "Skip IMAP")
class TestAll(unittest.TestCase):
def setUp(self):
@@ -257,7 +266,7 @@ class TestAll(unittest.TestCase):
ans = 'PseudoTable.id, PseudoTable.name, PseudoTable.birthdate'
self.assertEqual(str(SQLALL(self.pt)), ans)
@unittest.skipIf(IS_IMAP, "Skip IMAP")
class TestTable(unittest.TestCase):
def testTableCreation(self):
@@ -280,7 +289,7 @@ class TestTable(unittest.TestCase):
self.assert_('persons.firstname, persons.lastname'
in str(persons.ALL))
@unittest.skipIf(IS_GAE, "No table alias on GAE")
@unittest.skipIf(IS_GAE or IS_MONGODB, "No table alias for this backend")
def testTableAlias(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
persons = Table(db, 'persons', Field('firstname',
@@ -303,24 +312,35 @@ class TestTable(unittest.TestCase):
class TestInsert(unittest.TestCase):
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa'))
self.assertEqual(isinstance(db.tt.insert(aa='1'), long), True)
self.assertEqual(isinstance(db.tt.insert(aa='1'), long), True)
self.assertEqual(isinstance(db.tt.insert(aa='1'), long), True)
self.assertEqual(db(db.tt.aa == '1').count(), 3)
self.assertEqual(db(db.tt.aa == '2').isempty(), True)
self.assertEqual(db(db.tt.aa == '1').update(aa='2'), 3)
self.assertEqual(db(db.tt.aa == '2').count(), 3)
self.assertEqual(db(db.tt.aa == '2').isempty(), False)
self.assertEqual(db(db.tt.aa == '2').delete(), 3)
self.assertEqual(db(db.tt.aa == '2').isempty(), True)
drop(db.tt)
if IS_IMAP:
imap = DAL(DEFAULT_URI)
imap.define_tables()
self.assertEqual(imap.Draft.insert(to="nurse@example.com",
subject="Nurse!",
sender="gumby@example.com",
content="Nurse!\r\nNurse!"), 2)
self.assertEqual(imap.Draft[2].subject, "Nurse!")
self.assertEqual(imap.Draft[2].sender, "gumby@example.com")
self.assertEqual(isinstance(imap.Draft[2].uid, long), True)
self.assertEqual(imap.Draft[2].content[0]["text"], "Nurse!\r\nNurse!")
else:
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa'))
self.assertEqual(isinstance(db.tt.insert(aa='1'), long), True)
self.assertEqual(isinstance(db.tt.insert(aa='1'), long), True)
self.assertEqual(isinstance(db.tt.insert(aa='1'), long), True)
self.assertEqual(db(db.tt.aa == '1').count(), 3)
self.assertEqual(db(db.tt.aa == '2').isempty(), True)
self.assertEqual(db(db.tt.aa == '1').update(aa='2'), 3)
self.assertEqual(db(db.tt.aa == '2').count(), 3)
self.assertEqual(db(db.tt.aa == '2').isempty(), False)
self.assertEqual(db(db.tt.aa == '2').delete(), 3)
self.assertEqual(db(db.tt.aa == '2').isempty(), True)
drop(db.tt)
@unittest.skipIf(IS_GAE, 'TODO: Datastore throws "SyntaxError: Not supported (query using or)"')
@unittest.skipIf(IS_GAE or IS_MONGODB or IS_IMAP, 'TODO: Datastore throws "SyntaxError: Not supported (query using or)". MongoDB assertionerror 5L != 3')
class TestSelect(unittest.TestCase):
def testRun(self):
@@ -351,7 +371,7 @@ class TestSelect(unittest.TestCase):
self.assertEqual(db(~(db.tt.aa > '1') & (db.tt.aa > '2')).count(), 0)
drop(db.tt)
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
class TestAddMethod(unittest.TestCase):
def testRun(self):
@@ -366,7 +386,7 @@ class TestAddMethod(unittest.TestCase):
self.assertEqual(len(db.tt.all()), 3)
drop(db.tt)
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
class TestBelongs(unittest.TestCase):
def testRun(self):
@@ -378,7 +398,7 @@ class TestBelongs(unittest.TestCase):
self.assertEqual(isinstance(db.tt.insert(aa='3'), long), True)
self.assertEqual(db(db.tt.aa.belongs(('1', '3'))).count(),
2)
if not (IS_GAE):
if not (IS_GAE or IS_MONGODB):
self.assertEqual(db(db.tt.aa.belongs(db(db.tt.id > 2)._select(db.tt.aa))).count(), 1)
self.assertEqual(db(db.tt.aa.belongs(db(db.tt.aa.belongs(('1',
@@ -388,12 +408,13 @@ class TestBelongs(unittest.TestCase):
db.tt.aa))).count(),
2)
else:
print "Datastore belongs does not accept queries (skipping)"
print "Datastore/Mongodb belongs does not accept queries (skipping)"
drop(db.tt)
@unittest.skipIf(IS_GAE, "Contains not supported on GAE Datastore")
@unittest.skipIf(IS_GAE or IS_IMAP, "Contains not supported on GAE Datastore. TODO: IMAP tests")
class TestContains(unittest.TestCase):
@unittest.skipIf(IS_MONGODB, "TODO: MongoDB Contains error")
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa', 'list:string'), Field('bb','string'))
@@ -410,7 +431,7 @@ class TestContains(unittest.TestCase):
drop(db.tt)
@unittest.skipIf(IS_GAE, "Like not supported on GAE Datastore")
@unittest.skipIf(IS_GAE or IS_MONGODB or IS_IMAP, "Like not supported on GAE Datastore. TODO: IMAP test")
class TestLike(unittest.TestCase):
def testRun(self):
@@ -436,7 +457,7 @@ class TestLike(unittest.TestCase):
self.assertEqual(db(db.tt.aa.like('2%')).count(), 0)
drop(db.tt)
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
class TestDatetime(unittest.TestCase):
def testRun(self):
@@ -453,7 +474,7 @@ class TestDatetime(unittest.TestCase):
self.assertEqual(db(db.tt.aa >= datetime.datetime(1971, 1, 1)).count(), 2)
drop(db.tt)
@unittest.skipIf(IS_GAE, "Expressions not supported in GAE Datastore")
@unittest.skipIf(IS_GAE or IS_MONGODB or IS_IMAP, "Expressions are not supported")
class TestExpressions(unittest.TestCase):
def testRun(self):
@@ -541,7 +562,7 @@ class TestJoin(unittest.TestCase):
drop(db.person)
class TestMinMaxSumAvg(unittest.TestCase):
@unittest.skipIf(IS_GAE, 'TODO: Datastore throws "AttributeError: Row object has no attribute _extra"')
@unittest.skipIf(IS_GAE or IS_MONGODB or IS_IMAP, 'TODO: Datastore throws "AttributeError: Row object has no attribute _extra"')
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa', 'integer'))
@@ -562,7 +583,7 @@ class TestMinMaxSumAvg(unittest.TestCase):
self.assertEqual(db().select(s).first()[s], 2)
drop(db.tt)
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
class TestCache(unittest.TestCase):
def testRun(self):
from cache import CacheInRam
@@ -581,7 +602,7 @@ class TestCache(unittest.TestCase):
self.assertEqual(len(r0),len(r4))
drop(db.tt)
@unittest.skipIf(IS_IMAP, "Skip IMAP")
class TestMigrations(unittest.TestCase):
def testRun(self):
@@ -612,7 +633,7 @@ class TestMigrations(unittest.TestCase):
os.unlink('.storage.table')
class TestReference(unittest.TestCase):
@unittest.skipIf(IS_MONGODB or IS_IMAP, "TODO: MongoDB assertion error (long object has no attribute id)")
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
if DEFAULT_URI.startswith('mssql'):
@@ -636,6 +657,7 @@ class TestReference(unittest.TestCase):
drop(db.tt)
db.commit()
@unittest.skipIf(IS_IMAP, "Skip IMAP")
class TestClientLevelOps(unittest.TestCase):
def testRun(self):
@@ -658,7 +680,7 @@ class TestClientLevelOps(unittest.TestCase):
drop(db.tt)
db.commit()
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
class TestVirtualFields(unittest.TestCase):
def testRun(self):
@@ -673,6 +695,7 @@ class TestVirtualFields(unittest.TestCase):
drop(db.tt)
db.commit()
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
class TestComputedFields(unittest.TestCase):
def testRun(self):
@@ -699,9 +722,10 @@ class TestComputedFields(unittest.TestCase):
drop(db.tt)
db.commit()
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
class TestCommonFilters(unittest.TestCase):
@unittest.skipIf(IS_MONGODB, "TODO: MongoDB Assertion error")
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('t1', Field('aa'))
@@ -727,6 +751,7 @@ class TestCommonFilters(unittest.TestCase):
# drop(db.t2)
drop(db.t1)
@unittest.skipIf(IS_IMAP, "Skip IMAP test")
class TestImportExportFields(unittest.TestCase):
def testRun(self):
@@ -752,6 +777,7 @@ class TestImportExportFields(unittest.TestCase):
drop(db.person)
db.commit()
@unittest.skipIf(IS_IMAP, "Skip IMAP test")
class TestImportExportUuidFields(unittest.TestCase):
def testRun(self):
@@ -777,7 +803,7 @@ class TestImportExportUuidFields(unittest.TestCase):
drop(db.person)
db.commit()
@unittest.skipIf(IS_IMAP, "Skip IMAP test")
class TestDALDictImportExport(unittest.TestCase):
def testRun(self):
@@ -873,7 +899,7 @@ class TestDALDictImportExport(unittest.TestCase):
drop(db6.tvshow)
db6.commit()
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
class TestValidateAndInsert(unittest.TestCase):
def testRun(self):
@@ -898,6 +924,7 @@ class TestValidateAndInsert(unittest.TestCase):
#cleanup table
drop(db.val_and_insert)
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
class TestSelectAsDict(unittest.TestCase):
def testSelect(self):
@@ -916,9 +943,10 @@ class TestSelectAsDict(unittest.TestCase):
drop(db.a_table)
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
class TestRNameTable(unittest.TestCase):
#tests for highly experimental rname attribute
@unittest.skipIf(IS_MONGODB, "TODO: MongoDB assertion error (long object has no attribute id)")
def testSelect(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
rname = db._adapter.QUOTE_TEMPLATE % 'a very complicated tablename'
@@ -1010,9 +1038,10 @@ class TestRNameTable(unittest.TestCase):
self.assertEqual(len(db.person._referenced_by),0)
drop(db.person)
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
class TestRNameFields(unittest.TestCase):
# tests for highly experimental rname attribute
@unittest.skipIf(IS_GAE, 'TODO: Datastore throws unsupported error for AGGREGATE')
@unittest.skipIf(IS_GAE or IS_MONGODB, 'TODO: Datastore throws unsupported error for AGGREGATE. MongoDB assertion error (long object has no attribute id)')
def testSelect(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
rname = db._adapter.QUOTE_TEMPLATE % 'a very complicated fieldname'
@@ -1128,7 +1157,7 @@ class TestRNameFields(unittest.TestCase):
drop(db.person)
drop(db.easy_name)
@unittest.skipIf(IS_GAE, 'TODO: Datastore does not accept dict objects as json field input')
@unittest.skipIf(IS_GAE or IS_MONGODB, 'TODO: Datastore does not accept dict objects as json field input. MongoDB assertionerror Binary("x", 0) != "x"')
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
rname = db._adapter.QUOTE_TEMPLATE % 'a very complicated fieldname'
@@ -1284,6 +1313,7 @@ class TestRNameFields(unittest.TestCase):
self.assertEqual(len(db.person._referenced_by),0)
drop(db.person)
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
class TestQuoting(unittest.TestCase):
# tests for case sensitivity