import signal
import sys
import time
+import urllib.parse
+import django.db
import requests
-
from django.conf import settings
from django.core.management.base import BaseCommand
-
from emissions.models import Nonstop
-from nonstop.models import Track, Jingle, SomaLogLine, ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence
-from nonstop.app_settings import app_settings
+from nonstop.app_settings import app_settings
+from nonstop.models import (
+ Jingle,
+ NonstopZoneSettings,
+ RecurringRandomDirectoryOccurence,
+ RecurringStreamOccurence,
+ ScheduledDiffusion,
+ SomaLogLine,
+ Track,
+)
+from nonstop.utils import Tracklist
logger = logging.getLogger('stamina')
asyncio.run(self.main(), debug=settings.DEBUG)
except KeyboardInterrupt:
break
- except Exception:
+ except Exception as e:
timestamp = time.time()
if (timestamp - latest_exception_timestamp) > 300:
# if latest exception was a "long" time ago, assume
latest_alert_timestamp = timestamp
alert_index += 1
+ if alert_index and isinstance(e, django.db.InterfaceError):
+ # most likely "connection already closed", because postgresql
+ # is been restarted; log then get out to be restarted.
+ logger.error('Aborting on repeated database error')
+ break
+
time.sleep(2) # retry after a bit
latest_exception_timestamp = timestamp
continue
break
def get_playlist(self, zone, start_datetime, end_datetime):
- current_datetime = start_datetime
+ current_datetime = start_datetime() if callable(start_datetime) else start_datetime
if self.last_jingle_datetime is None:
self.last_jingle_datetime = current_datetime
# Define a max duration (1 hour), if it is reached, and far enough
playlist = []
adjustment_counter = 0
try:
- jingles = list(zone.nonstopzonesettings_set.first().jingles.all())
+ zone_settings = zone.nonstopzonesettings_set.first()
+ jingles = list(zone_settings.jingles.all())
except AttributeError:
+ zone_settings = NonstopZoneSettings()
jingles = []
- recent_tracks_id = [x.track_id for x in
- SomaLogLine.objects.exclude(on_air=False).filter(
- track__isnull=False,
- play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
+ all_clocked_jingles = Jingle.objects.exclude(clock_time__isnull=True)
+
+ zone_ids = [zone.id]
+ zone_ids.extend([x.id for x in zone_settings.extra_zones.all()])
+ extra_zones = app_settings.EXTRA_ZONES.get(zone.slug)
+ if extra_zones:
+ zone_ids.extend([x.id for x in Nonstop.objects.filter(slug__in=extra_zones)])
+
+ recent_tracks_id = [
+ x.track_id
+ for x in SomaLogLine.objects.exclude(on_air=False).filter(
+ track__isnull=False,
+ play_timestamp__gt=datetime.datetime.now()
+ - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY),
+ )
+ ]
+
+ tracklist = Tracklist(zone_settings, zone_ids, recent_tracks_id)
+ random_tracks_iterator = tracklist.get_random_tracks()
t0 = datetime.datetime.now()
allow_overflow = False
+ if callable(start_datetime):
+ # compute start_datetime (e.g. now()) at the last moment, to get
+ # computed playlist timestamps as close as possible as future real
+ # ones.
+ start_datetime = start_datetime()
while current_datetime < end_datetime:
if (current_datetime - start_datetime) > max_duration and (
- (end_datetime - current_datetime) > max_duration_leftover):
+ (end_datetime - current_datetime) > max_duration_leftover
+ ):
break
- if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
+ if zone_settings.intro_jingle and (current_datetime.hour, current_datetime.minute) == (
+ zone.start.hour,
+ zone.start.minute,
+ ):
+ tracklist.playlist.append(zone_settings.intro_jingle)
+ self.last_jingle_datetime = current_datetime
+ current_datetime = start_datetime + tracklist.get_duration()
+ elif jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
# jingle time, every ~20 minutes
- playlist.append(random.choice(jingles))
+ # maybe there's a dedicated jingle for this time of day?
+ current_minute = current_datetime.time().replace(second=0, microsecond=0)
+ next_minute = (
+ (current_datetime + datetime.timedelta(minutes=1)).time().replace(second=0, microsecond=0)
+ )
+ clocked_jingles = [
+ x
+ for x in all_clocked_jingles
+ if x.clock_time >= current_minute and x.clock_time < next_minute
+ ]
+ if clocked_jingles:
+ clocked_jingle = random.choice(clocked_jingles)
+ clocked_jingle.label = '⏰ %s' % clocked_jingle.label
+ tracklist.playlist.append(clocked_jingle)
+ else:
+ tracklist.playlist.append(random.choice(jingles))
self.last_jingle_datetime = current_datetime
- current_datetime = start_datetime + sum(
- [x.duration for x in playlist], datetime.timedelta(seconds=0))
-
- remaining_time = (end_datetime - current_datetime)
- track = Track.objects.filter(
- nonstop_zones=zone,
- duration__isnull=False).exclude(
- id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
- ).order_by('?').first()
- if track is None:
- # no track, reduce recent tracks exclusion
- recent_tracks_id = recent_tracks_id[:len(recent_tracks_id)//2]
- continue
- playlist.append(track)
- current_datetime = start_datetime + sum(
- [x.duration for x in playlist], datetime.timedelta(seconds=0))
+ current_datetime = start_datetime + tracklist.get_duration()
+ remaining_time = end_datetime - current_datetime
+
+ track = next(random_tracks_iterator)
+ tracklist.append(track)
+ current_datetime = start_datetime + tracklist.get_duration()
if current_datetime > end_datetime and not allow_overflow:
# last track overshot
# 1st strategy: remove last track and try to get a track with
# exact remaining time
logger.debug('Overshoot %s, %s', adjustment_counter, current_datetime)
- playlist = playlist[:-1]
- track = Track.objects.filter(
- nonstop_zones=zone,
- duration__gte=remaining_time,
- duration__lt=remaining_time + datetime.timedelta(seconds=1)
- ).exclude(
- id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
- ).order_by('?').first()
+ tracklist.pop()
+ try:
+ track = next(
+ tracklist.get_random_tracks(
+ k=1,
+ extra_filters={
+ 'duration__gte': remaining_time,
+ 'duration__lt': remaining_time + datetime.timedelta(seconds=1),
+ },
+ )
+ )
+ except StopIteration: # nothing
+ track = None
if track:
# found a track
- playlist.append(track)
+ tracklist.append(track)
else:
# fallback strategy: didn't find track of expected duration,
# reduce playlist further
adjustment_counter += 1
- playlist = playlist[:-1]
- if len(playlist) == 0 or adjustment_counter > 5:
+ if tracklist.pop() is None or adjustment_counter > 5:
# a dedicated sound that ended a bit too early,
# or too many failures to get an appropriate file,
# allow whatever comes.
allow_overflow = True
logger.debug('Allowing overflows')
- current_datetime = start_datetime + sum(
- [x.duration for x in playlist], datetime.timedelta(seconds=0))
+ current_datetime = start_datetime + tracklist.get_duration()
- logger.info('Computed playlist for "%s" (computation time: %ss)',
- zone, (datetime.datetime.now() - t0))
+ logger.info(
+ 'Computed playlist for "%s" (computation time: %ss)', zone, (datetime.datetime.now() - t0)
+ )
current_datetime = start_datetime
- for track in playlist:
+ for track in tracklist.playlist:
logger.debug('- track: %s %s %s', current_datetime, track.duration, track.title)
current_datetime += track.duration
logger.debug('- end: %s', current_datetime)
- return playlist
+ return tracklist.playlist
def is_nonstop_on_air(self):
# check if nonstop system is currently on air
if status.get('active') == 0:
return True
elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
+ # TODO: replace this hardware check that no longer works by a
+ # logical check on programs: if there's nothing scheduled at the
+ # moment, consider nonstop is broadcasted, even if studio1 is on.
return True
elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
return True
log_line.save()
async def player_process(self, item, timeout=None):
+ if app_settings.PLAYER_IPC_PATH:
+ return await self.player_process_ipc(item, timeout=timeout)
cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
if hasattr(item, 'is_stream') and item.is_stream():
+ if urllib.parse.urlparse(item.stream.url).scheme == 'mumble':
+ cmd = [app_settings.MUMBLE_PLAYER_COMMAND]
cmd.append(item.stream.url)
logger.info('Play stream: %s', item.stream.url)
else:
cmd.append(item.file_path())
logger.info('Play file: %s', item.file_path())
- if app_settings.DEBUG_WITH_SLEEPS:
- # replace command by a sleep call, for silent debugging
- if hasattr(item, 'is_stream') and item.is_stream():
- cmd = 'sleep 86400 # %s' % item.stream.url
- elif isinstance(item.duration, datetime.timedelta):
- cmd = 'sleep %s # %s' % (item.duration.total_seconds(), item.file_path())
- elif isinstance(item.duration, int):
- cmd = 'sleep %s # %s' % (item.duration, item.file_path())
logger.debug('cmd %r', cmd)
- if isinstance(cmd, str):
- self.player = await asyncio.create_subprocess_shell(
- cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE)
- else:
- self.player = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE)
+ self.player = await asyncio.create_subprocess_exec(
+ *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
+ )
if timeout is None:
await self.player.communicate()
else:
self.player.kill()
self.player = None
+ async def player_process_ipc(self, item, timeout=None):
+ starting = False
+ while True:
+ try:
+ reader, writer = await asyncio.open_unix_connection(app_settings.PLAYER_IPC_PATH)
+ break
+ except (FileNotFoundError, ConnectionRefusedError):
+ if not starting:
+ cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
+ cmd += ['--input-ipc-server=%s' % app_settings.PLAYER_IPC_PATH, '--idle']
+ self.player = await asyncio.create_subprocess_exec(
+ *cmd, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL
+ )
+ starting = True
+ await asyncio.sleep(0.1)
+
+ if hasattr(item, 'is_stream') and item.is_stream():
+ file_path = item.stream.url
+ logger.info('Play stream: %s', item.stream.url)
+ else:
+ file_path = item.file_path()
+ logger.info('Play file: %s', item.file_path())
+
+ writer.write(json.dumps({'command': ['loadfile', file_path]}).encode() + b'\n')
+ try:
+ await writer.drain()
+ except ConnectionResetError: # connection lost
+ return
+ try:
+ await asyncio.wait_for(self.player_ipc_idle(reader, writer), timeout=timeout)
+ except asyncio.TimeoutError:
+ pass
+ writer.close()
+ await writer.wait_closed()
+
+ async def player_ipc_idle(self, reader, writer):
+ while True:
+ data = await reader.readline()
+ if not data:
+ break
+ if json.loads(data) == {'event': 'idle'}:
+ break
+
async def play(self, slot):
now = datetime.datetime.now()
if isinstance(slot, Nonstop):
- self.playlist = self.get_playlist(slot, now, slot.end_datetime)
+ self.playlist = self.get_playlist(slot, datetime.datetime.now, slot.end_datetime)
self.playhead = 0
self.softstop = False
while not self.quit:
logger.info('Track: %s (id: %s) (%s)', track.title, track.id, track.duration)
record_task = None
if isinstance(track, Track): # not jingles
- record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
+ record_task = asyncio.create_task(
+ self.record_nonstop_line(track, datetime.datetime.now())
+ )
await self.player_process(track)
if record_task:
await record_task
has_played = False
while True:
player_start_time = datetime.datetime.now()
- await self.player_process(slot, timeout=(slot.end_datetime - player_start_time).total_seconds())
+ await self.player_process(
+ slot, timeout=(slot.end_datetime - player_start_time).total_seconds()
+ )
now = datetime.datetime.now()
if (slot.end_datetime - now).total_seconds() < 2:
# it went well, stop
# push back start datetime for 5 minutes, and get
# back to nonstop music in the meantime
logger.info('Pushing starting time of %s', slot.diffusion.episode)
- slot.diffusion.datetime = slot.diffusion.datetime + datetime.timedelta(seconds=300)
+ slot.diffusion.datetime = slot.diffusion.datetime + datetime.timedelta(
+ seconds=300
+ )
slot.diffusion.episode.duration = slot.diffusion.episode.get_duration() - 5
if slot.diffusion.episode.duration <= 5:
slot.diffusion.episode.duration = 0
def recompute_playlist(self):
current_track = self.playlist[self.playhead]
- logger.debug('Recomputing playlist at %s, from %s to %s',
- current_track.title,
- self.current_track_start_datetime + current_track.duration,
- self.slot.end_datetime)
- playlist = self.get_playlist(self.slot,
- self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
+ logger.debug(
+ 'Recomputing playlist at %s, from %s to %s',
+ current_track.title,
+ self.current_track_start_datetime + current_track.duration,
+ self.slot.end_datetime,
+ )
+ playlist = self.get_playlist(
+ self.slot, self.current_track_start_datetime + current_track.duration, self.slot.end_datetime
+ )
if playlist:
- self.playlist[self.playhead + 1:] = playlist
+ self.playlist[self.playhead + 1 :] = playlist
def get_current_diffusion(self):
now = datetime.datetime.now()
- diffusion = ScheduledDiffusion.objects.filter(
- diffusion__datetime__gt=now - datetime.timedelta(days=1),
- diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
- occurence = RecurringStreamOccurence.objects.filter(
+ diffusion = (
+ ScheduledDiffusion.objects.filter(
+ diffusion__datetime__gt=now - datetime.timedelta(days=1), diffusion__datetime__lt=now
+ )
+ .order_by('diffusion__datetime')
+ .last()
+ )
+ occurence = (
+ RecurringStreamOccurence.objects.filter(
datetime__gt=now - datetime.timedelta(days=1),
- datetime__lt=now).order_by('datetime').last()
- directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
- datetime__gt=now - datetime.timedelta(days=1),
- datetime__lt=now).order_by('datetime').last()
+ datetime__lt=now,
+ diffusion__is_active=True,
+ )
+ .order_by('datetime')
+ .last()
+ )
+ directory_occurence = (
+ RecurringRandomDirectoryOccurence.objects.filter(
+ datetime__gt=now - datetime.timedelta(days=1), datetime__lt=now
+ )
+ .order_by('datetime')
+ .last()
+ )
+
+ # factor in some tolerance for diffusions a bit shorter than known, so
+ # they are not replayed from the start if they finished too early.
+ tolerance = datetime.timedelta(seconds=60)
+ if diffusion and hasattr(diffusion, 'is_stream') and diffusion.is_stream():
+ # unless it's a stream
+ tolerance = datetime.timedelta(seconds=0)
+
# note it shouldn't be possible to have both diffusion and occurences
# running at the moment.
if occurence and occurence.end_datetime > now:
return occurence
- if diffusion and diffusion.end_datetime > now:
+ if diffusion and (diffusion.end_datetime - tolerance) > now:
return diffusion
if directory_occurence and directory_occurence.end_datetime > now:
return directory_occurence
def get_next_diffusion(self, before_datetime):
now = datetime.datetime.now()
- diffusion = ScheduledDiffusion.objects.filter(
+ diffusion = (
+ ScheduledDiffusion.objects.filter(
diffusion__datetime__gt=now,
diffusion__datetime__lt=before_datetime,
- ).order_by('diffusion__datetime').first()
- occurence = RecurringStreamOccurence.objects.filter(
+ )
+ .order_by('diffusion__datetime')
+ .first()
+ )
+ occurence = (
+ RecurringStreamOccurence.objects.filter(
datetime__gt=now,
datetime__lt=before_datetime,
- ).order_by('datetime').first()
- directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
+ diffusion__is_active=True,
+ )
+ .order_by('datetime')
+ .first()
+ )
+ directory_occurence = (
+ RecurringRandomDirectoryOccurence.objects.filter(
datetime__gt=now,
datetime__lt=before_datetime,
- ).order_by('datetime').first()
+ )
+ .order_by('datetime')
+ .first()
+ )
if diffusion and occurence:
return diffusion if diffusion.diffusion.datetime < occurence.datetime else occurence
if diffusion:
try:
self.slot = [x for x in nonstops if x.start < now.time()][-1]
except IndexError:
- self.slot = nonstops[0]
+ # no slots starting at midnight, and time is midnight, get latest zone,
+ # as it will span midnight.
+ self.slot = nonstops[-1]
try:
next_slot = nonstops[nonstops.index(self.slot) + 1]
except IndexError:
next_slot = nonstops[0]
- self.slot.datetime = now.replace(
- hour=self.slot.start.hour,
- minute=self.slot.start.minute)
+ self.slot.datetime = now.replace(hour=self.slot.start.hour, minute=self.slot.start.minute)
self.slot.end_datetime = now.replace(
- hour=next_slot.start.hour,
- minute=next_slot.start.minute,
- second=0,
- microsecond=0)
+ hour=next_slot.start.hour, minute=next_slot.start.minute, second=0, microsecond=0
+ )
if self.slot.end_datetime < self.slot.datetime:
self.slot.end_datetime += datetime.timedelta(days=1)
async def recompute_slots_loop(self):
now = datetime.datetime.now()
sleep = (60 - now.second) % 10 # adjust to awake at :00
+ i = 0
while not self.quit:
await asyncio.sleep(sleep)
sleep = 10 # next cycles every 10 seconds
# interrupt nonstop
logger.info('Interrupting nonstop')
self.play_task.cancel()
+ elif current_slot.is_stream():
+ # it should have been stopped by timeout set on player but
+ # maybe the episode duration has been shortened after its
+ # start.
+ logger.info('Interrupting stream')
+ self.play_task.cancel()
elif current_slot.end_datetime > expected_slot.end_datetime:
now = datetime.datetime.now()
- logger.debug('Change in end time, from %s to %s',
- current_slot.end_datetime,
- expected_slot.end_datetime)
- if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
+ logger.debug(
+ 'Change in end time, from %s to %s', current_slot.end_datetime, expected_slot.end_datetime
+ )
+ if isinstance(current_slot, Nonstop) and (
+ expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5)
+ ):
# more than 5 minutes left, recompute playlist
self.recompute_playlist()
+ i += 1
+ if i == 10:
+ # realign clock every ten cycles
+ now = datetime.datetime.now()
+ # adjust to awake at :00
+ sleep = ((60 - now.second) % 10) or 10
+ i = 0
+
async def handle_connection(self, reader, writer):
writer.write(b'Watusi!\n')
writer.write(b'Known commands: status, softquit, hardquit\n')
pass
else:
response['track'] = {}
- response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
+ response['track']['start_datetime'] = self.current_track_start_datetime.strftime(
+ '%Y-%m-%d %H:%M:%S'
+ )
response['track']['title'] = track.title
response['track']['artist'] = track.artist.name if track.artist_id else ''
response['track']['duration'] = track.duration.total_seconds()
- response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
- response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
+ response['track']['elapsed'] = (
+ datetime.datetime.now() - self.current_track_start_datetime
+ ).total_seconds()
+ response['track']['remaining'] = (
+ track.duration - datetime.timedelta(seconds=response['track']['elapsed'])
+ ).total_seconds()
next_diffusion = self.get_next_diffusion(
- before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
+ before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5)
+ )
if next_diffusion:
response['next_diffusion'] = {
'label': str(next_diffusion),
'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
}
if isinstance(next_diffusion, ScheduledDiffusion):
- response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
+ response['next_diffusion'][
+ 'emission'
+ ] = next_diffusion.diffusion.episode.emission.title
response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
elif message == '.':
end = True
self.play_task.cancel()
async def main(self):
+ self.player = None
loop = asyncio.get_running_loop()
- loop.add_signal_handler(
- signal.SIGTERM,
- self.sigterm_handler)
+ loop.add_signal_handler(signal.SIGTERM, self.sigterm_handler)
self.recompute_slots()
server = await asyncio.start_server(
- self.handle_connection,
- app_settings.SERVER_BIND_IFACE,
- app_settings.SERVER_BIND_PORT)
+ self.handle_connection, app_settings.SERVER_BIND_IFACE, app_settings.SERVER_BIND_PORT
+ )
async with server:
asyncio.create_task(server.serve_forever())