]> git.0d.be Git - django-panik-nonstop.git/blobdiff - nonstop/management/commands/stamina.py
stamina: add settings/hack to merge tracks from multiple zones
[django-panik-nonstop.git] / nonstop / management / commands / stamina.py
index 6b9a69a5b8880173c1475140d7f5160f572dc4fd..768e455d39ff6c02c494c1c17cc07d2a920c2532 100644 (file)
@@ -1,30 +1,88 @@
 import asyncio
 import datetime
+import json
+import logging
 import random
+import signal
+import sys
+import time
 
+import requests
+
+from django.conf import settings
 from django.core.management.base import BaseCommand
 
 from emissions.models import Nonstop
-from nonstop.models import Track, ScheduledDiffusion
+from nonstop.models import Track, Jingle, SomaLogLine, ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence
+from nonstop.app_settings import app_settings
+
+
+logger = logging.getLogger('stamina')
 
 
 class Command(BaseCommand):
+    requires_system_checks = False
+
     last_jingle_datetime = None
     quit = False
 
     def handle(self, verbosity, **kwargs):
-        try:
-            asyncio.run(self.main(), debug=True)
-        except KeyboardInterrupt:
-            if not self.play_task.done():
-                self.play_task.cancel()
-            if not self.recompute_slots_task.done():
-                self.recompute_slots_task.done()
+        alert_index = 0
+        latest_alert_timestamp = 0
+        latest_exception_timestamp = 0
+
+        def exception_alert_thresholds():
+            yield 0
+            duration = 3
+            while duration < 3600:
+                yield duration
+                duration *= 5
+            duration = 3600
+            while True:
+                yield duration
+                duration += 3600
+
+        while True:
+            try:
+                asyncio.run(self.main(), debug=settings.DEBUG)
+            except KeyboardInterrupt:
+                break
+            except Exception:
+                timestamp = time.time()
+                if (timestamp - latest_exception_timestamp) > 300:
+                    # if latest exception was a "long" time ago, assume
+                    # things went smooth for a while and reset things
+                    alert_index = 0
+                    latest_alert_timestamp = 0
+                    latest_exception_timestamp = 0
+
+                alert_threshold = 0
+                for i, threshold in enumerate(exception_alert_thresholds()):
+                    if i == alert_index:
+                        alert_threshold = threshold
+                        break
+
+                if (timestamp - latest_alert_timestamp) > alert_threshold:
+                    logger.exception('General exception (alert index: %s)', alert_index)
+                    latest_alert_timestamp = timestamp
+                    alert_index += 1
+
+                time.sleep(2)  # retry after a bit
+                latest_exception_timestamp = timestamp
+                continue
+            break
 
     def get_playlist(self, zone, start_datetime, end_datetime):
         current_datetime = start_datetime
         if self.last_jingle_datetime is None:
             self.last_jingle_datetime = current_datetime
+        # Define a max duration (1 hour), if it is reached, and far enough
+        # from end_datetime (30 minutes), return the playlist as is, not aligned
+        # on end time, so a new playlist gets computed once it's over.
+        # This avoids misalignments due to track durations not matching exactly
+        # or additional delays caused by the player program.
+        max_duration = datetime.timedelta(hours=1)
+        max_duration_leftover = datetime.timedelta(minutes=30)
         playlist = []
         adjustment_counter = 0
         try:
@@ -32,71 +90,142 @@ class Command(BaseCommand):
         except AttributeError:
             jingles = []
 
-        while current_datetime < end_datetime and adjustment_counter < 5:
+        recent_tracks_id = [x.track_id for x in
+                SomaLogLine.objects.exclude(on_air=False).filter(
+                    track__isnull=False,
+                    play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
+        t0 = datetime.datetime.now()
+        allow_overflow = False
+        while current_datetime < end_datetime:
+            if (current_datetime - start_datetime) > max_duration and (
+                    (end_datetime - current_datetime) > max_duration_leftover):
+                break
 
             if jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
                 # jingle time, every ~20 minutes
                 playlist.append(random.choice(jingles))
                 self.last_jingle_datetime = current_datetime
+                current_datetime = start_datetime + sum(
+                        [x.duration for x in playlist], datetime.timedelta(seconds=0))
 
+            zone_ids = [zone.id]
+            extra_zones = app_settings.EXTRA_ZONES.get(zone.slug)
+            if extra_zones:
+                zone_ids.extend([x.id for x in Nonstop.objects.filter(slug__in=extra_zones)])
             remaining_time = (end_datetime - current_datetime)
             track = Track.objects.filter(
-                    nonstop_zones=zone,
+                    nonstop_zones__in=zone_ids,
                     duration__isnull=False).exclude(
-                            id__in=[x.id for x in playlist if isinstance(x, Track)]
+                            id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
                     ).order_by('?').first()
+            if track is None:
+                # no track, reduce recent tracks exclusion
+                recent_tracks_id = recent_tracks_id[:len(recent_tracks_id)//2]
+                continue
             playlist.append(track)
             current_datetime = start_datetime + sum(
                     [x.duration for x in playlist], datetime.timedelta(seconds=0))
-            if current_datetime > end_datetime:
+            if current_datetime > end_datetime and not allow_overflow:
                 # last track overshot
                 # 1st strategy: remove last track and try to get a track with
                 # exact remaining time
+                logger.debug('Overshoot %s, %s', adjustment_counter, current_datetime)
                 playlist = playlist[:-1]
-                current_datetime = start_datetime + sum(
-                        [x.duration for x in playlist], datetime.timedelta(seconds=0))
                 track = Track.objects.filter(
                         nonstop_zones=zone,
                         duration__gte=remaining_time,
                         duration__lt=remaining_time + datetime.timedelta(seconds=1)
                         ).exclude(
-                            id__in=[x.id for x in playlist if isinstance(x, Track)]
+                            id__in=recent_tracks_id + [x.id for x in playlist if isinstance(x, Track)]
                         ).order_by('?').first()
                 if track:
                     # found a track
                     playlist.append(track)
-                    current_datetime = start_datetime + sum(
-                            [x.duration for x in playlist], datetime.timedelta(seconds=0))
                 else:
                     # fallback strategy: didn't find track of expected duration,
                     # reduce playlist further
                     adjustment_counter += 1
                     playlist = playlist[:-1]
-                    current_datetime = start_datetime + sum(
-                            [x.duration for x in playlist], datetime.timedelta(seconds=0))
+                    if len(playlist) == 0 or adjustment_counter > 5:
+                        # a dedicated sound that ended a bit too early,
+                        # or too many failures to get an appropriate file,
+                        # allow whatever comes.
+                        allow_overflow = True
+                        logger.debug('Allowing overflows')
 
-        print('computed playlist:')
+                current_datetime = start_datetime + sum(
+                        [x.duration for x in playlist], datetime.timedelta(seconds=0))
+
+        logger.info('Computed playlist for "%s" (computation time: %ss)',
+                zone, (datetime.datetime.now() - t0))
         current_datetime = start_datetime
         for track in playlist:
-            print('   ', current_datetime, track.duration, track.title)
+            logger.debug('- track: %s %s %s', current_datetime, track.duration, track.title)
             current_datetime += track.duration
-        print('   ', current_datetime, '---')
-        print('   adjustment_counter:', adjustment_counter)
-
+        logger.debug('- end: %s', current_datetime)
         return playlist
 
-    async def player_process(self, cmd, timeout=None):
-        self.player = await asyncio.create_subprocess_shell(
-                cmd,
-                stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE)
+    def is_nonstop_on_air(self):
+        # check if nonstop system is currently on air
+        if app_settings.ON_AIR_SWITCH_URL is None:
+            return None
+        switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
+        if not switch_response.ok:
+            return None
+        try:
+            status = switch_response.json()
+        except ValueError:
+            return None
+        if status.get('active') == 0:
+            return True
+        elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
+            return True
+        elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
+            return True
+        return False
+
+    async def record_nonstop_line(self, track, now):
+        log_line = SomaLogLine()
+        log_line.play_timestamp = now
+        log_line.track = track
+        log_line.filepath = track.nonstopfile_set.first()
+        log_line.on_air = self.is_nonstop_on_air()
+        log_line.save()
+
+    async def player_process(self, item, timeout=None):
+        cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
+        if hasattr(item, 'is_stream') and item.is_stream():
+            cmd.append(item.stream.url)
+            logger.info('Play stream: %s', item.stream.url)
+        else:
+            cmd.append(item.file_path())
+            logger.info('Play file: %s', item.file_path())
+        if app_settings.DEBUG_WITH_SLEEPS:
+            # replace command by a sleep call, for silent debugging
+            if hasattr(item, 'is_stream') and item.is_stream():
+                cmd = 'sleep 86400 # %s' % item.stream.url
+            elif isinstance(item.duration, datetime.timedelta):
+                cmd = 'sleep %s # %s' % (item.duration.total_seconds(), item.file_path())
+            elif isinstance(item.duration, int):
+                cmd = 'sleep %s # %s' % (item.duration, item.file_path())
+        logger.debug('cmd %r', cmd)
+        if isinstance(cmd, str):
+            self.player = await asyncio.create_subprocess_shell(
+                    cmd,
+                    stdout=asyncio.subprocess.PIPE,
+                    stderr=asyncio.subprocess.PIPE)
+        else:
+            self.player = await asyncio.create_subprocess_exec(
+                    *cmd,
+                    stdout=asyncio.subprocess.PIPE,
+                    stderr=asyncio.subprocess.PIPE)
         if timeout is None:
             await self.player.communicate()
         else:
             try:
                 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
             except asyncio.TimeoutError:
-                pass
+                self.player.kill()
         self.player = None
 
     async def play(self, slot):
@@ -104,44 +233,152 @@ class Command(BaseCommand):
         if isinstance(slot, Nonstop):
             self.playlist = self.get_playlist(slot, now, slot.end_datetime)
             self.playhead = 0
-            while True:
+            self.softstop = False
+            while not self.quit:
                 now = datetime.datetime.now()
                 try:
                     track = self.playlist[self.playhead]
                 except IndexError:
                     break
                 self.current_track_start_datetime = now
-                print(now, track.title, track.duration,
-                        '- future tracks:', [x.title for x in self.playlist[self.playhead + 1:self.playhead + 3]])
-                cmd = 'sleep %s # %s' % (track.duration.seconds, track.title)
-                await self.player_process(cmd)
+                if isinstance(track, Jingle):
+                    logger.info('Jingle: %s (id: %s) (%s)', track.title, track.id, track.duration)
+                else:
+                    logger.info('Track: %s (id: %s) (%s)', track.title, track.id, track.duration)
+                record_task = None
+                if isinstance(track, Track):  # not jingles
+                    record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
+                await self.player_process(track)
+                if record_task:
+                    await record_task
+                if self.softstop:
+                    # track was left to finish, but now the playlist should stop.
+                    break
                 self.playhead += 1
         elif slot.is_stream():
-            print(now, 'playing stream', slot.stream)
-            # TODO: jingle
-            cmd = 'sleep 86400 # stream'  # will never stop by itself
-            await self.player_process(cmd, timeout=slot.duration)
+            logger.info('Stream: %s', slot.stream)
+            if slot.jingle_id:
+                await self.player_process(slot.jingle, timeout=60)
+            logger.debug('Stream timeout: %s', (slot.end_datetime - now).total_seconds())
+            short_interruption_counter = 0
+            has_played = False
+            while True:
+                player_start_time = datetime.datetime.now()
+                await self.player_process(slot, timeout=(slot.end_datetime - player_start_time).total_seconds())
+                now = datetime.datetime.now()
+                if (slot.end_datetime - now).total_seconds() < 2:
+                    # it went well, stop
+                    break
+                # stream got interrupted
+                if (datetime.datetime.now() - player_start_time).total_seconds() < 15:
+                    # and was up for less than 15 seconds.
+                    if not has_played:
+                        # never up before, probably not even started
+                        if isinstance(slot, RecurringStreamOccurence):
+                            # no mercy for recurring stream, remove occurence
+                            logger.info('Missing stream for %s, removing', slot)
+                            slot.delete()
+                        elif slot.auto_delayed is True:
+                            # was already delayed and is still not up, remove.
+                            logger.info('Still missing stream for %s, removing', slot)
+                            slot.delete()
+                        else:
+                            # push back start datetime for 5 minutes, and get
+                            # back to nonstop music in the meantime
+                            logger.info('Pushing starting time of %s', slot.diffusion.episode)
+                            slot.diffusion.datetime = slot.diffusion.datetime + datetime.timedelta(seconds=300)
+                            slot.diffusion.episode.duration = slot.diffusion.episode.get_duration() - 5
+                            if slot.diffusion.episode.duration <= 5:
+                                slot.diffusion.episode.duration = 0
+                            slot.auto_delayed = True
+                            slot.diffusion.save()
+                            slot.diffusion.episode.save()
+                            slot.save()
+                        break
+                    short_interruption_counter += 1
+                    # wait a bit
+                    await asyncio.sleep(short_interruption_counter)
+                else:
+                    # mark stream as ok at least one, and reset short
+                    # interruption counter
+                    has_played = True
+                    short_interruption_counter = 0
+                    logger.debug('Stream error for %s', slot)
+
+                if short_interruption_counter > 5:
+                    # many short interruptions
+                    logger.info('Too many stream errors for %s, removing', slot)
+                    slot.delete()
+                    break
         else:
-            print(now, 'playing sound', slot.episode)
-            # TODO: jingle
-            cmd = 'sleep %s # %s' % (slot.soundfile.duration, slot.episode)
-            await self.player_process(cmd)
+            if hasattr(slot, 'episode'):
+                logger.info('Episode: %s (id: %s)', slot.episode, slot.episode.id)
+            else:
+                logger.info('Random: %s', slot)
+            if slot.jingle_id:
+                await self.player_process(slot.jingle, timeout=60)
+            await self.player_process(slot)
 
     def recompute_playlist(self):
         current_track = self.playlist[self.playhead]
-        print('recompute_playlist, from', current_track.title, self.current_track_start_datetime + current_track.duration, 'to', self.slot.end_datetime)
+        logger.debug('Recomputing playlist at %s, from %s to %s',
+                current_track.title,
+                self.current_track_start_datetime + current_track.duration,
+                self.slot.end_datetime)
         playlist = self.get_playlist(self.slot,
                 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
         if playlist:
             self.playlist[self.playhead + 1:] = playlist
 
-    def recompute_slots(self):
+    def get_current_diffusion(self):
         now = datetime.datetime.now()
-        # print(now, 'recompute_slots')
         diffusion = ScheduledDiffusion.objects.filter(
                 diffusion__datetime__gt=now - datetime.timedelta(days=1),
-                diffusion__datetime__lt=now).last()
+                diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
+        occurence = RecurringStreamOccurence.objects.filter(
+                datetime__gt=now - datetime.timedelta(days=1),
+                datetime__lt=now).order_by('datetime').last()
+        directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
+                datetime__gt=now - datetime.timedelta(days=1),
+                datetime__lt=now).order_by('datetime').last()
+        # note it shouldn't be possible to have both diffusion and occurences
+        # running at the moment.
+        if occurence and occurence.end_datetime > now:
+            return occurence
         if diffusion and diffusion.end_datetime > now:
+            return diffusion
+        if directory_occurence and directory_occurence.end_datetime > now:
+            return directory_occurence
+        return None
+
+    def get_next_diffusion(self, before_datetime):
+        now = datetime.datetime.now()
+        diffusion = ScheduledDiffusion.objects.filter(
+                diffusion__datetime__gt=now,
+                diffusion__datetime__lt=before_datetime,
+                ).order_by('diffusion__datetime').first()
+        occurence = RecurringStreamOccurence.objects.filter(
+                datetime__gt=now,
+                datetime__lt=before_datetime,
+                ).order_by('datetime').first()
+        directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
+                datetime__gt=now,
+                datetime__lt=before_datetime,
+                ).order_by('datetime').first()
+        if diffusion and occurence:
+            return diffusion if diffusion.diffusion.datetime < occurence.datetime else occurence
+        if diffusion:
+            return diffusion
+        if occurence:
+            return occurence
+        if directory_occurence:
+            return directory_occurence
+        return None
+
+    def recompute_slots(self):
+        now = datetime.datetime.now()
+        diffusion = self.get_current_diffusion()
+        if diffusion:
             self.slot = diffusion
         else:
             nonstops = list(Nonstop.objects.all().order_by('start'))
@@ -165,14 +402,11 @@ class Command(BaseCommand):
             if self.slot.end_datetime < self.slot.datetime:
                 self.slot.end_datetime += datetime.timedelta(days=1)
 
-            diffusion = ScheduledDiffusion.objects.filter(
-                diffusion__datetime__gt=now,
-                diffusion__datetime__lt=self.slot.end_datetime).first()
+            diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
             if diffusion:
                 self.slot.end_datetime = diffusion.datetime
 
     async def recompute_slots_loop(self):
-        print('recompute_slots_loop')
         now = datetime.datetime.now()
         sleep = (60 - now.second) % 10  # adjust to awake at :00
         while not self.quit:
@@ -182,39 +416,110 @@ class Command(BaseCommand):
             self.recompute_slots()
             expected_slot = self.slot
             if current_slot != expected_slot:
-                print('unexpected change', current_slot, 'vs', expected_slot)
-                if isinstance(current_slot, Nonstop) and not isinstance(expected_slot, Nonstop):
+                now = datetime.datetime.now()
+                logger.info('Unexpected change, %s vs %s', current_slot, expected_slot)
+                if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
+                    # ask for a softstop, i.e. finish the track then switch.
+                    self.softstop = True
+                elif isinstance(current_slot, Nonstop):
                     # interrupt nonstop
-                    print('interrupting nonstop')
+                    logger.info('Interrupting nonstop')
                     self.play_task.cancel()
             elif current_slot.end_datetime > expected_slot.end_datetime:
-                print('change in end time, from %s to %s' %
-                        (current_slot.end_datetime, expected_slot.end_datetime))
+                now = datetime.datetime.now()
+                logger.debug('Change in end time, from %s to %s',
+                        current_slot.end_datetime,
+                        expected_slot.end_datetime)
                 if expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5):
                     # more than 5 minutes left, recompute playlist
                     self.recompute_playlist()
 
     async def handle_connection(self, reader, writer):
-        data = await reader.read(100)
-        message = data.decode().strip()
-        response = 'err'
-        if message == 'playing?':
-            response = '%s' % self.slot
-        writer.write(response.encode('utf-8'))
+        writer.write(b'Watusi!\n')
+        writer.write(b'Known commands: status, softquit, hardquit\n')
+        writer.write(b'(dot on empty line to stop connection)\n')
         await writer.drain()
+        end = False
+        while not end:
+            data = await reader.read(100)
+            try:
+                message = data.decode().strip()
+            except UnicodeDecodeError:
+                logger.debug('Server, invalid message %r', message)
+                if not data:
+                    end = True
+                continue
+            logger.debug('Server, message %r', message)
+            if message == 'status':
+                response = {'slot': str(self.slot)}
+                if isinstance(self.slot, Nonstop):
+                    try:
+                        track = self.playlist[self.playhead]
+                    except IndexError:
+                        pass
+                    else:
+                        response['track'] = {}
+                        response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
+                        response['track']['title'] = track.title
+                        response['track']['artist'] = track.artist.name if track.artist_id else ''
+                        response['track']['duration'] = track.duration.total_seconds()
+                        response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
+                        response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
+                next_diffusion = self.get_next_diffusion(
+                        before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
+                if next_diffusion:
+                    response['next_diffusion'] = {
+                        'label': str(next_diffusion),
+                        'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
+                    }
+                    if isinstance(next_diffusion, ScheduledDiffusion):
+                        response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
+                        response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
+            elif message == '.':
+                end = True
+                response = {'ack': True}
+            elif message == 'softquit':
+                self.quit = True
+                end = True
+                response = {'ack': True}
+            elif message == 'hardquit':
+                self.quit = True
+                end = True
+                response = {'ack': True}
+                if self.player and self.player.returncode is None:  # not finished
+                    self.player.kill()
+            else:
+                response = {'err': 1, 'msg': 'unknown command: %r' % message}
+            writer.write(json.dumps(response).encode('utf-8') + b'\n')
+            try:
+                await writer.drain()
+            except ConnectionResetError:
+                break
         writer.close()
 
+    def sigterm_handler(self):
+        logger.info('Got SIGTERM')
+        self.quit = True
+        self.play_task.cancel()
+
     async def main(self):
-        now = datetime.datetime.now()
+        loop = asyncio.get_running_loop()
+        loop.add_signal_handler(
+                signal.SIGTERM,
+                self.sigterm_handler)
         self.recompute_slots()
-        server = await asyncio.start_server(self.handle_connection, '127.0.0.1', 8888)
+        server = await asyncio.start_server(
+                self.handle_connection,
+                app_settings.SERVER_BIND_IFACE,
+                app_settings.SERVER_BIND_PORT)
         async with server:
             asyncio.create_task(server.serve_forever())
 
             self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
-            while True:
+            while not self.quit:
+                now = datetime.datetime.now()
                 duration = (self.slot.end_datetime - now).seconds
-                print('next sure slot', duration, self.slot.end_datetime)
+                logger.debug('Next sure shot %s (in %s)', self.slot.end_datetime, duration)
                 if duration < 2:
                     # next slot is very close, wait for it
                     await asyncio.sleep(duration)
@@ -223,12 +528,10 @@ class Command(BaseCommand):
                 try:
                     await self.play_task
                     self.recompute_slots()
-                except asyncio.CancelledError as exc:
-                    print('exc:', exc)
+                except asyncio.CancelledError:
+                    logger.debug('Player cancelled exception')
                     if self.player and self.player.returncode is None:  # not finished
                         self.player.kill()
                 except KeyboardInterrupt:
                     self.quit = True
                     break
-            self.quit = True
-            await self.recompute_slots_task