]> git.0d.be Git - django-panik-nonstop.git/blob - nonstop/management/commands/stamina.py
stamina: compute shorter playlists
[django-panik-nonstop.git] / nonstop / management / commands / stamina.py
1 import asyncio
2 import datetime
3 import random
4 import signal
5
6 from django.core.management.base import BaseCommand
7
8 from emissions.models import Nonstop
9 from nonstop.models import Track, ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence
10 from nonstop.app_settings import app_settings
11
12
13 class Command(BaseCommand):
14     last_jingle_datetime = None
15     quit = False
16
17     def handle(self, verbosity, **kwargs):
18         try:
19             asyncio.run(self.main(), debug=True)
20         except KeyboardInterrupt:
21             pass
22
23     def get_playlist(self, zone, start_datetime, end_datetime):
24         current_datetime = start_datetime
25         if self.last_jingle_datetime is None:
26             self.last_jingle_datetime = current_datetime
27         # Define a max duration (1 hour), if it is reached, and far enough
28         # from end_datetime (30 minutes), return the playlist as is, not aligned
29         # on end time, so a new playlist gets computed once it's over.
30         # This avoids misalignments due to track durations not matching exactly
31         # or additional delays caused by the player program.
32         max_duration = datetime.timedelta(hours=1)
33         max_duration_leftover = datetime.timedelta(minutes=30)
34         playlist = []
35         adjustment_counter = 0
36         try:
37             jingles = list(zone.nonstopzonesettings_set.first().jingles.all())
38         except AttributeError:
39             jingles = []
40
41         while current_datetime < end_datetime and adjustment_counter < 5:
42             if (current_datetime - start_datetime) > max_duration and (
43                     (end_datetime - current_datetime) > max_duration_leftover):
44                 break
45
46             if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
47                 # jingle time, every ~20 minutes
48                 playlist.append(random.choice(jingles))
49                 self.last_jingle_datetime = current_datetime
50                 current_datetime = start_datetime + sum(
51                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
52
53             remaining_time = (end_datetime - current_datetime)
54             track = Track.objects.filter(
55                     nonstop_zones=zone,
56                     duration__isnull=False).exclude(
57                             id__in=[x.id for x in playlist if isinstance(x, Track)]
58                     ).order_by('?').first()
59             playlist.append(track)
60             current_datetime = start_datetime + sum(
61                     [x.duration for x in playlist], datetime.timedelta(seconds=0))
62             if current_datetime > end_datetime:
63                 # last track overshot
64                 # 1st strategy: remove last track and try to get a track with
65                 # exact remaining time
66                 playlist = playlist[:-1]
67                 track = Track.objects.filter(
68                         nonstop_zones=zone,
69                         duration__gte=remaining_time,
70                         duration__lt=remaining_time + datetime.timedelta(seconds=1)
71                         ).exclude(
72                             id__in=[x.id for x in playlist if isinstance(x, Track)]
73                         ).order_by('?').first()
74                 if track:
75                     # found a track
76                     playlist.append(track)
77                 else:
78                     # fallback strategy: didn't find track of expected duration,
79                     # reduce playlist further
80                     adjustment_counter += 1
81                     playlist = playlist[:-1]
82
83                 current_datetime = start_datetime + sum(
84                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
85
86         print('computed playlist:')
87         current_datetime = start_datetime
88         for track in playlist:
89             print('   ', current_datetime, track.duration, track.title)
90             current_datetime += track.duration
91         print('   ', current_datetime, '---')
92         print('   adjustment_counter:', adjustment_counter)
93
94         return playlist
95
96     async def player_process(self, item, timeout=None):
97         if app_settings.DEBUG_WITH_SLEEPS:
98             if hasattr(item, 'is_stream') and item.is_stream():
99                 cmd = 'sleep 86400 # %s' % item.stream.url
100             else:
101                 cmd = 'sleep %s # %s' % (item.duration.total_seconds(), item.file_path())
102         else:
103             cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
104             if hasattr(item, 'is_stream') and item.is_stream():
105                 cmd.append(item.stream.url)
106             else:
107                 cmd.append(item.file_path())
108         print('cmd:', cmd)
109         if isinstance(cmd, str):
110             self.player = await asyncio.create_subprocess_shell(
111                     cmd,
112                     stdout=asyncio.subprocess.PIPE,
113                     stderr=asyncio.subprocess.PIPE)
114         else:
115             self.player = await asyncio.create_subprocess_exec(
116                     *cmd,
117                     stdout=asyncio.subprocess.PIPE,
118                     stderr=asyncio.subprocess.PIPE)
119         if timeout is None:
120             await self.player.communicate()
121         else:
122             try:
123                 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
124             except asyncio.TimeoutError:
125                 self.player.kill()
126         self.player = None
127
128     async def play(self, slot):
129         now = datetime.datetime.now()
130         if isinstance(slot, Nonstop):
131             self.playlist = self.get_playlist(slot, now, slot.end_datetime)
132             self.playhead = 0
133             self.softstop = False
134             while True:
135                 now = datetime.datetime.now()
136                 try:
137                     track = self.playlist[self.playhead]
138                 except IndexError:
139                     break
140                 self.current_track_start_datetime = now
141                 print(now, track.title, track.duration,
142                         '- future tracks:', [x.title for x in self.playlist[self.playhead + 1:self.playhead + 3]])
143                 await self.player_process(track)
144                 if self.softstop:
145                     # track was left to finish, but now the playlist should stop.
146                     break
147                 self.playhead += 1
148         elif slot.is_stream():
149             print(now, 'playing stream', slot.stream)
150             if slot.jingle_id:
151                 await self.player_process(slot.jingle, timeout=60)
152             print('timeout at', (slot.end_datetime - now).total_seconds())
153             await self.player_process(slot, timeout=(slot.end_datetime - now).total_seconds())
154         else:
155             if hasattr(slot, 'episode'):
156                 print(now, 'playing sound', slot.episode)
157             else:
158                 print(now, 'playing random')
159             if slot.jingle_id:
160                 await self.player_process(slot.jingle, timeout=60)
161             await self.player_process(slot)
162
163     def recompute_playlist(self):
164         current_track = self.playlist[self.playhead]
165         print('recompute_playlist, from', current_track.title, self.current_track_start_datetime + current_track.duration, 'to', self.slot.end_datetime)
166         playlist = self.get_playlist(self.slot,
167                 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
168         if playlist:
169             self.playlist[self.playhead + 1:] = playlist
170
171     def get_current_diffusion(self):
172         now = datetime.datetime.now()
173         diffusion = ScheduledDiffusion.objects.filter(
174                 diffusion__datetime__gt=now - datetime.timedelta(days=1),
175                 diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
176         occurence = RecurringStreamOccurence.objects.filter(
177                 datetime__gt=now - datetime.timedelta(days=1),
178                 datetime__lt=now).order_by('datetime').last()
179         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
180                 datetime__gt=now - datetime.timedelta(days=1),
181                 datetime__lt=now).order_by('datetime').last()
182         # note it shouldn't be possible to have both diffusion and occurences
183         # running at the moment.
184         if occurence and occurence.end_datetime > now:
185             return occurence
186         if diffusion and diffusion.end_datetime > now:
187             return diffusion
188         if directory_occurence and directory_occurence.end_datetime > now:
189             return directory_occurence
190         return None
191
192     def get_next_diffusion(self, before_datetime):
193         now = datetime.datetime.now()
194         diffusion = ScheduledDiffusion.objects.filter(
195                 diffusion__datetime__gt=now,
196                 diffusion__datetime__lt=before_datetime,
197                 ).order_by('diffusion__datetime').first()
198         occurence = RecurringStreamOccurence.objects.filter(
199                 datetime__gt=now,
200                 datetime__lt=before_datetime,
201                 ).order_by('datetime').first()
202         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
203                 datetime__gt=now,
204                 datetime__lt=before_datetime,
205                 ).order_by('datetime').first()
206         if diffusion and occurence:
207             return diffusion if diffusion.diffusion__datetime < occurence.datetime else occurence
208         if diffusion:
209             return diffusion
210         if occurence:
211             return occurence
212         if directory_occurence:
213             return directory_occurence
214         return None
215
216     def recompute_slots(self):
217         now = datetime.datetime.now()
218         # print(now, 'recompute_slots')
219         diffusion = self.get_current_diffusion()
220         if diffusion:
221             self.slot = diffusion
222         else:
223             nonstops = list(Nonstop.objects.all().order_by('start'))
224             nonstops = [x for x in nonstops if x.start != x.end]  # disabled zones
225             try:
226                 self.slot = [x for x in nonstops if x.start < now.time()][-1]
227             except IndexError:
228                 self.slot = nonstops[0]
229             try:
230                 next_slot = nonstops[nonstops.index(self.slot) + 1]
231             except IndexError:
232                 next_slot = nonstops[0]
233             self.slot.datetime = now.replace(
234                     hour=self.slot.start.hour,
235                     minute=self.slot.start.minute)
236             self.slot.end_datetime = now.replace(
237                     hour=next_slot.start.hour,
238                     minute=next_slot.start.minute,
239                     second=0,
240                     microsecond=0)
241             if self.slot.end_datetime < self.slot.datetime:
242                 self.slot.end_datetime += datetime.timedelta(days=1)
243
244             diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
245             if diffusion:
246                 self.slot.end_datetime = diffusion.datetime
247
248     async def recompute_slots_loop(self):
249         now = datetime.datetime.now()
250         print(now, 'recompute_slots_loop')
251         sleep = (60 - now.second) % 10  # adjust to awake at :00
252         while not self.quit:
253             await asyncio.sleep(sleep)
254             sleep = 10  # next cycles every 10 seconds
255             current_slot = self.slot
256             self.recompute_slots()
257             expected_slot = self.slot
258             if current_slot != expected_slot:
259                 print(now, 'unexpected change', current_slot, 'vs', expected_slot)
260                 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
261                     # ask for a softstop, i.e. finish the track then switch.
262                     self.softstop = True
263                 else:
264                     # interrupt nonstop
265                     print('interrupting nonstop')
266                     self.play_task.cancel()
267             elif current_slot.end_datetime > expected_slot.end_datetime:
268                 print('change in end time, from %s to %s' %
269                         (current_slot.end_datetime, expected_slot.end_datetime))
270                 if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
271                     # more than 5 minutes left, recompute playlist
272                     self.recompute_playlist()
273
274     async def handle_connection(self, reader, writer):
275         data = await reader.read(100)
276         message = data.decode().strip()
277         response = 'err'
278         if message == 'playing?':
279             response = '%s' % self.slot
280         writer.write(response.encode('utf-8'))
281         await writer.drain()
282         writer.close()
283
284     def sigterm_handler(self):
285         print('got signal')
286         self.quit = True
287         self.play_task.cancel()
288
289     async def main(self):
290         loop = asyncio.get_running_loop()
291         loop.add_signal_handler(
292                 signal.SIGTERM,
293                 self.sigterm_handler)
294         now = datetime.datetime.now()
295         self.recompute_slots()
296         server = await asyncio.start_server(self.handle_connection, '127.0.0.1', 8888)
297         async with server:
298             asyncio.create_task(server.serve_forever())
299
300             self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
301             while not self.quit:
302                 duration = (self.slot.end_datetime - now).seconds
303                 print('next sure slot', duration, self.slot.end_datetime)
304                 if duration < 2:
305                     # next slot is very close, wait for it
306                     await asyncio.sleep(duration)
307                     self.recompute_slots()
308                 self.play_task = asyncio.create_task(self.play(self.slot))
309                 try:
310                     await self.play_task
311                     self.recompute_slots()
312                 except asyncio.CancelledError as exc:
313                     print('exc:', exc)
314                     if self.player and self.player.returncode is None:  # not finished
315                         self.player.kill()
316                 except KeyboardInterrupt:
317                     self.quit = True
318                     break