]> git.0d.be Git - django-panik-nonstop.git/blob - nonstop/management/commands/stamina.py
dd8e5e46ad9c73d891e9ee28a1aab29c03751458
[django-panik-nonstop.git] / nonstop / management / commands / stamina.py
1 import asyncio
2 import datetime
3 import json
4 import logging
5 import random
6 import signal
7 import sys
8
9 import requests
10
11 from django.conf import settings
12 from django.core.management.base import BaseCommand
13
14 from emissions.models import Nonstop
15 from nonstop.models import Track, Jingle, SomaLogLine, ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence
16 from nonstop.app_settings import app_settings
17
18
19 logger = logging.getLogger('stamina')
20
21
22 class Command(BaseCommand):
23     requires_system_checks = False
24
25     last_jingle_datetime = None
26     quit = False
27
28     def handle(self, verbosity, **kwargs):
29         try:
30             asyncio.run(self.main(), debug=settings.DEBUG)
31         except KeyboardInterrupt:
32             pass
33
34     def get_playlist(self, zone, start_datetime, end_datetime):
35         current_datetime = start_datetime
36         if self.last_jingle_datetime is None:
37             self.last_jingle_datetime = current_datetime
38         # Define a max duration (1 hour), if it is reached, and far enough
39         # from end_datetime (30 minutes), return the playlist as is, not aligned
40         # on end time, so a new playlist gets computed once it's over.
41         # This avoids misalignments due to track durations not matching exactly
42         # or additional delays caused by the player program.
43         max_duration = datetime.timedelta(hours=1)
44         max_duration_leftover = datetime.timedelta(minutes=30)
45         playlist = []
46         adjustment_counter = 0
47         try:
48             jingles = list(zone.nonstopzonesettings_set.first().jingles.all())
49         except AttributeError:
50             jingles = []
51
52         recent_tracks_id = [x.track_id for x in
53                 SomaLogLine.objects.exclude(on_air=False).filter(
54                     track__isnull=False,
55                     play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
56         t0 = datetime.datetime.now()
57         allow_overflow = False
58         while current_datetime < end_datetime:
59             if (current_datetime - start_datetime) > max_duration and (
60                     (end_datetime - current_datetime) > max_duration_leftover):
61                 break
62
63             if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
64                 # jingle time, every ~20 minutes
65                 playlist.append(random.choice(jingles))
66                 self.last_jingle_datetime = current_datetime
67                 current_datetime = start_datetime + sum(
68                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
69
70             remaining_time = (end_datetime - current_datetime)
71             track = Track.objects.filter(
72                     nonstop_zones=zone,
73                     duration__isnull=False).exclude(
74                             id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
75                     ).order_by('?').first()
76             if track is None:
77                 # no track, reduce recent tracks exclusion
78                 recent_tracks_id = recent_tracks_id[:len(recent_tracks_id)//2]
79                 continue
80             playlist.append(track)
81             current_datetime = start_datetime + sum(
82                     [x.duration for x in playlist], datetime.timedelta(seconds=0))
83             if current_datetime > end_datetime and not allow_overflow:
84                 # last track overshot
85                 # 1st strategy: remove last track and try to get a track with
86                 # exact remaining time
87                 logger.debug('Overshoot %s, %s', adjustment_counter, current_datetime)
88                 playlist = playlist[:-1]
89                 track = Track.objects.filter(
90                         nonstop_zones=zone,
91                         duration__gte=remaining_time,
92                         duration__lt=remaining_time + datetime.timedelta(seconds=1)
93                         ).exclude(
94                             id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
95                         ).order_by('?').first()
96                 if track:
97                     # found a track
98                     playlist.append(track)
99                 else:
100                     # fallback strategy: didn't find track of expected duration,
101                     # reduce playlist further
102                     adjustment_counter += 1
103                     playlist = playlist[:-1]
104                     if len(playlist) == 0 or adjustment_counter > 5:
105                         # a dedicated sound that ended a bit too early,
106                         # or too many failures to get an appropriate file,
107                         # allow whatever comes.
108                         allow_overflow = True
109                         logger.debug('Allowing overflows')
110
111                 current_datetime = start_datetime + sum(
112                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
113
114         logger.info('Computed playlist: (computation time: %ss)',
115                 (datetime.datetime.now() - t0))
116         current_datetime = start_datetime
117         for track in playlist:
118             logger.debug('- track: %s %s %s', current_datetime, track.duration, track.title)
119             current_datetime += track.duration
120         logger.debug('- end: %s', current_datetime)
121         return playlist
122
123     def is_nonstop_on_air(self):
124         # check if nonstop system is currently on air
125         if app_settings.ON_AIR_SWITCH_URL is None:
126             return None
127         switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
128         if not switch_response.ok:
129             return None
130         try:
131             status = switch_response.json()
132         except ValueError:
133             return None
134         if status.get('active') == 0:
135             return True
136         elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
137             return True
138         elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
139             return True
140         return False
141
142     async def record_nonstop_line(self, track, now):
143         log_line = SomaLogLine()
144         log_line.play_timestamp = now
145         log_line.track = track
146         log_line.filepath = track.nonstopfile_set.first()
147         log_line.on_air = self.is_nonstop_on_air()
148         log_line.save()
149
150     async def player_process(self, item, timeout=None):
151         cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
152         if hasattr(item, 'is_stream') and item.is_stream():
153             cmd.append(item.stream.url)
154             logger.info('Play stream: %s', item.stream.url)
155         else:
156             cmd.append(item.file_path())
157             logger.info('Play file: %s', item.file_path())
158         if app_settings.DEBUG_WITH_SLEEPS:
159             # replace command by a sleep call, for silent debugging
160             if hasattr(item, 'is_stream') and item.is_stream():
161                 cmd = 'sleep 86400 # %s' % item.stream.url
162             elif isinstance(item.duration, datetime.timedelta):
163                 cmd = 'sleep %s # %s' % (item.duration.total_seconds(), item.file_path())
164             elif isinstance(item.duration, int):
165                 cmd = 'sleep %s # %s' % (item.duration, item.file_path())
166         logger.debug('cmd %r', cmd)
167         if isinstance(cmd, str):
168             self.player = await asyncio.create_subprocess_shell(
169                     cmd,
170                     stdout=asyncio.subprocess.PIPE,
171                     stderr=asyncio.subprocess.PIPE)
172         else:
173             self.player = await asyncio.create_subprocess_exec(
174                     *cmd,
175                     stdout=asyncio.subprocess.PIPE,
176                     stderr=asyncio.subprocess.PIPE)
177         if timeout is None:
178             await self.player.communicate()
179         else:
180             try:
181                 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
182             except asyncio.TimeoutError:
183                 self.player.kill()
184         self.player = None
185
186     async def play(self, slot):
187         now = datetime.datetime.now()
188         if isinstance(slot, Nonstop):
189             self.playlist = self.get_playlist(slot, now, slot.end_datetime)
190             self.playhead = 0
191             self.softstop = False
192             while not self.quit:
193                 now = datetime.datetime.now()
194                 try:
195                     track = self.playlist[self.playhead]
196                 except IndexError:
197                     break
198                 self.current_track_start_datetime = now
199                 if isinstance(track, Jingle):
200                     logger.info('Jingle: %s (id: %s) (%s)', track.title, track.id, track.duration)
201                 else:
202                     logger.info('Track: %s (id: %s) (%s)', track.title, track.id, track.duration)
203                 record_task = None
204                 if isinstance(track, Track):  # not jingles
205                     record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
206                 await self.player_process(track)
207                 if record_task:
208                     await record_task
209                 if self.softstop:
210                     # track was left to finish, but now the playlist should stop.
211                     break
212                 self.playhead += 1
213         elif slot.is_stream():
214             logger.info('Stream: %s', slot.stream)
215             if slot.jingle_id:
216                 await self.player_process(slot.jingle, timeout=60)
217             logger.debug('Stream timeout: %s', (slot.end_datetime - now).total_seconds())
218             await self.player_process(slot, timeout=(slot.end_datetime - now).total_seconds())
219         else:
220             if hasattr(slot, 'episode'):
221                 logger.info('Episode: %s (id: %s)', slot.episode, slot.episode.id)
222             else:
223                 logger.info('Random: %s', slot)
224             if slot.jingle_id:
225                 await self.player_process(slot.jingle, timeout=60)
226             await self.player_process(slot)
227
228     def recompute_playlist(self):
229         current_track = self.playlist[self.playhead]
230         logger.debug('Recomputing playlist at %s, from %s to %s',
231                 current_track.title,
232                 self.current_track_start_datetime + current_track.duration,
233                 self.slot.end_datetime)
234         playlist = self.get_playlist(self.slot,
235                 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
236         if playlist:
237             self.playlist[self.playhead + 1:] = playlist
238
239     def get_current_diffusion(self):
240         now = datetime.datetime.now()
241         diffusion = ScheduledDiffusion.objects.filter(
242                 diffusion__datetime__gt=now - datetime.timedelta(days=1),
243                 diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
244         occurence = RecurringStreamOccurence.objects.filter(
245                 datetime__gt=now - datetime.timedelta(days=1),
246                 datetime__lt=now).order_by('datetime').last()
247         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
248                 datetime__gt=now - datetime.timedelta(days=1),
249                 datetime__lt=now).order_by('datetime').last()
250         # note it shouldn't be possible to have both diffusion and occurences
251         # running at the moment.
252         if occurence and occurence.end_datetime > now:
253             return occurence
254         if diffusion and diffusion.end_datetime > now:
255             return diffusion
256         if directory_occurence and directory_occurence.end_datetime > now:
257             return directory_occurence
258         return None
259
260     def get_next_diffusion(self, before_datetime):
261         now = datetime.datetime.now()
262         diffusion = ScheduledDiffusion.objects.filter(
263                 diffusion__datetime__gt=now,
264                 diffusion__datetime__lt=before_datetime,
265                 ).order_by('diffusion__datetime').first()
266         occurence = RecurringStreamOccurence.objects.filter(
267                 datetime__gt=now,
268                 datetime__lt=before_datetime,
269                 ).order_by('datetime').first()
270         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
271                 datetime__gt=now,
272                 datetime__lt=before_datetime,
273                 ).order_by('datetime').first()
274         if diffusion and occurence:
275             return diffusion if diffusion.diffusion.datetime < occurence.datetime else occurence
276         if diffusion:
277             return diffusion
278         if occurence:
279             return occurence
280         if directory_occurence:
281             return directory_occurence
282         return None
283
284     def recompute_slots(self):
285         now = datetime.datetime.now()
286         diffusion = self.get_current_diffusion()
287         if diffusion:
288             self.slot = diffusion
289         else:
290             nonstops = list(Nonstop.objects.all().order_by('start'))
291             nonstops = [x for x in nonstops if x.start != x.end]  # disabled zones
292             try:
293                 self.slot = [x for x in nonstops if x.start < now.time()][-1]
294             except IndexError:
295                 self.slot = nonstops[0]
296             try:
297                 next_slot = nonstops[nonstops.index(self.slot) + 1]
298             except IndexError:
299                 next_slot = nonstops[0]
300             self.slot.datetime = now.replace(
301                     hour=self.slot.start.hour,
302                     minute=self.slot.start.minute)
303             self.slot.end_datetime = now.replace(
304                     hour=next_slot.start.hour,
305                     minute=next_slot.start.minute,
306                     second=0,
307                     microsecond=0)
308             if self.slot.end_datetime < self.slot.datetime:
309                 self.slot.end_datetime += datetime.timedelta(days=1)
310
311             diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
312             if diffusion:
313                 self.slot.end_datetime = diffusion.datetime
314
315     async def recompute_slots_loop(self):
316         now = datetime.datetime.now()
317         sleep = (60 - now.second) % 10  # adjust to awake at :00
318         while not self.quit:
319             await asyncio.sleep(sleep)
320             sleep = 10  # next cycles every 10 seconds
321             current_slot = self.slot
322             self.recompute_slots()
323             expected_slot = self.slot
324             if current_slot != expected_slot:
325                 now = datetime.datetime.now()
326                 logger.info('Unexpected change, %s vs %s', current_slot, expected_slot)
327                 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
328                     # ask for a softstop, i.e. finish the track then switch.
329                     self.softstop = True
330                 elif isinstance(current_slot, Nonstop):
331                     # interrupt nonstop
332                     logger.info('Interrupting nonstop')
333                     self.play_task.cancel()
334             elif current_slot.end_datetime > expected_slot.end_datetime:
335                 now = datetime.datetime.now()
336                 logger.debug('Change in end time, from %s to %s',
337                         current_slot.end_datetime,
338                         expected_slot.end_datetime)
339                 if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
340                     # more than 5 minutes left, recompute playlist
341                     self.recompute_playlist()
342
343     async def handle_connection(self, reader, writer):
344         writer.write(b'Watusi!\n')
345         writer.write(b'Known commands: status, softquit, hardquit\n')
346         writer.write(b'(dot on empty line to stop connection)\n')
347         await writer.drain()
348         end = False
349         while not end:
350             data = await reader.read(100)
351             try:
352                 message = data.decode().strip()
353             except UnicodeDecodeError:
354                 logger.debug('Server, invalid message %r', message)
355                 if not data:
356                     end = True
357                 continue
358             logger.debug('Server, message %r', message)
359             if message == 'status':
360                 response = {'slot': str(self.slot)}
361                 if isinstance(self.slot, Nonstop):
362                     try:
363                         track = self.playlist[self.playhead]
364                     except IndexError:
365                         pass
366                     else:
367                         response['track'] = {}
368                         response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
369                         response['track']['title'] = track.title
370                         response['track']['artist'] = track.artist.name if track.artist_id else ''
371                         response['track']['duration'] = track.duration.total_seconds()
372                         response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
373                         response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
374                 next_diffusion = self.get_next_diffusion(
375                         before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
376                 if next_diffusion:
377                     response['next_diffusion'] = {
378                         'label': str(next_diffusion),
379                         'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
380                     }
381                     if isinstance(next_diffusion, ScheduledDiffusion):
382                         response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
383                         response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
384             elif message == '.':
385                 end = True
386                 response = {'ack': True}
387             elif message == 'softquit':
388                 self.quit = True
389                 end = True
390                 response = {'ack': True}
391             elif message == 'hardquit':
392                 self.quit = True
393                 end = True
394                 response = {'ack': True}
395                 if self.player and self.player.returncode is None:  # not finished
396                     self.player.kill()
397             else:
398                 response = {'err': 1, 'msg': 'unknown command: %r' % message}
399             writer.write(json.dumps(response).encode('utf-8') + b'\n')
400             try:
401                 await writer.drain()
402             except ConnectionResetError:
403                 break
404         writer.close()
405
406     def sigterm_handler(self):
407         logger.info('Got SIGTERM')
408         self.quit = True
409         self.play_task.cancel()
410
411     async def main(self):
412         loop = asyncio.get_running_loop()
413         loop.add_signal_handler(
414                 signal.SIGTERM,
415                 self.sigterm_handler)
416         self.recompute_slots()
417         server = await asyncio.start_server(
418                 self.handle_connection,
419                 app_settings.SERVER_BIND_IFACE,
420                 app_settings.SERVER_BIND_PORT)
421         async with server:
422             asyncio.create_task(server.serve_forever())
423
424             self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
425             while not self.quit:
426                 now = datetime.datetime.now()
427                 duration = (self.slot.end_datetime - now).seconds
428                 logger.debug('Next sure shot %s (in %s)', self.slot.end_datetime, duration)
429                 if duration < 2:
430                     # next slot is very close, wait for it
431                     await asyncio.sleep(duration)
432                     self.recompute_slots()
433                 self.play_task = asyncio.create_task(self.play(self.slot))
434                 try:
435                     await self.play_task
436                     self.recompute_slots()
437                 except asyncio.CancelledError:
438                     logger.debug('Player cancelled exception')
439                     if self.player and self.player.returncode is None:  # not finished
440                         self.player.kill()
441                 except KeyboardInterrupt:
442                     self.quit = True
443                     break