]> git.0d.be Git - django-panik-nonstop.git/blob - nonstop/management/commands/stamina.py
stamina: delay computing now() when creating a playlist
[django-panik-nonstop.git] / nonstop / management / commands / stamina.py
1 import asyncio
2 import datetime
3 import json
4 import logging
5 import random
6 import signal
7 import sys
8 import time
9
10 import requests
11
12 from django.conf import settings
13 import django.db
14 from django.core.management.base import BaseCommand
15
16 from emissions.models import Nonstop
17 from nonstop.models import (NonstopZoneSettings, Track, Jingle, SomaLogLine,
18         ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence)
19 from nonstop.app_settings import app_settings
20 from nonstop.utils import Tracklist
21
22
23 logger = logging.getLogger('stamina')
24
25
26 class Command(BaseCommand):
27     requires_system_checks = False
28
29     last_jingle_datetime = None
30     quit = False
31
32     def handle(self, verbosity, **kwargs):
33         alert_index = 0
34         latest_alert_timestamp = 0
35         latest_exception_timestamp = 0
36
37         def exception_alert_thresholds():
38             yield 0
39             duration = 3
40             while duration < 3600:
41                 yield duration
42                 duration *= 5
43             duration = 3600
44             while True:
45                 yield duration
46                 duration += 3600
47
48         while True:
49             try:
50                 asyncio.run(self.main(), debug=settings.DEBUG)
51             except KeyboardInterrupt:
52                 break
53             except Exception as e:
54                 timestamp = time.time()
55                 if (timestamp - latest_exception_timestamp) > 300:
56                     # if latest exception was a "long" time ago, assume
57                     # things went smooth for a while and reset things
58                     alert_index = 0
59                     latest_alert_timestamp = 0
60                     latest_exception_timestamp = 0
61
62                 alert_threshold = 0
63                 for i, threshold in enumerate(exception_alert_thresholds()):
64                     if i == alert_index:
65                         alert_threshold = threshold
66                         break
67
68                 if (timestamp - latest_alert_timestamp) > alert_threshold:
69                     logger.exception('General exception (alert index: %s)', alert_index)
70                     latest_alert_timestamp = timestamp
71                     alert_index += 1
72
73                 if alert_index and isinstance(e, django.db.InterfaceError):
74                     # most likely "connection already closed", because postgresql
75                     # is been restarted; log then get out to be restarted.
76                     logger.error('Aborting on repeated database error')
77                     break
78
79                 time.sleep(2)  # retry after a bit
80                 latest_exception_timestamp = timestamp
81                 continue
82             break
83
84     def get_playlist(self, zone, start_datetime, end_datetime):
85         current_datetime = start_datetime() if callable(start_datetime) else start_datetime
86         if self.last_jingle_datetime is None:
87             self.last_jingle_datetime = current_datetime
88         # Define a max duration (1 hour), if it is reached, and far enough
89         # from end_datetime (30 minutes), return the playlist as is, not aligned
90         # on end time, so a new playlist gets computed once it's over.
91         # This avoids misalignments due to track durations not matching exactly
92         # or additional delays caused by the player program.
93         max_duration = datetime.timedelta(hours=1)
94         max_duration_leftover = datetime.timedelta(minutes=30)
95         playlist = []
96         adjustment_counter = 0
97         try:
98             zone_settings = zone.nonstopzonesettings_set.first()
99             jingles = list(zone_settings.jingles.all())
100         except AttributeError:
101             zone_settings = NonstopZoneSettings()
102             jingles = []
103
104         zone_ids = [zone.id]
105         zone_ids.extend([x.id for x in zone_settings.extra_zones.all()])
106         extra_zones = app_settings.EXTRA_ZONES.get(zone.slug)
107         if extra_zones:
108             zone_ids.extend([x.id for x in Nonstop.objects.filter(slug__in=extra_zones)])
109
110         recent_tracks_id = [x.track_id for x in
111                 SomaLogLine.objects.exclude(on_air=False).filter(
112                     track__isnull=False,
113                     play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
114
115         tracklist = Tracklist(zone_settings, zone_ids, recent_tracks_id)
116         random_tracks_iterator = tracklist.get_random_tracks()
117         t0 = datetime.datetime.now()
118         allow_overflow = False
119         if callable(start_datetime):
120             # compute start_datetime (e.g. now()) at the last moment, to get
121             # computed playlist timestamps as close as possible as future real
122             # ones.
123             start_datetime = start_datetime()
124         while current_datetime < end_datetime:
125             if (current_datetime - start_datetime) > max_duration and (
126                     (end_datetime - current_datetime) > max_duration_leftover):
127                 break
128
129             if zone_settings.intro_jingle and (
130                     current_datetime.hour, current_datetime.minute) == (zone.start.hour, zone.start.minute):
131                 tracklist.playlist.append(zone_settings.intro_jingle)
132                 self.last_jingle_datetime = current_datetime
133                 current_datetime = start_datetime + tracklist.get_duration()
134             elif jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
135                 # jingle time, every ~20 minutes
136                 tracklist.playlist.append(random.choice(jingles))
137                 self.last_jingle_datetime = current_datetime
138                 current_datetime = start_datetime + tracklist.get_duration()
139             remaining_time = (end_datetime - current_datetime)
140
141             track = next(random_tracks_iterator)
142             tracklist.append(track)
143             current_datetime = start_datetime + tracklist.get_duration()
144             if current_datetime > end_datetime and not allow_overflow:
145                 # last track overshot
146                 # 1st strategy: remove last track and try to get a track with
147                 # exact remaining time
148                 logger.debug('Overshoot %s, %s', adjustment_counter, current_datetime)
149                 tracklist.pop()
150                 track = Track.objects.filter(
151                         nonstop_zones__in=zone_ids,
152                         duration__gte=remaining_time,
153                         duration__lt=remaining_time + datetime.timedelta(seconds=1)
154                         ).exclude(id__in=tracklist.get_recent_track_ids()).order_by('?').first()
155                 if track:
156                     # found a track
157                     tracklist.append(track)
158                 else:
159                     # fallback strategy: didn't find track of expected duration,
160                     # reduce playlist further
161                     adjustment_counter += 1
162                     if tracklist.pop() is None or adjustment_counter > 5:
163                         # a dedicated sound that ended a bit too early,
164                         # or too many failures to get an appropriate file,
165                         # allow whatever comes.
166                         allow_overflow = True
167                         logger.debug('Allowing overflows')
168
169                 current_datetime = start_datetime + tracklist.get_duration()
170
171         logger.info('Computed playlist for "%s" (computation time: %ss)',
172                 zone, (datetime.datetime.now() - t0))
173         current_datetime = start_datetime
174         for track in tracklist.playlist:
175             logger.debug('- track: %s %s %s', current_datetime, track.duration, track.title)
176             current_datetime += track.duration
177         logger.debug('- end: %s', current_datetime)
178         return tracklist.playlist
179
180     def is_nonstop_on_air(self):
181         # check if nonstop system is currently on air
182         if app_settings.ON_AIR_SWITCH_URL is None:
183             return None
184         switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
185         if not switch_response.ok:
186             return None
187         try:
188             status = switch_response.json()
189         except ValueError:
190             return None
191         if status.get('active') == 0:
192             return True
193         elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
194             # TODO: replace this hardware check that no longer works by a
195             # logical check on programs: if there's nothing scheduled at the
196             # moment, consider nonstop is broadcasted, even if studio1 is on.
197             return True
198         elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
199             return True
200         return False
201
202     async def record_nonstop_line(self, track, now):
203         log_line = SomaLogLine()
204         log_line.play_timestamp = now
205         log_line.track = track
206         log_line.filepath = track.nonstopfile_set.first()
207         log_line.on_air = self.is_nonstop_on_air()
208         log_line.save()
209
210     async def player_process(self, item, timeout=None):
211         if app_settings.PLAYER_IPC_PATH:
212             return await self.player_process_ipc(item, timeout=timeout)
213         cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
214         if hasattr(item, 'is_stream') and item.is_stream():
215             cmd.append(item.stream.url)
216             logger.info('Play stream: %s', item.stream.url)
217         else:
218             cmd.append(item.file_path())
219             logger.info('Play file: %s', item.file_path())
220         logger.debug('cmd %r', cmd)
221         self.player = await asyncio.create_subprocess_exec(
222                     *cmd,
223                     stdout=asyncio.subprocess.PIPE,
224                     stderr=asyncio.subprocess.PIPE)
225         if timeout is None:
226             await self.player.communicate()
227         else:
228             try:
229                 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
230             except asyncio.TimeoutError:
231                 self.player.kill()
232         self.player = None
233
234     async def player_process_ipc(self, item, timeout=None):
235         starting = False
236         while True:
237             try:
238                 reader, writer = await asyncio.open_unix_connection(
239                             app_settings.PLAYER_IPC_PATH)
240                 break
241             except (FileNotFoundError, ConnectionRefusedError):
242                 if not starting:
243                     cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
244                     cmd += ['--input-ipc-server=%s' % app_settings.PLAYER_IPC_PATH, '--idle']
245                     self.player = await asyncio.create_subprocess_exec(
246                             *cmd,
247                             stdout=asyncio.subprocess.DEVNULL,
248                             stderr=asyncio.subprocess.DEVNULL)
249                     starting = True
250                 await asyncio.sleep(0.1)
251
252         if hasattr(item, 'is_stream') and item.is_stream():
253             file_path = item.stream.url
254             logger.info('Play stream: %s', item.stream.url)
255         else:
256             file_path = item.file_path()
257             logger.info('Play file: %s', item.file_path())
258
259         writer.write(json.dumps({'command': ['loadfile', file_path]}).encode() + b'\n')
260         try:
261             await writer.drain()
262         except ConnectionResetError:  # connection lost
263             return
264         try:
265             await asyncio.wait_for(self.player_ipc_idle(reader, writer), timeout=timeout)
266         except asyncio.TimeoutError:
267             pass
268         writer.close()
269         await writer.wait_closed()
270
271     async def player_ipc_idle(self, reader, writer):
272         while True:
273             data = await reader.readline()
274             if not data:
275                 break
276             if json.loads(data) == {'event': 'idle'}:
277                 break
278
279     async def play(self, slot):
280         now = datetime.datetime.now()
281         if isinstance(slot, Nonstop):
282             self.playlist = self.get_playlist(slot, datetime.datetime.now, slot.end_datetime)
283             self.playhead = 0
284             self.softstop = False
285             while not self.quit:
286                 now = datetime.datetime.now()
287                 try:
288                     track = self.playlist[self.playhead]
289                 except IndexError:
290                     break
291                 self.current_track_start_datetime = now
292                 if isinstance(track, Jingle):
293                     logger.info('Jingle: %s (id: %s) (%s)', track.title, track.id, track.duration)
294                 else:
295                     logger.info('Track: %s (id: %s) (%s)', track.title, track.id, track.duration)
296                 record_task = None
297                 if isinstance(track, Track):  # not jingles
298                     record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
299                 await self.player_process(track)
300                 if record_task:
301                     await record_task
302                 if self.softstop:
303                     # track was left to finish, but now the playlist should stop.
304                     break
305                 self.playhead += 1
306         elif slot.is_stream():
307             logger.info('Stream: %s', slot.stream)
308             if slot.jingle_id:
309                 await self.player_process(slot.jingle, timeout=60)
310             logger.debug('Stream timeout: %s', (slot.end_datetime - now).total_seconds())
311             short_interruption_counter = 0
312             has_played = False
313             while True:
314                 player_start_time = datetime.datetime.now()
315                 await self.player_process(slot, timeout=(slot.end_datetime - player_start_time).total_seconds())
316                 now = datetime.datetime.now()
317                 if (slot.end_datetime - now).total_seconds() < 2:
318                     # it went well, stop
319                     break
320                 # stream got interrupted
321                 if (datetime.datetime.now() - player_start_time).total_seconds() < 15:
322                     # and was up for less than 15 seconds.
323                     if not has_played:
324                         # never up before, probably not even started
325                         if isinstance(slot, RecurringStreamOccurence):
326                             # no mercy for recurring stream, remove occurence
327                             logger.info('Missing stream for %s, removing', slot)
328                             slot.delete()
329                         elif slot.auto_delayed is True:
330                             # was already delayed and is still not up, remove.
331                             logger.info('Still missing stream for %s, removing', slot)
332                             slot.delete()
333                         else:
334                             # push back start datetime for 5 minutes, and get
335                             # back to nonstop music in the meantime
336                             logger.info('Pushing starting time of %s', slot.diffusion.episode)
337                             slot.diffusion.datetime = slot.diffusion.datetime + datetime.timedelta(seconds=300)
338                             slot.diffusion.episode.duration = slot.diffusion.episode.get_duration() - 5
339                             if slot.diffusion.episode.duration <= 5:
340                                 slot.diffusion.episode.duration = 0
341                             slot.auto_delayed = True
342                             slot.diffusion.save()
343                             slot.diffusion.episode.save()
344                             slot.save()
345                         break
346                     short_interruption_counter += 1
347                     # wait a bit
348                     await asyncio.sleep(short_interruption_counter)
349                 else:
350                     # mark stream as ok at least one, and reset short
351                     # interruption counter
352                     has_played = True
353                     short_interruption_counter = 0
354                     logger.debug('Stream error for %s', slot)
355
356                 if short_interruption_counter > 5:
357                     # many short interruptions
358                     logger.info('Too many stream errors for %s, removing', slot)
359                     slot.delete()
360                     break
361         else:
362             if hasattr(slot, 'episode'):
363                 logger.info('Episode: %s (id: %s)', slot.episode, slot.episode.id)
364             else:
365                 logger.info('Random: %s', slot)
366             if slot.jingle_id:
367                 await self.player_process(slot.jingle, timeout=60)
368             await self.player_process(slot)
369
370     def recompute_playlist(self):
371         current_track = self.playlist[self.playhead]
372         logger.debug('Recomputing playlist at %s, from %s to %s',
373                 current_track.title,
374                 self.current_track_start_datetime + current_track.duration,
375                 self.slot.end_datetime)
376         playlist = self.get_playlist(self.slot,
377                 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
378         if playlist:
379             self.playlist[self.playhead + 1:] = playlist
380
381     def get_current_diffusion(self):
382         now = datetime.datetime.now()
383         diffusion = ScheduledDiffusion.objects.filter(
384                 diffusion__datetime__gt=now - datetime.timedelta(days=1),
385                 diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
386         occurence = RecurringStreamOccurence.objects.filter(
387                 datetime__gt=now - datetime.timedelta(days=1),
388                 datetime__lt=now).order_by('datetime').last()
389         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
390                 datetime__gt=now - datetime.timedelta(days=1),
391                 datetime__lt=now).order_by('datetime').last()
392         # note it shouldn't be possible to have both diffusion and occurences
393         # running at the moment.
394         if occurence and occurence.end_datetime > now:
395             return occurence
396         if diffusion and diffusion.end_datetime > now:
397             return diffusion
398         if directory_occurence and directory_occurence.end_datetime > now:
399             return directory_occurence
400         return None
401
402     def get_next_diffusion(self, before_datetime):
403         now = datetime.datetime.now()
404         diffusion = ScheduledDiffusion.objects.filter(
405                 diffusion__datetime__gt=now,
406                 diffusion__datetime__lt=before_datetime,
407                 ).order_by('diffusion__datetime').first()
408         occurence = RecurringStreamOccurence.objects.filter(
409                 datetime__gt=now,
410                 datetime__lt=before_datetime,
411                 ).order_by('datetime').first()
412         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
413                 datetime__gt=now,
414                 datetime__lt=before_datetime,
415                 ).order_by('datetime').first()
416         if diffusion and occurence:
417             return diffusion if diffusion.diffusion.datetime < occurence.datetime else occurence
418         if diffusion:
419             return diffusion
420         if occurence:
421             return occurence
422         if directory_occurence:
423             return directory_occurence
424         return None
425
426     def recompute_slots(self):
427         now = datetime.datetime.now()
428         diffusion = self.get_current_diffusion()
429         if diffusion:
430             self.slot = diffusion
431         else:
432             nonstops = list(Nonstop.objects.all().order_by('start'))
433             nonstops = [x for x in nonstops if x.start != x.end]  # disabled zones
434             try:
435                 self.slot = [x for x in nonstops if x.start < now.time()][-1]
436             except IndexError:
437                 self.slot = nonstops[0]
438             try:
439                 next_slot = nonstops[nonstops.index(self.slot) + 1]
440             except IndexError:
441                 next_slot = nonstops[0]
442             self.slot.datetime = now.replace(
443                     hour=self.slot.start.hour,
444                     minute=self.slot.start.minute)
445             self.slot.end_datetime = now.replace(
446                     hour=next_slot.start.hour,
447                     minute=next_slot.start.minute,
448                     second=0,
449                     microsecond=0)
450             if self.slot.end_datetime < self.slot.datetime:
451                 self.slot.end_datetime += datetime.timedelta(days=1)
452
453             diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
454             if diffusion:
455                 self.slot.end_datetime = diffusion.datetime
456
457     async def recompute_slots_loop(self):
458         now = datetime.datetime.now()
459         sleep = (60 - now.second) % 10  # adjust to awake at :00
460         while not self.quit:
461             await asyncio.sleep(sleep)
462             sleep = 10  # next cycles every 10 seconds
463             current_slot = self.slot
464             self.recompute_slots()
465             expected_slot = self.slot
466             if current_slot != expected_slot:
467                 now = datetime.datetime.now()
468                 logger.info('Unexpected change, %s vs %s', current_slot, expected_slot)
469                 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
470                     # ask for a softstop, i.e. finish the track then switch.
471                     self.softstop = True
472                 elif isinstance(current_slot, Nonstop):
473                     # interrupt nonstop
474                     logger.info('Interrupting nonstop')
475                     self.play_task.cancel()
476                 elif current_slot.is_stream():
477                     # it should have been stopped by timeout set on player but
478                     # maybe the episode duration has been shortened after its
479                     # start.
480                     logger.info('Interrupting stream')
481                     self.play_task.cancel()
482             elif current_slot.end_datetime > expected_slot.end_datetime:
483                 now = datetime.datetime.now()
484                 logger.debug('Change in end time, from %s to %s',
485                         current_slot.end_datetime,
486                         expected_slot.end_datetime)
487                 if isinstance(current_slot, Nonstop) and (
488                         expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5)):
489                     # more than 5 minutes left, recompute playlist
490                     self.recompute_playlist()
491
492     async def handle_connection(self, reader, writer):
493         writer.write(b'Watusi!\n')
494         writer.write(b'Known commands: status, softquit, hardquit\n')
495         writer.write(b'(dot on empty line to stop connection)\n')
496         await writer.drain()
497         end = False
498         while not end:
499             data = await reader.read(100)
500             try:
501                 message = data.decode().strip()
502             except UnicodeDecodeError:
503                 logger.debug('Server, invalid message %r', message)
504                 if not data:
505                     end = True
506                 continue
507             logger.debug('Server, message %r', message)
508             if message == 'status':
509                 response = {'slot': str(self.slot)}
510                 if isinstance(self.slot, Nonstop):
511                     try:
512                         track = self.playlist[self.playhead]
513                     except IndexError:
514                         pass
515                     else:
516                         response['track'] = {}
517                         response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
518                         response['track']['title'] = track.title
519                         response['track']['artist'] = track.artist.name if track.artist_id else ''
520                         response['track']['duration'] = track.duration.total_seconds()
521                         response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
522                         response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
523                 next_diffusion = self.get_next_diffusion(
524                         before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
525                 if next_diffusion:
526                     response['next_diffusion'] = {
527                         'label': str(next_diffusion),
528                         'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
529                     }
530                     if isinstance(next_diffusion, ScheduledDiffusion):
531                         response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
532                         response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
533             elif message == '.':
534                 end = True
535                 response = {'ack': True}
536             elif message == 'softquit':
537                 self.quit = True
538                 end = True
539                 response = {'ack': True}
540             elif message == 'hardquit':
541                 self.quit = True
542                 end = True
543                 response = {'ack': True}
544                 if self.player and self.player.returncode is None:  # not finished
545                     self.player.kill()
546             else:
547                 response = {'err': 1, 'msg': 'unknown command: %r' % message}
548             writer.write(json.dumps(response).encode('utf-8') + b'\n')
549             try:
550                 await writer.drain()
551             except ConnectionResetError:
552                 break
553         writer.close()
554
555     def sigterm_handler(self):
556         logger.info('Got SIGTERM')
557         self.quit = True
558         self.play_task.cancel()
559
560     async def main(self):
561         self.player = None
562         loop = asyncio.get_running_loop()
563         loop.add_signal_handler(
564                 signal.SIGTERM,
565                 self.sigterm_handler)
566         self.recompute_slots()
567         server = await asyncio.start_server(
568                 self.handle_connection,
569                 app_settings.SERVER_BIND_IFACE,
570                 app_settings.SERVER_BIND_PORT)
571         async with server:
572             asyncio.create_task(server.serve_forever())
573
574             self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
575             while not self.quit:
576                 now = datetime.datetime.now()
577                 duration = (self.slot.end_datetime - now).seconds
578                 logger.debug('Next sure shot %s (in %s)', self.slot.end_datetime, duration)
579                 if duration < 2:
580                     # next slot is very close, wait for it
581                     await asyncio.sleep(duration)
582                     self.recompute_slots()
583                 self.play_task = asyncio.create_task(self.play(self.slot))
584                 try:
585                     await self.play_task
586                     self.recompute_slots()
587                 except asyncio.CancelledError:
588                     logger.debug('Player cancelled exception')
589                     if self.player and self.player.returncode is None:  # not finished
590                         self.player.kill()
591                 except KeyboardInterrupt:
592                     self.quit = True
593                     break