]> git.0d.be Git - django-panik-nonstop.git/blob - nonstop/management/commands/stamina.py
768e455d39ff6c02c494c1c17cc07d2a920c2532
[django-panik-nonstop.git] / nonstop / management / commands / stamina.py
1 import asyncio
2 import datetime
3 import json
4 import logging
5 import random
6 import signal
7 import sys
8 import time
9
10 import requests
11
12 from django.conf import settings
13 from django.core.management.base import BaseCommand
14
15 from emissions.models import Nonstop
16 from nonstop.models import Track, Jingle, SomaLogLine, ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence
17 from nonstop.app_settings import app_settings
18
19
20 logger = logging.getLogger('stamina')
21
22
23 class Command(BaseCommand):
24     requires_system_checks = False
25
26     last_jingle_datetime = None
27     quit = False
28
29     def handle(self, verbosity, **kwargs):
30         alert_index = 0
31         latest_alert_timestamp = 0
32         latest_exception_timestamp = 0
33
34         def exception_alert_thresholds():
35             yield 0
36             duration = 3
37             while duration < 3600:
38                 yield duration
39                 duration *= 5
40             duration = 3600
41             while True:
42                 yield duration
43                 duration += 3600
44
45         while True:
46             try:
47                 asyncio.run(self.main(), debug=settings.DEBUG)
48             except KeyboardInterrupt:
49                 break
50             except Exception:
51                 timestamp = time.time()
52                 if (timestamp - latest_exception_timestamp) > 300:
53                     # if latest exception was a "long" time ago, assume
54                     # things went smooth for a while and reset things
55                     alert_index = 0
56                     latest_alert_timestamp = 0
57                     latest_exception_timestamp = 0
58
59                 alert_threshold = 0
60                 for i, threshold in enumerate(exception_alert_thresholds()):
61                     if i == alert_index:
62                         alert_threshold = threshold
63                         break
64
65                 if (timestamp - latest_alert_timestamp) > alert_threshold:
66                     logger.exception('General exception (alert index: %s)', alert_index)
67                     latest_alert_timestamp = timestamp
68                     alert_index += 1
69
70                 time.sleep(2)  # retry after a bit
71                 latest_exception_timestamp = timestamp
72                 continue
73             break
74
75     def get_playlist(self, zone, start_datetime, end_datetime):
76         current_datetime = start_datetime
77         if self.last_jingle_datetime is None:
78             self.last_jingle_datetime = current_datetime
79         # Define a max duration (1 hour), if it is reached, and far enough
80         # from end_datetime (30 minutes), return the playlist as is, not aligned
81         # on end time, so a new playlist gets computed once it's over.
82         # This avoids misalignments due to track durations not matching exactly
83         # or additional delays caused by the player program.
84         max_duration = datetime.timedelta(hours=1)
85         max_duration_leftover = datetime.timedelta(minutes=30)
86         playlist = []
87         adjustment_counter = 0
88         try:
89             jingles = list(zone.nonstopzonesettings_set.first().jingles.all())
90         except AttributeError:
91             jingles = []
92
93         recent_tracks_id = [x.track_id for x in
94                 SomaLogLine.objects.exclude(on_air=False).filter(
95                     track__isnull=False,
96                     play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
97         t0 = datetime.datetime.now()
98         allow_overflow = False
99         while current_datetime < end_datetime:
100             if (current_datetime - start_datetime) > max_duration and (
101                     (end_datetime - current_datetime) > max_duration_leftover):
102                 break
103
104             if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
105                 # jingle time, every ~20 minutes
106                 playlist.append(random.choice(jingles))
107                 self.last_jingle_datetime = current_datetime
108                 current_datetime = start_datetime + sum(
109                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
110
111             zone_ids = [zone.id]
112             extra_zones = app_settings.EXTRA_ZONES.get(zone.slug)
113             if extra_zones:
114                 zone_ids.extend([x.id for x in Nonstop.objects.filter(slug__in=extra_zones)])
115             remaining_time = (end_datetime - current_datetime)
116             track = Track.objects.filter(
117                     nonstop_zones__in=zone_ids,
118                     duration__isnull=False).exclude(
119                             id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
120                     ).order_by('?').first()
121             if track is None:
122                 # no track, reduce recent tracks exclusion
123                 recent_tracks_id = recent_tracks_id[:len(recent_tracks_id)//2]
124                 continue
125             playlist.append(track)
126             current_datetime = start_datetime + sum(
127                     [x.duration for x in playlist], datetime.timedelta(seconds=0))
128             if current_datetime > end_datetime and not allow_overflow:
129                 # last track overshot
130                 # 1st strategy: remove last track and try to get a track with
131                 # exact remaining time
132                 logger.debug('Overshoot %s, %s', adjustment_counter, current_datetime)
133                 playlist = playlist[:-1]
134                 track = Track.objects.filter(
135                         nonstop_zones=zone,
136                         duration__gte=remaining_time,
137                         duration__lt=remaining_time + datetime.timedelta(seconds=1)
138                         ).exclude(
139                             id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
140                         ).order_by('?').first()
141                 if track:
142                     # found a track
143                     playlist.append(track)
144                 else:
145                     # fallback strategy: didn't find track of expected duration,
146                     # reduce playlist further
147                     adjustment_counter += 1
148                     playlist = playlist[:-1]
149                     if len(playlist) == 0 or adjustment_counter > 5:
150                         # a dedicated sound that ended a bit too early,
151                         # or too many failures to get an appropriate file,
152                         # allow whatever comes.
153                         allow_overflow = True
154                         logger.debug('Allowing overflows')
155
156                 current_datetime = start_datetime + sum(
157                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
158
159         logger.info('Computed playlist for "%s" (computation time: %ss)',
160                 zone, (datetime.datetime.now() - t0))
161         current_datetime = start_datetime
162         for track in playlist:
163             logger.debug('- track: %s %s %s', current_datetime, track.duration, track.title)
164             current_datetime += track.duration
165         logger.debug('- end: %s', current_datetime)
166         return playlist
167
168     def is_nonstop_on_air(self):
169         # check if nonstop system is currently on air
170         if app_settings.ON_AIR_SWITCH_URL is None:
171             return None
172         switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
173         if not switch_response.ok:
174             return None
175         try:
176             status = switch_response.json()
177         except ValueError:
178             return None
179         if status.get('active') == 0:
180             return True
181         elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
182             return True
183         elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
184             return True
185         return False
186
187     async def record_nonstop_line(self, track, now):
188         log_line = SomaLogLine()
189         log_line.play_timestamp = now
190         log_line.track = track
191         log_line.filepath = track.nonstopfile_set.first()
192         log_line.on_air = self.is_nonstop_on_air()
193         log_line.save()
194
195     async def player_process(self, item, timeout=None):
196         cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
197         if hasattr(item, 'is_stream') and item.is_stream():
198             cmd.append(item.stream.url)
199             logger.info('Play stream: %s', item.stream.url)
200         else:
201             cmd.append(item.file_path())
202             logger.info('Play file: %s', item.file_path())
203         if app_settings.DEBUG_WITH_SLEEPS:
204             # replace command by a sleep call, for silent debugging
205             if hasattr(item, 'is_stream') and item.is_stream():
206                 cmd = 'sleep 86400 # %s' % item.stream.url
207             elif isinstance(item.duration, datetime.timedelta):
208                 cmd = 'sleep %s # %s' % (item.duration.total_seconds(), item.file_path())
209             elif isinstance(item.duration, int):
210                 cmd = 'sleep %s # %s' % (item.duration, item.file_path())
211         logger.debug('cmd %r', cmd)
212         if isinstance(cmd, str):
213             self.player = await asyncio.create_subprocess_shell(
214                     cmd,
215                     stdout=asyncio.subprocess.PIPE,
216                     stderr=asyncio.subprocess.PIPE)
217         else:
218             self.player = await asyncio.create_subprocess_exec(
219                     *cmd,
220                     stdout=asyncio.subprocess.PIPE,
221                     stderr=asyncio.subprocess.PIPE)
222         if timeout is None:
223             await self.player.communicate()
224         else:
225             try:
226                 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
227             except asyncio.TimeoutError:
228                 self.player.kill()
229         self.player = None
230
231     async def play(self, slot):
232         now = datetime.datetime.now()
233         if isinstance(slot, Nonstop):
234             self.playlist = self.get_playlist(slot, now, slot.end_datetime)
235             self.playhead = 0
236             self.softstop = False
237             while not self.quit:
238                 now = datetime.datetime.now()
239                 try:
240                     track = self.playlist[self.playhead]
241                 except IndexError:
242                     break
243                 self.current_track_start_datetime = now
244                 if isinstance(track, Jingle):
245                     logger.info('Jingle: %s (id: %s) (%s)', track.title, track.id, track.duration)
246                 else:
247                     logger.info('Track: %s (id: %s) (%s)', track.title, track.id, track.duration)
248                 record_task = None
249                 if isinstance(track, Track):  # not jingles
250                     record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
251                 await self.player_process(track)
252                 if record_task:
253                     await record_task
254                 if self.softstop:
255                     # track was left to finish, but now the playlist should stop.
256                     break
257                 self.playhead += 1
258         elif slot.is_stream():
259             logger.info('Stream: %s', slot.stream)
260             if slot.jingle_id:
261                 await self.player_process(slot.jingle, timeout=60)
262             logger.debug('Stream timeout: %s', (slot.end_datetime - now).total_seconds())
263             short_interruption_counter = 0
264             has_played = False
265             while True:
266                 player_start_time = datetime.datetime.now()
267                 await self.player_process(slot, timeout=(slot.end_datetime - player_start_time).total_seconds())
268                 now = datetime.datetime.now()
269                 if (slot.end_datetime - now).total_seconds() < 2:
270                     # it went well, stop
271                     break
272                 # stream got interrupted
273                 if (datetime.datetime.now() - player_start_time).total_seconds() < 15:
274                     # and was up for less than 15 seconds.
275                     if not has_played:
276                         # never up before, probably not even started
277                         if isinstance(slot, RecurringStreamOccurence):
278                             # no mercy for recurring stream, remove occurence
279                             logger.info('Missing stream for %s, removing', slot)
280                             slot.delete()
281                         elif slot.auto_delayed is True:
282                             # was already delayed and is still not up, remove.
283                             logger.info('Still missing stream for %s, removing', slot)
284                             slot.delete()
285                         else:
286                             # push back start datetime for 5 minutes, and get
287                             # back to nonstop music in the meantime
288                             logger.info('Pushing starting time of %s', slot.diffusion.episode)
289                             slot.diffusion.datetime = slot.diffusion.datetime + datetime.timedelta(seconds=300)
290                             slot.diffusion.episode.duration = slot.diffusion.episode.get_duration() - 5
291                             if slot.diffusion.episode.duration <= 5:
292                                 slot.diffusion.episode.duration = 0
293                             slot.auto_delayed = True
294                             slot.diffusion.save()
295                             slot.diffusion.episode.save()
296                             slot.save()
297                         break
298                     short_interruption_counter += 1
299                     # wait a bit
300                     await asyncio.sleep(short_interruption_counter)
301                 else:
302                     # mark stream as ok at least one, and reset short
303                     # interruption counter
304                     has_played = True
305                     short_interruption_counter = 0
306                     logger.debug('Stream error for %s', slot)
307
308                 if short_interruption_counter > 5:
309                     # many short interruptions
310                     logger.info('Too many stream errors for %s, removing', slot)
311                     slot.delete()
312                     break
313         else:
314             if hasattr(slot, 'episode'):
315                 logger.info('Episode: %s (id: %s)', slot.episode, slot.episode.id)
316             else:
317                 logger.info('Random: %s', slot)
318             if slot.jingle_id:
319                 await self.player_process(slot.jingle, timeout=60)
320             await self.player_process(slot)
321
322     def recompute_playlist(self):
323         current_track = self.playlist[self.playhead]
324         logger.debug('Recomputing playlist at %s, from %s to %s',
325                 current_track.title,
326                 self.current_track_start_datetime + current_track.duration,
327                 self.slot.end_datetime)
328         playlist = self.get_playlist(self.slot,
329                 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
330         if playlist:
331             self.playlist[self.playhead + 1:] = playlist
332
333     def get_current_diffusion(self):
334         now = datetime.datetime.now()
335         diffusion = ScheduledDiffusion.objects.filter(
336                 diffusion__datetime__gt=now - datetime.timedelta(days=1),
337                 diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
338         occurence = RecurringStreamOccurence.objects.filter(
339                 datetime__gt=now - datetime.timedelta(days=1),
340                 datetime__lt=now).order_by('datetime').last()
341         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
342                 datetime__gt=now - datetime.timedelta(days=1),
343                 datetime__lt=now).order_by('datetime').last()
344         # note it shouldn't be possible to have both diffusion and occurences
345         # running at the moment.
346         if occurence and occurence.end_datetime > now:
347             return occurence
348         if diffusion and diffusion.end_datetime > now:
349             return diffusion
350         if directory_occurence and directory_occurence.end_datetime > now:
351             return directory_occurence
352         return None
353
354     def get_next_diffusion(self, before_datetime):
355         now = datetime.datetime.now()
356         diffusion = ScheduledDiffusion.objects.filter(
357                 diffusion__datetime__gt=now,
358                 diffusion__datetime__lt=before_datetime,
359                 ).order_by('diffusion__datetime').first()
360         occurence = RecurringStreamOccurence.objects.filter(
361                 datetime__gt=now,
362                 datetime__lt=before_datetime,
363                 ).order_by('datetime').first()
364         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
365                 datetime__gt=now,
366                 datetime__lt=before_datetime,
367                 ).order_by('datetime').first()
368         if diffusion and occurence:
369             return diffusion if diffusion.diffusion.datetime < occurence.datetime else occurence
370         if diffusion:
371             return diffusion
372         if occurence:
373             return occurence
374         if directory_occurence:
375             return directory_occurence
376         return None
377
378     def recompute_slots(self):
379         now = datetime.datetime.now()
380         diffusion = self.get_current_diffusion()
381         if diffusion:
382             self.slot = diffusion
383         else:
384             nonstops = list(Nonstop.objects.all().order_by('start'))
385             nonstops = [x for x in nonstops if x.start != x.end]  # disabled zones
386             try:
387                 self.slot = [x for x in nonstops if x.start < now.time()][-1]
388             except IndexError:
389                 self.slot = nonstops[0]
390             try:
391                 next_slot = nonstops[nonstops.index(self.slot) + 1]
392             except IndexError:
393                 next_slot = nonstops[0]
394             self.slot.datetime = now.replace(
395                     hour=self.slot.start.hour,
396                     minute=self.slot.start.minute)
397             self.slot.end_datetime = now.replace(
398                     hour=next_slot.start.hour,
399                     minute=next_slot.start.minute,
400                     second=0,
401                     microsecond=0)
402             if self.slot.end_datetime < self.slot.datetime:
403                 self.slot.end_datetime += datetime.timedelta(days=1)
404
405             diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
406             if diffusion:
407                 self.slot.end_datetime = diffusion.datetime
408
409     async def recompute_slots_loop(self):
410         now = datetime.datetime.now()
411         sleep = (60 - now.second) % 10  # adjust to awake at :00
412         while not self.quit:
413             await asyncio.sleep(sleep)
414             sleep = 10  # next cycles every 10 seconds
415             current_slot = self.slot
416             self.recompute_slots()
417             expected_slot = self.slot
418             if current_slot != expected_slot:
419                 now = datetime.datetime.now()
420                 logger.info('Unexpected change, %s vs %s', current_slot, expected_slot)
421                 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
422                     # ask for a softstop, i.e. finish the track then switch.
423                     self.softstop = True
424                 elif isinstance(current_slot, Nonstop):
425                     # interrupt nonstop
426                     logger.info('Interrupting nonstop')
427                     self.play_task.cancel()
428             elif current_slot.end_datetime > expected_slot.end_datetime:
429                 now = datetime.datetime.now()
430                 logger.debug('Change in end time, from %s to %s',
431                         current_slot.end_datetime,
432                         expected_slot.end_datetime)
433                 if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
434                     # more than 5 minutes left, recompute playlist
435                     self.recompute_playlist()
436
437     async def handle_connection(self, reader, writer):
438         writer.write(b'Watusi!\n')
439         writer.write(b'Known commands: status, softquit, hardquit\n')
440         writer.write(b'(dot on empty line to stop connection)\n')
441         await writer.drain()
442         end = False
443         while not end:
444             data = await reader.read(100)
445             try:
446                 message = data.decode().strip()
447             except UnicodeDecodeError:
448                 logger.debug('Server, invalid message %r', message)
449                 if not data:
450                     end = True
451                 continue
452             logger.debug('Server, message %r', message)
453             if message == 'status':
454                 response = {'slot': str(self.slot)}
455                 if isinstance(self.slot, Nonstop):
456                     try:
457                         track = self.playlist[self.playhead]
458                     except IndexError:
459                         pass
460                     else:
461                         response['track'] = {}
462                         response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
463                         response['track']['title'] = track.title
464                         response['track']['artist'] = track.artist.name if track.artist_id else ''
465                         response['track']['duration'] = track.duration.total_seconds()
466                         response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
467                         response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
468                 next_diffusion = self.get_next_diffusion(
469                         before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
470                 if next_diffusion:
471                     response['next_diffusion'] = {
472                         'label': str(next_diffusion),
473                         'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
474                     }
475                     if isinstance(next_diffusion, ScheduledDiffusion):
476                         response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
477                         response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
478             elif message == '.':
479                 end = True
480                 response = {'ack': True}
481             elif message == 'softquit':
482                 self.quit = True
483                 end = True
484                 response = {'ack': True}
485             elif message == 'hardquit':
486                 self.quit = True
487                 end = True
488                 response = {'ack': True}
489                 if self.player and self.player.returncode is None:  # not finished
490                     self.player.kill()
491             else:
492                 response = {'err': 1, 'msg': 'unknown command: %r' % message}
493             writer.write(json.dumps(response).encode('utf-8') + b'\n')
494             try:
495                 await writer.drain()
496             except ConnectionResetError:
497                 break
498         writer.close()
499
500     def sigterm_handler(self):
501         logger.info('Got SIGTERM')
502         self.quit = True
503         self.play_task.cancel()
504
505     async def main(self):
506         loop = asyncio.get_running_loop()
507         loop.add_signal_handler(
508                 signal.SIGTERM,
509                 self.sigterm_handler)
510         self.recompute_slots()
511         server = await asyncio.start_server(
512                 self.handle_connection,
513                 app_settings.SERVER_BIND_IFACE,
514                 app_settings.SERVER_BIND_PORT)
515         async with server:
516             asyncio.create_task(server.serve_forever())
517
518             self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
519             while not self.quit:
520                 now = datetime.datetime.now()
521                 duration = (self.slot.end_datetime - now).seconds
522                 logger.debug('Next sure shot %s (in %s)', self.slot.end_datetime, duration)
523                 if duration < 2:
524                     # next slot is very close, wait for it
525                     await asyncio.sleep(duration)
526                     self.recompute_slots()
527                 self.play_task = asyncio.create_task(self.play(self.slot))
528                 try:
529                     await self.play_task
530                     self.recompute_slots()
531                 except asyncio.CancelledError:
532                     logger.debug('Player cancelled exception')
533                     if self.player and self.player.returncode is None:  # not finished
534                         self.player.kill()
535                 except KeyboardInterrupt:
536                     self.quit = True
537                     break