diff --git a/gluon/tests/test_tools.py b/gluon/tests/test_tools.py index 933c4e80..8ead4e1a 100644 --- a/gluon/tests/test_tools.py +++ b/gluon/tests/test_tools.py @@ -494,9 +494,7 @@ class TestAuth(unittest.TestCase): self.assertTrue('auth_permission' in self.db) self.assertTrue('auth_event' in self.db) - def test_enable_record_versioning(self): - self.assertTrue('t0_archive' in self.db) - + # Just calling many form functions def test_basic_blank_forms(self): for f in ['login', 'retrieve_password', 'retrieve_username', 'register']: html_form = getattr(self.auth, f)().xml() 
@@ -517,6 +515,63 @@ class TestAuth(unittest.TestCase): pass return + def test_get_vars_next(self): + self.current.request.vars._next = 'next_test' + self.assertEqual(self.auth.get_vars_next(), 'next_test') + + # TODO: def test_navbar(self): + # TODO: def test___get_migrate(self): + + def test_enable_record_versioning(self): + self.assertTrue('t0_archive' in self.db) + + # TODO: def test_define_signature(self): + # TODO: def test_define_signature(self): + # TODO: def test_define_table(self): + + def test_log_event(self): + self.auth.login_user(self.db(self.db.auth_user.username == 'bart').select().first()) # bypass login_bare() + bart_id = self.db(self.db.auth_user.username == 'bart').select(self.db.auth_user.id).first().id + # user logged in + self.auth.log_event(description='some_log_event_description_%(var1)s', + vars={"var1": "var1"}, + origin='log_event_test_1') + rtn = self.db(self.db.auth_event.origin == 'log_event_test_1' + ).select(*[self.db.auth_event[f] + for f in self.db.auth_event.fields if f not in ('id', 'time_stamp')]).first().as_dict() + self.assertEqual(set(rtn.items()), set({'origin': 'log_event_test_1', + 'client_ip': None, + 'user_id': bart_id, + 'description': 'some_log_event_description_var1'}.items())) + # user not logged + self.auth.logout_bare() + self.auth.log_event(description='some_log_event_description_%(var2)s', + vars={"var2": "var2"}, + origin='log_event_test_2') + rtn = self.db(self.db.auth_event.origin == 'log_event_test_2' + ).select(*[self.db.auth_event[f] + for f in self.db.auth_event.fields if f not in ('id', 'time_stamp')]).first().as_dict() + self.assertEqual(set(rtn.items()), set({'origin': 'log_event_test_2', + 'client_ip': None, + 'user_id': None, + 'description': 'some_log_event_description_var2'}.items())) + # no logging tests + self.auth.settings.logging_enabled = False + count_log_event_test_before = self.db(self.db.auth_event.id > 0).count() + self.auth.log_event(description='some_log_event_description_%(var3)s', + 
vars={"var3": "var3"}, + origin='log_event_test_3') + count_log_event_test_after = self.db(self.db.auth_event.id > 0).count() + self.assertEqual(count_log_event_test_after, count_log_event_test_before) + self.auth.settings.logging_enabled = True + count_log_event_test_before = self.db(self.db.auth_event.id > 0).count() + self.auth.log_event(description=None, + vars={"var4": "var4"}, + origin='log_event_test_4') + count_log_event_test_after = self.db(self.db.auth_event.id > 0).count() + self.assertEqual(count_log_event_test_after, count_log_event_test_before) + # TODO: Corner case translated description... + def test_get_or_create_user(self): self.db.auth_user.insert(email='user1@test.com', username='user1', password='password_123') self.db.commit() @@ -539,6 +594,10 @@ class TestAuth(unittest.TestCase): self.db.auth_user.truncate() self.db.commit() + # TODO: def test_basic(self): + # TODO: def test_login_user(self): + # TODO: def test__get_login_settings(self): + # login_bare() seems broken see my post on web2py-developpers # commented for now # def test_login_bare(self): @@ -550,12 +609,6 @@ class TestAuth(unittest.TestCase): # self.auth.logout_bare() # self.db.auth_user.truncate() - def test_logout_bare(self): - self.auth.login_user(self.db(self.db.auth_user.username == 'bart').select().first()) # bypass login_bare() - self.assertTrue(self.auth.is_logged_in()) - self.auth.logout_bare() - self.assertFalse(self.auth.is_logged_in()) - def test_register_bare(self): # corner case empty register call register_bare without args self.assertRaises(ValueError, self.auth.register_bare) 
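Review bot: In the added test_log_event, the last case (description=None with `logging_enabled` set back to True) has no comment saying what it pins; the preceding `# no logging tests` comment only describes the disabled-logging case. I would add `# a None description must not insert an auth_event row even when logging is enabled` above the second `count_log_event_test_before`. The test also flips `self.auth.settings.logging_enabled` inline, and setUp is not visible in this hunk. If the fixture is shared between tests, `self.addCleanup(setattr, self.auth.settings, 'logging_enabled', True)` would keep a failed assertion from leaving audit logging switched off for later tests.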
@@ -574,12 +627,50 @@ class TestAuth(unittest.TestCase): self.db.auth_user.truncate() self.db.commit() + # TODO: def test_cas_login(self): + # TODO: def test_cas_validate(self): + # TODO: def test__reset_two_factor_auth(self): + # TODO: def test_when_is_logged_in_bypass_next_in_url(self): + # TODO: def test_login(self): + # TODO: def test_logout(self): + + def test_logout_bare(self): + self.auth.login_user(self.db(self.db.auth_user.username == 'bart').select().first()) # bypass login_bare() + self.assertTrue(self.auth.is_logged_in()) + self.auth.logout_bare() + self.assertFalse(self.auth.is_logged_in()) + + # TODO: def test_register(self): + + def test_is_logged_in(self): + self.auth.user = 'logged_in' + self.assertTrue(self.auth.is_logged_in()) + self.auth.user = None + self.assertFalse(self.auth.is_logged_in()) + + # TODO: def test_verify_email(self): + # TODO: def test_retrieve_username(self): + + def test_random_password(self): + # let just check that the function is callable + self.assertTrue(self.auth.random_password()) + + # TODO: def test_reset_password_deprecated(self): + # TODO: def test_confirm_registration(self): + # TODO: def test_email_registration(self): + def test_bulk_register(self): self.auth.login_user(self.db(self.db.auth_user.username == 'bart').select().first()) # bypass login_bare() self.auth.settings.bulk_register_enabled = True bulk_register_form = self.auth.bulk_register(max_emails=10).xml() self.assertTrue('name="_formkey"' in bulk_register_form) + # TODO: def test_manage_tokens(self): + # TODO: def test_reset_password(self): + # TODO: def test_request_reset_password(self): + # TODO: def test_email_reset_password(self): + # TODO: def test_retrieve_password(self): + def test_change_password(self): self.auth.login_user(self.db(self.db.auth_user.username == 'bart').select().first()) # bypass login_bare() change_password_form = getattr(self.auth, 'change_password')().xml() 
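Review bot: test_random_password only asserts that the return value is truthy, so an implementation that returned the same constant string on every call would still pass. Since the value is a password, a cheap extra check is `self.assertNotEqual(self.auth.random_password(), self.auth.random_password())`. A length or alphabet assertion would be stronger, but it depends on the contract of random_password, which this hunk does not show. The in-test comment could then read `# two calls must not return the same password`.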
@@ -590,9 +681,9 @@ class TestAuth(unittest.TestCase): profile_form = getattr(self.auth, 'profile')().xml() self.assertTrue('name="_formkey"' in profile_form) - def test_get_vars_next(self): - self.current.request.vars._next = 'next_test' - self.assertEqual(self.auth.get_vars_next(), 'next_test') + # TODO: def test_run_login_onaccept(self): + # TODO: def test_jwt(self): + # TODO: def test_is_impersonating(self): def test_impersonate(self): # Create a user to be impersonated @@ -655,7 +746,9 @@ class TestAuth(unittest.TestCase): self.assertTrue(self.auth.is_impersonating()) self.assertEqual(self.auth.impersonate(user_id=0), None) - def test_group(self): + # TODO: def test_update_groups(self): + + def test_groups(self): self.auth.login_user(self.db(self.db.auth_user.username == 'bart').select().first()) # bypass login_bare() self.assertEqual(self.auth.groups().xml(), '

user_1(1)

') @@ -663,6 +756,22 @@ class TestAuth(unittest.TestCase): def test_not_authorized(self): self.current.request.ajax = 'facke_ajax_request' self.assertRaisesRegexp(HTTP, "403*", self.auth.not_authorized) + self.current.request.ajax = None + self.assertEqual(self.auth.not_authorized(), self.auth.messages.access_denied) + + def test_allows_jwt(self): + self.assertRaisesRegexp(HTTP, "400*", self.auth.allows_jwt) + + # TODO: def test_requires(self): + # TODO: def test_requires_login(self): + # TODO: def test_requires_login_or_token(self): + # TODO: def test_requires_membership(self): + # TODO: def test_requires_permission(self): + # TODO: def test_requires_signature(self): + + def test_add_group(self): + self.assertEqual(self.auth.add_group(role='a_group', description='a_group_role_description'), + self.db(self.db.auth_group.role == 'a_group').select(self.db.auth_group.id).first().id) def test_del_group(self): bart_group_id = 1 # Should be group 1, 'user_1' 
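Review bot: The pattern `"400*"` in test_allows_jwt, and `"403*"` in the unchanged first assertion of test_not_authorized, is a regular expression rather than a glob. It means `40` followed by any number of `0`s (or `3`s), and assertRaisesRegexp searches instead of matching the whole string, so any HTTP error whose text contains `40` satisfies either test. Anchoring the status lets the tests tell 400 from 403 or 404: `self.assertRaisesRegexp(HTTP, "^400", self.auth.allows_jwt)` and `"^403"` for not_authorized. This assumes str(HTTP) begins with the status code, which is not visible here.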
@@ -677,13 +786,151 @@ class TestAuth(unittest.TestCase): self.assertEqual(self.auth.user_group(user_id=1), 1) # Bart should be user 1 and it unique group should be 1, 'user_1' + def test_user_group_role(self): + self.auth.login_user(self.db(self.db.auth_user.username == 'bart').select().first()) # bypass login_bare() + user_group_role = 'user_%s' % self.db(self.db.auth_user.username == 'bart' + ).select(self.db.auth_user.id).first().id + self.assertEqual(self.auth.user_group_role(), user_group_role) + self.auth.logout_bare() + # with user_id args + self.assertEqual(self.auth.user_group_role(user_id=1), 'user_1') + # test None + self.auth.settings.create_user_groups = None + self.assertEqual(self.auth.user_group_role(user_id=1), None) + def test_has_membership(self): self.auth.login_user(self.db(self.db.auth_user.username == 'bart').select().first()) # bypass login_bare() + self.assertTrue(self.auth.has_membership('user_1')) + self.assertFalse(self.auth.has_membership('user_555')) self.assertTrue(self.auth.has_membership(group_id=1)) - self.assertTrue(self.auth.has_membership(role='user_1')) + self.auth.logout_bare() + self.assertTrue(self.auth.has_membership(role='user_1', user_id=1)) + self.assertTrue(self.auth.has_membership(group_id=1, user_id=1)) + # check that event is logged + count_log_event_test_before = self.db(self.db.auth_event.id > 0).count() + self.assertTrue(self.auth.has_membership(group_id=1, user_id=1)) + count_log_event_test_after = self.db(self.db.auth_event.id > 0).count() + self.assertEqual(count_log_event_test_after, count_log_event_test_before) - def test_allows_jwt(self): - self.assertRaisesRegexp(HTTP, "400*", self.auth.allows_jwt) + # Waiting guidance : https://github.com/web2py/web2py/issues/1300 + # def test_add_membership(self): + # self.auth.login_user(self.db(self.db.auth_user.username == 'bart').select().first()) # bypass login_bare() + # # failing case + # rtn = self.auth.add_membership('not_existing_role_name') + # # 
self.assertEqual(rtn, 'test') + # self.assertEqual(self.db(self.db.auth_group.role == 'not_existing_role_name').select().first(), 'test') + + def test_del_membership(self): + self.auth.login_user(self.db(self.db.auth_user.username == 'bart').select().first()) # bypass login_bare() + count_log_event_test_before = self.db(self.db.auth_event.id > 0).count() + user_1_role_id = self.db(self.db.auth_membership.group_id == self.auth.id_group('user_1') + ).select(self.db.auth_membership.id).first().id + self.assertEqual(self.auth.del_membership('user_1'), user_1_role_id) + count_log_event_test_after = self.db(self.db.auth_event.id > 0).count() + # check that event is logged + self.assertEqual(count_log_event_test_after, count_log_event_test_before) + # not logged in test case + group_id = self.auth.add_group('some_test_group') + membership_id = self.auth.add_membership('some_test_group') + self.assertEqual(self.auth.user_groups[group_id], 'some_test_group') + self.auth.logout_bare() + # not deleted + self.assertFalse(self.auth.del_membership('some_test_group')) + self.assertEqual(set(self.db.auth_membership(membership_id).as_dict().items()), + set({'group_id': 2L, 'user_id': 1L, 'id': 2L}.items())) # is not deleted + # deleted + bart_id = self.db(self.db.auth_user.username == 'bart').select(self.db.auth_user.id).first().id + self.assertTrue(self.auth.del_membership('some_test_group', user_id=bart_id)) + self.assertEqual(self.db.auth_membership(membership_id), None) # is really deleted + + def test_has_permission(self): + self.auth.login_user(self.db(self.db.auth_user.username == 'bart').select().first()) # bypass login_bare() + bart_id = self.db(self.db.auth_user.username == 'bart').select(self.db.auth_user.id).first().id + self.auth.add_permission(group_id=self.auth.id_group('user_1'), + name='some_permission', + table_name='auth_user', + record_id=0, + ) + # True case + self.assertTrue(self.auth.has_permission(name='some_permission', + table_name='auth_user', + 
record_id=0, + user_id=bart_id, + group_id=self.auth.id_group('user_1'))) + # False case + self.assertFalse(self.auth.has_permission(name='some_other_permission', + table_name='auth_user', + record_id=0, + user_id=bart_id, + group_id=self.auth.id_group('user_1'))) + + def test_add_permission(self): + count_log_event_test_before = self.db(self.db.auth_event.id > 0).count() + permission_id = \ + self.auth.add_permission(group_id=self.auth.id_group('user_1'), + name='some_permission', + table_name='auth_user', + record_id=0, + ) + count_log_event_test_after = self.db(self.db.auth_event.id > 0).count() + # check that event is logged + self.assertEqual(count_log_event_test_after, count_log_event_test_before) + # True case + permission_count = \ + self.db(self.db.auth_permission.id == permission_id).count() + self.assertTrue(permission_count) + # False case + permission_count = \ + self.db((self.db.auth_permission.group_id == self.auth.id_group('user_1')) & + (self.db.auth_permission.name == 'no_permission') & + (self.db.auth_permission.table_name == 'no_table') & + (self.db.auth_permission.record_id == 0)).count() + self.assertFalse(permission_count) + # corner case + self.auth.login_user(self.db(self.db.auth_user.username == 'bart').select().first()) # bypass login_bare() + permission_id = \ + self.auth.add_permission(group_id=0, + name='user_1_permission', + table_name='auth_user', + record_id=0, + ) + permission_name = \ + self.db(self.db.auth_permission.id == permission_id).select(self.db.auth_permission.name).first().name + self.assertEqual(permission_name, 'user_1_permission') + # add an existing permission + permission_id =\ + self.auth.add_permission(group_id=0, + name='user_1_permission', + table_name='auth_user', + record_id=0, + ) + self.assertTrue(permission_id) + + def test_del_permission(self): + permission_id = \ + self.auth.add_permission(group_id=self.auth.id_group('user_1'), + name='del_permission_test', + table_name='auth_user', + record_id=0, + ) + 
count_log_event_test_before = self.db(self.db.auth_event.id > 0).count() + self.assertTrue(self.auth.del_permission(group_id=self.auth.id_group('user_1'), + name='del_permission_test', + table_name='auth_user', + record_id=0,)) + count_log_event_test_after = self.db(self.db.auth_event.id > 0).count() + # check that event is logged + self.assertEqual(count_log_event_test_after, count_log_event_test_before) + # really deleted + permission_count = \ + self.db(self.db.auth_permission.id == permission_id).count() + self.assertFalse(permission_count) + + # TODO: def test_accessible_query(self): + # TODO: def test_archive(self): + # TODO: def test_wiki(self): + # TODO: def test_wikimenu(self): + # End Auth test # TODO: class TestCrud(unittest.TestCase): diff --git a/gluon/tools.py b/gluon/tools.py index 5fc274dd..350ee96d 100644 --- a/gluon/tools.py +++ b/gluon/tools.py @@ -2527,9 +2527,7 @@ class Auth(object): # log messages should not be translated if type(description).__name__ == 'lazyT': description = description.m - self.table_event().insert( - description=str(description % vars), - origin=origin, user_id=user_id) + self.table_event().insert(description=str(description % vars), origin=origin, user_id=user_id) def get_or_create_user(self, keys, update_fields=['email'], login=True, get=True): @@ -2578,8 +2576,7 @@ class Auth(object): user_id = table_user.insert(**vars) user = table_user[user_id] if self.settings.create_user_groups: - group_id = self.add_group( - self.settings.create_user_groups % user) + group_id = self.add_group(self.settings.create_user_groups % user) self.add_membership(group_id, user_id) if self.settings.everybody_group_id: self.add_membership(self.settings.everybody_group_id, user_id) 
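Review bot: In test_has_membership, test_del_membership, test_add_permission and test_del_permission the comment says `# check that event is logged`, but the assertion that follows is `self.assertEqual(count_log_event_test_after, count_log_event_test_before)`, which passes only when no auth_event row was written. If the intent is that these calls leave no audit record in the default configuration (presumably because the corresponding log message is unset, which this hunk does not show), the comment should say so, e.g. `# no auth_event row is expected with the default (unset) log message`. If membership and permission changes are meant to be audited, the assertion should be `self.assertEqual(count_log_event_test_after, count_log_event_test_before + 1)` instead. As written, the tests pin the absence of an audit trail for authorization changes while documenting the opposite.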
@@ -3345,7 +3342,7 @@ class Auth(object): key = web2py_uuid() if self.settings.registration_requires_approval: - key = 'pending-'+key + key = 'pending-' + key table_user.registration_key.default = key if form.accepts(request, session if self.csrf_prevention else None, @@ -3354,12 +3351,10 @@ class Auth(object): hideerror=self.settings.hideerror): description = self.messages.group_description % form.vars if self.settings.create_user_groups: - group_id = self.add_group( - self.settings.create_user_groups % form.vars, description) + group_id = self.add_group(self.settings.create_user_groups % form.vars, description) self.add_membership(group_id, form.vars.id) if self.settings.everybody_group_id: - self.add_membership( - self.settings.everybody_group_id, form.vars.id) + self.add_membership(self.settings.everybody_group_id, form.vars.id) if self.settings.registration_requires_verification: link = self.url( self.settings.function, args=('verify_email', key), scheme=True) @@ -4314,11 +4309,8 @@ class Auth(object): """ Creates a group associated to a role """ - - group_id = self.table_group().insert( - role=role, description=description) - self.log_event(self.messages['add_group_log'], - dict(group_id=group_id, role=role)) + group_id = self.table_group().insert(role=role, description=description) + self.log_event(self.messages['add_group_log'], dict(group_id=group_id, role=role)) return group_id def del_group(self, group_id): @@ -4328,7 +4320,8 @@ class Auth(object): self.db(self.table_group().id == group_id).delete() self.db(self.table_membership().group_id == group_id).delete() self.db(self.table_permission().group_id == group_id).delete() - if group_id in self.user_groups: del self.user_groups[group_id] + if group_id in self.user_groups: + del self.user_groups[group_id] self.log_event(self.messages.del_group_log, dict(group_id=group_id)) def id_group(self, role): 
@@ -4360,7 +4353,6 @@ class Auth(object): """ Checks if user is member of group_id or role """ - group_id = group_id or self.id_group(role) try: group_id = int(group_id) @@ -4369,8 +4361,8 @@ class Auth(object): if not user_id and self.user: user_id = self.user.id membership = self.table_membership() - if group_id and user_id and self.db((membership.user_id == user_id) - & (membership.group_id == group_id)).select(): + if group_id and user_id and self.db((membership.user_id == user_id) & + (membership.group_id == group_id)).select(): r = True else: r = False @@ -4417,6 +4409,10 @@ class Auth(object): """ group_id = group_id or self.id_group(role) + try: + group_id = int(group_id) + except: + group_id = self.id_group(group_id) # interpret group_id as a role if not user_id and self.user: user_id = self.user.id membership = self.table_membership() @@ -6426,10 +6422,9 @@ class Wiki(object): args += value['args'] db.define_table(key, *args, **value['vars']) - if self.settings.templates is None and not \ - self.settings.manage_permissions: - self.settings.templates = db.wiki_page.tags.contains('template') & \ - db.wiki_page.can_read.contains('everybody') + if self.settings.templates is None and not self.settings.manage_permissions: + self.settings.templates = \ + db.wiki_page.tags.contains('template') & db.wiki_page.can_read.contains('everybody') def update_tags_insert(page, id, db=db): for tag in page.tags or []: @@ -6452,8 +6447,10 @@ class Wiki(object): 'wiki_editor' not in auth.user_groups.values() and self.settings.groups == auth.user_groups.values()): group = db.auth_group(role='wiki_editor') - gid = group.id if group else db.auth_group.insert( - role='wiki_editor') + if group: + gid = group.id + else: + db.auth_group.insert(role='wiki_editor') auth.add_membership(gid) settings.lock_keys = True
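Review bot: The `except:` added around `group_id = int(group_id)` in the `@@ -4417,6 +4409,10 @@` hunk is bare, so it also catches KeyboardInterrupt, SystemExit and any unrelated error, and then silently retries the value as a role name. `except (TypeError, ValueError):` covers the two failures int() can raise for this argument. Because `int()` is tried first, a role whose name is all digits appears to be read as a group id rather than as a role. A comment such as `# numeric strings are treated as group ids, never as role names` would make that explicit for callers that change memberships. The enclosing method starts and ends outside the hunk.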
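Review bot: In the rewritten `wiki_editor` block of the `@@ -6452,8 +6447,10 @@` hunk, the `else:` branch calls `db.auth_group.insert(role='wiki_editor')` without assigning the result. `gid` is therefore unbound when the group does not exist yet, and the following `auth.add_membership(gid)` raises a NameError/UnboundLocalError. The conditional expression this replaces bound the insert's return value to `gid`; the new branch should do the same: `gid = db.auth_group.insert(role='wiki_editor')`. The enclosing method starts outside the hunk.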