13 from django.conf import settings
14 from django.core.management.base import BaseCommand
15 from emissions.models import Nonstop
17 from nonstop.app_settings import app_settings
18 from nonstop.models import (
21 RecurringRandomDirectoryOccurence,
22 RecurringStreamOccurence,
27 from nonstop.utils import Tracklist
29 logger = logging.getLogger('stamina')
32 class Command(BaseCommand):
33 requires_system_checks = False
35 last_jingle_datetime = None
38 def handle(self, verbosity, **kwargs):
# Command entry point: runs self.main() through asyncio.run() and deals
# with whatever it raises. KeyboardInterrupt has its own handler; any
# other exception is logged (rate-limited by the alert thresholds below)
# and followed by a two-second pause before retrying.
40 latest_alert_timestamp = 0
41 latest_exception_timestamp = 0
43 def exception_alert_thresholds():
# provides the series of alert thresholds iterated over below.
46 while duration < 3600:
56 asyncio.run(self.main(), debug=settings.DEBUG)
57 except KeyboardInterrupt:
59 except Exception as e:
60 timestamp = time.time()
# 300 seconds without any exception counts as a quiet period
61 if (timestamp - latest_exception_timestamp) > 300:
62 # if latest exception was a "long" time ago, assume
63 # things went smooth for a while and reset things
65 latest_alert_timestamp = 0
66 latest_exception_timestamp = 0
69 for i, threshold in enumerate(exception_alert_thresholds()):
71 alert_threshold = threshold
# only log when the previous alert is older than the current threshold
74 if (timestamp - latest_alert_timestamp) > alert_threshold:
75 logger.exception('General exception (alert index: %s)', alert_index)
76 latest_alert_timestamp = timestamp
79 if alert_index and isinstance(e, django.db.InterfaceError):
80 # most likely "connection already closed", because postgresql
81 # has been restarted; log then get out to be restarted.
82 logger.error('Aborting on repeated database error')
85 time.sleep(2) # retry after a bit
86 latest_exception_timestamp = timestamp
90 def get_playlist(self, zone, start_datetime, end_datetime):
# Compute the list of items (tracks and jingles) to play for `zone`,
# filling the time from start_datetime up to end_datetime, and return it
# (tracklist.playlist).
# start_datetime is either a datetime or a callable returning one; a
# callable is evaluated here and once more just before the playlist gets
# filled.
91 current_datetime = start_datetime() if callable(start_datetime) else start_datetime
# no jingle reference time yet: use the start of this playlist
92 if self.last_jingle_datetime is None:
93 self.last_jingle_datetime = current_datetime
94 # Define a max duration (1 hour), if it is reached, and far enough
95 # from end_datetime (30 minutes), return the playlist as is, not aligned
96 # on end time, so a new playlist gets computed once it's over.
97 # This avoids misalignments due to track durations not matching exactly
98 # or additional delays caused by the player program.
99 max_duration = datetime.timedelta(hours=1)
100 max_duration_leftover = datetime.timedelta(minutes=30)
102 adjustment_counter = 0
# zone settings and their jingles; a blank NonstopZoneSettings is used
# when the lookup raises AttributeError (e.g. the zone has no settings
# object and .first() gave None).
104 zone_settings = zone.nonstopzonesettings_set.first()
105 jingles = list(zone_settings.jingles.all())
106 except AttributeError:
107 zone_settings = NonstopZoneSettings()
# jingles that have a clock_time set
110 all_clocked_jingles = Jingle.objects.exclude(clock_time__isnull=True)
# zone_ids also gets the extra zones declared on the zone settings and
# those listed for this zone's slug in app_settings.EXTRA_ZONES
113 zone_ids.extend([x.id for x in zone_settings.extra_zones.all()])
114 extra_zones = app_settings.EXTRA_ZONES.get(zone.slug)
116 zone_ids.extend([x.id for x in Nonstop.objects.filter(slug__in=extra_zones)])
# log lines newer than NO_REPEAT_DELAY days, leaving out those explicitly
# marked as not on air
120 for x in SomaLogLine.objects.exclude(on_air=False).filter(
122 play_timestamp__gt=datetime.datetime.now()
123 - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY),
127 tracklist = Tracklist(zone_settings, zone_ids, recent_tracks_id)
128 random_tracks_iterator = tracklist.get_random_tracks()
# t0 is only used to report the computation time in the log
129 t0 = datetime.datetime.now()
130 allow_overflow = False
131 if callable(start_datetime):
132 # compute start_datetime (e.g. now()) at the last moment, to get
133 # computed playlist timestamps as close as possible to future real
135 start_datetime = start_datetime()
136 while current_datetime < end_datetime:
137 if (current_datetime - start_datetime) > max_duration and (
138 (end_datetime - current_datetime) > max_duration_leftover
# intro jingle, added when the current (hour, minute) matches
142 if zone_settings.intro_jingle and (current_datetime.hour, current_datetime.minute) == (
146 tracklist.playlist.append(zone_settings.intro_jingle)
147 self.last_jingle_datetime = current_datetime
148 current_datetime = start_datetime + tracklist.get_duration()
149 elif jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
150 # jingle time, every ~20 minutes
151 # maybe there's a dedicated jingle for this time of day?
152 current_minute = current_datetime.time().replace(second=0, microsecond=0)
154 (current_datetime + datetime.timedelta(minutes=1)).time().replace(second=0, microsecond=0)
# clocked jingles whose clock_time falls within the current minute
158 for x in all_clocked_jingles
159 if x.clock_time >= current_minute and x.clock_time < next_minute
162 clocked_jingle = random.choice(clocked_jingles)
163 clocked_jingle.label = '⏰ %s' % clocked_jingle.label
164 tracklist.playlist.append(clocked_jingle)
166 tracklist.playlist.append(random.choice(jingles))
167 self.last_jingle_datetime = current_datetime
168 current_datetime = start_datetime + tracklist.get_duration()
169 remaining_time = end_datetime - current_datetime
171 track = next(random_tracks_iterator)
172 tracklist.append(track)
173 current_datetime = start_datetime + tracklist.get_duration()
174 if current_datetime > end_datetime and not allow_overflow:
175 # last track overshot
176 # 1st strategy: remove last track and try to get a track with
177 # exact remaining time
178 logger.debug('Overshoot %s, %s', adjustment_counter, current_datetime)
182 tracklist.get_random_tracks(
# duration window: [remaining_time, remaining_time + 1 second)
185 'duration__gte': remaining_time,
186 'duration__lt': remaining_time + datetime.timedelta(seconds=1),
190 except StopIteration: # nothing
194 tracklist.append(track)
196 # fallback strategy: didn't find track of expected duration,
197 # reduce playlist further
198 adjustment_counter += 1
199 if tracklist.pop() is None or adjustment_counter > 5:
200 # a dedicated sound that ended a bit too early,
201 # or too many failures to get an appropriate file,
202 # allow whatever comes.
203 allow_overflow = True
204 logger.debug('Allowing overflows')
206 current_datetime = start_datetime + tracklist.get_duration()
209 'Computed playlist for "%s" (computation time: %ss)', zone, (datetime.datetime.now() - t0)
# debug dump of the playlist, with the computed start time of each item
211 current_datetime = start_datetime
212 for track in tracklist.playlist:
213 logger.debug('- track: %s %s %s', current_datetime, track.duration, track.title)
214 current_datetime += track.duration
215 logger.debug('- end: %s', current_datetime)
216 return tracklist.playlist
218 def is_nonstop_on_air(self):
219 # check if nonstop system is currently on air
# The decision is based on the JSON document served at
# app_settings.ON_AIR_SWITCH_URL (fetched with a 5 second timeout): its
# 'active' key is compared to 0, 1 and 2, combined with the
# 'nonstop-via-stud1' / 'nonstop-via-stud2' keys. An unset URL and a
# non-OK HTTP response are handled as separate cases.
# The result is stored as SomaLogLine.on_air by record_nonstop_line().
220 if app_settings.ON_AIR_SWITCH_URL is None:
222 switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
223 if not switch_response.ok:
226 status = switch_response.json()
229 if status.get('active') == 0:
231 elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
232 # TODO: replace this hardware check that no longer works by a
233 # logical check on programs: if there's nothing scheduled at the
234 # moment, consider nonstop is broadcasted, even if studio1 is on.
236 elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
240 async def record_nonstop_line(self, track, now):
# Fill in a SomaLogLine for `track`, played at `now`: play timestamp,
# track, first associated nonstop file, and whether nonstop was on air.
# NOTE(review): is_nonstop_on_air() does a synchronous requests.get()
# (5 second timeout); called from this coroutine it can hold up the
# event loop for that long.
241 log_line = SomaLogLine()
242 log_line.play_timestamp = now
243 log_line.track = track
244 log_line.filepath = track.nonstopfile_set.first()
245 log_line.on_air = self.is_nonstop_on_air()
248 async def player_process(self, item, timeout=None):
# Play `item` (a stream or a file) with the external player.
# When app_settings.PLAYER_IPC_PATH is set the work is delegated to
# player_process_ipc(); otherwise a player subprocess is spawned for the
# item and kept in self.player.
# timeout: maximum number of seconds to wait for the player; a
# asyncio.TimeoutError from the wait is caught here.
249 if app_settings.PLAYER_IPC_PATH:
250 return await self.player_process_ipc(item, timeout=timeout)
251 cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
252 if hasattr(item, 'is_stream') and item.is_stream():
# mumble:// URLs get their own player command
253 if urllib.parse.urlparse(item.stream.url).scheme == 'mumble':
254 cmd = [app_settings.MUMBLE_PLAYER_COMMAND]
255 cmd.append(item.stream.url)
256 logger.info('Play stream: %s', item.stream.url)
258 cmd.append(item.file_path())
259 logger.info('Play file: %s', item.file_path())
260 logger.debug('cmd %r', cmd)
261 self.player = await asyncio.create_subprocess_exec(
262 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
# communicate() waits for the subprocess to terminate
265 await self.player.communicate()
268 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
269 except asyncio.TimeoutError:
273 async def player_process_ipc(self, item, timeout=None):
# Play `item` through a player controlled over the unix socket at
# app_settings.PLAYER_IPC_PATH. When the socket cannot be opened, the
# player is started with --input-ipc-server/--idle. The item is then sent
# as a JSON 'loadfile' command and player_ipc_idle() is awaited, for at
# most `timeout` seconds.
277 reader, writer = await asyncio.open_unix_connection(app_settings.PLAYER_IPC_PATH)
279 except (FileNotFoundError, ConnectionRefusedError):
# no socket or nobody listening: launch the player
281 cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
282 cmd += ['--input-ipc-server=%s' % app_settings.PLAYER_IPC_PATH, '--idle']
283 self.player = await asyncio.create_subprocess_exec(
284 *cmd, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL
287 await asyncio.sleep(0.1)
289 if hasattr(item, 'is_stream') and item.is_stream():
290 file_path = item.stream.url
291 logger.info('Play stream: %s', item.stream.url)
293 file_path = item.file_path()
294 logger.info('Play file: %s', item.file_path())
# one JSON document per line
296 writer.write(json.dumps({'command': ['loadfile', file_path]}).encode() + b'\n')
299 except ConnectionResetError: # connection lost
302 await asyncio.wait_for(self.player_ipc_idle(reader, writer), timeout=timeout)
303 except asyncio.TimeoutError:
306 await writer.wait_closed()
308 async def player_ipc_idle(self, reader, writer):
# Read lines from the player IPC connection and look for the
# {'event': 'idle'} message; player_process_ipc() awaits this coroutine
# after sending its 'loadfile' command.
310 data = await reader.readline()
313 if json.loads(data) == {'event': 'idle'}:
316 async def play(self, slot):
# Play `slot`, which is handled in one of three ways:
# - a Nonstop zone: a playlist is computed and played item by item;
# - a stream: played until slot.end_datetime, with handling of streams
#   that are missing or get interrupted;
# - anything else (episode or random directory): played as is.
317 now = datetime.datetime.now()
318 if isinstance(slot, Nonstop):
# datetime.datetime.now is passed uncalled, get_playlist() evaluates it
319 self.playlist = self.get_playlist(slot, datetime.datetime.now, slot.end_datetime)
321 self.softstop = False
323 now = datetime.datetime.now()
325 track = self.playlist[self.playhead]
328 self.current_track_start_datetime = now
329 if isinstance(track, Jingle):
330 logger.info('Jingle: %s (id: %s) (%s)', track.title, track.id, track.duration)
332 logger.info('Track: %s (id: %s) (%s)', track.title, track.id, track.duration)
334 if isinstance(track, Track): # not jingles
# log the play in a separate task
335 record_task = asyncio.create_task(
336 self.record_nonstop_line(track, datetime.datetime.now())
338 await self.player_process(track)
342 # track was left to finish, but now the playlist should stop.
345 elif slot.is_stream():
346 logger.info('Stream: %s', slot.stream)
# jingle played with a 60 second limit
348 await self.player_process(slot.jingle, timeout=60)
349 logger.debug('Stream timeout: %s', (slot.end_datetime - now).total_seconds())
350 short_interruption_counter = 0
353 player_start_time = datetime.datetime.now()
# the player is given the time left until the end of the slot
354 await self.player_process(
355 slot, timeout=(slot.end_datetime - player_start_time).total_seconds()
357 now = datetime.datetime.now()
# within two seconds of the planned end
358 if (slot.end_datetime - now).total_seconds() < 2:
361 # stream got interrupted
362 if (datetime.datetime.now() - player_start_time).total_seconds() < 15:
363 # and was up for less than 15 seconds.
365 # never up before, probably not even started
366 if isinstance(slot, RecurringStreamOccurence):
367 # no mercy for recurring stream, remove occurence
368 logger.info('Missing stream for %s, removing', slot)
370 elif slot.auto_delayed is True:
371 # was already delayed and is still not up, remove.
372 logger.info('Still missing stream for %s, removing', slot)
375 # push back start datetime for 5 minutes, and get
376 # back to nonstop music in the meantime
377 logger.info('Pushing starting time of %s', slot.diffusion.episode)
378 slot.diffusion.datetime = slot.diffusion.datetime + datetime.timedelta(
# presumably minutes, matching the 5 minute push above -- TODO confirm
381 slot.diffusion.episode.duration = slot.diffusion.episode.get_duration() - 5
382 if slot.diffusion.episode.duration <= 5:
383 slot.diffusion.episode.duration = 0
384 slot.auto_delayed = True
385 slot.diffusion.save()
386 slot.diffusion.episode.save()
389 short_interruption_counter += 1
# back off: one more second of wait for each short interruption
391 await asyncio.sleep(short_interruption_counter)
393 # mark stream as ok at least once, and reset short
394 # interruption counter
396 short_interruption_counter = 0
397 logger.debug('Stream error for %s', slot)
399 if short_interruption_counter > 5:
400 # many short interruptions
401 logger.info('Too many stream errors for %s, removing', slot)
405 if hasattr(slot, 'episode'):
406 logger.info('Episode: %s (id: %s)', slot.episode, slot.episode.id)
408 logger.info('Random: %s', slot)
410 await self.player_process(slot.jingle, timeout=60)
411 await self.player_process(slot)
413 def recompute_playlist(self):
# Replace everything after the current track in self.playlist with a
# newly computed playlist, running from the expected end of the current
# track (its start time plus its duration) to the end of the current slot.
414 current_track = self.playlist[self.playhead]
416 'Recomputing playlist at %s, from %s to %s',
418 self.current_track_start_datetime + current_track.duration,
419 self.slot.end_datetime,
421 playlist = self.get_playlist(
422 self.slot, self.current_track_start_datetime + current_track.duration, self.slot.end_datetime
# the current track and everything before it are kept
425 self.playlist[self.playhead + 1 :] = playlist
427 def get_current_diffusion(self):
# Look for something scheduled to be running right now, among scheduled
# diffusions, recurring stream occurences and recurring random directory
# occurences. Candidates are limited to items that started during the
# last 24 hours; they are then checked against their end time.
428 now = datetime.datetime.now()
430 ScheduledDiffusion.objects.filter(
431 diffusion__datetime__gt=now - datetime.timedelta(days=1), diffusion__datetime__lt=now
433 .order_by('diffusion__datetime')
437 RecurringStreamOccurence.objects.filter(
438 datetime__gt=now - datetime.timedelta(days=1),
440 diffusion__is_active=True,
442 .order_by('datetime')
445 directory_occurence = (
446 RecurringRandomDirectoryOccurence.objects.filter(
447 datetime__gt=now - datetime.timedelta(days=1), datetime__lt=now
449 .order_by('datetime')
453 # factor in some tolerance for diffusions a bit shorter than known, so
454 # they are not replayed from the start if they finished too early.
455 tolerance = datetime.timedelta(seconds=60)
456 if diffusion and hasattr(diffusion, 'is_stream') and diffusion.is_stream():
457 # unless it's a stream
458 tolerance = datetime.timedelta(seconds=0)
460 # note it shouldn't be possible to have both diffusion and occurences
461 # running at the moment.
# the checks go in this order: stream occurence, diffusion (with
# tolerance), directory occurence
462 if occurence and occurence.end_datetime > now:
464 if diffusion and (diffusion.end_datetime - tolerance) > now:
466 if directory_occurence and directory_occurence.end_datetime > now:
467 return directory_occurence
470 def get_next_diffusion(self, before_datetime):
# Look for the next item starting before `before_datetime`, among
# scheduled diffusions (starting after now), active recurring stream
# occurences and recurring random directory occurences.
# When both a diffusion and a stream occurence are found, the one
# starting first is returned.
471 now = datetime.datetime.now()
473 ScheduledDiffusion.objects.filter(
474 diffusion__datetime__gt=now,
475 diffusion__datetime__lt=before_datetime,
477 .order_by('diffusion__datetime')
481 RecurringStreamOccurence.objects.filter(
483 datetime__lt=before_datetime,
484 diffusion__is_active=True,
486 .order_by('datetime')
489 directory_occurence = (
490 RecurringRandomDirectoryOccurence.objects.filter(
492 datetime__lt=before_datetime,
494 .order_by('datetime')
497 if diffusion and occurence:
498 return diffusion if diffusion.diffusion.datetime < occurence.datetime else occurence
503 if directory_occurence:
504 return directory_occurence
507 def recompute_slots(self):
# Set self.slot to what should be playing now: the current diffusion
# when get_current_diffusion() finds one, otherwise the Nonstop zone
# covering the current time. For a zone, datetime and end_datetime are
# computed from its start and from the start of the following zone, and
# the end is moved to the start of the next diffusion when there is one.
508 now = datetime.datetime.now()
509 diffusion = self.get_current_diffusion()
511 self.slot = diffusion
513 nonstops = list(Nonstop.objects.all().order_by('start'))
514 nonstops = [x for x in nonstops if x.start != x.end] # disabled zones
# latest zone that has already started today
516 self.slot = [x for x in nonstops if x.start < now.time()][-1]
518 # no slots starting at midnight, and time is midnight, get latest zone,
519 # as it will span midnight.
520 self.slot = nonstops[-1]
# the next zone is the following one in start order, wrapping around to
# the first
522 next_slot = nonstops[nonstops.index(self.slot) + 1]
524 next_slot = nonstops[0]
# NOTE(review): seconds and microseconds of `now` are kept here, while
# end_datetime below resets them to zero -- confirm this is intended.
525 self.slot.datetime = now.replace(hour=self.slot.start.hour, minute=self.slot.start.minute)
526 self.slot.end_datetime = now.replace(
527 hour=next_slot.start.hour, minute=next_slot.start.minute, second=0, microsecond=0
# an end before the start means the zone spans midnight
529 if self.slot.end_datetime < self.slot.datetime:
530 self.slot.end_datetime += datetime.timedelta(days=1)
532 diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
534 self.slot.end_datetime = diffusion.datetime
536 async def recompute_slots_loop(self):
# Background task: every ten seconds or so, recompute the current slot
# and react when it differs from the one being played (softstop or
# cancellation of the play task) or when its end time moved earlier
# (playlist recomputation).
537 now = datetime.datetime.now()
# seconds left until the next multiple of ten seconds on the clock
538 sleep = (60 - now.second) % 10 # adjust to awake at :00
541 await asyncio.sleep(sleep)
542 sleep = 10 # next cycles every 10 seconds
# keep the slot as it was, to compare it with the recomputed one
543 current_slot = self.slot
544 self.recompute_slots()
545 expected_slot = self.slot
546 if current_slot != expected_slot:
547 now = datetime.datetime.now()
548 logger.info('Unexpected change, %s vs %s', current_slot, expected_slot)
549 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
550 # ask for a softstop, i.e. finish the track then switch.
552 elif isinstance(current_slot, Nonstop):
554 logger.info('Interrupting nonstop')
555 self.play_task.cancel()
556 elif current_slot.is_stream():
557 # it should have been stopped by timeout set on player but
558 # maybe the episode duration has been shortened after its
560 logger.info('Interrupting stream')
561 self.play_task.cancel()
# same slot, but it now ends earlier than planned
562 elif current_slot.end_datetime > expected_slot.end_datetime:
563 now = datetime.datetime.now()
565 'Change in end time, from %s to %s', current_slot.end_datetime, expected_slot.end_datetime
567 if isinstance(current_slot, Nonstop) and (
568 expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5)
570 # more than 5 minutes left, recompute playlist
571 self.recompute_playlist()
575 # realign clock every ten cycles
576 now = datetime.datetime.now()
577 # adjust to awake at :00
# "or 10": already aligned, so wait a whole cycle instead of zero seconds
578 sleep = ((60 - now.second) % 10) or 10
581 async def handle_connection(self, reader, writer):
# Callback of the control server (passed to asyncio.start_server() in
# main()): greets the client, then reads commands and answers each one
# with a JSON document on a single line.
# Commands: 'status' (current slot, current track, next diffusion),
# 'softquit' and 'hardquit'; anything else gets an error reply.
582 writer.write(b'Watusi!\n')
583 writer.write(b'Known commands: status, softquit, hardquit\n')
584 writer.write(b'(dot on empty line to stop connection)\n')
588 data = await reader.read(100)
590 message = data.decode().strip()
591 except UnicodeDecodeError:
# NOTE(review): `message` was not assigned by the failed decode above;
# this logs a previous value, or fails if there is none. Logging `data`
# looks like what was meant.
592 logger.debug('Server, invalid message %r', message)
596 logger.debug('Server, message %r', message)
597 if message == 'status':
598 response = {'slot': str(self.slot)}
599 if isinstance(self.slot, Nonstop):
601 track = self.playlist[self.playhead]
605 response['track'] = {}
606 response['track']['start_datetime'] = self.current_track_start_datetime.strftime(
609 response['track']['title'] = track.title
610 response['track']['artist'] = track.artist.name if track.artist_id else ''
611 response['track']['duration'] = track.duration.total_seconds()
612 response['track']['elapsed'] = (
613 datetime.datetime.now() - self.current_track_start_datetime
615 response['track']['remaining'] = (
616 track.duration - datetime.timedelta(seconds=response['track']['elapsed'])
# next diffusion, looked up over the coming five hours
618 next_diffusion = self.get_next_diffusion(
619 before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5)
622 response['next_diffusion'] = {
623 'label': str(next_diffusion),
624 'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
626 if isinstance(next_diffusion, ScheduledDiffusion):
627 response['next_diffusion'][
629 ] = next_diffusion.diffusion.episode.emission.title
630 response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
633 response = {'ack': True}
634 elif message == 'softquit':
637 response = {'ack': True}
638 elif message == 'hardquit':
641 response = {'ack': True}
642 if self.player and self.player.returncode is None: # not finished
645 response = {'err': 1, 'msg': 'unknown command: %r' % message}
646 writer.write(json.dumps(response).encode('utf-8') + b'\n')
649 except ConnectionResetError:
653 def sigterm_handler(self):
# Handler installed for SIGTERM by main() (loop.add_signal_handler):
# logs the signal and cancels the running play task.
654 logger.info('Got SIGTERM')
656 self.play_task.cancel()
658 async def main(self):
660 loop = asyncio.get_running_loop()
661 loop.add_signal_handler(signal.SIGTERM, self.sigterm_handler)
662 self.recompute_slots()
663 server = await asyncio.start_server(
664 self.handle_connection, app_settings.SERVER_BIND_IFACE, app_settings.SERVER_BIND_PORT
667 asyncio.create_task(server.serve_forever())
669 self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
671 now = datetime.datetime.now()
672 duration = (self.slot.end_datetime - now).seconds
673 logger.debug('Next sure shot %s (in %s)', self.slot.end_datetime, duration)
675 # next slot is very close, wait for it
676 await asyncio.sleep(duration)
677 self.recompute_slots()
678 self.play_task = asyncio.create_task(self.play(self.slot))
681 self.recompute_slots()
682 except asyncio.CancelledError:
683 logger.debug('Player cancelled exception')
684 if self.player and self.player.returncode is None: # not finished
686 except KeyboardInterrupt: