fix tools
This commit is contained in:
+5
-2
@@ -21,7 +21,10 @@ if PY2:
|
||||
import ConfigParser as configparser
|
||||
from email.MIMEBase import MIMEBase
|
||||
from email.Header import Header
|
||||
from email import MIMEMultipart, MIMEText, Encoders, Charset
|
||||
from email import Encoders, Charset
|
||||
from email.MIMEMultipart import MIMEMultipart
|
||||
from email.MIMEText import MIMEText
|
||||
from email.Charset import add_charset, QP as charset_QP
|
||||
from urllib import FancyURLopener, urlencode, urlopen
|
||||
from urllib import quote as urllib_quote, unquote as urllib_unquote
|
||||
from string import maketrans
|
||||
@@ -88,7 +91,7 @@ else:
|
||||
from email.mime.text import MIMEText
|
||||
from email import encoders as Encoders
|
||||
from email.header import Header
|
||||
from email.charset import Charset
|
||||
from email.charset import Charset, add_charset, QP as charset_QP
|
||||
from urllib.request import FancyURLopener, urlopen
|
||||
from urllib.parse import quote as urllib_quote, unquote as urllib_unquote, urlencode
|
||||
import html
|
||||
|
||||
@@ -19,6 +19,7 @@ Additions:
|
||||
- .pickle: application/python-pickle
|
||||
- .w2p': application/w2p
|
||||
"""
|
||||
from gluon._compat import to_native
|
||||
|
||||
__all__ = ['contenttype']
|
||||
|
||||
@@ -842,7 +843,7 @@ def contenttype(filename, default='text/plain'):
|
||||
"""
|
||||
Returns the Content-Type string matching extension of the given filename.
|
||||
"""
|
||||
|
||||
filename=to_native(filename)
|
||||
i = filename.rfind('.')
|
||||
if i >= 0:
|
||||
default = CONTENT_TYPE.get(filename[i:].lower(), default)
|
||||
|
||||
+3
-3
@@ -18,7 +18,7 @@ import pkgutil
|
||||
import logging
|
||||
from cgi import escape
|
||||
from threading import RLock
|
||||
from gluon._compat import copyreg, PY2, maketrans, iterkeys, unicodeT, to_unicode, to_bytes, iteritems
|
||||
from gluon._compat import copyreg, PY2, maketrans, iterkeys, unicodeT, to_unicode, to_bytes, iteritems, _local_html_escape, to_native
|
||||
|
||||
from gluon.portalocker import read_locked, LockedFile
|
||||
from gluon.utf8 import Utf8
|
||||
@@ -426,7 +426,7 @@ class lazyT(object):
|
||||
return len(str(self))
|
||||
|
||||
def xml(self):
|
||||
return str(self) if self.M else escape(str(self))
|
||||
return str(self) if self.M else _local_html_escape(str(self), quote=False)
|
||||
|
||||
def encode(self, *a, **b):
|
||||
return str(self).encode(*a, **b)
|
||||
@@ -821,7 +821,7 @@ class translator(object):
|
||||
self.language_file != self.default_language_file:
|
||||
write_dict(self.language_file, self.t)
|
||||
return regex_backslash.sub(
|
||||
lambda m: m.group(1).translate(ttab_in), str(mt))
|
||||
lambda m: m.group(1).translate(ttab_in), to_native(mt))
|
||||
|
||||
def params_substitution(self, message, symbols):
|
||||
"""
|
||||
|
||||
@@ -83,6 +83,8 @@ def custom_json(o):
|
||||
return int(o)
|
||||
elif isinstance(o, decimal.Decimal):
|
||||
return str(o)
|
||||
elif isinstance(o, (bytes, bytearray)):
|
||||
return str(o)
|
||||
elif isinstance(o, lazyT):
|
||||
return str(o)
|
||||
elif isinstance(o, XmlComponent):
|
||||
|
||||
+4
-4
@@ -19,7 +19,7 @@ import urllib
|
||||
import re
|
||||
|
||||
import os
|
||||
from gluon._compat import StringIO, unichr, urllib_quote, iteritems
|
||||
from gluon._compat import StringIO, unichr, urllib_quote, iteritems, basestring, long, unicodeT
|
||||
from gluon.http import HTTP, redirect
|
||||
from gluon.html import XmlComponent, truncate_string
|
||||
from gluon.html import XML, SPAN, TAG, A, DIV, CAT, UL, LI, TEXTAREA, BR, IMG
|
||||
@@ -1141,7 +1141,7 @@ class SQLFORM(FORM):
|
||||
|
||||
# try to retrieve the indicated record using its id
|
||||
# otherwise ignore it
|
||||
if record and isinstance(record, (int, long, str, unicode)):
|
||||
if record and isinstance(record, (int, long, str, unicodeT)):
|
||||
if not str(record).isdigit():
|
||||
raise HTTP(404, "Object not found")
|
||||
record = table._db(table._id == record).select().first()
|
||||
@@ -1651,7 +1651,7 @@ class SQLFORM(FORM):
|
||||
original_filename = os.path.split(f)[1]
|
||||
elif hasattr(f, 'file'):
|
||||
(source_file, original_filename) = (f.file, f.filename)
|
||||
elif isinstance(f, (str, unicode)):
|
||||
elif isinstance(f, (str, unicodeT)):
|
||||
# do not know why this happens, it should not
|
||||
(source_file, original_filename) = \
|
||||
(StringIO(f), 'file.txt')
|
||||
@@ -3426,7 +3426,7 @@ class ExportClass(object):
|
||||
"""
|
||||
if value is None:
|
||||
return '<NULL>'
|
||||
elif isinstance(value, unicode):
|
||||
elif isinstance(value, unicodeT):
|
||||
return value.encode('utf8')
|
||||
elif isinstance(value, Reference):
|
||||
return int(value)
|
||||
|
||||
@@ -14,14 +14,14 @@ from .test_contribs import *
|
||||
from .test_routes import *
|
||||
from .test_router import *
|
||||
from .test_validators import *
|
||||
|
||||
from .test_tools import *
|
||||
if sys.version[:3] == '2.7':
|
||||
from .test_compileapp import *
|
||||
from .test_is_url import *
|
||||
from .test_languages import *
|
||||
from .test_serializers import *
|
||||
from .test_utils import *
|
||||
from .test_tools import *
|
||||
|
||||
from .test_appadmin import *
|
||||
from .test_scheduler import *
|
||||
from .test_web import *
|
||||
|
||||
@@ -547,7 +547,7 @@ class TestAuth(unittest.TestCase):
|
||||
def test_basic_blank_forms(self):
|
||||
for f in ['login', 'retrieve_password', 'retrieve_username', 'register']:
|
||||
html_form = getattr(self.auth, f)().xml()
|
||||
self.assertTrue('name="_formkey"' in html_form)
|
||||
self.assertTrue(b'name="_formkey"' in html_form)
|
||||
|
||||
for f in ['logout', 'verify_email', 'reset_password', 'change_password', 'profile', 'groups']:
|
||||
self.assertRaisesRegexp(HTTP, "303*", getattr(self.auth, f))
|
||||
@@ -712,7 +712,7 @@ class TestAuth(unittest.TestCase):
|
||||
self.auth.login_user(self.db(self.db.auth_user.username == 'bart').select().first()) # bypass login_bare()
|
||||
self.auth.settings.bulk_register_enabled = True
|
||||
bulk_register_form = self.auth.bulk_register(max_emails=10).xml()
|
||||
self.assertTrue('name="_formkey"' in bulk_register_form)
|
||||
self.assertTrue(b'name="_formkey"' in bulk_register_form)
|
||||
|
||||
# TODO: def test_manage_tokens(self):
|
||||
# TODO: def test_reset_password(self):
|
||||
@@ -723,12 +723,12 @@ class TestAuth(unittest.TestCase):
|
||||
def test_change_password(self):
|
||||
self.auth.login_user(self.db(self.db.auth_user.username == 'bart').select().first()) # bypass login_bare()
|
||||
change_password_form = getattr(self.auth, 'change_password')().xml()
|
||||
self.assertTrue('name="_formkey"' in change_password_form)
|
||||
self.assertTrue(b'name="_formkey"' in change_password_form)
|
||||
|
||||
def test_profile(self):
|
||||
self.auth.login_user(self.db(self.db.auth_user.username == 'bart').select().first()) # bypass login_bare()
|
||||
profile_form = getattr(self.auth, 'profile')().xml()
|
||||
self.assertTrue('name="_formkey"' in profile_form)
|
||||
self.assertTrue(b'name="_formkey"' in profile_form)
|
||||
|
||||
# TODO: def test_run_login_onaccept(self):
|
||||
# TODO: def test_jwt(self):
|
||||
@@ -766,7 +766,7 @@ class TestAuth(unittest.TestCase):
|
||||
|
||||
# basic impersonate() test that return a read form
|
||||
self.assertEqual(self.auth.impersonate().xml(),
|
||||
'<form action="#" enctype="multipart/form-data" method="post"><table><tr id="no_table_user_id__row"><td class="w2p_fl"><label class="" for="no_table_user_id" id="no_table_user_id__label">User Id: </label></td><td class="w2p_fw"><input class="integer" id="no_table_user_id" name="user_id" type="text" value="" /></td><td class="w2p_fc"></td></tr><tr id="submit_record__row"><td class="w2p_fl"></td><td class="w2p_fw"><input type="submit" value="Submit" /></td><td class="w2p_fc"></td></tr></table></form>')
|
||||
b'<form action="#" enctype="multipart/form-data" method="post"><table><tr id="no_table_user_id__row"><td class="w2p_fl"><label class="" for="no_table_user_id" id="no_table_user_id__label">User Id: </label></td><td class="w2p_fw"><input class="integer" id="no_table_user_id" name="user_id" type="text" value="" /></td><td class="w2p_fc"></td></tr><tr id="submit_record__row"><td class="w2p_fl"></td><td class="w2p_fw"><input type="submit" value="Submit" /></td><td class="w2p_fc"></td></tr></table></form>')
|
||||
# bart impersonate itself
|
||||
self.assertEqual(self.auth.impersonate(bart_id), None)
|
||||
self.assertFalse(self.auth.is_impersonating()) # User shouldn't impersonate itself?
|
||||
@@ -776,7 +776,7 @@ class TestAuth(unittest.TestCase):
|
||||
self.assertTrue(self.auth.is_impersonating())
|
||||
self.assertEqual(self.auth.user_id, omer_id) # we make it really sure
|
||||
self.assertEqual(impersonate_form.xml(),
|
||||
'<form action="#" enctype="multipart/form-data" method="post"><table><tr id="auth_user_id__row"><td class="w2p_fl"><label class="readonly" for="auth_user_id" id="auth_user_id__label">Id: </label></td><td class="w2p_fw"><span id="auth_user_id">2</span></td><td class="w2p_fc"></td></tr><tr id="auth_user_first_name__row"><td class="w2p_fl"><label class="readonly" for="auth_user_first_name" id="auth_user_first_name__label">First name: </label></td><td class="w2p_fw">Omer</td><td class="w2p_fc"></td></tr><tr id="auth_user_last_name__row"><td class="w2p_fl"><label class="readonly" for="auth_user_last_name" id="auth_user_last_name__label">Last name: </label></td><td class="w2p_fw">Simpson</td><td class="w2p_fc"></td></tr><tr id="auth_user_email__row"><td class="w2p_fl"><label class="readonly" for="auth_user_email" id="auth_user_email__label">E-mail: </label></td><td class="w2p_fw">omer@test.com</td><td class="w2p_fc"></td></tr><tr id="auth_user_username__row"><td class="w2p_fl"><label class="readonly" for="auth_user_username" id="auth_user_username__label">Username: </label></td><td class="w2p_fw">omer</td><td class="w2p_fc"></td></tr></table><div style="display:none;"><input name="id" type="hidden" value="2" /></div></form>')
|
||||
b'<form action="#" enctype="multipart/form-data" method="post"><table><tr id="auth_user_id__row"><td class="w2p_fl"><label class="readonly" for="auth_user_id" id="auth_user_id__label">Id: </label></td><td class="w2p_fw"><span id="auth_user_id">2</span></td><td class="w2p_fc"></td></tr><tr id="auth_user_first_name__row"><td class="w2p_fl"><label class="readonly" for="auth_user_first_name" id="auth_user_first_name__label">First name: </label></td><td class="w2p_fw">Omer</td><td class="w2p_fc"></td></tr><tr id="auth_user_last_name__row"><td class="w2p_fl"><label class="readonly" for="auth_user_last_name" id="auth_user_last_name__label">Last name: </label></td><td class="w2p_fw">Simpson</td><td class="w2p_fc"></td></tr><tr id="auth_user_email__row"><td class="w2p_fl"><label class="readonly" for="auth_user_email" id="auth_user_email__label">E-mail: </label></td><td class="w2p_fw">omer@test.com</td><td class="w2p_fc"></td></tr><tr id="auth_user_username__row"><td class="w2p_fl"><label class="readonly" for="auth_user_username" id="auth_user_username__label">Username: </label></td><td class="w2p_fw">omer</td><td class="w2p_fc"></td></tr></table><div style="display:none;"><input name="id" type="hidden" value="2" /></div></form>')
|
||||
self.auth.logout_bare()
|
||||
# Failing impersonation
|
||||
# User lacking impersonate membership
|
||||
@@ -800,7 +800,7 @@ class TestAuth(unittest.TestCase):
|
||||
def test_groups(self):
|
||||
self.auth.login_user(self.db(self.db.auth_user.username == 'bart').select().first()) # bypass login_bare()
|
||||
self.assertEqual(self.auth.groups().xml(),
|
||||
'<table><tr><td><h3>user_1(1)</h3></td></tr><tr><td><p></p></td></tr></table>')
|
||||
b'<table><tr><td><h3>user_1(1)</h3></td></tr><tr><td><p></p></td></tr></table>')
|
||||
|
||||
def test_not_authorized(self):
|
||||
self.current.request.ajax = 'facke_ajax_request'
|
||||
|
||||
+53
-58
@@ -13,7 +13,8 @@ Auth, Mail, PluginManager and various utilities
|
||||
import base64
|
||||
from functools import reduce
|
||||
from gluon._compat import pickle, thread, urllib2, Cookie, StringIO, configparser, MIMEBase, MIMEMultipart, \
|
||||
MIMEText, Encoders, Charset, long, urllib_quote, iteritems
|
||||
MIMEText, Encoders, Charset, long, urllib_quote, iteritems, to_bytes, to_native, add_charset, \
|
||||
charset_QP, basestring, unicodeT, to_unicode
|
||||
import datetime
|
||||
import logging
|
||||
import sys
|
||||
@@ -241,6 +242,7 @@ class Mail(object):
|
||||
if filename is None:
|
||||
raise Exception('Missing attachment name')
|
||||
payload = payload.read()
|
||||
#FIXME PY3 can be used to_native?
|
||||
filename = filename.encode(encoding)
|
||||
if content_type is None:
|
||||
content_type = contenttype(filename)
|
||||
@@ -248,9 +250,9 @@ class Mail(object):
|
||||
self.my_payload = payload
|
||||
MIMEBase.__init__(self, *content_type.split('/', 1))
|
||||
self.set_payload(payload)
|
||||
self['Content-Disposition'] = 'attachment; filename="%s"' % filename
|
||||
self['Content-Disposition'] = 'attachment; filename="%s"' % to_native(filename, encoding)
|
||||
if content_id is not None:
|
||||
self['Content-Id'] = '<%s>' % content_id.encode(encoding)
|
||||
self['Content-Id'] = '<%s>' % to_native(content_id, encoding)
|
||||
Encoders.encode_base64(self)
|
||||
|
||||
def __init__(self, server=None, sender=None, login=None, tls=True):
|
||||
@@ -405,7 +407,7 @@ class Mail(object):
|
||||
"""
|
||||
|
||||
# We don't want to use base64 encoding for unicode mail
|
||||
Charset.add_charset('utf-8', Charset.QP, Charset.QP, 'utf-8')
|
||||
add_charset('utf-8', charset_QP, charset_QP, 'utf-8')
|
||||
|
||||
def encode_header(key):
|
||||
if [c for c in key if 32 > ord(c) or ord(c) > 127]:
|
||||
@@ -428,12 +430,12 @@ class Mail(object):
|
||||
|
||||
if not raw and attachments:
|
||||
# Use multipart/mixed if there is attachments
|
||||
payload_in = MIMEMultipart.MIMEMultipart('mixed')
|
||||
payload_in = MIMEMultipart('mixed')
|
||||
elif raw:
|
||||
# no encoding configuration for raw messages
|
||||
if not isinstance(message, basestring):
|
||||
message = message.read()
|
||||
if isinstance(message, unicode):
|
||||
if isinstance(message, unicodeT):
|
||||
text = message.encode('utf-8')
|
||||
elif not encoding == 'utf-8':
|
||||
text = message.decode(encoding).encode('utf-8')
|
||||
@@ -442,7 +444,7 @@ class Mail(object):
|
||||
# No charset passed to avoid transport encoding
|
||||
# NOTE: some unicode encoded strings will produce
|
||||
# unreadable mail contents.
|
||||
payload_in = MIMEText.MIMEText(text)
|
||||
payload_in = MIMEText(text)
|
||||
if to:
|
||||
if not isinstance(to, (list, tuple)):
|
||||
to = [to]
|
||||
@@ -471,14 +473,14 @@ class Mail(object):
|
||||
if text is not None:
|
||||
if not isinstance(text, basestring):
|
||||
text = text.read()
|
||||
if isinstance(text, unicode):
|
||||
if isinstance(text, unicodeT):
|
||||
text = text.encode('utf-8')
|
||||
elif not encoding == 'utf-8':
|
||||
text = text.decode(encoding).encode('utf-8')
|
||||
if html is not None:
|
||||
if not isinstance(html, basestring):
|
||||
html = html.read()
|
||||
if isinstance(html, unicode):
|
||||
if isinstance(html, unicodeT):
|
||||
html = html.encode('utf-8')
|
||||
elif not encoding == 'utf-8':
|
||||
html = html.decode(encoding).encode('utf-8')
|
||||
@@ -486,15 +488,13 @@ class Mail(object):
|
||||
# Construct mime part only if needed
|
||||
if text is not None and html:
|
||||
# We have text and html we need multipart/alternative
|
||||
attachment = MIMEMultipart.MIMEMultipart('alternative')
|
||||
attachment.attach(MIMEText.MIMEText(text, _charset='utf-8'))
|
||||
attachment.attach(
|
||||
MIMEText.MIMEText(html, 'html', _charset='utf-8'))
|
||||
attachment = MIMEMultipart('alternative')
|
||||
attachment.attach(MIMEText(text, _charset='utf-8'))
|
||||
attachment.attach(MIMEText(html, 'html', _charset='utf-8'))
|
||||
elif text is not None:
|
||||
attachment = MIMEText.MIMEText(text, _charset='utf-8')
|
||||
attachment = MIMEText(text, _charset='utf-8')
|
||||
elif html:
|
||||
attachment = \
|
||||
MIMEText.MIMEText(html, 'html', _charset='utf-8')
|
||||
attachment = MIMEText(html, 'html', _charset='utf-8')
|
||||
|
||||
if attachments:
|
||||
# If there is attachments put text and html into
|
||||
@@ -560,12 +560,11 @@ class Mail(object):
|
||||
c.op_sign(plain, sig, mode.DETACH)
|
||||
sig.seek(0, 0)
|
||||
# make it part of the email
|
||||
payload = \
|
||||
MIMEMultipart.MIMEMultipart('signed',
|
||||
boundary=None,
|
||||
_subparts=None,
|
||||
**dict(micalg="pgp-sha1",
|
||||
protocol="application/pgp-signature"))
|
||||
payload = MIMEMultipart('signed',
|
||||
boundary=None,
|
||||
_subparts=None,
|
||||
**dict(micalg="pgp-sha1",
|
||||
protocol="application/pgp-signature"))
|
||||
# insert the origin payload
|
||||
payload.attach(payload_in)
|
||||
# insert the detached signature
|
||||
@@ -605,10 +604,10 @@ class Mail(object):
|
||||
c.op_encrypt(recipients, 1, plain, cipher)
|
||||
cipher.seek(0, 0)
|
||||
# make it a part of the email
|
||||
payload = MIMEMultipart.MIMEMultipart('encrypted',
|
||||
boundary=None,
|
||||
_subparts=None,
|
||||
**dict(protocol="application/pgp-encrypted"))
|
||||
payload = MIMEMultipart('encrypted',
|
||||
boundary=None,
|
||||
_subparts=None,
|
||||
**dict(protocol="application/pgp-encrypted"))
|
||||
p = MIMEBase("application", 'pgp-encrypted')
|
||||
p.set_payload("Version: 1\r\n")
|
||||
payload.attach(p)
|
||||
@@ -729,29 +728,29 @@ class Mail(object):
|
||||
payload = payload_in
|
||||
|
||||
if from_address:
|
||||
payload['From'] = encoded_or_raw(from_address.decode(encoding))
|
||||
payload['From'] = encoded_or_raw(to_unicode(from_address, encoding))
|
||||
else:
|
||||
payload['From'] = encoded_or_raw(sender.decode(encoding))
|
||||
payload['From'] = encoded_or_raw(to_unicode(sender, encoding))
|
||||
origTo = to[:]
|
||||
if to:
|
||||
payload['To'] = encoded_or_raw(', '.join(to).decode(encoding))
|
||||
payload['To'] = encoded_or_raw(to_unicode(', '.join(to), encoding))
|
||||
if reply_to:
|
||||
payload['Reply-To'] = encoded_or_raw(reply_to.decode(encoding))
|
||||
payload['Reply-To'] = encoded_or_raw(to_unicode(reply_to, encoding))
|
||||
if cc:
|
||||
payload['Cc'] = encoded_or_raw(', '.join(cc).decode(encoding))
|
||||
payload['Cc'] = encoded_or_raw(to_unicode(', '.join(cc), encoding))
|
||||
to.extend(cc)
|
||||
if bcc:
|
||||
to.extend(bcc)
|
||||
payload['Subject'] = encoded_or_raw(subject.decode(encoding))
|
||||
payload['Subject'] = encoded_or_raw(to_unicode(subject, encoding))
|
||||
payload['Date'] = email.utils.formatdate()
|
||||
for k, v in iteritems(headers):
|
||||
payload[k] = encoded_or_raw(v.decode(encoding))
|
||||
payload[k] = encoded_or_raw(to_unicode(v, encoding))
|
||||
result = {}
|
||||
try:
|
||||
if self.settings.server == 'logging':
|
||||
entry = 'email not sent\n%s\nFrom: %s\nTo: %s\nSubject: %s\n\n%s\n%s\n' % \
|
||||
('-' * 40, sender, ', '.join(to), subject, text or html, '-' * 40)
|
||||
logger.warn(entry)
|
||||
logger.warning(entry)
|
||||
elif self.settings.server.startswith('logging:'):
|
||||
entry = 'email not sent\n%s\nFrom: %s\nTo: %s\nSubject: %s\n\n%s\n%s\n' % \
|
||||
('-' * 40, sender, ', '.join(to), subject, text or html, '-' * 40)
|
||||
@@ -773,16 +772,16 @@ class Mail(object):
|
||||
if attachments:
|
||||
result = mail.send_mail(
|
||||
sender=sender, to=origTo,
|
||||
subject=unicode(subject, encoding), body=unicode(text, encoding), html=html,
|
||||
subject=to_unicode(subject, encoding), body=to_unicode(text, encoding), html=html,
|
||||
attachments=attachments, **xcc)
|
||||
elif html and (not raw):
|
||||
result = mail.send_mail(
|
||||
sender=sender, to=origTo,
|
||||
subject=unicode(subject, encoding), body=unicode(text, encoding), html=html, **xcc)
|
||||
subject=to_unicode(subject, encoding), body=to_unicode(text, encoding), html=html, **xcc)
|
||||
else:
|
||||
result = mail.send_mail(
|
||||
sender=sender, to=origTo,
|
||||
subject=unicode(subject, encoding), body=unicode(text, encoding), **xcc)
|
||||
subject=to_unicode(subject, encoding), body=to_unicode(text, encoding), **xcc)
|
||||
else:
|
||||
smtp_args = self.settings.server.split(':')
|
||||
kwargs = dict(timeout=self.settings.timeout)
|
||||
@@ -800,7 +799,7 @@ class Mail(object):
|
||||
sender, to, payload.as_string())
|
||||
server.quit()
|
||||
except Exception as e:
|
||||
logger.warn('Mail.send failure:%s' % e)
|
||||
logger.warning('Mail.send failure:%s' % e)
|
||||
self.result = result
|
||||
self.error = e
|
||||
return False
|
||||
@@ -1250,8 +1249,7 @@ class AuthJWT(object):
|
||||
|
||||
@staticmethod
|
||||
def jwt_b64e(string):
|
||||
if isinstance(string, unicode):
|
||||
string = string.encode('utf-8', 'strict')
|
||||
string = to_bytes(string)
|
||||
return base64.urlsafe_b64encode(string).strip(b'=')
|
||||
|
||||
@staticmethod
|
||||
@@ -1260,47 +1258,44 @@ class AuthJWT(object):
|
||||
called with a unicode string).
|
||||
The result is also a bytestring.
|
||||
"""
|
||||
if isinstance(string, unicode):
|
||||
string = string.encode('ascii', 'ignore')
|
||||
return base64.urlsafe_b64decode(string + '=' * (-len(string) % 4))
|
||||
string = to_bytes(string, 'ascii', 'ignore')
|
||||
return base64.urlsafe_b64decode(string + b'=' * (-len(string) % 4))
|
||||
|
||||
def generate_token(self, payload):
|
||||
secret = self.secret_key
|
||||
secret = to_bytes(self.secret_key)
|
||||
if self.salt:
|
||||
if callable(self.salt):
|
||||
secret = "%s$%s" % (secret, self.salt(payload))
|
||||
else:
|
||||
secret = "%s$%s" % (secret, self.salt)
|
||||
if isinstance(secret, unicode):
|
||||
if isinstance(secret, unicodeT):
|
||||
secret = secret.encode('ascii', 'ignore')
|
||||
b64h = self.cached_b64h
|
||||
b64p = self.jwt_b64e(serializers.json(payload))
|
||||
jbody = b64h + '.' + b64p
|
||||
jbody = b64h + b'.' + b64p
|
||||
mauth = hmac.new(key=secret, msg=jbody, digestmod=self.digestmod)
|
||||
jsign = self.jwt_b64e(mauth.digest())
|
||||
return jbody + '.' + jsign
|
||||
return to_native(jbody + b'.' + jsign)
|
||||
|
||||
def verify_signature(self, body, signature, secret):
|
||||
mauth = hmac.new(key=secret, msg=body, digestmod=self.digestmod)
|
||||
return compare(self.jwt_b64e(mauth.digest()), signature)
|
||||
|
||||
def load_token(self, token):
|
||||
if isinstance(token, unicode):
|
||||
token = token.encode('utf-8', 'strict')
|
||||
body, sig = token.rsplit('.', 1)
|
||||
b64h, b64b = body.split('.', 1)
|
||||
token = to_bytes(token, 'utf-8', 'strict')
|
||||
body, sig = token.rsplit(b'.', 1)
|
||||
b64h, b64b = body.split(b'.', 1)
|
||||
if b64h != self.cached_b64h:
|
||||
# header not the same
|
||||
raise HTTP(400, u'Invalid JWT Header')
|
||||
secret = self.secret_key
|
||||
tokend = serializers.loads_json(self.jwt_b64d(b64b))
|
||||
tokend = serializers.loads_json(to_native(self.jwt_b64d(b64b)))
|
||||
if self.salt:
|
||||
if callable(self.salt):
|
||||
secret = "%s$%s" % (secret, self.salt(tokend))
|
||||
else:
|
||||
secret = "%s$%s" % (secret, self.salt)
|
||||
if isinstance(secret, unicode):
|
||||
secret = secret.encode('ascii', 'ignore')
|
||||
secret = to_bytes(secret, 'ascii', 'ignore')
|
||||
if not self.verify_signature(body, sig, secret):
|
||||
# signature verification failed
|
||||
raise HTTP(400, u'Token signature is invalid')
|
||||
@@ -3560,8 +3555,8 @@ class Auth(object):
|
||||
password = ''
|
||||
specials = r'!#$*'
|
||||
for i in range(0, 3):
|
||||
password += random.choice(string.lowercase)
|
||||
password += random.choice(string.uppercase)
|
||||
password += random.choice(string.ascii_lowercase)
|
||||
password += random.choice(string.ascii_uppercase)
|
||||
password += random.choice(string.digits)
|
||||
password += random.choice(specials)
|
||||
return ''.join(random.sample(password, len(password)))
|
||||
@@ -3994,7 +3989,7 @@ class Auth(object):
|
||||
requires = table_user[passfield].requires
|
||||
if not isinstance(requires, (list, tuple)):
|
||||
requires = [requires]
|
||||
requires = filter(lambda t: isinstance(t, CRYPT), requires)
|
||||
requires = list(filter(lambda t: isinstance(t, CRYPT), requires))
|
||||
if requires:
|
||||
requires[0].min_length = 0
|
||||
form = SQLFORM.factory(
|
||||
@@ -5621,7 +5616,7 @@ class Service(object):
|
||||
args = request.args
|
||||
|
||||
def none_exception(value):
|
||||
if isinstance(value, unicode):
|
||||
if isinstance(value, unicodeT):
|
||||
return value.encode('utf8')
|
||||
if hasattr(value, 'isoformat'):
|
||||
return value.isoformat()[:19].replace('T', ' ')
|
||||
|
||||
Reference in New Issue
Block a user