5 from django.contrib.auth.decorators import login_required
6 from django.http import HttpResponseForbidden, HttpResponseNotAllowed, HttpResponseRedirect, JsonResponse
7 from django.urls import reverse
8 from django.utils.text import slugify
9 from django.utils.timezone import now
10 from django.views.decorators.csrf import csrf_exempt
11 from jwcrypto.jwk import JWK
12 from jwcrypto.jwt import JWT
14 from .models import OAuthAccessToken, OAuthClient, OAuthCode, OAuthSub
18 def authorize(request, *args, **kwargs):
19 state = request.GET['state']
20 client = OAuthClient.objects.get(client_id=request.GET['client_id'])
21 code = OAuthCode.objects.create(client=client, user=request.user, nonce=request.GET.get('nonce'))
22 redirect_uri = request.GET['redirect_uri']
27 if request.GET.get('nonce'):
28 params['nonce'] = request.GET.get('nonce')
29 return HttpResponseRedirect(redirect_uri + '?%s' % '&'.join(['%s=%s' % x for x in params.items()]))
33 def token(request, *args, **kwargs):
34 if request.method != 'POST':
35 return HttpResponseNotAllowed(['POST'])
37 client = OAuthClient.objects.get(client_id=request.POST.get('client_id'))
40 code = OAuthCode.objects.get(client=client, code=request.POST.get('code'))
41 except OAuthCode.DoesNotExist:
42 return HttpResponseForbidden()
45 exp = start + datetime.timedelta(seconds=30)
47 sub, created = OAuthSub.objects.get_or_create(client=client, user=code.user)
50 'iss': request.build_absolute_uri('/'),
51 'aud': client.client_id,
53 'exp': int(exp.timestamp()),
54 'iat': int(start.timestamp()),
55 'auth_time': int(start.timestamp()),
59 id_token['nonce'] = code.nonce
60 header = {'alg': 'HS256'}
61 k = base64.urlsafe_b64encode(client.client_secret.encode('utf-8')).strip(b'=')
62 jwk = JWK(kty='oct', k=k.decode())
63 jwt = JWT(header=header, claims=id_token)
64 jwt.make_signed_token(jwk)
66 access_token = OAuthAccessToken.objects.create(client=client, user=code.user)
69 'access_token': access_token.token,
70 'token_type': 'Bearer',
72 'id_token': jwt.serialize(),
73 'refresh_token': None,
76 return JsonResponse(response)
79 def user_info(request, *args, **kwargs):
80 authorization = request.headers['authorization'].split()
81 if len(authorization) != 2 or authorization[0] != 'Bearer':
82 return HttpResponseForbidden('invalid authorization header')
84 access_token = OAuthAccessToken.objects.get(token=authorization[1])
85 except OAuthAccessToken.DoesNotExist:
86 return HttpResponseForbidden('missing access token')
88 sub = OAuthSub.objects.get(client=access_token.client, user=access_token.user)
91 # use a random string as id, as it is required for pharum not to pick
92 # an existing and wrong user.
93 'id': str(random.randint(10**6, 10**7)),
94 'email': access_token.user.email,
95 'name': str(access_token.user),
96 'username': slugify(str(access_token.user)),
99 return JsonResponse(user_info)
103 return JsonResponse({'keys': []})
106 def well_known_openid_configuration(request):
108 'issuer': request.build_absolute_uri('/'),
109 'authorization_endpoint': request.build_absolute_uri(reverse('oauth-authorize')),
110 'token_endpoint': request.build_absolute_uri(reverse('oauth-token')),
111 'jwks_uri': request.build_absolute_uri(reverse('oauth-jwks')),
112 'userinfo_endpoint': request.build_absolute_uri(reverse('oauth-user-info')),
113 'token_endpoint_auth_methods_supported': ['client_secret_post'],
115 return JsonResponse(data)