]> git.0d.be Git - django-panik-nonstop.git/blob - nonstop/management/commands/stamina.py
stamina: don't print future tracks
[django-panik-nonstop.git] / nonstop / management / commands / stamina.py
1 import asyncio
2 import datetime
3 import random
4 import signal
5
6 import requests
7
8 from django.core.management.base import BaseCommand
9
10 from emissions.models import Nonstop
11 from nonstop.models import Track, SomaLogLine, ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence
12 from nonstop.app_settings import app_settings
13
14
class Command(BaseCommand):
    """Management command running the nonstop radio player.

    It computes which slot should be on air (a scheduled diffusion, a
    recurring occurence or a nonstop zone), plays it through an external
    player process, and keeps checking for schedule changes in a background
    asyncio task.
    """
    # start time of the most recently queued jingle; initialised on the first
    # get_playlist() call and updated each time a jingle is added.
    last_jingle_datetime = None
    # set to True (SIGTERM handler, KeyboardInterrupt) to make the loops exit.
    quit = False
18
19     def handle(self, verbosity, **kwargs):
20         try:
21             asyncio.run(self.main(), debug=True)
22         except KeyboardInterrupt:
23             pass
24
25     def get_playlist(self, zone, start_datetime, end_datetime):
26         current_datetime = start_datetime
27         if self.last_jingle_datetime is None:
28             self.last_jingle_datetime = current_datetime
29         # Define a max duration (1 hour), if it is reached, and far enough
30         # from end_datetime (30 minutes), return the playlist as is, not aligned
31         # on end time, so a new playlist gets computed once it's over.
32         # This avoids misalignments due to track durations not matching exactly
33         # or additional delays caused by the player program.
34         max_duration = datetime.timedelta(hours=1)
35         max_duration_leftover = datetime.timedelta(minutes=30)
36         playlist = []
37         adjustment_counter = 0
38         try:
39             jingles = list(zone.nonstopzonesettings_set.first().jingles.all())
40         except AttributeError:
41             jingles = []
42
43         recent_tracks_id = [x.track_id for x in
44                 SomaLogLine.objects.exclude(on_air=False).filter(
45                     track__isnull=False,
46                     play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
47         t0 = datetime.datetime.now()
48         while current_datetime < end_datetime and adjustment_counter < 5:
49             if (current_datetime - start_datetime) > max_duration and (
50                     (end_datetime - current_datetime) > max_duration_leftover):
51                 break
52
53             if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
54                 # jingle time, every ~20 minutes
55                 playlist.append(random.choice(jingles))
56                 self.last_jingle_datetime = current_datetime
57                 current_datetime = start_datetime + sum(
58                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
59
60             remaining_time = (end_datetime - current_datetime)
61             track = Track.objects.filter(
62                     nonstop_zones=zone,
63                     duration__isnull=False).exclude(
64                             id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
65                     ).order_by('?').first()
66             if track is None:
67                 # no track, reduce recent tracks exclusion
68                 recent_tracks_id = recent_tracks_id[:len(recent_tracks_id)//2]
69                 continue
70             playlist.append(track)
71             current_datetime = start_datetime + sum(
72                     [x.duration for x in playlist], datetime.timedelta(seconds=0))
73             if current_datetime > end_datetime:
74                 # last track overshot
75                 # 1st strategy: remove last track and try to get a track with
76                 # exact remaining time
77                 playlist = playlist[:-1]
78                 track = Track.objects.filter(
79                         nonstop_zones=zone,
80                         duration__gte=remaining_time,
81                         duration__lt=remaining_time + datetime.timedelta(seconds=1)
82                         ).exclude(
83                             id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
84                         ).order_by('?').first()
85                 if track:
86                     # found a track
87                     playlist.append(track)
88                 else:
89                     # fallback strategy: didn't find track of expected duration,
90                     # reduce playlist further
91                     adjustment_counter += 1
92                     playlist = playlist[:-1]
93
94                 current_datetime = start_datetime + sum(
95                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
96
97         print('computed playlist: (computation time: %ss)' % (datetime.datetime.now() - t0))
98         current_datetime = start_datetime
99         for track in playlist:
100             print('   ', current_datetime, track.duration, track.title)
101             current_datetime += track.duration
102         print('   ', current_datetime, '---')
103         print('   adjustment_counter:', adjustment_counter)
104
105         return playlist
106
107     def is_nonstop_on_air(self):
108         # check if nonstop system is currently on air
109         if app_settings.ON_AIR_SWITCH_URL is None:
110             return None
111         switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
112         if not switch_response.ok:
113             return None
114         try:
115             status = switch_response.json()
116         except ValueError:
117             return None
118         if status.get('active') == 0:
119             return True
120         elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
121             return True
122         elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
123             return True
124         return False
125
126     async def record_nonstop_line(self, track, now):
127         log_line = SomaLogLine()
128         log_line.play_timestamp = now
129         log_line.track = track
130         log_line.filepath = track.nonstopfile_set.first()
131         log_line.on_air = self.is_nonstop_on_air()
132         log_line.save()
133
134     async def player_process(self, item, timeout=None):
135         if app_settings.DEBUG_WITH_SLEEPS:
136             if hasattr(item, 'is_stream') and item.is_stream():
137                 cmd = 'sleep 86400 # %s' % item.stream.url
138             else:
139                 cmd = 'sleep %s # %s' % (item.duration.total_seconds(), item.file_path())
140         else:
141             cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
142             if hasattr(item, 'is_stream') and item.is_stream():
143                 cmd.append(item.stream.url)
144             else:
145                 cmd.append(item.file_path())
146         print('cmd:', cmd)
147         if isinstance(cmd, str):
148             self.player = await asyncio.create_subprocess_shell(
149                     cmd,
150                     stdout=asyncio.subprocess.PIPE,
151                     stderr=asyncio.subprocess.PIPE)
152         else:
153             self.player = await asyncio.create_subprocess_exec(
154                     *cmd,
155                     stdout=asyncio.subprocess.PIPE,
156                     stderr=asyncio.subprocess.PIPE)
157         if timeout is None:
158             await self.player.communicate()
159         else:
160             try:
161                 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
162             except asyncio.TimeoutError:
163                 self.player.kill()
164         self.player = None
165
166     async def play(self, slot):
167         now = datetime.datetime.now()
168         if isinstance(slot, Nonstop):
169             self.playlist = self.get_playlist(slot, now, slot.end_datetime)
170             self.playhead = 0
171             self.softstop = False
172             while True:
173                 now = datetime.datetime.now()
174                 try:
175                     track = self.playlist[self.playhead]
176                 except IndexError:
177                     break
178                 self.current_track_start_datetime = now
179                 print(now, track.title, track.duration)
180                 record_task = None
181                 if isinstance(track, Track):  # not jingles
182                     record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
183                 await self.player_process(track)
184                 if record_task:
185                     await record_task
186                 if self.softstop:
187                     # track was left to finish, but now the playlist should stop.
188                     break
189                 self.playhead += 1
190         elif slot.is_stream():
191             print(now, 'playing stream', slot.stream)
192             if slot.jingle_id:
193                 await self.player_process(slot.jingle, timeout=60)
194             print('timeout at', (slot.end_datetime - now).total_seconds())
195             await self.player_process(slot, timeout=(slot.end_datetime - now).total_seconds())
196         else:
197             if hasattr(slot, 'episode'):
198                 print(now, 'playing sound', slot.episode)
199             else:
200                 print(now, 'playing random')
201             if slot.jingle_id:
202                 await self.player_process(slot.jingle, timeout=60)
203             await self.player_process(slot)
204
205     def recompute_playlist(self):
206         current_track = self.playlist[self.playhead]
207         print('recompute_playlist, from', current_track.title, self.current_track_start_datetime + current_track.duration, 'to', self.slot.end_datetime)
208         playlist = self.get_playlist(self.slot,
209                 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
210         if playlist:
211             self.playlist[self.playhead + 1:] = playlist
212
213     def get_current_diffusion(self):
214         now = datetime.datetime.now()
215         diffusion = ScheduledDiffusion.objects.filter(
216                 diffusion__datetime__gt=now - datetime.timedelta(days=1),
217                 diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
218         occurence = RecurringStreamOccurence.objects.filter(
219                 datetime__gt=now - datetime.timedelta(days=1),
220                 datetime__lt=now).order_by('datetime').last()
221         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
222                 datetime__gt=now - datetime.timedelta(days=1),
223                 datetime__lt=now).order_by('datetime').last()
224         # note it shouldn't be possible to have both diffusion and occurences
225         # running at the moment.
226         if occurence and occurence.end_datetime > now:
227             return occurence
228         if diffusion and diffusion.end_datetime > now:
229             return diffusion
230         if directory_occurence and directory_occurence.end_datetime > now:
231             return directory_occurence
232         return None
233
234     def get_next_diffusion(self, before_datetime):
235         now = datetime.datetime.now()
236         diffusion = ScheduledDiffusion.objects.filter(
237                 diffusion__datetime__gt=now,
238                 diffusion__datetime__lt=before_datetime,
239                 ).order_by('diffusion__datetime').first()
240         occurence = RecurringStreamOccurence.objects.filter(
241                 datetime__gt=now,
242                 datetime__lt=before_datetime,
243                 ).order_by('datetime').first()
244         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
245                 datetime__gt=now,
246                 datetime__lt=before_datetime,
247                 ).order_by('datetime').first()
248         if diffusion and occurence:
249             return diffusion if diffusion.diffusion__datetime < occurence.datetime else occurence
250         if diffusion:
251             return diffusion
252         if occurence:
253             return occurence
254         if directory_occurence:
255             return directory_occurence
256         return None
257
258     def recompute_slots(self):
259         now = datetime.datetime.now()
260         # print(now, 'recompute_slots')
261         diffusion = self.get_current_diffusion()
262         if diffusion:
263             self.slot = diffusion
264         else:
265             nonstops = list(Nonstop.objects.all().order_by('start'))
266             nonstops = [x for x in nonstops if x.start != x.end]  # disabled zones
267             try:
268                 self.slot = [x for x in nonstops if x.start < now.time()][-1]
269             except IndexError:
270                 self.slot = nonstops[0]
271             try:
272                 next_slot = nonstops[nonstops.index(self.slot) + 1]
273             except IndexError:
274                 next_slot = nonstops[0]
275             self.slot.datetime = now.replace(
276                     hour=self.slot.start.hour,
277                     minute=self.slot.start.minute)
278             self.slot.end_datetime = now.replace(
279                     hour=next_slot.start.hour,
280                     minute=next_slot.start.minute,
281                     second=0,
282                     microsecond=0)
283             if self.slot.end_datetime < self.slot.datetime:
284                 self.slot.end_datetime += datetime.timedelta(days=1)
285
286             diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
287             if diffusion:
288                 self.slot.end_datetime = diffusion.datetime
289
290     async def recompute_slots_loop(self):
291         now = datetime.datetime.now()
292         print(now, 'recompute_slots_loop')
293         sleep = (60 - now.second) % 10  # adjust to awake at :00
294         while not self.quit:
295             await asyncio.sleep(sleep)
296             sleep = 10  # next cycles every 10 seconds
297             current_slot = self.slot
298             self.recompute_slots()
299             expected_slot = self.slot
300             if current_slot != expected_slot:
301                 print(now, 'unexpected change', current_slot, 'vs', expected_slot)
302                 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
303                     # ask for a softstop, i.e. finish the track then switch.
304                     self.softstop = True
305                 else:
306                     # interrupt nonstop
307                     print('interrupting nonstop')
308                     self.play_task.cancel()
309             elif current_slot.end_datetime > expected_slot.end_datetime:
310                 print('change in end time, from %s to %s' %
311                         (current_slot.end_datetime, expected_slot.end_datetime))
312                 if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
313                     # more than 5 minutes left, recompute playlist
314                     self.recompute_playlist()
315
316     async def handle_connection(self, reader, writer):
317         data = await reader.read(100)
318         message = data.decode().strip()
319         response = 'err'
320         if message == 'playing?':
321             response = '%s' % self.slot
322         writer.write(response.encode('utf-8'))
323         await writer.drain()
324         writer.close()
325
326     def sigterm_handler(self):
327         print('got signal')
328         self.quit = True
329         self.play_task.cancel()
330
331     async def main(self):
332         loop = asyncio.get_running_loop()
333         loop.add_signal_handler(
334                 signal.SIGTERM,
335                 self.sigterm_handler)
336         now = datetime.datetime.now()
337         self.recompute_slots()
338         server = await asyncio.start_server(self.handle_connection, '127.0.0.1', 8888)
339         async with server:
340             asyncio.create_task(server.serve_forever())
341
342             self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
343             while not self.quit:
344                 duration = (self.slot.end_datetime - now).seconds
345                 print('next sure slot', duration, self.slot.end_datetime)
346                 if duration < 2:
347                     # next slot is very close, wait for it
348                     await asyncio.sleep(duration)
349                     self.recompute_slots()
350                 self.play_task = asyncio.create_task(self.play(self.slot))
351                 try:
352                     await self.play_task
353                     self.recompute_slots()
354                 except asyncio.CancelledError as exc:
355                     print('exc:', exc)
356                     if self.player and self.player.returncode is None:  # not finished
357                         self.player.kill()
358                 except KeyboardInterrupt:
359                     self.quit = True
360                     break