import json import os import urllib.error import urllib.request import queue import random import shutil import subprocess import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Optional from uuid import uuid4 from fastapi import FastAPI, HTTPException, Request from fastapi.responses import JSONResponse, StreamingResponse, FileResponse from fastapi.staticfiles import StaticFiles from huggingface_hub import ( HfFileSystem, attach_huggingface_oauth, parse_huggingface_oauth, ) BUCKET_ID = os.environ.get("BUCKET_ID", "victor/ace-step-community") RADIO_MODE = os.environ.get("RADIO_MODE", "random") DEFAULT_DURATION = float(os.environ.get("DEFAULT_DURATION", "180")) MAX_TRACKS = int(os.environ.get("MAX_TRACKS", "150")) ROTATION_TARGET_SECONDS = int(os.environ.get("ROTATION_TARGET_SECONDS", "3600")) ROTATION_PREBUILD_LEAD_SECONDS = int(os.environ.get("ROTATION_PREBUILD_LEAD_SECONDS", "600")) REFRESH_MODE = "prebuild_before_top_of_hour_utc" OAUTH_USERINFO_URL = os.environ.get("OAUTH_USERINFO_URL", "https://huggingface.co/oauth/userinfo") OAUTH_USERINFO_CACHE_TTL_SECONDS = int(os.environ.get("OAUTH_USERINFO_CACHE_TTL_SECONDS", "300")) STREAM_BITRATE = os.environ.get("STREAM_BITRATE", "128k") STREAM_SAMPLE_RATE = os.environ.get("STREAM_SAMPLE_RATE", "44100") STREAM_SEEK_EPSILON_SECONDS = float(os.environ.get("STREAM_SEEK_EPSILON_SECONDS", "0.08")) BROADCAST_QUEUE_MAX_CHUNKS = int(os.environ.get("BROADCAST_QUEUE_MAX_CHUNKS", "256")) BROADCAST_NO_TRACK_SLEEP_SECONDS = float(os.environ.get("BROADCAST_NO_TRACK_SLEEP_SECONDS", "1.0")) BROADCAST_SUBSCRIBER_TIMEOUT_SECONDS = float(os.environ.get("BROADCAST_SUBSCRIBER_TIMEOUT_SECONDS", "15.0")) BROADCAST_STALL_TIMEOUT_SECONDS = float(os.environ.get("BROADCAST_STALL_TIMEOUT_SECONDS", "25.0")) BROADCAST_WATCHDOG_INTERVAL_SECONDS = float(os.environ.get("BROADCAST_WATCHDOG_INTERVAL_SECONDS", "5.0")) BROADCAST_PROCESS_START_TIMEOUT_SECONDS = float(os.environ.get("BROADCAST_PROCESS_START_TIMEOUT_SECONDS", "20.0")) LIKES_DIR = os.environ.get("LIKES_DIR", "/likes") LIKES_FILE = os.environ.get("LIKES_FILE", "likes.json") REPORT_HIDE_THRESHOLD = int(os.environ.get("REPORT_HIDE_THRESHOLD", "2")) ADMIN_REPORT_USERNAMES = { username.strip().lower() for username in os.environ.get("ADMIN_REPORT_USERNAMES", "fffiloni").split(",") if username.strip() } ADMIN_REPORT_WEIGHT = int(os.environ.get("ADMIN_REPORT_WEIGHT", "2")) LIKE_WEIGHT_PER_LIKE = float(os.environ.get("LIKE_WEIGHT_PER_LIKE", "0.35")) LIKE_WEIGHT_MAX = float(os.environ.get("LIKE_WEIGHT_MAX", "2.0")) UPVOTE_WEIGHT_PER_UPVOTE = float(os.environ.get("UPVOTE_WEIGHT_PER_UPVOTE", "0.12")) UPVOTE_WEIGHT_MAX = float(os.environ.get("UPVOTE_WEIGHT_MAX", "0.6")) DOWNVOTE_WEIGHT_PER_DOWNVOTE = float(os.environ.get("DOWNVOTE_WEIGHT_PER_DOWNVOTE", "0.15")) DOWNVOTE_WEIGHT_MAX = float(os.environ.get("DOWNVOTE_WEIGHT_MAX", "0.7")) TRACK_WEIGHT_MIN = float(os.environ.get("TRACK_WEIGHT_MIN", "0.25")) TRACK_WEIGHT_MAX = float(os.environ.get("TRACK_WEIGHT_MAX", str(LIKE_WEIGHT_MAX))) AUTO_ROTATION_MIN_EFFECTIVE_SCORE_EXCLUSIVE = float(os.environ.get("AUTO_ROTATION_MIN_EFFECTIVE_SCORE_EXCLUSIVE", "-5")) RECENT_TRACK_QUOTA = float(os.environ.get("RECENT_TRACK_QUOTA", "0.15")) RECENT_TRACK_WINDOW_HOURS = int(os.environ.get("RECENT_TRACK_WINDOW_HOURS", "72")) RECENTLY_PLAYED_EXCLUDE_MINUTES = int(os.environ.get("RECENTLY_PLAYED_EXCLUDE_MINUTES", "90")) RECENTLY_PLAYED_MEMORY_MAX = int(os.environ.get("RECENTLY_PLAYED_MEMORY_MAX", "200")) RECENTLY_PLAYED_MIN_CANDIDATES_AFTER_FILTER = int(os.environ.get("RECENTLY_PLAYED_MIN_CANDIDATES_AFTER_FILTER", "30")) META_LOAD_WORKERS = int(os.environ.get("META_LOAD_WORKERS", "16")) LISTENER_TTL_SECONDS = int(os.environ.get("LISTENER_TTL_SECONDS", "45")) ENABLE_JINGLES = os.environ.get("ENABLE_JINGLES", "true").lower() == "true" JINGLES_DIR = os.environ.get("JINGLES_DIR", "static/jingles") JINGLES_GLOB = os.environ.get("JINGLES_GLOB", "jingle_hf_radio_*.wav") DEFAULT_JINGLE_DURATION = float(os.environ.get("DEFAULT_JINGLE_DURATION", "6")) JINGLE_VOLUME = float(os.environ.get("JINGLE_VOLUME", "2.00")) ENABLE_REMOTE_JINGLES = os.environ.get("ENABLE_REMOTE_JINGLES", "true").lower() == "true" REMOTE_JINGLES_DIR = os.environ.get("REMOTE_JINGLES_DIR", "/remote-jingles") ENABLE_SEGMENTS = os.environ.get("ENABLE_SEGMENTS", "true").lower() == "true" REMOTE_SEGMENTS_DIR = os.environ.get("REMOTE_SEGMENTS_DIR", "/remote-segments") DEFAULT_SEGMENT_DURATION = float(os.environ.get("DEFAULT_SEGMENT_DURATION", "30")) SEGMENT_VOLUME = float(os.environ.get("SEGMENT_VOLUME", "2.00")) MIN_SONGS_BETWEEN_SEGMENTS = int(os.environ.get("MIN_SONGS_BETWEEN_SEGMENTS", "12")) MAX_SONGS_BETWEEN_SEGMENTS = int(os.environ.get("MAX_SONGS_BETWEEN_SEGMENTS", "18")) SEGMENT_PROBABILITY_AFTER_MIN = float(os.environ.get("SEGMENT_PROBABILITY_AFTER_MIN", "0.20")) ENABLE_SONG_MASTERING = os.environ.get("ENABLE_SONG_MASTERING", "true").lower() == "true" SONG_AUDIO_FILTER = os.environ.get( "SONG_AUDIO_FILTER", "equalizer=f=95:t=q:w=0.9:g=1.8,equalizer=f=3500:t=q:w=1.0:g=-0.8,equalizer=f=8500:t=q:w=0.8:g=-2.2", ) MIN_SONGS_BETWEEN_JINGLES = int(os.environ.get("MIN_SONGS_BETWEEN_JINGLES", "6")) MAX_SONGS_BETWEEN_JINGLES = int(os.environ.get("MAX_SONGS_BETWEEN_JINGLES", "10")) JINGLE_PROBABILITY_AFTER_MIN = float(os.environ.get("JINGLE_PROBABILITY_AFTER_MIN", "0.25")) RADIO_CONTROL_DIR = os.environ.get("RADIO_CONTROL_DIR", "/radio-control") RADIO_CONTROL_CONFIG_FILE = os.environ.get("RADIO_CONTROL_CONFIG_FILE", "config.json") RADIO_TRACK_OVERRIDES_FILE = os.environ.get("RADIO_TRACK_OVERRIDES_FILE", "track_overrides.json") RADIO_NEXT_ROTATION_FILE = os.environ.get("RADIO_NEXT_ROTATION_FILE", "next_rotation.json") RADIO_NEXT_ROTATION_ARCHIVE_DIR = os.environ.get("RADIO_NEXT_ROTATION_ARCHIVE_DIR", "next_rotation_archive") COVER_OVERRIDES_DIR = os.environ.get("COVER_OVERRIDES_DIR", "/cover-overrides") ENABLE_CUSTOM_TRACKS = os.environ.get("ENABLE_CUSTOM_TRACKS", "true").lower() == "true" CUSTOM_TRACKS_DIR = os.environ.get("CUSTOM_TRACKS_DIR", "/custom-tracks") DEFAULT_CUSTOM_TRACK_DURATION = float(os.environ.get("DEFAULT_CUSTOM_TRACK_DURATION", "180")) CUSTOM_TRACK_QUOTA = float(os.environ.get("CUSTOM_TRACK_QUOTA", "0.10")) BUCKET_FS_ROOT = f"hf://buckets/{BUCKET_ID}" BUCKET_RESOLVE_URL = f"https://huggingface.co/buckets/{BUCKET_ID}/resolve" app = FastAPI(title="HF Bucket Radio Stream") attach_huggingface_oauth(app) def normalize_tags(raw_tags): if isinstance(raw_tags, str): normalized = ( raw_tags .replace("#", "") .replace(";", ",") .replace("|", ",") ) return [ tag.strip() for tag in normalized.split(",") if tag.strip() ] if isinstance(raw_tags, list): return [ str(tag).strip() for tag in raw_tags if str(tag).strip() ] return [] def probe_audio_duration(path_or_url: str, fallback: float) -> float: try: cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", path_or_url, ] result = subprocess.run( cmd, capture_output=True, text=True, timeout=10, check=True, ) duration = float(result.stdout.strip()) if duration > 0: return duration except Exception as e: print(f"[audio] Could not probe duration for {path_or_url}: {e}") return fallback def parse_datetime(value): if not value: return None if isinstance(value, datetime): parsed = value else: try: normalized = str(value).strip() if normalized.endswith("Z"): normalized = normalized[:-1] + "+00:00" parsed = datetime.fromisoformat(normalized) except Exception: return None if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=timezone.utc) return parsed.astimezone(timezone.utc) def asset_is_enabled(meta: dict) -> bool: return bool(meta.get("enabled", True)) def asset_is_expired(meta: dict, now: Optional[datetime] = None) -> bool: expires_at = parse_datetime(meta.get("expires_at")) if expires_at is None: return False now = now or datetime.now(timezone.utc) return expires_at <= now def safe_float(value, fallback: float) -> float: try: parsed = float(value) if parsed > 0: return parsed except Exception: pass return fallback def safe_int(value, fallback: int) -> int: try: return int(value) except Exception: return fallback def safe_bool(value, fallback: bool) -> bool: if isinstance(value, bool): return value if isinstance(value, str): return value.strip().lower() in {"1", "true", "yes", "y", "on"} if value is None: return fallback return bool(value) def clamp_number(value, minimum=None, maximum=None): if minimum is not None: value = max(minimum, value) if maximum is not None: value = min(maximum, value) return value def next_utc_hour() -> datetime: now = datetime.now(timezone.utc) return now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) def seconds_until_next_utc_hour() -> float: return max(0.0, (next_utc_hour() - datetime.now(timezone.utc)).total_seconds()) def next_utc_hour_iso() -> str: return next_utc_hour().isoformat() def next_rotation_prebuild_at() -> datetime: return next_utc_hour() - timedelta(seconds=max(0, ROTATION_PREBUILD_LEAD_SECONDS)) def next_rotation_prebuild_at_iso() -> str: return next_rotation_prebuild_at().isoformat() oauth_userinfo_cache_lock = threading.Lock() oauth_userinfo_cache = {} def oauth_bool(value) -> bool: if isinstance(value, bool): return value if isinstance(value, str): return value.strip().lower() in {"1", "true", "yes", "y", "on"} return bool(value) def fetch_oauth_userinfo(access_token: Optional[str]) -> dict: if not access_token: return {} now = time.time() with oauth_userinfo_cache_lock: cached = oauth_userinfo_cache.get(access_token) if cached and now - cached.get("fetched_at", 0) < OAUTH_USERINFO_CACHE_TTL_SECONDS: return dict(cached.get("payload", {})) request = urllib.request.Request( OAUTH_USERINFO_URL, headers={ "Authorization": f"Bearer {access_token}", "Accept": "application/json", }, method="GET", ) try: with urllib.request.urlopen(request, timeout=5) as response: payload = json.loads(response.read().decode("utf-8")) if not isinstance(payload, dict): payload = {} with oauth_userinfo_cache_lock: oauth_userinfo_cache[access_token] = { "fetched_at": now, "payload": dict(payload), } return payload except Exception as e: print(f"[oauth] Could not fetch OAuth userinfo endpoint: {e}") return {} def extract_oauth_field(user_info, *field_names, default=None): for field_name in field_names: if isinstance(user_info, dict) and field_name in user_info: return user_info.get(field_name, default) value = getattr(user_info, field_name, None) if value is not None: return value raw = getattr(user_info, "__dict__", {}) or {} for field_name in field_names: if field_name in raw: return raw.get(field_name, default) return default def oauth_user_from_request(request: Request): """ Returns a compact public user profile if the visitor is signed in with HF. Returns None for anonymous visitors. Hugging Face exposes the PRO flag as `is_pro` in Python OAuthUserInfo and as `isPro` in the JS/OIDC userinfo payload. We read both. When the installed huggingface_hub version does not hydrate the field correctly, we fall back to the OAuth `/oauth/userinfo` endpoint using the current OAuth access token. """ try: oauth_info = parse_huggingface_oauth(request) except Exception as e: print(f"[oauth] Could not parse OAuth info: {e}") return None if oauth_info is None or oauth_info.user_info is None: return None user = oauth_info.user_info access_token = extract_oauth_field(oauth_info, "access_token") raw_userinfo = fetch_oauth_userinfo(access_token) username = ( extract_oauth_field(user, "preferred_username") or extract_oauth_field(raw_userinfo, "preferred_username") or extract_oauth_field(user, "name") or extract_oauth_field(raw_userinfo, "name") or "unknown" ) raw_is_pro_value = ( extract_oauth_field(user, "is_pro", "isPro", default=None) ) if raw_is_pro_value is None: raw_is_pro_value = extract_oauth_field(raw_userinfo, "is_pro", "isPro", default=False) elif oauth_bool(raw_is_pro_value) is False: # Some older hub versions hydrate unknown HF-specific fields as False. # If the raw userinfo endpoint says true, trust the endpoint. endpoint_is_pro = extract_oauth_field(raw_userinfo, "is_pro", "isPro", default=None) if endpoint_is_pro is not None: raw_is_pro_value = endpoint_is_pro is_pro = oauth_bool(raw_is_pro_value) return { "id": extract_oauth_field(user, "sub") or extract_oauth_field(raw_userinfo, "sub"), "username": username, "avatar_url": extract_oauth_field(user, "picture") or extract_oauth_field(raw_userinfo, "picture"), "profile_url": ( extract_oauth_field(user, "profile") or extract_oauth_field(raw_userinfo, "profile") or f"https://huggingface.co/{username}" ), "is_pro": is_pro, "isPro": is_pro, "pro_status_source": "oauth_userinfo_endpoint" if raw_userinfo else "oauth_session", } def report_weight_for_user(user: Optional[dict]) -> int: if not user: return 1 username = str(user.get("username") or "").strip().lower().lstrip("@") if username in ADMIN_REPORT_USERNAMES: return max(1, ADMIN_REPORT_WEIGHT) return 1 class ListenerTracker: """ Tracks logical active listeners from the web UI with heartbeats. Anonymous users are counted. Signed-in HF users are counted and exposed with avatar metadata. Direct /stream.mp3 listeners are handled separately by BroadcastEngine. """ def __init__(self, ttl_seconds: int = 45): self.lock = threading.Lock() self.listeners = {} self.total_sessions = 0 self.ttl_seconds = ttl_seconds def _cleanup_unlocked(self): now = time.time() expired_ids = [ listener_id for listener_id, item in self.listeners.items() if now - item.get("last_seen", 0) > self.ttl_seconds ] for listener_id in expired_ids: self.listeners.pop(listener_id, None) if expired_ids: print( f"[listeners] cleaned up expired={len(expired_ids)} " f"active={len(self.listeners)}" ) def start(self, listener_id: str, user: Optional[dict] = None): now = time.time() with self.lock: is_new = listener_id not in self.listeners previous = self.listeners.get(listener_id, {}) self.listeners[listener_id] = { "started_at": previous.get("started_at", now), "last_seen": now, "user": user, } if is_new: self.total_sessions += 1 self._cleanup_unlocked() print( f"[listeners] start active={len(self.listeners)} " f"total_sessions={self.total_sessions} " f"user={user.get('username') if user else 'anonymous'}" ) def heartbeat(self, listener_id: str, user: Optional[dict] = None): now = time.time() with self.lock: is_new = listener_id not in self.listeners previous = self.listeners.get(listener_id, {}) self.listeners[listener_id] = { "started_at": previous.get("started_at", now), "last_seen": now, "user": user or previous.get("user"), } if is_new: self.total_sessions += 1 self._cleanup_unlocked() def stop(self, listener_id: str): with self.lock: self.listeners.pop(listener_id, None) self._cleanup_unlocked() print(f"[listeners] stop active={len(self.listeners)}") def snapshot(self): with self.lock: self._cleanup_unlocked() seen_user_ids = set() signed_in_profiles = [] for item in self.listeners.values(): user = item.get("user") if not user: continue user_id = user.get("id") or user.get("username") if user_id in seen_user_ids: continue seen_user_ids.add(user_id) signed_in_profiles.append( { "username": user.get("username"), "avatar_url": user.get("avatar_url"), "profile_url": user.get("profile_url"), } ) total_signed_in_listener_count = len(signed_in_profiles) # Random but stable for one minute, to avoid UI flicker every /api/status refresh. rng = random.Random(int(time.time() // 60)) rng.shuffle(signed_in_profiles) visible_signed_in_profiles = signed_in_profiles[:8] return { "active_listeners": len(self.listeners), "total_connections": self.total_sessions, "listener_ttl_seconds": self.ttl_seconds, "listener_profiles": visible_signed_in_profiles, "signed_in_listener_count": total_signed_in_listener_count, "visible_listener_profile_count": len(visible_signed_in_profiles), } listeners = ListenerTracker(ttl_seconds=LISTENER_TTL_SECONDS) class LikeStore: def __init__(self, likes_dir: str, likes_file: str): self.lock = threading.Lock() self.likes_dir = Path(likes_dir) self.likes_path = self.likes_dir / likes_file self.tmp_path = self.likes_dir / f".{likes_file}.tmp" self.data = {} self.last_error = None self.load() def load(self): with self.lock: try: self.likes_dir.mkdir(parents=True, exist_ok=True) if self.likes_path.exists(): with open(self.likes_path, "r", encoding="utf-8") as f: self.data = json.load(f) else: self.data = {} self._save_unlocked() self.last_error = None print(f"[likes] Loaded {len(self.data)} feedback entries from {self.likes_path}") except Exception as e: self.last_error = str(e) self.data = {} print(f"[likes] Could not load feedback: {e}") def _save_unlocked(self): payload = json.dumps(self.data, indent=2, ensure_ascii=False) with open(self.tmp_path, "w", encoding="utf-8") as f: f.write(payload) os.replace(self.tmp_path, self.likes_path) def _item_unlocked(self, song_id: str) -> dict: item = dict(self.data.get(song_id, {})) item["count"] = int(item.get("count", 0)) item["upvote_count"] = int(item.get("upvote_count", 0)) item["downvote_count"] = int(item.get("downvote_count", 0)) item["report_count"] = int(item.get("report_count", 0)) return item def _public_result_unlocked(self, song_id: str, item: dict, **extra) -> dict: payload = { "song_id": song_id, "count": int(item.get("count", 0)), "upvote_count": int(item.get("upvote_count", 0)), "downvote_count": int(item.get("downvote_count", 0)), "report_count": int(item.get("report_count", 0)), "updated_at": item.get("updated_at"), } payload.update(extra) return payload def get_count(self, song_id: str) -> int: with self.lock: return int(self.data.get(song_id, {}).get("count", 0)) def get_upvote_count(self, song_id: str) -> int: with self.lock: return int(self.data.get(song_id, {}).get("upvote_count", 0)) def get_downvote_count(self, song_id: str) -> int: with self.lock: return int(self.data.get(song_id, {}).get("downvote_count", 0)) def get_report_count(self, song_id: str) -> int: with self.lock: return int(self.data.get(song_id, {}).get("report_count", 0)) def increment(self, song_id: str) -> dict: now = datetime.now(timezone.utc).isoformat() with self.lock: item = self._item_unlocked(song_id) item["count"] = int(item.get("count", 0)) + 1 item["updated_at"] = now self.data[song_id] = item try: self._save_unlocked() self.last_error = None except Exception as e: self.last_error = str(e) print(f"[likes] Could not save like: {e}") raise return self._public_result_unlocked(song_id, item) def upvote(self, song_id: str) -> dict: now = datetime.now(timezone.utc).isoformat() with self.lock: item = self._item_unlocked(song_id) item["upvote_count"] = int(item.get("upvote_count", 0)) + 1 item["updated_at"] = now self.data[song_id] = item try: self._save_unlocked() self.last_error = None except Exception as e: self.last_error = str(e) print(f"[likes] Could not save upvote: {e}") raise return self._public_result_unlocked(song_id, item) def unupvote(self, song_id: str) -> dict: now = datetime.now(timezone.utc).isoformat() with self.lock: item = self._item_unlocked(song_id) item["upvote_count"] = max(0, int(item.get("upvote_count", 0)) - 1) item["updated_at"] = now self.data[song_id] = item try: self._save_unlocked() self.last_error = None except Exception as e: self.last_error = str(e) print(f"[likes] Could not undo upvote: {e}") raise return self._public_result_unlocked(song_id, item) def downvote(self, song_id: str) -> dict: now = datetime.now(timezone.utc).isoformat() with self.lock: item = self._item_unlocked(song_id) item["downvote_count"] = int(item.get("downvote_count", 0)) + 1 item["updated_at"] = now self.data[song_id] = item try: self._save_unlocked() self.last_error = None except Exception as e: self.last_error = str(e) print(f"[likes] Could not save downvote: {e}") raise return self._public_result_unlocked(song_id, item) def undownvote(self, song_id: str) -> dict: now = datetime.now(timezone.utc).isoformat() with self.lock: item = self._item_unlocked(song_id) item["downvote_count"] = max(0, int(item.get("downvote_count", 0)) - 1) item["updated_at"] = now self.data[song_id] = item try: self._save_unlocked() self.last_error = None except Exception as e: self.last_error = str(e) print(f"[likes] Could not undo downvote: {e}") raise return self._public_result_unlocked(song_id, item) def report(self, song_id: str, weight: int = 1) -> dict: now = datetime.now(timezone.utc).isoformat() weight = max(1, int(weight or 1)) with self.lock: item = self._item_unlocked(song_id) item["report_count"] = int(item.get("report_count", 0)) + weight item["reported_at"] = now item["updated_at"] = now self.data[song_id] = item try: self._save_unlocked() self.last_error = None except Exception as e: self.last_error = str(e) print(f"[likes] Could not save report: {e}") raise return self._public_result_unlocked(song_id, item, reported_at=now, report_weight=weight) def unreport(self, song_id: str, weight: int = 1) -> dict: now = datetime.now(timezone.utc).isoformat() weight = max(1, int(weight or 1)) with self.lock: item = self._item_unlocked(song_id) item["report_count"] = max(0, int(item.get("report_count", 0)) - weight) item["unreported_at"] = now item["updated_at"] = now if item["report_count"] == 0: item.pop("reported_at", None) self.data[song_id] = item try: self._save_unlocked() self.last_error = None except Exception as e: self.last_error = str(e) print(f"[likes] Could not save unreport: {e}") raise return self._public_result_unlocked(song_id, item, unreported_at=now, report_weight=weight) def snapshot(self) -> dict: with self.lock: return dict(self.data) likes = LikeStore(LIKES_DIR, LIKES_FILE) class RadioState: """ Holds the logical radio state. In V2, this class no longer produces one stream per listener. The BroadcastEngine is responsible for audio output and advances the radio when the current audio item finishes. """ def __init__(self): self.lock = threading.Lock() self.fs = HfFileSystem() self.tracks = [] self.jingles = [] self.segments = [] self.custom_tracks = [] self.current_index: Optional[int] = None self.current_kind = "song" self.current_jingle: Optional[dict] = None self.current_segment: Optional[dict] = None self.started_at: Optional[float] = None self.is_playing = False self.mode = RADIO_MODE self.is_loading = False self.last_error = None self.last_refresh_at: Optional[float] = None self.next_tracks = [] self.next_rotation_ready = False self.next_rotation_is_building = False self.next_rotation_built_at: Optional[float] = None self.next_rotation_target_at: Optional[str] = None self.next_rotation_last_error = None self.next_rotation_source = None self.next_rotation_payload = None self.approved_next_rotation_last_loaded_at = None self.approved_next_rotation_last_error = None self.approved_next_rotation_last_consumed_at = None self.last_rotation_activated_at: Optional[float] = None self.current_rotation_source = "initial" self.current_rotation_target_at = None self.current_rotation_activated_at = None self.current_rotation_track_ids = [] self.songs_since_last_jingle = 0 self.last_jingle_id: Optional[str] = None self.songs_since_last_segment = 0 self.last_segment_id: Optional[str] = None self.recently_played_songs: list[dict] = [] self.runtime_config = self._default_runtime_config() self.runtime_config_source = "defaults" self.runtime_config_last_loaded_at = None self.runtime_config_last_error = None self.runtime_control_payload = {} self.track_overrides = {} self.track_overrides_last_loaded_at = None self.track_overrides_last_error = None self.cover_overrides_count = 0 self.custom_tracks_last_loaded_at = None self.custom_tracks_last_error = None def _default_runtime_config(self) -> dict: return { "max_tracks": MAX_TRACKS, "rotation_target_seconds": ROTATION_TARGET_SECONDS, "report_hide_threshold": REPORT_HIDE_THRESHOLD, "like_weight_per_like": LIKE_WEIGHT_PER_LIKE, "like_weight_max": LIKE_WEIGHT_MAX, "upvote_weight_per_upvote": UPVOTE_WEIGHT_PER_UPVOTE, "upvote_weight_max": UPVOTE_WEIGHT_MAX, "downvote_weight_per_downvote": DOWNVOTE_WEIGHT_PER_DOWNVOTE, "downvote_weight_max": DOWNVOTE_WEIGHT_MAX, "track_weight_min": TRACK_WEIGHT_MIN, "track_weight_max": TRACK_WEIGHT_MAX, "auto_rotation_min_effective_score_exclusive": AUTO_ROTATION_MIN_EFFECTIVE_SCORE_EXCLUSIVE, "recent_track_quota": RECENT_TRACK_QUOTA, "recent_track_window_hours": RECENT_TRACK_WINDOW_HOURS, "recently_played_exclude_minutes": RECENTLY_PLAYED_EXCLUDE_MINUTES, "recently_played_memory_max": RECENTLY_PLAYED_MEMORY_MAX, "recently_played_min_candidates_after_filter": RECENTLY_PLAYED_MIN_CANDIDATES_AFTER_FILTER, "meta_load_workers": META_LOAD_WORKERS, "enable_jingles": ENABLE_JINGLES, "enable_remote_jingles": ENABLE_REMOTE_JINGLES, "min_songs_between_jingles": MIN_SONGS_BETWEEN_JINGLES, "max_songs_between_jingles": MAX_SONGS_BETWEEN_JINGLES, "jingle_probability_after_min": JINGLE_PROBABILITY_AFTER_MIN, "enable_segments": ENABLE_SEGMENTS, "min_songs_between_segments": MIN_SONGS_BETWEEN_SEGMENTS, "max_songs_between_segments": MAX_SONGS_BETWEEN_SEGMENTS, "segment_probability_after_min": SEGMENT_PROBABILITY_AFTER_MIN, "enable_custom_tracks": ENABLE_CUSTOM_TRACKS, "custom_track_quota": CUSTOM_TRACK_QUOTA, "enable_song_mastering": ENABLE_SONG_MASTERING, "song_audio_filter": SONG_AUDIO_FILTER, } def _cfg(self, key: str, fallback=None): if fallback is None: fallback = self._default_runtime_config().get(key) return self.runtime_config.get(key, fallback) def _coerce_runtime_config(self, custom_config: dict) -> dict: defaults = self._default_runtime_config() result = dict(defaults) for key, default_value in defaults.items(): if key not in custom_config: continue raw_value = custom_config.get(key) if isinstance(default_value, bool): value = safe_bool(raw_value, default_value) elif isinstance(default_value, int) and not isinstance(default_value, bool): value = safe_int(raw_value, default_value) elif isinstance(default_value, float): value = safe_float(raw_value, default_value) elif isinstance(default_value, str): value = str(raw_value or "") else: value = raw_value if key in { "recent_track_quota", "jingle_probability_after_min", "segment_probability_after_min", "custom_track_quota", }: value = float(clamp_number(float(value), 0.0, 1.0)) if key in { "max_tracks", "rotation_target_seconds", "meta_load_workers", "recent_track_window_hours", "recently_played_exclude_minutes", "recently_played_memory_max", "recently_played_min_candidates_after_filter", "min_songs_between_jingles", "max_songs_between_jingles", "min_songs_between_segments", "max_songs_between_segments", }: value = int(clamp_number(int(value), 0, None)) if key in { "like_weight_per_like", "like_weight_max", "upvote_weight_per_upvote", "upvote_weight_max", "downvote_weight_per_downvote", "downvote_weight_max", "track_weight_min", "track_weight_max", }: value = float(clamp_number(float(value), 0.0, None)) result[key] = value if result["max_songs_between_jingles"] < result["min_songs_between_jingles"]: result["max_songs_between_jingles"] = result["min_songs_between_jingles"] if result["max_songs_between_segments"] < result["min_songs_between_segments"]: result["max_songs_between_segments"] = result["min_songs_between_segments"] if result["track_weight_max"] < result["track_weight_min"]: result["track_weight_max"] = result["track_weight_min"] return result def refresh_runtime_controls(self): control_dir = Path(RADIO_CONTROL_DIR) config_path = control_dir / RADIO_CONTROL_CONFIG_FILE overrides_path = control_dir / RADIO_TRACK_OVERRIDES_FILE now = time.time() runtime_config = self._default_runtime_config() runtime_config_source = "defaults" runtime_config_error = None runtime_payload = {} try: if config_path.exists(): runtime_payload = json.loads(config_path.read_text(encoding="utf-8")) enabled = safe_bool(runtime_payload.get("enabled", True), True) use_custom_config = safe_bool(runtime_payload.get("use_custom_config", False), False) custom_config = runtime_payload.get("config", {}) if enabled and use_custom_config and isinstance(custom_config, dict): runtime_config = self._coerce_runtime_config(custom_config) runtime_config_source = "studio_custom" else: runtime_config_source = "defaults_disabled_or_not_requested" else: runtime_config_source = "defaults_no_config_file" except Exception as e: runtime_config = self._default_runtime_config() runtime_config_source = "defaults_config_error" runtime_config_error = str(e) print(f"[radio-control] Could not load config {config_path}: {e}") track_overrides = {} overrides_error = None try: if overrides_path.exists(): payload = json.loads(overrides_path.read_text(encoding="utf-8")) if isinstance(payload, dict): for song_id, override in payload.items(): if not song_id or not isinstance(override, dict): continue track_overrides[str(song_id)] = { "hidden": safe_bool(override.get("hidden", False), False), "upvote_boost": max(0, safe_int(override.get("upvote_boost", 0), 0)), "downvote_boost": max(0, safe_int(override.get("downvote_boost", 0), 0)), "like_boost": max(0, safe_int(override.get("like_boost", 0), 0)), "notes": str(override.get("notes") or ""), "updated_at": override.get("updated_at"), } except Exception as e: overrides_error = str(e) print(f"[radio-control] Could not load track overrides {overrides_path}: {e}") cover_overrides_count = 0 cover_root = Path(COVER_OVERRIDES_DIR) try: if cover_root.exists(): cover_overrides_count = len(list(cover_root.glob("*/thumb.png"))) except Exception as e: print(f"[cover-overrides] Could not count cover overrides: {e}") with self.lock: self.runtime_config = runtime_config self.runtime_config_source = runtime_config_source self.runtime_config_last_loaded_at = now self.runtime_config_last_error = runtime_config_error self.runtime_control_payload = runtime_payload if isinstance(runtime_payload, dict) else {} self.track_overrides = track_overrides self.track_overrides_last_loaded_at = now self.track_overrides_last_error = overrides_error self.cover_overrides_count = cover_overrides_count print( f"[radio-control] config_source={runtime_config_source} " f"overrides={len(track_overrides)} covers={cover_overrides_count}" ) def _reload_track_overrides_unlocked(self): """ Reload only /radio-control/track_overrides.json while self.lock is already held. This is intentionally lighter than refresh_runtime_controls(): it does not reload the full runtime config, jingles, segments, custom tracks or rotation pack. Its only purpose is to make Studio boosts/hidden flags available when a queued song starts playing, without waiting for the next full radio refresh. """ overrides_path = Path(RADIO_CONTROL_DIR) / RADIO_TRACK_OVERRIDES_FILE now = time.time() track_overrides = {} overrides_error = None try: if overrides_path.exists(): payload = json.loads(overrides_path.read_text(encoding="utf-8")) if isinstance(payload, dict): for song_id, override in payload.items(): if not song_id or not isinstance(override, dict): continue track_overrides[str(song_id)] = { "hidden": safe_bool(override.get("hidden", False), False), "upvote_boost": max(0, safe_int(override.get("upvote_boost", 0), 0)), "downvote_boost": max(0, safe_int(override.get("downvote_boost", 0), 0)), "like_boost": max(0, safe_int(override.get("like_boost", 0), 0)), "notes": str(override.get("notes") or ""), "updated_at": override.get("updated_at"), } except Exception as e: overrides_error = str(e) print(f"[radio-control] Could not reload track overrides {overrides_path}: {e}") self.track_overrides = track_overrides self.track_overrides_last_loaded_at = now self.track_overrides_last_error = overrides_error def _track_override_unlocked(self, song_id: Optional[str]) -> dict: if not song_id: return {} return dict(self.track_overrides.get(str(song_id), {})) def _cover_override_path(self, song_id: Optional[str]) -> Optional[Path]: if not song_id: return None path = Path(COVER_OVERRIDES_DIR) / str(song_id) / "thumb.png" if path.exists() and path.is_file(): return path return None def _apply_track_controls_unlocked(self, track: dict) -> dict: if not track or track.get("is_jingle") or track.get("is_segment"): return track song_id = str(track.get("id") or "") override = self._track_override_unlocked(song_id) like_count = safe_int(track.get("like_count", 0), 0) upvote_count = safe_int(track.get("upvote_count", 0), 0) downvote_count = safe_int(track.get("downvote_count", 0), 0) report_count = safe_int(track.get("report_count", 0), 0) like_boost = max(0, safe_int(override.get("like_boost", 0), 0)) upvote_boost = max(0, safe_int(override.get("upvote_boost", 0), 0)) downvote_boost = max(0, safe_int(override.get("downvote_boost", 0), 0)) track["studio_hidden"] = bool(override.get("hidden", False)) track["studio_notes"] = override.get("notes", "") track["studio_like_boost"] = like_boost track["studio_upvote_boost"] = upvote_boost track["studio_downvote_boost"] = downvote_boost track["effective_like_count"] = like_count + like_boost track["effective_upvote_count"] = upvote_count + upvote_boost track["effective_downvote_count"] = downvote_count + downvote_boost track["effective_report_count"] = report_count cover_override = self._cover_override_path(song_id) if cover_override: track["thumb_url"] = f"/cover-overrides/{song_id}/thumb.png" track["has_cover_override"] = True else: track["has_cover_override"] = False return track def _prune_recently_played_unlocked(self): if not self.recently_played_songs: return now = time.time() recently_played_exclude_minutes = int(self._cfg("recently_played_exclude_minutes", RECENTLY_PLAYED_EXCLUDE_MINUTES)) recently_played_memory_max = int(self._cfg("recently_played_memory_max", RECENTLY_PLAYED_MEMORY_MAX)) if recently_played_exclude_minutes > 0: cutoff = now - (recently_played_exclude_minutes * 60) self.recently_played_songs = [ item for item in self.recently_played_songs if item.get("id") and float(item.get("played_at", 0)) >= cutoff ] if recently_played_memory_max > 0: self.recently_played_songs = self.recently_played_songs[-recently_played_memory_max:] def _remember_played_song_unlocked(self, song_id: Optional[str]): if not song_id: return self.recently_played_songs.append({ "id": str(song_id), "played_at": time.time(), }) self._prune_recently_played_unlocked() def _recently_played_exclusion_set_unlocked(self) -> set[str]: recently_played_exclude_minutes = int(self._cfg("recently_played_exclude_minutes", RECENTLY_PLAYED_EXCLUDE_MINUTES)) if recently_played_exclude_minutes <= 0: return set() self._prune_recently_played_unlocked() now = time.time() cutoff = now - (recently_played_exclude_minutes * 60) return { str(item.get("id")) for item in self.recently_played_songs if item.get("id") and float(item.get("played_at", 0)) >= cutoff } def _make_basic_track(self, song_id: str): track = { "id": song_id, "title": f"Song {song_id}", "description": "", "lyrics": "", "tags": [], "duration": DEFAULT_DURATION, "created_at": "", "audio_url": f"{BUCKET_RESOLVE_URL}/songs/{song_id}/{song_id}.wav", "stream_url": f"{BUCKET_RESOLVE_URL}/songs/{song_id}/{song_id}.wav", "thumb_url": f"{BUCKET_RESOLVE_URL}/songs/{song_id}/thumb.png", "meta_loaded": False, "like_count": likes.get_count(song_id), "upvote_count": likes.get_upvote_count(song_id), "downvote_count": likes.get_downvote_count(song_id), "report_count": likes.get_report_count(song_id), "is_jingle": False, "is_segment": False, "is_custom_track": False, } return self._apply_track_controls_unlocked(track) def _make_jingle_track(self, path: Path): duration = probe_audio_duration(str(path), DEFAULT_JINGLE_DURATION) return { "id": path.stem, "title": "HF Community Radio", "description": "Station ID", "lyrics": "", "tags": ["jingle"], "duration": duration, "created_at": "", "audio_url": f"/jingles/{path.name}", "stream_url": str(path), "thumb_url": "", "meta_loaded": True, "like_count": 0, "upvote_count": 0, "downvote_count": 0, "report_count": 0, "is_jingle": True, "is_segment": False, "is_custom_track": False, "source_url": "", } def _make_remote_jingle_track(self, meta_path: Path): meta = json.loads(meta_path.read_text(encoding="utf-8")) if not asset_is_enabled(meta): return None audio_path = meta_path.parent / "audio.mp3" if not audio_path.exists(): print(f"[jingles] Missing remote jingle audio: {audio_path}") return None asset_id = meta.get("id") or meta_path.parent.name kind = meta.get("kind") or "station_id" duration = safe_float(meta.get("duration"), DEFAULT_JINGLE_DURATION) return { "id": str(asset_id), "title": meta.get("title") or "HF Radio", "description": meta.get("description") or "", "lyrics": "", "tags": ["jingle", str(kind)], "duration": duration, "created_at": meta.get("created_at", ""), "updated_at": meta.get("updated_at", ""), "audio_url": meta.get("audio_url") or "", "stream_url": str(audio_path), "thumb_url": "", "meta_loaded": True, "like_count": 0, "upvote_count": 0, "downvote_count": 0, "report_count": 0, "is_jingle": True, "is_segment": False, "is_custom_track": False, "kind": kind, "source_url": "", } def _make_segment_track(self, meta_path: Path): meta = json.loads(meta_path.read_text(encoding="utf-8")) if not asset_is_enabled(meta): return None if asset_is_expired(meta): return None audio_path = meta_path.parent / "audio.mp3" if not audio_path.exists(): print(f"[segments] Missing segment audio: {audio_path}") return None asset_id = meta.get("id") or meta_path.parent.name kind = meta.get("kind") or "community_news" duration = safe_float(meta.get("duration"), DEFAULT_SEGMENT_DURATION) tags = ["community update", str(kind).replace("_", " ")] return { "id": str(asset_id), "title": meta.get("title") or "Community Update", "description": meta.get("description") or "", "lyrics": "", "tags": tags, "duration": duration, "created_at": meta.get("created_at", ""), "updated_at": meta.get("updated_at", ""), "expires_at": meta.get("expires_at"), "audio_url": meta.get("audio_url") or "", "stream_url": str(audio_path), "thumb_url": "", "meta_loaded": True, "like_count": 0, "upvote_count": 0, "downvote_count": 0, "report_count": 0, "is_jingle": False, "is_segment": True, "is_custom_track": False, "kind": kind, "public_label": meta.get("public_label") or "Community Update", "source_url": meta.get("source_url") or "", } def _make_custom_track(self, meta_path: Path): meta = json.loads(meta_path.read_text(encoding="utf-8")) if not asset_is_enabled(meta): return None audio_path = meta_path.parent / "audio.mp3" if not audio_path.exists(): print(f"[custom-tracks] Missing custom track audio: {audio_path}") return None asset_id = str(meta.get("id") or meta_path.parent.name) duration = safe_float(meta.get("duration"), DEFAULT_CUSTOM_TRACK_DURATION) tags = normalize_tags(meta.get("tags", [])) if not tags: tags = ["custom track"] thumb_path = meta_path.parent / "thumb.png" thumb_url = meta.get("thumb_url") or "" if thumb_path.exists() and thumb_path.is_file(): thumb_url = f"/custom-tracks/{asset_id}/thumb.png" track = { "id": asset_id, "title": meta.get("title") or f"Custom Track {asset_id}", "description": meta.get("description", ""), "lyrics": meta.get("lyrics", ""), "tags": tags, "duration": duration, "created_at": meta.get("created_at", ""), "updated_at": meta.get("updated_at", ""), "audio_url": meta.get("audio_url") or "", "stream_url": str(audio_path), "thumb_url": thumb_url, "meta_loaded": True, "like_count": likes.get_count(asset_id), "upvote_count": likes.get_upvote_count(asset_id), "downvote_count": likes.get_downvote_count(asset_id), "report_count": likes.get_report_count(asset_id), "is_jingle": False, "is_segment": False, "is_custom_track": True, "type": meta.get("type") or "custom_track", "source_model": meta.get("source_model") or "", "provider": meta.get("provider") or "", "prompt": meta.get("prompt") or "", "quality_score": meta.get("quality_score"), "quality_notes": meta.get("quality_notes") or "", "internal_notes": meta.get("internal_notes") or "", } return self._apply_track_controls_unlocked(track) def _song_is_hidden_by_reports(self, song_id: str) -> bool: override = self._track_override_unlocked(song_id) if override.get("hidden"): return True report_hide_threshold = int(self._cfg("report_hide_threshold", REPORT_HIDE_THRESHOLD)) if report_hide_threshold <= 0: return False return likes.get_report_count(song_id) >= report_hide_threshold def _effective_score_for_song_id_unlocked(self, song_id: Optional[str]) -> int: """Return the editorial effective score used for auto-rotation eligibility. Keep this aligned with Studio's generated draft score contract: likes + upvotes + Studio boosts - downvotes - Studio downvote penalty - reports. """ if not song_id: return 0 song_id = str(song_id) override = self._track_override_unlocked(song_id) if not isinstance(override, dict): override = {} like_count = likes.get_count(song_id) upvote_count = likes.get_upvote_count(song_id) downvote_count = likes.get_downvote_count(song_id) report_count = likes.get_report_count(song_id) like_boost = max(0, safe_int(override.get("like_boost", 0), 0)) upvote_boost = max(0, safe_int(override.get("upvote_boost", 0), 0)) downvote_boost = max(0, safe_int(override.get("downvote_boost", 0), 0)) return like_count + upvote_count + like_boost + upvote_boost - downvote_count - downvote_boost - report_count def _effective_score_for_track_unlocked(self, track: dict) -> int: if not track: return 0 song_id = track.get("id") if song_id: return self._effective_score_for_song_id_unlocked(str(song_id)) like_count = safe_int(track.get("effective_like_count", track.get("like_count", 0)), 0) upvote_count = safe_int(track.get("effective_upvote_count", track.get("upvote_count", 0)), 0) downvote_count = safe_int(track.get("effective_downvote_count", track.get("downvote_count", 0)), 0) report_count = safe_int(track.get("effective_report_count", track.get("report_count", 0)), 0) return like_count + upvote_count - downvote_count - report_count def _track_is_below_auto_rotation_score_floor_unlocked(self, track_or_song_id) -> bool: threshold = float(self._cfg( "auto_rotation_min_effective_score_exclusive", AUTO_ROTATION_MIN_EFFECTIVE_SCORE_EXCLUSIVE, )) if isinstance(track_or_song_id, dict): score = self._effective_score_for_track_unlocked(track_or_song_id) else: score = self._effective_score_for_song_id_unlocked(str(track_or_song_id or "")) return score <= threshold def _feedback_weight_for_track(self, track: dict) -> float: like_count = safe_int(track.get("effective_like_count", track.get("like_count", 0)), 0) upvote_count = safe_int(track.get("effective_upvote_count", track.get("upvote_count", 0)), 0) downvote_count = safe_int(track.get("effective_downvote_count", track.get("downvote_count", 0)), 0) like_weight_per_like = float(self._cfg("like_weight_per_like", LIKE_WEIGHT_PER_LIKE)) like_weight_max = float(self._cfg("like_weight_max", LIKE_WEIGHT_MAX)) upvote_weight_per_upvote = float(self._cfg("upvote_weight_per_upvote", UPVOTE_WEIGHT_PER_UPVOTE)) upvote_weight_max = float(self._cfg("upvote_weight_max", UPVOTE_WEIGHT_MAX)) downvote_weight_per_downvote = float(self._cfg("downvote_weight_per_downvote", DOWNVOTE_WEIGHT_PER_DOWNVOTE)) downvote_weight_max = float(self._cfg("downvote_weight_max", DOWNVOTE_WEIGHT_MAX)) track_weight_min = float(self._cfg("track_weight_min", TRACK_WEIGHT_MIN)) track_weight_max = float(self._cfg("track_weight_max", TRACK_WEIGHT_MAX)) like_bonus = min(max(0, like_count) * like_weight_per_like, max(0.0, like_weight_max - 1.0)) upvote_bonus = min(max(0, upvote_count) * upvote_weight_per_upvote, upvote_weight_max) downvote_penalty = min(max(0, downvote_count) * downvote_weight_per_downvote, downvote_weight_max) weight = 1.0 + like_bonus + upvote_bonus - downvote_penalty return max(track_weight_min, min(track_weight_max, weight)) def _weighted_pop_track(self, tracks: list[dict]) -> dict: weights = [self._feedback_weight_for_track(track) for track in tracks] total_weight = sum(weights) if total_weight <= 0: selected_index = random.randrange(len(tracks)) else: pick = random.uniform(0, total_weight) cumulative = 0.0 selected_index = len(tracks) - 1 for i, weight in enumerate(weights): cumulative += weight if pick <= cumulative: selected_index = i break return tracks.pop(selected_index) def _track_duration_seconds(self, track: dict) -> float: try: duration = float(track.get("duration") or DEFAULT_DURATION) except Exception: duration = DEFAULT_DURATION return max(0.0, duration) def _track_is_recent(self, track: dict, now: Optional[datetime] = None) -> bool: recent_track_quota = float(self._cfg("recent_track_quota", RECENT_TRACK_QUOTA)) recent_track_window_hours = int(self._cfg("recent_track_window_hours", RECENT_TRACK_WINDOW_HOURS)) if recent_track_quota <= 0 or recent_track_window_hours <= 0: return False created_at = parse_datetime(track.get("created_at")) if created_at is None: return False now = now or datetime.now(timezone.utc) recent_cutoff = now - timedelta(hours=recent_track_window_hours) return created_at >= recent_cutoff def _average_jingle_duration_unlocked(self) -> float: if not self._cfg("enable_jingles", ENABLE_JINGLES) or not self.jingles: return 0.0 durations = [] for jingle in self.jingles: try: duration = float(jingle.get("duration") or 0) except Exception: duration = 0 if duration > 0: durations.append(duration) if not durations: return DEFAULT_JINGLE_DURATION return sum(durations) / len(durations) def _expected_songs_per_jingle(self) -> int: min_songs_between_jingles = int(self._cfg("min_songs_between_jingles", MIN_SONGS_BETWEEN_JINGLES)) max_songs_between_jingles = int(self._cfg("max_songs_between_jingles", MAX_SONGS_BETWEEN_JINGLES)) if min_songs_between_jingles <= 0 and max_songs_between_jingles <= 0: return 1 expected = round((min_songs_between_jingles + max_songs_between_jingles) / 2) return max(1, expected) def _estimated_jingle_seconds_for_song_count(self, song_count: int, average_jingle_duration: float) -> float: if not self._cfg("enable_jingles", ENABLE_JINGLES) or average_jingle_duration <= 0: return 0.0 expected_songs_per_jingle = self._expected_songs_per_jingle() estimated_jingle_count = song_count // expected_songs_per_jingle return estimated_jingle_count * average_jingle_duration def _estimated_rotation_seconds(self, tracks: list[dict], average_jingle_duration: float, current_remaining_seconds: Optional[float] = None) -> float: song_seconds = 0.0 for i, track in enumerate(tracks): if i == 0 and current_remaining_seconds is not None: song_seconds += current_remaining_seconds continue try: song_seconds += float(track.get("duration") or DEFAULT_DURATION) except Exception: song_seconds += DEFAULT_DURATION return song_seconds + self._estimated_jingle_seconds_for_song_count( len(tracks), average_jingle_duration, ) def _load_track_from_meta_path(self, meta_path: str, song_id: str) -> dict: track = self._make_basic_track(song_id) try: with self.fs.open(meta_path, "r") as f: meta = json.load(f) track["title"] = meta.get("title") or track["title"] track["description"] = meta.get("description", "") track["lyrics"] = meta.get("lyrics", "") track["tags"] = normalize_tags(meta.get("tags", [])) track["created_at"] = meta.get("created_at", "") duration = meta.get("duration") try: track["duration"] = float(duration) if duration is not None else DEFAULT_DURATION except Exception: track["duration"] = DEFAULT_DURATION track["audio_url"] = meta.get("audio_url") or track["audio_url"] track["stream_url"] = track["audio_url"] track["thumb_url"] = meta.get("thumb_url") or track["thumb_url"] track["meta_loaded"] = True except Exception as e: print(f"[radio] Could not load metadata for {song_id} during refresh: {e}") track["meta_loaded"] = True track["like_count"] = likes.get_count(song_id) track["upvote_count"] = likes.get_upvote_count(song_id) track["downvote_count"] = likes.get_downvote_count(song_id) track["report_count"] = likes.get_report_count(song_id) self._apply_track_controls_unlocked(track) return track def refresh_jingles(self): if not self._cfg("enable_jingles", ENABLE_JINGLES): with self.lock: self.jingles = [] self.current_jingle = None if self.current_kind == "jingle": self.current_kind = "song" print("[jingles] Disabled") return new_jingles = [] jingles_dir = Path(JINGLES_DIR) paths = sorted(jingles_dir.glob(JINGLES_GLOB)) for path in paths: if not path.is_file(): continue try: new_jingles.append(self._make_jingle_track(path)) except Exception as e: print(f"[jingles] Could not load local jingle {path}: {e}") if self._cfg("enable_remote_jingles", ENABLE_REMOTE_JINGLES): remote_jingles_dir = Path(REMOTE_JINGLES_DIR) if remote_jingles_dir.exists(): for meta_path in sorted(remote_jingles_dir.glob("*/meta.json")): try: jingle = self._make_remote_jingle_track(meta_path) if jingle: new_jingles.append(jingle) except Exception as e: print(f"[jingles] Could not load remote jingle {meta_path}: {e}") else: print(f"[jingles] Remote jingles mount not found: {remote_jingles_dir}") with self.lock: current_jingle_id = self.current_jingle.get("id") if self.current_jingle else None self.jingles = new_jingles if self.current_kind == "jingle": matching_jingle = next( ( jingle for jingle in self.jingles if jingle["id"] == current_jingle_id ), None, ) if matching_jingle: self.current_jingle = matching_jingle else: self.current_kind = "song" self.current_jingle = None print( f"[jingles] Loaded {len(new_jingles)} jingles " f"local_dir={JINGLES_DIR}/{JINGLES_GLOB} remote_dir={REMOTE_JINGLES_DIR}" ) def refresh_segments(self): if not self._cfg("enable_segments", ENABLE_SEGMENTS): with self.lock: self.segments = [] self.current_segment = None if self.current_kind == "segment": self.current_kind = "song" print("[segments] Disabled") return new_segments = [] remote_segments_dir = Path(REMOTE_SEGMENTS_DIR) if remote_segments_dir.exists(): for meta_path in sorted(remote_segments_dir.glob("*/meta.json")): try: segment = self._make_segment_track(meta_path) if segment: new_segments.append(segment) except Exception as e: print(f"[segments] Could not load segment {meta_path}: {e}") else: print(f"[segments] Remote segments mount not found: {remote_segments_dir}") with self.lock: current_segment_id = self.current_segment.get("id") if self.current_segment else None self.segments = new_segments if self.current_kind == "segment": matching_segment = next( ( segment for segment in self.segments if segment["id"] == current_segment_id ), None, ) if matching_segment: self.current_segment = matching_segment else: self.current_kind = "song" self.current_segment = None print(f"[segments] Loaded {len(new_segments)} enabled segments from {REMOTE_SEGMENTS_DIR}") def refresh_custom_tracks(self): if not bool(self._cfg("enable_custom_tracks", ENABLE_CUSTOM_TRACKS)): with self.lock: self.custom_tracks = [] self.custom_tracks_last_loaded_at = time.time() self.custom_tracks_last_error = None print("[custom-tracks] Disabled") return new_custom_tracks = [] custom_tracks_error = None custom_tracks_dir = Path(CUSTOM_TRACKS_DIR) try: if custom_tracks_dir.exists(): for meta_path in sorted(custom_tracks_dir.glob("*/meta.json")): try: track = self._make_custom_track(meta_path) if not track: continue if track.get("studio_hidden"): continue if self._song_is_hidden_by_reports(track.get("id")): continue new_custom_tracks.append(track) except Exception as e: print(f"[custom-tracks] Could not load custom track {meta_path}: {e}") else: print(f"[custom-tracks] Mount not found: {custom_tracks_dir}") except Exception as e: custom_tracks_error = str(e) print(f"[custom-tracks] Could not refresh custom tracks: {e}") with self.lock: self.custom_tracks = new_custom_tracks self.custom_tracks_last_loaded_at = time.time() self.custom_tracks_last_error = custom_tracks_error print(f"[custom-tracks] Loaded {len(new_custom_tracks)} enabled custom tracks from {CUSTOM_TRACKS_DIR}") def _meta_path_for_song(self, song_id: str): return f"{BUCKET_FS_ROOT}/songs/{song_id}/meta.json" def _load_meta_for_track_unlocked(self, track: dict): if not track: return track if track.get("is_jingle") or track.get("is_segment"): return track if track.get("is_custom_track"): track["like_count"] = likes.get_count(track.get("id")) track["upvote_count"] = likes.get_upvote_count(track.get("id")) track["downvote_count"] = likes.get_downvote_count(track.get("id")) track["report_count"] = likes.get_report_count(track.get("id")) self._apply_track_controls_unlocked(track) return track song_id = track["id"] if not track.get("meta_loaded"): meta_path = self._meta_path_for_song(song_id) try: with self.fs.open(meta_path, "r") as f: meta = json.load(f) track["title"] = meta.get("title") or track["title"] track["description"] = meta.get("description", "") track["lyrics"] = meta.get("lyrics", "") track["tags"] = normalize_tags(meta.get("tags", [])) track["created_at"] = meta.get("created_at", "") duration = meta.get("duration") try: track["duration"] = float(duration) if duration is not None else DEFAULT_DURATION except Exception: track["duration"] = DEFAULT_DURATION track["audio_url"] = meta.get("audio_url") or track["audio_url"] track["stream_url"] = track["audio_url"] track["thumb_url"] = meta.get("thumb_url") or track["thumb_url"] track["meta_loaded"] = True except Exception as e: print(f"[radio] Could not load metadata for {song_id}: {e}") track["meta_loaded"] = True track["like_count"] = likes.get_count(song_id) track["upvote_count"] = likes.get_upvote_count(song_id) track["downvote_count"] = likes.get_downvote_count(song_id) track["report_count"] = likes.get_report_count(song_id) self._apply_track_controls_unlocked(track) return track def refresh_tracks(self, prebuild: bool = False, target_at: Optional[datetime] = None): label = "prebuilding next rotation" if prebuild else "refreshing active rotation" print(f"[radio] {label}: {BUCKET_ID}") print(f"[radio] Bucket FS root: {BUCKET_FS_ROOT}") with self.lock: if prebuild: self.next_rotation_is_building = True self.next_rotation_last_error = None self.next_rotation_target_at = target_at.isoformat() if target_at else next_utc_hour_iso() else: self.is_loading = True self.last_error = None runtime_config = dict(self.runtime_config) average_jingle_duration = self._average_jingle_duration_unlocked() custom_tracks_snapshot = [dict(track) for track in self.custom_tracks] max_tracks = int(runtime_config.get("max_tracks", MAX_TRACKS)) rotation_target_seconds = int(runtime_config.get("rotation_target_seconds", ROTATION_TARGET_SECONDS)) recent_track_quota = float(runtime_config.get("recent_track_quota", RECENT_TRACK_QUOTA)) custom_track_quota = float(runtime_config.get("custom_track_quota", CUSTOM_TRACK_QUOTA)) recently_played_exclude_minutes = int(runtime_config.get("recently_played_exclude_minutes", RECENTLY_PLAYED_EXCLUDE_MINUTES)) recently_played_min_candidates_after_filter = int(runtime_config.get("recently_played_min_candidates_after_filter", RECENTLY_PLAYED_MIN_CANDIDATES_AFTER_FILTER)) meta_load_workers = int(runtime_config.get("meta_load_workers", META_LOAD_WORKERS)) try: glob_pattern = f"{BUCKET_FS_ROOT}/songs/*/meta.json" print(f"[radio] Glob pattern: {glob_pattern}") meta_paths = self.fs.glob(glob_pattern) print(f"[radio] Found {len(meta_paths)} meta.json files") print(f"[radio] First paths: {meta_paths[:5]}") except Exception as e: print(f"[radio] Could not list bucket {BUCKET_ID}: {e}") with self.lock: if prebuild: self.next_rotation_is_building = False self.next_rotation_last_error = str(e) else: self.is_loading = False self.last_error = str(e) if not self.tracks: self.is_playing = False return song_meta_paths = {} for meta_path in meta_paths: try: parts = meta_path.strip("/").split("/") song_id = parts[-2] if song_id: song_meta_paths[song_id] = meta_path except Exception as e: print(f"[radio] Could not parse path {meta_path}: {e}") all_song_ids = sorted(song_meta_paths.keys()) all_custom_track_ids = {str(track.get("id")) for track in custom_tracks_snapshot if track.get("id")} with self.lock: old_track_id = self._get_current_song_id_unlocked() old_current_kind = self.current_kind old_started_at = self.started_at recently_played_ids = self._recently_played_exclusion_set_unlocked() before_filter_count = len(all_song_ids) candidate_song_ids = [ song_id for song_id in all_song_ids if not self._song_is_hidden_by_reports(song_id) ] filtered_hidden_count = before_filter_count - len(candidate_song_ids) if filtered_hidden_count: print( f"[radio] Filtered out {filtered_hidden_count} tracks " f"by reports or studio overrides" ) before_score_floor_count = len(candidate_song_ids) candidate_song_ids = [ song_id for song_id in candidate_song_ids if not self._track_is_below_auto_rotation_score_floor_unlocked(song_id) ] filtered_low_score_count = before_score_floor_count - len(candidate_song_ids) if filtered_low_score_count: threshold = float(runtime_config.get( "auto_rotation_min_effective_score_exclusive", AUTO_ROTATION_MIN_EFFECTIVE_SCORE_EXCLUSIVE, )) print( f"[radio] Filtered out {filtered_low_score_count} low-score tracks " f"effective_score <= {threshold:g}" ) if old_track_id: recently_played_ids.discard(old_track_id) recently_played_filtered_count = 0 if recently_played_ids: filtered_candidate_song_ids = [ song_id for song_id in candidate_song_ids if song_id not in recently_played_ids ] if ( len(filtered_candidate_song_ids) >= recently_played_min_candidates_after_filter or len(filtered_candidate_song_ids) >= len(candidate_song_ids) ): recently_played_filtered_count = len(candidate_song_ids) - len(filtered_candidate_song_ids) candidate_song_ids = filtered_candidate_song_ids if recently_played_filtered_count: print( f"[radio] Filtered out {recently_played_filtered_count} recently played tracks " f"window={recently_played_exclude_minutes}min " f"remaining_candidates={len(candidate_song_ids)}" ) else: print( f"[radio] Skipped recently played exclusion because too few candidates would remain " f"remaining={len(filtered_candidate_song_ids)} " f"min={recently_played_min_candidates_after_filter} " f"window={recently_played_exclude_minutes}min" ) preserve_current_track = ( not prebuild and old_current_kind == "song" and old_track_id and (old_track_id in all_song_ids or old_track_id in all_custom_track_ids) ) preserved_track = None current_remaining_seconds = None if preserve_current_track: if old_track_id in song_meta_paths: preserved_track = self._load_track_from_meta_path( song_meta_paths[old_track_id], old_track_id, ) else: preserved_track = next( (dict(track) for track in custom_tracks_snapshot if track.get("id") == old_track_id), None, ) if preserved_track is None: preserve_current_track = False else: self._apply_track_controls_unlocked(preserved_track) duration = float((preserved_track or {}).get("duration") or DEFAULT_DURATION) if old_started_at is not None: current_elapsed = max(0.0, time.time() - old_started_at) current_remaining_seconds = max(0.0, duration - current_elapsed) else: current_remaining_seconds = duration if self._song_is_hidden_by_reports(old_track_id): print( f"[radio] Preserving currently playing hidden/reported track until it finishes: " f"{old_track_id}" ) remaining_song_ids = [ song_id for song_id in candidate_song_ids if not preserved_track or song_id != preserved_track["id"] ] candidate_tracks = [] if remaining_song_ids: worker_count = max(1, min(meta_load_workers, len(remaining_song_ids))) print(f"[radio] Loading metadata for {len(remaining_song_ids)} candidate tracks with {worker_count} workers") with ThreadPoolExecutor(max_workers=worker_count) as executor: futures = { executor.submit( self._load_track_from_meta_path, song_meta_paths[song_id], song_id, ): song_id for song_id in remaining_song_ids } for future in as_completed(futures): song_id = futures[future] try: candidate_tracks.append(future.result()) except Exception as e: print(f"[radio] Could not load candidate track {song_id}: {e}") now = datetime.now(timezone.utc) recent_candidates = [ track for track in candidate_tracks if self._track_is_recent(track, now=now) ] regular_candidates = [ track for track in candidate_tracks if not self._track_is_recent(track, now=now) ] recent_target_seconds = max(0.0, rotation_target_seconds * max(0.0, min(1.0, recent_track_quota))) selected_tracks = [] selected_recent_seconds = 0.0 selected_recent_count = 0 if preserved_track: selected_tracks.append(preserved_track) if self._track_is_recent(preserved_track, now=now): selected_recent_seconds += self._track_duration_seconds(preserved_track) selected_recent_count += 1 custom_candidates = [ dict(track) for track in custom_tracks_snapshot if track.get("id") and not (preserved_track and track.get("id") == preserved_track.get("id")) and not track.get("studio_hidden") and not self._song_is_hidden_by_reports(track.get("id")) and not self._track_is_below_auto_rotation_score_floor_unlocked(track) and track.get("id") not in recently_played_ids ] custom_target_seconds = max(0.0, rotation_target_seconds * max(0.0, min(1.0, custom_track_quota))) selected_custom_seconds = 0.0 selected_custom_count = 0 while custom_candidates and len(selected_tracks) < max_tracks and selected_custom_seconds < custom_target_seconds: estimated_seconds = self._estimated_rotation_seconds( selected_tracks, average_jingle_duration, current_remaining_seconds=current_remaining_seconds if preserved_track else None, ) if selected_tracks and estimated_seconds >= rotation_target_seconds: break track = self._weighted_pop_track(custom_candidates) selected_tracks.append(track) selected_custom_seconds += self._track_duration_seconds(track) selected_custom_count += 1 while ( recent_candidates and len(selected_tracks) < max_tracks and selected_recent_seconds < recent_target_seconds ): estimated_seconds = self._estimated_rotation_seconds( selected_tracks, average_jingle_duration, current_remaining_seconds=current_remaining_seconds if preserved_track else None, ) if selected_tracks and estimated_seconds >= rotation_target_seconds: break track = self._weighted_pop_track(recent_candidates) selected_tracks.append(track) selected_recent_seconds += self._track_duration_seconds(track) selected_recent_count += 1 remaining_tracks = regular_candidates + recent_candidates while remaining_tracks and len(selected_tracks) < max_tracks: estimated_seconds = self._estimated_rotation_seconds( selected_tracks, average_jingle_duration, current_remaining_seconds=current_remaining_seconds if preserved_track else None, ) if selected_tracks and estimated_seconds >= rotation_target_seconds: break track = self._weighted_pop_track(remaining_tracks) selected_tracks.append(track) if self._track_is_recent(track, now=now): selected_recent_seconds += self._track_duration_seconds(track) selected_recent_count += 1 if preserved_track: preserved_id = preserved_track["id"] non_preserved_tracks = [ track for track in selected_tracks if track["id"] != preserved_id ] random.shuffle(non_preserved_tracks) selected_tracks = [preserved_track] + non_preserved_tracks else: random.shuffle(selected_tracks) estimated_seconds = self._estimated_rotation_seconds( selected_tracks, average_jingle_duration, current_remaining_seconds=current_remaining_seconds if preserved_track else None, ) print( f"[radio] Using {len(selected_tracks)} tracks for rotation pack " f"estimated_seconds={estimated_seconds:.1f}/{rotation_target_seconds} " f"max_tracks={max_tracks} avg_jingle={average_jingle_duration:.1f}s " f"recent={selected_recent_count} tracks/{selected_recent_seconds:.1f}s " f"recent_quota={recent_track_quota:.2f} " f"recently_played_filtered={recently_played_filtered_count} " f"hidden_filtered={filtered_hidden_count} " f"low_score_filtered={filtered_low_score_count} " f"custom={selected_custom_count} tracks/{selected_custom_seconds:.1f}s quota={custom_track_quota:.2f} " f"available_custom={len(custom_tracks_snapshot)} " f"track_overrides={len(self.track_overrides)} " f"cover_overrides={self.cover_overrides_count} " f"jingles={len(self.jingles)} segments={len(self.segments)}" ) new_tracks = selected_tracks if prebuild: with self.lock: self.next_tracks = [dict(track) for track in new_tracks] self.next_rotation_ready = False self.next_rotation_is_building = False self.next_rotation_built_at = time.time() self.next_rotation_target_at = target_at.isoformat() if target_at else next_utc_hour_iso() self.next_rotation_last_error = None self.next_rotation_source = "auto_prebuild" self.next_rotation_payload = None print( f"[radio] Prebuilt next rotation tracks={len(new_tracks)} " f"target_at={self.next_rotation_target_at}" ) return with self.lock: live_current_kind = self.current_kind live_current_jingle = self.current_jingle live_current_segment = self.current_segment live_started_at = self.started_at live_is_playing = self.is_playing live_track_id = self._get_current_song_id_unlocked() live_track_snapshot = None if live_track_id and self.current_index is not None and self.tracks: try: live_track_snapshot = dict(self.tracks[self.current_index]) except Exception: live_track_snapshot = None # The radio may have advanced while this refresh was loading metadata. # Preserve the real current song, not the stale song captured at refresh start. if live_track_id: existing_live_index = next( ( i for i, track in enumerate(new_tracks) if track.get("id") == live_track_id ), None, ) if existing_live_index is None and live_track_snapshot: new_tracks = [ live_track_snapshot, *[ track for track in new_tracks if track.get("id") != live_track_id ], ] print( f"[radio] Reinserted live track after refresh race: {live_track_id}" ) self.tracks = new_tracks if not self.tracks: self.current_index = None self.started_at = None self.is_playing = False self.current_kind = "song" self.current_jingle = None self.current_segment = None elif live_current_kind == "jingle" and live_current_jingle: self.current_index = None self.started_at = live_started_at or time.time() self.is_playing = live_is_playing or True self.current_kind = "jingle" self.current_jingle = live_current_jingle self.current_segment = None print("[radio] Kept current live jingle after refresh") elif live_current_kind == "segment" and live_current_segment: self.current_index = None self.started_at = live_started_at or time.time() self.is_playing = live_is_playing or True self.current_kind = "segment" self.current_jingle = None self.current_segment = live_current_segment print("[radio] Kept current live segment after refresh") elif live_current_kind == "song" and live_track_id: matching_index = next( ( i for i, track in enumerate(self.tracks) if track.get("id") == live_track_id ), None, ) if matching_index is not None: self.current_index = matching_index self.started_at = live_started_at or time.time() self.is_playing = live_is_playing or True self.current_kind = "song" self.current_jingle = None self.current_segment = None print(f"[radio] Kept current live track after refresh: {live_track_id}") else: self.current_index = 0 self.started_at = time.time() self.is_playing = True self.current_kind = "song" self.current_jingle = None self.current_segment = None print("[radio] Live track not found after refresh; switched to first track") else: self.current_index = 0 self.started_at = time.time() self.is_playing = True self.current_kind = "song" self.current_jingle = None self.current_segment = None if self.current_index is not None and self.tracks: self._load_meta_for_track_unlocked(self.tracks[self.current_index]) self.is_loading = False self.last_error = None self.last_refresh_at = time.time() print(f"[radio] Loaded {len(new_tracks)} tracks") def has_track(self, song_id: str) -> bool: with self.lock: return any(track["id"] == song_id for track in self.tracks) def _choose_next_index_unlocked(self): """ The rotation pack is already selected, weighted, and shuffled during refresh_tracks(). Playback should consume that pack sequentially. This avoids repeats inside the hour and makes the 60-minute rotation pack meaningful. RADIO_MODE is kept for API/status compatibility, but randomization now happens when the pack is built, not between songs. """ if not self.tracks: return None if self.current_index is None: return 0 return (self.current_index + 1) % len(self.tracks) def _choose_jingle_unlocked(self): if not self.jingles: return None if len(self.jingles) == 1: return self.jingles[0] choices = [ jingle for jingle in self.jingles if jingle.get("id") != self.last_jingle_id ] return random.choice(choices or self.jingles) def _choose_segment_unlocked(self): if not self.segments: return None if len(self.segments) == 1: return self.segments[0] choices = [ segment for segment in self.segments if segment.get("id") != self.last_segment_id ] return random.choice(choices or self.segments) def _should_insert_jingle_unlocked(self): if not self._cfg("enable_jingles", ENABLE_JINGLES): return False if not self.jingles: return False min_songs_between_jingles = int(self._cfg("min_songs_between_jingles", MIN_SONGS_BETWEEN_JINGLES)) max_songs_between_jingles = int(self._cfg("max_songs_between_jingles", MAX_SONGS_BETWEEN_JINGLES)) jingle_probability_after_min = float(self._cfg("jingle_probability_after_min", JINGLE_PROBABILITY_AFTER_MIN)) if self.songs_since_last_jingle < min_songs_between_jingles: return False if self.songs_since_last_jingle >= max_songs_between_jingles: return True return random.random() < jingle_probability_after_min def _should_insert_segment_unlocked(self): if not self._cfg("enable_segments", ENABLE_SEGMENTS): return False if not self.segments: return False min_songs_between_segments = int(self._cfg("min_songs_between_segments", MIN_SONGS_BETWEEN_SEGMENTS)) max_songs_between_segments = int(self._cfg("max_songs_between_segments", MAX_SONGS_BETWEEN_SEGMENTS)) segment_probability_after_min = float(self._cfg("segment_probability_after_min", SEGMENT_PROBABILITY_AFTER_MIN)) if self.songs_since_last_segment < min_songs_between_segments: return False if self.songs_since_last_segment >= max_songs_between_segments: return True return random.random() < segment_probability_after_min def _start_jingle_unlocked(self): jingle = self._choose_jingle_unlocked() if not jingle: return False self.current_kind = "jingle" self.current_jingle = jingle self.current_segment = None self.last_jingle_id = jingle["id"] self.songs_since_last_jingle = 0 print(f"[jingles] Inserted jingle: {jingle['id']} duration={jingle.get('duration')}") return True def _start_segment_unlocked(self): segment = self._choose_segment_unlocked() if not segment: return False self.current_kind = "segment" self.current_jingle = None self.current_segment = segment self.last_segment_id = segment["id"] self.songs_since_last_segment = 0 print(f"[segments] Inserted segment: {segment['id']} duration={segment.get('duration')}") return True def _start_next_song_unlocked(self): next_index = self._choose_next_index_unlocked() if next_index is None: self.current_index = None self.current_kind = "song" self.current_jingle = None self.current_segment = None self.is_playing = False return self.current_index = next_index self.current_kind = "song" self.current_jingle = None self.current_segment = None self.is_playing = True next_track = self.tracks[self.current_index] # Studio boosts can be edited while this track is already queued. # Reload track_overrides.json exactly when the song starts, so queued # Studio changes are reflected without waiting for the next full refresh. self._reload_track_overrides_unlocked() self._load_meta_for_track_unlocked(next_track) print(f"[radio] Advanced to: {next_track['id']} - {next_track.get('title')}") def _get_current_item_unlocked(self): if self.current_kind == "jingle" and self.current_jingle: return self.current_jingle if self.current_kind == "segment" and self.current_segment: return self.current_segment if self.current_index is None or not self.tracks: return None if self.current_index >= len(self.tracks): self.current_index = 0 track = self.tracks[self.current_index] self._load_meta_for_track_unlocked(track) return track def _get_current_song_id_unlocked(self) -> Optional[str]: if self.current_index is None or not self.tracks: return None if self.current_index >= len(self.tracks): return None try: track = self.tracks[self.current_index] except Exception: return None if not track or track.get("is_jingle") or track.get("is_segment"): return None return track.get("id") def _next_rotation_control_path(self) -> Path: return Path(RADIO_CONTROL_DIR) / RADIO_NEXT_ROTATION_FILE def _next_rotation_archive_dir(self) -> Path: return Path(RADIO_CONTROL_DIR) / RADIO_NEXT_ROTATION_ARCHIVE_DIR def _target_hour_key(self, value: Optional[datetime]) -> Optional[datetime]: if value is None: return None return value.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0) def _approved_next_rotation_for_target(self, target_at: datetime): path = self._next_rotation_control_path() target_key = self._target_hour_key(target_at) if not path.exists(): return None, [] try: payload = json.loads(path.read_text(encoding="utf-8")) if not isinstance(payload, dict): raise ValueError("next_rotation.json must contain a JSON object") status = str(payload.get("status") or "").strip().lower() if status != "approved": raise ValueError(f"next_rotation.json status must be approved, got {status or 'empty'}") raw_target = ( payload.get("target_hour_utc") or payload.get("target_at") or payload.get("target_rotation_at") or payload.get("target") ) payload_target = parse_datetime(raw_target) payload_key = self._target_hour_key(payload_target) if payload_key is None or target_key is None or payload_key != target_key: raise ValueError( f"next_rotation.json target mismatch: payload={raw_target} expected={target_at.isoformat()}" ) track_ids = payload.get("track_ids") if not isinstance(track_ids, list): tracks = payload.get("tracks", []) if isinstance(tracks, list): track_ids = [ str(item.get("id") or item.get("song_id") or "").strip() for item in tracks if isinstance(item, dict) ] else: track_ids = [] clean_track_ids = [] seen = set() for track_id in track_ids: track_id = str(track_id or "").strip() if not track_id or track_id in seen: continue seen.add(track_id) clean_track_ids.append(track_id) if not clean_track_ids: raise ValueError("next_rotation.json does not contain any usable track_ids") self.approved_next_rotation_last_loaded_at = time.time() self.approved_next_rotation_last_error = None return payload, clean_track_ids except Exception as e: self.approved_next_rotation_last_loaded_at = time.time() self.approved_next_rotation_last_error = str(e) print(f"[next-rotation] Could not use approved rotation {path}: {e}") return None, [] def _archive_consumed_next_rotation_control_unlocked(self, activated_at: Optional[datetime] = None): path = self._next_rotation_control_path() if not path.exists(): return try: archive_dir = self._next_rotation_archive_dir() archive_dir.mkdir(parents=True, exist_ok=True) stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") archive_path = archive_dir / f"next_rotation_consumed_{stamp}.json" consumed_at = (activated_at or datetime.now(timezone.utc)).replace(microsecond=0).isoformat() payload = json.loads(path.read_text(encoding="utf-8")) if not isinstance(payload, dict): payload = {} payload["status"] = "consumed" payload["consumed_at"] = consumed_at payload["activated_at"] = consumed_at payload["activated_source"] = "studio_approved" payload["current_rotation_track_ids"] = list(self.current_rotation_track_ids or []) archive_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8") path.unlink(missing_ok=True) self.approved_next_rotation_last_consumed_at = time.time() print(f"[next-rotation] Archived consumed approved rotation to {archive_path}") except Exception as e: self.approved_next_rotation_last_error = str(e) print(f"[next-rotation] Could not archive consumed next rotation: {e}") def prebuild_approved_next_rotation(self, target_at: datetime) -> bool: with self.lock: self.next_rotation_is_building = True self.next_rotation_last_error = None self.next_rotation_target_at = target_at.isoformat() payload, track_ids = self._approved_next_rotation_for_target(target_at) if not payload or not track_ids: with self.lock: self.next_rotation_is_building = False return False try: glob_pattern = f"{BUCKET_FS_ROOT}/songs/*/meta.json" meta_paths = self.fs.glob(glob_pattern) song_meta_paths = {} for meta_path in meta_paths: try: parts = meta_path.strip("/").split("/") song_id = parts[-2] if song_id: song_meta_paths[song_id] = meta_path except Exception as e: print(f"[next-rotation] Could not parse path {meta_path}: {e}") with self.lock: custom_tracks_snapshot = [dict(track) for track in self.custom_tracks] custom_tracks_by_id = { str(track.get("id")): dict(track) for track in custom_tracks_snapshot if track.get("id") } selected_tracks = [] missing_ids = [] hidden_ids = [] for track_id in track_ids: if self._song_is_hidden_by_reports(track_id): hidden_ids.append(track_id) continue if track_id in song_meta_paths: track = self._load_track_from_meta_path(song_meta_paths[track_id], track_id) elif track_id in custom_tracks_by_id: track = dict(custom_tracks_by_id[track_id]) self._apply_track_controls_unlocked(track) else: missing_ids.append(track_id) continue selected_tracks.append(track) if not selected_tracks: raise ValueError( "approved next rotation resolved to zero playable tracks " f"(missing={len(missing_ids)} hidden={len(hidden_ids)})" ) with self.lock: self.next_tracks = selected_tracks self.next_rotation_ready = False self.next_rotation_is_building = False self.next_rotation_built_at = time.time() self.next_rotation_target_at = target_at.isoformat() self.next_rotation_last_error = None self.next_rotation_source = "studio_approved" self.next_rotation_payload = payload print( f"[next-rotation] Prebuilt approved rotation tracks={len(selected_tracks)} " f"target_at={target_at.isoformat()} missing={len(missing_ids)} hidden={len(hidden_ids)}" ) return True except Exception as e: with self.lock: self.next_rotation_is_building = False self.next_rotation_last_error = str(e) self.approved_next_rotation_last_error = str(e) print(f"[next-rotation] Approved prebuild failed: {e}") return False def mark_next_rotation_ready(self, target_at: Optional[datetime] = None): with self.lock: if self.next_tracks: self.next_rotation_ready = True self.next_rotation_target_at = target_at.isoformat() if target_at else next_utc_hour_iso() print( f"[radio] Next rotation marked ready " f"tracks={len(self.next_tracks)} target_at={self.next_rotation_target_at}" ) return True self.next_rotation_ready = False print("[radio] No prebuilt next rotation available to mark ready") return False def _activate_next_rotation_unlocked(self) -> bool: if not self.next_rotation_ready or not self.next_tracks: return False activated_source = self.next_rotation_source activated_payload = dict(self.next_rotation_payload) if isinstance(self.next_rotation_payload, dict) else None activated_target_at = self.next_rotation_target_at activated_at_dt = datetime.now(timezone.utc).replace(microsecond=0) self.tracks = [dict(track) for track in self.next_tracks] self.current_rotation_source = activated_source or "prebuilt" self.current_rotation_target_at = activated_target_at self.current_rotation_activated_at = activated_at_dt.isoformat() self.current_rotation_track_ids = [str(track.get("id") or "").strip() for track in self.tracks if str(track.get("id") or "").strip()] self.next_tracks = [] self.next_rotation_ready = False self.next_rotation_is_building = False self.next_rotation_source = None self.next_rotation_payload = None self.current_kind = "song" self.current_jingle = None self.current_segment = None self.current_index = 0 if self.tracks else None self.is_playing = bool(self.tracks) self.started_at = time.time() if self.tracks else None self.songs_since_last_jingle = 0 self.songs_since_last_segment = 0 self.last_rotation_activated_at = time.time() if self.current_index is not None: self._reload_track_overrides_unlocked() self._load_meta_for_track_unlocked(self.tracks[self.current_index]) print( f"[radio] Activated prebuilt rotation " f"tracks={len(self.tracks)} first={self.tracks[self.current_index].get('id')} " f"source={activated_source or 'unknown'}" ) else: print("[radio] Activated empty prebuilt rotation") if activated_source == "studio_approved": self._archive_consumed_next_rotation_control_unlocked(activated_at_dt) return True def _advance_one_item_unlocked(self): if self.current_kind in {"jingle", "segment"}: if self._activate_next_rotation_unlocked(): return self._start_next_song_unlocked() return current_item = self._get_current_item_unlocked() if current_item and not current_item.get("is_jingle") and not current_item.get("is_segment"): self._remember_played_song_unlocked(current_item.get("id")) if self._activate_next_rotation_unlocked(): return self.songs_since_last_jingle += 1 self.songs_since_last_segment += 1 if self._should_insert_segment_unlocked(): inserted = self._start_segment_unlocked() if inserted: return if self._should_insert_jingle_unlocked(): inserted = self._start_jingle_unlocked() if inserted: return self._start_next_song_unlocked() def _public_track_for_status_unlocked(self, item: Optional[dict]): if not item: return None public_item = dict(item) # Jingles and segments are programming/editorial items, not normal # public songs. Keep their public counters hidden / neutral. if public_item.get("is_jingle"): public_item["id"] = "" public_item["audio_url"] = "" public_item["like_count"] = 0 public_item["upvote_count"] = 0 public_item["downvote_count"] = 0 public_item["report_count"] = 0 public_item["thumb_url"] = "" public_item["source_url"] = "" return public_item if public_item.get("is_segment"): public_item["id"] = "" public_item["audio_url"] = "" public_item["like_count"] = 0 public_item["upvote_count"] = 0 public_item["downvote_count"] = 0 public_item["report_count"] = 0 public_item["thumb_url"] = "" return public_item # Normal songs: expose effective counters in the public status. # effective_* = public votes + Studio boosts / penalties. # # This means that if a track is currently queued and receives a Studio # boost, the boosted value will be shown when that track becomes # Now Playing, without mutating the underlying public vote store. public_item["like_count"] = safe_int( public_item.get("effective_like_count", public_item.get("like_count", 0)), 0, ) public_item["upvote_count"] = safe_int( public_item.get("effective_upvote_count", public_item.get("upvote_count", 0)), 0, ) public_item["downvote_count"] = safe_int( public_item.get("effective_downvote_count", public_item.get("downvote_count", 0)), 0, ) # Reports are not boosted. Keep the real report count. public_item["report_count"] = safe_int( public_item.get("report_count", 0), 0, ) return public_item def get_current_stream_item(self): with self.lock: item = self._get_current_item_unlocked() if not item: return None, 0 elapsed = 0 if self.started_at is not None: elapsed = max(0, time.time() - self.started_at) duration = item.get("duration") or DEFAULT_DURATION try: duration = float(duration) except Exception: duration = DEFAULT_DURATION if duration > 2: elapsed = min(elapsed, duration - 1) return dict(item), elapsed def advance_if_current(self, item_id: str) -> bool: with self.lock: item = self._get_current_item_unlocked() if not item: return False if item.get("id") != item_id: return False self.started_at = time.time() self._advance_one_item_unlocked() return True def set_mode(self, mode: str): if mode not in {"random", "loop"}: mode = "random" with self.lock: self.mode = mode def _status_unlocked(self): item = self._get_current_item_unlocked() elapsed = 0 if item and self.started_at is not None: elapsed = max(0, time.time() - self.started_at) try: duration = float(item.get("duration") or DEFAULT_DURATION) except Exception: duration = DEFAULT_DURATION if duration > 0: elapsed = min(elapsed, duration) listener_snapshot = listeners.snapshot() if item and item.get("is_jingle"): now_playing_type = "jingle" elif item and item.get("is_segment"): now_playing_type = "segment" else: now_playing_type = "song" runtime_config = dict(self.runtime_config) return { "bucket_id": BUCKET_ID, "bucket_fs_root": BUCKET_FS_ROOT, "bucket_resolve_url": BUCKET_RESOLVE_URL, "mode": self.mode, "track_count": len(self.tracks), "jingle_count": len(self.jingles), "segment_count": len(self.segments), "custom_track_count": len(self.custom_tracks), "now_playing_type": now_playing_type, "current_index": self.current_index, "elapsed": elapsed, "is_playing": self.is_playing, "is_loading": self.is_loading, "last_error": self.last_error, "last_refresh_at": self.last_refresh_at, "next_refresh_at": next_utc_hour_iso(), "next_rotation_prebuild_at": next_rotation_prebuild_at_iso(), "rotation_prebuild_lead_seconds": ROTATION_PREBUILD_LEAD_SECONDS, "refresh_mode": REFRESH_MODE, "next_rotation_ready": self.next_rotation_ready, "next_rotation_is_building": self.next_rotation_is_building, "next_rotation_count": len(self.next_tracks), "next_rotation_source": self.next_rotation_source, "next_rotation_control_file": str(Path(RADIO_CONTROL_DIR) / RADIO_NEXT_ROTATION_FILE), "approved_next_rotation_last_loaded_at": self.approved_next_rotation_last_loaded_at, "approved_next_rotation_last_error": self.approved_next_rotation_last_error, "approved_next_rotation_last_consumed_at": self.approved_next_rotation_last_consumed_at, "next_rotation_built_at": self.next_rotation_built_at, "next_rotation_target_at": self.next_rotation_target_at, "next_rotation_last_error": self.next_rotation_last_error, "last_rotation_activated_at": self.last_rotation_activated_at, "current_rotation_source": self.current_rotation_source, "current_rotation_target_at": self.current_rotation_target_at, "current_rotation_activated_at": self.current_rotation_activated_at, "current_rotation_track_ids": list(self.current_rotation_track_ids or []), "runtime_config_source": self.runtime_config_source, "runtime_config_last_loaded_at": self.runtime_config_last_loaded_at, "runtime_config_last_error": self.runtime_config_last_error, "runtime_config": runtime_config, "radio_control_dir": RADIO_CONTROL_DIR, "track_overrides_count": len(self.track_overrides), "track_overrides_last_loaded_at": self.track_overrides_last_loaded_at, "track_overrides_last_error": self.track_overrides_last_error, "cover_overrides_dir": COVER_OVERRIDES_DIR, "cover_overrides_count": self.cover_overrides_count, "rotation_target_seconds": runtime_config.get("rotation_target_seconds"), "max_tracks": runtime_config.get("max_tracks"), "report_hide_threshold": runtime_config.get("report_hide_threshold"), "like_weight_per_like": runtime_config.get("like_weight_per_like"), "like_weight_max": runtime_config.get("like_weight_max"), "upvote_weight_per_upvote": runtime_config.get("upvote_weight_per_upvote"), "upvote_weight_max": runtime_config.get("upvote_weight_max"), "downvote_weight_per_downvote": runtime_config.get("downvote_weight_per_downvote"), "downvote_weight_max": runtime_config.get("downvote_weight_max"), "track_weight_min": runtime_config.get("track_weight_min"), "track_weight_max": runtime_config.get("track_weight_max"), "auto_rotation_min_effective_score_exclusive": runtime_config.get("auto_rotation_min_effective_score_exclusive"), "admin_report_usernames": sorted(ADMIN_REPORT_USERNAMES), "admin_report_weight": ADMIN_REPORT_WEIGHT, "recent_track_quota": runtime_config.get("recent_track_quota"), "recent_track_window_hours": runtime_config.get("recent_track_window_hours"), "recently_played_exclude_minutes": runtime_config.get("recently_played_exclude_minutes"), "recently_played_memory_max": runtime_config.get("recently_played_memory_max"), "recently_played_min_candidates_after_filter": runtime_config.get("recently_played_min_candidates_after_filter"), "recently_played_count": len(self._recently_played_exclusion_set_unlocked()), "meta_load_workers": runtime_config.get("meta_load_workers"), "playback_order": "sequential_rotation_pack", "enable_remote_jingles": runtime_config.get("enable_remote_jingles"), "remote_jingles_dir": REMOTE_JINGLES_DIR, "enable_segments": runtime_config.get("enable_segments"), "remote_segments_dir": REMOTE_SEGMENTS_DIR, "min_songs_between_jingles": runtime_config.get("min_songs_between_jingles"), "max_songs_between_jingles": runtime_config.get("max_songs_between_jingles"), "jingle_probability_after_min": runtime_config.get("jingle_probability_after_min"), "songs_since_last_jingle": self.songs_since_last_jingle, "min_songs_between_segments": runtime_config.get("min_songs_between_segments"), "max_songs_between_segments": runtime_config.get("max_songs_between_segments"), "segment_probability_after_min": runtime_config.get("segment_probability_after_min"), "songs_since_last_segment": self.songs_since_last_segment, "enable_custom_tracks": runtime_config.get("enable_custom_tracks"), "custom_tracks_dir": CUSTOM_TRACKS_DIR, "custom_track_quota": runtime_config.get("custom_track_quota"), "custom_tracks_last_loaded_at": self.custom_tracks_last_loaded_at, "custom_tracks_last_error": self.custom_tracks_last_error, "enable_song_mastering": runtime_config.get("enable_song_mastering"), "song_audio_filter": runtime_config.get("song_audio_filter") if runtime_config.get("enable_song_mastering") else "", "likes_enabled": likes.last_error is None, "likes_error": likes.last_error, "active_listeners": listener_snapshot["active_listeners"], "total_connections": listener_snapshot["total_connections"], "listener_ttl_seconds": listener_snapshot["listener_ttl_seconds"], "listener_profiles": listener_snapshot["listener_profiles"], "signed_in_listener_count": listener_snapshot["signed_in_listener_count"], "visible_listener_profile_count": listener_snapshot["visible_listener_profile_count"], "track": self._public_track_for_status_unlocked(item), } def status(self): with self.lock: return self._status_unlocked() def rotation_snapshot(self): with self.lock: item = self._get_current_item_unlocked() if item and item.get("is_jingle"): now_playing_type = "jingle" elif item and item.get("is_segment"): now_playing_type = "segment" else: now_playing_type = "song" tracks = [] for i, track in enumerate(self.tracks): public_track = dict(track) public_track["index"] = i public_track["is_current"] = now_playing_type == "song" and i == self.current_index tracks.append(public_track) return { "now_playing_type": now_playing_type, "current_index": self.current_index, "current_item": self._public_track_for_status_unlocked(item), "tracks_count": len(self.tracks), "jingles_count": len(self.jingles), "segments_count": len(self.segments), "custom_tracks_count": len(self.custom_tracks), "tracks": tracks, "jingles": [dict(jingle) for jingle in self.jingles], "segments": [dict(segment) for segment in self.segments], "custom_tracks": [dict(track) for track in self.custom_tracks], "runtime_config_source": self.runtime_config_source, "runtime_config": dict(self.runtime_config), "track_overrides_count": len(self.track_overrides), "cover_overrides_count": self.cover_overrides_count, "last_refresh_at": self.last_refresh_at, "next_refresh_at": next_utc_hour_iso(), "next_rotation_prebuild_at": next_rotation_prebuild_at_iso(), "next_rotation_ready": self.next_rotation_ready, "next_rotation_is_building": self.next_rotation_is_building, "next_rotation_count": len(self.next_tracks), "next_rotation_source": self.next_rotation_source, "next_rotation_built_at": self.next_rotation_built_at, "next_rotation_target_at": self.next_rotation_target_at, "last_rotation_activated_at": self.last_rotation_activated_at, "current_rotation_source": self.current_rotation_source, "current_rotation_target_at": self.current_rotation_target_at, "current_rotation_activated_at": self.current_rotation_activated_at, "current_rotation_track_ids": list(self.current_rotation_track_ids or []), } radio = RadioState() class BroadcastEngine: """ Single global broadcast pipeline. One ffmpeg process produces the radio stream. Each /stream.mp3 client subscribes to the shared chunks instead of spawning its own ffmpeg process. """ def __init__(self, radio_state: RadioState): self.radio = radio_state self.lock = threading.Lock() self.subscribers = {} self.total_stream_connections = 0 self.thread: Optional[threading.Thread] = None self.running = False self.current_item_id = None self.current_item_type = None self.current_item_started_at = None self.current_process: Optional[subprocess.Popen] = None self.current_process_pid = None self.last_chunk_at = None self.last_watchdog_restart_at = None self.last_watchdog_reason = None self.last_error = None def start(self): with self.lock: if self.running and self.thread is not None and self.thread.is_alive(): return self.running = True self.thread = threading.Thread( target=self._run_loop, daemon=True, ) self.thread.start() print("[broadcast] Broadcast engine started") def ensure_running(self): with self.lock: thread_alive = self.thread is not None and self.thread.is_alive() if not thread_alive: print("[broadcast-watchdog] broadcast thread is not alive; restarting") self.start() def snapshot(self): now = time.time() with self.lock: last_chunk_age = None if self.last_chunk_at is not None: last_chunk_age = max(0.0, now - self.last_chunk_at) current_item_age = None if self.current_item_started_at is not None: current_item_age = max(0.0, now - self.current_item_started_at) return { "broadcast_running": self.running, "broadcast_thread_alive": self.thread is not None and self.thread.is_alive(), "stream_subscriber_count": len(self.subscribers), "total_stream_connections": self.total_stream_connections, "broadcast_current_item_id": self.current_item_id, "broadcast_current_item_type": self.current_item_type, "broadcast_current_item_started_at": self.current_item_started_at, "broadcast_current_item_age_seconds": current_item_age, "broadcast_current_process_pid": self.current_process_pid, "broadcast_last_chunk_at": self.last_chunk_at, "broadcast_last_chunk_age_seconds": last_chunk_age, "broadcast_last_watchdog_restart_at": self.last_watchdog_restart_at, "broadcast_last_watchdog_reason": self.last_watchdog_reason, "broadcast_last_error": self.last_error, } def subscribe(self): subscriber_id = str(uuid4()) subscriber_queue = queue.Queue(maxsize=BROADCAST_QUEUE_MAX_CHUNKS) with self.lock: self.subscribers[subscriber_id] = subscriber_queue self.total_stream_connections += 1 count = len(self.subscribers) print(f"[broadcast] subscriber joined id={subscriber_id} active={count}") return subscriber_id, subscriber_queue def unsubscribe(self, subscriber_id: str): with self.lock: self.subscribers.pop(subscriber_id, None) count = len(self.subscribers) print(f"[broadcast] subscriber left id={subscriber_id} active={count}") def _remove_slow_subscriber_unlocked(self, subscriber_id: str, subscriber_queue): self.subscribers.pop(subscriber_id, None) try: while True: subscriber_queue.get_nowait() except queue.Empty: pass try: subscriber_queue.put_nowait(None) except Exception: pass print(f"[broadcast] removed slow subscriber id={subscriber_id}") def _publish(self, chunk: bytes): if not chunk: return with self.lock: self.last_chunk_at = time.time() for subscriber_id, subscriber_queue in list(self.subscribers.items()): try: subscriber_queue.put_nowait(chunk) except queue.Full: self._remove_slow_subscriber_unlocked(subscriber_id, subscriber_queue) def kill_current_process(self, reason: str) -> bool: process = None pid = None with self.lock: process = self.current_process pid = self.current_process_pid if process is None: return False if process.poll() is not None: return False now = time.time() self.last_watchdog_restart_at = now self.last_watchdog_reason = reason self.last_error = reason print(f"[broadcast-watchdog] killing stalled ffmpeg pid={pid} reason={reason}") try: process.kill() return True except Exception as e: with self.lock: self.last_error = f"Could not kill stalled ffmpeg process: {e}" print(f"[broadcast-watchdog] could not kill ffmpeg pid={pid}: {e}") return False def check_health(self): self.ensure_running() now = time.time() reason = None with self.lock: process = self.current_process if process is None or process.poll() is not None: return if self.current_item_id is None: return item_started_at = self.current_item_started_at last_chunk_at = self.last_chunk_at current_item_id = self.current_item_id process_pid = self.current_process_pid if last_chunk_at is None: if item_started_at is not None and now - item_started_at > BROADCAST_PROCESS_START_TIMEOUT_SECONDS: reason = ( f"no audio chunks after {now - item_started_at:.1f}s " f"for item={current_item_id} pid={process_pid}" ) else: silence_seconds = now - last_chunk_at if silence_seconds > BROADCAST_STALL_TIMEOUT_SECONDS: reason = ( f"no audio chunks for {silence_seconds:.1f}s " f"on item={current_item_id} pid={process_pid}" ) if reason: self.kill_current_process(reason) def _item_type(self, item: dict) -> str: if item.get("is_jingle"): return "jingle" if item.get("is_segment"): return "segment" return "song" def _build_ffmpeg_command(self, item: dict, offset: float): stream_url = item.get("stream_url") or item.get("audio_url") item_type = self._item_type(item) cmd = [ "ffmpeg", "-hide_banner", "-loglevel", "error", "-nostdin", "-re", ] if offset > STREAM_SEEK_EPSILON_SECONDS: cmd.extend( [ "-ss", str(max(0, offset)), ] ) cmd.extend( [ "-i", stream_url, "-vn", ] ) audio_filters = [] if item_type == "jingle": audio_filters.append(f"volume={JINGLE_VOLUME}") elif item_type == "segment": audio_filters.append(f"volume={SEGMENT_VOLUME}") elif item_type == "song" and self.radio._cfg("enable_song_mastering", ENABLE_SONG_MASTERING): song_audio_filter = str(self.radio._cfg("song_audio_filter", SONG_AUDIO_FILTER) or "").strip() if song_audio_filter: audio_filters.append(song_audio_filter) if audio_filters: cmd.extend(["-af", ",".join(audio_filters)]) cmd.extend( [ "-ac", "2", "-ar", STREAM_SAMPLE_RATE, "-b:a", STREAM_BITRATE, "-map_metadata", "-1", "-write_xing", "0", "-id3v2_version", "0", "-f", "mp3", "pipe:1", ] ) return cmd def _run_loop(self): force_next_offset_zero = False while True: item, offset = self.radio.get_current_stream_item() if not item: with self.lock: self.current_item_id = None self.current_item_type = None time.sleep(BROADCAST_NO_TRACK_SLEEP_SECONDS) continue if force_next_offset_zero: offset = 0 force_next_offset_zero = False item_id = item["id"] item_type = self._item_type(item) stream_url = item.get("stream_url") or item.get("audio_url") with self.lock: now = time.time() self.current_item_id = item_id self.current_item_type = item_type self.current_item_started_at = now self.current_process = None self.current_process_pid = None self.last_chunk_at = None self.last_error = None print(f"[broadcast] starting {item_type}={item_id} offset={offset:.2f}s source={stream_url}") cmd = self._build_ffmpeg_command(item, offset) process = None try: process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, ) assert process.stdout is not None with self.lock: self.current_process = process self.current_process_pid = process.pid while True: chunk = process.stdout.read(16 * 1024) if not chunk: break self._publish(chunk) return_code = process.poll() if return_code not in (0, None): stderr_text = "" try: stderr_text = process.stderr.read().decode("utf-8", errors="replace") if process.stderr else "" except Exception: stderr_text = "" print( f"[broadcast] ffmpeg exited with code={return_code} " f"item={item_id} stderr={stderr_text[:500]}" ) with self.lock: existing_error = self.last_error self.last_error = existing_error or f"ffmpeg exited with code {return_code}" except Exception as e: print(f"[broadcast] error while streaming item={item_id}: {e}") with self.lock: self.last_error = str(e) time.sleep(1) finally: if process is not None: try: if process.poll() is None: process.kill() except Exception: pass try: process.wait(timeout=2) except Exception: pass with self.lock: if self.current_process is process: self.current_process = None self.current_process_pid = None advanced = self.radio.advance_if_current(item_id) if advanced: force_next_offset_zero = True else: # If the radio state changed while this item was playing # because of a refresh or manual update, restart from the # current logical item without forcing a stale seek. force_next_offset_zero = True broadcast = BroadcastEngine(radio) def broadcast_watchdog_loop(): while True: try: broadcast.check_health() except Exception as e: print(f"[broadcast-watchdog] health check failed: {e}") time.sleep(BROADCAST_WATCHDOG_INTERVAL_SECONDS) def broadcast_stream_mp3(): subscriber_id, subscriber_queue = broadcast.subscribe() try: while True: try: chunk = subscriber_queue.get(timeout=BROADCAST_SUBSCRIBER_TIMEOUT_SECONDS) except queue.Empty: yield b"" continue if chunk is None: break yield chunk except GeneratorExit: raise finally: broadcast.unsubscribe(subscriber_id) def get_full_status(): status = radio.status() status.update(broadcast.snapshot()) return status def refresh_radio_content(label: str): try: print(f"[radio] Starting {label} playlist refresh") radio.refresh_runtime_controls() radio.refresh_jingles() radio.refresh_segments() radio.refresh_custom_tracks() radio.refresh_tracks() print(f"[radio] {label.capitalize()} playlist refresh complete") except Exception as e: print(f"[radio] {label.capitalize()} refresh failed: {e}") with radio.lock: radio.is_loading = False radio.last_error = str(e) def prebuild_radio_content(label: str, target_at: datetime): try: print( f"[radio] Starting {label} next rotation prebuild " f"for {target_at.replace(microsecond=0).isoformat()} UTC" ) radio.refresh_runtime_controls() radio.refresh_jingles() radio.refresh_segments() radio.refresh_custom_tracks() if radio.prebuild_approved_next_rotation(target_at): print(f"[radio] {label.capitalize()} approved next rotation prebuild complete") else: print("[radio] No approved next rotation available; falling back to automatic prebuild") radio.refresh_tracks(prebuild=True, target_at=target_at) print(f"[radio] {label.capitalize()} automatic next rotation prebuild complete") except Exception as e: print(f"[radio] {label.capitalize()} prebuild failed: {e}") with radio.lock: radio.next_rotation_is_building = False radio.next_rotation_last_error = str(e) def periodic_refresh_loop(): while True: target_at = next_utc_hour() prebuild_at = target_at - timedelta(seconds=max(0, ROTATION_PREBUILD_LEAD_SECONDS)) sleep_until_prebuild = max(0.0, (prebuild_at - datetime.now(timezone.utc)).total_seconds()) print( "[radio] Next rotation prebuild scheduled at " f"{prebuild_at.replace(microsecond=0).isoformat()} UTC " f"for activation target {target_at.replace(microsecond=0).isoformat()} UTC" ) time.sleep(sleep_until_prebuild) prebuild_radio_content("scheduled", target_at) sleep_until_activation = max(0.0, (target_at - datetime.now(timezone.utc)).total_seconds()) if sleep_until_activation > 0: time.sleep(sleep_until_activation) # Final handoff check: an approved Studio rotation may have been written # just after the scheduled prebuild pass or close to the approval # deadline. Re-check the control file at activation time before marking # the already-prebuilt queue as ready. Target matching still happens in # prebuild_approved_next_rotation(), so stale/future approvals are never # consumed for the wrong hour. if radio.prebuild_approved_next_rotation(target_at): print("[radio] Activation-time approved next rotation handoff complete") if not radio.mark_next_rotation_ready(target_at): print("[radio] Falling back to active hourly refresh because no prebuilt rotation is ready") refresh_radio_content("hourly fallback") @app.on_event("startup") def startup(): broadcast.start() threading.Thread( target=lambda: refresh_radio_content("initial"), daemon=True, ).start() threading.Thread( target=periodic_refresh_loop, daemon=True, ).start() threading.Thread( target=broadcast_watchdog_loop, daemon=True, ).start() @app.get("/api/me") def api_me(request: Request): user = oauth_user_from_request(request) if user is None: return JSONResponse( { "authenticated": False, "user": None, } ) return JSONResponse( { "authenticated": True, "user": user, } ) @app.get("/stream.mp3") def stream_mp3(): headers = { "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0", "Connection": "keep-alive", "X-Content-Type-Options": "nosniff", } return StreamingResponse( broadcast_stream_mp3(), media_type="audio/mpeg", headers=headers, ) @app.get("/cover-overrides/{song_id}/thumb.png") def cover_override_thumb(song_id: str): thumb_path = Path(COVER_OVERRIDES_DIR) / song_id / "thumb.png" if not thumb_path.exists() or not thumb_path.is_file(): raise HTTPException(status_code=404, detail="Cover override not found") return FileResponse( thumb_path, media_type="image/png", headers={ "Cache-Control": "no-cache, no-store, must-revalidate", }, ) @app.get("/custom-tracks/{track_id}/thumb.png") def custom_track_thumb(track_id: str): thumb_path = Path(CUSTOM_TRACKS_DIR) / track_id / "thumb.png" if not thumb_path.exists() or not thumb_path.is_file(): raise HTTPException(status_code=404, detail="Custom track thumbnail not found") return FileResponse( thumb_path, media_type="image/png", headers={ "Cache-Control": "no-cache, no-store, must-revalidate", }, ) @app.get("/api/status") def api_status(): return JSONResponse(get_full_status()) @app.get("/api/rotation") def api_rotation(): return JSONResponse(radio.rotation_snapshot()) @app.post("/api/listener/start/{listener_id}") def api_listener_start(listener_id: str, request: Request): user = oauth_user_from_request(request) listeners.start(listener_id, user=user) return JSONResponse( { "ok": True, **listeners.snapshot(), } ) @app.post("/api/listener/heartbeat/{listener_id}") def api_listener_heartbeat(listener_id: str, request: Request): user = oauth_user_from_request(request) listeners.heartbeat(listener_id, user=user) return JSONResponse( { "ok": True, **listeners.snapshot(), } ) @app.post("/api/listener/stop/{listener_id}") def api_listener_stop(listener_id: str): listeners.stop(listener_id) return JSONResponse( { "ok": True, **listeners.snapshot(), } ) def require_signed_in_user(request: Request, action: str): user = oauth_user_from_request(request) if user is None: raise HTTPException(status_code=401, detail=f"Sign in with Hugging Face to {action}") return user def ensure_track_in_playlist(song_id: str): if not radio.has_track(song_id): raise HTTPException(status_code=404, detail="Track not found in current radio playlist") def feedback_response(song_id: str, result: dict, **extra): payload = { "ok": True, "song_id": song_id, "like_count": result.get("count", 0), "upvote_count": result.get("upvote_count", 0), "downvote_count": result.get("downvote_count", 0), "report_count": result.get("report_count", 0), "updated_at": result.get("updated_at"), "status": get_full_status(), } payload.update(extra) return JSONResponse(payload) @app.post("/api/like/{song_id}") def api_like(song_id: str, request: Request): require_signed_in_user(request, "like tracks") ensure_track_in_playlist(song_id) try: result = likes.increment(song_id) except Exception as e: raise HTTPException(status_code=500, detail=f"Could not save like: {e}") return feedback_response(song_id, result) @app.post("/api/upvote/{song_id}") def api_upvote(song_id: str, request: Request): require_signed_in_user(request, "vote on tracks") ensure_track_in_playlist(song_id) try: result = likes.upvote(song_id) except Exception as e: raise HTTPException(status_code=500, detail=f"Could not save upvote: {e}") return feedback_response(song_id, result) @app.post("/api/unupvote/{song_id}") def api_unupvote(song_id: str, request: Request): require_signed_in_user(request, "undo votes on tracks") ensure_track_in_playlist(song_id) try: result = likes.unupvote(song_id) except Exception as e: raise HTTPException(status_code=500, detail=f"Could not undo upvote: {e}") return feedback_response(song_id, result) @app.post("/api/downvote/{song_id}") def api_downvote(song_id: str, request: Request): require_signed_in_user(request, "vote on tracks") ensure_track_in_playlist(song_id) try: result = likes.downvote(song_id) except Exception as e: raise HTTPException(status_code=500, detail=f"Could not save downvote: {e}") return feedback_response(song_id, result) @app.post("/api/undownvote/{song_id}") def api_undownvote(song_id: str, request: Request): require_signed_in_user(request, "undo votes on tracks") ensure_track_in_playlist(song_id) try: result = likes.undownvote(song_id) except Exception as e: raise HTTPException(status_code=500, detail=f"Could not undo downvote: {e}") return feedback_response(song_id, result) @app.post("/api/report/{song_id}") def api_report(song_id: str, request: Request): user = require_signed_in_user(request, "report tracks") if not user.get("is_pro"): raise HTTPException(status_code=403, detail="Only Hugging Face PRO users can report tracks") ensure_track_in_playlist(song_id) weight = report_weight_for_user(user) try: result = likes.report(song_id, weight=weight) except Exception as e: raise HTTPException(status_code=500, detail=f"Could not save report: {e}") return feedback_response( song_id, result, reported_at=result.get("reported_at"), report_weight=weight, ) @app.post("/api/unreport/{song_id}") def api_unreport(song_id: str, request: Request): user = require_signed_in_user(request, "undo reports") if not user.get("is_pro"): raise HTTPException(status_code=403, detail="Only Hugging Face PRO users can undo reports") ensure_track_in_playlist(song_id) weight = report_weight_for_user(user) try: result = likes.unreport(song_id, weight=weight) except Exception as e: raise HTTPException(status_code=500, detail=f"Could not undo report: {e}") return feedback_response( song_id, result, unreported_at=result.get("unreported_at"), report_weight=weight, ) @app.post("/api/refresh") def api_refresh(): def refresh_in_background(): try: radio.refresh_runtime_controls() radio.refresh_jingles() radio.refresh_segments() radio.refresh_custom_tracks() radio.refresh_tracks() except Exception as e: print(f"[radio] Background refresh failed: {e}") with radio.lock: radio.is_loading = False radio.last_error = str(e) threading.Thread(target=refresh_in_background, daemon=True).start() return JSONResponse( { "ok": True, "status": get_full_status(), } ) def next_rotation_response_payload(): with radio.lock: return { "ok": True, "ready": radio.next_rotation_ready, "is_building": radio.next_rotation_is_building, "count": len(radio.next_tracks), "built_at": radio.next_rotation_built_at, "target_at": radio.next_rotation_target_at, "source": radio.next_rotation_source, "last_error": radio.next_rotation_last_error, "approved_last_error": radio.approved_next_rotation_last_error, "current_rotation_source": radio.current_rotation_source, "current_rotation_target_at": radio.current_rotation_target_at, "current_rotation_activated_at": radio.current_rotation_activated_at, "tracks": [dict(track) for track in radio.next_tracks], } @app.get("/api/next-rotation") def api_next_rotation(): return JSONResponse(next_rotation_response_payload()) # Compatibility aliases for older Studio frontends / cached clients. # The canonical endpoint is /api/next-rotation, but these prevent stale # clients from hitting a 404 after the prebuild succeeds. @app.get("/next-rotation-preview") def api_next_rotation_preview_alias(): return JSONResponse(next_rotation_response_payload()) @app.get("/api/next-rotation-preview") def api_next_rotation_preview_api_alias(): return JSONResponse(next_rotation_response_payload()) @app.post("/api/prebuild-next-rotation") def api_prebuild_next_rotation(): target_at = next_utc_hour() def prebuild_in_background(): prebuild_radio_content("manual", target_at) threading.Thread(target=prebuild_in_background, daemon=True).start() return JSONResponse( { "ok": True, "target_at": target_at.isoformat(), "prebuild_at": next_rotation_prebuild_at_iso(), "status": get_full_status(), } ) @app.post("/api/mode/{mode}") def api_mode(mode: str): radio.set_mode(mode) return JSONResponse( { "ok": True, "status": get_full_status(), } ) @app.get("/api/tracks") def api_tracks(): with radio.lock: tracks = [] for track in radio.tracks: item = dict(track) item["like_count"] = likes.get_count(item["id"]) item["upvote_count"] = likes.get_upvote_count(item["id"]) item["downvote_count"] = likes.get_downvote_count(item["id"]) item["report_count"] = likes.get_report_count(item["id"]) radio._apply_track_controls_unlocked(item) tracks.append(item) return JSONResponse( { "count": len(tracks), "tracks": tracks, } ) @app.get("/api/likes") def api_likes(): return JSONResponse( { "likes": likes.snapshot(), "likes_enabled": likes.last_error is None, "likes_error": likes.last_error, "likes_path": str(likes.likes_path), } ) app.mount("/", StaticFiles(directory="static", html=True), name="static")