9 from django.core.management.base import BaseCommand
11 from emissions.models import Nonstop
12 from nonstop.models import Track, SomaLogLine, ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence
13 from nonstop.app_settings import app_settings
16 class Command(BaseCommand):
17 last_jingle_datetime = None
def handle(self, verbosity, **kwargs):
    """Entry point of the management command: run the asyncio main loop.

    Args:
        verbosity: verbosity level passed by Django; not used here.
        **kwargs: remaining command options; not used here.

    The coroutine returned by ``self.main()`` is run with asyncio debug
    mode enabled.  A ``KeyboardInterrupt`` (Ctrl-C) is treated as a normal
    way to stop the command and is not propagated.
    """
    try:
        asyncio.run(self.main(), debug=True)
    except KeyboardInterrupt:
        # Interactive interruption is an expected shutdown path; exit
        # quietly instead of printing a traceback.
        pass
26 def get_playlist(self, zone, start_datetime, end_datetime):
# Compute a list of items (random tracks, plus a jingle from the zone
# settings roughly every 20 minutes) whose summed durations run from
# start_datetime towards end_datetime.
#
# zone: object exposing nonstopzonesettings_set (callers pass a Nonstop).
# start_datetime / end_datetime: datetimes bounding the playlist.
#
# The computed list is printed with the start time of each item.  The
# value handed back to the caller is presumably that list (callers index
# and slice the result), but the return statement is not visible here.
#
# NOTE(review): this copy of the method is missing statements the visible
# code depends on (the initialisation of `playlist`, the `try:` matching
# `except AttributeError:`, the bodies/headers of several branches and at
# least one argument of each Track query).  Restore them from version
# control before relying on this code.
27 current_datetime = start_datetime
# first call: start the "time since last jingle" clock at the playlist start
28 if self.last_jingle_datetime is None:
29 self.last_jingle_datetime = current_datetime
30 # Define a max duration (1 hour), if it is reached, and far enough
31 # from end_datetime (30 minutes), return the playlist as is, not aligned
32 # on end time, so a new playlist gets computed once it's over.
33 # This avoids misalignments due to track durations not matching exactly
34 # or additional delays caused by the player program.
35 max_duration = datetime.timedelta(hours=1)
36 max_duration_leftover = datetime.timedelta(minutes=30)
# counts how many times the fallback strategy below had to shrink the playlist;
# the main loop gives up once it reaches 5
38 adjustment_counter = 0
# jingles configured on the first settings object of the zone; .first() gives
# None when there is no settings object, hence the AttributeError handler
40 jingles = list(zone.nonstopzonesettings_set.first().jingles.all())
41 except AttributeError:
# ids of tracks logged during the last NO_REPEAT_DELAY days (log lines
# explicitly marked on_air=False are ignored); these are excluded from selection
44 recent_tracks_id = [x.track_id for x in
45 SomaLogLine.objects.exclude(on_air=False).filter(
47 play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
# only used to report the computation time at the end
48 t0 = datetime.datetime.now()
49 while current_datetime < end_datetime and adjustment_counter < 5:
# max duration reached and still far from the end: see the comment above
# max_duration (the statement executed in this case is not visible here)
50 if (current_datetime - start_datetime) > max_duration and (
51 (end_datetime - current_datetime) > max_duration_leftover):
54 if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
55 # jingle time, every ~20 minutes
56 playlist.append(random.choice(jingles))
57 self.last_jingle_datetime = current_datetime
# current_datetime is always recomputed from the full playlist rather than
# incremented, so it stays consistent after items are removed
58 current_datetime = start_datetime + sum(
59 [x.duration for x in playlist], datetime.timedelta(seconds=0))
# time left BEFORE the next track is appended; reused below to look for a
# track that fits exactly when the appended one overshoots
61 remaining_time = (end_datetime - current_datetime)
# random track with a known duration, not played recently and not already
# in this playlist (jingles are skipped by the isinstance test)
62 track = Track.objects.filter(
64 duration__isnull=False).exclude(
65 id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
66 ).order_by('?').first()
68 # no track, reduce recent tracks exclusion
# keeps the first half of the exclusion list, so each retry is less strict
69 recent_tracks_id = recent_tracks_id[:len(recent_tracks_id)//2]
71 playlist.append(track)
72 current_datetime = start_datetime + sum(
73 [x.duration for x in playlist], datetime.timedelta(seconds=0))
# the track just appended runs past end_datetime
74 if current_datetime > end_datetime:
76 # 1st strategy: remove last track and try to get a track with
77 # exact remaining time
78 playlist = playlist[:-1]
# "exact" means a duration in [remaining_time, remaining_time + 1 second)
79 track = Track.objects.filter(
81 duration__gte=remaining_time,
82 duration__lt=remaining_time + datetime.timedelta(seconds=1)
84 id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
85 ).order_by('?').first()
88 playlist.append(track)
90 # fallback strategy: didn't find track of expected duration,
91 # reduce playlist further
92 adjustment_counter += 1
# drops one more item, so the next iterations pick a different combination
93 playlist = playlist[:-1]
95 current_datetime = start_datetime + sum(
96 [x.duration for x in playlist], datetime.timedelta(seconds=0))
# debug output: one line per item with its computed start time
98 print('computed playlist: (computation time: %ss)' % (datetime.datetime.now() - t0))
99 current_datetime = start_datetime
100 for track in playlist:
101 print(' ', current_datetime, track.duration, track.title)
102 current_datetime += track.duration
103 print(' ', current_datetime, '---')
104 print(' adjustment_counter:', adjustment_counter)
108 def is_nonstop_on_air(self):
# Ask the on-air switch service (app_settings.ON_AIR_SWITCH_URL) for its
# status and inspect the 'active', 'nonstop-via-stud1' and
# 'nonstop-via-stud2' keys of the JSON reply.
#
# NOTE(review): the value returned by each branch is not visible in this
# copy of the method, so the exact result for "on air", "not on air" and
# "unknown" cannot be documented from here; restore from version control.
# The call is a blocking HTTP request (5 second timeout) and it is invoked
# from a coroutine (record_nonstop_line).
109 # check if nonstop system is currently on air
# no switch configured
110 if app_settings.ON_AIR_SWITCH_URL is None:
112 switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
# HTTP error status from the switch service
113 if not switch_response.ok:
116 status = switch_response.json()
119 if status.get('active') == 0:
# NOTE(review): 'nonstop-via-stud1' is compared with 0 while
# 'nonstop-via-stud2' is compared with 1 -- confirm this asymmetry is intended
121 elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
123 elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
async def record_nonstop_line(self, track, now):
    """Record in the play log that ``track`` started playing at ``now``.

    Args:
        track: the track being played; its first related file
            (``track.nonstopfile_set.first()``) is stored on the log line.
        now: timestamp stored as ``play_timestamp``.

    The ``on_air`` flag is taken from ``self.is_nonstop_on_air()``, which
    performs a blocking HTTP request.
    """
    log_line = SomaLogLine()
    log_line.play_timestamp = now
    log_line.track = track
    log_line.filepath = track.nonstopfile_set.first()
    log_line.on_air = self.is_nonstop_on_air()
    # NOTE(review): the visible copy of this method built the log line but
    # never stored it; saving it is the evident intent (get_playlist reads
    # these rows back to avoid repeats) -- confirm against version control.
    log_line.save()
async def player_process(self, item, timeout=None):
    """Play ``item`` in an external process and wait for it to end.

    Args:
        item: object to play.  If it has an ``is_stream()`` method that
            returns true, ``item.stream.url`` is played; otherwise
            ``item.file_path()`` is played.
        timeout: maximum playing time in seconds, or ``None`` to wait
            until the player exits on its own.

    Raises:
        TypeError: in ``DEBUG_WITH_SLEEPS`` mode, when a non-stream item
            has a duration that is neither a ``timedelta`` nor a number
            (previously this surfaced as an unbound-variable error).

    The running process is kept in ``self.player`` so other coroutines can
    inspect or stop it.
    """
    is_stream = hasattr(item, 'is_stream') and item.is_stream()
    if app_settings.DEBUG_WITH_SLEEPS:
        # Debug mode: replace the player by a `sleep` of the same length;
        # what would have been played is kept as a shell comment so it
        # shows up in the process list.
        if is_stream:
            cmd = 'sleep 86400 # %s' % item.stream.url
        else:
            duration = item.duration
            if isinstance(duration, datetime.timedelta):
                duration = duration.total_seconds()
            elif not isinstance(duration, (int, float)):
                raise TypeError(
                    'unsupported duration %r for %r' % (duration, item))
            cmd = 'sleep %s # %s' % (duration, item.file_path())
    else:
        # list concatenation builds a new list, PLAYER_ARGS is not modified
        cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
        cmd.append(item.stream.url if is_stream else item.file_path())

    if isinstance(cmd, str):
        self.player = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
    else:
        self.player = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)

    if timeout is None:
        await self.player.communicate()
        return

    try:
        await asyncio.wait_for(self.player.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        # NOTE(review): the timeout handler was not visible in the copy
        # this was restored from; stopping the player is the only way the
        # timeout can have an effect -- confirm against version control.
        if self.player.returncode is None:
            self.player.kill()
            # reap the process so it does not linger as a zombie
            await self.player.wait()
169 async def play(self, slot):
# Play `slot`.  A Nonstop zone is played item by item from a playlist
# computed by get_playlist(); a stream slot is played until the slot's
# end time; any other slot is handed to the player as a whole.
#
# NOTE(review): the loop header, the playhead handling, the soft-stop
# test and the conditions guarding the jingles are not visible in this
# copy of the method; restore them from version control.
170 now = datetime.datetime.now()
171 if isinstance(slot, Nonstop):
# playlist covering the time from now to the end of the zone
172 self.playlist = self.get_playlist(slot, now, slot.end_datetime)
# softstop is reset here; the visible comment further down indicates it is
# used to stop the playlist once the current track has finished
174 self.softstop = False
176 now = datetime.datetime.now()
178 track = self.playlist[self.playhead]
# kept on the instance: read by recompute_playlist() and by the 'status'
# command of handle_connection()
181 self.current_track_start_datetime = now
182 print(now, track.title, track.duration)
184 if isinstance(track, Track): # not jingles
# the play log entry is written by a separate task started just before playback
185 record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
186 await self.player_process(track)
190 # track was left to finish, but now the playlist should stop.
193 elif slot.is_stream():
194 print(now, 'playing stream', slot.stream)
# the jingle is given at most 60 seconds
196 await self.player_process(slot.jingle, timeout=60)
197 print('timeout at', (slot.end_datetime - now).total_seconds())
# the stream is stopped after the number of seconds left until end_datetime.
# NOTE(review): `now` was taken at the top of the method, before the jingle
# was played, so the stream may run past end_datetime by the jingle's length
198 await self.player_process(slot, timeout=(slot.end_datetime - now).total_seconds())
200 if hasattr(slot, 'episode'):
201 print(now, 'playing sound', slot.episode)
203 print(now, 'playing random')
205 await self.player_process(slot.jingle, timeout=60)
# no timeout: the item is played until the player exits
206 await self.player_process(slot)
208 def recompute_playlist(self):
# Replace everything after the item currently playing with a playlist
# freshly computed for the time between the expected end of the current
# item (its start time plus its duration) and the end of the current slot.
# Called from recompute_slots_loop() when the end of the slot moved earlier.
209 current_track = self.playlist[self.playhead]
210 print('recompute_playlist, from', current_track.title, self.current_track_start_datetime + current_track.duration, 'to', self.slot.end_datetime)
211 playlist = self.get_playlist(self.slot,
212 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
# NOTE(review): a statement between the computation and the assignment below
# may be missing from this copy (e.g. a guard on the result); as shown, an
# empty result would drop all upcoming items
# slice assignment: items up to and including the current one are kept
214 self.playlist[self.playhead + 1:] = playlist
def get_current_diffusion(self):
    """Return the scheduled item that should be playing now, or ``None``.

    For each of the three kinds of scheduled items (stream occurrence,
    scheduled diffusion, random-directory occurrence) the most recent one
    that started during the last 24 hours is looked up; the first of them
    whose ``end_datetime`` is still in the future is returned, checked in
    that order.
    """
    now = datetime.datetime.now()
    since = now - datetime.timedelta(days=1)
    diffusion = ScheduledDiffusion.objects.filter(
        diffusion__datetime__gt=since,
        diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
    occurence = RecurringStreamOccurence.objects.filter(
        datetime__gt=since,
        datetime__lt=now).order_by('datetime').last()
    directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
        datetime__gt=since,
        datetime__lt=now).order_by('datetime').last()
    # note it shouldn't be possible to have both diffusion and occurences
    # running at the moment.
    for candidate in (occurence, diffusion, directory_occurence):
        if candidate and candidate.end_datetime > now:
            return candidate
    return None
def get_next_diffusion(self, before_datetime):
    """Return the next scheduled item starting before ``before_datetime``.

    Args:
        before_datetime: upper bound (exclusive) for the start time.

    Returns:
        The scheduled diffusion, stream occurrence or random-directory
        occurrence with the earliest start time strictly between now and
        ``before_datetime``, or ``None`` when there is none.

    Start times are compared through the ``datetime`` attribute, which the
    callers already read on whatever object is returned here.  The earlier
    code read ``diffusion.diffusion__datetime``, which is query-lookup
    syntax and not an attribute of the object, and it never compared the
    directory occurrence against the other candidates.
    """
    now = datetime.datetime.now()
    diffusion = ScheduledDiffusion.objects.filter(
        diffusion__datetime__gt=now,
        diffusion__datetime__lt=before_datetime,
    ).order_by('diffusion__datetime').first()
    # NOTE(review): the lower bound of the two queries below was restored
    # to mirror the scheduled-diffusion query -- confirm against version
    # control.
    occurence = RecurringStreamOccurence.objects.filter(
        datetime__gt=now,
        datetime__lt=before_datetime,
    ).order_by('datetime').first()
    directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
        datetime__gt=now,
        datetime__lt=before_datetime,
    ).order_by('datetime').first()

    # Order matters only for ties (min() keeps the first minimum): a stream
    # occurrence wins over a diffusion starting at the same time, and a
    # diffusion wins over a directory occurrence, as before.
    candidates = [
        x for x in (occurence, diffusion, directory_occurence) if x]
    if not candidates:
        return None
    return min(candidates, key=lambda candidate: candidate.datetime)
261 def recompute_slots(self):
# Determine what should be playing now and store it in self.slot: the
# current scheduled item if get_current_diffusion() finds one, otherwise
# a Nonstop zone with `datetime` / `end_datetime` attributes attached.
#
# NOTE(review): the control flow between the steps below (the test on
# `diffusion`, the handling of the case where no zone has started yet or
# where the zone is the last of the list, the test before the final
# assignment) and the end of the second now.replace() call are not
# visible in this copy of the method; restore them from version control.
262 now = datetime.datetime.now()
263 # print(now, 'recompute_slots')
264 diffusion = self.get_current_diffusion()
266 self.slot = diffusion
# zones ordered by start time
268 nonstops = list(Nonstop.objects.all().order_by('start'))
269 nonstops = [x for x in nonstops if x.start != x.end] # disabled zones
# last zone (in start order) whose start time is earlier than the current time
271 self.slot = [x for x in nonstops if x.start < now.time()][-1]
273 self.slot = nonstops[0]
# the zone following the selected one ...
275 next_slot = nonstops[nonstops.index(self.slot) + 1]
# ... and the first zone of the list as the alternative
277 next_slot = nonstops[0]
# start of the zone, on today's date; replace() only changes the fields
# given, so the seconds and microseconds of `now` are kept
278 self.slot.datetime = now.replace(
279 hour=self.slot.start.hour,
280 minute=self.slot.start.minute)
# the zone ends when the next one starts
281 self.slot.end_datetime = now.replace(
282 hour=next_slot.start.hour,
283 minute=next_slot.start.minute,
# an end before the start means the zone runs past midnight
286 if self.slot.end_datetime < self.slot.datetime:
287 self.slot.end_datetime += datetime.timedelta(days=1)
# a scheduled item starting before the end of the zone shortens the zone
289 diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
291 self.slot.end_datetime = diffusion.datetime
293 async def recompute_slots_loop(self):
# Background task started from main(): periodically call recompute_slots()
# and react when the slot that should be playing, or its end time, changed.
#
# NOTE(review): the loop header and the statements of the soft-stop branch
# are not visible in this copy of the method; restore them from version
# control.
294 now = datetime.datetime.now()
295 print(now, 'recompute_slots_loop')
# NOTE(review): the modulo aligns the first wake-up on the next multiple of
# 10 seconds, not only on :00 as the comment says
296 sleep = (60 - now.second) % 10 # adjust to awake at :00
298 await asyncio.sleep(sleep)
299 sleep = 10 # next cycles every 10 seconds
# remember the slot before recomputing, to detect a change
300 current_slot = self.slot
301 self.recompute_slots()
302 expected_slot = self.slot
303 if current_slot != expected_slot:
304 now = datetime.datetime.now()
305 print(now, 'unexpected change', current_slot, 'vs', expected_slot)
306 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
307 # ask for a softstop, i.e. finish the track then switch.
311 print('interrupting nonstop')
# cancelling the play task interrupts whatever is playing
312 self.play_task.cancel()
# same slot, but its end moved earlier (only a shortened end time is handled)
313 elif current_slot.end_datetime > expected_slot.end_datetime:
314 now = datetime.datetime.now()
315 print(now, 'change in end time, from %s to %s' %
316 (current_slot.end_datetime, expected_slot.end_datetime))
317 if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
318 # more than 5 minutes left, recompute playlist
319 self.recompute_playlist()
321 async def handle_connection(self, reader, writer):
# Callback given to asyncio.start_server() in main(): a small line-based
# control protocol.  Each command read from the client is answered with
# one line of JSON.  Commands visible here: 'status', 'softquit',
# 'hardquit'; anything else gets an error reply.
#
# NOTE(review): the read loop header, the end-of-connection handling, the
# name of at least one command and the actions taken by the quit commands
# are not visible in this copy of the method; restore them from version
# control.
322 writer.write(b'Watusi!\n')
323 writer.write(b'(dot on empty line to stop connection)\n')
# at most 100 bytes are read per command
326 data = await reader.read(100)
328 message = data.decode().strip()
329 except UnicodeDecodeError:
# NOTE(review): when decoding fails `message` was not assigned by the line
# above (it is unbound, or still holds the previous command); `data` is
# probably what was meant to be printed
330 print('got invalid message %r' % message)
332 print('got message: %r' % message)
333 if message == 'status':
334 response = {'slot': str(self.slot)}
# details about the current item are only reported for nonstop zones
335 if isinstance(self.slot, Nonstop):
337 track = self.playlist[self.playhead]
341 response['track'] = {}
342 response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
343 response['track']['title'] = track.title
344 response['track']['artist'] = track.artist.name if track.artist_id else ''
# duration, elapsed and remaining are numbers of seconds
345 response['track']['duration'] = track.duration.total_seconds()
346 response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
347 response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
# next scheduled item within the coming 5 hours
348 next_diffusion = self.get_next_diffusion(
349 before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
351 response['next_diffusion'] = {
352 'label': str(next_diffusion),
353 'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
# emission and episode titles only exist for scheduled diffusions
355 if isinstance(next_diffusion, ScheduledDiffusion):
356 response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
357 response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
360 response = {'ack': True}
361 elif message == 'softquit':
364 response = {'ack': True}
365 elif message == 'hardquit':
368 response = {'ack': True}
# the player process is still running
369 if self.player and self.player.returncode is None: # not finished
372 response = {'err': 1, 'msg': 'unknown command: %r' % message}
# one JSON document per line
373 writer.write(json.dumps(response).encode('utf-8') + b'\n')
# the client went away
376 except ConnectionResetError:
380 def sigterm_handler(self):
# Callback registered in main() with loop.add_signal_handler(); it cancels
# the task currently playing (self.play_task).
# NOTE(review): statements preceding the cancel may be missing from this
# copy of the method, and the signal it is bound to is not visible here
# (presumably SIGTERM, from the name) -- check against version control.
383 self.play_task.cancel()
385 async def main(self):
386 loop = asyncio.get_running_loop()
387 loop.add_signal_handler(
389 self.sigterm_handler)
390 now = datetime.datetime.now()
391 self.recompute_slots()
392 server = await asyncio.start_server(
393 self.handle_connection,
394 app_settings.SERVER_BIND_IFACE,
395 app_settings.SERVER_BIND_PORT)
397 asyncio.create_task(server.serve_forever())
399 self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
401 duration = (self.slot.end_datetime - now).seconds
402 print('next sure slot', duration, self.slot.end_datetime)
404 # next slot is very close, wait for it
405 await asyncio.sleep(duration)
406 self.recompute_slots()
407 self.play_task = asyncio.create_task(self.play(self.slot))
410 self.recompute_slots()
411 except asyncio.CancelledError as exc:
413 if self.player and self.player.returncode is None: # not finished
415 except KeyboardInterrupt: