]> git.0d.be Git - django-panik-nonstop.git/blob - nonstop/management/commands/stamina.py
a03677c37771b5916c15eb9fcbeb3cfdff7bdb21
[django-panik-nonstop.git] / nonstop / management / commands / stamina.py
1 import asyncio
2 import datetime
3 import json
4 import logging
5 import random
6 import signal
7 import sys
8 import time
9
10 import requests
11
12 from django.conf import settings
13 from django.core.management.base import BaseCommand
14
15 from emissions.models import Nonstop
16 from nonstop.models import (NonstopZoneSettings, Track, Jingle, SomaLogLine,
17         ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence)
18 from nonstop.app_settings import app_settings
19 from nonstop.utils import Tracklist
20
21
22 logger = logging.getLogger('stamina')
23
24
25 class Command(BaseCommand):
26     requires_system_checks = False
27
28     last_jingle_datetime = None
29     quit = False
30
31     def handle(self, verbosity, **kwargs):
32         alert_index = 0
33         latest_alert_timestamp = 0
34         latest_exception_timestamp = 0
35
36         def exception_alert_thresholds():
37             yield 0
38             duration = 3
39             while duration < 3600:
40                 yield duration
41                 duration *= 5
42             duration = 3600
43             while True:
44                 yield duration
45                 duration += 3600
46
47         while True:
48             try:
49                 asyncio.run(self.main(), debug=settings.DEBUG)
50             except KeyboardInterrupt:
51                 break
52             except Exception:
53                 timestamp = time.time()
54                 if (timestamp - latest_exception_timestamp) > 300:
55                     # if latest exception was a "long" time ago, assume
56                     # things went smooth for a while and reset things
57                     alert_index = 0
58                     latest_alert_timestamp = 0
59                     latest_exception_timestamp = 0
60
61                 alert_threshold = 0
62                 for i, threshold in enumerate(exception_alert_thresholds()):
63                     if i == alert_index:
64                         alert_threshold = threshold
65                         break
66
67                 if (timestamp - latest_alert_timestamp) > alert_threshold:
68                     logger.exception('General exception (alert index: %s)', alert_index)
69                     latest_alert_timestamp = timestamp
70                     alert_index += 1
71
72                 time.sleep(2)  # retry after a bit
73                 latest_exception_timestamp = timestamp
74                 continue
75             break
76
77     def get_playlist(self, zone, start_datetime, end_datetime):
78         current_datetime = start_datetime
79         if self.last_jingle_datetime is None:
80             self.last_jingle_datetime = current_datetime
81         # Define a max duration (1 hour), if it is reached, and far enough
82         # from end_datetime (30 minutes), return the playlist as is, not aligned
83         # on end time, so a new playlist gets computed once it's over.
84         # This avoids misalignments due to track durations not matching exactly
85         # or additional delays caused by the player program.
86         max_duration = datetime.timedelta(hours=1)
87         max_duration_leftover = datetime.timedelta(minutes=30)
88         playlist = []
89         adjustment_counter = 0
90         try:
91             zone_settings = zone.nonstopzonesettings_set.first()
92             jingles = list(zone_settings.jingles.all())
93         except AttributeError:
94             zone_settings = NonstopZoneSettings()
95             jingles = []
96
97         zone_ids = [zone.id]
98         extra_zones = app_settings.EXTRA_ZONES.get(zone.slug)
99         if extra_zones:
100             zone_ids.extend([x.id for x in Nonstop.objects.filter(slug__in=extra_zones)])
101
102         recent_tracks_id = [x.track_id for x in
103                 SomaLogLine.objects.exclude(on_air=False).filter(
104                     track__isnull=False,
105                     play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
106
107         tracklist = Tracklist(zone_settings, zone_ids, recent_tracks_id)
108         random_tracks_iterator = tracklist.get_random_tracks()
109         t0 = datetime.datetime.now()
110         allow_overflow = False
111         while current_datetime < end_datetime:
112             if (current_datetime - start_datetime) > max_duration and (
113                     (end_datetime - current_datetime) > max_duration_leftover):
114                 break
115
116             if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
117                 # jingle time, every ~20 minutes
118                 tracklist.playlist.append(random.choice(jingles))
119                 self.last_jingle_datetime = current_datetime
120                 current_datetime = start_datetime + tracklist.get_duration()
121             remaining_time = (end_datetime - current_datetime)
122
123             track = next(random_tracks_iterator)
124             tracklist.append(track)
125             current_datetime = start_datetime + tracklist.get_duration()
126             if current_datetime > end_datetime and not allow_overflow:
127                 # last track overshot
128                 # 1st strategy: remove last track and try to get a track with
129                 # exact remaining time
130                 logger.debug('Overshoot %s, %s', adjustment_counter, current_datetime)
131                 tracklist.pop()
132                 track = Track.objects.filter(
133                         nonstop_zones__in=zone_ids,
134                         duration__gte=remaining_time,
135                         duration__lt=remaining_time + datetime.timedelta(seconds=1)
136                         ).exclude(id__in=tracklist.get_recent_track_ids()).order_by('?').first()
137                 if track:
138                     # found a track
139                     tracklist.append(track)
140                 else:
141                     # fallback strategy: didn't find track of expected duration,
142                     # reduce playlist further
143                     adjustment_counter += 1
144                     if tracklist.pop() is None or adjustment_counter > 5:
145                         # a dedicated sound that ended a bit too early,
146                         # or too many failures to get an appropriate file,
147                         # allow whatever comes.
148                         allow_overflow = True
149                         logger.debug('Allowing overflows')
150
151                 current_datetime = start_datetime + tracklist.get_duration()
152
153         logger.info('Computed playlist for "%s" (computation time: %ss)',
154                 zone, (datetime.datetime.now() - t0))
155         current_datetime = start_datetime
156         for track in tracklist.playlist:
157             logger.debug('- track: %s %s %s', current_datetime, track.duration, track.title)
158             current_datetime += track.duration
159         logger.debug('- end: %s', current_datetime)
160         return tracklist.playlist
161
162     def is_nonstop_on_air(self):
163         # check if nonstop system is currently on air
164         if app_settings.ON_AIR_SWITCH_URL is None:
165             return None
166         switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
167         if not switch_response.ok:
168             return None
169         try:
170             status = switch_response.json()
171         except ValueError:
172             return None
173         if status.get('active') == 0:
174             return True
175         elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
176             return True
177         elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
178             return True
179         return False
180
181     async def record_nonstop_line(self, track, now):
182         log_line = SomaLogLine()
183         log_line.play_timestamp = now
184         log_line.track = track
185         log_line.filepath = track.nonstopfile_set.first()
186         log_line.on_air = self.is_nonstop_on_air()
187         log_line.save()
188
189     async def player_process(self, item, timeout=None):
190         if app_settings.PLAYER_IPC_PATH:
191             return await self.player_process_ipc(item, timeout=timeout)
192         cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
193         if hasattr(item, 'is_stream') and item.is_stream():
194             cmd.append(item.stream.url)
195             logger.info('Play stream: %s', item.stream.url)
196         else:
197             cmd.append(item.file_path())
198             logger.info('Play file: %s', item.file_path())
199         logger.debug('cmd %r', cmd)
200         self.player = await asyncio.create_subprocess_exec(
201                     *cmd,
202                     stdout=asyncio.subprocess.PIPE,
203                     stderr=asyncio.subprocess.PIPE)
204         if timeout is None:
205             await self.player.communicate()
206         else:
207             try:
208                 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
209             except asyncio.TimeoutError:
210                 self.player.kill()
211         self.player = None
212
213     async def player_process_ipc(self, item, timeout=None):
214         starting = False
215         while True:
216             try:
217                 reader, writer = await asyncio.open_unix_connection(
218                             app_settings.PLAYER_IPC_PATH)
219                 break
220             except (FileNotFoundError, ConnectionRefusedError):
221                 if not starting:
222                     cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
223                     cmd += ['--input-ipc-server=%s' % app_settings.PLAYER_IPC_PATH, '--idle']
224                     self.player = await asyncio.create_subprocess_exec(
225                             *cmd,
226                             stdout=asyncio.subprocess.DEVNULL,
227                             stderr=asyncio.subprocess.DEVNULL)
228                     starting = True
229                 await asyncio.sleep(0.1)
230
231         if hasattr(item, 'is_stream') and item.is_stream():
232             file_path = item.stream.url
233             logger.info('Play stream: %s', item.stream.url)
234         else:
235             file_path = item.file_path()
236             logger.info('Play file: %s', item.file_path())
237
238         writer.write(json.dumps({'command': ['loadfile', file_path]}).encode() + b'\n')
239         try:
240             await writer.drain()
241         except ConnectionResetError:  # connection lost
242             return
243         try:
244             await asyncio.wait_for(self.player_ipc_idle(reader, writer), timeout=timeout)
245         except asyncio.TimeoutError:
246             pass
247         writer.close()
248         await writer.wait_closed()
249
250     async def player_ipc_idle(self, reader, writer):
251         while True:
252             data = await reader.readline()
253             if not data:
254                 break
255             if json.loads(data) == {'event': 'idle'}:
256                 break
257
258     async def play(self, slot):
259         now = datetime.datetime.now()
260         if isinstance(slot, Nonstop):
261             self.playlist = self.get_playlist(slot, now, slot.end_datetime)
262             self.playhead = 0
263             self.softstop = False
264             while not self.quit:
265                 now = datetime.datetime.now()
266                 try:
267                     track = self.playlist[self.playhead]
268                 except IndexError:
269                     break
270                 self.current_track_start_datetime = now
271                 if isinstance(track, Jingle):
272                     logger.info('Jingle: %s (id: %s) (%s)', track.title, track.id, track.duration)
273                 else:
274                     logger.info('Track: %s (id: %s) (%s)', track.title, track.id, track.duration)
275                 record_task = None
276                 if isinstance(track, Track):  # not jingles
277                     record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
278                 await self.player_process(track)
279                 if record_task:
280                     await record_task
281                 if self.softstop:
282                     # track was left to finish, but now the playlist should stop.
283                     break
284                 self.playhead += 1
285         elif slot.is_stream():
286             logger.info('Stream: %s', slot.stream)
287             if slot.jingle_id:
288                 await self.player_process(slot.jingle, timeout=60)
289             logger.debug('Stream timeout: %s', (slot.end_datetime - now).total_seconds())
290             short_interruption_counter = 0
291             has_played = False
292             while True:
293                 player_start_time = datetime.datetime.now()
294                 await self.player_process(slot, timeout=(slot.end_datetime - player_start_time).total_seconds())
295                 now = datetime.datetime.now()
296                 if (slot.end_datetime - now).total_seconds() < 2:
297                     # it went well, stop
298                     break
299                 # stream got interrupted
300                 if (datetime.datetime.now() - player_start_time).total_seconds() < 15:
301                     # and was up for less than 15 seconds.
302                     if not has_played:
303                         # never up before, probably not even started
304                         if isinstance(slot, RecurringStreamOccurence):
305                             # no mercy for recurring stream, remove occurence
306                             logger.info('Missing stream for %s, removing', slot)
307                             slot.delete()
308                         elif slot.auto_delayed is True:
309                             # was already delayed and is still not up, remove.
310                             logger.info('Still missing stream for %s, removing', slot)
311                             slot.delete()
312                         else:
313                             # push back start datetime for 5 minutes, and get
314                             # back to nonstop music in the meantime
315                             logger.info('Pushing starting time of %s', slot.diffusion.episode)
316                             slot.diffusion.datetime = slot.diffusion.datetime + datetime.timedelta(seconds=300)
317                             slot.diffusion.episode.duration = slot.diffusion.episode.get_duration() - 5
318                             if slot.diffusion.episode.duration <= 5:
319                                 slot.diffusion.episode.duration = 0
320                             slot.auto_delayed = True
321                             slot.diffusion.save()
322                             slot.diffusion.episode.save()
323                             slot.save()
324                         break
325                     short_interruption_counter += 1
326                     # wait a bit
327                     await asyncio.sleep(short_interruption_counter)
328                 else:
329                     # mark stream as ok at least one, and reset short
330                     # interruption counter
331                     has_played = True
332                     short_interruption_counter = 0
333                     logger.debug('Stream error for %s', slot)
334
335                 if short_interruption_counter > 5:
336                     # many short interruptions
337                     logger.info('Too many stream errors for %s, removing', slot)
338                     slot.delete()
339                     break
340         else:
341             if hasattr(slot, 'episode'):
342                 logger.info('Episode: %s (id: %s)', slot.episode, slot.episode.id)
343             else:
344                 logger.info('Random: %s', slot)
345             if slot.jingle_id:
346                 await self.player_process(slot.jingle, timeout=60)
347             await self.player_process(slot)
348
349     def recompute_playlist(self):
350         current_track = self.playlist[self.playhead]
351         logger.debug('Recomputing playlist at %s, from %s to %s',
352                 current_track.title,
353                 self.current_track_start_datetime + current_track.duration,
354                 self.slot.end_datetime)
355         playlist = self.get_playlist(self.slot,
356                 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
357         if playlist:
358             self.playlist[self.playhead + 1:] = playlist
359
360     def get_current_diffusion(self):
361         now = datetime.datetime.now()
362         diffusion = ScheduledDiffusion.objects.filter(
363                 diffusion__datetime__gt=now - datetime.timedelta(days=1),
364                 diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
365         occurence = RecurringStreamOccurence.objects.filter(
366                 datetime__gt=now - datetime.timedelta(days=1),
367                 datetime__lt=now).order_by('datetime').last()
368         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
369                 datetime__gt=now - datetime.timedelta(days=1),
370                 datetime__lt=now).order_by('datetime').last()
371         # note it shouldn't be possible to have both diffusion and occurences
372         # running at the moment.
373         if occurence and occurence.end_datetime > now:
374             return occurence
375         if diffusion and diffusion.end_datetime > now:
376             return diffusion
377         if directory_occurence and directory_occurence.end_datetime > now:
378             return directory_occurence
379         return None
380
381     def get_next_diffusion(self, before_datetime):
382         now = datetime.datetime.now()
383         diffusion = ScheduledDiffusion.objects.filter(
384                 diffusion__datetime__gt=now,
385                 diffusion__datetime__lt=before_datetime,
386                 ).order_by('diffusion__datetime').first()
387         occurence = RecurringStreamOccurence.objects.filter(
388                 datetime__gt=now,
389                 datetime__lt=before_datetime,
390                 ).order_by('datetime').first()
391         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
392                 datetime__gt=now,
393                 datetime__lt=before_datetime,
394                 ).order_by('datetime').first()
395         if diffusion and occurence:
396             return diffusion if diffusion.diffusion.datetime < occurence.datetime else occurence
397         if diffusion:
398             return diffusion
399         if occurence:
400             return occurence
401         if directory_occurence:
402             return directory_occurence
403         return None
404
405     def recompute_slots(self):
406         now = datetime.datetime.now()
407         diffusion = self.get_current_diffusion()
408         if diffusion:
409             self.slot = diffusion
410         else:
411             nonstops = list(Nonstop.objects.all().order_by('start'))
412             nonstops = [x for x in nonstops if x.start != x.end]  # disabled zones
413             try:
414                 self.slot = [x for x in nonstops if x.start < now.time()][-1]
415             except IndexError:
416                 self.slot = nonstops[0]
417             try:
418                 next_slot = nonstops[nonstops.index(self.slot) + 1]
419             except IndexError:
420                 next_slot = nonstops[0]
421             self.slot.datetime = now.replace(
422                     hour=self.slot.start.hour,
423                     minute=self.slot.start.minute)
424             self.slot.end_datetime = now.replace(
425                     hour=next_slot.start.hour,
426                     minute=next_slot.start.minute,
427                     second=0,
428                     microsecond=0)
429             if self.slot.end_datetime < self.slot.datetime:
430                 self.slot.end_datetime += datetime.timedelta(days=1)
431
432             diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
433             if diffusion:
434                 self.slot.end_datetime = diffusion.datetime
435
436     async def recompute_slots_loop(self):
437         now = datetime.datetime.now()
438         sleep = (60 - now.second) % 10  # adjust to awake at :00
439         while not self.quit:
440             await asyncio.sleep(sleep)
441             sleep = 10  # next cycles every 10 seconds
442             current_slot = self.slot
443             self.recompute_slots()
444             expected_slot = self.slot
445             if current_slot != expected_slot:
446                 now = datetime.datetime.now()
447                 logger.info('Unexpected change, %s vs %s', current_slot, expected_slot)
448                 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
449                     # ask for a softstop, i.e. finish the track then switch.
450                     self.softstop = True
451                 elif isinstance(current_slot, Nonstop):
452                     # interrupt nonstop
453                     logger.info('Interrupting nonstop')
454                     self.play_task.cancel()
455                 elif current_slot.is_stream():
456                     # it should have been stopped by timeout set on player but
457                     # maybe the episode duration has been shortened after its
458                     # start.
459                     logger.info('Interrupting stream')
460                     self.play_task.cancel()
461             elif current_slot.end_datetime > expected_slot.end_datetime:
462                 now = datetime.datetime.now()
463                 logger.debug('Change in end time, from %s to %s',
464                         current_slot.end_datetime,
465                         expected_slot.end_datetime)
466                 if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
467                     # more than 5 minutes left, recompute playlist
468                     self.recompute_playlist()
469
470     async def handle_connection(self, reader, writer):
471         writer.write(b'Watusi!\n')
472         writer.write(b'Known commands: status, softquit, hardquit\n')
473         writer.write(b'(dot on empty line to stop connection)\n')
474         await writer.drain()
475         end = False
476         while not end:
477             data = await reader.read(100)
478             try:
479                 message = data.decode().strip()
480             except UnicodeDecodeError:
481                 logger.debug('Server, invalid message %r', message)
482                 if not data:
483                     end = True
484                 continue
485             logger.debug('Server, message %r', message)
486             if message == 'status':
487                 response = {'slot': str(self.slot)}
488                 if isinstance(self.slot, Nonstop):
489                     try:
490                         track = self.playlist[self.playhead]
491                     except IndexError:
492                         pass
493                     else:
494                         response['track'] = {}
495                         response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
496                         response['track']['title'] = track.title
497                         response['track']['artist'] = track.artist.name if track.artist_id else ''
498                         response['track']['duration'] = track.duration.total_seconds()
499                         response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
500                         response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
501                 next_diffusion = self.get_next_diffusion(
502                         before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
503                 if next_diffusion:
504                     response['next_diffusion'] = {
505                         'label': str(next_diffusion),
506                         'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
507                     }
508                     if isinstance(next_diffusion, ScheduledDiffusion):
509                         response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
510                         response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
511             elif message == '.':
512                 end = True
513                 response = {'ack': True}
514             elif message == 'softquit':
515                 self.quit = True
516                 end = True
517                 response = {'ack': True}
518             elif message == 'hardquit':
519                 self.quit = True
520                 end = True
521                 response = {'ack': True}
522                 if self.player and self.player.returncode is None:  # not finished
523                     self.player.kill()
524             else:
525                 response = {'err': 1, 'msg': 'unknown command: %r' % message}
526             writer.write(json.dumps(response).encode('utf-8') + b'\n')
527             try:
528                 await writer.drain()
529             except ConnectionResetError:
530                 break
531         writer.close()
532
533     def sigterm_handler(self):
534         logger.info('Got SIGTERM')
535         self.quit = True
536         self.play_task.cancel()
537
538     async def main(self):
539         loop = asyncio.get_running_loop()
540         loop.add_signal_handler(
541                 signal.SIGTERM,
542                 self.sigterm_handler)
543         self.recompute_slots()
544         server = await asyncio.start_server(
545                 self.handle_connection,
546                 app_settings.SERVER_BIND_IFACE,
547                 app_settings.SERVER_BIND_PORT)
548         async with server:
549             asyncio.create_task(server.serve_forever())
550
551             self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
552             while not self.quit:
553                 now = datetime.datetime.now()
554                 duration = (self.slot.end_datetime - now).seconds
555                 logger.debug('Next sure shot %s (in %s)', self.slot.end_datetime, duration)
556                 if duration < 2:
557                     # next slot is very close, wait for it
558                     await asyncio.sleep(duration)
559                     self.recompute_slots()
560                 self.play_task = asyncio.create_task(self.play(self.slot))
561                 try:
562                     await self.play_task
563                     self.recompute_slots()
564                 except asyncio.CancelledError:
565                     logger.debug('Player cancelled exception')
566                     if self.player and self.player.returncode is None:  # not finished
567                         self.player.kill()
568                 except KeyboardInterrupt:
569                     self.quit = True
570                     break