]> git.0d.be Git - panikdb.git/blob - panikdb/oauth/views.py
style: do not display hours for emission playlist zones
[panikdb.git] / panikdb / oauth / views.py
1 import base64
2 import datetime
3 import random
4
5 from django.contrib.auth.decorators import login_required
6 from django.http import HttpResponseForbidden, HttpResponseRedirect, HttpResponseNotAllowed, JsonResponse
7 from django.utils.text import slugify
8 from django.utils.timezone import now
9 from django.views.decorators.csrf import csrf_exempt
10
11 from jwcrypto.jwk import JWK
12 from jwcrypto.jwt import JWT
13
14 from .models import OAuthClient, OAuthCode, OAuthSub, OAuthAccessToken
15
16
@login_required
def authorize(request, *args, **kwargs):
    """Issue an authorization code and redirect back to the OAuth client.

    Reads ``state``, ``redirect_uri`` and ``client_id`` from the query
    string, creates an ``OAuthCode`` for the logged-in user and redirects to
    ``redirect_uri`` with ``code`` and ``state`` appended as query parameters.

    Returns a 403 response when ``client_id`` does not match a known client.
    A missing query parameter still raises ``KeyError``, as before.
    """
    from urllib.parse import urlencode

    state = request.GET['state']
    redirect_uri = request.GET['redirect_uri']
    try:
        client = OAuthClient.objects.get(client_id=request.GET['client_id'])
    except OAuthClient.DoesNotExist:
        # Same answer as the token endpoint gives for unknown credentials,
        # instead of an unhandled exception.
        return HttpResponseForbidden()

    # NOTE(review): redirect_uri comes straight from the request and is not
    # compared against anything registered for the client, so this endpoint
    # can redirect (with a valid code) to an arbitrary URL. Confirm whether
    # OAuthClient stores allowed redirect URIs and validate against them.
    code = OAuthCode.objects.create(client=client, user=request.user)

    # Percent-encode the values so a state containing '&', '#' or spaces is
    # returned to the client intact, and extend an existing query string
    # rather than adding a second '?'.
    separator = '&' if '?' in redirect_uri else '?'
    query = urlencode({'code': code.code, 'state': state})
    return HttpResponseRedirect(redirect_uri + separator + query)
24
25
@csrf_exempt
def token(request, *args, **kwargs):
    """Exchange an authorization code for an access token and an ID token.

    Only POST is accepted (405 otherwise). ``client_id`` and ``code`` are
    read from the POST body; a 403 response is returned when either the
    client or the code (for that client) is unknown.

    On success the authorization code is deleted, an ``OAuthAccessToken`` is
    created for the code's user, and a JSON document is returned containing
    the access token and an HS256-signed ID token whose key is derived from
    the client's secret.
    """
    if request.method != 'POST':
        return HttpResponseNotAllowed(['POST'])

    # NOTE(review): nothing in this view checks that the caller knows the
    # client secret; possession of client_id + code is enough. Confirm
    # whether client authentication happens elsewhere.
    try:
        client = OAuthClient.objects.get(client_id=request.POST.get('client_id'))
        code = OAuthCode.objects.get(client=client, code=request.POST.get('code'))
    except (OAuthClient.DoesNotExist, OAuthCode.DoesNotExist):
        return HttpResponseForbidden()

    # Single source for the lifetime advertised in both the ID token ('exp')
    # and the response ('expires_in').
    lifetime = 30
    start = now()
    exp = start + datetime.timedelta(seconds=lifetime)

    user = code.user
    sub, _ = OAuthSub.objects.get_or_create(client=client, user=user)

    id_token = {
        'iss': request.build_absolute_uri('/'),
        'aud': client.client_id,
        'sub': str(sub.sub),
        'exp': int(exp.timestamp()),
        'iat': int(start.timestamp()),
        'auth_time': int(start.timestamp()),
        'acr': '0',
    }
    header = {'alg': 'HS256'}
    # JWK 'k' is the unpadded base64url encoding of the shared secret.
    k = base64.urlsafe_b64encode(client.client_secret.encode('utf-8')).strip(b'=')
    jwk = JWK(kty='oct', k=k.decode())
    signed_id_token = JWT(header=header, claims=id_token)
    signed_id_token.make_signed_token(jwk)

    access_token = OAuthAccessToken.objects.create(client=client, user=user)

    # Authorization codes are single-use (RFC 6749, section 4.1.2): drop the
    # code once it has been exchanged so it cannot be replayed.
    code.delete()

    return JsonResponse(
        {
            'access_token': access_token.token,
            'token_type': 'Bearer',
            'expires_in': lifetime,
            'id_token': signed_id_token.serialize(),
        }
    )
68
69
def user_info(request, *args, **kwargs):
    """Return basic profile data for the user owning a bearer access token.

    Expects an ``Authorization: Bearer <token>`` request header. Returns a
    403 response when the header is missing or malformed, or when the token
    does not match an ``OAuthAccessToken``. Otherwise returns a JSON document
    with ``id``, ``email``, ``name`` and ``username``.
    """
    # A missing header is treated like a malformed one (403) instead of
    # raising KeyError.
    authorization = request.META.get('HTTP_AUTHORIZATION', '').split()
    if len(authorization) != 2 or authorization[0] != 'Bearer':
        return HttpResponseForbidden('invalid authorization header')
    try:
        access_token = OAuthAccessToken.objects.get(token=authorization[1])
    except OAuthAccessToken.DoesNotExist:
        return HttpResponseForbidden('missing access token')
    # NOTE(review): no expiry check is visible here although the token
    # endpoint advertises a 30 second lifetime -- confirm whether expiry is
    # enforced elsewhere.
    user = access_token.user
    payload = {
        # use a random string as id, as it is required for pharum not to pick
        # an existing and wrong user.
        'id': str(random.randint(10 ** 6, 10 ** 7)),
        'email': user.email,
        'name': str(user),
        'username': slugify(str(user)),
    }
    return JsonResponse(payload)