allow token renewal with http authorization header.

This commit is contained in:
Michele Comitini
2016-05-31 23:55:58 +02:00
parent 95c1a734d1
commit 67f85fd631
2 changed files with 56 additions and 17 deletions

View File

@@ -250,16 +250,30 @@ class TestAuthJWT(unittest.TestCase):
def test_jwt_token_manager(self):
import gluon.serializers
self.request.vars.update(self.user_data)
self.token = self.jwtauth.jwt_token_manager()
self.assertIsNotNone(self.token)
del self.request.vars['username']
del self.request.vars['password']
self.request.vars._token = gluon.serializers.json_parser.loads(self.token)['token']
self.token = self.jwtauth.jwt_token_manager()
self.assertIsNotNone(self.token)
def test_allows_jwt(self):
request = self.request
import gluon.serializers
self.request.vars.update(self.user_data)
self.token = self.jwtauth.jwt_token_manager()
self.assertIsNotNone(self.token)
del self.request.vars['username']
del self.request.vars['password']
self.token = self.jwtauth.jwt_token_manager()
self.request.vars._token = gluon.serializers.json_parser.loads(self.token)['token']
@self.jwtauth.allows_jwt()
def optional_auth():
assertEqual(self.user_data['username'], self.auth.user.username)
self.assertEqual(self.user_data['username'], self.auth.user.username)
optional_auth()
@unittest.skipIf(IS_IMAP, "TODO: Imap raises 'Connection refused'")
# class TestAuth(unittest.TestCase):

View File

@@ -1,4 +1,3 @@
#!/bin/python
# -*- coding: utf-8 -*-
@@ -1252,7 +1251,8 @@ class AuthJWT(object):
self.additional_payload = additional_payload
self.before_authorization = before_authorization
self.max_header_length = max_header_length
self.recvd_token = None
@staticmethod
def jwt_b64e(string):
if isinstance(string, unicode):
@@ -1368,17 +1368,18 @@ class AuthJWT(object):
payload.update(self.additional_payload)
return payload
def jwt_token_manager(self):
def jwt_token_manager(self, token_param='_token'):
"""
The part that issues (and refreshes) tokens.
Used in a controller, given myjwt is the istantiated class, as
@myjwt.allow_jwt(required=False, verify_expiration=False)
def api_auth():
return myjwt.jwt_token_manager()
Then, a call to /app/c/api_auth with username and password
returns a token, while /app/c/api_auth with the current token
issues another token
issues another token (expired, but within grace time)
"""
request = current.request
response = current.response
@@ -1387,10 +1388,14 @@ class AuthJWT(object):
session.forget(response)
valid_user = None
ret = None
if request.vars.token:
token = None
try:
token = self.recvd_token or self.get_jwt_token_from_request(token_param)
except HTTP:
pass
if token:
if not self.allow_refresh:
raise HTTP(403, u'Refreshing token is not allowed')
token = request.vars.token
tokend = self.load_token(token)
# verification can fail here
refreshed = self.refresh_token(tokend)
@@ -1401,8 +1406,9 @@ class AuthJWT(object):
valid_user = self.auth.login_bare(username, password)
else:
valid_user = self.auth.user
self.auth.login_user(valid_user)
if valid_user:
payload = self.serialize_auth_session(current.session.auth)
payload = self.serialize_auth_session(session.auth)
self.alter_payload(payload)
ret = {'token': self.generate_token(payload)}
elif ret is None:
@@ -1421,11 +1427,12 @@ class AuthJWT(object):
self.auth.user_groups = tokend['user_groups']
self.auth.hmac_key = tokend['hmac_key']
def get_jwt_token_from_request(self):
def get_jwt_token_from_request(self, token_param='_token'):
"""
The method that extracts and validates the token, either
from the header or the _token var
token_param: request.vars attribute with the token used only if the http authorization header is not present.
"""
token = None
token_in_header = current.request.env.http_authorization
@@ -1439,24 +1446,42 @@ class AuthJWT(object):
raise HTTP(400, 'Invalid JWT header, token contains spaces')
token = parts[1]
else:
token = current.request.vars._token
token = current.request.vars.get(token_param)
self.recvd_token = token
return token
def allows_jwt(self, otherwise=None):
def allows_jwt(self, otherwise=None, required=True, verify_expiration=True, token_param='_token'):
"""
The decorator that takes care of injecting auth info in the decorated action.
Works w/o resorting to session.
Args:
required: the token is mandatory (either in request.var._token or in the HTTP hearder Authorization Bearer)
verify_expiration: allows to bypass expiration check. Useful to manage token renewal.
token_param: request.vars attribute with the token used only if the http authorization header is not present (default: "_token").
"""
def decorator(action):
def f(*args, **kwargs):
token = self.get_jwt_token_from_request()
try:
token = self.get_jwt_token_from_request(token_param=token_param)
except HTTP, e:
if required:
raise e
token = None
if token and len(token) < self.max_header_length:
old_verify_expiration = self.verify_expiration
try:
self.verify_expiration = verify_expiration
tokend = self.load_token(token)
except ValueError:
raise HTTP(400, 'Invalid JWT header, wrong token format')
finally:
self.verify_expiration = old_verify_expiration
self.inject_token(tokend)
return action(*args, **kwargs)
f.__doc__ = action.__doc__
@@ -6995,4 +7020,4 @@ class Config(object):
if __name__ == '__main__':
import doctest
doctest.testmod()
doctest.testmod()