12 from django.conf import settings
13 from django.core.management.base import BaseCommand
15 from emissions.models import Nonstop
16 from nonstop.models import Track, Jingle, SomaLogLine, ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence
17 from nonstop.app_settings import app_settings
# Module-level logger: every log call in this file goes through the 'stamina' logger.
20 logger = logging.getLogger('stamina')
# Django management command (subclass of BaseCommand).
23 class Command(BaseCommand):
# NOTE(review): presumably makes Django skip its system checks for this
# command -- confirm against the Django version in use.
24 requires_system_checks = False
# Playlist time at which the last jingle was queued; initialised lazily by
# get_playlist() and used there to space jingles.
26 last_jingle_datetime = None
# Command entry point: runs self.main() under asyncio.run() and, when an
# error is caught, throttles how often it gets logged before retrying.
# NOTE(review): the surrounding loop/try, the body of
# exception_alert_thresholds() and part of the except clauses are not
# visible in this excerpt; the comments below only cover the visible lines.
29 def handle(self, verbosity, **kwargs):
# time.time() values of the last logged alert and of the last caught
# exception (both start at 0)
31 latest_alert_timestamp = 0
32 latest_exception_timestamp = 0
# helper providing the successive alert thresholds iterated over below
34 def exception_alert_thresholds():
37 while duration < 3600:
# asyncio debug mode follows the Django DEBUG setting
47 asyncio.run(self.main(), debug=settings.DEBUG)
48 except KeyboardInterrupt:
51 timestamp = time.time()
52 if (timestamp - latest_exception_timestamp) > 300:
53 # if latest exception was a "long" time ago, assume
54 # things went smooth for a while and reset things
56 latest_alert_timestamp = 0
57 latest_exception_timestamp = 0
60 for i, threshold in enumerate(exception_alert_thresholds()):
62 alert_threshold = threshold
# only log when more than alert_threshold seconds have passed since the
# previously logged alert
65 if (timestamp - latest_alert_timestamp) > alert_threshold:
66 logger.exception('General exception (alert index: %s)', alert_index)
67 latest_alert_timestamp = timestamp
70 time.sleep(2) # retry after a bit
71 latest_exception_timestamp = timestamp
# Build the sequence of items (tracks, plus jingles when the zone has some)
# to play for `zone` between start_datetime and end_datetime.
# NOTE(review): the initialisation of `playlist`, several branch and
# loop-control lines, and the return statement are not visible in this
# excerpt.
75 def get_playlist(self, zone, start_datetime, end_datetime):
76 current_datetime = start_datetime
# first call: start the jingle spacing clock at the playlist start
77 if self.last_jingle_datetime is None:
78 self.last_jingle_datetime = current_datetime
79 # Define a max duration (1 hour), if it is reached, and far enough
80 # from end_datetime (30 minutes), return the playlist as is, not aligned
81 # on end time, so a new playlist gets computed once it's over.
82 # This avoids misalignments due to track durations not matching exactly
83 # or additional delays caused by the player program.
84 max_duration = datetime.timedelta(hours=1)
85 max_duration_leftover = datetime.timedelta(minutes=30)
# counts how many times the fallback strategy below shortened the playlist
87 adjustment_counter = 0
# jingles attached to the zone's first settings object
89 jingles = list(zone.nonstopzonesettings_set.first().jingles.all())
90 except AttributeError:
# ids of tracks found in log lines newer than NO_REPEAT_DELAY days (lines
# with on_air=False are left out); these tracks are excluded from selection
93 recent_tracks_id = [x.track_id for x in
94 SomaLogLine.objects.exclude(on_air=False).filter(
96 play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
# t0 is only used to log the computation time at the end
97 t0 = datetime.datetime.now()
98 allow_overflow = False
99 while current_datetime < end_datetime:
100 if (current_datetime - start_datetime) > max_duration and (
101 (end_datetime - current_datetime) > max_duration_leftover):
104 if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
105 # jingle time, every ~20 minutes
106 playlist.append(random.choice(jingles))
107 self.last_jingle_datetime = current_datetime
# playlist clock = start + total duration of the items queued so far
108 current_datetime = start_datetime + sum(
109 [x.duration for x in playlist], datetime.timedelta(seconds=0))
# EXTRA_ZONES maps a zone slug to slugs of additional zones whose ids are
# added to zone_ids
112 extra_zones = app_settings.EXTRA_ZONES.get(zone.slug)
114 zone_ids.extend([x.id for x in Nonstop.objects.filter(slug__in=extra_zones)])
115 remaining_time = (end_datetime - current_datetime)
# random track from those zones, with a known duration, that is neither
# recently played nor already in the playlist
116 track = Track.objects.filter(
117 nonstop_zones__in=zone_ids,
118 duration__isnull=False).exclude(
119 id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
120 ).order_by('?').first()
122 # no track, reduce recent tracks exclusion
# keeps only the first half of the exclusion list
123 recent_tracks_id = recent_tracks_id[:len(recent_tracks_id)//2]
125 playlist.append(track)
126 current_datetime = start_datetime + sum(
127 [x.duration for x in playlist], datetime.timedelta(seconds=0))
128 if current_datetime > end_datetime and not allow_overflow:
129 # last track overshot
130 # 1st strategy: remove last track and try to get a track with
131 # exact remaining time
132 logger.debug('Overshoot %s, %s', adjustment_counter, current_datetime)
133 playlist = playlist[:-1]
# candidate duration must be in [remaining_time, remaining_time + 1 second)
134 track = Track.objects.filter(
136 duration__gte=remaining_time,
137 duration__lt=remaining_time + datetime.timedelta(seconds=1)
139 id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
140 ).order_by('?').first()
143 playlist.append(track)
145 # fallback strategy: didn't find track of expected duration,
146 # reduce playlist further
147 adjustment_counter += 1
148 playlist = playlist[:-1]
149 if len(playlist) == 0 or adjustment_counter > 5:
150 # a dedicated sound that ended a bit too early,
151 # or too many failures to get an appropriate file,
152 # allow whatever comes.
153 allow_overflow = True
154 logger.debug('Allowing overflows')
156 current_datetime = start_datetime + sum(
157 [x.duration for x in playlist], datetime.timedelta(seconds=0))
159 logger.info('Computed playlist for "%s" (computation time: %ss)',
160 zone, (datetime.datetime.now() - t0))
# debug dump of the resulting timeline, one line per item plus the end time
161 current_datetime = start_datetime
162 for track in playlist:
163 logger.debug('- track: %s %s %s', current_datetime, track.duration, track.title)
164 current_datetime += track.duration
165 logger.debug('- end: %s', current_datetime)
# Query the on-air switch at ON_AIR_SWITCH_URL and interpret its JSON status.
# NOTE(review): the value returned by each branch is not visible in this
# excerpt.
168 def is_nonstop_on_air(self):
169 # check if nonstop system is currently on air
# no switch URL configured
170 if app_settings.ON_AIR_SWITCH_URL is None:
# synchronous HTTP request, limited to 5 seconds
172 switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
173 if not switch_response.ok:
176 status = switch_response.json()
# the decision combines 'active' (0, 1 or 2) with the 'nonstop-via-stud1'
# and 'nonstop-via-stud2' entries of the status
179 if status.get('active') == 0:
181 elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
183 elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
# Fill a SomaLogLine describing `track` being played at `now`.
# NOTE(review): the statement persisting the log line is not visible in this
# excerpt.
187 async def record_nonstop_line(self, track, now):
188 log_line = SomaLogLine()
189 log_line.play_timestamp = now
190 log_line.track = track
# first file attached to the track
191 log_line.filepath = track.nonstopfile_set.first()
# NOTE(review): is_nonstop_on_air() does a synchronous requests.get (timeout
# of 5 seconds) and is called directly from this coroutine.
192 log_line.on_air = self.is_nonstop_on_air()
# Start the external player for `item` (a stream URL or a file path) as an
# asyncio subprocess kept in self.player, then wait for it, optionally
# bounded by `timeout` (passed to asyncio.wait_for).
# NOTE(review): else branches, the command argument of the subprocess calls
# and the timeout handler body are not visible in this excerpt.
195 async def player_process(self, item, timeout=None):
# argument list: player command followed by its configured arguments
196 cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
197 if hasattr(item, 'is_stream') and item.is_stream():
198 cmd.append(item.stream.url)
199 logger.info('Play stream: %s', item.stream.url)
201 cmd.append(item.file_path())
202 logger.info('Play file: %s', item.file_path())
203 if app_settings.DEBUG_WITH_SLEEPS:
204 # replace command by a sleep call, for silent debugging
# here cmd becomes a shell string; the URL or path only appears after '#'
205 if hasattr(item, 'is_stream') and item.is_stream():
206 cmd = 'sleep 86400 # %s' % item.stream.url
207 elif isinstance(item.duration, datetime.timedelta):
208 cmd = 'sleep %s # %s' % (item.duration.total_seconds(), item.file_path())
209 elif isinstance(item.duration, int):
210 cmd = 'sleep %s # %s' % (item.duration, item.file_path())
211 logger.debug('cmd %r', cmd)
# a string command (debug sleeps) goes through the shell, a list is executed
# directly
212 if isinstance(cmd, str):
213 self.player = await asyncio.create_subprocess_shell(
215 stdout=asyncio.subprocess.PIPE,
216 stderr=asyncio.subprocess.PIPE)
218 self.player = await asyncio.create_subprocess_exec(
220 stdout=asyncio.subprocess.PIPE,
221 stderr=asyncio.subprocess.PIPE)
# wait for the player to exit, reading its piped output
223 await self.player.communicate()
# same wait, bounded by the given timeout
226 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
227 except asyncio.TimeoutError:
# Play `slot`: a Nonstop zone (computed playlist of tracks and jingles), a
# stream, or another slot played from a file after an optional jingle.
# NOTE(review): loop headers, else branches and several statements (playhead
# updates, slot removal, breaks) are not visible in this excerpt.
231 async def play(self, slot):
232 now = datetime.datetime.now()
233 if isinstance(slot, Nonstop):
# nonstop zone: compute a playlist from now to the end of the slot
234 self.playlist = self.get_playlist(slot, now, slot.end_datetime)
# flag cleared before playback starts
236 self.softstop = False
238 now = datetime.datetime.now()
240 track = self.playlist[self.playhead]
# also read by the status command and by recompute_playlist()
243 self.current_track_start_datetime = now
244 if isinstance(track, Jingle):
245 logger.info('Jingle: %s (id: %s) (%s)', track.title, track.id, track.duration)
247 logger.info('Track: %s (id: %s) (%s)', track.title, track.id, track.duration)
249 if isinstance(track, Track): # not jingles
# the log line is recorded in a separate task
250 record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
251 await self.player_process(track)
255 # track was left to finish, but now the playlist should stop.
258 elif slot.is_stream():
259 logger.info('Stream: %s', slot.stream)
# the slot's jingle is played with a 60 second cap
261 await self.player_process(slot.jingle, timeout=60)
262 logger.debug('Stream timeout: %s', (slot.end_datetime - now).total_seconds())
263 short_interruption_counter = 0
266 player_start_time = datetime.datetime.now()
# the player gets the time left until the end of the slot as its timeout
267 await self.player_process(slot, timeout=(slot.end_datetime - player_start_time).total_seconds())
268 now = datetime.datetime.now()
# less than 2 seconds left before the end of the slot
269 if (slot.end_datetime - now).total_seconds() < 2:
272 # stream got interrupted
273 if (datetime.datetime.now() - player_start_time).total_seconds() < 15:
274 # and was up for less than 15 seconds.
276 # never up before, probably not even started
277 if isinstance(slot, RecurringStreamOccurence):
278 # no mercy for recurring stream, remove occurence
279 logger.info('Missing stream for %s, removing', slot)
281 elif slot.auto_delayed is True:
282 # was already delayed and is still not up, remove.
283 logger.info('Still missing stream for %s, removing', slot)
286 # push back start datetime for 5 minutes, and get
287 # back to nonstop music in the meantime
288 logger.info('Pushing starting time of %s', slot.diffusion.episode)
289 slot.diffusion.datetime = slot.diffusion.datetime + datetime.timedelta(seconds=300)
# NOTE(review): the start moves by 300 seconds while the duration drops by
# 5 -- presumably the episode duration is in minutes; confirm.
290 slot.diffusion.episode.duration = slot.diffusion.episode.get_duration() - 5
291 if slot.diffusion.episode.duration <= 5:
292 slot.diffusion.episode.duration = 0
293 slot.auto_delayed = True
294 slot.diffusion.save()
295 slot.diffusion.episode.save()
298 short_interruption_counter += 1
# wait as many seconds as there were consecutive short interruptions
300 await asyncio.sleep(short_interruption_counter)
302 # mark stream as ok at least once, and reset short
303 # interruption counter
305 short_interruption_counter = 0
306 logger.debug('Stream error for %s', slot)
308 if short_interruption_counter > 5:
309 # many short interruptions
310 logger.info('Too many stream errors for %s, removing', slot)
# slots carrying an 'episode' attribute are logged as episodes, the others
# as 'Random'
314 if hasattr(slot, 'episode'):
315 logger.info('Episode: %s (id: %s)', slot.episode, slot.episode.id)
317 logger.info('Random: %s', slot)
# jingle (60 second cap), then the slot itself with no timeout
319 await self.player_process(slot.jingle, timeout=60)
320 await self.player_process(slot)
# Recompute what follows the current track: a new playlist is computed from
# the expected end of the current track (its start time plus its duration)
# to the end of the current slot, and replaces everything after the playhead.
# NOTE(review): the first argument of the debug call and a line preceding
# the final assignment are not visible in this excerpt.
322 def recompute_playlist(self):
323 current_track = self.playlist[self.playhead]
324 logger.debug('Recomputing playlist at %s, from %s to %s',
326 self.current_track_start_datetime + current_track.duration,
327 self.slot.end_datetime)
328 playlist = self.get_playlist(self.slot,
329 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
# items up to and including the current track are kept
331 self.playlist[self.playhead + 1:] = playlist
# Look up what is scheduled to be running right now. For each of the three
# kinds (scheduled diffusion, recurring stream occurence, recurring random
# directory occurence) the latest entry that started within the last day is
# fetched; an entry counts as current when its end_datetime is after now.
# NOTE(review): the return statements of the first two checks and the final
# fallback are not visible in this excerpt.
333 def get_current_diffusion(self):
334 now = datetime.datetime.now()
335 diffusion = ScheduledDiffusion.objects.filter(
336 diffusion__datetime__gt=now - datetime.timedelta(days=1),
337 diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
338 occurence = RecurringStreamOccurence.objects.filter(
339 datetime__gt=now - datetime.timedelta(days=1),
340 datetime__lt=now).order_by('datetime').last()
341 directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
342 datetime__gt=now - datetime.timedelta(days=1),
343 datetime__lt=now).order_by('datetime').last()
344 # note it shouldn't be possible to have both diffusion and occurences
345 # running at the moment.
# checks run in this order: stream occurence, diffusion, directory occurence
346 if occurence and occurence.end_datetime > now:
348 if diffusion and diffusion.end_datetime > now:
350 if directory_occurence and directory_occurence.end_datetime > now:
351 return directory_occurence
# Find the next scheduled item starting before `before_datetime`: the
# earliest matching entry of each kind is fetched, and when both a diffusion
# and a stream occurence exist the one starting first is returned.
# NOTE(review): the lower bound of the two occurence queries and the
# branches between the first and last return are not visible in this
# excerpt.
354 def get_next_diffusion(self, before_datetime):
355 now = datetime.datetime.now()
356 diffusion = ScheduledDiffusion.objects.filter(
357 diffusion__datetime__gt=now,
358 diffusion__datetime__lt=before_datetime,
359 ).order_by('diffusion__datetime').first()
360 occurence = RecurringStreamOccurence.objects.filter(
362 datetime__lt=before_datetime,
363 ).order_by('datetime').first()
364 directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
366 datetime__lt=before_datetime,
367 ).order_by('datetime').first()
368 if diffusion and occurence:
# compare start times: the diffusion wins only when strictly earlier
369 return diffusion if diffusion.diffusion.datetime < occurence.datetime else occurence
374 if directory_occurence:
375 return directory_occurence
# Determine the slot for the current time and store it in self.slot: either
# the current diffusion, or the nonstop zone matching the time of day, with
# datetime/end_datetime computed from the zone start times.
# NOTE(review): the if/else/try lines choosing between the branches below
# are not visible in this excerpt.
378 def recompute_slots(self):
379 now = datetime.datetime.now()
380 diffusion = self.get_current_diffusion()
382 self.slot = diffusion
# nonstop zones ordered by start time
384 nonstops = list(Nonstop.objects.all().order_by('start'))
385 nonstops = [x for x in nonstops if x.start != x.end] # disabled zones
# last zone whose start time is before the current time of day
387 self.slot = [x for x in nonstops if x.start < now.time()][-1]
389 self.slot = nonstops[0]
# the following zone in start order, or the first one
391 next_slot = nonstops[nonstops.index(self.slot) + 1]
393 next_slot = nonstops[0]
# today's date combined with the zone start hour and minute
394 self.slot.datetime = now.replace(
395 hour=self.slot.start.hour,
396 minute=self.slot.start.minute)
# the slot ends when the next zone starts
397 self.slot.end_datetime = now.replace(
398 hour=next_slot.start.hour,
399 minute=next_slot.start.minute,
# an end earlier than the start means the slot ends on the following day
402 if self.slot.end_datetime < self.slot.datetime:
403 self.slot.end_datetime += datetime.timedelta(days=1)
# look for a diffusion starting before the computed end; its start time is
# used as the end of the slot
405 diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
407 self.slot.end_datetime = diffusion.datetime
# Background coroutine: sleeps, calls recompute_slots() and reacts when the
# slot changed (or ends earlier than before) compared to the previous value.
# NOTE(review): the loop header and the statement requesting the softstop
# are not visible in this excerpt.
409 async def recompute_slots_loop(self):
410 now = datetime.datetime.now()
# first sleep is the number of seconds (0-9) up to the next multiple of 10
411 sleep = (60 - now.second) % 10 # adjust to awake at :00
413 await asyncio.sleep(sleep)
414 sleep = 10 # next cycles every 10 seconds
# keep the previous slot to compare with the recomputed one
415 current_slot = self.slot
416 self.recompute_slots()
417 expected_slot = self.slot
418 if current_slot != expected_slot:
419 now = datetime.datetime.now()
420 logger.info('Unexpected change, %s vs %s', current_slot, expected_slot)
421 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
422 # ask for a softstop, i.e. finish the track then switch.
# leaving a nonstop zone for a slot that is not a nonstop zone: the play
# task is cancelled
424 elif isinstance(current_slot, Nonstop):
426 logger.info('Interrupting nonstop')
427 self.play_task.cancel()
# same slot, but it now ends earlier than before
428 elif current_slot.end_datetime > expected_slot.end_datetime:
429 now = datetime.datetime.now()
430 logger.debug('Change in end time, from %s to %s',
431 current_slot.end_datetime,
432 expected_slot.end_datetime)
433 if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
434 # more than 5 minutes left, recompute playlist
435 self.recompute_playlist()
# Handler for a connection to the control server: sends a greeting, then
# reads text commands (status, softquit, hardquit) and answers each with one
# line of JSON.
# NOTE(review): the read loop header, the try lines and the statements run
# by softquit/hardquit are not visible in this excerpt.
437 async def handle_connection(self, reader, writer):
438 writer.write(b'Watusi!\n')
439 writer.write(b'Known commands: status, softquit, hardquit\n')
440 writer.write(b'(dot on empty line to stop connection)\n')
# at most 100 bytes are read per message
444 data = await reader.read(100)
446 message = data.decode().strip()
447 except UnicodeDecodeError:
# NOTE(review): `message` is assigned by the decode call that just failed,
# so unless it is initialised in a line not visible here, this logs a stale
# value or raises on the first message; logging `data` may be what is meant.
448 logger.debug('Server, invalid message %r', message)
452 logger.debug('Server, message %r', message)
453 if message == 'status':
454 response = {'slot': str(self.slot)}
# track details are added for nonstop slots
455 if isinstance(self.slot, Nonstop):
457 track = self.playlist[self.playhead]
461 response['track'] = {}
462 response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
463 response['track']['title'] = track.title
464 response['track']['artist'] = track.artist.name if track.artist_id else ''
# duration, elapsed and remaining are numbers of seconds
465 response['track']['duration'] = track.duration.total_seconds()
466 response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
467 response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
# next scheduled item within the coming 5 hours
468 next_diffusion = self.get_next_diffusion(
469 before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
471 response['next_diffusion'] = {
472 'label': str(next_diffusion),
473 'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
# emission and episode titles are only added for scheduled diffusions
475 if isinstance(next_diffusion, ScheduledDiffusion):
476 response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
477 response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
480 response = {'ack': True}
481 elif message == 'softquit':
484 response = {'ack': True}
485 elif message == 'hardquit':
488 response = {'ack': True}
# a player subprocess exists and has not exited yet
489 if self.player and self.player.returncode is None: # not finished
492 response = {'err': 1, 'msg': 'unknown command: %r' % message}
# one JSON document per line
493 writer.write(json.dumps(response).encode('utf-8') + b'\n')
496 except ConnectionResetError:
# Callback registered in main() through loop.add_signal_handler: logs the
# signal and cancels the current play task.
# NOTE(review): a statement between the log call and the cancellation is not
# visible in this excerpt.
500 def sigterm_handler(self):
501 logger.info('Got SIGTERM')
503 self.play_task.cancel()
505 async def main(self):
506 loop = asyncio.get_running_loop()
507 loop.add_signal_handler(
509 self.sigterm_handler)
510 self.recompute_slots()
511 server = await asyncio.start_server(
512 self.handle_connection,
513 app_settings.SERVER_BIND_IFACE,
514 app_settings.SERVER_BIND_PORT)
516 asyncio.create_task(server.serve_forever())
518 self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
520 now = datetime.datetime.now()
521 duration = (self.slot.end_datetime - now).seconds
522 logger.debug('Next sure shot %s (in %s)', self.slot.end_datetime, duration)
524 # next slot is very close, wait for it
525 await asyncio.sleep(duration)
526 self.recompute_slots()
527 self.play_task = asyncio.create_task(self.play(self.slot))
530 self.recompute_slots()
531 except asyncio.CancelledError:
532 logger.debug('Player cancelled exception')
533 if self.player and self.player.returncode is None: # not finished
535 except KeyboardInterrupt: