]> git.0d.be Git - django-panik-nonstop.git/blob - nonstop/management/commands/stamina.py
b60412657a3f3fdda7f4b34756262513e7cb0e82
[django-panik-nonstop.git] / nonstop / management / commands / stamina.py
1 import asyncio
2 import datetime
3 import json
4 import logging
5 import random
6 import signal
7 import sys
8 import time
9
10 import requests
11
12 from django.conf import settings
13 from django.core.management.base import BaseCommand
14
15 from emissions.models import Nonstop
16 from nonstop.models import Track, Jingle, SomaLogLine, ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence
17 from nonstop.app_settings import app_settings
18
19
20 logger = logging.getLogger('stamina')
21
22
class Command(BaseCommand):
    """Management command running the nonstop player ("stamina").

    ``handle()`` runs ``main()`` in an asyncio event loop; the other methods
    compute what should be on air, build playlists and drive the external
    player program.
    """
    # NOTE(review): presumably tells Django not to run its system checks
    # before this command -- confirm against the Django version in use.
    requires_system_checks = False

    # start time of the playlist position where the last jingle was inserted
    # (set by get_playlist(), None until a first playlist is computed)
    last_jingle_datetime = None
    # set to True (control connection quit commands, SIGTERM) to make the
    # various loops stop
    quit = False
28
29     def handle(self, verbosity, **kwargs):
30         alert_index = 0
31         latest_alert_timestamp = 0
32         latest_exception_timestamp = 0
33
34         def exception_alert_thresholds():
35             yield 0
36             duration = 3
37             while duration < 3600:
38                 yield duration
39                 duration *= 5
40             duration = 3600
41             while True:
42                 yield duration
43                 duration += 3600
44
45         while True:
46             try:
47                 asyncio.run(self.main(), debug=settings.DEBUG)
48             except KeyboardInterrupt:
49                 break
50             except Exception:
51                 timestamp = time.time()
52                 if (timestamp - latest_exception_timestamp) > 300:
53                     # if latest exception was a "long" time ago, assume
54                     # things went smooth for a while and reset things
55                     alert_index = 0
56                     latest_alert_timestamp = 0
57                     latest_exception_timestamp = 0
58
59                 alert_threshold = 0
60                 for i, threshold in enumerate(exception_alert_thresholds()):
61                     if i == alert_index:
62                         alert_threshold = threshold
63                         break
64
65                 if (timestamp - latest_alert_timestamp) > alert_threshold:
66                     logger.exception('General exception (alert index: %s)', alert_index)
67                     latest_alert_timestamp = timestamp
68                     alert_index += 1
69
70                 time.sleep(2)  # retry after a bit
71                 latest_exception_timestamp = timestamp
72                 continue
73             break
74
75     def get_playlist(self, zone, start_datetime, end_datetime):
76         current_datetime = start_datetime
77         if self.last_jingle_datetime is None:
78             self.last_jingle_datetime = current_datetime
79         # Define a max duration (1 hour), if it is reached, and far enough
80         # from end_datetime (30 minutes), return the playlist as is, not aligned
81         # on end time, so a new playlist gets computed once it's over.
82         # This avoids misalignments due to track durations not matching exactly
83         # or additional delays caused by the player program.
84         max_duration = datetime.timedelta(hours=1)
85         max_duration_leftover = datetime.timedelta(minutes=30)
86         playlist = []
87         adjustment_counter = 0
88         try:
89             jingles = list(zone.nonstopzonesettings_set.first().jingles.all())
90         except AttributeError:
91             jingles = []
92
93         recent_tracks_id = [x.track_id for x in
94                 SomaLogLine.objects.exclude(on_air=False).filter(
95                     track__isnull=False,
96                     play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
97         t0 = datetime.datetime.now()
98         allow_overflow = False
99         while current_datetime < end_datetime:
100             if (current_datetime - start_datetime) > max_duration and (
101                     (end_datetime - current_datetime) > max_duration_leftover):
102                 break
103
104             if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
105                 # jingle time, every ~20 minutes
106                 playlist.append(random.choice(jingles))
107                 self.last_jingle_datetime = current_datetime
108                 current_datetime = start_datetime + sum(
109                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
110
111             remaining_time = (end_datetime - current_datetime)
112             track = Track.objects.filter(
113                     nonstop_zones=zone,
114                     duration__isnull=False).exclude(
115                             id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
116                     ).order_by('?').first()
117             if track is None:
118                 # no track, reduce recent tracks exclusion
119                 recent_tracks_id = recent_tracks_id[:len(recent_tracks_id)//2]
120                 continue
121             playlist.append(track)
122             current_datetime = start_datetime + sum(
123                     [x.duration for x in playlist], datetime.timedelta(seconds=0))
124             if current_datetime > end_datetime and not allow_overflow:
125                 # last track overshot
126                 # 1st strategy: remove last track and try to get a track with
127                 # exact remaining time
128                 logger.debug('Overshoot %s, %s', adjustment_counter, current_datetime)
129                 playlist = playlist[:-1]
130                 track = Track.objects.filter(
131                         nonstop_zones=zone,
132                         duration__gte=remaining_time,
133                         duration__lt=remaining_time + datetime.timedelta(seconds=1)
134                         ).exclude(
135                             id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
136                         ).order_by('?').first()
137                 if track:
138                     # found a track
139                     playlist.append(track)
140                 else:
141                     # fallback strategy: didn't find track of expected duration,
142                     # reduce playlist further
143                     adjustment_counter += 1
144                     playlist = playlist[:-1]
145                     if len(playlist) == 0 or adjustment_counter > 5:
146                         # a dedicated sound that ended a bit too early,
147                         # or too many failures to get an appropriate file,
148                         # allow whatever comes.
149                         allow_overflow = True
150                         logger.debug('Allowing overflows')
151
152                 current_datetime = start_datetime + sum(
153                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
154
155         logger.info('Computed playlist: (computation time: %ss)',
156                 (datetime.datetime.now() - t0))
157         current_datetime = start_datetime
158         for track in playlist:
159             logger.debug('- track: %s %s %s', current_datetime, track.duration, track.title)
160             current_datetime += track.duration
161         logger.debug('- end: %s', current_datetime)
162         return playlist
163
164     def is_nonstop_on_air(self):
165         # check if nonstop system is currently on air
166         if app_settings.ON_AIR_SWITCH_URL is None:
167             return None
168         switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
169         if not switch_response.ok:
170             return None
171         try:
172             status = switch_response.json()
173         except ValueError:
174             return None
175         if status.get('active') == 0:
176             return True
177         elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
178             return True
179         elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
180             return True
181         return False
182
183     async def record_nonstop_line(self, track, now):
184         log_line = SomaLogLine()
185         log_line.play_timestamp = now
186         log_line.track = track
187         log_line.filepath = track.nonstopfile_set.first()
188         log_line.on_air = self.is_nonstop_on_air()
189         log_line.save()
190
191     async def player_process(self, item, timeout=None):
192         cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
193         if hasattr(item, 'is_stream') and item.is_stream():
194             cmd.append(item.stream.url)
195             logger.info('Play stream: %s', item.stream.url)
196         else:
197             cmd.append(item.file_path())
198             logger.info('Play file: %s', item.file_path())
199         if app_settings.DEBUG_WITH_SLEEPS:
200             # replace command by a sleep call, for silent debugging
201             if hasattr(item, 'is_stream') and item.is_stream():
202                 cmd = 'sleep 86400 # %s' % item.stream.url
203             elif isinstance(item.duration, datetime.timedelta):
204                 cmd = 'sleep %s # %s' % (item.duration.total_seconds(), item.file_path())
205             elif isinstance(item.duration, int):
206                 cmd = 'sleep %s # %s' % (item.duration, item.file_path())
207         logger.debug('cmd %r', cmd)
208         if isinstance(cmd, str):
209             self.player = await asyncio.create_subprocess_shell(
210                     cmd,
211                     stdout=asyncio.subprocess.PIPE,
212                     stderr=asyncio.subprocess.PIPE)
213         else:
214             self.player = await asyncio.create_subprocess_exec(
215                     *cmd,
216                     stdout=asyncio.subprocess.PIPE,
217                     stderr=asyncio.subprocess.PIPE)
218         if timeout is None:
219             await self.player.communicate()
220         else:
221             try:
222                 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
223             except asyncio.TimeoutError:
224                 self.player.kill()
225         self.player = None
226
227     async def play(self, slot):
228         now = datetime.datetime.now()
229         if isinstance(slot, Nonstop):
230             self.playlist = self.get_playlist(slot, now, slot.end_datetime)
231             self.playhead = 0
232             self.softstop = False
233             while not self.quit:
234                 now = datetime.datetime.now()
235                 try:
236                     track = self.playlist[self.playhead]
237                 except IndexError:
238                     break
239                 self.current_track_start_datetime = now
240                 if isinstance(track, Jingle):
241                     logger.info('Jingle: %s (id: %s) (%s)', track.title, track.id, track.duration)
242                 else:
243                     logger.info('Track: %s (id: %s) (%s)', track.title, track.id, track.duration)
244                 record_task = None
245                 if isinstance(track, Track):  # not jingles
246                     record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
247                 await self.player_process(track)
248                 if record_task:
249                     await record_task
250                 if self.softstop:
251                     # track was left to finish, but now the playlist should stop.
252                     break
253                 self.playhead += 1
254         elif slot.is_stream():
255             logger.info('Stream: %s', slot.stream)
256             if slot.jingle_id:
257                 await self.player_process(slot.jingle, timeout=60)
258             logger.debug('Stream timeout: %s', (slot.end_datetime - now).total_seconds())
259             await self.player_process(slot, timeout=(slot.end_datetime - now).total_seconds())
260         else:
261             if hasattr(slot, 'episode'):
262                 logger.info('Episode: %s (id: %s)', slot.episode, slot.episode.id)
263             else:
264                 logger.info('Random: %s', slot)
265             if slot.jingle_id:
266                 await self.player_process(slot.jingle, timeout=60)
267             await self.player_process(slot)
268
269     def recompute_playlist(self):
270         current_track = self.playlist[self.playhead]
271         logger.debug('Recomputing playlist at %s, from %s to %s',
272                 current_track.title,
273                 self.current_track_start_datetime + current_track.duration,
274                 self.slot.end_datetime)
275         playlist = self.get_playlist(self.slot,
276                 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
277         if playlist:
278             self.playlist[self.playhead + 1:] = playlist
279
280     def get_current_diffusion(self):
281         now = datetime.datetime.now()
282         diffusion = ScheduledDiffusion.objects.filter(
283                 diffusion__datetime__gt=now - datetime.timedelta(days=1),
284                 diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
285         occurence = RecurringStreamOccurence.objects.filter(
286                 datetime__gt=now - datetime.timedelta(days=1),
287                 datetime__lt=now).order_by('datetime').last()
288         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
289                 datetime__gt=now - datetime.timedelta(days=1),
290                 datetime__lt=now).order_by('datetime').last()
291         # note it shouldn't be possible to have both diffusion and occurences
292         # running at the moment.
293         if occurence and occurence.end_datetime > now:
294             return occurence
295         if diffusion and diffusion.end_datetime > now:
296             return diffusion
297         if directory_occurence and directory_occurence.end_datetime > now:
298             return directory_occurence
299         return None
300
301     def get_next_diffusion(self, before_datetime):
302         now = datetime.datetime.now()
303         diffusion = ScheduledDiffusion.objects.filter(
304                 diffusion__datetime__gt=now,
305                 diffusion__datetime__lt=before_datetime,
306                 ).order_by('diffusion__datetime').first()
307         occurence = RecurringStreamOccurence.objects.filter(
308                 datetime__gt=now,
309                 datetime__lt=before_datetime,
310                 ).order_by('datetime').first()
311         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
312                 datetime__gt=now,
313                 datetime__lt=before_datetime,
314                 ).order_by('datetime').first()
315         if diffusion and occurence:
316             return diffusion if diffusion.diffusion.datetime < occurence.datetime else occurence
317         if diffusion:
318             return diffusion
319         if occurence:
320             return occurence
321         if directory_occurence:
322             return directory_occurence
323         return None
324
325     def recompute_slots(self):
326         now = datetime.datetime.now()
327         diffusion = self.get_current_diffusion()
328         if diffusion:
329             self.slot = diffusion
330         else:
331             nonstops = list(Nonstop.objects.all().order_by('start'))
332             nonstops = [x for x in nonstops if x.start != x.end]  # disabled zones
333             try:
334                 self.slot = [x for x in nonstops if x.start < now.time()][-1]
335             except IndexError:
336                 self.slot = nonstops[0]
337             try:
338                 next_slot = nonstops[nonstops.index(self.slot) + 1]
339             except IndexError:
340                 next_slot = nonstops[0]
341             self.slot.datetime = now.replace(
342                     hour=self.slot.start.hour,
343                     minute=self.slot.start.minute)
344             self.slot.end_datetime = now.replace(
345                     hour=next_slot.start.hour,
346                     minute=next_slot.start.minute,
347                     second=0,
348                     microsecond=0)
349             if self.slot.end_datetime < self.slot.datetime:
350                 self.slot.end_datetime += datetime.timedelta(days=1)
351
352             diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
353             if diffusion:
354                 self.slot.end_datetime = diffusion.datetime
355
356     async def recompute_slots_loop(self):
357         now = datetime.datetime.now()
358         sleep = (60 - now.second) % 10  # adjust to awake at :00
359         while not self.quit:
360             await asyncio.sleep(sleep)
361             sleep = 10  # next cycles every 10 seconds
362             current_slot = self.slot
363             self.recompute_slots()
364             expected_slot = self.slot
365             if current_slot != expected_slot:
366                 now = datetime.datetime.now()
367                 logger.info('Unexpected change, %s vs %s', current_slot, expected_slot)
368                 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
369                     # ask for a softstop, i.e. finish the track then switch.
370                     self.softstop = True
371                 elif isinstance(current_slot, Nonstop):
372                     # interrupt nonstop
373                     logger.info('Interrupting nonstop')
374                     self.play_task.cancel()
375             elif current_slot.end_datetime > expected_slot.end_datetime:
376                 now = datetime.datetime.now()
377                 logger.debug('Change in end time, from %s to %s',
378                         current_slot.end_datetime,
379                         expected_slot.end_datetime)
380                 if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
381                     # more than 5 minutes left, recompute playlist
382                     self.recompute_playlist()
383
384     async def handle_connection(self, reader, writer):
385         writer.write(b'Watusi!\n')
386         writer.write(b'Known commands: status, softquit, hardquit\n')
387         writer.write(b'(dot on empty line to stop connection)\n')
388         await writer.drain()
389         end = False
390         while not end:
391             data = await reader.read(100)
392             try:
393                 message = data.decode().strip()
394             except UnicodeDecodeError:
395                 logger.debug('Server, invalid message %r', message)
396                 if not data:
397                     end = True
398                 continue
399             logger.debug('Server, message %r', message)
400             if message == 'status':
401                 response = {'slot': str(self.slot)}
402                 if isinstance(self.slot, Nonstop):
403                     try:
404                         track = self.playlist[self.playhead]
405                     except IndexError:
406                         pass
407                     else:
408                         response['track'] = {}
409                         response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
410                         response['track']['title'] = track.title
411                         response['track']['artist'] = track.artist.name if track.artist_id else ''
412                         response['track']['duration'] = track.duration.total_seconds()
413                         response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
414                         response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
415                 next_diffusion = self.get_next_diffusion(
416                         before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
417                 if next_diffusion:
418                     response['next_diffusion'] = {
419                         'label': str(next_diffusion),
420                         'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
421                     }
422                     if isinstance(next_diffusion, ScheduledDiffusion):
423                         response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
424                         response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
425             elif message == '.':
426                 end = True
427                 response = {'ack': True}
428             elif message == 'softquit':
429                 self.quit = True
430                 end = True
431                 response = {'ack': True}
432             elif message == 'hardquit':
433                 self.quit = True
434                 end = True
435                 response = {'ack': True}
436                 if self.player and self.player.returncode is None:  # not finished
437                     self.player.kill()
438             else:
439                 response = {'err': 1, 'msg': 'unknown command: %r' % message}
440             writer.write(json.dumps(response).encode('utf-8') + b'\n')
441             try:
442                 await writer.drain()
443             except ConnectionResetError:
444                 break
445         writer.close()
446
447     def sigterm_handler(self):
448         logger.info('Got SIGTERM')
449         self.quit = True
450         self.play_task.cancel()
451
452     async def main(self):
453         loop = asyncio.get_running_loop()
454         loop.add_signal_handler(
455                 signal.SIGTERM,
456                 self.sigterm_handler)
457         self.recompute_slots()
458         server = await asyncio.start_server(
459                 self.handle_connection,
460                 app_settings.SERVER_BIND_IFACE,
461                 app_settings.SERVER_BIND_PORT)
462         async with server:
463             asyncio.create_task(server.serve_forever())
464
465             self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
466             while not self.quit:
467                 now = datetime.datetime.now()
468                 duration = (self.slot.end_datetime - now).seconds
469                 logger.debug('Next sure shot %s (in %s)', self.slot.end_datetime, duration)
470                 if duration < 2:
471                     # next slot is very close, wait for it
472                     await asyncio.sleep(duration)
473                     self.recompute_slots()
474                 self.play_task = asyncio.create_task(self.play(self.slot))
475                 try:
476                     await self.play_task
477                     self.recompute_slots()
478                 except asyncio.CancelledError:
479                     logger.debug('Player cancelled exception')
480                     if self.player and self.player.returncode is None:  # not finished
481                         self.player.kill()
482                 except KeyboardInterrupt:
483                     self.quit = True
484                     break