import asyncio
import datetime
+import json
+import logging
import random
+import signal
+import sys
+import time
+import requests
+
+from django.conf import settings
from django.core.management.base import BaseCommand
from emissions.models import Nonstop
-from nonstop.models import Track, ScheduledDiffusion
+from nonstop.models import Track, Jingle, SomaLogLine, ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence
+from nonstop.app_settings import app_settings
+
+
+logger = logging.getLogger('stamina')
class Command(BaseCommand):
+ requires_system_checks = False
+
last_jingle_datetime = None
quit = False
def handle(self, verbosity, **kwargs):
- try:
- asyncio.run(self.main(), debug=True)
- except KeyboardInterrupt:
- if not self.play_task.done():
- self.play_task.cancel()
- if not self.recompute_slots_task.done():
- self.recompute_slots_task.done()
+ alert_index = 0
+ latest_alert_timestamp = 0
+ latest_exception_timestamp = 0
+
+ def exception_alert_thresholds():
+ yield 0
+ duration = 3
+ while duration < 3600:
+ yield duration
+ duration *= 5
+ duration = 3600
+ while True:
+ yield duration
+ duration += 3600
+
+ while True:
+ try:
+ asyncio.run(self.main(), debug=settings.DEBUG)
+ except KeyboardInterrupt:
+ break
+ except Exception:
+ timestamp = time.time()
+ if (timestamp - latest_exception_timestamp) > 300:
+ # if latest exception was a "long" time ago, assume
+ # things went smooth for a while and reset things
+ alert_index = 0
+ latest_alert_timestamp = 0
+ latest_exception_timestamp = 0
+
+ alert_threshold = 0
+ for i, threshold in enumerate(exception_alert_thresholds()):
+ if i == alert_index:
+ alert_threshold = threshold
+ break
+
+ if (timestamp - latest_alert_timestamp) > alert_threshold:
+ logger.exception('General exception (alert index: %s)', alert_index)
+ latest_alert_timestamp = timestamp
+ alert_index += 1
+
+ time.sleep(2) # retry after a bit
+ latest_exception_timestamp = timestamp
+ continue
+ break
def get_playlist(self, zone, start_datetime, end_datetime):
current_datetime = start_datetime
if self.last_jingle_datetime is None:
self.last_jingle_datetime = current_datetime
+ # Define a max duration (1 hour), if it is reached, and far enough
+ # from end_datetime (30 minutes), return the playlist as is, not aligned
+ # on end time, so a new playlist gets computed once it's over.
+ # This avoids misalignments due to track durations not matching exactly
+ # or additional delays caused by the player program.
+ max_duration = datetime.timedelta(hours=1)
+ max_duration_leftover = datetime.timedelta(minutes=30)
playlist = []
adjustment_counter = 0
try:
except AttributeError:
jingles = []
- while current_datetime < end_datetime and adjustment_counter < 5:
+ recent_tracks_id = [x.track_id for x in
+ SomaLogLine.objects.exclude(on_air=False).filter(
+ track__isnull=False,
+ play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
+ t0 = datetime.datetime.now()
+ allow_overflow = False
+ while current_datetime < end_datetime:
+ if (current_datetime - start_datetime) > max_duration and (
+ (end_datetime - current_datetime) > max_duration_leftover):
+ break
if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
# jingle time, every ~20 minutes
playlist.append(random.choice(jingles))
self.last_jingle_datetime = current_datetime
+ current_datetime = start_datetime + sum(
+ [x.duration for x in playlist], datetime.timedelta(seconds=0))
+ zone_ids = [zone.id]
+ extra_zones = app_settings.EXTRA_ZONES.get(zone.slug)
+ if extra_zones:
+ zone_ids.extend([x.id for x in Nonstop.objects.filter(slug__in=extra_zones)])
remaining_time = (end_datetime - current_datetime)
track = Track.objects.filter(
- nonstop_zones=zone,
+ nonstop_zones__in=zone_ids,
duration__isnull=False).exclude(
- id__in=[x.id for x in playlist if isinstance(x, Track)]
+ id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
).order_by('?').first()
+ if track is None:
+ # no track, reduce recent tracks exclusion
+ recent_tracks_id = recent_tracks_id[:len(recent_tracks_id)//2]
+ continue
playlist.append(track)
current_datetime = start_datetime + sum(
[x.duration for x in playlist], datetime.timedelta(seconds=0))
- if current_datetime > end_datetime:
+ if current_datetime > end_datetime and not allow_overflow:
# last track overshot
# 1st strategy: remove last track and try to get a track with
# exact remaining time
+ logger.debug('Overshoot %s, %s', adjustment_counter, current_datetime)
playlist = playlist[:-1]
- current_datetime = start_datetime + sum(
- [x.duration for x in playlist], datetime.timedelta(seconds=0))
track = Track.objects.filter(
nonstop_zones=zone,
duration__gte=remaining_time,
duration__lt=remaining_time + datetime.timedelta(seconds=1)
).exclude(
- id__in=[x.id for x in playlist if isinstance(x, Track)]
+ id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
).order_by('?').first()
if track:
# found a track
playlist.append(track)
- current_datetime = start_datetime + sum(
- [x.duration for x in playlist], datetime.timedelta(seconds=0))
else:
# fallback strategy: didn't find track of expected duration,
# reduce playlist further
adjustment_counter += 1
playlist = playlist[:-1]
- current_datetime = start_datetime + sum(
- [x.duration for x in playlist], datetime.timedelta(seconds=0))
+ if len(playlist) == 0 or adjustment_counter > 5:
+ # a dedicated sound that ended a bit too early,
+ # or too many failures to get an appropriate file,
+ # allow whatever comes.
+ allow_overflow = True
+ logger.debug('Allowing overflows')
- print('computed playlist:')
+ current_datetime = start_datetime + sum(
+ [x.duration for x in playlist], datetime.timedelta(seconds=0))
+
+ logger.info('Computed playlist for "%s" (computation time: %ss)',
+ zone, (datetime.datetime.now() - t0))
current_datetime = start_datetime
for track in playlist:
- print(' ', current_datetime, track.duration, track.title)
+ logger.debug('- track: %s %s %s', current_datetime, track.duration, track.title)
current_datetime += track.duration
- print(' ', current_datetime, '---')
- print(' adjustment_counter:', adjustment_counter)
-
+ logger.debug('- end: %s', current_datetime)
return playlist
- async def player_process(self, cmd, timeout=None):
- self.player = await asyncio.create_subprocess_shell(
- cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE)
+ def is_nonstop_on_air(self):
+ # check if nonstop system is currently on air
+ if app_settings.ON_AIR_SWITCH_URL is None:
+ return None
+ switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
+ if not switch_response.ok:
+ return None
+ try:
+ status = switch_response.json()
+ except ValueError:
+ return None
+ if status.get('active') == 0:
+ return True
+ elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
+ return True
+ elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
+ return True
+ return False
+
+ async def record_nonstop_line(self, track, now):
+ log_line = SomaLogLine()
+ log_line.play_timestamp = now
+ log_line.track = track
+ log_line.filepath = track.nonstopfile_set.first()
+ log_line.on_air = self.is_nonstop_on_air()
+ log_line.save()
+
+ async def player_process(self, item, timeout=None):
+ cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
+ if hasattr(item, 'is_stream') and item.is_stream():
+ cmd.append(item.stream.url)
+ logger.info('Play stream: %s', item.stream.url)
+ else:
+ cmd.append(item.file_path())
+ logger.info('Play file: %s', item.file_path())
+ if app_settings.DEBUG_WITH_SLEEPS:
+ # replace command by a sleep call, for silent debugging
+ if hasattr(item, 'is_stream') and item.is_stream():
+ cmd = 'sleep 86400 # %s' % item.stream.url
+ elif isinstance(item.duration, datetime.timedelta):
+ cmd = 'sleep %s # %s' % (item.duration.total_seconds(), item.file_path())
+ elif isinstance(item.duration, int):
+ cmd = 'sleep %s # %s' % (item.duration, item.file_path())
+ logger.debug('cmd %r', cmd)
+ if isinstance(cmd, str):
+ self.player = await asyncio.create_subprocess_shell(
+ cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE)
+ else:
+ self.player = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE)
if timeout is None:
await self.player.communicate()
else:
try:
await asyncio.wait_for(self.player.communicate(), timeout=timeout)
except asyncio.TimeoutError:
- pass
+ self.player.kill()
self.player = None
async def play(self, slot):
if isinstance(slot, Nonstop):
self.playlist = self.get_playlist(slot, now, slot.end_datetime)
self.playhead = 0
- while True:
+ self.softstop = False
+ while not self.quit:
now = datetime.datetime.now()
try:
track = self.playlist[self.playhead]
except IndexError:
break
self.current_track_start_datetime = now
- print(now, track.title, track.duration,
- '- future tracks:', [x.title for x in self.playlist[self.playhead + 1:self.playhead + 3]])
- cmd = 'sleep %s # %s' % (track.duration.seconds, track.title)
- await self.player_process(cmd)
+ if isinstance(track, Jingle):
+ logger.info('Jingle: %s (id: %s) (%s)', track.title, track.id, track.duration)
+ else:
+ logger.info('Track: %s (id: %s) (%s)', track.title, track.id, track.duration)
+ record_task = None
+ if isinstance(track, Track): # not jingles
+ record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
+ await self.player_process(track)
+ if record_task:
+ await record_task
+ if self.softstop:
+ # track was left to finish, but now the playlist should stop.
+ break
self.playhead += 1
elif slot.is_stream():
- print(now, 'playing stream', slot.stream)
- # TODO: jingle
- cmd = 'sleep 86400 # stream' # will never stop by itself
- await self.player_process(cmd, timeout=slot.duration)
+ logger.info('Stream: %s', slot.stream)
+ if slot.jingle_id:
+ await self.player_process(slot.jingle, timeout=60)
+ logger.debug('Stream timeout: %s', (slot.end_datetime - now).total_seconds())
+ short_interruption_counter = 0
+ has_played = False
+ while True:
+ player_start_time = datetime.datetime.now()
+ await self.player_process(slot, timeout=(slot.end_datetime - player_start_time).total_seconds())
+ now = datetime.datetime.now()
+ if (slot.end_datetime - now).total_seconds() < 2:
+ # it went well, stop
+ break
+ # stream got interrupted
+ if (datetime.datetime.now() - player_start_time).total_seconds() < 15:
+ # and was up for less than 15 seconds.
+ if not has_played:
+ # never up before, probably not even started
+ if isinstance(slot, RecurringStreamOccurence):
+ # no mercy for recurring stream, remove occurence
+ logger.info('Missing stream for %s, removing', slot)
+ slot.delete()
+ elif slot.auto_delayed is True:
+ # was already delayed and is still not up, remove.
+ logger.info('Still missing stream for %s, removing', slot)
+ slot.delete()
+ else:
+ # push back start datetime for 5 minutes, and get
+ # back to nonstop music in the meantime
+ logger.info('Pushing starting time of %s', slot.diffusion.episode)
+ slot.diffusion.datetime = slot.diffusion.datetime + datetime.timedelta(seconds=300)
+ slot.diffusion.episode.duration = slot.diffusion.episode.get_duration() - 5
+ if slot.diffusion.episode.duration <= 5:
+ slot.diffusion.episode.duration = 0
+ slot.auto_delayed = True
+ slot.diffusion.save()
+ slot.diffusion.episode.save()
+ slot.save()
+ break
+ short_interruption_counter += 1
+ # wait a bit
+ await asyncio.sleep(short_interruption_counter)
+ else:
+ # mark stream as ok at least one, and reset short
+ # interruption counter
+ has_played = True
+ short_interruption_counter = 0
+ logger.debug('Stream error for %s', slot)
+
+ if short_interruption_counter > 5:
+ # many short interruptions
+ logger.info('Too many stream errors for %s, removing', slot)
+ slot.delete()
+ break
else:
- print(now, 'playing sound', slot.episode)
- # TODO: jingle
- cmd = 'sleep %s # %s' % (slot.soundfile.duration, slot.episode)
- await self.player_process(cmd)
+ if hasattr(slot, 'episode'):
+ logger.info('Episode: %s (id: %s)', slot.episode, slot.episode.id)
+ else:
+ logger.info('Random: %s', slot)
+ if slot.jingle_id:
+ await self.player_process(slot.jingle, timeout=60)
+ await self.player_process(slot)
def recompute_playlist(self):
current_track = self.playlist[self.playhead]
- print('recompute_playlist, from', current_track.title, self.current_track_start_datetime + current_track.duration, 'to', self.slot.end_datetime)
+ logger.debug('Recomputing playlist at %s, from %s to %s',
+ current_track.title,
+ self.current_track_start_datetime + current_track.duration,
+ self.slot.end_datetime)
playlist = self.get_playlist(self.slot,
self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
if playlist:
self.playlist[self.playhead + 1:] = playlist
- def recompute_slots(self):
+ def get_current_diffusion(self):
now = datetime.datetime.now()
- # print(now, 'recompute_slots')
diffusion = ScheduledDiffusion.objects.filter(
diffusion__datetime__gt=now - datetime.timedelta(days=1),
- diffusion__datetime__lt=now).last()
+ diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
+ occurence = RecurringStreamOccurence.objects.filter(
+ datetime__gt=now - datetime.timedelta(days=1),
+ datetime__lt=now).order_by('datetime').last()
+ directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
+ datetime__gt=now - datetime.timedelta(days=1),
+ datetime__lt=now).order_by('datetime').last()
+ # note it shouldn't be possible to have both diffusion and occurences
+ # running at the moment.
+ if occurence and occurence.end_datetime > now:
+ return occurence
if diffusion and diffusion.end_datetime > now:
+ return diffusion
+ if directory_occurence and directory_occurence.end_datetime > now:
+ return directory_occurence
+ return None
+
+ def get_next_diffusion(self, before_datetime):
+ now = datetime.datetime.now()
+ diffusion = ScheduledDiffusion.objects.filter(
+ diffusion__datetime__gt=now,
+ diffusion__datetime__lt=before_datetime,
+ ).order_by('diffusion__datetime').first()
+ occurence = RecurringStreamOccurence.objects.filter(
+ datetime__gt=now,
+ datetime__lt=before_datetime,
+ ).order_by('datetime').first()
+ directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
+ datetime__gt=now,
+ datetime__lt=before_datetime,
+ ).order_by('datetime').first()
+ if diffusion and occurence:
+ return diffusion if diffusion.diffusion.datetime < occurence.datetime else occurence
+ if diffusion:
+ return diffusion
+ if occurence:
+ return occurence
+ if directory_occurence:
+ return directory_occurence
+ return None
+
+ def recompute_slots(self):
+ now = datetime.datetime.now()
+ diffusion = self.get_current_diffusion()
+ if diffusion:
self.slot = diffusion
else:
nonstops = list(Nonstop.objects.all().order_by('start'))
if self.slot.end_datetime < self.slot.datetime:
self.slot.end_datetime += datetime.timedelta(days=1)
- diffusion = ScheduledDiffusion.objects.filter(
- diffusion__datetime__gt=now,
- diffusion__datetime__lt=self.slot.end_datetime).first()
+ diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
if diffusion:
self.slot.end_datetime = diffusion.datetime
async def recompute_slots_loop(self):
- print('recompute_slots_loop')
now = datetime.datetime.now()
sleep = (60 - now.second) % 10 # adjust to awake at :00
while not self.quit:
self.recompute_slots()
expected_slot = self.slot
if current_slot != expected_slot:
- print('unexpected change', current_slot, 'vs', expected_slot)
- if isinstance(current_slot, Nonstop) and not isinstance(expected_slot, Nonstop):
+ now = datetime.datetime.now()
+ logger.info('Unexpected change, %s vs %s', current_slot, expected_slot)
+ if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
+ # ask for a softstop, i.e. finish the track then switch.
+ self.softstop = True
+ elif isinstance(current_slot, Nonstop):
# interrupt nonstop
- print('interrupting nonstop')
+ logger.info('Interrupting nonstop')
self.play_task.cancel()
elif current_slot.end_datetime > expected_slot.end_datetime:
- print('change in end time, from %s to %s' %
- (current_slot.end_datetime, expected_slot.end_datetime))
+ now = datetime.datetime.now()
+ logger.debug('Change in end time, from %s to %s',
+ current_slot.end_datetime,
+ expected_slot.end_datetime)
if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
# more than 5 minutes left, recompute playlist
self.recompute_playlist()
async def handle_connection(self, reader, writer):
- data = await reader.read(100)
- message = data.decode().strip()
- response = 'err'
- if message == 'playing?':
- response = '%s' % self.slot
- writer.write(response.encode('utf-8'))
+ writer.write(b'Watusi!\n')
+ writer.write(b'Known commands: status, softquit, hardquit\n')
+ writer.write(b'(dot on empty line to stop connection)\n')
await writer.drain()
+ end = False
+ while not end:
+ data = await reader.read(100)
+ try:
+ message = data.decode().strip()
+ except UnicodeDecodeError:
+ logger.debug('Server, invalid message %r', message)
+ if not data:
+ end = True
+ continue
+ logger.debug('Server, message %r', message)
+ if message == 'status':
+ response = {'slot': str(self.slot)}
+ if isinstance(self.slot, Nonstop):
+ try:
+ track = self.playlist[self.playhead]
+ except IndexError:
+ pass
+ else:
+ response['track'] = {}
+ response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
+ response['track']['title'] = track.title
+ response['track']['artist'] = track.artist.name if track.artist_id else ''
+ response['track']['duration'] = track.duration.total_seconds()
+ response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
+ response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
+ next_diffusion = self.get_next_diffusion(
+ before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
+ if next_diffusion:
+ response['next_diffusion'] = {
+ 'label': str(next_diffusion),
+ 'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
+ }
+ if isinstance(next_diffusion, ScheduledDiffusion):
+ response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
+ response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
+ elif message == '.':
+ end = True
+ response = {'ack': True}
+ elif message == 'softquit':
+ self.quit = True
+ end = True
+ response = {'ack': True}
+ elif message == 'hardquit':
+ self.quit = True
+ end = True
+ response = {'ack': True}
+ if self.player and self.player.returncode is None: # not finished
+ self.player.kill()
+ else:
+ response = {'err': 1, 'msg': 'unknown command: %r' % message}
+ writer.write(json.dumps(response).encode('utf-8') + b'\n')
+ try:
+ await writer.drain()
+ except ConnectionResetError:
+ break
writer.close()
+ def sigterm_handler(self):
+ logger.info('Got SIGTERM')
+ self.quit = True
+ self.play_task.cancel()
+
async def main(self):
- now = datetime.datetime.now()
+ loop = asyncio.get_running_loop()
+ loop.add_signal_handler(
+ signal.SIGTERM,
+ self.sigterm_handler)
self.recompute_slots()
- server = await asyncio.start_server(self.handle_connection, '127.0.0.1', 8888)
+ server = await asyncio.start_server(
+ self.handle_connection,
+ app_settings.SERVER_BIND_IFACE,
+ app_settings.SERVER_BIND_PORT)
async with server:
asyncio.create_task(server.serve_forever())
self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
- while True:
+ while not self.quit:
+ now = datetime.datetime.now()
duration = (self.slot.end_datetime - now).seconds
- print('next sure slot', duration, self.slot.end_datetime)
+ logger.debug('Next sure shot %s (in %s)', self.slot.end_datetime, duration)
if duration < 2:
# next slot is very close, wait for it
await asyncio.sleep(duration)
try:
await self.play_task
self.recompute_slots()
- except asyncio.CancelledError as exc:
- print('exc:', exc)
+ except asyncio.CancelledError:
+ logger.debug('Player cancelled exception')
if self.player and self.player.returncode is None: # not finished
self.player.kill()
except KeyboardInterrupt:
self.quit = True
break
- self.quit = True
- await self.recompute_slots_task