10 from django.core.management.base import BaseCommand
12 from emissions.models import Nonstop
13 from nonstop.models import Track, SomaLogLine, ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence
14 from nonstop.app_settings import app_settings
17 class Command(BaseCommand):
18 last_jingle_datetime = None
# Management-command entry point: runs the `main` coroutine to completion on
# a new asyncio event loop, with asyncio debug mode enabled.
# `verbosity` and the remaining keyword options are accepted but not read in
# the visible lines.
21 def handle(self, verbosity, **kwargs):
23 asyncio.run(self.main(), debug=True)
# Ctrl-C gets dedicated handling.
# NOTE(review): the body of this handler is not visible in this view.
24 except KeyboardInterrupt:
# Assemble a playlist of jingles and randomly picked tracks covering the
# period from start_datetime to end_datetime.
#
# zone: object whose nonstopzonesettings_set supplies the jingles.
# start_datetime / end_datetime: bounds of the period to fill; the running
#   position is always start_datetime plus the summed durations of the items
#   selected so far.
#
# Side effects: updates self.last_jingle_datetime and writes progress
# information to stderr.
# NOTE(review): several statements of this method (among them the
# initialisation of `playlist`, some filter arguments and the final return)
# are not visible in this view; confirm against the complete file.
27 def get_playlist(self, zone, start_datetime, end_datetime):
28 current_datetime = start_datetime
# The class-level default is None, so on the first call the jingle interval
# starts counting from the beginning of this playlist.
29 if self.last_jingle_datetime is None:
30 self.last_jingle_datetime = current_datetime
31 # Define a max duration (1 hour), if it is reached, and far enough
32 # from end_datetime (30 minutes), return the playlist as is, not aligned
33 # on end time, so a new playlist gets computed once it's over.
34 # This avoids misalignments due to track durations not matching exactly
35 # or additional delays caused by the player program.
36 max_duration = datetime.timedelta(hours=1)
37 max_duration_leftover = datetime.timedelta(minutes=30)
# counts how many times the end of the playlist had to be trimmed
39 adjustment_counter = 0
# Jingles come from the first settings object attached to the zone; the
# AttributeError handler presumably covers zones without settings, where
# .first() gives None.
41 jingles = list(zone.nonstopzonesettings_set.first().jingles.all())
42 except AttributeError:
# IDs of tracks logged within the last NO_REPEAT_DELAY days (log lines with
# on_air=False are left out); these tracks are excluded from selection.
45 recent_tracks_id = [x.track_id for x in
46 SomaLogLine.objects.exclude(on_air=False).filter(
48 play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
# t0 is only used to report the computation time at the end
49 t0 = datetime.datetime.now()
50 allow_overflow = False
51 while current_datetime < end_datetime:
52 if (current_datetime - start_datetime) > max_duration and (
53 (end_datetime - current_datetime) > max_duration_leftover):
56 if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
57 # jingle time, every ~20 minutes
58 playlist.append(random.choice(jingles))
59 self.last_jingle_datetime = current_datetime
60 current_datetime = start_datetime + sum(
61 [x.duration for x in playlist], datetime.timedelta(seconds=0))
# time still to fill, measured before the next track is added; used by the
# exact-fit query below
63 remaining_time = (end_datetime - current_datetime)
# one random track with a known duration that is neither recently played
# nor already part of this playlist
64 track = Track.objects.filter(
66 duration__isnull=False).exclude(
67 id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
68 ).order_by('?').first()
70 # no track, reduce recent tracks exclusion
# (keeps only the first half of the exclusion list)
71 recent_tracks_id = recent_tracks_id[:len(recent_tracks_id)//2]
73 playlist.append(track)
74 current_datetime = start_datetime + sum(
75 [x.duration for x in playlist], datetime.timedelta(seconds=0))
76 if current_datetime > end_datetime and not allow_overflow:
78 # 1st strategy: remove last track and try to get a track with
79 # exact remaining time
80 print('overshoot', current_datetime, file=sys.stderr)
81 playlist = playlist[:-1]
# "exact" means a duration within one second above remaining_time
82 track = Track.objects.filter(
84 duration__gte=remaining_time,
85 duration__lt=remaining_time + datetime.timedelta(seconds=1)
87 id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
88 ).order_by('?').first()
91 playlist.append(track)
93 # fallback strategy: didn't find track of expected duration,
94 # reduce playlist further
95 adjustment_counter += 1
96 playlist = playlist[:-1]
97 if len(playlist) == 0 or adjustment_counter > 5:
98 # a dedicated sound that ended a bit too early,
99 # or too many failures to get an appropriate file,
100 # allow whatever comes.
101 allow_overflow = True
102 print('allow overflow', file=sys.stderr)
# recompute the position after the playlist was modified
104 current_datetime = start_datetime + sum(
105 [x.duration for x in playlist], datetime.timedelta(seconds=0))
# Log the resulting timeline: start time, duration and title of each item,
# then the computed end time and the number of adjustments.
107 print('computed playlist: (computation time: %ss)' % (datetime.datetime.now() - t0), file=sys.stderr)
108 current_datetime = start_datetime
109 for track in playlist:
110 print(' ', current_datetime, track.duration, track.title, file=sys.stderr)
111 current_datetime += track.duration
112 print(' ', current_datetime, '---', file=sys.stderr)
113 print(' adjustment_counter:', adjustment_counter, file=sys.stderr)
# Query the studio switch at app_settings.ON_AIR_SWITCH_URL (HTTP GET with a
# 5 second timeout) and inspect the 'active', 'nonstop-via-stud1' and
# 'nonstop-via-stud2' keys of its JSON answer.
# NOTE(review): the value produced by each branch is not visible in this
# view; confirm against the complete file.
# NOTE(review): requests.get() is a blocking call, and this method is called
# directly from the record_nonstop_line coroutine, so the event loop waits
# while the request is in flight.
117 def is_nonstop_on_air(self):
118 # check if nonstop system is currently on air
# no switch URL configured
119 if app_settings.ON_AIR_SWITCH_URL is None:
121 switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
# error status returned by the switch
122 if not switch_response.ok:
125 status = switch_response.json()
128 if status.get('active') == 0:
# NOTE(review): the stud1 branch tests its flag against 0 while the stud2
# branch tests against 1; confirm that this asymmetry is intended.
130 elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
132 elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
# Fill in a SomaLogLine for a track that starts playing.
#
# track: the Track being played; its first related nonstop file is recorded
#   as the file path.
# now: timestamp stored as the play time.
#
# The on_air flag comes from is_nonstop_on_air(), which performs a
# synchronous HTTP request.
# NOTE(review): the end of this method is not visible in this view, so how
# the log line gets persisted cannot be confirmed here.
136 async def record_nonstop_line(self, track, now):
137 log_line = SomaLogLine()
138 log_line.play_timestamp = now
139 log_line.track = track
140 log_line.filepath = track.nonstopfile_set.first()
141 log_line.on_air = self.is_nonstop_on_air()
# Start the external player for `item` and wait for it to finish.
#
# item: a playlist entry or slot; it is played from item.stream.url when it
#   has an is_stream() method returning true, otherwise from item.file_path().
# timeout: optional limit in seconds, handed to asyncio.wait_for().
#
# The subprocess is stored on self.player, which other methods use to check
# whether a player is still running.
# NOTE(review): some branch headers, the command argument of both subprocess
# calls and the timeout handler body are not visible in this view.
144 async def player_process(self, item, timeout=None):
# Debug mode: substitute a shell `sleep` of the item's duration for the real
# player (86400 seconds for streams); the URL or path is only a trailing
# shell comment.
145 if app_settings.DEBUG_WITH_SLEEPS:
146 if hasattr(item, 'is_stream') and item.is_stream():
147 cmd = 'sleep 86400 # %s' % item.stream.url
148 elif isinstance(item.duration, datetime.timedelta):
149 cmd = 'sleep %s # %s' % (item.duration.total_seconds(), item.file_path())
150 elif isinstance(item.duration, int):
151 cmd = 'sleep %s # %s' % (item.duration, item.file_path())
# Normal mode: argument list made of the configured player command, its
# arguments, and the stream URL or file path.
153 cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
154 if hasattr(item, 'is_stream') and item.is_stream():
155 cmd.append(item.stream.url)
157 cmd.append(item.file_path())
158 print('cmd:', cmd, file=sys.stderr)
# a string command goes through the shell, a list is executed directly
159 if isinstance(cmd, str):
160 self.player = await asyncio.create_subprocess_shell(
162 stdout=asyncio.subprocess.PIPE,
163 stderr=asyncio.subprocess.PIPE)
165 self.player = await asyncio.create_subprocess_exec(
167 stdout=asyncio.subprocess.PIPE,
168 stderr=asyncio.subprocess.PIPE)
# wait for the player to exit; its output is read and discarded
170 await self.player.communicate()
# same wait, bounded by `timeout`
173 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
174 except asyncio.TimeoutError:
# Play the given slot. Three kinds of slot are handled:
#   - a Nonstop zone: a playlist is computed up to slot.end_datetime and its
#     items are played from self.playlist at index self.playhead;
#   - a stream (slot.is_stream() is true): the stream is played until the
#     end of the slot;
#   - anything else: the slot itself is handed to the player.
# For the last two, slot.jingle is played first with a 60 second limit.
# NOTE(review): the loop structure, the playhead handling and several
# conditions are not visible in this view; confirm against the complete file.
178 async def play(self, slot):
179 now = datetime.datetime.now()
180 if isinstance(slot, Nonstop):
181 self.playlist = self.get_playlist(slot, now, slot.end_datetime)
183 self.softstop = False
185 now = datetime.datetime.now()
187 track = self.playlist[self.playhead]
# start time of the current item; read by recompute_playlist and by the
# 'status' command of handle_connection
190 self.current_track_start_datetime = now
191 print(now, track.title, track.duration, file=sys.stderr)
# the log line is recorded in a separate task, alongside playback
193 if isinstance(track, Track): # not jingles
194 record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
195 await self.player_process(track)
199 # track was left to finish, but now the playlist should stop.
202 elif slot.is_stream():
203 print(now, 'playing stream', slot.stream, file=sys.stderr)
205 await self.player_process(slot.jingle, timeout=60)
# NOTE(review): `now` was captured before the jingle played, so unless it is
# refreshed in a line not visible here, the time spent on the jingle is not
# deducted from the stream timeout below.
206 print('timeout at', (slot.end_datetime - now).total_seconds(), file=sys.stderr)
207 await self.player_process(slot, timeout=(slot.end_datetime - now).total_seconds())
# slots with an `episode` attribute are logged as a sound, others as random
209 if hasattr(slot, 'episode'):
210 print(now, 'playing sound', slot.episode, file=sys.stderr)
212 print(now, 'playing random', file=sys.stderr)
214 await self.player_process(slot.jingle, timeout=60)
215 await self.player_process(slot)
# Replace everything after the currently playing item with a newly computed
# playlist running from the expected end of the current item (its start time
# plus its duration) to the end of the current slot.
# The current item itself, at self.playhead, is left in place.
# NOTE(review): a statement just before the final assignment is not visible
# in this view.
217 def recompute_playlist(self):
218 current_track = self.playlist[self.playhead]
219 print('recompute_playlist, from', current_track.title, self.current_track_start_datetime + current_track.duration, 'to', self.slot.end_datetime, file=sys.stderr)
220 playlist = self.get_playlist(self.slot,
221 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
# slice assignment: drops the old tail and inserts the new items
223 self.playlist[self.playhead + 1:] = playlist
def get_current_diffusion(self):
    """Return the scheduled item that is on air right now, or None.

    For each of the three schedule models, the most recent entry that
    started within the last 24 hours is looked up.  The first of them that
    has not ended yet is returned, checking in this order: recurring stream
    occurence, scheduled diffusion, recurring random-directory occurence.
    """
    right_now = datetime.datetime.now()
    window_start = right_now - datetime.timedelta(days=1)

    latest_diffusion = (
        ScheduledDiffusion.objects
        .filter(diffusion__datetime__gt=window_start,
                diffusion__datetime__lt=right_now)
        .order_by('diffusion__datetime')
        .last()
    )
    latest_stream = (
        RecurringStreamOccurence.objects
        .filter(datetime__gt=window_start, datetime__lt=right_now)
        .order_by('datetime')
        .last()
    )
    latest_directory = (
        RecurringRandomDirectoryOccurence.objects
        .filter(datetime__gt=window_start, datetime__lt=right_now)
        .order_by('datetime')
        .last()
    )

    # note it shouldn't be possible to have both diffusion and occurences
    # running at the moment.
    for candidate in (latest_stream, latest_diffusion, latest_directory):
        if candidate and candidate.end_datetime > right_now:
            return candidate
    return None
def get_next_diffusion(self, before_datetime):
    """Return the next scheduled item starting before *before_datetime*.

    The earliest upcoming entry (start time strictly between now and
    *before_datetime*) is looked up in each of the three schedule models.

    Precedence:
      * when both a scheduled diffusion and a recurring stream occurence
        are found, the one starting first wins (the stream occurence on a
        tie);
      * otherwise whichever of the two exists;
      * a recurring random-directory occurence is only returned when
        neither of the others was found.

    Returns None when nothing is scheduled in the window.
    """
    now = datetime.datetime.now()
    diffusion = ScheduledDiffusion.objects.filter(
        diffusion__datetime__gt=now,
        diffusion__datetime__lt=before_datetime,
    ).order_by('diffusion__datetime').first()
    occurence = RecurringStreamOccurence.objects.filter(
        datetime__gt=now,
        datetime__lt=before_datetime,
    ).order_by('datetime').first()
    directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
        datetime__gt=now,
        datetime__lt=before_datetime,
    ).order_by('datetime').first()

    if diffusion and occurence:
        # `diffusion__datetime` only exists as query-lookup syntax; on an
        # instance the start time is read through `.datetime`, as the
        # callers of this method do with its return value.
        return diffusion if diffusion.datetime < occurence.datetime else occurence
    # at most one of diffusion/occurence is set here; fall back to the
    # directory occurence, or None when nothing was found at all
    return diffusion or occurence or directory_occurence
# Work out what should be on air now and store it in self.slot.
#
# A currently running diffusion/occurence (get_current_diffusion) is used as
# the slot.  Otherwise the slot is picked among the Nonstop zones, ordered
# by start time, and given `datetime` / `end_datetime` attributes derived
# from its own start time and the start time of the following zone.  The end
# is then brought forward to the start of the next scheduled diffusion when
# there is one before it.
# NOTE(review): several branch headers and some arguments of the second
# replace() call are not visible in this view; confirm against the complete
# file.
270 def recompute_slots(self):
271 now = datetime.datetime.now()
272 # print(now, 'recompute_slots', file=sys.stderr)
273 diffusion = self.get_current_diffusion()
275 self.slot = diffusion
277 nonstops = list(Nonstop.objects.all().order_by('start'))
278 nonstops = [x for x in nonstops if x.start != x.end] # disabled zones
# current zone: the last one whose start time is already past today
280 self.slot = [x for x in nonstops if x.start < now.time()][-1]
# fallback: the first zone of the day
282 self.slot = nonstops[0]
# next zone: the one following the current zone in start order
284 next_slot = nonstops[nonstops.index(self.slot) + 1]
# fallback: wrap around to the first zone
286 next_slot = nonstops[0]
# only hour and minute are replaced here, so the seconds and microseconds
# of `now` are kept in slot.datetime
287 self.slot.datetime = now.replace(
288 hour=self.slot.start.hour,
289 minute=self.slot.start.minute)
290 self.slot.end_datetime = now.replace(
291 hour=next_slot.start.hour,
292 minute=next_slot.start.minute,
# an end before the start means the zone runs across midnight
295 if self.slot.end_datetime < self.slot.datetime:
296 self.slot.end_datetime += datetime.timedelta(days=1)
298 diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
# cut the slot short at the start of the upcoming diffusion
300 self.slot.end_datetime = diffusion.datetime
# Background coroutine that periodically re-evaluates the schedule
# (recompute_slots) and reacts when the result differs from what is playing:
#   - a different slot: the change is logged and, depending on the kinds of
#     slot involved, playback is left to finish its track or the play task
#     is cancelled;
#   - the same slot with an earlier end time: the rest of the playlist is
#     recomputed, provided more than 5 minutes remain.
# NOTE(review): the loop header and the statements of the soft-stop branch
# are not visible in this view; confirm against the complete file.
302 async def recompute_slots_loop(self):
303 now = datetime.datetime.now()
304 print(now, 'recompute_slots_loop', file=sys.stderr)
305 sleep = (60 - now.second) % 10 # first wait: up to the next multiple of 10 seconds
307 await asyncio.sleep(sleep)
308 sleep = 10 # next cycles every 10 seconds
# remember the slot in use, recompute, and compare
309 current_slot = self.slot
310 self.recompute_slots()
311 expected_slot = self.slot
312 if current_slot != expected_slot:
313 now = datetime.datetime.now()
314 print(now, 'unexpected change', current_slot, 'vs', expected_slot, file=sys.stderr)
315 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
316 # ask for a softstop, i.e. finish the track then switch.
320 print('interrupting nonstop', file=sys.stderr)
321 self.play_task.cancel()
# same slot, but it now ends earlier than previously computed
322 elif current_slot.end_datetime > expected_slot.end_datetime:
323 now = datetime.datetime.now()
324 print(now, 'change in end time, from %s to %s' %
325 (current_slot.end_datetime, expected_slot.end_datetime), file=sys.stderr)
326 if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
327 # more than 5 minutes left, recompute playlist
328 self.recompute_playlist()
# Callback given to asyncio.start_server in main(): serves one client of the
# text control protocol.
#
# reader / writer: the asyncio stream pair of the connection.
#
# After a greeting, messages of at most 100 bytes are read and answered with
# one JSON object per line.  Commands visible here:
#   status   - current slot; for a Nonstop slot also the current track
#              (start time, title, artist, duration, elapsed and remaining
#              seconds); plus the next diffusion within 5 hours, if any
#   softquit - answered with {'ack': True}
#   hardquit - answered with {'ack': True}
#   anything else - {'err': 1, 'msg': 'unknown command: ...'}
# NOTE(review): many statements (loop header, end-of-connection handling,
# at least one more command, the effect of the quit commands) are not
# visible in this view; confirm against the complete file.
330 async def handle_connection(self, reader, writer):
331 writer.write(b'Watusi!\n')
332 writer.write(b'(dot on empty line to stop connection)\n')
335 data = await reader.read(100)
337 message = data.decode().strip()
338 except UnicodeDecodeError:
# NOTE(review): when decoding fails `message` has not been assigned from
# this read, so this prints a stale value or fails on the first message;
# `data` looks like the intended operand.
339 print('got invalid message %r' % message, file=sys.stderr)
341 print('got message: %r' % message, file=sys.stderr)
342 if message == 'status':
343 response = {'slot': str(self.slot)}
344 if isinstance(self.slot, Nonstop):
346 track = self.playlist[self.playhead]
350 response['track'] = {}
351 response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
352 response['track']['title'] = track.title
353 response['track']['artist'] = track.artist.name if track.artist_id else ''
354 response['track']['duration'] = track.duration.total_seconds()
355 response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
# remaining = duration - elapsed, in seconds; negative once the track has
# run longer than its nominal duration
356 response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
# look at most 5 hours ahead for the next scheduled item
357 next_diffusion = self.get_next_diffusion(
358 before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
360 response['next_diffusion'] = {
361 'label': str(next_diffusion),
362 'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
# emission and episode titles are only added for scheduled diffusions
364 if isinstance(next_diffusion, ScheduledDiffusion):
365 response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
366 response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
369 response = {'ack': True}
370 elif message == 'softquit':
373 response = {'ack': True}
374 elif message == 'hardquit':
377 response = {'ack': True}
# a player subprocess exists and has not exited yet
378 if self.player and self.player.returncode is None: # not finished
381 response = {'err': 1, 'msg': 'unknown command: %r' % message}
# every answer is a single line of JSON
382 writer.write(json.dumps(response).encode('utf-8') + b'\n')
# the client dropped the connection
385 except ConnectionResetError:
# Signal callback registered through loop.add_signal_handler() in main():
# logs the event and cancels the running play task.
# NOTE(review): a statement between the log line and the cancellation is not
# visible in this view.
389 def sigterm_handler(self):
390 print('got signal', file=sys.stderr)
392 self.play_task.cancel()
# Top-level coroutine of the command: installs the signal handler, computes
# the initial slot, starts the control server and the periodic slot
# recomputation task, then plays slots through self.play_task.
# NOTE(review): the loop structure, several conditions and the handler
# bodies are not visible in this view; confirm against the complete file.
394 async def main(self):
395 loop = asyncio.get_running_loop()
396 loop.add_signal_handler(
398 self.sigterm_handler)
399 now = datetime.datetime.now()
400 self.recompute_slots()
# control server; connections are served by handle_connection
401 server = await asyncio.start_server(
402 self.handle_connection,
403 app_settings.SERVER_BIND_IFACE,
404 app_settings.SERVER_BIND_PORT)
# NOTE(review): no reference to this task is kept; the asyncio
# documentation recommends holding one so the task cannot be
# garbage-collected while running.
406 asyncio.create_task(server.serve_forever())
408 self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
# NOTE(review): timedelta.seconds is only the seconds component (the days
# part is dropped and negative deltas wrap around); total_seconds() may be
# what is meant here.
410 duration = (self.slot.end_datetime - now).seconds
411 print('next sure slot', duration, self.slot.end_datetime, file=sys.stderr)
413 # next slot is very close, wait for it
414 await asyncio.sleep(duration)
415 self.recompute_slots()
# playback runs as its own task so that it can be cancelled from
# recompute_slots_loop and sigterm_handler
416 self.play_task = asyncio.create_task(self.play(self.slot))
419 self.recompute_slots()
# reached when the play task is cancelled
420 except asyncio.CancelledError as exc:
421 print('exc:', exc, file=sys.stderr)
# a player subprocess exists and has not exited yet
422 if self.player and self.player.returncode is None: # not finished
424 except KeyboardInterrupt: