fix utils
This commit is contained in:
@@ -15,13 +15,13 @@ from .test_routes import *
|
||||
from .test_router import *
|
||||
from .test_validators import *
|
||||
from .test_tools import *
|
||||
from .test_utils import *
|
||||
|
||||
if sys.version[:3] == '2.7':
|
||||
from .test_compileapp import *
|
||||
from .test_is_url import *
|
||||
from .test_languages import *
|
||||
from .test_serializers import *
|
||||
from .test_utils import *
|
||||
|
||||
from .test_appadmin import *
|
||||
from .test_scheduler import *
|
||||
from .test_web import *
|
||||
|
||||
@@ -12,7 +12,7 @@ from gluon.utils import md5_hash, compare, is_valid_ip_address, web2py_uuid
|
||||
|
||||
import hashlib
|
||||
from hashlib import md5, sha1, sha224, sha256, sha384, sha512
|
||||
from gluon.utils import simple_hash, get_digest, secure_dumps, secure_loads
|
||||
from gluon.utils import simple_hash, get_digest, secure_dumps, secure_loads, basestring
|
||||
|
||||
|
||||
class TestUtils(unittest.TestCase):
|
||||
@@ -94,8 +94,8 @@ class TestUtils(unittest.TestCase):
|
||||
secured = secure_dumps(testobj, testkey)
|
||||
original = secure_loads(secured, testkey)
|
||||
self.assertEqual(testobj, original)
|
||||
self.assertTrue(isinstance(secured, basestring))
|
||||
self.assertTrue(':' in secured)
|
||||
self.assertTrue(isinstance(secured, bytes))
|
||||
self.assertTrue(b':' in secured)
|
||||
|
||||
large_testobj = [x for x in range(1000)]
|
||||
secured_comp = secure_dumps(large_testobj, testkey, compression_level=9)
|
||||
|
||||
@@ -85,7 +85,7 @@ def compare(a, b):
|
||||
|
||||
def md5_hash(text):
|
||||
""" Generates a md5 hash with the given text """
|
||||
return md5(text).hexdigest()
|
||||
return md5(to_bytes(text)).hexdigest()
|
||||
|
||||
|
||||
def simple_hash(text, key='', salt='', digest_alg='md5'):
|
||||
@@ -157,11 +157,12 @@ def get_callable_argspec(fn):
|
||||
return inspect.getargspec(inspectable)
|
||||
|
||||
|
||||
def pad(s, n=32, padchar=' '):
|
||||
def pad(s, n=32, padchar=b' '):
|
||||
return s + (32 - len(s) % 32) * padchar
|
||||
|
||||
|
||||
def secure_dumps(data, encryption_key, hash_key=None, compression_level=None):
|
||||
encryption_key = to_bytes(encryption_key)
|
||||
if not hash_key:
|
||||
hash_key = sha1(encryption_key).hexdigest()
|
||||
dump = pickle.dumps(data, pickle.HIGHEST_PROTOCOL)
|
||||
@@ -170,17 +171,20 @@ def secure_dumps(data, encryption_key, hash_key=None, compression_level=None):
|
||||
key = pad(encryption_key)[:32]
|
||||
cipher, IV = AES_new(key)
|
||||
encrypted_data = base64.urlsafe_b64encode(IV + cipher.encrypt(pad(dump)))
|
||||
signature = hmac.new(hash_key, encrypted_data).hexdigest()
|
||||
return signature + ':' + encrypted_data
|
||||
signature = to_bytes(hmac.new(to_bytes(hash_key), encrypted_data).hexdigest())
|
||||
return signature + b':' + encrypted_data
|
||||
|
||||
|
||||
def secure_loads(data, encryption_key, hash_key=None, compression_level=None):
|
||||
encryption_key = to_bytes(encryption_key)
|
||||
data = to_native(data)
|
||||
if ':' not in data:
|
||||
return None
|
||||
if not hash_key:
|
||||
hash_key = sha1(encryption_key).hexdigest()
|
||||
signature, encrypted_data = data.split(':', 1)
|
||||
actual_signature = hmac.new(hash_key, encrypted_data).hexdigest()
|
||||
encrypted_data = to_bytes(encrypted_data)
|
||||
actual_signature = hmac.new(to_bytes(hash_key), encrypted_data).hexdigest()
|
||||
if not compare(signature, actual_signature):
|
||||
return None
|
||||
key = pad(encryption_key)[:32]
|
||||
@@ -189,7 +193,7 @@ def secure_loads(data, encryption_key, hash_key=None, compression_level=None):
|
||||
cipher, _ = AES_new(key, IV=IV)
|
||||
try:
|
||||
data = cipher.decrypt(encrypted_data)
|
||||
data = data.rstrip(' ')
|
||||
data = data.rstrip(b' ')
|
||||
if compression_level:
|
||||
data = zlib.decompress(data)
|
||||
return pickle.loads(data)
|
||||
|
||||
Reference in New Issue
Block a user