]> git.0d.be Git - django-panik-nonstop.git/blob - nonstop/management/commands/stamina.py
e9eeb8517a61beec3afe34c9bce958d8efd24635
[django-panik-nonstop.git] / nonstop / management / commands / stamina.py
1 import asyncio
2 import datetime
3 import json
4 import logging
5 import random
6 import signal
7 import sys
8 import time
9
10 import requests
11
12 from django.conf import settings
13 from django.core.management.base import BaseCommand
14
15 from emissions.models import Nonstop
16 from nonstop.models import (NonstopZoneSettings, Track, Jingle, SomaLogLine,
17         ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence)
18 from nonstop.app_settings import app_settings
19 from nonstop.utils import Tracklist
20
21
22 logger = logging.getLogger('stamina')
23
24
25 class Command(BaseCommand):
26     requires_system_checks = False
27
28     last_jingle_datetime = None
29     quit = False
30
31     def handle(self, verbosity, **kwargs):
32         alert_index = 0
33         latest_alert_timestamp = 0
34         latest_exception_timestamp = 0
35
36         def exception_alert_thresholds():
37             yield 0
38             duration = 3
39             while duration < 3600:
40                 yield duration
41                 duration *= 5
42             duration = 3600
43             while True:
44                 yield duration
45                 duration += 3600
46
47         while True:
48             try:
49                 asyncio.run(self.main(), debug=settings.DEBUG)
50             except KeyboardInterrupt:
51                 break
52             except Exception:
53                 timestamp = time.time()
54                 if (timestamp - latest_exception_timestamp) > 300:
55                     # if latest exception was a "long" time ago, assume
56                     # things went smooth for a while and reset things
57                     alert_index = 0
58                     latest_alert_timestamp = 0
59                     latest_exception_timestamp = 0
60
61                 alert_threshold = 0
62                 for i, threshold in enumerate(exception_alert_thresholds()):
63                     if i == alert_index:
64                         alert_threshold = threshold
65                         break
66
67                 if (timestamp - latest_alert_timestamp) > alert_threshold:
68                     logger.exception('General exception (alert index: %s)', alert_index)
69                     latest_alert_timestamp = timestamp
70                     alert_index += 1
71
72                 time.sleep(2)  # retry after a bit
73                 latest_exception_timestamp = timestamp
74                 continue
75             break
76
77     def get_playlist(self, zone, start_datetime, end_datetime):
78         current_datetime = start_datetime
79         if self.last_jingle_datetime is None:
80             self.last_jingle_datetime = current_datetime
81         # Define a max duration (1 hour), if it is reached, and far enough
82         # from end_datetime (30 minutes), return the playlist as is, not aligned
83         # on end time, so a new playlist gets computed once it's over.
84         # This avoids misalignments due to track durations not matching exactly
85         # or additional delays caused by the player program.
86         max_duration = datetime.timedelta(hours=1)
87         max_duration_leftover = datetime.timedelta(minutes=30)
88         playlist = []
89         adjustment_counter = 0
90         try:
91             zone_settings = zone.nonstopzonesettings_set.first()
92             jingles = list(zone_settings.jingles.all())
93         except AttributeError:
94             zone_settings = NonstopZoneSettings()
95             jingles = []
96
97         zone_ids = [zone.id]
98         extra_zones = app_settings.EXTRA_ZONES.get(zone.slug)
99         if extra_zones:
100             zone_ids.extend([x.id for x in Nonstop.objects.filter(slug__in=extra_zones)])
101
102         recent_tracks_id = [x.track_id for x in
103                 SomaLogLine.objects.exclude(on_air=False).filter(
104                     track__isnull=False,
105                     play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
106
107         tracklist = Tracklist(zone_settings, zone_ids, recent_tracks_id)
108         random_tracks_iterator = tracklist.get_random_tracks()
109         t0 = datetime.datetime.now()
110         allow_overflow = False
111         while current_datetime < end_datetime:
112             if (current_datetime - start_datetime) > max_duration and (
113                     (end_datetime - current_datetime) > max_duration_leftover):
114                 break
115
116             if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
117                 # jingle time, every ~20 minutes
118                 tracklist.playlist.append(random.choice(jingles))
119                 self.last_jingle_datetime = current_datetime
120                 current_datetime = start_datetime + tracklist.get_duration()
121             remaining_time = (end_datetime - current_datetime)
122
123             track = next(random_tracks_iterator)
124             tracklist.append(track)
125             current_datetime = start_datetime + tracklist.get_duration()
126             if current_datetime > end_datetime and not allow_overflow:
127                 # last track overshot
128                 # 1st strategy: remove last track and try to get a track with
129                 # exact remaining time
130                 logger.debug('Overshoot %s, %s', adjustment_counter, current_datetime)
131                 tracklist.pop()
132                 track = Track.objects.filter(
133                         nonstop_zones__in=zone_ids,
134                         duration__gte=remaining_time,
135                         duration__lt=remaining_time + datetime.timedelta(seconds=1)
136                         ).exclude(id__in=tracklist.get_recent_track_ids()).order_by('?').first()
137                 if track:
138                     # found a track
139                     tracklist.append(track)
140                 else:
141                     # fallback strategy: didn't find track of expected duration,
142                     # reduce playlist further
143                     adjustment_counter += 1
144                     if tracklist.pop() is None or adjustment_counter > 5:
145                         # a dedicated sound that ended a bit too early,
146                         # or too many failures to get an appropriate file,
147                         # allow whatever comes.
148                         allow_overflow = True
149                         logger.debug('Allowing overflows')
150
151                 current_datetime = start_datetime + tracklist.get_duration()
152
153         logger.info('Computed playlist for "%s" (computation time: %ss)',
154                 zone, (datetime.datetime.now() - t0))
155         current_datetime = start_datetime
156         for track in tracklist.playlist:
157             logger.debug('- track: %s %s %s', current_datetime, track.duration, track.title)
158             current_datetime += track.duration
159         logger.debug('- end: %s', current_datetime)
160         return tracklist.playlist
161
162     def is_nonstop_on_air(self):
163         # check if nonstop system is currently on air
164         if app_settings.ON_AIR_SWITCH_URL is None:
165             return None
166         switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
167         if not switch_response.ok:
168             return None
169         try:
170             status = switch_response.json()
171         except ValueError:
172             return None
173         if status.get('active') == 0:
174             return True
175         elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
176             return True
177         elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
178             return True
179         return False
180
181     async def record_nonstop_line(self, track, now):
182         log_line = SomaLogLine()
183         log_line.play_timestamp = now
184         log_line.track = track
185         log_line.filepath = track.nonstopfile_set.first()
186         log_line.on_air = self.is_nonstop_on_air()
187         log_line.save()
188
189     async def player_process(self, item, timeout=None):
190         cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
191         if hasattr(item, 'is_stream') and item.is_stream():
192             cmd.append(item.stream.url)
193             logger.info('Play stream: %s', item.stream.url)
194         else:
195             cmd.append(item.file_path())
196             logger.info('Play file: %s', item.file_path())
197         if app_settings.DEBUG_WITH_SLEEPS:
198             # replace command by a sleep call, for silent debugging
199             if hasattr(item, 'is_stream') and item.is_stream():
200                 cmd = 'sleep 86400 # %s' % item.stream.url
201             elif isinstance(item.duration, datetime.timedelta):
202                 cmd = 'sleep %s # %s' % (item.duration.total_seconds(), item.file_path())
203             elif isinstance(item.duration, int):
204                 cmd = 'sleep %s # %s' % (item.duration, item.file_path())
205         logger.debug('cmd %r', cmd)
206         if isinstance(cmd, str):
207             self.player = await asyncio.create_subprocess_shell(
208                     cmd,
209                     stdout=asyncio.subprocess.PIPE,
210                     stderr=asyncio.subprocess.PIPE)
211         else:
212             self.player = await asyncio.create_subprocess_exec(
213                     *cmd,
214                     stdout=asyncio.subprocess.PIPE,
215                     stderr=asyncio.subprocess.PIPE)
216         if timeout is None:
217             await self.player.communicate()
218         else:
219             try:
220                 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
221             except asyncio.TimeoutError:
222                 self.player.kill()
223         self.player = None
224
225     async def play(self, slot):
226         now = datetime.datetime.now()
227         if isinstance(slot, Nonstop):
228             self.playlist = self.get_playlist(slot, now, slot.end_datetime)
229             self.playhead = 0
230             self.softstop = False
231             while not self.quit:
232                 now = datetime.datetime.now()
233                 try:
234                     track = self.playlist[self.playhead]
235                 except IndexError:
236                     break
237                 self.current_track_start_datetime = now
238                 if isinstance(track, Jingle):
239                     logger.info('Jingle: %s (id: %s) (%s)', track.title, track.id, track.duration)
240                 else:
241                     logger.info('Track: %s (id: %s) (%s)', track.title, track.id, track.duration)
242                 record_task = None
243                 if isinstance(track, Track):  # not jingles
244                     record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
245                 await self.player_process(track)
246                 if record_task:
247                     await record_task
248                 if self.softstop:
249                     # track was left to finish, but now the playlist should stop.
250                     break
251                 self.playhead += 1
252         elif slot.is_stream():
253             logger.info('Stream: %s', slot.stream)
254             if slot.jingle_id:
255                 await self.player_process(slot.jingle, timeout=60)
256             logger.debug('Stream timeout: %s', (slot.end_datetime - now).total_seconds())
257             short_interruption_counter = 0
258             has_played = False
259             while True:
260                 player_start_time = datetime.datetime.now()
261                 await self.player_process(slot, timeout=(slot.end_datetime - player_start_time).total_seconds())
262                 now = datetime.datetime.now()
263                 if (slot.end_datetime - now).total_seconds() < 2:
264                     # it went well, stop
265                     break
266                 # stream got interrupted
267                 if (datetime.datetime.now() - player_start_time).total_seconds() < 15:
268                     # and was up for less than 15 seconds.
269                     if not has_played:
270                         # never up before, probably not even started
271                         if isinstance(slot, RecurringStreamOccurence):
272                             # no mercy for recurring stream, remove occurence
273                             logger.info('Missing stream for %s, removing', slot)
274                             slot.delete()
275                         elif slot.auto_delayed is True:
276                             # was already delayed and is still not up, remove.
277                             logger.info('Still missing stream for %s, removing', slot)
278                             slot.delete()
279                         else:
280                             # push back start datetime for 5 minutes, and get
281                             # back to nonstop music in the meantime
282                             logger.info('Pushing starting time of %s', slot.diffusion.episode)
283                             slot.diffusion.datetime = slot.diffusion.datetime + datetime.timedelta(seconds=300)
284                             slot.diffusion.episode.duration = slot.diffusion.episode.get_duration() - 5
285                             if slot.diffusion.episode.duration <= 5:
286                                 slot.diffusion.episode.duration = 0
287                             slot.auto_delayed = True
288                             slot.diffusion.save()
289                             slot.diffusion.episode.save()
290                             slot.save()
291                         break
292                     short_interruption_counter += 1
293                     # wait a bit
294                     await asyncio.sleep(short_interruption_counter)
295                 else:
296                     # mark stream as ok at least one, and reset short
297                     # interruption counter
298                     has_played = True
299                     short_interruption_counter = 0
300                     logger.debug('Stream error for %s', slot)
301
302                 if short_interruption_counter > 5:
303                     # many short interruptions
304                     logger.info('Too many stream errors for %s, removing', slot)
305                     slot.delete()
306                     break
307         else:
308             if hasattr(slot, 'episode'):
309                 logger.info('Episode: %s (id: %s)', slot.episode, slot.episode.id)
310             else:
311                 logger.info('Random: %s', slot)
312             if slot.jingle_id:
313                 await self.player_process(slot.jingle, timeout=60)
314             await self.player_process(slot)
315
316     def recompute_playlist(self):
317         current_track = self.playlist[self.playhead]
318         logger.debug('Recomputing playlist at %s, from %s to %s',
319                 current_track.title,
320                 self.current_track_start_datetime + current_track.duration,
321                 self.slot.end_datetime)
322         playlist = self.get_playlist(self.slot,
323                 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
324         if playlist:
325             self.playlist[self.playhead + 1:] = playlist
326
327     def get_current_diffusion(self):
328         now = datetime.datetime.now()
329         diffusion = ScheduledDiffusion.objects.filter(
330                 diffusion__datetime__gt=now - datetime.timedelta(days=1),
331                 diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
332         occurence = RecurringStreamOccurence.objects.filter(
333                 datetime__gt=now - datetime.timedelta(days=1),
334                 datetime__lt=now).order_by('datetime').last()
335         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
336                 datetime__gt=now - datetime.timedelta(days=1),
337                 datetime__lt=now).order_by('datetime').last()
338         # note it shouldn't be possible to have both diffusion and occurences
339         # running at the moment.
340         if occurence and occurence.end_datetime > now:
341             return occurence
342         if diffusion and diffusion.end_datetime > now:
343             return diffusion
344         if directory_occurence and directory_occurence.end_datetime > now:
345             return directory_occurence
346         return None
347
348     def get_next_diffusion(self, before_datetime):
349         now = datetime.datetime.now()
350         diffusion = ScheduledDiffusion.objects.filter(
351                 diffusion__datetime__gt=now,
352                 diffusion__datetime__lt=before_datetime,
353                 ).order_by('diffusion__datetime').first()
354         occurence = RecurringStreamOccurence.objects.filter(
355                 datetime__gt=now,
356                 datetime__lt=before_datetime,
357                 ).order_by('datetime').first()
358         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
359                 datetime__gt=now,
360                 datetime__lt=before_datetime,
361                 ).order_by('datetime').first()
362         if diffusion and occurence:
363             return diffusion if diffusion.diffusion.datetime < occurence.datetime else occurence
364         if diffusion:
365             return diffusion
366         if occurence:
367             return occurence
368         if directory_occurence:
369             return directory_occurence
370         return None
371
372     def recompute_slots(self):
373         now = datetime.datetime.now()
374         diffusion = self.get_current_diffusion()
375         if diffusion:
376             self.slot = diffusion
377         else:
378             nonstops = list(Nonstop.objects.all().order_by('start'))
379             nonstops = [x for x in nonstops if x.start != x.end]  # disabled zones
380             try:
381                 self.slot = [x for x in nonstops if x.start < now.time()][-1]
382             except IndexError:
383                 self.slot = nonstops[0]
384             try:
385                 next_slot = nonstops[nonstops.index(self.slot) + 1]
386             except IndexError:
387                 next_slot = nonstops[0]
388             self.slot.datetime = now.replace(
389                     hour=self.slot.start.hour,
390                     minute=self.slot.start.minute)
391             self.slot.end_datetime = now.replace(
392                     hour=next_slot.start.hour,
393                     minute=next_slot.start.minute,
394                     second=0,
395                     microsecond=0)
396             if self.slot.end_datetime < self.slot.datetime:
397                 self.slot.end_datetime += datetime.timedelta(days=1)
398
399             diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
400             if diffusion:
401                 self.slot.end_datetime = diffusion.datetime
402
403     async def recompute_slots_loop(self):
404         now = datetime.datetime.now()
405         sleep = (60 - now.second) % 10  # adjust to awake at :00
406         while not self.quit:
407             await asyncio.sleep(sleep)
408             sleep = 10  # next cycles every 10 seconds
409             current_slot = self.slot
410             self.recompute_slots()
411             expected_slot = self.slot
412             if current_slot != expected_slot:
413                 now = datetime.datetime.now()
414                 logger.info('Unexpected change, %s vs %s', current_slot, expected_slot)
415                 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
416                     # ask for a softstop, i.e. finish the track then switch.
417                     self.softstop = True
418                 elif isinstance(current_slot, Nonstop):
419                     # interrupt nonstop
420                     logger.info('Interrupting nonstop')
421                     self.play_task.cancel()
422             elif current_slot.end_datetime > expected_slot.end_datetime:
423                 now = datetime.datetime.now()
424                 logger.debug('Change in end time, from %s to %s',
425                         current_slot.end_datetime,
426                         expected_slot.end_datetime)
427                 if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
428                     # more than 5 minutes left, recompute playlist
429                     self.recompute_playlist()
430
431     async def handle_connection(self, reader, writer):
432         writer.write(b'Watusi!\n')
433         writer.write(b'Known commands: status, softquit, hardquit\n')
434         writer.write(b'(dot on empty line to stop connection)\n')
435         await writer.drain()
436         end = False
437         while not end:
438             data = await reader.read(100)
439             try:
440                 message = data.decode().strip()
441             except UnicodeDecodeError:
442                 logger.debug('Server, invalid message %r', message)
443                 if not data:
444                     end = True
445                 continue
446             logger.debug('Server, message %r', message)
447             if message == 'status':
448                 response = {'slot': str(self.slot)}
449                 if isinstance(self.slot, Nonstop):
450                     try:
451                         track = self.playlist[self.playhead]
452                     except IndexError:
453                         pass
454                     else:
455                         response['track'] = {}
456                         response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
457                         response['track']['title'] = track.title
458                         response['track']['artist'] = track.artist.name if track.artist_id else ''
459                         response['track']['duration'] = track.duration.total_seconds()
460                         response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
461                         response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
462                 next_diffusion = self.get_next_diffusion(
463                         before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
464                 if next_diffusion:
465                     response['next_diffusion'] = {
466                         'label': str(next_diffusion),
467                         'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
468                     }
469                     if isinstance(next_diffusion, ScheduledDiffusion):
470                         response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
471                         response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
472             elif message == '.':
473                 end = True
474                 response = {'ack': True}
475             elif message == 'softquit':
476                 self.quit = True
477                 end = True
478                 response = {'ack': True}
479             elif message == 'hardquit':
480                 self.quit = True
481                 end = True
482                 response = {'ack': True}
483                 if self.player and self.player.returncode is None:  # not finished
484                     self.player.kill()
485             else:
486                 response = {'err': 1, 'msg': 'unknown command: %r' % message}
487             writer.write(json.dumps(response).encode('utf-8') + b'\n')
488             try:
489                 await writer.drain()
490             except ConnectionResetError:
491                 break
492         writer.close()
493
494     def sigterm_handler(self):
495         logger.info('Got SIGTERM')
496         self.quit = True
497         self.play_task.cancel()
498
499     async def main(self):
500         loop = asyncio.get_running_loop()
501         loop.add_signal_handler(
502                 signal.SIGTERM,
503                 self.sigterm_handler)
504         self.recompute_slots()
505         server = await asyncio.start_server(
506                 self.handle_connection,
507                 app_settings.SERVER_BIND_IFACE,
508                 app_settings.SERVER_BIND_PORT)
509         async with server:
510             asyncio.create_task(server.serve_forever())
511
512             self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
513             while not self.quit:
514                 now = datetime.datetime.now()
515                 duration = (self.slot.end_datetime - now).seconds
516                 logger.debug('Next sure shot %s (in %s)', self.slot.end_datetime, duration)
517                 if duration < 2:
518                     # next slot is very close, wait for it
519                     await asyncio.sleep(duration)
520                     self.recompute_slots()
521                 self.play_task = asyncio.create_task(self.play(self.slot))
522                 try:
523                     await self.play_task
524                     self.recompute_slots()
525                 except asyncio.CancelledError:
526                     logger.debug('Player cancelled exception')
527                     if self.player and self.player.returncode is None:  # not finished
528                         self.player.kill()
529                 except KeyboardInterrupt:
530                     self.quit = True
531                     break