if request.method != 'POST':
return HttpResponseNotAllowed(['POST'])
- client = OAuthClient.objects.get(client_id=request.POST.get('client_id'))
-
try:
- code = OAuthCode.objects.get(client=client, code=request.POST.get('code'))
+ code = OAuthCode.objects.get(code=request.POST.get('code'))
except OAuthCode.DoesNotExist:
return HttpResponseForbidden()
start = now()
exp = start + datetime.timedelta(seconds=30)
- sub, created = OAuthSub.objects.get_or_create(client=client, user=code.user)
+ sub, created = OAuthSub.objects.get_or_create(client=code.client, user=code.user)
id_token = {
'iss': request.build_absolute_uri('/'),
- 'aud': client.client_id,
+ 'aud': code.client.client_id,
'sub': str(sub.sub),
'exp': int(exp.timestamp()),
'iat': int(start.timestamp()),
'auth_time': int(start.timestamp()),
'acr': '0',
}
+ if code.nonce:
+ id_token['nonce'] = code.nonce
header = {'alg': 'HS256'}
- k = base64.urlsafe_b64encode(client.client_secret.encode('utf-8')).strip(b'=')
+ k = base64.urlsafe_b64encode(code.client.client_secret.encode('utf-8')).strip(b'=')
jwk = JWK(kty='oct', k=k.decode())
jwt = JWT(header=header, claims=id_token)
jwt.make_signed_token(jwk)
- access_token = OAuthAccessToken.objects.create(client=client, user=code.user)
+ access_token = OAuthAccessToken.objects.create(client=code.client, user=code.user)
response = {
'access_token': access_token.token,
'token_type': 'Bearer',
'expires_in': 30,
'id_token': jwt.serialize(),
+ 'refresh_token': None,
}
- if code.nonce:
- response['nonce'] = code.nonce
return JsonResponse(response)
def user_info(request, *args, **kwargs):
- authorization = request.META['HTTP_AUTHORIZATION'].split()
+ authorization = request.headers['authorization'].split()
if len(authorization) != 2 or authorization[0] != 'Bearer':
return HttpResponseForbidden('invalid authorization header')
try:
access_token = OAuthAccessToken.objects.get(token=authorization[1])
except OAuthAccessToken.DoesNotExist:
return HttpResponseForbidden('missing access token')
+
+ sub = OAuthSub.objects.get(client=access_token.client, user=access_token.user)
+
user_info = {
# use a random string as id, as it is required for pharum not to pick
# an existing and wrong user.
- 'id': str(random.randint(10 ** 6, 10 ** 7)),
+ 'id': str(random.randint(10**6, 10**7)),
'email': access_token.user.email,
'name': str(access_token.user),
'username': slugify(str(access_token.user)),
+ 'sub': sub.sub,
}
return JsonResponse(user_info)
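Review bot: The token exchange at L7 now looks the code up by its value alone, and with the client lookup at L3 dropped nothing verifies that the party presenting the code is the client it was issued to, or that it holds that client's secret — `client_secret` is only used to sign the id_token at L28, so whoever obtains a code can redeem it and receive an access token and id_token bound to `code.client`. After the `except OAuthCode.DoesNotExist` block, bind the exchange to the presenting client with `if request.POST.get('client_id') != code.client.client_id: return HttpResponseForbidden()`, and compare the posted `client_secret` against `code.client.client_secret` (constant-time, e.g. `hmac.compare_digest`) before minting anything. In user_info, `request.headers['authorization']` at L46 raises KeyError when the header is absent, so `request.headers.get('Authorization', '')` is needed for the existing "invalid authorization header" branch to be reachable; the `OAuthSub.objects.get` at L54 likewise has no `DoesNotExist` guard, and the userinfo `'sub'` at L64 should be `str(sub.sub)` to match the `'sub'` claim the id_token emits at L18. The token view's signature lies above the hunk, and whether the code is single-use or expires is not visible here.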