]> git.0d.be Git - django-panik-nonstop.git/blob - nonstop/management/commands/stamina.py
42c5df8c6f98a25acd7c95859d3c7b89b6efa277
[django-panik-nonstop.git] / nonstop / management / commands / stamina.py
1 import asyncio
2 import datetime
3 import json
4 import logging
5 import random
6 import signal
7 import sys
8 import time
9
10 import requests
11
12 from django.conf import settings
13 from django.core.management.base import BaseCommand
14
15 from emissions.models import Nonstop
16 from nonstop.models import Track, Jingle, SomaLogLine, ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence
17 from nonstop.app_settings import app_settings
18
19
20 logger = logging.getLogger('stamina')
21
22
class Command(BaseCommand):
    """Management command running the "stamina" nonstop player.

    It computes what should be playing (nonstop zone, scheduled diffusion,
    recurring stream or random directory occurence), runs an external
    player program for each item, and exposes a small TCP control server.
    """
    requires_system_checks = False

    # datetime of the latest jingle inserted in a computed playlist; set on
    # the first get_playlist() call and updated each time a jingle is added.
    last_jingle_datetime = None
    # set to True (control server or SIGTERM) to make the loops stop.
    quit = False
28
29     def handle(self, verbosity, **kwargs):
30         alert_index = 0
31         latest_alert_timestamp = 0
32         latest_exception_timestamp = 0
33
34         def exception_alert_thresholds():
35             yield 0
36             duration = 3
37             while duration < 3600:
38                 yield duration
39                 duration *= 5
40             duration = 3600
41             while True:
42                 yield duration
43                 duration += 3600
44
45         while True:
46             try:
47                 asyncio.run(self.main(), debug=settings.DEBUG)
48             except KeyboardInterrupt:
49                 break
50             except Exception:
51                 timestamp = time.time()
52                 if (timestamp - latest_exception_timestamp) > 300:
53                     # if latest exception was a "long" time ago, assume
54                     # things went smooth for a while and reset things
55                     alert_index = 0
56                     latest_alert_timestamp = 0
57                     latest_exception_timestamp = 0
58
59                 alert_threshold = 0
60                 for i, threshold in enumerate(exception_alert_thresholds()):
61                     if i == alert_index:
62                         alert_threshold = threshold
63                         break
64
65                 if (timestamp - latest_alert_timestamp) > alert_threshold:
66                     logger.exception('General exception (alert index: %s)', alert_index)
67                     latest_alert_timestamp = timestamp
68                     alert_index += 1
69
70                 time.sleep(2)  # retry after a bit
71                 latest_exception_timestamp = timestamp
72                 continue
73             break
74
75     def get_playlist(self, zone, start_datetime, end_datetime):
76         current_datetime = start_datetime
77         if self.last_jingle_datetime is None:
78             self.last_jingle_datetime = current_datetime
79         # Define a max duration (1 hour), if it is reached, and far enough
80         # from end_datetime (30 minutes), return the playlist as is, not aligned
81         # on end time, so a new playlist gets computed once it's over.
82         # This avoids misalignments due to track durations not matching exactly
83         # or additional delays caused by the player program.
84         max_duration = datetime.timedelta(hours=1)
85         max_duration_leftover = datetime.timedelta(minutes=30)
86         playlist = []
87         adjustment_counter = 0
88         try:
89             jingles = list(zone.nonstopzonesettings_set.first().jingles.all())
90         except AttributeError:
91             jingles = []
92
93         recent_tracks_id = [x.track_id for x in
94                 SomaLogLine.objects.exclude(on_air=False).filter(
95                     track__isnull=False,
96                     play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
97         t0 = datetime.datetime.now()
98         allow_overflow = False
99         while current_datetime < end_datetime:
100             if (current_datetime - start_datetime) > max_duration and (
101                     (end_datetime - current_datetime) > max_duration_leftover):
102                 break
103
104             if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
105                 # jingle time, every ~20 minutes
106                 playlist.append(random.choice(jingles))
107                 self.last_jingle_datetime = current_datetime
108                 current_datetime = start_datetime + sum(
109                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
110
111             remaining_time = (end_datetime - current_datetime)
112             track = Track.objects.filter(
113                     nonstop_zones=zone,
114                     duration__isnull=False).exclude(
115                             id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
116                     ).order_by('?').first()
117             if track is None:
118                 # no track, reduce recent tracks exclusion
119                 recent_tracks_id = recent_tracks_id[:len(recent_tracks_id)//2]
120                 continue
121             playlist.append(track)
122             current_datetime = start_datetime + sum(
123                     [x.duration for x in playlist], datetime.timedelta(seconds=0))
124             if current_datetime > end_datetime and not allow_overflow:
125                 # last track overshot
126                 # 1st strategy: remove last track and try to get a track with
127                 # exact remaining time
128                 logger.debug('Overshoot %s, %s', adjustment_counter, current_datetime)
129                 playlist = playlist[:-1]
130                 track = Track.objects.filter(
131                         nonstop_zones=zone,
132                         duration__gte=remaining_time,
133                         duration__lt=remaining_time + datetime.timedelta(seconds=1)
134                         ).exclude(
135                             id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
136                         ).order_by('?').first()
137                 if track:
138                     # found a track
139                     playlist.append(track)
140                 else:
141                     # fallback strategy: didn't find track of expected duration,
142                     # reduce playlist further
143                     adjustment_counter += 1
144                     playlist = playlist[:-1]
145                     if len(playlist) == 0 or adjustment_counter > 5:
146                         # a dedicated sound that ended a bit too early,
147                         # or too many failures to get an appropriate file,
148                         # allow whatever comes.
149                         allow_overflow = True
150                         logger.debug('Allowing overflows')
151
152                 current_datetime = start_datetime + sum(
153                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
154
155         logger.info('Computed playlist: (computation time: %ss)',
156                 (datetime.datetime.now() - t0))
157         current_datetime = start_datetime
158         for track in playlist:
159             logger.debug('- track: %s %s %s', current_datetime, track.duration, track.title)
160             current_datetime += track.duration
161         logger.debug('- end: %s', current_datetime)
162         return playlist
163
164     def is_nonstop_on_air(self):
165         # check if nonstop system is currently on air
166         if app_settings.ON_AIR_SWITCH_URL is None:
167             return None
168         switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
169         if not switch_response.ok:
170             return None
171         try:
172             status = switch_response.json()
173         except ValueError:
174             return None
175         if status.get('active') == 0:
176             return True
177         elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
178             return True
179         elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
180             return True
181         return False
182
183     async def record_nonstop_line(self, track, now):
184         log_line = SomaLogLine()
185         log_line.play_timestamp = now
186         log_line.track = track
187         log_line.filepath = track.nonstopfile_set.first()
188         log_line.on_air = self.is_nonstop_on_air()
189         log_line.save()
190
191     async def player_process(self, item, timeout=None):
192         cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
193         if hasattr(item, 'is_stream') and item.is_stream():
194             cmd.append(item.stream.url)
195             logger.info('Play stream: %s', item.stream.url)
196         else:
197             cmd.append(item.file_path())
198             logger.info('Play file: %s', item.file_path())
199         if app_settings.DEBUG_WITH_SLEEPS:
200             # replace command by a sleep call, for silent debugging
201             if hasattr(item, 'is_stream') and item.is_stream():
202                 cmd = 'sleep 86400 # %s' % item.stream.url
203             elif isinstance(item.duration, datetime.timedelta):
204                 cmd = 'sleep %s # %s' % (item.duration.total_seconds(), item.file_path())
205             elif isinstance(item.duration, int):
206                 cmd = 'sleep %s # %s' % (item.duration, item.file_path())
207         logger.debug('cmd %r', cmd)
208         if isinstance(cmd, str):
209             self.player = await asyncio.create_subprocess_shell(
210                     cmd,
211                     stdout=asyncio.subprocess.PIPE,
212                     stderr=asyncio.subprocess.PIPE)
213         else:
214             self.player = await asyncio.create_subprocess_exec(
215                     *cmd,
216                     stdout=asyncio.subprocess.PIPE,
217                     stderr=asyncio.subprocess.PIPE)
218         if timeout is None:
219             await self.player.communicate()
220         else:
221             try:
222                 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
223             except asyncio.TimeoutError:
224                 self.player.kill()
225         self.player = None
226
    async def play(self, slot):
        """Play ``slot`` until it is over.

        Three kinds of slots are handled:

        * a Nonstop zone: a playlist is computed up to ``slot.end_datetime``
          and its items are played one after the other, a log line being
          recorded for tracks (not jingles); it stops when the playlist is
          exhausted, or after the current item when ``self.quit`` or
          ``self.softstop`` is set.
        * a stream (``slot.is_stream()``): after an optional jingle the
          stream is played until ``slot.end_datetime``, the player being
          restarted when it gets interrupted; a stream that never started
          is removed or, for a first failure of a non-recurring slot,
          pushed back by five minutes.
        * anything else: an optional jingle then the slot itself are played.
        """
        now = datetime.datetime.now()
        if isinstance(slot, Nonstop):
            self.playlist = self.get_playlist(slot, now, slot.end_datetime)
            self.playhead = 0
            self.softstop = False
            while not self.quit:
                now = datetime.datetime.now()
                try:
                    track = self.playlist[self.playhead]
                except IndexError:
                    # end of playlist
                    break
                self.current_track_start_datetime = now
                if isinstance(track, Jingle):
                    logger.info('Jingle: %s (id: %s) (%s)', track.title, track.id, track.duration)
                else:
                    logger.info('Track: %s (id: %s) (%s)', track.title, track.id, track.duration)
                record_task = None
                if isinstance(track, Track):  # not jingles
                    record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
                await self.player_process(track)
                if record_task:
                    await record_task
                if self.softstop:
                    # track was left to finish, but now the playlist should stop.
                    break
                self.playhead += 1
        elif slot.is_stream():
            logger.info('Stream: %s', slot.stream)
            if slot.jingle_id:
                # jingles are limited to one minute
                await self.player_process(slot.jingle, timeout=60)
            logger.debug('Stream timeout: %s', (slot.end_datetime - now).total_seconds())
            short_interruption_counter = 0
            has_played = False
            while True:
                player_start_time = datetime.datetime.now()
                await self.player_process(slot, timeout=(slot.end_datetime - player_start_time).total_seconds())
                now = datetime.datetime.now()
                if (slot.end_datetime - now).total_seconds() < 2:
                    # it went well, stop
                    break
                # stream got interrupted
                if (datetime.datetime.now() - player_start_time).total_seconds() < 15:
                    # and was up for less than 15 seconds.
                    if not has_played:
                        # never up before, probably not even started
                        if isinstance(slot, RecurringStreamOccurence):
                            # no mercy for recurring stream, remove occurence
                            logger.info('Missing stream for %s, removing', slot)
                            slot.delete()
                        elif slot.auto_delayed is True:
                            # was already delayed and is still not up, remove.
                            logger.info('Still missing stream for %s, removing', slot)
                            slot.delete()
                        else:
                            # push back start datetime for 5 minutes, and get
                            # back to nonstop music in the meantime
                            logger.info('Pushing starting time of %s', slot.diffusion.episode)
                            slot.diffusion.datetime = slot.diffusion.datetime + datetime.timedelta(seconds=300)
                            # presumably episode durations are in minutes, so
                            # this keeps the same end time -- TODO confirm
                            slot.diffusion.episode.duration = slot.diffusion.episode.get_duration() - 5
                            if slot.diffusion.episode.duration <= 5:
                                slot.diffusion.episode.duration = 0
                            slot.auto_delayed = True
                            slot.diffusion.save()
                            slot.diffusion.episode.save()
                            slot.save()
                        break
                    short_interruption_counter += 1
                    # wait a bit (longer after each consecutive short interruption)
                    await asyncio.sleep(short_interruption_counter)
                else:
                    # mark stream as ok at least once, and reset short
                    # interruption counter
                    has_played = True
                    short_interruption_counter = 0
                    logger.debug('Stream error for %s', slot)

                if short_interruption_counter > 5:
                    # many short interruptions
                    logger.info('Too many stream errors for %s, removing', slot)
                    slot.delete()
                    break
        else:
            if hasattr(slot, 'episode'):
                logger.info('Episode: %s (id: %s)', slot.episode, slot.episode.id)
            else:
                logger.info('Random: %s', slot)
            if slot.jingle_id:
                # jingles are limited to one minute
                await self.player_process(slot.jingle, timeout=60)
            await self.player_process(slot)
317
318     def recompute_playlist(self):
319         current_track = self.playlist[self.playhead]
320         logger.debug('Recomputing playlist at %s, from %s to %s',
321                 current_track.title,
322                 self.current_track_start_datetime + current_track.duration,
323                 self.slot.end_datetime)
324         playlist = self.get_playlist(self.slot,
325                 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
326         if playlist:
327             self.playlist[self.playhead + 1:] = playlist
328
329     def get_current_diffusion(self):
330         now = datetime.datetime.now()
331         diffusion = ScheduledDiffusion.objects.filter(
332                 diffusion__datetime__gt=now - datetime.timedelta(days=1),
333                 diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
334         occurence = RecurringStreamOccurence.objects.filter(
335                 datetime__gt=now - datetime.timedelta(days=1),
336                 datetime__lt=now).order_by('datetime').last()
337         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
338                 datetime__gt=now - datetime.timedelta(days=1),
339                 datetime__lt=now).order_by('datetime').last()
340         # note it shouldn't be possible to have both diffusion and occurences
341         # running at the moment.
342         if occurence and occurence.end_datetime > now:
343             return occurence
344         if diffusion and diffusion.end_datetime > now:
345             return diffusion
346         if directory_occurence and directory_occurence.end_datetime > now:
347             return directory_occurence
348         return None
349
350     def get_next_diffusion(self, before_datetime):
351         now = datetime.datetime.now()
352         diffusion = ScheduledDiffusion.objects.filter(
353                 diffusion__datetime__gt=now,
354                 diffusion__datetime__lt=before_datetime,
355                 ).order_by('diffusion__datetime').first()
356         occurence = RecurringStreamOccurence.objects.filter(
357                 datetime__gt=now,
358                 datetime__lt=before_datetime,
359                 ).order_by('datetime').first()
360         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
361                 datetime__gt=now,
362                 datetime__lt=before_datetime,
363                 ).order_by('datetime').first()
364         if diffusion and occurence:
365             return diffusion if diffusion.diffusion.datetime < occurence.datetime else occurence
366         if diffusion:
367             return diffusion
368         if occurence:
369             return occurence
370         if directory_occurence:
371             return directory_occurence
372         return None
373
374     def recompute_slots(self):
375         now = datetime.datetime.now()
376         diffusion = self.get_current_diffusion()
377         if diffusion:
378             self.slot = diffusion
379         else:
380             nonstops = list(Nonstop.objects.all().order_by('start'))
381             nonstops = [x for x in nonstops if x.start != x.end]  # disabled zones
382             try:
383                 self.slot = [x for x in nonstops if x.start < now.time()][-1]
384             except IndexError:
385                 self.slot = nonstops[0]
386             try:
387                 next_slot = nonstops[nonstops.index(self.slot) + 1]
388             except IndexError:
389                 next_slot = nonstops[0]
390             self.slot.datetime = now.replace(
391                     hour=self.slot.start.hour,
392                     minute=self.slot.start.minute)
393             self.slot.end_datetime = now.replace(
394                     hour=next_slot.start.hour,
395                     minute=next_slot.start.minute,
396                     second=0,
397                     microsecond=0)
398             if self.slot.end_datetime < self.slot.datetime:
399                 self.slot.end_datetime += datetime.timedelta(days=1)
400
401             diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
402             if diffusion:
403                 self.slot.end_datetime = diffusion.datetime
404
405     async def recompute_slots_loop(self):
406         now = datetime.datetime.now()
407         sleep = (60 - now.second) % 10  # adjust to awake at :00
408         while not self.quit:
409             await asyncio.sleep(sleep)
410             sleep = 10  # next cycles every 10 seconds
411             current_slot = self.slot
412             self.recompute_slots()
413             expected_slot = self.slot
414             if current_slot != expected_slot:
415                 now = datetime.datetime.now()
416                 logger.info('Unexpected change, %s vs %s', current_slot, expected_slot)
417                 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
418                     # ask for a softstop, i.e. finish the track then switch.
419                     self.softstop = True
420                 elif isinstance(current_slot, Nonstop):
421                     # interrupt nonstop
422                     logger.info('Interrupting nonstop')
423                     self.play_task.cancel()
424             elif current_slot.end_datetime > expected_slot.end_datetime:
425                 now = datetime.datetime.now()
426                 logger.debug('Change in end time, from %s to %s',
427                         current_slot.end_datetime,
428                         expected_slot.end_datetime)
429                 if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
430                     # more than 5 minutes left, recompute playlist
431                     self.recompute_playlist()
432
433     async def handle_connection(self, reader, writer):
434         writer.write(b'Watusi!\n')
435         writer.write(b'Known commands: status, softquit, hardquit\n')
436         writer.write(b'(dot on empty line to stop connection)\n')
437         await writer.drain()
438         end = False
439         while not end:
440             data = await reader.read(100)
441             try:
442                 message = data.decode().strip()
443             except UnicodeDecodeError:
444                 logger.debug('Server, invalid message %r', message)
445                 if not data:
446                     end = True
447                 continue
448             logger.debug('Server, message %r', message)
449             if message == 'status':
450                 response = {'slot': str(self.slot)}
451                 if isinstance(self.slot, Nonstop):
452                     try:
453                         track = self.playlist[self.playhead]
454                     except IndexError:
455                         pass
456                     else:
457                         response['track'] = {}
458                         response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
459                         response['track']['title'] = track.title
460                         response['track']['artist'] = track.artist.name if track.artist_id else ''
461                         response['track']['duration'] = track.duration.total_seconds()
462                         response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
463                         response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
464                 next_diffusion = self.get_next_diffusion(
465                         before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
466                 if next_diffusion:
467                     response['next_diffusion'] = {
468                         'label': str(next_diffusion),
469                         'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
470                     }
471                     if isinstance(next_diffusion, ScheduledDiffusion):
472                         response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
473                         response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
474             elif message == '.':
475                 end = True
476                 response = {'ack': True}
477             elif message == 'softquit':
478                 self.quit = True
479                 end = True
480                 response = {'ack': True}
481             elif message == 'hardquit':
482                 self.quit = True
483                 end = True
484                 response = {'ack': True}
485                 if self.player and self.player.returncode is None:  # not finished
486                     self.player.kill()
487             else:
488                 response = {'err': 1, 'msg': 'unknown command: %r' % message}
489             writer.write(json.dumps(response).encode('utf-8') + b'\n')
490             try:
491                 await writer.drain()
492             except ConnectionResetError:
493                 break
494         writer.close()
495
496     def sigterm_handler(self):
497         logger.info('Got SIGTERM')
498         self.quit = True
499         self.play_task.cancel()
500
501     async def main(self):
502         loop = asyncio.get_running_loop()
503         loop.add_signal_handler(
504                 signal.SIGTERM,
505                 self.sigterm_handler)
506         self.recompute_slots()
507         server = await asyncio.start_server(
508                 self.handle_connection,
509                 app_settings.SERVER_BIND_IFACE,
510                 app_settings.SERVER_BIND_PORT)
511         async with server:
512             asyncio.create_task(server.serve_forever())
513
514             self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
515             while not self.quit:
516                 now = datetime.datetime.now()
517                 duration = (self.slot.end_datetime - now).seconds
518                 logger.debug('Next sure shot %s (in %s)', self.slot.end_datetime, duration)
519                 if duration < 2:
520                     # next slot is very close, wait for it
521                     await asyncio.sleep(duration)
522                     self.recompute_slots()
523                 self.play_task = asyncio.create_task(self.play(self.slot))
524                 try:
525                     await self.play_task
526                     self.recompute_slots()
527                 except asyncio.CancelledError:
528                     logger.debug('Player cancelled exception')
529                     if self.player and self.player.returncode is None:  # not finished
530                         self.player.kill()
531                 except KeyboardInterrupt:
532                     self.quit = True
533                     break