diff --git a/gluon/authapi.py b/gluon/authapi.py new file mode 100644 index 00000000..981a314c --- /dev/null +++ b/gluon/authapi.py @@ -0,0 +1,1020 @@ +# -*- coding: utf-8 -*- +""" +| This file is part of the web2py Web Framework +| Copyrighted by Massimo Di Pierro +| License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html) +""" +from gluon import current +from gluon.storage import Messages, Settings, Storage +from gluon.utils import web2py_uuid +from gluon.validators import CRYPT, IS_EMAIL, IS_EQUAL_TO, IS_INT_IN_RANGE, IS_LOWER, IS_MATCH, IS_NOT_EMPTY, IS_NOT_IN_DB +from pydal.objects import Table, Field, Row +import datetime +from gluon.settings import global_settings + +DEFAULT = lambda: None + + +class AuthAPI(object): + """ + AuthAPI is a barebones Auth implementation which does not have a concept of + HTML forms or redirects, emailing or even an URL, you are responsible for + all that if you use it. + The main Auth functions such as login, logout, register, profile are designed + in a Dict In -> Dict Out logic so, for instance, if you set + registration_requires_verification you are responsible for sending the key to + the user and even rolling back the transaction if you can't do it. + + NOTES: * It does not support all the callbacks Traditional Auth does yet. + Some of the callbacks will not be supported. + Check the method signatures to find out which ones are supported. + * register_fields and profile_fields settings are ignored for now. + + WARNING: No builtin CSRF protection whatsoever. + """ + + default_settings = { + 'create_user_groups': 'user_%(id)s', + 'email_case_sensitive': False, + 'everybody_group_id': None, + 'expiration': 3600, + 'keep_session_onlogin': True, + 'keep_session_onlogout': False, + 'logging_enabled': True, + 'login_after_registration': False, + 'login_email_validate': True, + 'login_userfield': None, + 'logout_onlogout': None, + 'long_expiration': 3600 * 24 * 30, + 'ondelete': 'CASCADE', + 'password_field': 'password', + 'password_min_length': 4, + 'registration_requires_approval': False, + 'registration_requires_verification': False, + 'renew_session_onlogin': True, + 'renew_session_onlogout': True, + 'table_event_name': 'auth_event', + 'table_group_name': 'auth_group', + 'table_membership_name': 'auth_membership', + 'table_permission_name': 'auth_permission', + 'table_user_name': 'auth_user', + 'use_username': False, + 'username_case_sensitive': True + } + + default_messages = { + 'add_group_log': 'Group %(group_id)s created', + 'add_membership_log': None, + 'add_permission_log': None, + 'change_password_log': 'User %(id)s Password changed', + 'del_group_log': 'Group %(group_id)s deleted', + 'del_membership_log': None, + 'del_permission_log': None, + 'email_taken': 'This email already has an account', + 'group_description': 'Group uniquely assigned to user %(id)s', + 'has_membership_log': None, + 'has_permission_log': None, + 'invalid_email': 'Invalid email', + 'key_verified': 'Key verified', + 'invalid_login': 'Invalid login', + 'invalid_password': 'Invalid password', + 'invalid_user': 'Invalid user', + 'invalid_key': 'Invalid key', + 'invalid_username': 'Invalid username', + 'logged_in': 'Logged in', + 'logged_out': 'Logged out', + 'login_failed_log': None, + 'login_log': 'User %(id)s Logged-in', + 'logout_log': 'User %(id)s Logged-out', + 'mismatched_password': "Password fields don't match", + 'password_changed': 'Password changed', + 'profile_log': 'User %(id)s Profile updated', + 'profile_updated': 'Profile updated', + 'register_log': 'User %(id)s Registered', + 'registration_pending': 'Registration is pending approval', + 'registration_successful': 'Registration successful', + 'registration_verifying': 'Registration needs verification', + 'username_taken': 'Username already taken', + 'verify_log': 'User %(id)s verified registration key' + } + + def __init__(self, db=None, hmac_key=None, signature=True): + self.db = db + session = current.session + auth = session.auth + self.user_groups = auth and auth.user_groups or {} + now = current.request.now + # if we have auth info + # if not expired it, used it + # if expired, clear the session + # else, only clear auth info in the session + if auth: + delta = datetime.timedelta(days=0, seconds=auth.expiration) + if auth.last_visit and auth.last_visit + delta > now: + self.user = auth.user + # this is a trick to speed up sessions to avoid many writes + if (now - auth.last_visit).seconds > (auth.expiration / 10): + auth.last_visit = now + else: + self.user = None + if session.auth: + del session.auth + session.renew(clear_session=True) + else: + self.user = None + if session.auth: + del session.auth + + settings = self.settings = Settings(self.__class__.default_settings) + settings.update( + extra_fields={}, + hmac_key=hmac_key, + ) + settings.lock_keys = True + messages = self.messages = Messages(current.T) + messages.update(self.default_messages) + messages.lock_keys = True + if signature: + self.define_signature() + else: + self.signature = None + + def __validate(self, value, requires): + if not isinstance(requires, (list, tuple)): + requires = [requires] + for validator in requires: + (value, error) = validator(value) + if error: + return (value, error) + return (value, None) + + def _get_migrate(self, tablename, migrate=True): + + if type(migrate).__name__ == 'str': + return (migrate + tablename + '.table') + elif migrate == False: + return False + else: + return True + + def _get_user_id(self): + """accessor for auth.user_id""" + return self.user and self.user.id or None + + user_id = property(_get_user_id, doc="user.id or None") + + def table_user(self): + return self.db[self.settings.table_user_name] + + def table_group(self): + return self.db[self.settings.table_group_name] + + def table_membership(self): + return self.db[self.settings.table_membership_name] + + def table_permission(self): + return self.db[self.settings.table_permission_name] + + def table_event(self): + return self.db[self.settings.table_event_name] + + def define_signature(self): + db = self.db + settings = self.settings + request = current.request + T = current.T + reference_user = 'reference %s' % settings.table_user_name + + def lazy_user(auth=self): + return auth.user_id + + def represent(id, record=None, s=settings): + try: + user = s.table_user(id) + return '%s %s' % (user.get("first_name", user.get("email")), + user.get("last_name", '')) + except: + return id + ondelete = self.settings.ondelete + self.signature = Table( + self.db, 'auth_signature', + Field('is_active', 'boolean', + default=True, + readable=False, writable=False, + label=T('Is Active')), + Field('created_on', 'datetime', + default=request.now, + writable=False, readable=False, + label=T('Created On')), + Field('created_by', + reference_user, + default=lazy_user, represent=represent, + writable=False, readable=False, + label=T('Created By'), ondelete=ondelete), + Field('modified_on', 'datetime', + update=request.now, default=request.now, + writable=False, readable=False, + label=T('Modified On')), + Field('modified_by', + reference_user, represent=represent, + default=lazy_user, update=lazy_user, + writable=False, readable=False, + label=T('Modified By'), ondelete=ondelete)) + + def define_tables(self, username=None, signature=None, migrate=None, + fake_migrate=None): + """ + To be called unless tables are defined manually + + Examples: + Use as:: + + # defines all needed tables and table files + # 'myprefix_auth_user.table', ... + auth.define_tables(migrate='myprefix_') + + # defines all needed tables without migration/table files + auth.define_tables(migrate=False) + + """ + + db = self.db + if migrate is None: + migrate = db._migrate + if fake_migrate is None: + fake_migrate = db._fake_migrate + settings = self.settings + if username is None: + username = settings.use_username + else: + settings.use_username = username + if not self.signature: + self.define_signature() + if signature: + signature_list = [self.signature] + elif not signature: + signature_list = [] + elif isinstance(signature, Table): + signature_list = [signature] + else: + signature_list = signature + is_not_empty = IS_NOT_EMPTY(error_message=self.messages.is_empty) + is_crypted = CRYPT(key=settings.hmac_key, + min_length=settings.password_min_length) + is_unique_email = [ + IS_EMAIL(error_message=self.messages.invalid_email), + IS_NOT_IN_DB(db, '%s.email' % settings.table_user_name, + error_message=self.messages.email_taken)] + if not settings.email_case_sensitive: + is_unique_email.insert(1, IS_LOWER()) + if settings.table_user_name not in db.tables: + passfield = settings.password_field + extra_fields = settings.extra_fields.get( + settings.table_user_name, []) + signature_list + if username or settings.cas_provider: # cas_provider Will always be None here but we compare it anyway so subclasses can use our define_tables + is_unique_username = \ + [IS_MATCH('[\w\.\-]+', strict=True, + error_message=self.messages.invalid_username), + IS_NOT_IN_DB(db, '%s.username' % settings.table_user_name, + error_message=self.messages.username_taken)] + if not settings.username_case_sensitive: + is_unique_username.insert(1, IS_LOWER()) + db.define_table( + settings.table_user_name, + Field('first_name', length=128, default='', + label=self.messages.label_first_name, + requires=is_not_empty), + Field('last_name', length=128, default='', + label=self.messages.label_last_name, + requires=is_not_empty), + Field('email', length=512, default='', + label=self.messages.label_email, + requires=is_unique_email), + Field('username', length=128, default='', + label=self.messages.label_username, + requires=is_unique_username), + Field(passfield, 'password', length=512, + readable=False, label=self.messages.label_password, + requires=[is_crypted]), + Field('registration_key', length=512, + writable=False, readable=False, default='', + label=self.messages.label_registration_key), + Field('reset_password_key', length=512, + writable=False, readable=False, default='', + label=self.messages.label_reset_password_key), + Field('registration_id', length=512, + writable=False, readable=False, default='', + label=self.messages.label_registration_id), + *extra_fields, + **dict( + migrate=self._get_migrate(settings.table_user_name, + migrate), + fake_migrate=fake_migrate, + format='%(username)s')) + else: + db.define_table( + settings.table_user_name, + Field('first_name', length=128, default='', + label=self.messages.label_first_name, + requires=is_not_empty), + Field('last_name', length=128, default='', + label=self.messages.label_last_name, + requires=is_not_empty), + Field('email', length=512, default='', + label=self.messages.label_email, + requires=is_unique_email), + Field(passfield, 'password', length=512, + readable=False, label=self.messages.label_password, + requires=[is_crypted]), + Field('registration_key', length=512, + writable=False, readable=False, default='', + label=self.messages.label_registration_key), + Field('reset_password_key', length=512, + writable=False, readable=False, default='', + label=self.messages.label_reset_password_key), + Field('registration_id', length=512, + writable=False, readable=False, default='', + label=self.messages.label_registration_id), + *extra_fields, + **dict( + migrate=self._get_migrate(settings.table_user_name, + migrate), + fake_migrate=fake_migrate, + format='%(first_name)s %(last_name)s (%(id)s)')) + reference_table_user = 'reference %s' % settings.table_user_name + if settings.table_group_name not in db.tables: + extra_fields = settings.extra_fields.get( + settings.table_group_name, []) + signature_list + db.define_table( + settings.table_group_name, + Field('role', length=512, default='', + label=self.messages.label_role, + requires=IS_NOT_IN_DB(db, '%s.role' % settings.table_group_name)), + Field('description', 'text', + label=self.messages.label_description), + *extra_fields, + **dict( + migrate=self._get_migrate( + settings.table_group_name, migrate), + fake_migrate=fake_migrate, + format='%(role)s (%(id)s)')) + reference_table_group = 'reference %s' % settings.table_group_name + if settings.table_membership_name not in db.tables: + extra_fields = settings.extra_fields.get( + settings.table_membership_name, []) + signature_list + db.define_table( + settings.table_membership_name, + Field('user_id', reference_table_user, + label=self.messages.label_user_id), + Field('group_id', reference_table_group, + label=self.messages.label_group_id), + *extra_fields, + **dict( + migrate=self._get_migrate( + settings.table_membership_name, migrate), + fake_migrate=fake_migrate)) + if settings.table_permission_name not in db.tables: + extra_fields = settings.extra_fields.get( + settings.table_permission_name, []) + signature_list + db.define_table( + settings.table_permission_name, + Field('group_id', reference_table_group, + label=self.messages.label_group_id), + Field('name', default='default', length=512, + label=self.messages.label_name, + requires=is_not_empty), + Field('table_name', length=512, + label=self.messages.label_table_name), + Field('record_id', 'integer', default=0, + label=self.messages.label_record_id, + requires=IS_INT_IN_RANGE(0, 10 ** 9)), + *extra_fields, + **dict( + migrate=self._get_migrate( + settings.table_permission_name, migrate), + fake_migrate=fake_migrate)) + if settings.table_event_name not in db.tables: + db.define_table( + settings.table_event_name, + Field('time_stamp', 'datetime', + default=current.request.now, + label=self.messages.label_time_stamp), + Field('client_ip', + default=current.request.client, + label=self.messages.label_client_ip), + Field('user_id', reference_table_user, default=None, + label=self.messages.label_user_id), + Field('origin', default='auth', length=512, + label=self.messages.label_origin, + requires=is_not_empty), + Field('description', 'text', default='', + label=self.messages.label_description, + requires=is_not_empty), + *settings.extra_fields.get(settings.table_event_name, []), + **dict( + migrate=self._get_migrate( + settings.table_event_name, migrate), + fake_migrate=fake_migrate)) + + return self + + def log_event(self, description, vars=None, origin='auth'): + """ + Examples: + Use as:: + + auth.log_event(description='this happened', origin='auth') + + """ + if not self.settings.logging_enabled or not description: + return + elif self.is_logged_in(): + user_id = self.user.id + else: + user_id = None # user unknown + vars = vars or {} + # log messages should not be translated + if type(description).__name__ == 'lazyT': + description = description.m + self.table_event().insert(description=str(description % vars), origin=origin, user_id=user_id) + + def id_group(self, role): + """ + Returns the group_id of the group specified by the role + """ + rows = self.db(self.table_group().role == role).select() + if not rows: + return None + return rows[0].id + + def user_group(self, user_id=None): + """ + Returns the group_id of the group uniquely associated to this user + i.e. `role=user:[user_id]` + """ + return self.id_group(self.user_group_role(user_id)) + + def user_group_role(self, user_id=None): + if not self.settings.create_user_groups: + return None + if user_id: + user = self.table_user()[user_id] + else: + user = self.user + return self.settings.create_user_groups % user + + def add_group(self, role, description=''): + """ + Creates a group associated to a role + """ + group_id = self.table_group().insert(role=role, description=description) + self.log_event(self.messages['add_group_log'], dict(group_id=group_id, role=role)) + return group_id + + def del_group(self, group_id): + """ + Deletes a group + """ + self.db(self.table_group().id == group_id).delete() + self.db(self.table_membership().group_id == group_id).delete() + self.db(self.table_permission().group_id == group_id).delete() + if group_id in self.user_groups: + del self.user_groups[group_id] + self.log_event(self.messages.del_group_log, dict(group_id=group_id)) + + def update_groups(self): + if not self.user: + return + user_groups = self.user_groups = {} + if current.session.auth: + current.session.auth.user_groups = self.user_groups + table_group = self.table_group() + table_membership = self.table_membership() + memberships = self.db( + table_membership.user_id == self.user.id).select() + for membership in memberships: + group = table_group(membership.group_id) + if group: + user_groups[membership.group_id] = group.role + + def add_membership(self, group_id=None, user_id=None, role=None): + """ + Gives user_id membership of group_id or role + if user is None than user_id is that of current logged in user + """ + + group_id = group_id or self.id_group(role) + try: + group_id = int(group_id) + except: + group_id = self.id_group(group_id) # interpret group_id as a role + if not user_id and self.user: + user_id = self.user.id + if not group_id: + raise ValueError('group_id not provided or invalid') + if not user_id: + raise ValueError('user_id not provided or invalid') + membership = self.table_membership() + db = membership._db + record = db((membership.user_id == user_id) & + (membership.group_id == group_id), + ignore_common_filters=True).select().first() + if record: + if hasattr(record, 'is_active') and not record.is_active: + record.update_record(is_active=True) + return record.id + else: + id = membership.insert(group_id=group_id, user_id=user_id) + if role: + self.user_groups[group_id] = role + else: + self.update_groups() + self.log_event(self.messages['add_membership_log'], + dict(user_id=user_id, group_id=group_id)) + return id + + def del_membership(self, group_id=None, user_id=None, role=None): + """ + Revokes membership from group_id to user_id + if user_id is None than user_id is that of current logged in user + """ + + group_id = group_id or self.id_group(role) + try: + group_id = int(group_id) + except: + group_id = self.id_group(group_id) # interpret group_id as a role + if not user_id and self.user: + user_id = self.user.id + membership = self.table_membership() + self.log_event(self.messages['del_membership_log'], + dict(user_id=user_id, group_id=group_id)) + ret = self.db(membership.user_id == user_id)(membership.group_id == group_id).delete() + if group_id in self.user_groups: + del self.user_groups[group_id] + return ret + + def has_membership(self, group_id=None, user_id=None, role=None): + """ + Checks if user is member of group_id or role + """ + group_id = group_id or self.id_group(role) + try: + group_id = int(group_id) + except: + group_id = self.id_group(group_id) # interpret group_id as a role + if not user_id and self.user: + user_id = self.user.id + membership = self.table_membership() + if group_id and user_id and self.db((membership.user_id == user_id) & + (membership.group_id == group_id)).select(): + r = True + else: + r = False + self.log_event(self.messages['has_membership_log'], + dict(user_id=user_id, group_id=group_id, check=r)) + return r + + def add_permission(self, + group_id, + name='any', + table_name='', + record_id=0, + ): + """ + Gives group_id 'name' access to 'table_name' and 'record_id' + """ + + permission = self.table_permission() + if group_id == 0: + group_id = self.user_group() + record = self.db((permission.group_id == group_id) & + (permission.name == name) & + (permission.table_name == str(table_name)) & + (permission.record_id == long(record_id)), + ignore_common_filters=True + ).select(limitby=(0, 1), orderby_on_limitby=False).first() + if record: + if hasattr(record, 'is_active') and not record.is_active: + record.update_record(is_active=True) + id = record.id + else: + id = permission.insert(group_id=group_id, name=name, + table_name=str(table_name), + record_id=long(record_id)) + self.log_event(self.messages['add_permission_log'], + dict(permission_id=id, group_id=group_id, + name=name, table_name=table_name, + record_id=record_id)) + return id + + def del_permission(self, + group_id, + name='any', + table_name='', + record_id=0, + ): + """ + Revokes group_id 'name' access to 'table_name' and 'record_id' + """ + + permission = self.table_permission() + self.log_event(self.messages['del_permission_log'], + dict(group_id=group_id, name=name, + table_name=table_name, record_id=record_id)) + return self.db(permission.group_id == + group_id)(permission.name == + name)(permission.table_name == + str(table_name))(permission.record_id == + long(record_id)).delete() + + def has_permission(self, + name='any', + table_name='', + record_id=0, + user_id=None, + group_id=None, + ): + """ + Checks if user_id or current logged in user is member of a group + that has 'name' permission on 'table_name' and 'record_id' + if group_id is passed, it checks whether the group has the permission + """ + + if not group_id and self.settings.everybody_group_id and \ + self.has_permission(name, table_name, record_id, user_id=None, + group_id=self.settings.everybody_group_id): + return True + + if not user_id and not group_id and self.user: + user_id = self.user.id + if user_id: + membership = self.table_membership() + rows = self.db(membership.user_id == user_id).select(membership.group_id) + groups = set([row.group_id for row in rows]) + if group_id and group_id not in groups: + return False + else: + groups = set([group_id]) + permission = self.table_permission() + rows = self.db(permission.name == + name)(permission.table_name == + str(table_name))(permission.record_id == + record_id).select(permission.group_id) + groups_required = set([row.group_id for row in rows]) + if record_id: + rows = self.db(permission.name == + name)(permission.table_name == + str(table_name))(permission.record_id == + 0).select(permission.group_id) + groups_required = groups_required.union(set([row.group_id for row in rows])) + if groups.intersection(groups_required): + r = True + else: + r = False + if user_id: + self.log_event(self.messages['has_permission_log'], + dict(user_id=user_id, name=name, + table_name=table_name, record_id=record_id)) + return r + + def is_logged_in(self): + """ + Checks if the user is logged in and returns True/False. + If so user is in auth.user as well as in session.auth.user + """ + if self.user: + return True + return False + + def login_user(self, user): + """ + Logins the `user = db.auth_user(id)` + """ + if global_settings.web2py_runtime_gae: + user = Row(self.table_user()._filter_fields(user, id=True)) + delattr(user, 'password') + else: + user = Row(user) + for key in list(user.keys()): + value = user[key] + if callable(value) or key == 'password': + delattr(user, key) + if self.settings.renew_session_onlogin: + current.session.renew(clear_session=not self.settings.keep_session_onlogin) + current.session.auth = Storage(user=user, + last_visit=current.request.now, + expiration=self.settings.expiration, + hmac_key=web2py_uuid()) + self.user = user + self.update_groups() + + def login(self, log=DEFAULT, **kwargs): + """ + Login a user + + Keyword Args: + username/email/name_of_your_username_field (string) - username + password/name_of_your_passfield (string) - user's password + remember_me (boolean) - extend the duration of the login to settings.long_expiration + """ + settings = self.settings + session = current.session + table_user = self.table_user() + + if 'username' in table_user.fields or \ + not settings.login_email_validate: + userfield_validator = IS_NOT_EMPTY(error_message=self.messages.is_empty) + if not settings.username_case_sensitive: + userfield_validator = [IS_LOWER(), userfield_validator] + else: + userfield_validator = IS_EMAIL(error_message=self.messages.invalid_email) + if not settings.email_case_sensitive: + userfield_validator = [IS_LOWER(), userfield_validator] + + passfield = settings.password_field + + if log is DEFAULT: + log = self.messages['login_log'] + + user = None + + # Setup the default field used for the userfield + if self.settings.login_userfield: + userfield = self.settings.login_userfield + else: + if 'username' in table_user.fields: + userfield = 'username' + else: + userfield = 'email' + + # Get the userfield from kwargs and validate it + userfield_value = kwargs.get(userfield) + if userfield_value is None: + raise KeyError('%s not found in kwargs' % userfield) + + validated, error = self.__validate(userfield_value, userfield_validator) + + if error: + return {'errors': {userfield: error}, 'message': self.messages.invalid_login, 'user': None} + + # Get the user for this userfield and check it + user = table_user(**{userfield: validated}) + + if user is None: + return {'errors': {userfield: self.messages.invalid_user}, 'message': self.messages.invalid_login, 'user': None} + + if (user.registration_key or '').startswith('pending'): + return {'errors': None, 'message': self.messages.registration_pending, 'user': None} + elif user.registration_key in ('disabled', 'blocked'): + return {'errors': None, 'message': self.messages.login_disabled, 'user': None} + elif (user.registration_key is not None and user.registration_key.strip()): + return {'errors': None, 'message': self.messages.registration_verifying, 'user': None} + + # Finally verify the password + passfield = settings.password_field + password = table_user[passfield].validate(kwargs.get(passfield, ''))[0] + + if password == user[passfield]: + self.login_user(user) + session.auth.expiration = \ + kwargs.get('remember_me', False) and \ + settings.long_expiration or \ + settings.expiration + session.auth.remember_me = kwargs.get('remember_me', False) + self.log_event(log, user) + return {'errors': None, 'message': self.messages.logged_in, 'user': {k: user[k] for k in table_user.fields if table_user[k].readable}} + else: + self.log_event(self.messages['login_failed_log'], kwargs) + return {'errors': {passfield: self.messages.invalid_password}, 'message': self.messages.invalid_login, 'user': None} + + def logout(self, log=DEFAULT, onlogout=DEFAULT, **kwargs): + """ + Logs out user + """ + settings = self.settings + session = current.session + + if onlogout is DEFAULT: + onlogout = settings.logout_onlogout + if onlogout: + onlogout(self.user) + if log is DEFAULT: + log = self.messages['logout_log'] + if self.user: + self.log_event(log, self.user) + + session.auth = None + self.user = None + if settings.renew_session_onlogout: + session.renew(clear_session=not settings.keep_session_onlogout) + + return {'errors': None, 'message': self.messages.logged_out, 'user': None} + + def register(self, log=DEFAULT, **kwargs): + """ + Register a user. + """ + + table_user = self.table_user() + settings = self.settings + + if self.is_logged_in(): + raise AssertionError('User trying to register is logged in') + + if log is DEFAULT: + log = self.messages['register_log'] + + if self.settings.login_userfield: + userfield = self.settings.login_userfield + elif 'username' in table_user.fields: + userfield = 'username' + else: + userfield = 'email' + + # Ensure the username field is unique. + unique_validator = IS_NOT_IN_DB(self.db, table_user[userfield]) + userfield_validator = table_user[userfield].requires + if userfield_validator is None: + userfield_validator = unique_validator + elif isinstance(userfield_validator, (list, tuple)): + if not any([isinstance(validator, IS_NOT_IN_DB) for validator in + userfield_validator]): + if isinstance(userfield_validator, list): + userfield_validator.append(unique_validator) + else: + userfield_validator += (unique_validator, ) + elif not isinstance(userfield_validator, IS_NOT_IN_DB): + userfield_validator = [userfield_validator, unique_validator] + table_user[userfield].requires = userfield_validator + + passfield = settings.password_field + + try: # Make sure we have our original minimum length + table_user[passfield].requires[-1].min_length = settings.password_min_length + except: + pass + + key = web2py_uuid() + if settings.registration_requires_approval: + key = 'pending-' + key + + table_user.registration_key.default = key + + result = table_user.validate_and_insert(**kwargs) + if result.errors: + return {'errors': result.errors.as_dict(), 'message': None, 'user': None} + + user = table_user[result.id] + + message = self.messages.registration_successful + + if settings.create_user_groups: + d = user.as_dict() + description = self.messages.group_description % d + group_id = self.add_group(settings.create_user_groups % d, description) + self.add_membership(group_id, result.id) + + if self.settings.everybody_group_id: + self.add_membership(self.settings.everybody_group_id, result) + + if settings.registration_requires_verification: + d = {k: user[k] for k in table_user.fields if table_user[k].readable} + d['key']= key + if settings.login_after_registration and not settings.registration_requires_approval: + self.login_user(user) + return {'errors': None, 'message': None, 'user': d} + + if settings.registration_requires_approval: + user.update_record(registration_key='pending') + message = self.messages.registration_pending + elif settings.login_after_registration: + user.update_record(registration_key='') + self.login_user(user) + message = self.messages.logged_in + + self.log_event(log, user) + + return {'errors': None, 'message': message, 'user': {k: user[k] for k in table_user.fields if table_user[k].readable}} + + def profile(self, log=DEFAULT, **kwargs): + """ + Lets the user change his/her profile + """ + + table_user = self.table_user() + settings = self.settings + table_user[settings.password_field].writable = False + + if not self.is_logged_in(): + raise AssertionError('User is not logged in') + + if not kwargs: + user = table_user[self.user.id] + return {'errors': None, 'message': None, 'user': {k: user[k] for k in table_user.fields if table_user[k].readable}} + + result = self.db(table_user.id == self.user.id).validate_and_update(**kwargs) + user = table_user[self.user.id] + + if result.errors: + return {'errors': result.errors, 'message': None, 'user': {k: user[k] for k in table_user.fields if table_user[k].readable}} + + if log is DEFAULT: + log = self.messages['profile_log'] + + self.log_event(log, user) + self.user.update(**kwargs) + return {'errors': None, 'message': self.messages.profile_updated, 'user': {k: user[k] for k in table_user.fields if table_user[k].readable}} + + def change_password(self, log=DEFAULT, **kwargs): + """ + Lets the user change password + + Keyword Args: + old_password (string) - User's current password + new_password (string) - User's new password + new_password2 (string) - Verify the new password + """ + settings = self.settings + messages = self.messages + + if not self.is_logged_in(): + raise AssertionError('User is not logged in') + + db = self.db + table_user = self.table_user() + s = db(table_user.id == self.user.id) + + request = current.request + session = current.session + passfield = settings.password_field + + requires = table_user[passfield].requires + if not isinstance(requires, (list, tuple)): + requires = [requires] + requires = list(filter(lambda t: isinstance(t, CRYPT), requires)) + if requires: + requires[0].min_length = 0 + + old_password = kwargs.get('old_password', '') + new_password = kwargs.get('new_password', '') + new_password2 = kwargs.get('new_password2', '') + + validator_old = requires + validator_pass2 = IS_EQUAL_TO(new_password, error_message=messages.mismatched_password) + + old_password, error_old = self.__validate(old_password, validator_old) + new_password2, error_new2 = self.__validate(new_password2, validator_pass2) + + errors = {} + if error_old: + errors['old_password'] = error_old + if error_new2: + errors['new_password2'] = error_new2 + if errors: + return {'errors': errors, 'message': None} + + current_user = s.select(limitby=(0, 1), orderby_on_limitby=False).first() + if not old_password == current_user[passfield]: + return {'errors': {'old_password': messages.invalid_password}, 'message': None} + else: + d = {passfield: new_password} + resp = s.validate_and_update(**d) + if resp.errors: + return {'errors': {'new_password': resp.errors[passfield]}, 'message': None} + if log is DEFAULT: + log = messages['change_password_log'] + self.log_event(log, self.user) + return {'errors': None, 'message': messages.password_changed} + + def verify_key(self, + key=None, + ignore_approval=False, + log=DEFAULT, + ): + """ + Verify a given registration_key actually exists in the user table. + Resets the key to empty string '' or 'pending' if + setttings.registration_requires_approval is true. + + Keyword Args: + key (string) - User's registration key + """ + table_user = self.table_user() + user = table_user(registration_key=key) + if (user is None) or (key is None): + return {'errors': {'key': self.messages.invalid_key}, 'message': self.messages.invalid_key } + + if self.settings.registration_requires_approval: + user.update_record(registration_key='pending') + result = {'errors': None, 'message': self.messages.registration_pending} + else: + user.update_record(registration_key='') + result = {'errors': None, 'message': self.messages.key_verified} + # make sure session has same user.registration_key as db record + if current.session.auth and current.session.auth.user: + current.session.auth.user.registration_key = user.registration_key + if log is DEFAULT: + log = self.messages['verify_log'] + self.log_event(log, user) + return result diff --git a/gluon/tests/__init__.py b/gluon/tests/__init__.py index 2f111fe3..a0e3fa56 100644 --- a/gluon/tests/__init__.py +++ b/gluon/tests/__init__.py @@ -13,6 +13,7 @@ from .test_contribs import * from .test_routes import * from .test_router import * from .test_validators import * +from .test_authapi import * from .test_tools import * from .test_utils import * from .test_serializers import * diff --git a/gluon/tests/test_authapi.py b/gluon/tests/test_authapi.py new file mode 100644 index 00000000..cd00afb6 --- /dev/null +++ b/gluon/tests/test_authapi.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" Unit tests for authapi """ +import os +import unittest +from gluon.globals import Request, Response, Session +from gluon.languages import translator +from gluon.dal import DAL, Field +from gluon.authapi import AuthAPI +from gluon.storage import Storage +from gluon._compat import to_bytes, to_native, add_charset + +DEFAULT_URI = os.getenv('DB', 'sqlite:memory') + + +class TestAuthAPI(unittest.TestCase): + + def setUp(self): + self.request = Request(env={}) + self.request.application = 'a' + self.request.controller = 'c' + self.request.function = 'f' + self.request.folder = 'applications/admin' + self.response = Response() + self.session = Session() + T = translator('', 'en') + self.session.connect(self.request, self.response) + from gluon.globals import current + self.current = current + self.current.request = self.request + self.current.response = self.response + self.current.session = self.session + self.current.T = T + self.db = DAL(DEFAULT_URI, check_reserved=['all']) + self.auth = AuthAPI(self.db) + self.auth.define_tables(username=True, signature=False) + # Create a user + self.auth.table_user().validate_and_insert(first_name='Bart', + last_name='Simpson', + username='bart', + email='bart@simpson.com', + password='bart_password', + registration_key='', + registration_id='' + ) + self.db.commit() + + def test_login(self): + result = self.auth.login(**{'username': 'bart', 'password': 'bart_password'}) + self.assertTrue(self.auth.is_logged_in()) + self.assertTrue(result['user']['email'] == 'bart@simpson.com') + self.auth.logout() + self.assertFalse(self.auth.is_logged_in()) + self.auth.settings.username_case_sensitive = False + result = self.auth.login(**{'username': 'BarT', 'password': 'bart_password'}) + self.assertTrue(self.auth.is_logged_in()) + + def test_logout(self): + self.auth.login(**{'username': 'bart', 'password': 'bart_password'}) + self.assertTrue(self.auth.is_logged_in()) + result = self.auth.logout() + self.assertTrue(not self.auth.is_logged_in()) + self.assertTrue(result['user'] is None) + + def test_register(self): + self.auth.settings.login_after_registration = True + result = self.auth.register(**{ + 'username': 'lisa', + 'first_name': 'Lisa', + 'last_name': 'Simpson', + 'email': 'lisa@simpson.com', + 'password': 'lisa_password' + }) + self.assertTrue(result['user']['email'] == 'lisa@simpson.com') + self.assertTrue(self.auth.is_logged_in()) + with self.assertRaises(AssertionError): # Can't register if you're logged in + result = self.auth.register(**{ + 'username': 'lisa', + 'first_name': 'Lisa', + 'last_name': 'Simpson', + 'email': 'lisa@simpson.com', + 'password': 'lisa_password' + }) + self.auth.logout() + self.auth.settings.login_after_registration = False + result = self.auth.register(**{ + 'username': 'barney', + 'first_name': 'Barney', + 'last_name': 'Gumble', + 'email': 'barney@simpson.com', + 'password': 'barney_password' + }) + self.assertTrue(result['user']['email'] == 'barney@simpson.com') + self.assertFalse(self.auth.is_logged_in()) + self.auth.settings.login_userfield = 'email' + result = self.auth.register(**{ + 'username': 'lisa', + 'first_name': 'Lisa', + 'last_name': 'Simpson', + 'email': 'lisa@simpson.com', + 'password': 'lisa_password' + }) + self.assertTrue(result['errors']['email'] == self.auth.messages.email_taken) + self.assertTrue(result['user'] is None) + self.auth.settings.registration_requires_verification = True + result = self.auth.register(**{ + 'username': 'homer', + 'first_name': 'Homer', + 'last_name': 'Simpson', + 'email': 'homer@simpson.com', + 'password': 'homer_password' + }) + self.assertTrue('key' in result['user']) + + def test_profile(self): + with self.assertRaises(AssertionError): + # We are not logged in + self.auth.profile() + self.auth.login(**{'username': 'bart', 'password': 'bart_password'}) + self.assertTrue(self.auth.is_logged_in()) + result = self.auth.profile(email='bartolo@simpson.com') + self.assertTrue(result['user']['email'] == 'bartolo@simpson.com') + self.assertTrue(self.auth.table_user()[result['user']['id']].email == 'bartolo@simpson.com') + + def test_change_password(self): + with self.assertRaises(AssertionError): + # We are not logged in + self.auth.change_password() + self.auth.login(**{'username': 'bart', 'password': 'bart_password'}) + self.assertTrue(self.auth.is_logged_in()) + self.auth.change_password(old_password='bart_password', new_password='1234', new_password2='1234') + self.auth.logout() + self.assertTrue(not self.auth.is_logged_in()) + self.auth.login(username='bart', password='1234') + self.assertTrue(self.auth.is_logged_in()) + result = self.auth.change_password(old_password='bart_password', new_password='1234', new_password2='5678') + self.assertTrue('new_password2' in result['errors']) + result = self.auth.change_password(old_password='bart_password', new_password='1234', new_password2='1234') + self.assertTrue('old_password' in result['errors']) + + def test_verify_key(self): + self.auth.settings.registration_requires_verification = True + result = self.auth.register(**{ + 'username': 'homer', + 'first_name': 'Homer', + 'last_name': 'Simpson', + 'email': 'homer@simpson.com', + 'password': 'homer_password' + }) + self.assertTrue('key' in result['user']) + homer_id = result['user']['id'] + homers_key = result['user']['key'] + result = self.auth.verify_key(key=None) + self.assertTrue(result['errors'] is not None) + result = self.auth.verify_key(key='12345') + self.assertTrue(result['errors'] is not None) + result = self.auth.verify_key(key=homers_key) + self.assertTrue(result['errors'] is None) + self.assertEqual(self.auth.table_user()[homer_id].registration_key, '') + self.auth.settings.registration_requires_approval = True + result = self.auth.register(**{ + 'username': 'lisa', + 'first_name': 'Lisa', + 'last_name': 'Simpson', + 'email': 'lisa@simpson.com', + 'password': 'lisa_password' + }) + lisa_id = result['user']['id'] + result = self.auth.verify_key(key=result['user']['key']) + self.assertEqual(self.auth.table_user()[lisa_id].registration_key, 'pending') diff --git a/gluon/tests/test_validators.py b/gluon/tests/test_validators.py index c98b4c26..a7a35e90 100644 --- a/gluon/tests/test_validators.py +++ b/gluon/tests/test_validators.py @@ -702,15 +702,19 @@ class TestValidators(unittest.TestCase): def test_IS_LOWER(self): rtn = IS_LOWER()('ABC') + self.assertEqual(rtn, ('abc', None)) + rtn = IS_LOWER()(b'ABC') self.assertEqual(rtn, (b'abc', None)) rtn = IS_LOWER()('Ñ') - self.assertEqual(rtn, (b'\xc3\xb1', None)) + self.assertEqual(rtn, ('ñ', None)) def test_IS_UPPER(self): rtn = IS_UPPER()('abc') + self.assertEqual(rtn, ('ABC', None)) + rtn = IS_UPPER()(b'abc') self.assertEqual(rtn, (b'ABC', None)) rtn = IS_UPPER()('ñ') - self.assertEqual(rtn, (b'\xc3\x91', None)) + self.assertEqual(rtn, ('Ñ', None)) def test_IS_SLUG(self): rtn = IS_SLUG()('abc123') diff --git a/gluon/tools.py b/gluon/tools.py index eb4a61f9..471f0166 100644 --- a/gluon/tools.py +++ b/gluon/tools.py @@ -36,6 +36,7 @@ import json from email import message_from_string +from gluon.authapi import AuthAPI from gluon.contenttype import contenttype from gluon.storage import Storage, StorageList, Settings, Messages from gluon.utils import web2py_uuid, compare @@ -55,7 +56,7 @@ Field = DAL.Field __all__ = ['Mail', 'Auth', 'Recaptcha', 'Recaptcha2', 'Crud', 'Service', 'Wiki', 'PluginManager', 'fetch', 'geocode', 'reverse_geocode', 'prettydate'] -### mind there are two loggers here (logger and crud.settings.logger)! +# mind there are two loggers here (logger and crud.settings.logger)! logger = logging.getLogger("web2py") DEFAULT = lambda: None @@ -245,7 +246,7 @@ class Mail(object): if filename is None: raise Exception('Missing attachment name') payload = payload.read() - #FIXME PY3 can be used to_native? + # FIXME PY3 can be used to_native? filename = filename.encode(encoding) if content_type is None: content_type = contenttype(filename) @@ -633,13 +634,13 @@ class Mail(object): x509_sign_chainfile = x509_sign_chainfile or self.settings.x509_sign_chainfile x509_sign_certfile = x509_sign_certfile or self.settings.x509_sign_certfile or \ - x509_sign_keyfile or self.settings.x509_sign_certfile + x509_sign_keyfile or self.settings.x509_sign_certfile # crypt certfiles could be a string or a list x509_crypt_certfiles = x509_crypt_certfiles or self.settings.x509_crypt_certfiles x509_nocerts = x509_nocerts or\ - self.settings.x509_nocerts + self.settings.x509_nocerts # need m2crypto try: @@ -697,8 +698,8 @@ class Mail(object): # make an encryption cert's stack for crypt_certfile in x509_crypt_certfiles: certfile = X509.load_cert(crypt_certfile)\ - if os.path.isfile(crypt_certfile)\ - else X509.load_cert_string(crypt_certfile) + if os.path.isfile(crypt_certfile)\ + else X509.load_cert_string(crypt_certfile) sk.push(certfile) s.set_x509_stack(sk) @@ -768,10 +769,10 @@ class Mail(object): xcc['reply_to'] = reply_to from google.appengine.api import mail attachments = attachments and [mail.Attachment( - a.my_filename, - a.my_payload, - content_id='' % k - ) for k, a in enumerate(attachments) if not raw] + a.my_filename, + a.my_payload, + content_id='' % k + ) for k, a in enumerate(attachments) if not raw] if attachments: result = mail.send_mail( sender=sender, to=origTo, @@ -841,7 +842,7 @@ class Recaptcha(DIV): options='', comment='', ajax=False - ): + ): request = request or current.request self.request_vars = request and request.vars or current.request.vars self.remote_addr = request.env.remote_addr @@ -1217,7 +1218,7 @@ class AuthJWT(object): salt=None, additional_payload=None, before_authorization=None, - max_header_length=4*1024, + max_header_length=4 * 1024, ): self.secret_key = secret_key self.auth = auth @@ -1318,9 +1319,9 @@ class AuthJWT(object): while sticking to a somewhat-stable Auth API. """ # TODO: Check the following comment - ## is the following safe or should we use - ## calendar.timegm(datetime.datetime.utcnow().timetuple()) - ## result seem to be the same (seconds since epoch, in UTC) + # is the following safe or should we use + # calendar.timegm(datetime.datetime.utcnow().timetuple()) + # result seem to be the same (seconds since epoch, in UTC) now = time.mktime(datetime.datetime.now().timetuple()) expires = now + self.expiration payload = dict( @@ -1484,172 +1485,119 @@ class AuthJWT(object): return decorator -class Auth(object): - default_settings = dict( - hideerror=False, - password_min_length=4, - cas_maps=None, - reset_password_requires_verification=False, - registration_requires_verification=False, - registration_requires_approval=False, - bulk_register_enabled=False, - login_after_registration=False, - login_after_password_change=True, - alternate_requires_registration=False, - create_user_groups="user_%(id)s", - everybody_group_id=None, - manager_actions={}, - auth_manager_role=None, - two_factor_authentication_group=None, - auth_two_factor_enabled=False, - auth_two_factor_tries_left=3, - login_captcha=None, - register_captcha=None, - pre_registration_div=None, - retrieve_username_captcha=None, - retrieve_password_captcha=None, - captcha=None, - prevent_open_redirect_attacks=True, - prevent_password_reset_attacks=True, - expiration=3600, # one hour - long_expiration=3600 * 30 * 24, # one month - remember_me_form=True, - allow_basic_login=False, - allow_basic_login_only=False, - on_failed_authentication=lambda x: redirect(x), - formstyle=None, - label_separator=None, - logging_enabled=True, - allow_delete_accounts=False, - password_field='password', - table_user_name='auth_user', - table_group_name='auth_group', - table_membership_name='auth_membership', - table_permission_name='auth_permission', - table_event_name='auth_event', - table_cas_name='auth_cas', - table_token_name='auth_token', - table_user=None, - table_group=None, - table_membership=None, - table_permission=None, - table_event=None, - table_cas=None, - showid=False, - use_username=False, - login_email_validate=True, - login_userfield=None, - multi_login=False, - logout_onlogout=None, - register_fields=None, - register_verify_password=True, - profile_fields=None, - email_case_sensitive=True, - username_case_sensitive=True, - update_fields=['email'], - ondelete="CASCADE", - client_side=True, - renew_session_onlogin=True, - renew_session_onlogout=True, - keep_session_onlogin=True, - keep_session_onlogout=False, - wiki=Settings(), - ) - # ## these are messages that can be customized - default_messages = dict( - login_button='Log In', - register_button='Sign Up', - password_reset_button='Request reset password', - password_change_button='Change password', - profile_save_button='Apply changes', - submit_button='Submit', - verify_password='Verify Password', - delete_label='Check to delete', - function_disabled='Function disabled', - access_denied='Insufficient privileges', - registration_verifying='Registration needs verification', - registration_pending='Registration is pending approval', - email_taken='This email already has an account', - invalid_username='Invalid username', - username_taken='Username already taken', - login_disabled='Login disabled by administrator', - logged_in='Logged in', - email_sent='Email sent', - unable_to_send_email='Unable to send email', - email_verified='Email verified', - logged_out='Logged out', - registration_successful='Registration successful', - invalid_email='Invalid email', - invalid_login='Invalid login', - invalid_user='Invalid user', - invalid_password='Invalid password', - invalid_two_factor_code = 'Incorrect code. {0} more attempt(s) remaining.', - is_empty="Cannot be empty", - mismatched_password="Password fields don't match", - verify_email='Welcome %(username)s! Click on the link %(link)s to verify your email', - verify_email_subject='Email verification', - username_sent='Your username was emailed to you', - new_password_sent='A new password was emailed to you', - password_changed='Password changed', - retrieve_username='Your username is: %(username)s', - retrieve_username_subject='Username retrieve', - retrieve_password='Your password is: %(password)s', - retrieve_password_subject='Password retrieve', - reset_password='Click on the link %(link)s to reset your password', - reset_password_subject='Password reset', - bulk_invite_subject='Invitation to join %(site)s', - retrieve_two_factor_code='Your temporary login code is {0}', - retrieve_two_factor_code_subject='Two-step Login Authentication Code', - bulk_invite_body='You have been invited to join %(site)s, click %(link)s to complete the process', - invalid_reset_password='Invalid reset password', - profile_updated='Profile updated', - new_password='New password', - old_password='Old password', - group_description='Group uniquely assigned to user %(id)s', - register_log='User %(id)s Registered', - login_log='User %(id)s Logged-in', - login_failed_log=None, - logout_log='User %(id)s Logged-out', - profile_log='User %(id)s Profile updated', - verify_email_log='User %(id)s Verification email sent', - retrieve_username_log='User %(id)s Username retrieved', - retrieve_password_log='User %(id)s Password retrieved', - reset_password_log='User %(id)s Password reset', - change_password_log='User %(id)s Password changed', - add_group_log='Group %(group_id)s created', - del_group_log='Group %(group_id)s deleted', - add_membership_log=None, - del_membership_log=None, - has_membership_log=None, - add_permission_log=None, - del_permission_log=None, - has_permission_log=None, - impersonate_log='User %(id)s is impersonating %(other_id)s', - label_first_name='First name', - label_last_name='Last name', - label_username='Username', - label_email='E-mail', - label_password='Password', - label_registration_key='Registration key', - label_reset_password_key='Reset Password key', - label_registration_id='Registration identifier', - label_role='Role', - label_description='Description', - label_user_id='User ID', - label_group_id='Group ID', - label_name='Name', - label_table_name='Object or table name', - label_record_id='Record ID', - label_time_stamp='Timestamp', - label_client_ip='Client IP', - label_origin='Origin', - label_remember_me="Remember me (for 30 days)", - label_two_factor='Authentication code', - two_factor_comment = 'This code was emailed to you and is required for login.', - verify_password_comment='please input your password again', - ) +class Auth(AuthAPI): + default_settings = dict(AuthAPI.default_settings, + allow_basic_login=False, + allow_basic_login_only=False, + allow_delete_accounts=False, + alternate_requires_registration=False, + auth_manager_role=None, + auth_two_factor_enabled=False, + auth_two_factor_tries_left=3, + bulk_register_enabled=False, + captcha=None, + cas_maps=None, + client_side=True, + formstyle=None, + hideerror=False, + label_separator=None, + login_after_password_change=True, + login_after_registration=False, + login_captcha=None, + long_expiration=3600 * 30 * 24, # one month + mailer=None, + manager_actions={}, + multi_login=False, + on_failed_authentication=lambda x: redirect(x), + pre_registration_div=None, + prevent_open_redirect_attacks=True, + prevent_password_reset_attacks=True, + profile_fields=None, + register_captcha=None, + register_fields=None, + register_verify_password=True, + remember_me_form=True, + reset_password_requires_verification=False, + retrieve_password_captcha=None, + retrieve_username_captcha=None, + showid=False, + table_cas=None, + table_cas_name='auth_cas', + table_event=None, + table_group=None, + table_membership=None, + table_permission=None, + table_token_name='auth_token', + table_user=None, + two_factor_authentication_group=None, + update_fields=['email'], + wiki=Settings() + ) + # ## these are messages that can be customized + default_messages = dict(AuthAPI.default_messages, + access_denied='Insufficient privileges', + bulk_invite_body='You have been invited to join %(site)s, click %(link)s to complete the process', + bulk_invite_subject='Invitation to join %(site)s', + delete_label='Check to delete', + email_sent='Email sent', + email_verified='Email verified', + function_disabled='Function disabled', + impersonate_log='User %(id)s is impersonating %(other_id)s', + invalid_reset_password='Invalid reset password', + invalid_two_factor_code='Incorrect code. {0} more attempt(s) remaining.', + is_empty="Cannot be empty", + label_client_ip='Client IP', + label_description='Description', + label_email='E-mail', + label_first_name='First name', + label_group_id='Group ID', + label_last_name='Last name', + label_name='Name', + label_origin='Origin', + label_password='Password', + label_record_id='Record ID', + label_registration_id='Registration identifier', + label_registration_key='Registration key', + label_remember_me="Remember me (for 30 days)", + label_reset_password_key='Reset Password key', + label_role='Role', + label_table_name='Object or table name', + label_time_stamp='Timestamp', + label_two_factor='Authentication code', + label_user_id='User ID', + label_username='Username', + login_button='Log In', + login_disabled='Login disabled by administrator', + new_password='New password', + new_password_sent='A new password was emailed to you', + old_password='Old password', + password_change_button='Change password', + password_reset_button='Request reset password', + profile_save_button='Apply changes', + register_button='Sign Up', + reset_password='Click on the link %(link)s to reset your password', + reset_password_log='User %(id)s Password reset', + reset_password_subject='Password reset', + retrieve_password='Your password is: %(password)s', + retrieve_password_log='User %(id)s Password retrieved', + retrieve_password_subject='Password retrieve', + retrieve_two_factor_code='Your temporary login code is {0}', + retrieve_two_factor_code_subject='Two-step Login Authentication Code', + retrieve_username='Your username is: %(username)s', + retrieve_username_log='User %(id)s Username retrieved', + retrieve_username_subject='Username retrieve', + submit_button='Submit', + two_factor_comment='This code was emailed to you and is required for login.', + unable_send_email='Unable to send email', + username_sent='Your username was emailed to you', + verify_email='Welcome %(username)s! Click on the link %(link)s to verify your email', + verify_email_log='User %(id)s Verification email sent', + verify_email_subject='Email verification', + verify_password='Verify Password', + verify_password_comment='please input your password again' + ) """ Class for authentication, authorization, role based access control. @@ -1816,7 +1764,7 @@ class Auth(object): self.user = auth.user # this is a trick to speed up sessions to avoid many writes if (now - auth.last_visit).seconds > (auth.expiration / 10): - auth.last_visit = request.now + auth.last_visit = now else: self.user = None if session.auth: @@ -1893,15 +1841,14 @@ class Auth(object): # ## these are messages that can be customized messages = self.messages = Messages(current.T) messages.update(Auth.default_messages) - messages.update(ajax_failed_authentication= - DIV(H4('NOT AUTHORIZED'), - 'Please ', - A('login', - _href=self.settings.login_url + - ('?_next=' + urllib_quote(current.request.env.http_web2py_component_location)) - if current.request.env.http_web2py_component_location else ''), - ' to view this content.', - _class='not-authorized alert alert-block')) + messages.update(ajax_failed_authentication=DIV(H4('NOT AUTHORIZED'), + 'Please ', + A('login', + _href=self.settings.login_url + + ('?_next=' + urllib_quote(current.request.env.http_web2py_component_location)) + if current.request.env.http_web2py_component_location else ''), + ' to view this content.', + _class='not-authorized alert alert-block')) messages.lock_keys = True # for "remember me" option @@ -1932,31 +1879,10 @@ class Auth(object): parts = next.split('/') if not ':' in parts[0]: return next - elif len(parts)>2 and parts[0].endswith(':') and parts[1:3]==['', host]: + elif len(parts) > 2 and parts[0].endswith(':') and parts[1:3] == ['', host]: return next return None - def _get_user_id(self): - """accessor for auth.user_id""" - return self.user and self.user.id or None - - user_id = property(_get_user_id, doc="user.id or None") - - def table_user(self): - return self.db[self.settings.table_user_name] - - def table_group(self): - return self.db[self.settings.table_group_name] - - def table_membership(self): - return self.db[self.settings.table_membership_name] - - def table_permission(self): - return self.db[self.settings.table_permission_name] - - def table_event(self): - return self.db[self.settings.table_event_name] - def table_cas(self): return self.db[self.settings.table_cas_name] @@ -2062,7 +1988,7 @@ class Auth(object): if callable(user_identifier): user_identifier = user_identifier(self.user) elif ((isinstance(user_identifier, str) or - type(user_identifier).__name__ == 'lazyT') and + type(user_identifier).__name__ == 'lazyT') and re.search(r'%\(.+\)s', user_identifier)): user_identifier = user_identifier % self.user if not user_identifier: @@ -2090,13 +2016,13 @@ class Auth(object): self.bar[0][3].append((item['name'], False, item['href'])) def bootstrap3(): # Default web2py scaffolding - def rename(icon): return icon+' '+icon.replace('icon', 'glyphicon') - self.bar = UL(LI(Anr(I(_class=rename('icon '+items[0]['icon'])), + def rename(icon): return icon + ' ' + icon.replace('icon', 'glyphicon') + self.bar = UL(LI(Anr(I(_class=rename('icon ' + items[0]['icon'])), ' ' + items[0]['name'], _href=items[0]['href'])), _class='dropdown-menu') del items[0] for item in items: - self.bar.insert(-1, LI(Anr(I(_class=rename('icon '+item['icon'])), + self.bar.insert(-1, LI(Anr(I(_class=rename('icon ' + item['icon'])), ' ' + item['name'], _href=item['href']))) self.bar.insert(-1, LI('', _class='divider')) @@ -2213,15 +2139,6 @@ class Auth(object): return self.bar - def __get_migrate(self, tablename, migrate=True): - - if type(migrate).__name__ == 'str': - return (migrate + tablename + '.table') - elif migrate == False: - return False - else: - return True - def enable_record_versioning(self, tables, archive_db=None, @@ -2260,56 +2177,13 @@ class Auth(object): for table in tables: fieldnames = table.fields() if ('id' in fieldnames and - 'modified_on' in fieldnames and - not current_record in fieldnames): + 'modified_on' in fieldnames and + not current_record in fieldnames): table._enable_record_versioning(archive_db=archive_db, archive_name=archive_names, current_record=current_record, current_record_label=current_record_label) - def define_signature(self): - db = self.db - settings = self.settings - request = current.request - T = current.T - reference_user = 'reference %s' % settings.table_user_name - - def lazy_user(auth=self): - return auth.user_id - - def represent(id, record=None, s=settings): - try: - user = s.table_user(id) - return '%s %s' % (user.get("first_name", user.get("email")), - user.get("last_name", '')) - except: - return id - ondelete = self.settings.ondelete - self.signature = Table( - self.db, 'auth_signature', - Field('is_active', 'boolean', - default=True, - readable=False, writable=False, - label=T('Is Active')), - Field('created_on', 'datetime', - default=request.now, - writable=False, readable=False, - label=T('Created On')), - Field('created_by', - reference_user, - default=lazy_user, represent=represent, - writable=False, readable=False, - label=T('Created By'), ondelete=ondelete), - Field('modified_on', 'datetime', - update=request.now, default=request.now, - writable=False, readable=False, - label=T('Modified On')), - Field('modified_by', - reference_user, represent=represent, - default=lazy_user, update=lazy_user, - writable=False, readable=False, - label=T('Modified By'), ondelete=ondelete)) - def define_tables(self, username=None, signature=None, enable_tokens=False, migrate=None, fake_migrate=None): """ @@ -2333,179 +2207,11 @@ class Auth(object): if fake_migrate is None: fake_migrate = db._fake_migrate settings = self.settings - if username is None: - username = settings.use_username - else: - settings.use_username = username settings.enable_tokens = enable_tokens - if not self.signature: - self.define_signature() - if signature: - signature_list = [self.signature] - elif not signature: - signature_list = [] - elif isinstance(signature, Table): - signature_list = [signature] - else: - signature_list = signature - is_not_empty = IS_NOT_EMPTY(error_message=self.messages.is_empty) - is_crypted = CRYPT(key=settings.hmac_key, - min_length=settings.password_min_length) - is_unique_email = [ - IS_EMAIL(error_message=self.messages.invalid_email), - IS_NOT_IN_DB(db, '%s.email' % settings.table_user_name, - error_message=self.messages.email_taken)] - if not settings.email_case_sensitive: - is_unique_email.insert(1, IS_LOWER()) - if settings.table_user_name not in db.tables: - passfield = settings.password_field - extra_fields = settings.extra_fields.get( - settings.table_user_name, []) + signature_list - if username or settings.cas_provider: - is_unique_username = \ - [IS_MATCH('[\w\.\-]+', strict=True, - error_message=self.messages.invalid_username), - IS_NOT_IN_DB(db, '%s.username' % settings.table_user_name, - error_message=self.messages.username_taken)] - if not settings.username_case_sensitive: - is_unique_username.insert(1, IS_LOWER()) - db.define_table( - settings.table_user_name, - Field('first_name', length=128, default='', - label=self.messages.label_first_name, - requires=is_not_empty), - Field('last_name', length=128, default='', - label=self.messages.label_last_name, - requires=is_not_empty), - Field('email', length=512, default='', - label=self.messages.label_email, - requires=is_unique_email), - Field('username', length=128, default='', - label=self.messages.label_username, - requires=is_unique_username), - Field(passfield, 'password', length=512, - readable=False, label=self.messages.label_password, - requires=[is_crypted]), - Field('registration_key', length=512, - writable=False, readable=False, default='', - label=self.messages.label_registration_key), - Field('reset_password_key', length=512, - writable=False, readable=False, default='', - label=self.messages.label_reset_password_key), - Field('registration_id', length=512, - writable=False, readable=False, default='', - label=self.messages.label_registration_id), - *extra_fields, - **dict( - migrate=self.__get_migrate(settings.table_user_name, - migrate), - fake_migrate=fake_migrate, - format='%(username)s')) - else: - db.define_table( - settings.table_user_name, - Field('first_name', length=128, default='', - label=self.messages.label_first_name, - requires=is_not_empty), - Field('last_name', length=128, default='', - label=self.messages.label_last_name, - requires=is_not_empty), - Field('email', length=512, default='', - label=self.messages.label_email, - requires=is_unique_email), - Field(passfield, 'password', length=512, - readable=False, label=self.messages.label_password, - requires=[is_crypted]), - Field('registration_key', length=512, - writable=False, readable=False, default='', - label=self.messages.label_registration_key), - Field('reset_password_key', length=512, - writable=False, readable=False, default='', - label=self.messages.label_reset_password_key), - Field('registration_id', length=512, - writable=False, readable=False, default='', - label=self.messages.label_registration_id), - *extra_fields, - **dict( - migrate=self.__get_migrate(settings.table_user_name, - migrate), - fake_migrate=fake_migrate, - format='%(first_name)s %(last_name)s (%(id)s)')) - reference_table_user = 'reference %s' % settings.table_user_name - if settings.table_group_name not in db.tables: - extra_fields = settings.extra_fields.get( - settings.table_group_name, []) + signature_list - db.define_table( - settings.table_group_name, - Field('role', length=512, default='', - label=self.messages.label_role, - requires=IS_NOT_IN_DB(db, '%s.role' % settings.table_group_name)), - Field('description', 'text', - label=self.messages.label_description), - *extra_fields, - **dict( - migrate=self.__get_migrate( - settings.table_group_name, migrate), - fake_migrate=fake_migrate, - format='%(role)s (%(id)s)')) - reference_table_group = 'reference %s' % settings.table_group_name - if settings.table_membership_name not in db.tables: - extra_fields = settings.extra_fields.get( - settings.table_membership_name, []) + signature_list - db.define_table( - settings.table_membership_name, - Field('user_id', reference_table_user, - label=self.messages.label_user_id), - Field('group_id', reference_table_group, - label=self.messages.label_group_id), - *extra_fields, - **dict( - migrate=self.__get_migrate( - settings.table_membership_name, migrate), - fake_migrate=fake_migrate)) - if settings.table_permission_name not in db.tables: - extra_fields = settings.extra_fields.get( - settings.table_permission_name, []) + signature_list - db.define_table( - settings.table_permission_name, - Field('group_id', reference_table_group, - label=self.messages.label_group_id), - Field('name', default='default', length=512, - label=self.messages.label_name, - requires=is_not_empty), - Field('table_name', length=512, - label=self.messages.label_table_name), - Field('record_id', 'integer', default=0, - label=self.messages.label_record_id, - requires=IS_INT_IN_RANGE(0, 10 ** 9)), - *extra_fields, - **dict( - migrate=self.__get_migrate( - settings.table_permission_name, migrate), - fake_migrate=fake_migrate)) - if settings.table_event_name not in db.tables: - db.define_table( - settings.table_event_name, - Field('time_stamp', 'datetime', - default=current.request.now, - label=self.messages.label_time_stamp), - Field('client_ip', - default=current.request.client, - label=self.messages.label_client_ip), - Field('user_id', reference_table_user, default=None, - label=self.messages.label_user_id), - Field('origin', default='auth', length=512, - label=self.messages.label_origin, - requires=is_not_empty), - Field('description', 'text', default='', - label=self.messages.label_description, - requires=is_not_empty), - *settings.extra_fields.get(settings.table_event_name, []), - **dict( - migrate=self.__get_migrate( - settings.table_event_name, migrate), - fake_migrate=fake_migrate)) + super(Auth, self).define_tables(username, signature, migrate, fake_migrate) + now = current.request.now + reference_table_user = 'reference %s' % settings.table_user_name if settings.cas_domains: if settings.table_cas_name not in db.tables: db.define_table( @@ -2518,7 +2224,7 @@ class Auth(object): Field('renew', 'boolean', default=False), *settings.extra_fields.get(settings.table_cas_name, []), **dict( - migrate=self.__get_migrate( + migrate=self._get_migrate( settings.table_cas_name, migrate), fake_migrate=fake_migrate)) if settings.enable_tokens: @@ -2532,7 +2238,7 @@ class Auth(object): Field('expires_on', 'datetime', default=datetime.datetime(2999, 12, 31)), Field('token', writable=False, default=web2py_uuid, unique=True), *extra_fields, - **dict(migrate=self.__get_migrate(settings.table_token_name, migrate), + **dict(migrate=self._get_migrate(settings.table_token_name, migrate), fake_migrate=fake_migrate)) if not db._lazy_tables: settings.table_user = db[settings.table_user_name] @@ -2566,26 +2272,6 @@ class Auth(object): maps=maps) return self - def log_event(self, description, vars=None, origin='auth'): - """ - Examples: - Use as:: - - auth.log_event(description='this happened', origin='auth') - - """ - if not self.settings.logging_enabled or not description: - return - elif self.is_logged_in(): - user_id = self.user.id - else: - user_id = None # user unknown - vars = vars or {} - # log messages should not be translated - if type(description).__name__ == 'lazyT': - description = description.m - self.table_event().insert(description=str(description % vars), origin=origin, user_id=user_id) - def get_or_create_user(self, keys, update_fields=['email'], login=True, get=True): """ @@ -2683,29 +2369,6 @@ class Auth(object): raise http_401 return (True, True, is_valid_user) - def login_user(self, user): - """ - Logins the `user = db.auth_user(id)` - """ - from gluon.settings import global_settings - if global_settings.web2py_runtime_gae: - user = Row(self.table_user()._filter_fields(user, id=True)) - delattr(user, 'password') - else: - user = Row(user) - for key in list(user.keys()): - value = user[key] - if callable(value) or key == 'password': - delattr(user, key) - if self.settings.renew_session_onlogin: - current.session.renew(clear_session=not self.settings.keep_session_onlogin) - current.session.auth = Storage(user=user, - last_visit=current.request.now, - expiration=self.settings.expiration, - hmac_key=web2py_uuid()) - self.user = user - self.update_groups() - def _get_login_settings(self): table_user = self.table_user() userfield = self.settings.login_userfield or 'username' \ @@ -2726,7 +2389,7 @@ class Auth(object): settings.passfield].validate(password)[0] if ((user.registration_key is None or not user.registration_key.strip()) and - password == user[settings.passfield]): + password == user[settings.passfield]): self.login_user(user) return user else: @@ -3368,7 +3031,7 @@ class Auth(object): if self.settings.register_fields is None: self.settings.register_fields = [f.name for f in table_user if f.writable] k = self.settings.register_fields.index(passfield) - self.settings.register_fields.insert(k+1, "password_two") + self.settings.register_fields.insert(k + 1, "password_two") extra_fields = [ Field("password_two", "password", requires=IS_EQUAL_TO(request.post_vars.get(passfield, None), @@ -3447,16 +3110,6 @@ class Auth(object): return form - def is_logged_in(self): - """ - Checks if the user is logged in and returns True/False. - If so user is in auth.user as well as in session.auth.user - """ - - if self.user: - return True - return False - def verify_email(self, next=DEFAULT, onaccept=DEFAULT, @@ -3508,7 +3161,7 @@ class Auth(object): response = current.response session = current.session captcha = self.settings.retrieve_username_captcha or \ - (self.settings.retrieve_username_captcha is not False and self.settings.captcha) + (self.settings.retrieve_username_captcha is not False and self.settings.captcha) if not self.settings.mailer: response.flash = self.messages.function_disabled return '' @@ -3627,7 +3280,7 @@ class Auth(object): d = { passfield: str(table_user[passfield].validate(password)[0]), 'registration_key': '' - } + } user.update_record(**d) if self.settings.mailer and \ self.settings.mailer.send(to=form.vars.email, @@ -3684,7 +3337,6 @@ class Auth(object): raise Exception except Exception as e: session.flash = self.messages.invalid_reset_password - redirect(self.url('login', vars=dict(test=e))) redirect(next, client_side=self.settings.client_side) passfield = self.settings.password_field form = SQLFORM.factory( @@ -3768,9 +3420,9 @@ class Auth(object): else: emails_fail.append(email) emails_fail += emails[max_emails:] - form = DIV(H4('Emails sent'), UL(*[A(x, _href='mailto:'+x) for x in emails_sent]), - H4('Emails failed'), UL(*[A(x, _href='mailto:'+x) for x in emails_fail]), - H4('Emails existing'), UL(*[A(x, _href='mailto:'+x) for x in emails_exist])) + form = DIV(H4('Emails sent'), UL(*[A(x, _href='mailto:' + x) for x in emails_sent]), + H4('Emails failed'), UL(*[A(x, _href='mailto:' + x) for x in emails_fail]), + H4('Emails existing'), UL(*[A(x, _href='mailto:' + x) for x in emails_exist])) return form def manage_tokens(self): @@ -3874,7 +3526,7 @@ class Auth(object): response = current.response session = current.session captcha = self.settings.retrieve_password_captcha or \ - (self.settings.retrieve_password_captcha is not False and self.settings.captcha) + (self.settings.retrieve_password_captcha is not False and self.settings.captcha) if next is DEFAULT: next = self.get_vars_next() or self.settings.request_reset_password_next @@ -4077,7 +3729,7 @@ class Auth(object): formstyle=self.settings.formstyle, separator=self.settings.label_separator, deletable=self.settings.allow_delete_accounts, - ) + ) if form.accepts(request, session, formname='profile', onvalidation=onvalidation, @@ -4205,21 +3857,6 @@ class Auth(object): else: return SQLFORM(table_user, user.id, readonly=True) - def update_groups(self): - if not self.user: - return - user_groups = self.user_groups = {} - if current.session.auth: - current.session.auth.user_groups = self.user_groups - table_group = self.table_group() - table_membership = self.table_membership() - memberships = self.db( - table_membership.user_id == self.user.id).select() - for membership in memberships: - group = table_group(membership.group_id) - if group: - user_groups[membership.group_id] = group.role - def groups(self): """ Displays the groups and their roles for the logged in user @@ -4337,6 +3974,7 @@ class Auth(object): If role is provided instead of group_id then the group_id is calculated. """ + def has_membership(self=self, group_id=group_id, role=role): return self.has_membership(group_id=group_id, role=role) return self.requires(has_membership, otherwise=otherwise) @@ -4348,6 +3986,7 @@ class Auth(object): if user logged in is not a member of any group (role) that has 'name' access to 'table_name', 'record_id'. """ + def has_permission(self=self, name=name, table_name=table_name, record_id=record_id): return self.has_permission(name, table_name, record_id) return self.requires(has_permission, otherwise=otherwise) @@ -4363,231 +4002,6 @@ class Auth(object): return URL.verify(current.request, user_signature=True, hash_vars=hash_vars) return self.requires(verify, otherwise) - def add_group(self, role, description=''): - """ - Creates a group associated to a role - """ - group_id = self.table_group().insert(role=role, description=description) - self.log_event(self.messages['add_group_log'], dict(group_id=group_id, role=role)) - return group_id - - def del_group(self, group_id): - """ - Deletes a group - """ - self.db(self.table_group().id == group_id).delete() - self.db(self.table_membership().group_id == group_id).delete() - self.db(self.table_permission().group_id == group_id).delete() - if group_id in self.user_groups: - del self.user_groups[group_id] - self.log_event(self.messages.del_group_log, dict(group_id=group_id)) - - def id_group(self, role): - """ - Returns the group_id of the group specified by the role - """ - rows = self.db(self.table_group().role == role).select() - if not rows: - return None - return rows[0].id - - def user_group(self, user_id=None): - """ - Returns the group_id of the group uniquely associated to this user - i.e. `role=user:[user_id]` - """ - return self.id_group(self.user_group_role(user_id)) - - def user_group_role(self, user_id=None): - if not self.settings.create_user_groups: - return None - if user_id: - user = self.table_user()[user_id] - else: - user = self.user - return self.settings.create_user_groups % user - - def has_membership(self, group_id=None, user_id=None, role=None): - """ - Checks if user is member of group_id or role - """ - group_id = group_id or self.id_group(role) - try: - group_id = int(group_id) - except: - group_id = self.id_group(group_id) # interpret group_id as a role - if not user_id and self.user: - user_id = self.user.id - membership = self.table_membership() - if group_id and user_id and self.db((membership.user_id == user_id) & - (membership.group_id == group_id)).select(): - r = True - else: - r = False - self.log_event(self.messages['has_membership_log'], - dict(user_id=user_id, group_id=group_id, check=r)) - return r - - def add_membership(self, group_id=None, user_id=None, role=None): - """ - Gives user_id membership of group_id or role - if user is None than user_id is that of current logged in user - """ - - group_id = group_id or self.id_group(role) - try: - group_id = int(group_id) - except: - group_id = self.id_group(group_id) # interpret group_id as a role - if not user_id and self.user: - user_id = self.user.id - if not group_id: - raise ValueError('group_id not provided or invalid') - if not user_id: - raise ValueError('user_id not provided or invalid') - membership = self.table_membership() - db = membership._db - record = db((membership.user_id == user_id) & - (membership.group_id == group_id), - ignore_common_filters=True).select().first() - if record: - if hasattr(record, 'is_active') and not record.is_active: - record.update_record(is_active=True) - return record.id - else: - id = membership.insert(group_id=group_id, user_id=user_id) - if role: - self.user_groups[group_id] = role - else: - self.update_groups() - self.log_event(self.messages['add_membership_log'], - dict(user_id=user_id, group_id=group_id)) - return id - - def del_membership(self, group_id=None, user_id=None, role=None): - """ - Revokes membership from group_id to user_id - if user_id is None than user_id is that of current logged in user - """ - - group_id = group_id or self.id_group(role) - try: - group_id = int(group_id) - except: - group_id = self.id_group(group_id) # interpret group_id as a role - if not user_id and self.user: - user_id = self.user.id - membership = self.table_membership() - self.log_event(self.messages['del_membership_log'], - dict(user_id=user_id, group_id=group_id)) - ret = self.db(membership.user_id == user_id)(membership.group_id == group_id).delete() - if group_id in self.user_groups: - del self.user_groups[group_id] - return ret - - def has_permission(self, - name='any', - table_name='', - record_id=0, - user_id=None, - group_id=None, - ): - """ - Checks if user_id or current logged in user is member of a group - that has 'name' permission on 'table_name' and 'record_id' - if group_id is passed, it checks whether the group has the permission - """ - - if not group_id and self.settings.everybody_group_id and \ - self.has_permission(name, table_name, record_id, user_id=None, - group_id=self.settings.everybody_group_id): - return True - - if not user_id and not group_id and self.user: - user_id = self.user.id - if user_id: - membership = self.table_membership() - rows = self.db(membership.user_id == user_id).select(membership.group_id) - groups = set([row.group_id for row in rows]) - if group_id and group_id not in groups: - return False - else: - groups = set([group_id]) - permission = self.table_permission() - rows = self.db(permission.name == - name)(permission.table_name == - str(table_name))(permission.record_id == - record_id).select(permission.group_id) - groups_required = set([row.group_id for row in rows]) - if record_id: - rows = self.db(permission.name == - name)(permission.table_name == - str(table_name))(permission.record_id == - 0).select(permission.group_id) - groups_required = groups_required.union(set([row.group_id for row in rows])) - if groups.intersection(groups_required): - r = True - else: - r = False - if user_id: - self.log_event(self.messages['has_permission_log'], - dict(user_id=user_id, name=name, - table_name=table_name, record_id=record_id)) - return r - - def add_permission(self, - group_id, - name='any', - table_name='', - record_id=0, - ): - """ - Gives group_id 'name' access to 'table_name' and 'record_id' - """ - - permission = self.table_permission() - if group_id == 0: - group_id = self.user_group() - record = self.db((permission.group_id == group_id) & - (permission.name == name) & - (permission.table_name == str(table_name)) & - (permission.record_id == long(record_id)), - ignore_common_filters=True - ).select(limitby=(0, 1), orderby_on_limitby=False).first() - if record: - if hasattr(record, 'is_active') and not record.is_active: - record.update_record(is_active=True) - id = record.id - else: - id = permission.insert(group_id=group_id, name=name, - table_name=str(table_name), - record_id=long(record_id)) - self.log_event(self.messages['add_permission_log'], - dict(permission_id=id, group_id=group_id, - name=name, table_name=table_name, - record_id=record_id)) - return id - - def del_permission(self, - group_id, - name='any', - table_name='', - record_id=0, - ): - """ - Revokes group_id 'name' access to 'table_name' and 'record_id' - """ - - permission = self.table_permission() - self.log_event(self.messages['del_permission_log'], - dict(group_id=group_id, name=name, - table_name=table_name, record_id=record_id)) - return self.db(permission.group_id == - group_id)(permission.name == - name)(permission.table_name == - str(table_name))(permission.record_id == - long(record_id)).delete() - def accessible_query(self, name, table, user_id=None): """ Returns a query with all accessible records for user_id or @@ -4622,16 +4036,16 @@ class Auth(object): permission = self.table_permission() query = table.id.belongs( db(membership.user_id == user_id) - (membership.group_id == permission.group_id) - (permission.name == name) - (permission.table_name == table) - ._select(permission.record_id)) + (membership.group_id == permission.group_id) + (permission.name == name) + (permission.table_name == table) + ._select(permission.record_id)) if self.settings.everybody_group_id: query |= table.id.belongs( db(permission.group_id == self.settings.everybody_group_id) - (permission.name == name) - (permission.table_name == table) - ._select(permission.record_id)) + (permission.name == name) + (permission.table_name == table) + ._select(permission.record_id)) return query @staticmethod @@ -4769,12 +4183,12 @@ class Auth(object): def wikimenu(self): """To be used in menu.py for app wide wiki menus""" if (hasattr(self, "_wiki") and - self._wiki.settings.controller and - self._wiki.settings.function): + self._wiki.settings.controller and + self._wiki.settings.function): self._wiki.automenu() -class Crud(object): # pragma: no cover +class Crud(object): # pragma: no cover def url(self, f=None, args=None, vars=None): """ @@ -4947,7 +4361,7 @@ class Crud(object): # pragma: no cover formstyle=self.settings.formstyle, separator=self.settings.label_separator, **attributes # contains hidden - ) + ) self.accepted = False self.deleted = False captcha = self.settings.update_captcha or self.settings.captcha @@ -5044,7 +4458,7 @@ class Crud(object): # pragma: no cover showid=self.settings.showid, formstyle=self.settings.formstyle, separator=self.settings.label_separator - ) + ) if current.request.extension not in ('html', 'load'): return table._filter_fields(form.record, id=True) return form @@ -5702,6 +5116,7 @@ class Service(object): self.error() class JsonRpcException(Exception): + def __init__(self, code, info): jrpc_error = Service.jsonrpc_errors.get(code) if jrpc_error: @@ -6218,6 +5633,7 @@ class PluginManager(object): class Expose(object): + def __init__(self, base=None, basename=None, extensions=None, allow_download=True, follow_symlink_out=False): """ @@ -6245,7 +5661,8 @@ class Expose(object): """ # why would this not be callable? but otherwise tests do not pass - if current.session and callable(current.session.forget): current.session.forget() + if current.session and callable(current.session.forget): + current.session.forget() self.follow_symlink_out = follow_symlink_out self.base = self.normalize_path( base or os.path.join(current.request.folder, 'static')) @@ -6305,7 +5722,7 @@ class Expose(object): @staticmethod def __in_base(subdir, basedir, sep=os.path.sep): """True if subdir/ is under basedir/""" - s = lambda f: '%s%s' % (f.rstrip(sep), sep) # f -> f/ + s = lambda f: '%s%s' % (f.rstrip(sep), sep) # f -> f/ # The trailing '/' is for the case of '/foobar' in_base of '/foo': # - becase '/foobar' starts with '/foo' # - but '/foobar/' doesn't start with '/foo/' @@ -6470,46 +5887,46 @@ class Wiki(object): table_definitions = [ ('wiki_page', { - 'args': [ - Field('slug', - requires=[IS_SLUG(), - IS_NOT_IN_DB(db, 'wiki_page.slug')], - writable=False), - Field('title', length=255, unique=True), - Field('body', 'text', notnull=True), - Field('tags', 'list:string'), - Field('can_read', 'list:string', - writable=perms, - readable=perms, - default=[Wiki.everybody]), - Field('can_edit', 'list:string', - writable=perms, readable=perms, - default=[Wiki.everybody]), - Field('changelog'), - Field('html', 'text', - compute=self.get_renderer(), - readable=False, writable=False), - Field('render', default="markmin", - readable=show_engine, - writable=show_engine, - requires=IS_EMPTY_OR( - IS_IN_SET(engines))), - auth.signature], - 'vars': {'format': '%(title)s', 'migrate': migrate}}), + 'args': [ + Field('slug', + requires=[IS_SLUG(), + IS_NOT_IN_DB(db, 'wiki_page.slug')], + writable=False), + Field('title', length=255, unique=True), + Field('body', 'text', notnull=True), + Field('tags', 'list:string'), + Field('can_read', 'list:string', + writable=perms, + readable=perms, + default=[Wiki.everybody]), + Field('can_edit', 'list:string', + writable=perms, readable=perms, + default=[Wiki.everybody]), + Field('changelog'), + Field('html', 'text', + compute=self.get_renderer(), + readable=False, writable=False), + Field('render', default="markmin", + readable=show_engine, + writable=show_engine, + requires=IS_EMPTY_OR( + IS_IN_SET(engines))), + auth.signature], + 'vars': {'format': '%(title)s', 'migrate': migrate}}), ('wiki_tag', { - 'args': [ - Field('name'), - Field('wiki_page', 'reference wiki_page'), - auth.signature], - 'vars':{'format': '%(title)s', 'migrate': migrate}}), + 'args': [ + Field('name'), + Field('wiki_page', 'reference wiki_page'), + auth.signature], + 'vars':{'format': '%(title)s', 'migrate': migrate}}), ('wiki_media', { - 'args': [ - Field('wiki_page', 'reference wiki_page'), - Field('title', required=True), - Field('filename', 'upload', required=True), - auth.signature], - 'vars': {'format': '%(title)s', 'migrate': migrate}}), - ] + 'args': [ + Field('wiki_page', 'reference wiki_page'), + Field('title', required=True), + Field('filename', 'upload', required=True), + auth.signature], + 'vars': {'format': '%(title)s', 'migrate': migrate}}), + ] # define only non-existent tables for key, value in table_definitions: @@ -6567,8 +5984,8 @@ class Wiki(object): elif self.auth.user: groups = self.settings.groups if ('wiki_editor' in groups or - set(groups).intersection(set(page.can_read + page.can_edit)) or - page.created_by == self.auth.user.id): + set(groups).intersection(set(page.can_read + page.can_edit)) or + page.created_by == self.auth.user.id): return True return False @@ -6814,7 +6231,7 @@ class Wiki(object): options.insert(0, OPTION('', _value='')) fields = [Field("slug", default=current.request.args(1) or self.settings.force_prefix, - requires=(IS_SLUG(), IS_NOT_IN_DB(db, db.wiki_page.slug))),] + requires=(IS_SLUG(), IS_NOT_IN_DB(db, db.wiki_page.slug))), ] if self.settings.templates: fields.append( Field("from_template", "reference wiki_page", @@ -6980,14 +6397,14 @@ class Wiki(object): def link(t): return A(t, _href=URL(args='_search', vars=dict(q=t))) items = [DIV(H3(A(p.wiki_page.title, _href=URL( - args=p.wiki_page.slug))), - MARKMIN(self.first_paragraph(p.wiki_page)) - if preview else '', - DIV(_class='w2p_wiki_tags', - *[link(t.strip()) for t in - p.wiki_page.tags or [] if t.strip()]), - _class='w2p_wiki_search_item') - for p in pages] + args=p.wiki_page.slug))), + MARKMIN(self.first_paragraph(p.wiki_page)) + if preview else '', + DIV(_class='w2p_wiki_tags', + *[link(t.strip()) for t in + p.wiki_page.tags or [] if t.strip()]), + _class='w2p_wiki_search_item') + for p in pages] content.append(DIV(_class='w2p_wiki_pages', *items)) else: cloud = False @@ -7032,6 +6449,7 @@ class Wiki(object): class Config(object): + def __init__( self, filename, diff --git a/gluon/validators.py b/gluon/validators.py index 4181c226..779acab4 100644 --- a/gluon/validators.py +++ b/gluon/validators.py @@ -2478,9 +2478,14 @@ class IS_LOWER(Validator): ('\\xc3\\xb1', None) """ - def __call__(self, value): - return (to_bytes(to_unicode(value).lower()), None) + cast_back = lambda x: x + if isinstance(value, str): + cast_back = to_native + elif isinstance(value, bytes): + cast_back = to_bytes + value = to_unicode(value).lower() + return (cast_back(value), None) class IS_UPPER(Validator): @@ -2495,7 +2500,13 @@ class IS_UPPER(Validator): """ def __call__(self, value): - return (to_bytes(to_unicode(value).upper()), None) + cast_back = lambda x: x + if isinstance(value, str): + cast_back = to_native + elif isinstance(value, bytes): + cast_back = to_bytes + value = to_unicode(value).upper() + return (cast_back(value), None) def urlify(s, maxlen=80, keep_underscores=False):