102 lines
3.4 KiB
Python
102 lines
3.4 KiB
Python
import jwt
|
|
from django.utils.translation import gettext_lazy as _
|
|
from rest_framework import HTTP_HEADER_ENCODING
|
|
from rest_framework import exceptions
|
|
from rest_framework.authentication import BaseAuthentication
|
|
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
|
|
from rest_framework_jwt.settings import api_settings
|
|
|
|
from apps.user.models import User
|
|
from utils.redis_utils import RedisUtils
|
|
|
|
# Handlers looked up once, at import time, from the rest_framework_jwt
# settings object.
# Called with the raw token value to obtain its payload.
jwt_decode_handler = api_settings.JWT_DECODE_HANDLER
# Called with a payload to obtain the username it identifies.
jwt_get_username_from_payload = api_settings.JWT_PAYLOAD_GET_USERNAME_HANDLER
|
|
|
|
|
|
def get_authorization_header(request, key='HTTP_AUTHORIZATION'):
    """
    Fetch a header value from ``request.META`` as a bytestring.

    ``key`` is the ``META`` entry to read and defaults to the standard
    'Authorization:' header. A missing entry yields ``b''``.

    Text values (as produced by the Django test client, for example) are
    encoded with ``HTTP_HEADER_ENCODING``; any other value is returned
    untouched.
    """
    header_value = request.META.get(key, b'')
    if not isinstance(header_value, str):
        # Already bytes (or the b'' default): nothing to convert.
        return header_value
    return header_value.encode(HTTP_HEADER_ENCODING)
|
|
|
|
|
|
class MyJSONWebTokenAuthentication(JSONWebTokenAuthentication):
    """
    JWT authentication backed by a Redis cache of token payloads.

    The payload for a token is looked up in Redis under the raw token value
    first; the token is only handed to the JWT decode handler when nothing
    is cached. Every successful lookup writes the payload back to Redis.
    """

    # Third argument passed to ``RedisUtils.write_value_as_string`` --
    # presumably the entry's expiry in seconds (60 * 60 * 30 = 30 hours);
    # confirm against RedisUtils. Subclasses may override it.
    token_cache_ttl = 60 * 60 * 30

    def authenticate(self, request):
        """
        Returns a two-tuple of `User` and token if a valid signature has been
        supplied using JWT-based authentication. Otherwise returns `None`.

        Raises `AuthenticationFailed` when the token cannot be decoded, has
        expired, yields an empty payload, or does not map to an active user.
        """
        jwt_value = self.get_jwt_value(request)
        if jwt_value is None:
            return None

        redis_utils = RedisUtils("token")
        try:
            # A cached payload wins; the decode handler only runs on a miss.
            payload = (
                redis_utils.read_value_to_object(jwt_value)
                or jwt_decode_handler(jwt_value)
            )
        except jwt.ExpiredSignatureError as exc:
            # ExpiredSignatureError is the supported name; the old
            # `ExpiredSignature` alias no longer exists in PyJWT 2.x.
            msg = _('Signature has expired.')
            raise exceptions.AuthenticationFailed(msg) from exc
        except jwt.DecodeError as exc:
            msg = _('Error decoding signature.')
            raise exceptions.AuthenticationFailed(msg) from exc
        except jwt.InvalidTokenError as exc:
            raise exceptions.AuthenticationFailed() from exc

        if not payload:
            msg = _('Signature has expired.')
            raise exceptions.AuthenticationFailed(msg)

        # Re-store the payload on every request that reaches this point.
        redis_utils.write_value_as_string(
            jwt_value, payload, self.token_cache_ttl
        )
        user = self.authenticate_credentials(payload)

        return user, jwt_value

    def authenticate_credentials(self, payload):
        """
        Returns an active user that matches the payload's username.

        Raises `AuthenticationFailed` if the payload carries no username, no
        such user exists, or the user account is not active.
        """
        username = jwt_get_username_from_payload(payload)

        if not username:
            msg = _('Invalid payload.')
            raise exceptions.AuthenticationFailed(msg)

        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist as exc:
            msg = _('Invalid signature.')
            raise exceptions.AuthenticationFailed(msg) from exc

        # Deactivated accounts must not authenticate, matching the check
        # performed by UserAppSecretAuthentication.
        if not user.is_active:
            msg = _('User account is disabled.')
            raise exceptions.AuthenticationFailed(msg)

        return user

    def get_jwt_value(self, request):
        """
        Returns the token supplied with the request, or `None` if absent.

        The complete 'Authorization:' header value is used as the token (as
        a bytestring; no scheme prefix is stripped). When the header is
        empty and `JWT_AUTH_COOKIE` is configured, the value of that cookie
        is returned instead.
        """
        auth = get_authorization_header(request)
        if auth:
            return auth

        if api_settings.JWT_AUTH_COOKIE:
            return request.COOKIES.get(api_settings.JWT_AUTH_COOKIE)
        return None
|
|
|
|
|
|
class UserAppSecretAuthentication(BaseAuthentication):
    """
    Authenticates a request by matching a supplied app secret to a user.
    """

    def authenticate(self, request):
        """
        Returns a two-tuple of `User` and the supplied app secret.

        Raises `AuthenticationFailed` when no secret is supplied, when no
        user owns it, or when the owning user is not active. Note that a
        missing secret raises rather than returning `None`.
        """
        # 'APP_SECRET' is the key this class has always read. Django stores
        # client-sent headers in META under an 'HTTP_' prefix, so the
        # prefixed key is checked as well to pick up a real request header.
        app_secret = (
            get_authorization_header(request, key='APP_SECRET')
            or get_authorization_header(request, key='HTTP_APP_SECRET')
        )
        if not app_secret:
            raise exceptions.AuthenticationFailed()

        # get_authorization_header() hands back bytes, but the ORM lookup
        # needs text: a text field would coerce bytes with str() and never
        # match. Assumes User.app_secret is a text column -- TODO confirm.
        lookup_value = app_secret
        if isinstance(lookup_value, bytes):
            try:
                lookup_value = lookup_value.decode(HTTP_HEADER_ENCODING)
            except UnicodeError as exc:
                raise exceptions.AuthenticationFailed() from exc

        user = User.objects.filter(app_secret=lookup_value).first()
        if user is None or not user.is_active:
            raise exceptions.AuthenticationFailed()

        # The auth value returned to the caller is the header value exactly
        # as it was read, unchanged from previous behaviour.
        return user, app_secret

    def authenticate_header(self, request):
        """
        Returns `None`: no `WWW-Authenticate` challenge value is provided.
        """
        return None
|