12 from django.conf import settings
14 from django.core.management.base import BaseCommand
16 from emissions.models import Nonstop
17 from nonstop.models import (NonstopZoneSettings, Track, Jingle, SomaLogLine,
18 ScheduledDiffusion, RecurringStreamOccurence, RecurringRandomDirectoryOccurence)
19 from nonstop.app_settings import app_settings
20 from nonstop.utils import Tracklist
# Module-level logger: everything in this command logs to 'stamina'.
23 logger = logging.getLogger('stamina')
# Management command that drives the nonstop player: playlist computation,
# playback through an external player, and a small control server.
26 class Command(BaseCommand):
# Opt out of Django's system checks when the command is run.
# NOTE(review): recent Django versions expect a list/tuple here (e.g. [])
# rather than a boolean -- confirm against the Django version in use.
27 requires_system_checks = False
# Datetime of the last jingle put in a playlist; None until get_playlist()
# initialises it, then used there to space jingles out.
29 last_jingle_datetime = None
# Command entry point: runs self.main() under asyncio.run() and, when it
# fails with an unexpected exception, logs it (rate-limited by the alert
# thresholds below), waits a couple of seconds and carries on.
# KeyboardInterrupt is handled separately from other exceptions.
32 def handle(self, verbosity, **kwargs):
# Both timestamps are time.time() values (seconds); 0 means "never".
34 latest_alert_timestamp = 0
35 latest_exception_timestamp = 0
# Local helper enumerated below to obtain the alert threshold that is
# compared against the time elapsed since the last logged alert.
37 def exception_alert_thresholds():
40 while duration < 3600:
# asyncio debug mode follows the Django DEBUG setting.
50 asyncio.run(self.main(), debug=settings.DEBUG)
51 except KeyboardInterrupt:
53 except Exception as e:
54 timestamp = time.time()
# 300 seconds without any exception counts as "back to normal".
55 if (timestamp - latest_exception_timestamp) > 300:
56 # if latest exception was a "long" time ago, assume
57 # things went smooth for a while and reset things
59 latest_alert_timestamp = 0
60 latest_exception_timestamp = 0
# Walk the thresholds to pick the one applying to the current alert.
63 for i, threshold in enumerate(exception_alert_thresholds()):
65 alert_threshold = threshold
# Only log when enough time has passed since the previous alert
# (presumably to avoid flooding the log while failures repeat).
68 if (timestamp - latest_alert_timestamp) > alert_threshold:
69 logger.exception('General exception (alert index: %s)', alert_index)
70 latest_alert_timestamp = timestamp
# A database interface error once alert_index is non-zero is treated
# as fatal for this process.
73 if alert_index and isinstance(e, django.db.InterfaceError):
74 # most likely "connection already closed", because postgresql
75 # is been restarted; log then get out to be restarted.
76 logger.error('Aborting on repeated database error')
79 time.sleep(2) # retry after a bit
80 latest_exception_timestamp = timestamp
# Compute the list of items (tracks and jingles) to play for `zone`.
#
# zone: the zone object; its slug, start time and nonstopzonesettings_set
#       are read here.
# start_datetime: a datetime, or a callable returning one (it is then
#       called again just before the playlist is filled).
# end_datetime: datetime the playlist should be aligned on.
# Returns tracklist.playlist.
84 def get_playlist(self, zone, start_datetime, end_datetime):
85 current_datetime = start_datetime() if callable(start_datetime) else start_datetime
# First call ever: start counting the jingle interval from now.
86 if self.last_jingle_datetime is None:
87 self.last_jingle_datetime = current_datetime
88 # Define a max duration (1 hour), if it is reached, and far enough
89 # from end_datetime (30 minutes), return the playlist as is, not aligned
90 # on end time, so a new playlist gets computed once it's over.
91 # This avoids misalignments due to track durations not matching exactly
92 # or additional delays caused by the player program.
93 max_duration = datetime.timedelta(hours=1)
94 max_duration_leftover = datetime.timedelta(minutes=30)
# Number of times the end of the playlist had to be adjusted after an
# overshoot (see the fallback strategy further down).
96 adjustment_counter = 0
# Settings attached to the zone, and their jingles. When the zone has no
# settings, first() presumably returns None, the attribute access raises
# AttributeError and blank settings are used instead.
98 zone_settings = zone.nonstopzonesettings_set.first()
99 jingles = list(zone_settings.jingles.all())
100 except AttributeError:
101 zone_settings = NonstopZoneSettings()
# Tracks may also be taken from extra zones: those attached to the zone
# settings, and those configured in app_settings.EXTRA_ZONES for this slug.
105 zone_ids.extend([x.id for x in zone_settings.extra_zones.all()])
106 extra_zones = app_settings.EXTRA_ZONES.get(zone.slug)
108 zone_ids.extend([x.id for x in Nonstop.objects.filter(slug__in=extra_zones)])
# Ids of tracks logged within the last NO_REPEAT_DELAY days (log lines
# with on_air=False are excluded); handed to Tracklist as recent tracks.
110 recent_tracks_id = [x.track_id for x in
111 SomaLogLine.objects.exclude(on_air=False).filter(
113 play_timestamp__gt=datetime.datetime.now() - datetime.timedelta(days=app_settings.NO_REPEAT_DELAY))]
115 tracklist = Tracklist(zone_settings, zone_ids, recent_tracks_id)
116 random_tracks_iterator = tracklist.get_random_tracks()
# t0 is only used to report the computation time in the log below.
117 t0 = datetime.datetime.now()
118 allow_overflow = False
119 if callable(start_datetime):
120 # compute start_datetime (e.g. now()) at the last moment, to get
121 # computed playlist timestamps as close as possible as future real
123 start_datetime = start_datetime()
# Fill the playlist until its projected end reaches end_datetime;
# current_datetime is always start_datetime + total playlist duration.
124 while current_datetime < end_datetime:
# Max duration reached while still far from end_datetime (see the note on
# max_duration above).
125 if (current_datetime - start_datetime) > max_duration and (
126 (end_datetime - current_datetime) > max_duration_leftover):
# At the exact hour:minute the zone starts, insert the intro jingle if the
# zone settings define one.
129 if zone_settings.intro_jingle and (
130 current_datetime.hour, current_datetime.minute) == (zone.start.hour, zone.start.minute):
131 tracklist.playlist.append(zone_settings.intro_jingle)
132 self.last_jingle_datetime = current_datetime
133 current_datetime = start_datetime + tracklist.get_duration()
134 elif jingles and current_datetime - self.last_jingle_datetime > datetime.timedelta(minutes=20):
135 # jingle time, every ~20 minutes
136 tracklist.playlist.append(random.choice(jingles))
137 self.last_jingle_datetime = current_datetime
138 current_datetime = start_datetime + tracklist.get_duration()
# Time left before end_datetime, measured before the next track is added;
# used below to look for a track of exactly that length.
139 remaining_time = (end_datetime - current_datetime)
141 track = next(random_tracks_iterator)
142 tracklist.append(track)
143 current_datetime = start_datetime + tracklist.get_duration()
144 if current_datetime > end_datetime and not allow_overflow:
145 # last track overshot
146 # 1st strategy: remove last track and try to get a track with
147 # exact remaining time
148 logger.debug('Overshoot %s, %s', adjustment_counter, current_datetime)
# Random track from the same zones lasting remaining_time (within one
# second), excluding ids reported by tracklist.get_recent_track_ids().
150 track = Track.objects.filter(
151 nonstop_zones__in=zone_ids,
152 duration__gte=remaining_time,
153 duration__lt=remaining_time + datetime.timedelta(seconds=1)
154 ).exclude(id__in=tracklist.get_recent_track_ids()).order_by('?').first()
157 tracklist.append(track)
159 # fallback strategy: didn't find track of expected duration,
160 # reduce playlist further
161 adjustment_counter += 1
162 if tracklist.pop() is None or adjustment_counter > 5:
163 # a dedicated sound that ended a bit too early,
164 # or too many failures to get an appropriate file,
165 # allow whatever comes.
166 allow_overflow = True
167 logger.debug('Allowing overflows')
169 current_datetime = start_datetime + tracklist.get_duration()
# Report how long the computation took, then dump the resulting schedule
# (start time, duration and title of each item) at debug level.
171 logger.info('Computed playlist for "%s" (computation time: %ss)',
172 zone, (datetime.datetime.now() - t0))
173 current_datetime = start_datetime
174 for track in tracklist.playlist:
175 logger.debug('- track: %s %s %s', current_datetime, track.duration, track.title)
176 current_datetime += track.duration
177 logger.debug('- end: %s', current_datetime)
178 return tracklist.playlist
# Ask the on-air switch (app_settings.ON_AIR_SWITCH_URL) which source is
# currently broadcast and tell whether it is the nonstop system.
# The decision is based on the 'active', 'nonstop-via-stud1' and
# 'nonstop-via-stud2' keys of the JSON status returned by the switch.
# NOTE(review): requests.get() is a blocking call (up to the 5 s timeout)
# and this method is called from the async record_nonstop_line(), so the
# event loop is presumably stalled meanwhile -- confirm this is acceptable.
180 def is_nonstop_on_air(self):
181 # check if nonstop system is currently on air
# No switch URL configured: nothing can be queried.
182 if app_settings.ON_AIR_SWITCH_URL is None:
184 switch_response = requests.get(app_settings.ON_AIR_SWITCH_URL, timeout=5)
# Non-2xx/3xx answers from the switch are handled apart from the status
# inspection below.
185 if not switch_response.ok:
188 status = switch_response.json()
191 if status.get('active') == 0:
193 elif status.get('active') == 1 and status.get('nonstop-via-stud1') == 0:
194 # TODO: replace this hardware check that no longer works by a
195 # logical check on programs: if there's nothing scheduled at the
196 # moment, consider nonstop is broadcasted, even if studio1 is on.
198 elif status.get('active') == 2 and status.get('nonstop-via-stud2') == 1:
# Fill a SomaLogLine for `track` played at `now`: play timestamp, track,
# first entry of track.nonstopfile_set as file path, and whether nonstop
# was on air according to is_nonstop_on_air().
# NOTE(review): is_nonstop_on_air() performs a synchronous HTTP request, so
# this coroutine does not yield to the event loop while it runs -- confirm.
202 async def record_nonstop_line(self, track, now):
203 log_line = SomaLogLine()
204 log_line.play_timestamp = now
205 log_line.track = track
206 log_line.filepath = track.nonstopfile_set.first()
207 log_line.on_air = self.is_nonstop_on_air()
# Play `item` with the external player and wait for it to finish.
#
# item: object providing file_path(), or is_stream() + stream.url.
# timeout: optional limit passed to asyncio.wait_for() (seconds); None
#          is the default.
# When app_settings.PLAYER_IPC_PATH is set, the work is delegated to
# player_process_ipc(); otherwise a new player process is spawned.
210 async def player_process(self, item, timeout=None):
211 if app_settings.PLAYER_IPC_PATH:
212 return await self.player_process_ipc(item, timeout=timeout)
# Command line: configured player command and arguments, followed by
# the stream URL or the file path.
213 cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
214 if hasattr(item, 'is_stream') and item.is_stream():
215 cmd.append(item.stream.url)
216 logger.info('Play stream: %s', item.stream.url)
218 cmd.append(item.file_path())
219 logger.info('Play file: %s', item.file_path())
220 logger.debug('cmd %r', cmd)
# The process handle is kept on self.player so other methods can check
# on it.
# NOTE(review): stdout/stderr are pipes read through communicate(), which
# buffers the whole output in memory until the process exits -- confirm
# this is fine for long-running streams.
221 self.player = await asyncio.create_subprocess_exec(
223 stdout=asyncio.subprocess.PIPE,
224 stderr=asyncio.subprocess.PIPE)
# Unbounded wait ...
226 await self.player.communicate()
# ... or wait bounded by `timeout`.
229 await asyncio.wait_for(self.player.communicate(), timeout=timeout)
230 except asyncio.TimeoutError:
# Play `item` through a player controlled over the unix socket at
# app_settings.PLAYER_IPC_PATH.
#
# item: object providing file_path(), or is_stream() + stream.url.
# timeout: optional limit (seconds) on waiting for the player to go idle.
234 async def player_process_ipc(self, item, timeout=None):
# Try to reach an already running player first.
238 reader, writer = await asyncio.open_unix_connection(
239 app_settings.PLAYER_IPC_PATH)
# Socket missing or nobody listening: start a player in idle mode that
# serves the IPC socket, with its output discarded.
241 except (FileNotFoundError, ConnectionRefusedError):
243 cmd = [app_settings.PLAYER_COMMAND] + app_settings.PLAYER_ARGS
244 cmd += ['--input-ipc-server=%s' % app_settings.PLAYER_IPC_PATH, '--idle']
245 self.player = await asyncio.create_subprocess_exec(
247 stdout=asyncio.subprocess.DEVNULL,
248 stderr=asyncio.subprocess.DEVNULL)
# Short pause, presumably to give the player time to create its socket.
250 await asyncio.sleep(0.1)
252 if hasattr(item, 'is_stream') and item.is_stream():
253 file_path = item.stream.url
254 logger.info('Play stream: %s', item.stream.url)
256 file_path = item.file_path()
257 logger.info('Play file: %s', item.file_path())
# One JSON command per line: ask the player to load the file or URL.
259 writer.write(json.dumps({'command': ['loadfile', file_path]}).encode() + b'\n')
262 except ConnectionResetError: # connection lost
# Wait until player_ipc_idle() completes, bounded by `timeout`.
265 await asyncio.wait_for(self.player_ipc_idle(reader, writer), timeout=timeout)
266 except asyncio.TimeoutError:
269 await writer.wait_closed()
# Read newline-terminated JSON messages from the player connection and
# look for the {'event': 'idle'} notification (presumably the signal that
# playback is over -- confirm against the player's IPC protocol).
# `writer` is not used by the lines visible here.
271 async def player_ipc_idle(self, reader, writer):
273 data = await reader.readline()
276 if json.loads(data) == {'event': 'idle'}:
# Play `slot` until it is over. Three kinds of slots are handled:
# - a Nonstop zone: a playlist is computed and played item by item;
# - a stream slot (slot.is_stream()): the stream is played until the slot
#   end, with handling for streams that are missing or keep dropping;
# - anything else (episode or random item): played directly.
279 async def play(self, slot):
280 now = datetime.datetime.now()
281 if isinstance(slot, Nonstop):
# datetime.datetime.now is passed uncalled so get_playlist() can evaluate
# it at the last moment.
282 self.playlist = self.get_playlist(slot, datetime.datetime.now, slot.end_datetime)
# softstop is a flag other tasks can set to have the playlist stop
# after the current track (see the comment further down).
284 self.softstop = False
286 now = datetime.datetime.now()
288 track = self.playlist[self.playhead]
# Remembered for status reports and for playlist recomputation.
291 self.current_track_start_datetime = now
292 if isinstance(track, Jingle):
293 logger.info('Jingle: %s (id: %s) (%s)', track.title, track.id, track.duration)
295 logger.info('Track: %s (id: %s) (%s)', track.title, track.id, track.duration)
# Log the track in a separate task so playback starts without
# waiting for it.
297 if isinstance(track, Track): # not jingles
298 record_task = asyncio.create_task(self.record_nonstop_line(track, datetime.datetime.now()))
299 await self.player_process(track)
303 # track was left to finish, but now the playlist should stop.
306 elif slot.is_stream():
307 logger.info('Stream: %s', slot.stream)
# The slot jingle is played first, for at most 60 seconds.
309 await self.player_process(slot.jingle, timeout=60)
310 logger.debug('Stream timeout: %s', (slot.end_datetime - now).total_seconds())
# Counts consecutive short-lived playback attempts.
311 short_interruption_counter = 0
# Play the stream with a timeout equal to the time left in the slot.
314 player_start_time = datetime.datetime.now()
315 await self.player_process(slot, timeout=(slot.end_datetime - player_start_time).total_seconds())
316 now = datetime.datetime.now()
# Less than 2 seconds left: the slot is considered finished.
317 if (slot.end_datetime - now).total_seconds() < 2:
320 # stream got interrupted
321 if (datetime.datetime.now() - player_start_time).total_seconds() < 15:
322 # and was up for less than 15 seconds.
324 # never up before, probably not even started
325 if isinstance(slot, RecurringStreamOccurence):
326 # no mercy for recurring stream, remove occurence
327 logger.info('Missing stream for %s, removing', slot)
329 elif slot.auto_delayed is True:
330 # was already delayed and is still not up, remove.
331 logger.info('Still missing stream for %s, removing', slot)
334 # push back start datetime for 5 minutes, and get
335 # back to nonstop music in the meantime
336 logger.info('Pushing starting time of %s', slot.diffusion.episode)
337 slot.diffusion.datetime = slot.diffusion.datetime + datetime.timedelta(seconds=300)
# The episode duration is reduced by 5 -- presumably minutes, to match
# the 300 s delay above; confirm the unit of episode.duration.
338 slot.diffusion.episode.duration = slot.diffusion.episode.get_duration() - 5
339 if slot.diffusion.episode.duration <= 5:
340 slot.diffusion.episode.duration = 0
# Remember the slot was delayed once, so a second failure removes it.
341 slot.auto_delayed = True
342 slot.diffusion.save()
343 slot.diffusion.episode.save()
346 short_interruption_counter += 1
# Wait a little longer after each consecutive short interruption.
348 await asyncio.sleep(short_interruption_counter)
350 # mark stream as ok at least one, and reset short
351 # interruption counter
353 short_interruption_counter = 0
354 logger.debug('Stream error for %s', slot)
356 if short_interruption_counter > 5:
357 # many short interruptions
358 logger.info('Too many stream errors for %s, removing', slot)
# Neither a Nonstop zone nor a stream: log what is about to be played.
362 if hasattr(slot, 'episode'):
363 logger.info('Episode: %s (id: %s)', slot.episode, slot.episode.id)
365 logger.info('Random: %s', slot)
# Slot jingle first (at most 60 seconds), then the slot itself with
# no timeout.
367 await self.player_process(slot.jingle, timeout=60)
368 await self.player_process(slot)
# Recompute what comes after the track currently playing: a new playlist
# is computed from the expected end of the current track (its start
# datetime plus its duration) up to self.slot.end_datetime, and replaces
# every playlist entry located after the playhead.
370 def recompute_playlist(self):
371 current_track = self.playlist[self.playhead]
372 logger.debug('Recomputing playlist at %s, from %s to %s',
374 self.current_track_start_datetime + current_track.duration,
375 self.slot.end_datetime)
376 playlist = self.get_playlist(self.slot,
377 self.current_track_start_datetime + current_track.duration, self.slot.end_datetime)
# Slice assignment keeps entries up to and including the current track.
379 self.playlist[self.playhead + 1:] = playlist
# Look for a scheduled item that is running right now.
# For each of the three models, the most recent entry that started during
# the last 24 hours is fetched; it is considered current when its
# end_datetime is still in the future.
381 def get_current_diffusion(self):
382 now = datetime.datetime.now()
# order_by(...).last() gives the entry with the latest start datetime.
383 diffusion = ScheduledDiffusion.objects.filter(
384 diffusion__datetime__gt=now - datetime.timedelta(days=1),
385 diffusion__datetime__lt=now).order_by('diffusion__datetime').last()
386 occurence = RecurringStreamOccurence.objects.filter(
387 datetime__gt=now - datetime.timedelta(days=1),
388 datetime__lt=now).order_by('datetime').last()
389 directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
390 datetime__gt=now - datetime.timedelta(days=1),
391 datetime__lt=now).order_by('datetime').last()
392 # note it shouldn't be possible to have both diffusion and occurences
393 # running at the moment.
# Checked in this order: stream occurrence, scheduled diffusion, random
# directory occurrence.
394 if occurence and occurence.end_datetime > now:
396 if diffusion and diffusion.end_datetime > now:
398 if directory_occurence and directory_occurence.end_datetime > now:
399 return directory_occurence
# Look for the next scheduled item starting before `before_datetime`.
#
# before_datetime: upper bound (exclusive) on the start datetime.
# For each of the three models the earliest matching entry is fetched
# (order_by(...).first()), then one of them is selected.
402 def get_next_diffusion(self, before_datetime):
403 now = datetime.datetime.now()
# Scheduled diffusions starting between now and before_datetime.
404 diffusion = ScheduledDiffusion.objects.filter(
405 diffusion__datetime__gt=now,
406 diffusion__datetime__lt=before_datetime,
407 ).order_by('diffusion__datetime').first()
408 occurence = RecurringStreamOccurence.objects.filter(
410 datetime__lt=before_datetime,
411 ).order_by('datetime').first()
412 directory_occurence = RecurringRandomDirectoryOccurence.objects.filter(
414 datetime__lt=before_datetime,
415 ).order_by('datetime').first()
# Both a diffusion and a stream occurrence found: the earlier one wins
# (the occurrence on equal start times).
416 if diffusion and occurence:
417 return diffusion if diffusion.diffusion.datetime < occurence.datetime else occurence
422 if directory_occurence:
423 return directory_occurence
# Work out what should be playing now and store it in self.slot.
# A running diffusion (get_current_diffusion()) is used as the slot;
# the remaining lines select a Nonstop zone from the current time and
# compute its datetime / end_datetime for today.
426 def recompute_slots(self):
427 now = datetime.datetime.now()
428 diffusion = self.get_current_diffusion()
430 self.slot = diffusion
# All zones ordered by start time, ignoring those whose start and end
# are equal.
432 nonstops = list(Nonstop.objects.all().order_by('start'))
433 nonstops = [x for x in nonstops if x.start != x.end] # disabled zones
# Current zone: the last one that started before the current time of day ...
435 self.slot = [x for x in nonstops if x.start < now.time()][-1]
# ... or the first zone of the list as the alternative.
437 self.slot = nonstops[0]
# The zone following the current one, or the first zone as the
# alternative.
439 next_slot = nonstops[nonstops.index(self.slot) + 1]
441 next_slot = nonstops[0]
# Zone boundaries as datetimes on today's date: it starts at its own
# start time and ends when the next zone starts.
442 self.slot.datetime = now.replace(
443 hour=self.slot.start.hour,
444 minute=self.slot.start.minute)
445 self.slot.end_datetime = now.replace(
446 hour=next_slot.start.hour,
447 minute=next_slot.start.minute,
# End before start means the zone runs past midnight: end is tomorrow.
450 if self.slot.end_datetime < self.slot.datetime:
451 self.slot.end_datetime += datetime.timedelta(days=1)
# The slot end is then set to the start of the next scheduled item found
# before the zone end.
453 diffusion = self.get_next_diffusion(before_datetime=self.slot.end_datetime)
455 self.slot.end_datetime = diffusion.datetime
# Background task re-evaluating the schedule periodically.
# After each sleep, recompute_slots() is called and the resulting slot is
# compared with the previous one, to react to changes (another slot
# expected, or an earlier end time for the same slot).
457 async def recompute_slots_loop(self):
458 now = datetime.datetime.now()
# First sleep lasts (60 - second) % 10 seconds, i.e. until the next
# multiple of 10 seconds on the clock.
459 sleep = (60 - now.second) % 10 # adjust to awake at :00
461 await asyncio.sleep(sleep)
462 sleep = 10 # next cycles every 10 seconds
# Keep the slot as it was, recompute, then compare both.
463 current_slot = self.slot
464 self.recompute_slots()
465 expected_slot = self.slot
466 if current_slot != expected_slot:
467 now = datetime.datetime.now()
468 logger.info('Unexpected change, %s vs %s', current_slot, expected_slot)
469 if isinstance(current_slot, Nonstop) and isinstance(expected_slot, Nonstop):
470 # ask for a softstop, i.e. finish the track then switch.
# Nonstop replaced by something that is not a Nonstop zone: the play
# task is cancelled.
472 elif isinstance(current_slot, Nonstop):
474 logger.info('Interrupting nonstop')
475 self.play_task.cancel()
476 elif current_slot.is_stream():
477 # it should have been stopped by timeout set on player but
478 # maybe the episode duration has been shortened after its
480 logger.info('Interrupting stream')
481 self.play_task.cancel()
# Same slot, but it now ends earlier than before.
482 elif current_slot.end_datetime > expected_slot.end_datetime:
483 now = datetime.datetime.now()
484 logger.debug('Change in end time, from %s to %s',
485 current_slot.end_datetime,
486 expected_slot.end_datetime)
487 if isinstance(current_slot, Nonstop) and (
488 expected_slot.end_datetime - datetime.datetime.now() > datetime.timedelta(minutes=5)):
489 # more than 5 minutes left, recompute playlist
490 self.recompute_playlist()
# Callback for the control server started with asyncio.start_server().
# Sends a short banner, then reads text commands and answers each one with
# a JSON object followed by a newline.
# Known commands: 'status', 'softquit', 'hardquit'; anything else gets an
# {'err': 1, 'msg': ...} answer.
#
# reader / writer: the asyncio stream pair for this connection.
492 async def handle_connection(self, reader, writer):
493 writer.write(b'Watusi!\n')
494 writer.write(b'Known commands: status, softquit, hardquit\n')
495 writer.write(b'(dot on empty line to stop connection)\n')
# A command is read as a chunk of at most 100 bytes.
499 data = await reader.read(100)
501 message = data.decode().strip()
# NOTE(review): when decode() raises, `message` has not been assigned by
# the statement above, so the log call below uses a previous value or an
# unbound name -- `data` is presumably what was meant; confirm.
502 except UnicodeDecodeError:
503 logger.debug('Server, invalid message %r', message)
507 logger.debug('Server, message %r', message)
508 if message == 'status':
# Status always carries the current slot; track details are added
# for a Nonstop slot.
509 response = {'slot': str(self.slot)}
510 if isinstance(self.slot, Nonstop):
512 track = self.playlist[self.playhead]
516 response['track'] = {}
517 response['track']['start_datetime'] = self.current_track_start_datetime.strftime('%Y-%m-%d %H:%M:%S')
518 response['track']['title'] = track.title
519 response['track']['artist'] = track.artist.name if track.artist_id else ''
# Durations are reported in seconds (timedelta.total_seconds()).
520 response['track']['duration'] = track.duration.total_seconds()
521 response['track']['elapsed'] = (datetime.datetime.now() - self.current_track_start_datetime).total_seconds()
522 response['track']['remaining'] = (track.duration - datetime.timedelta(seconds=response['track']['elapsed'])).total_seconds()
# Next scheduled item within the coming 5 hours.
523 next_diffusion = self.get_next_diffusion(
524 before_datetime=datetime.datetime.now() + datetime.timedelta(hours=5))
526 response['next_diffusion'] = {
527 'label': str(next_diffusion),
528 'start_datetime': next_diffusion.datetime.strftime('%Y-%m-%d %H:%M:%S'),
# Scheduled diffusions also expose their emission and episode titles.
530 if isinstance(next_diffusion, ScheduledDiffusion):
531 response['next_diffusion']['emission'] = next_diffusion.diffusion.episode.emission.title
532 response['next_diffusion']['episode'] = next_diffusion.diffusion.episode.title
535 response = {'ack': True}
536 elif message == 'softquit':
539 response = {'ack': True}
540 elif message == 'hardquit':
543 response = {'ack': True}
# For hardquit, a player process that has not exited yet gets special
# handling.
544 if self.player and self.player.returncode is None: # not finished
547 response = {'err': 1, 'msg': 'unknown command: %r' % message}
548 writer.write(json.dumps(response).encode('utf-8') + b'\n')
# The client going away while being answered is handled here.
551 except ConnectionResetError:
# Signal callback registered through loop.add_signal_handler() in main():
# logs the event and cancels the task currently playing.
555 def sigterm_handler(self):
556 logger.info('Got SIGTERM')
558 self.play_task.cancel()
560 async def main(self):
562 loop = asyncio.get_running_loop()
563 loop.add_signal_handler(
565 self.sigterm_handler)
566 self.recompute_slots()
567 server = await asyncio.start_server(
568 self.handle_connection,
569 app_settings.SERVER_BIND_IFACE,
570 app_settings.SERVER_BIND_PORT)
572 asyncio.create_task(server.serve_forever())
574 self.recompute_slots_task = asyncio.create_task(self.recompute_slots_loop())
576 now = datetime.datetime.now()
577 duration = (self.slot.end_datetime - now).seconds
578 logger.debug('Next sure shot %s (in %s)', self.slot.end_datetime, duration)
580 # next slot is very close, wait for it
581 await asyncio.sleep(duration)
582 self.recompute_slots()
583 self.play_task = asyncio.create_task(self.play(self.slot))
586 self.recompute_slots()
587 except asyncio.CancelledError:
588 logger.debug('Player cancelled exception')
589 if self.player and self.player.returncode is None: # not finished
591 except KeyboardInterrupt: