]> git.0d.be Git - django-panik-nonstop.git/blob - nonstop/management/commands/stamina.py
stamina: handle invalid incoming messages
[django-panik-nonstop.git] / nonstop / management / commands / stamina.py
1 import asyncio
2 import datetime
3 import json
4 import random
5 import signal
6
7 import requests
8
9 from django.core.management.base import BaseCommand
10
11 from emissions.models import Nonstop
12 from nonstop.models import Track, SomaLogLine, ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence
13 from nonstop.app_settings import app_settings
14
15
16 class Command(BaseCommand):
17     last_jingle_datetime = None
18     quit = False
19
20     def handle(self, verbosity, **kwargs):
21         try:
22             asyncio.run(self.main(), debug=True)
23         except KeyboardInterrupt:
24             pass
25
26     def get_playlist(self, zone, start_datetime, end_datetime):
27         current_datetime = start_datetime
28         if self.last_jingle_datetime is None:
29             self.last_jingle_datetime = current_datetime
30         # Define a max duration (1 hour), if it is reached, and far enough
31         # from end_datetime (30 minutes), return the playlist as is, not aligned
32         # on end time, so a new playlist gets computed once it's over.
33         # This avoids misalignments due to track durations not matching exactly
34         # or additional delays caused by the player program.
35         max_duration = datetime.timedelta(hours=1)
36         max_duration_leftover = datetime.timedelta(minutes=30)
37         playlist = []
38         adjustment_counter = 0
39         try:
40             jingles = list(zone.nonstopzonesettings_set.first().jingles.all())
41         except AttributeError:
42             jingles = []
43
44         recent_tracks_id = [x.track_id for x in
45                 SomaLogLine.objects.exclude(on_air=False).filter(
46                     track__isnull=False,
47                     play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
48         t0 = datetime.datetime.now()
49         while current_datetime < end_datetime and adjustment_counter < 5:
50             if (current_datetime - start_datetime) > max_duration and (
51                     (end_datetime - current_datetime) > max_duration_leftover):
52                 break
53
54             if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
55                 # jingle time, every ~20 minutes
56                 playlist.append(random.choice(jingles))
57                 self.last_jingle_datetime = current_datetime
58                 current_datetime = start_datetime + sum(
59                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
60
61             remaining_time = (end_datetime - current_datetime)
62             track = Track.objects.filter(
63                     nonstop_zones=zone,
64                     duration__isnull=False).exclude(
65                             id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
66                     ).order_by('?').first()
67             if track is None:
68                 # no track, reduce recent tracks exclusion
69                 recent_tracks_id = recent_tracks_id[:len(recent_tracks_id)//2]
70                 continue
71             playlist.append(track)
72             current_datetime = start_datetime + sum(
73                     [x.duration for x in playlist], datetime.timedelta(seconds=0))
74             if current_datetime > end_datetime:
75                 # last track overshot
76                 # 1st strategy: remove last track and try to get a track with
77                 # exact remaining time
78                 playlist = playlist[:-1]
79                 track = Track.objects.filter(
80                         nonstop_zones=zone,
81                         duration__gte=remaining_time,
82                         duration__lt=remaining_time + datetime.timedelta(seconds=1)
83                         ).exclude(
84                             id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
85                         ).order_by('?').first()
86                 if track:
87                     # found a track
88                     playlist.append(track)
89                 else:
90                     # fallback strategy: didn't find track of expected duration,
91                     # reduce playlist further
92                     adjustment_counter += 1
93                     playlist = playlist[:-1]
94
95                 current_datetime = start_datetime + sum(
96                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
97
98         print('computed playlist: (computation time: %ss)' % (datetime.datetime.now() - t0))
99         current_datetime = start_datetime
100         for track in playlist:
101             print('   ', current_datetime, track.duration, track.title)
102             current_datetime += track.duration
103         print('   ', current_datetime, '---')
104         print('   adjustment_counter:', adjustment_counter)
105
106         return playlist
107
108     def is_nonstop_on_air(self):
109         # check if nonstop system is currently on air
110         if app_settings.ON_AIR_SWITCH_URL is None:
111             return None
112         switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
113         if not switch_response.ok:
114             return None
115         try:
116             status = switch_response.json()
117         except ValueError:
118             return None
119         if status.get('active') == 0:
120             return True
121         elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
122             return True
123         elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
124             return True
125         return False
126
127     async def record_nonstop_line(self, track, now):
128         log_line = SomaLogLine()
129         log_line.play_timestamp = now
130         log_line.track = track
131         log_line.filepath = track.nonstopfile_set.first()
132         log_line.on_air = self.is_nonstop_on_air()
133         log_line.save()
134
135     async def player_process(self, item, timeout=None):
136         if app_settings.DEBUG_WITH_SLEEPS:
137             if hasattr(item, 'is_stream') and item.is_stream():
138                 cmd = 'sleep 86400 # %s' % item.stream.url
139             elif isinstance(item.duration, datetime.timedelta):
140                 cmd = 'sleep %s # %s' % (item.duration.total_seconds(), item.file_path())
141             elif isinstance(item.duration, int):
142                 cmd = 'sleep %s # %s' % (item.duration, item.file_path())
143         else:
144             cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
145             if hasattr(item, 'is_stream') and item.is_stream():
146                 cmd.append(item.stream.url)
147             else:
148                 cmd.append(item.file_path())
149         print('cmd:', cmd)
150         if isinstance(cmd, str):
151             self.player = await asyncio.create_subprocess_shell(
152                     cmd,
153                     stdout=asyncio.subprocess.PIPE,
154                     stderr=asyncio.subprocess.PIPE)
155         else:
156             self.player = await asyncio.create_subprocess_exec(
157                     *cmd,
158                     stdout=asyncio.subprocess.PIPE,
159                     stderr=asyncio.subprocess.PIPE)
160         if timeout is None:
161             await self.player.communicate()
162         else:
163             try:
164                 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
165             except asyncio.TimeoutError:
166                 self.player.kill()
167         self.player = None
168
169     async def play(self, slot):
170         now = datetime.datetime.now()
171         if isinstance(slot, Nonstop):
172             self.playlist = self.get_playlist(slot, now, slot.end_datetime)
173             self.playhead = 0
174             self.softstop = False
175             while not self.quit:
176                 now = datetime.datetime.now()
177                 try:
178                     track = self.playlist[self.playhead]
179                 except IndexError:
180                     break
181                 self.current_track_start_datetime = now
182                 print(now, track.title, track.duration)
183                 record_task = None
184                 if isinstance(track, Track):  # not jingles
185                     record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
186                 await self.player_process(track)
187                 if record_task:
188                     await record_task
189                 if self.softstop:
190                     # track was left to finish, but now the playlist should stop.
191                     break
192                 self.playhead += 1
193         elif slot.is_stream():
194             print(now, 'playing stream', slot.stream)
195             if slot.jingle_id:
196                 await self.player_process(slot.jingle, timeout=60)
197             print('timeout at', (slot.end_datetime - now).total_seconds())
198             await self.player_process(slot, timeout=(slot.end_datetime - now).total_seconds())
199         else:
200             if hasattr(slot, 'episode'):
201                 print(now, 'playing sound', slot.episode)
202             else:
203                 print(now, 'playing random')
204             if slot.jingle_id:
205                 await self.player_process(slot.jingle, timeout=60)
206             await self.player_process(slot)
207
208     def recompute_playlist(self):
209         current_track = self.playlist[self.playhead]
210         print('recompute_playlist, from', current_track.title, self.current_track_start_datetime + current_track.duration, 'to', self.slot.end_datetime)
211         playlist = self.get_playlist(self.slot,
212                 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
213         if playlist:
214             self.playlist[self.playhead + 1:] = playlist
215
216     def get_current_diffusion(self):
217         now = datetime.datetime.now()
218         diffusion = ScheduledDiffusion.objects.filter(
219                 diffusion__datetime__gt=now - datetime.timedelta(days=1),
220                 diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
221         occurence = RecurringStreamOccurence.objects.filter(
222                 datetime__gt=now - datetime.timedelta(days=1),
223                 datetime__lt=now).order_by('datetime').last()
224         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
225                 datetime__gt=now - datetime.timedelta(days=1),
226                 datetime__lt=now).order_by('datetime').last()
227         # note it shouldn't be possible to have both diffusion and occurences
228         # running at the moment.
229         if occurence and occurence.end_datetime > now:
230             return occurence
231         if diffusion and diffusion.end_datetime > now:
232             return diffusion
233         if directory_occurence and directory_occurence.end_datetime > now:
234             return directory_occurence
235         return None
236
237     def get_next_diffusion(self, before_datetime):
238         now = datetime.datetime.now()
239         diffusion = ScheduledDiffusion.objects.filter(
240                 diffusion__datetime__gt=now,
241                 diffusion__datetime__lt=before_datetime,
242                 ).order_by('diffusion__datetime').first()
243         occurence = RecurringStreamOccurence.objects.filter(
244                 datetime__gt=now,
245                 datetime__lt=before_datetime,
246                 ).order_by('datetime').first()
247         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
248                 datetime__gt=now,
249                 datetime__lt=before_datetime,
250                 ).order_by('datetime').first()
251         if diffusion and occurence:
252             return diffusion if diffusion.diffusion__datetime < occurence.datetime else occurence
253         if diffusion:
254             return diffusion
255         if occurence:
256             return occurence
257         if directory_occurence:
258             return directory_occurence
259         return None
260
261     def recompute_slots(self):
262         now = datetime.datetime.now()
263         # print(now, 'recompute_slots')
264         diffusion = self.get_current_diffusion()
265         if diffusion:
266             self.slot = diffusion
267         else:
268             nonstops = list(Nonstop.objects.all().order_by('start'))
269             nonstops = [x for x in nonstops if x.start != x.end]  # disabled zones
270             try:
271                 self.slot = [x for x in nonstops if x.start < now.time()][-1]
272             except IndexError:
273                 self.slot = nonstops[0]
274             try:
275                 next_slot = nonstops[nonstops.index(self.slot) + 1]
276             except IndexError:
277                 next_slot = nonstops[0]
278             self.slot.datetime = now.replace(
279                     hour=self.slot.start.hour,
280                     minute=self.slot.start.minute)
281             self.slot.end_datetime = now.replace(
282                     hour=next_slot.start.hour,
283                     minute=next_slot.start.minute,
284                     second=0,
285                     microsecond=0)
286             if self.slot.end_datetime < self.slot.datetime:
287                 self.slot.end_datetime += datetime.timedelta(days=1)
288
289             diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
290             if diffusion:
291                 self.slot.end_datetime = diffusion.datetime
292
293     async def recompute_slots_loop(self):
294         now = datetime.datetime.now()
295         print(now, 'recompute_slots_loop')
296         sleep = (60 - now.second) % 10  # adjust to awake at :00
297         while not self.quit:
298             await asyncio.sleep(sleep)
299             sleep = 10  # next cycles every 10 seconds
300             current_slot = self.slot
301             self.recompute_slots()
302             expected_slot = self.slot
303             if current_slot != expected_slot:
304                 now = datetime.datetime.now()
305                 print(now, 'unexpected change', current_slot, 'vs', expected_slot)
306                 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
307                     # ask for a softstop, i.e. finish the track then switch.
308                     self.softstop = True
309                 else:
310                     # interrupt nonstop
311                     print('interrupting nonstop')
312                     self.play_task.cancel()
313             elif current_slot.end_datetime > expected_slot.end_datetime:
314                 now = datetime.datetime.now()
315                 print(now, 'change in end time, from %s to %s' %
316                         (current_slot.end_datetime, expected_slot.end_datetime))
317                 if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
318                     # more than 5 minutes left, recompute playlist
319                     self.recompute_playlist()
320
321     async def handle_connection(self, reader, writer):
322         writer.write(b'Watusi!\n')
323         writer.write(b'(dot on empty line to stop connection)\n')
324         end = False
325         while not end:
326             data = await reader.read(100)
327             try:
328                 message = data.decode().strip()
329             except UnicodeDecodeError:
330                 print('got invalid message %r' % message)
331                 continue
332             print('got message: %r' % message)
333             if message == 'status':
334                 response = {'slot': str(self.slot)}
335                 if isinstance(self.slot, Nonstop):
336                     try:
337                         track = self.playlist[self.playhead]
338                     except IndexError:
339                         pass
340                     else:
341                         response['track'] = {}
342                         response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
343                         response['track']['title'] = track.title
344                         response['track']['artist'] = track.artist.name if track.artist_id else ''
345                         response['track']['duration'] = track.duration.total_seconds()
346                         response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
347                         response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
348                 next_diffusion = self.get_next_diffusion(
349                         before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
350                 if next_diffusion:
351                     response['next_diffusion'] = {
352                         'label': str(next_diffusion),
353                         'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
354                     }
355                     if isinstance(next_diffusion, ScheduledDiffusion):
356                         response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
357                         response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
358             elif message == '.':
359                 end = True
360                 response = {'ack': True}
361             elif message == 'softquit':
362                 self.quit = True
363                 end = True
364                 response = {'ack': True}
365             elif message == 'hardquit':
366                 self.quit = True
367                 end = True
368                 response = {'ack': True}
369                 if self.player and self.player.returncode is None:  # not finished
370                     self.player.kill()
371             else:
372                 response = {'err': 1, 'msg': 'unknown command: %r' % message}
373             writer.write(json.dumps(response).encode('utf-8') + b'\n')
374             try:
375                 await writer.drain()
376             except ConnectionResetError:
377                 break
378         writer.close()
379
380     def sigterm_handler(self):
381         print('got signal')
382         self.quit = True
383         self.play_task.cancel()
384
385     async def main(self):
386         loop = asyncio.get_running_loop()
387         loop.add_signal_handler(
388                 signal.SIGTERM,
389                 self.sigterm_handler)
390         now = datetime.datetime.now()
391         self.recompute_slots()
392         server = await asyncio.start_server(
393                 self.handle_connection,
394                 app_settings.SERVER_BIND_IFACE,
395                 app_settings.SERVER_BIND_PORT)
396         async with server:
397             asyncio.create_task(server.serve_forever())
398
399             self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
400             while not self.quit:
401                 duration = (self.slot.end_datetime - now).seconds
402                 print('next sure slot', duration, self.slot.end_datetime)
403                 if duration < 2:
404                     # next slot is very close, wait for it
405                     await asyncio.sleep(duration)
406                     self.recompute_slots()
407                 self.play_task = asyncio.create_task(self.play(self.slot))
408                 try:
409                     await self.play_task
410                     self.recompute_slots()
411                 except asyncio.CancelledError as exc:
412                     print('exc:', exc)
413                     if self.player and self.player.returncode is None:  # not finished
414                         self.player.kill()
415                 except KeyboardInterrupt:
416                     self.quit = True
417                     break