11 from django.conf import settings
12 from django.core.management.base import BaseCommand
14 from emissions.models import Nonstop
15 from nonstop.models import Track, Jingle, SomaLogLine, ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence
16 from nonstop.app_settings import app_settings
19 logger = logging.getLogger('stamina')
22 class Command(BaseCommand):
23 requires_system_checks = False
25 last_jingle_datetime = None
28 def handle(self, verbosity, **kwargs):
30 asyncio.run(self.main(), debug=settings.DEBUG)
31 except KeyboardInterrupt:
34 def get_playlist(self, zone, start_datetime, end_datetime):
35 current_datetime = start_datetime
36 if self.last_jingle_datetime is None:
37 self.last_jingle_datetime = current_datetime
38 # Define a max duration (1 hour), if it is reached, and far enough
39 # from end_datetime (30 minutes), return the playlist as is, not aligned
40 # on end time, so a new playlist gets computed once it's over.
41 # This avoids misalignments due to track durations not matching exactly
42 # or additional delays caused by the player program.
43 max_duration = datetime.timedelta(hours=1)
44 max_duration_leftover = datetime.timedelta(minutes=30)
46 adjustment_counter = 0
48 jingles = list(zone.nonstopzonesettings_set.first().jingles.all())
49 except AttributeError:
52 recent_tracks_id = [x.track_id for x in
53 SomaLogLine.objects.exclude(on_air=False).filter(
55 play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
56 t0 = datetime.datetime.now()
57 allow_overflow = False
58 while current_datetime < end_datetime:
59 if (current_datetime - start_datetime) > max_duration and (
60 (end_datetime - current_datetime) > max_duration_leftover):
63 if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
64 # jingle time, every ~20 minutes
65 playlist.append(random.choice(jingles))
66 self.last_jingle_datetime = current_datetime
67 current_datetime = start_datetime + sum(
68 [x.duration for x in playlist], datetime.timedelta(seconds=0))
70 remaining_time = (end_datetime - current_datetime)
71 track = Track.objects.filter(
73 duration__isnull=False).exclude(
74 id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
75 ).order_by('?').first()
77 # no track, reduce recent tracks exclusion
78 recent_tracks_id = recent_tracks_id[:len(recent_tracks_id)//2]
80 playlist.append(track)
81 current_datetime = start_datetime + sum(
82 [x.duration for x in playlist], datetime.timedelta(seconds=0))
83 if current_datetime > end_datetime and not allow_overflow:
85 # 1st strategy: remove last track and try to get a track with
86 # exact remaining time
87 logger.debug('Overshoot %s, %s', adjustment_counter, current_datetime)
88 playlist = playlist[:-1]
89 track = Track.objects.filter(
91 duration__gte=remaining_time,
92 duration__lt=remaining_time + datetime.timedelta(seconds=1)
94 id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
95 ).order_by('?').first()
98 playlist.append(track)
100 # fallback strategy: didn't find track of expected duration,
101 # reduce playlist further
102 adjustment_counter += 1
103 playlist = playlist[:-1]
104 if len(playlist) == 0 or adjustment_counter > 5:
105 # a dedicated sound that ended a bit too early,
106 # or too many failures to get an appropriate file,
107 # allow whatever comes.
108 allow_overflow = True
109 logger.debug('Allowing overflows')
111 current_datetime = start_datetime + sum(
112 [x.duration for x in playlist], datetime.timedelta(seconds=0))
114 logger.info('Computed playlist: (computation time: %ss)',
115 (datetime.datetime.now() - t0))
116 current_datetime = start_datetime
117 for track in playlist:
118 logger.debug('- track: %s %s %s', current_datetime, track.duration, track.title)
119 current_datetime += track.duration
120 logger.debug('- end: %s', current_datetime)
123 def is_nonstop_on_air(self):
124 # check if nonstop system is currently on air
125 if app_settings.ON_AIR_SWITCH_URL is None:
127 switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
128 if not switch_response.ok:
131 status = switch_response.json()
134 if status.get('active') == 0:
136 elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
138 elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
142 async def record_nonstop_line(self, track, now):
143 log_line = SomaLogLine()
144 log_line.play_timestamp = now
145 log_line.track = track
146 log_line.filepath = track.nonstopfile_set.first()
147 log_line.on_air = self.is_nonstop_on_air()
150 async def player_process(self, item, timeout=None):
151 cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
152 if hasattr(item, 'is_stream') and item.is_stream():
153 cmd.append(item.stream.url)
154 logger.info('Play stream: %s', item.stream.url)
156 cmd.append(item.file_path())
157 logger.info('Play file: %s', item.file_path())
158 if app_settings.DEBUG_WITH_SLEEPS:
159 # replace command by a sleep call, for silent debugging
160 if hasattr(item, 'is_stream') and item.is_stream():
161 cmd = 'sleep 86400 # %s' % item.stream.url
162 elif isinstance(item.duration, datetime.timedelta):
163 cmd = 'sleep %s # %s' % (item.duration.total_seconds(), item.file_path())
164 elif isinstance(item.duration, int):
165 cmd = 'sleep %s # %s' % (item.duration, item.file_path())
166 logger.debug('cmd %r', cmd)
167 if isinstance(cmd, str):
168 self.player = await asyncio.create_subprocess_shell(
170 stdout=asyncio.subprocess.PIPE,
171 stderr=asyncio.subprocess.PIPE)
173 self.player = await asyncio.create_subprocess_exec(
175 stdout=asyncio.subprocess.PIPE,
176 stderr=asyncio.subprocess.PIPE)
178 await self.player.communicate()
181 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
182 except asyncio.TimeoutError:
186 async def play(self, slot):
187 now = datetime.datetime.now()
188 if isinstance(slot, Nonstop):
189 self.playlist = self.get_playlist(slot, now, slot.end_datetime)
191 self.softstop = False
193 now = datetime.datetime.now()
195 track = self.playlist[self.playhead]
198 self.current_track_start_datetime = now
199 if isinstance(track, Jingle):
200 logger.info('Jingle: %s (id: %s) (%s)', track.title, track.id, track.duration)
202 logger.info('Track: %s (id: %s) (%s)', track.title, track.id, track.duration)
204 if isinstance(track, Track): # not jingles
205 record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
206 await self.player_process(track)
210 # track was left to finish, but now the playlist should stop.
213 elif slot.is_stream():
214 logger.info('Stream: %s', slot.stream)
216 await self.player_process(slot.jingle, timeout=60)
217 logger.debug('Stream timeout: %s', (slot.end_datetime - now).total_seconds())
218 await self.player_process(slot, timeout=(slot.end_datetime - now).total_seconds())
220 if hasattr(slot, 'episode'):
221 logger.info('Episode: %s (id: %s)', slot.episode, slot.episode.id)
223 logger.info('Random: %s', slot)
225 await self.player_process(slot.jingle, timeout=60)
226 await self.player_process(slot)
228 def recompute_playlist(self):
229 current_track = self.playlist[self.playhead]
230 logger.debug('Recomputing playlist at %s, from %s to %s',
232 self.current_track_start_datetime + current_track.duration,
233 self.slot.end_datetime)
234 playlist = self.get_playlist(self.slot,
235 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
237 self.playlist[self.playhead + 1:] = playlist
239 def get_current_diffusion(self):
240 now = datetime.datetime.now()
241 diffusion = ScheduledDiffusion.objects.filter(
242 diffusion__datetime__gt=now - datetime.timedelta(days=1),
243 diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
244 occurence = RecurringStreamOccurence.objects.filter(
245 datetime__gt=now - datetime.timedelta(days=1),
246 datetime__lt=now).order_by('datetime').last()
247 directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
248 datetime__gt=now - datetime.timedelta(days=1),
249 datetime__lt=now).order_by('datetime').last()
250 # note it shouldn't be possible to have both diffusion and occurences
251 # running at the moment.
252 if occurence and occurence.end_datetime > now:
254 if diffusion and diffusion.end_datetime > now:
256 if directory_occurence and directory_occurence.end_datetime > now:
257 return directory_occurence
260 def get_next_diffusion(self, before_datetime):
261 now = datetime.datetime.now()
262 diffusion = ScheduledDiffusion.objects.filter(
263 diffusion__datetime__gt=now,
264 diffusion__datetime__lt=before_datetime,
265 ).order_by('diffusion__datetime').first()
266 occurence = RecurringStreamOccurence.objects.filter(
268 datetime__lt=before_datetime,
269 ).order_by('datetime').first()
270 directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
272 datetime__lt=before_datetime,
273 ).order_by('datetime').first()
274 if diffusion and occurence:
275 return diffusion if diffusion.diffusion.datetime < occurence.datetime else occurence
280 if directory_occurence:
281 return directory_occurence
284 def recompute_slots(self):
285 now = datetime.datetime.now()
286 diffusion = self.get_current_diffusion()
288 self.slot = diffusion
290 nonstops = list(Nonstop.objects.all().order_by('start'))
291 nonstops = [x for x in nonstops if x.start != x.end] # disabled zones
293 self.slot = [x for x in nonstops if x.start < now.time()][-1]
295 self.slot = nonstops[0]
297 next_slot = nonstops[nonstops.index(self.slot) + 1]
299 next_slot = nonstops[0]
300 self.slot.datetime = now.replace(
301 hour=self.slot.start.hour,
302 minute=self.slot.start.minute)
303 self.slot.end_datetime = now.replace(
304 hour=next_slot.start.hour,
305 minute=next_slot.start.minute,
308 if self.slot.end_datetime < self.slot.datetime:
309 self.slot.end_datetime += datetime.timedelta(days=1)
311 diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
313 self.slot.end_datetime = diffusion.datetime
315 async def recompute_slots_loop(self):
316 now = datetime.datetime.now()
317 sleep = (60 - now.second) % 10 # adjust to awake at :00
319 await asyncio.sleep(sleep)
320 sleep = 10 # next cycles every 10 seconds
321 current_slot = self.slot
322 self.recompute_slots()
323 expected_slot = self.slot
324 if current_slot != expected_slot:
325 now = datetime.datetime.now()
326 logger.info('Unexpected change, %s vs %s', current_slot, expected_slot)
327 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
328 # ask for a softstop, i.e. finish the track then switch.
330 elif isinstance(current_slot, Nonstop):
332 logger.info('Interrupting nonstop')
333 self.play_task.cancel()
334 elif current_slot.end_datetime > expected_slot.end_datetime:
335 now = datetime.datetime.now()
336 logger.debug('Change in end time, from %s to %s',
337 current_slot.end_datetime,
338 expected_slot.end_datetime)
339 if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
340 # more than 5 minutes left, recompute playlist
341 self.recompute_playlist()
343 async def handle_connection(self, reader, writer):
344 writer.write(b'Watusi!\n')
345 writer.write(b'Known commands: status, softquit, hardquit\n')
346 writer.write(b'(dot on empty line to stop connection)\n')
350 data = await reader.read(100)
352 message = data.decode().strip()
353 except UnicodeDecodeError:
354 logger.debug('Server, invalid message %r', message)
358 logger.debug('Server, message %r', message)
359 if message == 'status':
360 response = {'slot': str(self.slot)}
361 if isinstance(self.slot, Nonstop):
363 track = self.playlist[self.playhead]
367 response['track'] = {}
368 response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
369 response['track']['title'] = track.title
370 response['track']['artist'] = track.artist.name if track.artist_id else ''
371 response['track']['duration'] = track.duration.total_seconds()
372 response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
373 response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
374 next_diffusion = self.get_next_diffusion(
375 before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
377 response['next_diffusion'] = {
378 'label': str(next_diffusion),
379 'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
381 if isinstance(next_diffusion, ScheduledDiffusion):
382 response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
383 response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
386 response = {'ack': True}
387 elif message == 'softquit':
390 response = {'ack': True}
391 elif message == 'hardquit':
394 response = {'ack': True}
395 if self.player and self.player.returncode is None: # not finished
398 response = {'err': 1, 'msg': 'unknown command: %r' % message}
399 writer.write(json.dumps(response).encode('utf-8') + b'\n')
402 except ConnectionResetError:
406 def sigterm_handler(self):
407 logger.info('Got SIGTERM')
409 self.play_task.cancel()
411 async def main(self):
412 loop = asyncio.get_running_loop()
413 loop.add_signal_handler(
415 self.sigterm_handler)
416 self.recompute_slots()
417 server = await asyncio.start_server(
418 self.handle_connection,
419 app_settings.SERVER_BIND_IFACE,
420 app_settings.SERVER_BIND_PORT)
422 asyncio.create_task(server.serve_forever())
424 self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
426 now = datetime.datetime.now()
427 duration = (self.slot.end_datetime - now).seconds
428 logger.debug('Next sure shot %s (in %s)', self.slot.end_datetime, duration)
430 # next slot is very close, wait for it
431 await asyncio.sleep(duration)
432 self.recompute_slots()
433 self.play_task = asyncio.create_task(self.play(self.slot))
436 self.recompute_slots()
437 except asyncio.CancelledError:
438 logger.debug('Player cancelled exception')
439 if self.player and self.player.returncode is None: # not finished
441 except KeyboardInterrupt: