]> git.0d.be Git - django-panik-nonstop.git/blob - nonstop/management/commands/stamina.py
344d89ef9ae6ef253ea7736db2d5fd6bd36d596c
[django-panik-nonstop.git] / nonstop / management / commands / stamina.py
1 import asyncio
2 import datetime
3 import random
4 import signal
5
6 from django.core.management.base import BaseCommand
7
8 from emissions.models import Nonstop
9 from nonstop.models import Track, ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence
10 from nonstop.app_settings import app_settings
11
12
13 class Command(BaseCommand):
14     last_jingle_datetime = None
15     quit = False
16
17     def handle(self, verbosity, **kwargs):
18         try:
19             asyncio.run(self.main(), debug=True)
20         except KeyboardInterrupt:
21             pass
22
23     def get_playlist(self, zone, start_datetime, end_datetime):
24         current_datetime = start_datetime
25         if self.last_jingle_datetime is None:
26             self.last_jingle_datetime = current_datetime
27         playlist = []
28         adjustment_counter = 0
29         try:
30             jingles = list(zone.nonstopzonesettings_set.first().jingles.all())
31         except AttributeError:
32             jingles = []
33
34         while current_datetime < end_datetime and adjustment_counter < 5:
35
36             if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
37                 # jingle time, every ~20 minutes
38                 playlist.append(random.choice(jingles))
39                 self.last_jingle_datetime = current_datetime
40                 current_datetime = start_datetime + sum(
41                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
42
43             remaining_time = (end_datetime - current_datetime)
44             track = Track.objects.filter(
45                     nonstop_zones=zone,
46                     duration__isnull=False).exclude(
47                             id__in=[x.id for x in playlist if isinstance(x, Track)]
48                     ).order_by('?').first()
49             playlist.append(track)
50             current_datetime = start_datetime + sum(
51                     [x.duration for x in playlist], datetime.timedelta(seconds=0))
52             if current_datetime > end_datetime:
53                 # last track overshot
54                 # 1st strategy: remove last track and try to get a track with
55                 # exact remaining time
56                 playlist = playlist[:-1]
57                 track = Track.objects.filter(
58                         nonstop_zones=zone,
59                         duration__gte=remaining_time,
60                         duration__lt=remaining_time + datetime.timedelta(seconds=1)
61                         ).exclude(
62                             id__in=[x.id for x in playlist if isinstance(x, Track)]
63                         ).order_by('?').first()
64                 if track:
65                     # found a track
66                     playlist.append(track)
67                 else:
68                     # fallback strategy: didn't find track of expected duration,
69                     # reduce playlist further
70                     adjustment_counter += 1
71                     playlist = playlist[:-1]
72
73                 current_datetime = start_datetime + sum(
74                         [x.duration for x in playlist], datetime.timedelta(seconds=0))
75
76         print('computed playlist:')
77         current_datetime = start_datetime
78         for track in playlist:
79             print('   ', current_datetime, track.duration, track.title)
80             current_datetime += track.duration
81         print('   ', current_datetime, '---')
82         print('   adjustment_counter:', adjustment_counter)
83
84         return playlist
85
86     async def player_process(self, item, timeout=None):
87         if app_settings.DEBUG_WITH_SLEEPS:
88             if hasattr(item, 'is_stream') and item.is_stream():
89                 cmd = 'sleep 86400 # %s' % item.stream.url
90             else:
91                 cmd = 'sleep %s # %s' % (item.duration.total_seconds(), item.file_path())
92         else:
93             cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
94             if hasattr(item, 'is_stream') and item.is_stream():
95                 cmd.append(item.stream.url)
96             else:
97                 cmd.append(item.file_path())
98         print('cmd:', cmd)
99         if isinstance(cmd, str):
100             self.player = await asyncio.create_subprocess_shell(
101                     cmd,
102                     stdout=asyncio.subprocess.PIPE,
103                     stderr=asyncio.subprocess.PIPE)
104         else:
105             self.player = await asyncio.create_subprocess_exec(
106                     *cmd,
107                     stdout=asyncio.subprocess.PIPE,
108                     stderr=asyncio.subprocess.PIPE)
109         if timeout is None:
110             await self.player.communicate()
111         else:
112             try:
113                 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
114             except asyncio.TimeoutError:
115                 pass
116         self.player = None
117
118     async def play(self, slot):
119         now = datetime.datetime.now()
120         if isinstance(slot, Nonstop):
121             self.playlist = self.get_playlist(slot, now, slot.end_datetime)
122             self.playhead = 0
123             self.softstop = False
124             while True:
125                 now = datetime.datetime.now()
126                 try:
127                     track = self.playlist[self.playhead]
128                 except IndexError:
129                     break
130                 self.current_track_start_datetime = now
131                 print(now, track.title, track.duration,
132                         '- future tracks:', [x.title for x in self.playlist[self.playhead + 1:self.playhead + 3]])
133                 await self.player_process(track)
134                 if self.softstop:
135                     # track was left to finish, but now the playlist should stop.
136                     break
137                 self.playhead += 1
138         elif slot.is_stream():
139             print(now, 'playing stream', slot.stream)
140             if slot.jingle_id:
141                 await self.player_process(slot.jingle, timeout=60)
142             print('timeout at', (slot.end_datetime - now).total_seconds())
143             await self.player_process(slot, timeout=(slot.end_datetime - now).total_seconds())
144         else:
145             if hasattr(slot, 'episode'):
146                 print(now, 'playing sound', slot.episode)
147             else:
148                 print(now, 'playing random')
149             if slot.jingle_id:
150                 await self.player_process(slot.jingle, timeout=60)
151             await self.player_process(slot)
152
153     def recompute_playlist(self):
154         current_track = self.playlist[self.playhead]
155         print('recompute_playlist, from', current_track.title, self.current_track_start_datetime + current_track.duration, 'to', self.slot.end_datetime)
156         playlist = self.get_playlist(self.slot,
157                 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
158         if playlist:
159             self.playlist[self.playhead + 1:] = playlist
160
161     def get_current_diffusion(self):
162         now = datetime.datetime.now()
163         diffusion = ScheduledDiffusion.objects.filter(
164                 diffusion__datetime__gt=now - datetime.timedelta(days=1),
165                 diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
166         occurence = RecurringStreamOccurence.objects.filter(
167                 datetime__gt=now - datetime.timedelta(days=1),
168                 datetime__lt=now).order_by('datetime').last()
169         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
170                 datetime__gt=now - datetime.timedelta(days=1),
171                 datetime__lt=now).order_by('datetime').last()
172         # note it shouldn't be possible to have both diffusion and occurences
173         # running at the moment.
174         if occurence and occurence.end_datetime > now:
175             return occurence
176         if diffusion and diffusion.end_datetime > now:
177             return diffusion
178         if directory_occurence and directory_occurence.end_datetime > now:
179             return directory_occurence
180         return None
181
182     def get_next_diffusion(self, before_datetime):
183         now = datetime.datetime.now()
184         diffusion = ScheduledDiffusion.objects.filter(
185                 diffusion__datetime__gt=now,
186                 diffusion__datetime__lt=before_datetime,
187                 ).order_by('diffusion__datetime').first()
188         occurence = RecurringStreamOccurence.objects.filter(
189                 datetime__gt=now,
190                 datetime__lt=before_datetime,
191                 ).order_by('datetime').first()
192         directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
193                 datetime__gt=now,
194                 datetime__lt=before_datetime,
195                 ).order_by('datetime').first()
196         if diffusion and occurence:
197             return diffusion if diffusion.diffusion__datetime < occurence.datetime else occurence
198         if diffusion:
199             return diffusion
200         if occurence:
201             return occurence
202         if directory_occurence:
203             return directory_occurence
204         return None
205
206     def recompute_slots(self):
207         now = datetime.datetime.now()
208         # print(now, 'recompute_slots')
209         diffusion = self.get_current_diffusion()
210         if diffusion:
211             self.slot = diffusion
212         else:
213             nonstops = list(Nonstop.objects.all().order_by('start'))
214             nonstops = [x for x in nonstops if x.start != x.end]  # disabled zones
215             try:
216                 self.slot = [x for x in nonstops if x.start < now.time()][-1]
217             except IndexError:
218                 self.slot = nonstops[0]
219             try:
220                 next_slot = nonstops[nonstops.index(self.slot) + 1]
221             except IndexError:
222                 next_slot = nonstops[0]
223             self.slot.datetime = now.replace(
224                     hour=self.slot.start.hour,
225                     minute=self.slot.start.minute)
226             self.slot.end_datetime = now.replace(
227                     hour=next_slot.start.hour,
228                     minute=next_slot.start.minute,
229                     second=0,
230                     microsecond=0)
231             if self.slot.end_datetime < self.slot.datetime:
232                 self.slot.end_datetime += datetime.timedelta(days=1)
233
234             diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
235             if diffusion:
236                 self.slot.end_datetime = diffusion.datetime
237
238     async def recompute_slots_loop(self):
239         now = datetime.datetime.now()
240         print(now, 'recompute_slots_loop')
241         sleep = (60 - now.second) % 10  # adjust to awake at :00
242         while not self.quit:
243             await asyncio.sleep(sleep)
244             sleep = 10  # next cycles every 10 seconds
245             current_slot = self.slot
246             self.recompute_slots()
247             expected_slot = self.slot
248             if current_slot != expected_slot:
249                 print(now, 'unexpected change', current_slot, 'vs', expected_slot)
250                 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
251                     # ask for a softstop, i.e. finish the track then switch.
252                     self.softstop = True
253                 else:
254                     # interrupt nonstop
255                     print('interrupting nonstop')
256                     self.play_task.cancel()
257             elif current_slot.end_datetime > expected_slot.end_datetime:
258                 print('change in end time, from %s to %s' %
259                         (current_slot.end_datetime, expected_slot.end_datetime))
260                 if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
261                     # more than 5 minutes left, recompute playlist
262                     self.recompute_playlist()
263
264     async def handle_connection(self, reader, writer):
265         data = await reader.read(100)
266         message = data.decode().strip()
267         response = 'err'
268         if message == 'playing?':
269             response = '%s' % self.slot
270         writer.write(response.encode('utf-8'))
271         await writer.drain()
272         writer.close()
273
274     def sigterm_handler(self):
275         print('got signal')
276         self.quit = True
277         self.play_task.cancel()
278
279     async def main(self):
280         loop = asyncio.get_running_loop()
281         loop.add_signal_handler(
282                 signal.SIGTERM,
283                 self.sigterm_handler)
284         now = datetime.datetime.now()
285         self.recompute_slots()
286         server = await asyncio.start_server(self.handle_connection, '127.0.0.1', 8888)
287         async with server:
288             asyncio.create_task(server.serve_forever())
289
290             self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
291             while not self.quit:
292                 duration = (self.slot.end_datetime - now).seconds
293                 print('next sure slot', duration, self.slot.end_datetime)
294                 if duration < 2:
295                     # next slot is very close, wait for it
296                     await asyncio.sleep(duration)
297                     self.recompute_slots()
298                 self.play_task = asyncio.create_task(self.play(self.slot))
299                 try:
300                     await self.play_task
301                     self.recompute_slots()
302                 except asyncio.CancelledError as exc:
303                     print('exc:', exc)
304                     if self.player and self.player.returncode is None:  # not finished
305                         self.player.kill()
306                 except KeyboardInterrupt:
307                     self.quit = True
308                     break