import jwt
from django.utils.translation import gettext_lazy as _
from rest_framework import HTTP_HEADER_ENCODING
from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
from rest_framework_jwt.settings import api_settings

from apps.user.models import User
from utils.redis_utils import RedisUtils

jwt_decode_handler = api_settings.JWT_DECODE_HANDLER
jwt_get_username_from_payload = api_settings.JWT_PAYLOAD_GET_USERNAME_HANDLER


def get_authorization_header(request, key='HTTP_AUTHORIZATION'):
    """
    Return the value stored under ``key`` in ``request.META`` as a bytestring.

    ``key`` defaults to the ``Authorization`` header. A missing entry yields
    ``b''``. ``str`` values (e.g. from the Django test client) are encoded
    with ``HTTP_HEADER_ENCODING`` so callers always receive bytes.
    """
    auth = request.META.get(key, b'')
    if isinstance(auth, str):
        # Work around django test client oddness
        auth = auth.encode(HTTP_HEADER_ENCODING)
    return auth


class MyJSONWebTokenAuthentication(JSONWebTokenAuthentication):
    """JWT authentication that looks the decoded payload up in Redis first."""

    def authenticate(self, request):
        """
        Return a ``(user, token)`` two-tuple for a request carrying a token.

        Returns ``None`` when the request carries no token at all. Raises
        ``AuthenticationFailed`` when the token cannot be resolved to a
        payload or the payload does not identify an active user.
        """
        jwt_value = self.get_jwt_value(request)
        if jwt_value is None:
            return None

        redis_utils = RedisUtils("token")
        try:
            # NOTE(review): a truthy cached payload short-circuits
            # jwt_decode_handler, so signature/expiry are not re-verified
            # while the cache entry exists -- confirm this is intended.
            payload = (
                redis_utils.read_value_to_object(jwt_value)
                or jwt_decode_handler(jwt_value)
            )
        except jwt.ExpiredSignature:
            msg = _('Signature has expired.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.DecodeError:
            msg = _('Error decoding signature.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.InvalidTokenError:
            raise exceptions.AuthenticationFailed()

        if not payload:
            msg = _('Signature has expired.')
            raise exceptions.AuthenticationFailed(msg)

        # Stored again on every successful lookup; the third argument is
        # presumably an expiry in seconds (60 * 60 * 30 == 30 hours) --
        # confirm against RedisUtils.
        redis_utils.write_value_as_string(jwt_value, payload, 60 * 60 * 30)

        user = self.authenticate_credentials(payload)
        return user, jwt_value

    def authenticate_credentials(self, payload):
        """
        Return the active user whose username is carried by ``payload``.

        Raises ``AuthenticationFailed`` when the payload holds no username,
        when no such user exists, or when the user account is inactive.
        """
        username = jwt_get_username_from_payload(payload)
        if not username:
            msg = _('Invalid payload.')
            raise exceptions.AuthenticationFailed(msg)

        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            msg = _('Invalid signature.')
            raise exceptions.AuthenticationFailed(msg)

        # Deactivated accounts must not authenticate, matching the check in
        # UserAppSecretAuthentication and the "active user" contract above.
        if not user.is_active:
            msg = _('User account is disabled.')
            raise exceptions.AuthenticationFailed(msg)

        return user

    def get_jwt_value(self, request):
        """
        Return the raw token for ``request``, or ``None`` if there is none.

        The whole ``Authorization`` header value is used as the token (no
        scheme prefix is stripped). When the header is empty and
        ``JWT_AUTH_COOKIE`` is configured, the cookie of that name is used.
        """
        auth = get_authorization_header(request)
        if not auth:
            if api_settings.JWT_AUTH_COOKIE:
                return request.COOKIES.get(api_settings.JWT_AUTH_COOKIE)
            return None
        return auth


class UserAppSecretAuthentication(BaseAuthentication):
    """Authenticate a request by matching its app secret to a ``User``."""

    def authenticate(self, request):
        """
        Return ``(user, app_secret)`` for the active user owning the secret.

        Raises ``AuthenticationFailed`` when the secret is missing, matches
        no user, or belongs to an inactive user; this method never returns
        ``None``.
        """
        # NOTE(review): the key is read from request.META verbatim. Headers
        # sent by HTTP clients normally appear there with an ``HTTP_`` prefix
        # (``HTTP_APP_SECRET``) -- confirm how ``APP_SECRET`` gets populated.
        app_secret = get_authorization_header(request, key='APP_SECRET')
        if not app_secret:
            raise exceptions.AuthenticationFailed()

        # NOTE(review): app_secret is a bytestring at this point; verify the
        # ``User.app_secret`` field type compares correctly against bytes.
        user = User.objects.filter(app_secret=app_secret).first()
        if user is None or not user.is_active:
            raise exceptions.AuthenticationFailed()
        return user, app_secret

    def authenticate_header(self, request):
        """Return ``None``: this scheme supplies no authenticate header."""
        return None