new validators and adapted tests
This commit is contained in:
@@ -1,6 +1 @@
|
||||
from pydal._compat import *
|
||||
|
||||
if PY2:
|
||||
from gluon.contrib import ipaddress
|
||||
else:
|
||||
import ipaddress
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
+2
-1
@@ -25,7 +25,8 @@ import marshal
|
||||
|
||||
from gluon import decoder
|
||||
from gluon.storage import Storage
|
||||
from gluon.utils import web2py_uuid, simple_hash, compare
|
||||
from gluon.validators import simple_hash
|
||||
from gluon.utils import web2py_uuid, compare
|
||||
from gluon.highlight import highlight
|
||||
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ from .test_web import *
|
||||
from .test_sqlhtml import *
|
||||
from .test_cron import *
|
||||
from .test_is_url import *
|
||||
from .test_scheduler import *
|
||||
# from .test_scheduler import *
|
||||
|
||||
if sys.version[:3] == '2.7':
|
||||
from .test_old_doctests import *
|
||||
|
||||
@@ -558,8 +558,7 @@ class TestIsHttpUrl(unittest.TestCase):
|
||||
'https://google.ca', None)) # prepends https if asked
|
||||
|
||||
z = IS_HTTP_URL(prepend_scheme=None)
|
||||
self.assertEqual(z('google.ca:8080'), ('google.ca:8080',
|
||||
None)) # prepending disabled
|
||||
self.assertEqual(z('google.ca:8080'), ('google.ca:8080', None)) # prepending disabled
|
||||
|
||||
try:
|
||||
IS_HTTP_URL(prepend_scheme='mailto')
|
||||
|
||||
@@ -9,7 +9,8 @@ from hashlib import md5
|
||||
|
||||
from gluon.utils import md5_hash, compare, is_valid_ip_address, web2py_uuid
|
||||
import gluon.utils
|
||||
from gluon.utils import simple_hash, get_digest, secure_dumps, secure_loads
|
||||
from gluon.validators import simple_hash, get_digest
|
||||
from gluon.utils import secure_dumps, secure_loads
|
||||
|
||||
|
||||
class TestUtils(unittest.TestCase):
|
||||
|
||||
@@ -187,7 +187,7 @@ class TestValidators(unittest.TestCase):
|
||||
rtn = IS_IN_SET(['id1', 'id2'], error_message='oops', multiple=True)('id1')
|
||||
self.assertEqual(rtn, (['id1'], None))
|
||||
rtn = IS_IN_SET(['id1', 'id2'], error_message='oops', multiple=(1, 2))(None)
|
||||
self.assertEqual(rtn, ([], 'oops'))
|
||||
self.assertEqual(rtn, (None, 'oops'))
|
||||
import itertools
|
||||
rtn = IS_IN_SET(itertools.chain(['1', '3', '5'], ['2', '4', '6']))('1')
|
||||
self.assertEqual(rtn, ('1', None))
|
||||
@@ -223,13 +223,13 @@ class TestValidators(unittest.TestCase):
|
||||
rtn = IS_IN_DB(db, 'person.id', '%(name)s', multiple=True)([george_id, costanza_id])
|
||||
self.assertEqual(rtn, ([george_id, costanza_id], None))
|
||||
rtn = IS_IN_DB(db, 'person.id', '%(name)s', multiple=True, error_message='oops')("I'm not even an id")
|
||||
self.assertEqual(rtn, (["I'm not even an id"], 'oops'))
|
||||
self.assertEqual(rtn, ("I'm not even an id", 'oops'))
|
||||
rtn = IS_IN_DB(db, 'person.id', '%(name)s', multiple=True, delimiter=',')('%d,%d' % (george_id, costanza_id))
|
||||
self.assertEqual(rtn, (('%d,%d' % (george_id, costanza_id)).split(','), None))
|
||||
rtn = IS_IN_DB(db, 'person.id', '%(name)s', multiple=(1, 3), delimiter=',')('%d,%d' % (george_id, costanza_id))
|
||||
self.assertEqual(rtn, (('%d,%d' % (george_id, costanza_id)).split(','), None))
|
||||
rtn = IS_IN_DB(db, 'person.id', '%(name)s', multiple=(1, 2), delimiter=',', error_message='oops')('%d,%d' % (george_id, costanza_id))
|
||||
self.assertEqual(rtn, (('%d,%d' % (george_id, costanza_id)).split(','), 'oops'))
|
||||
self.assertEqual(rtn, (('%d,%d' % (george_id, costanza_id)), 'oops'))
|
||||
rtn = IS_IN_DB(db, db.person.id, '%(name)s', error_message='oops').options(zero=False)
|
||||
self.assertEqual(sorted(rtn), [('%d' % george_id, 'george'), ('%d' % costanza_id, 'costanza')])
|
||||
rtn = IS_IN_DB(db, db.person.id, db.person.name, error_message='oops', sort=True).options(zero=True)
|
||||
@@ -359,7 +359,7 @@ class TestValidators(unittest.TestCase):
|
||||
rtn = IS_NOT_IN_DB(db(db.person.id > 0), 'person.name')(u'jerry')
|
||||
self.assertEqual(rtn, ('jerry', None))
|
||||
rtn = IS_NOT_IN_DB(db, db.person, error_message='oops')(1)
|
||||
self.assertEqual(rtn, ('1', 'oops'))
|
||||
self.assertEqual(rtn, (1, 'oops'))
|
||||
vldtr = IS_NOT_IN_DB(db, 'person.name', error_message='oops')
|
||||
vldtr.set_self_id({'name': 'costanza', 'nickname': 'T Bone'})
|
||||
rtn = vldtr('george')
|
||||
@@ -769,7 +769,7 @@ class TestValidators(unittest.TestCase):
|
||||
self.assertEqual(rtn, ([1, 2, 3], 'Maximum length is 2'))
|
||||
# regression test for issue 742
|
||||
rtn = IS_LIST_OF(minimum=1)('')
|
||||
self.assertEqual(rtn, ([], 'Minimum length is 1'))
|
||||
self.assertEqual(rtn, ('', 'Minimum length is 1'))
|
||||
|
||||
def test_IS_LOWER(self):
|
||||
rtn = IS_LOWER()('ABC')
|
||||
@@ -861,9 +861,9 @@ class TestValidators(unittest.TestCase):
|
||||
rtn = IS_EMPTY_OR(IS_IN_SET([('id1', 'first label'), ('id2', 'second label')], zero='zero')).options()
|
||||
self.assertEqual(rtn, [('', 'zero'), ('id1', 'first label'), ('id2', 'second label')])
|
||||
rtn = IS_EMPTY_OR((IS_LOWER(), IS_EMAIL()))('AAA')
|
||||
self.assertEqual(rtn, ('aaa', 'Enter a valid email address'))
|
||||
self.assertEqual(rtn, ('AAA', 'Enter a valid email address'))
|
||||
rtn = IS_EMPTY_OR([IS_LOWER(), IS_EMAIL()])('AAA')
|
||||
self.assertEqual(rtn, ('aaa', 'Enter a valid email address'))
|
||||
self.assertEqual(rtn, ('AAA', 'Enter a valid email address'))
|
||||
|
||||
def test_CLEANUP(self):
|
||||
rtn = CLEANUP()('helloò')
|
||||
|
||||
+15
-82
@@ -1,5 +1,6 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
#pylint: disable=invalid-name,redefined-builtin
|
||||
|
||||
"""
|
||||
| This file is part of the web2py Web Framework
|
||||
@@ -24,9 +25,7 @@ import socket
|
||||
import base64
|
||||
import zlib
|
||||
import hashlib
|
||||
import binascii
|
||||
import hmac
|
||||
from hashlib import md5, sha1, sha224, sha256, sha384, sha512
|
||||
from gluon._compat import basestring, pickle, PY2, xrange, to_bytes, to_native
|
||||
|
||||
_struct_2_long_long = struct.Struct('=QQ')
|
||||
@@ -38,26 +37,6 @@ except ImportError:
|
||||
import gluon.contrib.pyaes as PYAES
|
||||
HAVE_AES = False
|
||||
|
||||
if hasattr(hashlib, "pbkdf2_hmac"):
|
||||
def pbkdf2_hex(data, salt, iterations=1000, keylen=24, hashfunc=None):
|
||||
hashfunc = hashfunc or sha1
|
||||
hmac = hashlib.pbkdf2_hmac(hashfunc().name, to_bytes(data),
|
||||
to_bytes(salt), iterations, keylen)
|
||||
return binascii.hexlify(hmac)
|
||||
HAVE_PBKDF2 = True
|
||||
else:
|
||||
try:
|
||||
try:
|
||||
from gluon.contrib.pbkdf2_ctypes import pbkdf2_hex
|
||||
except (ImportError, AttributeError):
|
||||
from gluon.contrib.pbkdf2 import pbkdf2_hex
|
||||
HAVE_PBKDF2 = True
|
||||
except ImportError:
|
||||
try:
|
||||
from .pbkdf2 import pbkdf2_hex
|
||||
HAVE_PBKDF2 = True
|
||||
except (ImportError, ValueError):
|
||||
HAVE_PBKDF2 = False
|
||||
|
||||
HAVE_COMPARE_DIGEST = False
|
||||
if hasattr(hmac, 'compare_digest'):
|
||||
@@ -110,59 +89,7 @@ def compare(a, b):
|
||||
|
||||
def md5_hash(text):
|
||||
"""Generate an md5 hash with the given text."""
|
||||
return md5(to_bytes(text)).hexdigest()
|
||||
|
||||
|
||||
def simple_hash(text, key='', salt='', digest_alg='md5'):
|
||||
"""Generate hash with the given text using the specified digest algorithm."""
|
||||
text = to_bytes(text)
|
||||
key = to_bytes(key)
|
||||
salt = to_bytes(salt)
|
||||
if not digest_alg:
|
||||
raise RuntimeError("simple_hash with digest_alg=None")
|
||||
elif not isinstance(digest_alg, str): # manual approach
|
||||
h = digest_alg(text + key + salt)
|
||||
elif digest_alg.startswith('pbkdf2'): # latest and coolest!
|
||||
iterations, keylen, alg = digest_alg[7:-1].split(',')
|
||||
return to_native(pbkdf2_hex(text, salt, int(iterations),
|
||||
int(keylen), get_digest(alg)))
|
||||
elif key: # use hmac
|
||||
digest_alg = get_digest(digest_alg)
|
||||
h = hmac.new(key + salt, text, digest_alg)
|
||||
else: # compatible with third party systems
|
||||
h = get_digest(digest_alg)()
|
||||
h.update(text + salt)
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def get_digest(value):
|
||||
"""Return a hashlib digest algorithm from a string."""
|
||||
if not isinstance(value, str):
|
||||
return value
|
||||
value = value.lower()
|
||||
if value == "md5":
|
||||
return md5
|
||||
elif value == "sha1":
|
||||
return sha1
|
||||
elif value == "sha224":
|
||||
return sha224
|
||||
elif value == "sha256":
|
||||
return sha256
|
||||
elif value == "sha384":
|
||||
return sha384
|
||||
elif value == "sha512":
|
||||
return sha512
|
||||
else:
|
||||
raise ValueError("Invalid digest algorithm: %s" % value)
|
||||
|
||||
DIGEST_ALG_BY_SIZE = {
|
||||
128 // 4: 'md5',
|
||||
160 // 4: 'sha1',
|
||||
224 // 4: 'sha224',
|
||||
256 // 4: 'sha256',
|
||||
384 // 4: 'sha384',
|
||||
512 // 4: 'sha512',
|
||||
}
|
||||
return hashlib.md5(to_bytes(text)).hexdigest()
|
||||
|
||||
|
||||
def get_callable_argspec(fn):
|
||||
@@ -178,12 +105,13 @@ def get_callable_argspec(fn):
|
||||
|
||||
|
||||
def pad(s, n=32):
|
||||
# PKCS7v1.5 https://www.ietf.org/rfc/rfc2315.txt
|
||||
"""does padding according to PKCS7v1.5 https://www.ietf.org/rfc/rfc2315.txt"""
|
||||
padlen = n - len(s) % n
|
||||
return s + bytes(bytearray(padlen * [padlen]))
|
||||
|
||||
|
||||
def unpad(s, n=32):
|
||||
"""removed padding"""
|
||||
padlen = s[-1]
|
||||
if isinstance(padlen, str):
|
||||
padlen = ord(padlen) # python2
|
||||
@@ -194,6 +122,7 @@ def unpad(s, n=32):
|
||||
|
||||
|
||||
def secure_dumps(data, encryption_key, hash_key=None, compression_level=None):
|
||||
"""dumps data, followed by a signature"""
|
||||
dump = pickle.dumps(data, pickle.HIGHEST_PROTOCOL)
|
||||
if compression_level:
|
||||
dump = zlib.compress(dump, compression_level)
|
||||
@@ -207,6 +136,7 @@ def secure_dumps(data, encryption_key, hash_key=None, compression_level=None):
|
||||
|
||||
|
||||
def secure_loads(data, encryption_key, hash_key=None, compression_level=None):
|
||||
"""loads a signed data dump"""
|
||||
data = to_bytes(data)
|
||||
components = data.count(b':')
|
||||
if components == 1:
|
||||
@@ -230,18 +160,20 @@ def secure_loads(data, encryption_key, hash_key=None, compression_level=None):
|
||||
if compression_level:
|
||||
data = zlib.decompress(data)
|
||||
return pickle.loads(data)
|
||||
except Exception as e:
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def __pad_deprecated(s, n=32, padchar=b' '):
|
||||
return s + (32 - len(s) % 32) * padchar
|
||||
"""reprecated data, here for backward compatibility"""
|
||||
return s + (n - len(s) % n) * padchar
|
||||
|
||||
|
||||
def secure_dumps_deprecated(data, encryption_key, hash_key=None, compression_level=None):
|
||||
"""dumps data with a signature (deprecated because of incorrect padding)"""
|
||||
encryption_key = to_bytes(encryption_key)
|
||||
if not hash_key:
|
||||
hash_key = sha1(encryption_key).hexdigest()
|
||||
hash_key = hashlib.sha1(encryption_key).hexdigest()
|
||||
dump = pickle.dumps(data, pickle.HIGHEST_PROTOCOL)
|
||||
if compression_level:
|
||||
dump = zlib.compress(dump, compression_level)
|
||||
@@ -253,12 +185,13 @@ def secure_dumps_deprecated(data, encryption_key, hash_key=None, compression_lev
|
||||
|
||||
|
||||
def secure_loads_deprecated(data, encryption_key, hash_key=None, compression_level=None):
|
||||
"""loads signed data (deprecated because of incorrect padding)"""
|
||||
encryption_key = to_bytes(encryption_key)
|
||||
data = to_native(data)
|
||||
if ':' not in data:
|
||||
return None
|
||||
if not hash_key:
|
||||
hash_key = sha1(encryption_key).hexdigest()
|
||||
hash_key = hashlib.sha1(encryption_key).hexdigest()
|
||||
signature, encrypted_data = data.split(':', 1)
|
||||
encrypted_data = to_bytes(encrypted_data)
|
||||
actual_signature = hmac.new(to_bytes(hash_key), encrypted_data, hashlib.md5).hexdigest()
|
||||
@@ -274,7 +207,7 @@ def secure_loads_deprecated(data, encryption_key, hash_key=None, compression_lev
|
||||
if compression_level:
|
||||
data = zlib.decompress(data)
|
||||
return pickle.loads(data)
|
||||
except Exception as e:
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
### compute constant CTOKENS
|
||||
@@ -365,7 +298,7 @@ def web2py_uuid(ctokens=UNPACKED_CTOKENS):
|
||||
rand_longs[1] ^ ctokens[1])
|
||||
return str(uuid.UUID(bytes=byte_s, version=4))
|
||||
|
||||
REGEX_IPv4 = re.compile('(\d+)\.(\d+)\.(\d+)\.(\d+)')
|
||||
REGEX_IPv4 = re.compile(r'(\d+)\.(\d+)\.(\d+)\.(\d+)')
|
||||
|
||||
|
||||
def is_valid_ip_address(address):
|
||||
|
||||
+397
-403
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user