yzk_wechat_event/utils/jwt_authenticate.py

102 lines
3.4 KiB
Python
Raw Normal View History

2023-12-13 11:41:22 +08:00
import jwt
from django.utils.translation import gettext_lazy as _
from rest_framework import HTTP_HEADER_ENCODING
from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
from rest_framework_jwt.settings import api_settings
from apps.user.models import User
from utils.redis_utils import RedisUtils
jwt_decode_handler = api_settings.JWT_DECODE_HANDLER
jwt_get_username_from_payload = api_settings.JWT_PAYLOAD_GET_USERNAME_HANDLER
def get_authorization_header(request, key='HTTP_AUTHORIZATION'):
"""
Return request's 'Authorization:' header, as a bytestring.
Hide some test client ickyness where the header can be unicode.
"""
auth = request.META.get(key, b'')
if isinstance(auth, str):
# Work around django test client oddness
auth = auth.encode(HTTP_HEADER_ENCODING)
return auth
class MyJSONWebTokenAuthentication(JSONWebTokenAuthentication):
def authenticate(self, request):
"""
Returns a two-tuple of `User` and token if a valid signature has been
supplied using JWT-based authentication. Otherwise returns `None`.
"""
jwt_value = self.get_jwt_value(request)
if jwt_value is None:
return None
redis_utils = RedisUtils("token")
payload = None
try:
payload = redis_utils.read_value_to_object(jwt_value) or jwt_decode_handler(jwt_value)
if not payload:
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.ExpiredSignature:
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
redis_utils.write_value_as_string(jwt_value, payload, 60 * 60 * 30)
user = self.authenticate_credentials(payload)
return user, jwt_value
def authenticate_credentials(self, payload):
"""
Returns an active user that matches the payload's user id and email.
"""
username = jwt_get_username_from_payload(payload)
if not username:
msg = _('Invalid payload.')
raise exceptions.AuthenticationFailed(msg)
try:
user = User.objects.get(username=username)
except User.DoesNotExist:
msg = _('Invalid signature.')
raise exceptions.AuthenticationFailed(msg)
return user
def get_jwt_value(self, request):
auth = get_authorization_header(request)
if not auth:
if api_settings.JWT_AUTH_COOKIE:
return request.COOKIES.get(api_settings.JWT_AUTH_COOKIE)
return None
return auth
class UserAppSecretAuthentication(BaseAuthentication):
def authenticate(self, request):
app_secret = get_authorization_header(request, key='APP_SECRET')
if not app_secret:
raise exceptions.AuthenticationFailed()
user = User.objects.filter(app_secret=app_secret).first()
if user is None or not user.is_active:
raise exceptions.AuthenticationFailed()
return user, app_secret
def authenticate_header(self, request):
pass