git.0d.be Git - django-panik-nonstop.git/blob - nonstop/management/commands/stamina.py
stamina: add debug print for overflows
[django-panik-nonstop.git] / nonstop / management / commands / stamina.py
1 import asyncio
2 import datetime
3 import json
4 import random
5 import signal
6 import sys
7
8 import requests
9
10 from django.core.management.base import BaseCommand
11
12 from emissions.models import Nonstop
13 from nonstop.models import Track, SomaLogLine, ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence
14 from nonstop.app_settings import app_settings
15
16
class Command(BaseCommand):
    """Management command running the nonstop player.

    ``handle`` starts an asyncio event loop running ``main``, which plays the
    current slot, recomputes slots periodically and serves a small text
    control protocol (see ``handle_connection``).
    """
    # datetime of the last jingle inserted by get_playlist(); None until a
    # first playlist is computed.
    last_jingle_datetime = None
    # set to True to ask the play and recompute loops to stop.
    quit = False
20
21     def handle(self, verbosity, **kwargs):
22         try:
23             asyncio.run(self.main(), debug=True)
24         except KeyboardInterrupt:
25             pass
26
27     def get_playlist(self, zone, start_datetime, end_datetime):
28         current_datetime = start_datetime
29         if self.last_jingle_datetime is None:
30             self.last_jingle_datetime = current_datetime
31         # Define a max duration (1 hour), if it is reached, and far enough
32         # from end_datetime (30 minutes), return the playlist as is, not aligned
33         # on end time, so a new playlist gets computed once it's over.
34         # This avoids misalignments due to track durations not matching exactly
35         # or additional delays caused by the player program.
36         max_duration = datetime.timedelta(hours=1)
37         max_duration_leftover = datetime.timedelta(minutes=30)
38         playlist = []
39         adjustment_counter = 0
40         try:
41             jingles = list(zone.nonstopzonesettings_set.first().jingles.all())
42         except AttributeError:
43             jingles = []
44
45         recent_tracks_id = [x.track_id for x in
46                 SomaLogLine.objects.exclude(on_air=False).filter(
47                     track__isnull=False,
48                     play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
49         t0 = datetime.datetime.now()
50         allow_overflow = False
51         while current_datetime < end_datetime:
52             if (current_datetime - start_datetime) > max_duration and (
53                     (end_datetime - current_datetime) > max_duration_leftover):
54                 break
55
56             if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
57                 # jingle time, every ~20 minutes
58                 playlist.append(random.choice(jingles))
59                 self.last_jingle_datetime = current_datetime
60                 current_datetime = start_datetime + sum(
61                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
62
63             remaining_time = (end_datetime - current_datetime)
64             track = Track.objects.filter(
65                     nonstop_zones=zone,
66                     duration__isnull=False).exclude(
67                             id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
68                     ).order_by('?').first()
69             if track is None:
70                 # no track, reduce recent tracks exclusion
71                 recent_tracks_id = recent_tracks_id[:len(recent_tracks_id)//2]
72                 continue
73             playlist.append(track)
74             current_datetime = start_datetime + sum(
75                     [x.duration for x in playlist], datetime.timedelta(seconds=0))
76             if current_datetime > end_datetime and not allow_overflow:
77                 # last track overshot
78                 # 1st strategy: remove last track and try to get a track with
79                 # exact remaining time
80                 print('overshoot', current_datetime, file=sys.stderr)
81                 playlist = playlist[:-1]
82                 track = Track.objects.filter(
83                         nonstop_zones=zone,
84                         duration__gte=remaining_time,
85                         duration__lt=remaining_time + datetime.timedelta(seconds=1)
86                         ).exclude(
87                             id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
88                         ).order_by('?').first()
89                 if track:
90                     # found a track
91                     playlist.append(track)
92                 else:
93                     # fallback strategy: didn't find track of expected duration,
94                     # reduce playlist further
95                     adjustment_counter += 1
96                     playlist = playlist[:-1]
97                     if len(playlist) == 0 or adjustment_counter > 5:
98                         # a dedicated sound that ended a bit too early,
99                         # or too many failures to get an appropriate file,
100                         # allow whatever comes.
101                         allow_overflow = True
102                         print('allow overflow', file=sys.stderr)
103
104                 current_datetime = start_datetime + sum(
105                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
106
107         print('computed playlist: (computation time: %ss)' % (datetime.datetime.now() - t0), file=sys.stderr)
108         current_datetime = start_datetime
109         for track in playlist:
110             print('   ', current_datetime, track.duration, track.title, file=sys.stderr)
111             current_datetime += track.duration
112         print('   ', current_datetime, '---', file=sys.stderr)
113         print('   adjustment_counter:', adjustment_counter, file=sys.stderr)
114
115         return playlist
116
117     def is_nonstop_on_air(self):
118         # check if nonstop system is currently on air
119         if app_settings.ON_AIR_SWITCH_URL is None:
120             return None
121         switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
122         if not switch_response.ok:
123             return None
124         try:
125             status = switch_response.json()
126         except ValueError:
127             return None
128         if status.get('active') == 0:
129             return True
130         elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
131             return True
132         elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
133             return True
134         return False
135
136     async def record_nonstop_line(self, track, now):
137         log_line = SomaLogLine()
138         log_line.play_timestamp = now
139         log_line.track = track
140         log_line.filepath = track.nonstopfile_set.first()
141         log_line.on_air = self.is_nonstop_on_air()
142         log_line.save()
143
144     async def player_process(self, item, timeout=None):
145         if app_settings.DEBUG_WITH_SLEEPS:
146             if hasattr(item, 'is_stream') and item.is_stream():
147                 cmd = 'sleep 86400 # %s' % item.stream.url
148             elif isinstance(item.duration, datetime.timedelta):
149                 cmd = 'sleep %s # %s' % (item.duration.total_seconds(), item.file_path())
150             elif isinstance(item.duration, int):
151                 cmd = 'sleep %s # %s' % (item.duration, item.file_path())
152         else:
153             cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
154             if hasattr(item, 'is_stream') and item.is_stream():
155                 cmd.append(item.stream.url)
156             else:
157                 cmd.append(item.file_path())
158         print('cmd:', cmd, file=sys.stderr)
159         if isinstance(cmd, str):
160             self.player = await asyncio.create_subprocess_shell(
161                     cmd,
162                     stdout=asyncio.subprocess.PIPE,
163                     stderr=asyncio.subprocess.PIPE)
164         else:
165             self.player = await asyncio.create_subprocess_exec(
166                     *cmd,
167                     stdout=asyncio.subprocess.PIPE,
168                     stderr=asyncio.subprocess.PIPE)
169         if timeout is None:
170             await self.player.communicate()
171         else:
172             try:
173                 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
174             except asyncio.TimeoutError:
175                 self.player.kill()
176         self.player = None
177
178     async def play(self, slot):
179         now = datetime.datetime.now()
180         if isinstance(slot, Nonstop):
181             self.playlist = self.get_playlist(slot, now, slot.end_datetime)
182             self.playhead = 0
183             self.softstop = False
184             while not self.quit:
185                 now = datetime.datetime.now()
186                 try:
187                     track = self.playlist[self.playhead]
188                 except IndexError:
189                     break
190                 self.current_track_start_datetime = now
191                 print(now, track.title, track.duration, file=sys.stderr)
192                 record_task = None
193                 if isinstance(track, Track):  # not jingles
194                     record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
195                 await self.player_process(track)
196                 if record_task:
197                     await record_task
198                 if self.softstop:
199                     # track was left to finish, but now the playlist should stop.
200                     break
201                 self.playhead += 1
202         elif slot.is_stream():
203             print(now, 'playing stream', slot.stream, file=sys.stderr)
204             if slot.jingle_id:
205                 await self.player_process(slot.jingle, timeout=60)
206             print('timeout at', (slot.end_datetime - now).total_seconds(), file=sys.stderr)
207             await self.player_process(slot, timeout=(slot.end_datetime - now).total_seconds())
208         else:
209             if hasattr(slot, 'episode'):
210                 print(now, 'playing sound', slot.episode, file=sys.stderr)
211             else:
212                 print(now, 'playing random', file=sys.stderr)
213             if slot.jingle_id:
214                 await self.player_process(slot.jingle, timeout=60)
215             await self.player_process(slot)
216
217     def recompute_playlist(self):
218         current_track = self.playlist[self.playhead]
219         print('recompute_playlist, from', current_track.title, self.current_track_start_datetime + current_track.duration, 'to', self.slot.end_datetime, file=sys.stderr)
220         playlist = self.get_playlist(self.slot,
221                 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
222         if playlist:
223             self.playlist[self.playhead + 1:] = playlist
224
225     def get_current_diffusion(self):
226         now = datetime.datetime.now()
227         diffusion = ScheduledDiffusion.objects.filter(
228                 diffusion__datetime__gt=now - datetime.timedelta(days=1),
229                 diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
230         occurence = RecurringStreamOccurence.objects.filter(
231                 datetime__gt=now - datetime.timedelta(days=1),
232                 datetime__lt=now).order_by('datetime').last()
233         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
234                 datetime__gt=now - datetime.timedelta(days=1),
235                 datetime__lt=now).order_by('datetime').last()
236         # note it shouldn't be possible to have both diffusion and occurences
237         # running at the moment.
238         if occurence and occurence.end_datetime > now:
239             return occurence
240         if diffusion and diffusion.end_datetime > now:
241             return diffusion
242         if directory_occurence and directory_occurence.end_datetime > now:
243             return directory_occurence
244         return None
245
246     def get_next_diffusion(self, before_datetime):
247         now = datetime.datetime.now()
248         diffusion = ScheduledDiffusion.objects.filter(
249                 diffusion__datetime__gt=now,
250                 diffusion__datetime__lt=before_datetime,
251                 ).order_by('diffusion__datetime').first()
252         occurence = RecurringStreamOccurence.objects.filter(
253                 datetime__gt=now,
254                 datetime__lt=before_datetime,
255                 ).order_by('datetime').first()
256         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
257                 datetime__gt=now,
258                 datetime__lt=before_datetime,
259                 ).order_by('datetime').first()
260         if diffusion and occurence:
261             return diffusion if diffusion.diffusion__datetime < occurence.datetime else occurence
262         if diffusion:
263             return diffusion
264         if occurence:
265             return occurence
266         if directory_occurence:
267             return directory_occurence
268         return None
269
270     def recompute_slots(self):
271         now = datetime.datetime.now()
272         # print(now, 'recompute_slots', file=sys.stderr)
273         diffusion = self.get_current_diffusion()
274         if diffusion:
275             self.slot = diffusion
276         else:
277             nonstops = list(Nonstop.objects.all().order_by('start'))
278             nonstops = [x for x in nonstops if x.start != x.end]  # disabled zones
279             try:
280                 self.slot = [x for x in nonstops if x.start < now.time()][-1]
281             except IndexError:
282                 self.slot = nonstops[0]
283             try:
284                 next_slot = nonstops[nonstops.index(self.slot) + 1]
285             except IndexError:
286                 next_slot = nonstops[0]
287             self.slot.datetime = now.replace(
288                     hour=self.slot.start.hour,
289                     minute=self.slot.start.minute)
290             self.slot.end_datetime = now.replace(
291                     hour=next_slot.start.hour,
292                     minute=next_slot.start.minute,
293                     second=0,
294                     microsecond=0)
295             if self.slot.end_datetime < self.slot.datetime:
296                 self.slot.end_datetime += datetime.timedelta(days=1)
297
298             diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
299             if diffusion:
300                 self.slot.end_datetime = diffusion.datetime
301
302     async def recompute_slots_loop(self):
303         now = datetime.datetime.now()
304         print(now, 'recompute_slots_loop', file=sys.stderr)
305         sleep = (60 - now.second) % 10  # adjust to awake at :00
306         while not self.quit:
307             await asyncio.sleep(sleep)
308             sleep = 10  # next cycles every 10 seconds
309             current_slot = self.slot
310             self.recompute_slots()
311             expected_slot = self.slot
312             if current_slot != expected_slot:
313                 now = datetime.datetime.now()
314                 print(now, 'unexpected change', current_slot, 'vs', expected_slot, file=sys.stderr)
315                 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
316                     # ask for a softstop, i.e. finish the track then switch.
317                     self.softstop = True
318                 else:
319                     # interrupt nonstop
320                     print('interrupting nonstop', file=sys.stderr)
321                     self.play_task.cancel()
322             elif current_slot.end_datetime > expected_slot.end_datetime:
323                 now = datetime.datetime.now()
324                 print(now, 'change in end time, from %s to %s' %
325                         (current_slot.end_datetime, expected_slot.end_datetime), file=sys.stderr)
326                 if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
327                     # more than 5 minutes left, recompute playlist
328                     self.recompute_playlist()
329
330     async def handle_connection(self, reader, writer):
331         writer.write(b'Watusi!\n')
332         writer.write(b'(dot on empty line to stop connection)\n')
333         end = False
334         while not end:
335             data = await reader.read(100)
336             try:
337                 message = data.decode().strip()
338             except UnicodeDecodeError:
339                 print('got invalid message %r' % message, file=sys.stderr)
340                 continue
341             print('got message: %r' % message, file=sys.stderr)
342             if message == 'status':
343                 response = {'slot': str(self.slot)}
344                 if isinstance(self.slot, Nonstop):
345                     try:
346                         track = self.playlist[self.playhead]
347                     except IndexError:
348                         pass
349                     else:
350                         response['track'] = {}
351                         response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
352                         response['track']['title'] = track.title
353                         response['track']['artist'] = track.artist.name if track.artist_id else ''
354                         response['track']['duration'] = track.duration.total_seconds()
355                         response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
356                         response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
357                 next_diffusion = self.get_next_diffusion(
358                         before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
359                 if next_diffusion:
360                     response['next_diffusion'] = {
361                         'label': str(next_diffusion),
362                         'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
363                     }
364                     if isinstance(next_diffusion, ScheduledDiffusion):
365                         response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
366                         response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
367             elif message == '.':
368                 end = True
369                 response = {'ack': True}
370             elif message == 'softquit':
371                 self.quit = True
372                 end = True
373                 response = {'ack': True}
374             elif message == 'hardquit':
375                 self.quit = True
376                 end = True
377                 response = {'ack': True}
378                 if self.player and self.player.returncode is None:  # not finished
379                     self.player.kill()
380             else:
381                 response = {'err': 1, 'msg': 'unknown command: %r' % message}
382             writer.write(json.dumps(response).encode('utf-8') + b'\n')
383             try:
384                 await writer.drain()
385             except ConnectionResetError:
386                 break
387         writer.close()
388
389     def sigterm_handler(self):
390         print('got signal', file=sys.stderr)
391         self.quit = True
392         self.play_task.cancel()
393
394     async def main(self):
395         loop = asyncio.get_running_loop()
396         loop.add_signal_handler(
397                 signal.SIGTERM,
398                 self.sigterm_handler)
399         now = datetime.datetime.now()
400         self.recompute_slots()
401         server = await asyncio.start_server(
402                 self.handle_connection,
403                 app_settings.SERVER_BIND_IFACE,
404                 app_settings.SERVER_BIND_PORT)
405         async with server:
406             asyncio.create_task(server.serve_forever())
407
408             self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
409             while not self.quit:
410                 duration = (self.slot.end_datetime - now).seconds
411                 print('next sure slot', duration, self.slot.end_datetime, file=sys.stderr)
412                 if duration < 2:
413                     # next slot is very close, wait for it
414                     await asyncio.sleep(duration)
415                     self.recompute_slots()
416                 self.play_task = asyncio.create_task(self.play(self.slot))
417                 try:
418                     await self.play_task
419                     self.recompute_slots()
420                 except asyncio.CancelledError as exc:
421                     print('exc:', exc, file=sys.stderr)
422                     if self.player and self.player.returncode is None:  # not finished
423                         self.player.kill()
424                 except KeyboardInterrupt:
425                     self.quit = True
426                     break