| import json |
| import os |
| import urllib.error |
| import urllib.request |
| import queue |
| import random |
| import shutil |
| import subprocess |
| import threading |
| import time |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from datetime import datetime, timedelta, timezone |
| from pathlib import Path |
| from typing import Optional |
| from uuid import uuid4 |
|
|
| from fastapi import FastAPI, HTTPException, Request |
| from fastapi.responses import JSONResponse, StreamingResponse, FileResponse |
| from fastapi.staticfiles import StaticFiles |
| from huggingface_hub import ( |
| HfFileSystem, |
| attach_huggingface_oauth, |
| parse_huggingface_oauth, |
| ) |
|
|
|
|
| BUCKET_ID = os.environ.get("BUCKET_ID", "victor/ace-step-community") |
| RADIO_MODE = os.environ.get("RADIO_MODE", "random") |
| DEFAULT_DURATION = float(os.environ.get("DEFAULT_DURATION", "180")) |
| MAX_TRACKS = int(os.environ.get("MAX_TRACKS", "150")) |
| ROTATION_TARGET_SECONDS = int(os.environ.get("ROTATION_TARGET_SECONDS", "3600")) |
| ROTATION_PREBUILD_LEAD_SECONDS = int(os.environ.get("ROTATION_PREBUILD_LEAD_SECONDS", "600")) |
| REFRESH_MODE = "prebuild_before_top_of_hour_utc" |
| OAUTH_USERINFO_URL = os.environ.get("OAUTH_USERINFO_URL", "https://huggingface.co/oauth/userinfo") |
| OAUTH_USERINFO_CACHE_TTL_SECONDS = int(os.environ.get("OAUTH_USERINFO_CACHE_TTL_SECONDS", "300")) |
|
|
| STREAM_BITRATE = os.environ.get("STREAM_BITRATE", "128k") |
| STREAM_SAMPLE_RATE = os.environ.get("STREAM_SAMPLE_RATE", "44100") |
| STREAM_SEEK_EPSILON_SECONDS = float(os.environ.get("STREAM_SEEK_EPSILON_SECONDS", "0.08")) |
|
|
| BROADCAST_QUEUE_MAX_CHUNKS = int(os.environ.get("BROADCAST_QUEUE_MAX_CHUNKS", "256")) |
| BROADCAST_NO_TRACK_SLEEP_SECONDS = float(os.environ.get("BROADCAST_NO_TRACK_SLEEP_SECONDS", "1.0")) |
| BROADCAST_SUBSCRIBER_TIMEOUT_SECONDS = float(os.environ.get("BROADCAST_SUBSCRIBER_TIMEOUT_SECONDS", "15.0")) |
| BROADCAST_STALL_TIMEOUT_SECONDS = float(os.environ.get("BROADCAST_STALL_TIMEOUT_SECONDS", "25.0")) |
| BROADCAST_WATCHDOG_INTERVAL_SECONDS = float(os.environ.get("BROADCAST_WATCHDOG_INTERVAL_SECONDS", "5.0")) |
| BROADCAST_PROCESS_START_TIMEOUT_SECONDS = float(os.environ.get("BROADCAST_PROCESS_START_TIMEOUT_SECONDS", "20.0")) |
|
|
| LIKES_DIR = os.environ.get("LIKES_DIR", "/likes") |
| LIKES_FILE = os.environ.get("LIKES_FILE", "likes.json") |
| REPORT_HIDE_THRESHOLD = int(os.environ.get("REPORT_HIDE_THRESHOLD", "2")) |
| ADMIN_REPORT_USERNAMES = { |
| username.strip().lower() |
| for username in os.environ.get("ADMIN_REPORT_USERNAMES", "fffiloni").split(",") |
| if username.strip() |
| } |
| ADMIN_REPORT_WEIGHT = int(os.environ.get("ADMIN_REPORT_WEIGHT", "2")) |
| LIKE_WEIGHT_PER_LIKE = float(os.environ.get("LIKE_WEIGHT_PER_LIKE", "0.35")) |
| LIKE_WEIGHT_MAX = float(os.environ.get("LIKE_WEIGHT_MAX", "2.0")) |
| UPVOTE_WEIGHT_PER_UPVOTE = float(os.environ.get("UPVOTE_WEIGHT_PER_UPVOTE", "0.12")) |
| UPVOTE_WEIGHT_MAX = float(os.environ.get("UPVOTE_WEIGHT_MAX", "0.6")) |
| DOWNVOTE_WEIGHT_PER_DOWNVOTE = float(os.environ.get("DOWNVOTE_WEIGHT_PER_DOWNVOTE", "0.15")) |
| DOWNVOTE_WEIGHT_MAX = float(os.environ.get("DOWNVOTE_WEIGHT_MAX", "0.7")) |
| TRACK_WEIGHT_MIN = float(os.environ.get("TRACK_WEIGHT_MIN", "0.25")) |
| TRACK_WEIGHT_MAX = float(os.environ.get("TRACK_WEIGHT_MAX", str(LIKE_WEIGHT_MAX))) |
| AUTO_ROTATION_MIN_EFFECTIVE_SCORE_EXCLUSIVE = float(os.environ.get("AUTO_ROTATION_MIN_EFFECTIVE_SCORE_EXCLUSIVE", "-5")) |
| RECENT_TRACK_QUOTA = float(os.environ.get("RECENT_TRACK_QUOTA", "0.15")) |
| RECENT_TRACK_WINDOW_HOURS = int(os.environ.get("RECENT_TRACK_WINDOW_HOURS", "72")) |
| RECENTLY_PLAYED_EXCLUDE_MINUTES = int(os.environ.get("RECENTLY_PLAYED_EXCLUDE_MINUTES", "90")) |
| RECENTLY_PLAYED_MEMORY_MAX = int(os.environ.get("RECENTLY_PLAYED_MEMORY_MAX", "200")) |
| RECENTLY_PLAYED_MIN_CANDIDATES_AFTER_FILTER = int(os.environ.get("RECENTLY_PLAYED_MIN_CANDIDATES_AFTER_FILTER", "30")) |
| META_LOAD_WORKERS = int(os.environ.get("META_LOAD_WORKERS", "16")) |
|
|
| LISTENER_TTL_SECONDS = int(os.environ.get("LISTENER_TTL_SECONDS", "45")) |
|
|
| ENABLE_JINGLES = os.environ.get("ENABLE_JINGLES", "true").lower() == "true" |
| JINGLES_DIR = os.environ.get("JINGLES_DIR", "static/jingles") |
| JINGLES_GLOB = os.environ.get("JINGLES_GLOB", "jingle_hf_radio_*.wav") |
| DEFAULT_JINGLE_DURATION = float(os.environ.get("DEFAULT_JINGLE_DURATION", "6")) |
| JINGLE_VOLUME = float(os.environ.get("JINGLE_VOLUME", "2.00")) |
| ENABLE_REMOTE_JINGLES = os.environ.get("ENABLE_REMOTE_JINGLES", "true").lower() == "true" |
| REMOTE_JINGLES_DIR = os.environ.get("REMOTE_JINGLES_DIR", "/remote-jingles") |
|
|
| ENABLE_SEGMENTS = os.environ.get("ENABLE_SEGMENTS", "true").lower() == "true" |
| REMOTE_SEGMENTS_DIR = os.environ.get("REMOTE_SEGMENTS_DIR", "/remote-segments") |
| DEFAULT_SEGMENT_DURATION = float(os.environ.get("DEFAULT_SEGMENT_DURATION", "30")) |
| SEGMENT_VOLUME = float(os.environ.get("SEGMENT_VOLUME", "2.00")) |
| MIN_SONGS_BETWEEN_SEGMENTS = int(os.environ.get("MIN_SONGS_BETWEEN_SEGMENTS", "12")) |
| MAX_SONGS_BETWEEN_SEGMENTS = int(os.environ.get("MAX_SONGS_BETWEEN_SEGMENTS", "18")) |
| SEGMENT_PROBABILITY_AFTER_MIN = float(os.environ.get("SEGMENT_PROBABILITY_AFTER_MIN", "0.20")) |
|
|
| ENABLE_SONG_MASTERING = os.environ.get("ENABLE_SONG_MASTERING", "true").lower() == "true" |
| SONG_AUDIO_FILTER = os.environ.get( |
| "SONG_AUDIO_FILTER", |
| "equalizer=f=95:t=q:w=0.9:g=1.8,equalizer=f=3500:t=q:w=1.0:g=-0.8,equalizer=f=8500:t=q:w=0.8:g=-2.2", |
| ) |
| MIN_SONGS_BETWEEN_JINGLES = int(os.environ.get("MIN_SONGS_BETWEEN_JINGLES", "6")) |
| MAX_SONGS_BETWEEN_JINGLES = int(os.environ.get("MAX_SONGS_BETWEEN_JINGLES", "10")) |
| JINGLE_PROBABILITY_AFTER_MIN = float(os.environ.get("JINGLE_PROBABILITY_AFTER_MIN", "0.25")) |
|
|
| RADIO_CONTROL_DIR = os.environ.get("RADIO_CONTROL_DIR", "/radio-control") |
| RADIO_CONTROL_CONFIG_FILE = os.environ.get("RADIO_CONTROL_CONFIG_FILE", "config.json") |
| RADIO_TRACK_OVERRIDES_FILE = os.environ.get("RADIO_TRACK_OVERRIDES_FILE", "track_overrides.json") |
| RADIO_NEXT_ROTATION_FILE = os.environ.get("RADIO_NEXT_ROTATION_FILE", "next_rotation.json") |
| RADIO_NEXT_ROTATION_ARCHIVE_DIR = os.environ.get("RADIO_NEXT_ROTATION_ARCHIVE_DIR", "next_rotation_archive") |
| COVER_OVERRIDES_DIR = os.environ.get("COVER_OVERRIDES_DIR", "/cover-overrides") |
|
|
| ENABLE_CUSTOM_TRACKS = os.environ.get("ENABLE_CUSTOM_TRACKS", "true").lower() == "true" |
| CUSTOM_TRACKS_DIR = os.environ.get("CUSTOM_TRACKS_DIR", "/custom-tracks") |
| DEFAULT_CUSTOM_TRACK_DURATION = float(os.environ.get("DEFAULT_CUSTOM_TRACK_DURATION", "180")) |
| CUSTOM_TRACK_QUOTA = float(os.environ.get("CUSTOM_TRACK_QUOTA", "0.10")) |
|
|
| BUCKET_FS_ROOT = f"hf://buckets/{BUCKET_ID}" |
| BUCKET_RESOLVE_URL = f"https://huggingface.co/buckets/{BUCKET_ID}/resolve" |
|
|
| app = FastAPI(title="HF Bucket Radio Stream") |
|
|
| attach_huggingface_oauth(app) |
|
|
|
|
| def normalize_tags(raw_tags): |
| if isinstance(raw_tags, str): |
| normalized = ( |
| raw_tags |
| .replace("#", "") |
| .replace(";", ",") |
| .replace("|", ",") |
| ) |
|
|
| return [ |
| tag.strip() |
| for tag in normalized.split(",") |
| if tag.strip() |
| ] |
|
|
| if isinstance(raw_tags, list): |
| return [ |
| str(tag).strip() |
| for tag in raw_tags |
| if str(tag).strip() |
| ] |
|
|
| return [] |
|
|
|
|
| def probe_audio_duration(path_or_url: str, fallback: float) -> float: |
| try: |
| cmd = [ |
| "ffprobe", |
| "-v", |
| "error", |
| "-show_entries", |
| "format=duration", |
| "-of", |
| "default=noprint_wrappers=1:nokey=1", |
| path_or_url, |
| ] |
|
|
| result = subprocess.run( |
| cmd, |
| capture_output=True, |
| text=True, |
| timeout=10, |
| check=True, |
| ) |
|
|
| duration = float(result.stdout.strip()) |
|
|
| if duration > 0: |
| return duration |
|
|
| except Exception as e: |
| print(f"[audio] Could not probe duration for {path_or_url}: {e}") |
|
|
| return fallback |
|
|
|
|
|
|
| def parse_datetime(value): |
| if not value: |
| return None |
|
|
| if isinstance(value, datetime): |
| parsed = value |
| else: |
| try: |
| normalized = str(value).strip() |
|
|
| if normalized.endswith("Z"): |
| normalized = normalized[:-1] + "+00:00" |
|
|
| parsed = datetime.fromisoformat(normalized) |
| except Exception: |
| return None |
|
|
| if parsed.tzinfo is None: |
| parsed = parsed.replace(tzinfo=timezone.utc) |
|
|
| return parsed.astimezone(timezone.utc) |
|
|
|
|
| def asset_is_enabled(meta: dict) -> bool: |
| return bool(meta.get("enabled", True)) |
|
|
|
|
| def asset_is_expired(meta: dict, now: Optional[datetime] = None) -> bool: |
| expires_at = parse_datetime(meta.get("expires_at")) |
|
|
| if expires_at is None: |
| return False |
|
|
| now = now or datetime.now(timezone.utc) |
| return expires_at <= now |
|
|
|
|
| def safe_float(value, fallback: float) -> float: |
| try: |
| parsed = float(value) |
|
|
| if parsed > 0: |
| return parsed |
| except Exception: |
| pass |
|
|
| return fallback |
|
|
|
|
| def safe_int(value, fallback: int) -> int: |
| try: |
| return int(value) |
| except Exception: |
| return fallback |
|
|
|
|
| def safe_bool(value, fallback: bool) -> bool: |
| if isinstance(value, bool): |
| return value |
|
|
| if isinstance(value, str): |
| return value.strip().lower() in {"1", "true", "yes", "y", "on"} |
|
|
| if value is None: |
| return fallback |
|
|
| return bool(value) |
|
|
|
|
| def clamp_number(value, minimum=None, maximum=None): |
| if minimum is not None: |
| value = max(minimum, value) |
|
|
| if maximum is not None: |
| value = min(maximum, value) |
|
|
| return value |
|
|
|
|
| def next_utc_hour() -> datetime: |
| now = datetime.now(timezone.utc) |
| return now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) |
|
|
|
|
| def seconds_until_next_utc_hour() -> float: |
| return max(0.0, (next_utc_hour() - datetime.now(timezone.utc)).total_seconds()) |
|
|
|
|
| def next_utc_hour_iso() -> str: |
| return next_utc_hour().isoformat() |
|
|
|
|
| def next_rotation_prebuild_at() -> datetime: |
| return next_utc_hour() - timedelta(seconds=max(0, ROTATION_PREBUILD_LEAD_SECONDS)) |
|
|
|
|
| def next_rotation_prebuild_at_iso() -> str: |
| return next_rotation_prebuild_at().isoformat() |
|
|
|
|
|
|
| oauth_userinfo_cache_lock = threading.Lock() |
| oauth_userinfo_cache = {} |
|
|
|
|
| def oauth_bool(value) -> bool: |
| if isinstance(value, bool): |
| return value |
|
|
| if isinstance(value, str): |
| return value.strip().lower() in {"1", "true", "yes", "y", "on"} |
|
|
| return bool(value) |
|
|
|
|
| def fetch_oauth_userinfo(access_token: Optional[str]) -> dict: |
| if not access_token: |
| return {} |
|
|
| now = time.time() |
|
|
| with oauth_userinfo_cache_lock: |
| cached = oauth_userinfo_cache.get(access_token) |
|
|
| if cached and now - cached.get("fetched_at", 0) < OAUTH_USERINFO_CACHE_TTL_SECONDS: |
| return dict(cached.get("payload", {})) |
|
|
| request = urllib.request.Request( |
| OAUTH_USERINFO_URL, |
| headers={ |
| "Authorization": f"Bearer {access_token}", |
| "Accept": "application/json", |
| }, |
| method="GET", |
| ) |
|
|
| try: |
| with urllib.request.urlopen(request, timeout=5) as response: |
| payload = json.loads(response.read().decode("utf-8")) |
|
|
| if not isinstance(payload, dict): |
| payload = {} |
|
|
| with oauth_userinfo_cache_lock: |
| oauth_userinfo_cache[access_token] = { |
| "fetched_at": now, |
| "payload": dict(payload), |
| } |
|
|
| return payload |
|
|
| except Exception as e: |
| print(f"[oauth] Could not fetch OAuth userinfo endpoint: {e}") |
| return {} |
|
|
|
|
| def extract_oauth_field(user_info, *field_names, default=None): |
| for field_name in field_names: |
| if isinstance(user_info, dict) and field_name in user_info: |
| return user_info.get(field_name, default) |
|
|
| value = getattr(user_info, field_name, None) |
|
|
| if value is not None: |
| return value |
|
|
| raw = getattr(user_info, "__dict__", {}) or {} |
|
|
| for field_name in field_names: |
| if field_name in raw: |
| return raw.get(field_name, default) |
|
|
| return default |
|
|
|
|
| def oauth_user_from_request(request: Request): |
| """ |
| Returns a compact public user profile if the visitor is signed in with HF. |
| Returns None for anonymous visitors. |
| |
| Hugging Face exposes the PRO flag as `is_pro` in Python OAuthUserInfo |
| and as `isPro` in the JS/OIDC userinfo payload. We read both. |
| When the installed huggingface_hub version does not hydrate the field |
| correctly, we fall back to the OAuth `/oauth/userinfo` endpoint using |
| the current OAuth access token. |
| """ |
| try: |
| oauth_info = parse_huggingface_oauth(request) |
| except Exception as e: |
| print(f"[oauth] Could not parse OAuth info: {e}") |
| return None |
|
|
| if oauth_info is None or oauth_info.user_info is None: |
| return None |
|
|
| user = oauth_info.user_info |
| access_token = extract_oauth_field(oauth_info, "access_token") |
| raw_userinfo = fetch_oauth_userinfo(access_token) |
|
|
| username = ( |
| extract_oauth_field(user, "preferred_username") |
| or extract_oauth_field(raw_userinfo, "preferred_username") |
| or extract_oauth_field(user, "name") |
| or extract_oauth_field(raw_userinfo, "name") |
| or "unknown" |
| ) |
|
|
| raw_is_pro_value = ( |
| extract_oauth_field(user, "is_pro", "isPro", default=None) |
| ) |
|
|
| if raw_is_pro_value is None: |
| raw_is_pro_value = extract_oauth_field(raw_userinfo, "is_pro", "isPro", default=False) |
| elif oauth_bool(raw_is_pro_value) is False: |
| |
| |
| endpoint_is_pro = extract_oauth_field(raw_userinfo, "is_pro", "isPro", default=None) |
| if endpoint_is_pro is not None: |
| raw_is_pro_value = endpoint_is_pro |
|
|
| is_pro = oauth_bool(raw_is_pro_value) |
|
|
| return { |
| "id": extract_oauth_field(user, "sub") or extract_oauth_field(raw_userinfo, "sub"), |
| "username": username, |
| "avatar_url": extract_oauth_field(user, "picture") or extract_oauth_field(raw_userinfo, "picture"), |
| "profile_url": ( |
| extract_oauth_field(user, "profile") |
| or extract_oauth_field(raw_userinfo, "profile") |
| or f"https://huggingface.co/{username}" |
| ), |
| "is_pro": is_pro, |
| "isPro": is_pro, |
| "pro_status_source": "oauth_userinfo_endpoint" if raw_userinfo else "oauth_session", |
| } |
|
|
|
|
| def report_weight_for_user(user: Optional[dict]) -> int: |
| if not user: |
| return 1 |
|
|
| username = str(user.get("username") or "").strip().lower().lstrip("@") |
|
|
| if username in ADMIN_REPORT_USERNAMES: |
| return max(1, ADMIN_REPORT_WEIGHT) |
|
|
| return 1 |
|
|
|
|
| class ListenerTracker: |
| """ |
| Tracks logical active listeners from the web UI with heartbeats. |
| |
| Anonymous users are counted. |
| Signed-in HF users are counted and exposed with avatar metadata. |
| Direct /stream.mp3 listeners are handled separately by BroadcastEngine. |
| """ |
|
|
| def __init__(self, ttl_seconds: int = 45): |
| self.lock = threading.Lock() |
| self.listeners = {} |
| self.total_sessions = 0 |
| self.ttl_seconds = ttl_seconds |
|
|
| def _cleanup_unlocked(self): |
| now = time.time() |
|
|
| expired_ids = [ |
| listener_id |
| for listener_id, item in self.listeners.items() |
| if now - item.get("last_seen", 0) > self.ttl_seconds |
| ] |
|
|
| for listener_id in expired_ids: |
| self.listeners.pop(listener_id, None) |
|
|
| if expired_ids: |
| print( |
| f"[listeners] cleaned up expired={len(expired_ids)} " |
| f"active={len(self.listeners)}" |
| ) |
|
|
| def start(self, listener_id: str, user: Optional[dict] = None): |
| now = time.time() |
|
|
| with self.lock: |
| is_new = listener_id not in self.listeners |
| previous = self.listeners.get(listener_id, {}) |
|
|
| self.listeners[listener_id] = { |
| "started_at": previous.get("started_at", now), |
| "last_seen": now, |
| "user": user, |
| } |
|
|
| if is_new: |
| self.total_sessions += 1 |
|
|
| self._cleanup_unlocked() |
|
|
| print( |
| f"[listeners] start active={len(self.listeners)} " |
| f"total_sessions={self.total_sessions} " |
| f"user={user.get('username') if user else 'anonymous'}" |
| ) |
|
|
| def heartbeat(self, listener_id: str, user: Optional[dict] = None): |
| now = time.time() |
|
|
| with self.lock: |
| is_new = listener_id not in self.listeners |
| previous = self.listeners.get(listener_id, {}) |
|
|
| self.listeners[listener_id] = { |
| "started_at": previous.get("started_at", now), |
| "last_seen": now, |
| "user": user or previous.get("user"), |
| } |
|
|
| if is_new: |
| self.total_sessions += 1 |
|
|
| self._cleanup_unlocked() |
|
|
| def stop(self, listener_id: str): |
| with self.lock: |
| self.listeners.pop(listener_id, None) |
| self._cleanup_unlocked() |
| print(f"[listeners] stop active={len(self.listeners)}") |
|
|
| def snapshot(self): |
| with self.lock: |
| self._cleanup_unlocked() |
|
|
| seen_user_ids = set() |
| signed_in_profiles = [] |
|
|
| for item in self.listeners.values(): |
| user = item.get("user") |
|
|
| if not user: |
| continue |
|
|
| user_id = user.get("id") or user.get("username") |
|
|
| if user_id in seen_user_ids: |
| continue |
|
|
| seen_user_ids.add(user_id) |
|
|
| signed_in_profiles.append( |
| { |
| "username": user.get("username"), |
| "avatar_url": user.get("avatar_url"), |
| "profile_url": user.get("profile_url"), |
| } |
| ) |
|
|
| total_signed_in_listener_count = len(signed_in_profiles) |
|
|
| |
| rng = random.Random(int(time.time() // 60)) |
| rng.shuffle(signed_in_profiles) |
|
|
| visible_signed_in_profiles = signed_in_profiles[:8] |
|
|
| return { |
| "active_listeners": len(self.listeners), |
| "total_connections": self.total_sessions, |
| "listener_ttl_seconds": self.ttl_seconds, |
| "listener_profiles": visible_signed_in_profiles, |
| "signed_in_listener_count": total_signed_in_listener_count, |
| "visible_listener_profile_count": len(visible_signed_in_profiles), |
| } |
|
|
|
|
| listeners = ListenerTracker(ttl_seconds=LISTENER_TTL_SECONDS) |
|
|
|
|
| class LikeStore: |
| def __init__(self, likes_dir: str, likes_file: str): |
| self.lock = threading.Lock() |
| self.likes_dir = Path(likes_dir) |
| self.likes_path = self.likes_dir / likes_file |
| self.tmp_path = self.likes_dir / f".{likes_file}.tmp" |
| self.data = {} |
| self.last_error = None |
|
|
| self.load() |
|
|
| def load(self): |
| with self.lock: |
| try: |
| self.likes_dir.mkdir(parents=True, exist_ok=True) |
|
|
| if self.likes_path.exists(): |
| with open(self.likes_path, "r", encoding="utf-8") as f: |
| self.data = json.load(f) |
| else: |
| self.data = {} |
| self._save_unlocked() |
|
|
| self.last_error = None |
| print(f"[likes] Loaded {len(self.data)} feedback entries from {self.likes_path}") |
|
|
| except Exception as e: |
| self.last_error = str(e) |
| self.data = {} |
| print(f"[likes] Could not load feedback: {e}") |
|
|
| def _save_unlocked(self): |
| payload = json.dumps(self.data, indent=2, ensure_ascii=False) |
|
|
| with open(self.tmp_path, "w", encoding="utf-8") as f: |
| f.write(payload) |
|
|
| os.replace(self.tmp_path, self.likes_path) |
|
|
| def _item_unlocked(self, song_id: str) -> dict: |
| item = dict(self.data.get(song_id, {})) |
| item["count"] = int(item.get("count", 0)) |
| item["upvote_count"] = int(item.get("upvote_count", 0)) |
| item["downvote_count"] = int(item.get("downvote_count", 0)) |
| item["report_count"] = int(item.get("report_count", 0)) |
| return item |
|
|
| def _public_result_unlocked(self, song_id: str, item: dict, **extra) -> dict: |
| payload = { |
| "song_id": song_id, |
| "count": int(item.get("count", 0)), |
| "upvote_count": int(item.get("upvote_count", 0)), |
| "downvote_count": int(item.get("downvote_count", 0)), |
| "report_count": int(item.get("report_count", 0)), |
| "updated_at": item.get("updated_at"), |
| } |
| payload.update(extra) |
| return payload |
|
|
| def get_count(self, song_id: str) -> int: |
| with self.lock: |
| return int(self.data.get(song_id, {}).get("count", 0)) |
|
|
| def get_upvote_count(self, song_id: str) -> int: |
| with self.lock: |
| return int(self.data.get(song_id, {}).get("upvote_count", 0)) |
|
|
| def get_downvote_count(self, song_id: str) -> int: |
| with self.lock: |
| return int(self.data.get(song_id, {}).get("downvote_count", 0)) |
|
|
| def get_report_count(self, song_id: str) -> int: |
| with self.lock: |
| return int(self.data.get(song_id, {}).get("report_count", 0)) |
|
|
| def increment(self, song_id: str) -> dict: |
| now = datetime.now(timezone.utc).isoformat() |
|
|
| with self.lock: |
| item = self._item_unlocked(song_id) |
| item["count"] = int(item.get("count", 0)) + 1 |
| item["updated_at"] = now |
|
|
| self.data[song_id] = item |
|
|
| try: |
| self._save_unlocked() |
| self.last_error = None |
| except Exception as e: |
| self.last_error = str(e) |
| print(f"[likes] Could not save like: {e}") |
| raise |
|
|
| return self._public_result_unlocked(song_id, item) |
|
|
| def upvote(self, song_id: str) -> dict: |
| now = datetime.now(timezone.utc).isoformat() |
|
|
| with self.lock: |
| item = self._item_unlocked(song_id) |
| item["upvote_count"] = int(item.get("upvote_count", 0)) + 1 |
| item["updated_at"] = now |
|
|
| self.data[song_id] = item |
|
|
| try: |
| self._save_unlocked() |
| self.last_error = None |
| except Exception as e: |
| self.last_error = str(e) |
| print(f"[likes] Could not save upvote: {e}") |
| raise |
|
|
| return self._public_result_unlocked(song_id, item) |
|
|
| def unupvote(self, song_id: str) -> dict: |
| now = datetime.now(timezone.utc).isoformat() |
|
|
| with self.lock: |
| item = self._item_unlocked(song_id) |
| item["upvote_count"] = max(0, int(item.get("upvote_count", 0)) - 1) |
| item["updated_at"] = now |
|
|
| self.data[song_id] = item |
|
|
| try: |
| self._save_unlocked() |
| self.last_error = None |
| except Exception as e: |
| self.last_error = str(e) |
| print(f"[likes] Could not undo upvote: {e}") |
| raise |
|
|
| return self._public_result_unlocked(song_id, item) |
|
|
| def downvote(self, song_id: str) -> dict: |
| now = datetime.now(timezone.utc).isoformat() |
|
|
| with self.lock: |
| item = self._item_unlocked(song_id) |
| item["downvote_count"] = int(item.get("downvote_count", 0)) + 1 |
| item["updated_at"] = now |
|
|
| self.data[song_id] = item |
|
|
| try: |
| self._save_unlocked() |
| self.last_error = None |
| except Exception as e: |
| self.last_error = str(e) |
| print(f"[likes] Could not save downvote: {e}") |
| raise |
|
|
| return self._public_result_unlocked(song_id, item) |
|
|
| def undownvote(self, song_id: str) -> dict: |
| now = datetime.now(timezone.utc).isoformat() |
|
|
| with self.lock: |
| item = self._item_unlocked(song_id) |
| item["downvote_count"] = max(0, int(item.get("downvote_count", 0)) - 1) |
| item["updated_at"] = now |
|
|
| self.data[song_id] = item |
|
|
| try: |
| self._save_unlocked() |
| self.last_error = None |
| except Exception as e: |
| self.last_error = str(e) |
| print(f"[likes] Could not undo downvote: {e}") |
| raise |
|
|
| return self._public_result_unlocked(song_id, item) |
|
|
| def report(self, song_id: str, weight: int = 1) -> dict: |
| now = datetime.now(timezone.utc).isoformat() |
| weight = max(1, int(weight or 1)) |
|
|
| with self.lock: |
| item = self._item_unlocked(song_id) |
| item["report_count"] = int(item.get("report_count", 0)) + weight |
| item["reported_at"] = now |
| item["updated_at"] = now |
|
|
| self.data[song_id] = item |
|
|
| try: |
| self._save_unlocked() |
| self.last_error = None |
| except Exception as e: |
| self.last_error = str(e) |
| print(f"[likes] Could not save report: {e}") |
| raise |
|
|
| return self._public_result_unlocked(song_id, item, reported_at=now, report_weight=weight) |
|
|
| def unreport(self, song_id: str, weight: int = 1) -> dict: |
| now = datetime.now(timezone.utc).isoformat() |
| weight = max(1, int(weight or 1)) |
|
|
| with self.lock: |
| item = self._item_unlocked(song_id) |
| item["report_count"] = max(0, int(item.get("report_count", 0)) - weight) |
| item["unreported_at"] = now |
| item["updated_at"] = now |
|
|
| if item["report_count"] == 0: |
| item.pop("reported_at", None) |
|
|
| self.data[song_id] = item |
|
|
| try: |
| self._save_unlocked() |
| self.last_error = None |
| except Exception as e: |
| self.last_error = str(e) |
| print(f"[likes] Could not save unreport: {e}") |
| raise |
|
|
| return self._public_result_unlocked(song_id, item, unreported_at=now, report_weight=weight) |
|
|
| def snapshot(self) -> dict: |
| with self.lock: |
| return dict(self.data) |
|
|
| likes = LikeStore(LIKES_DIR, LIKES_FILE) |
|
|
|
|
| class RadioState: |
| """ |
| Holds the logical radio state. |
| |
| In V2, this class no longer produces one stream per listener. |
| The BroadcastEngine is responsible for audio output and advances the radio |
| when the current audio item finishes. |
| """ |
|
|
| def __init__(self): |
| self.lock = threading.Lock() |
| self.fs = HfFileSystem() |
|
|
| self.tracks = [] |
| self.jingles = [] |
| self.segments = [] |
| self.custom_tracks = [] |
|
|
| self.current_index: Optional[int] = None |
| self.current_kind = "song" |
| self.current_jingle: Optional[dict] = None |
| self.current_segment: Optional[dict] = None |
|
|
| self.started_at: Optional[float] = None |
|
|
| self.is_playing = False |
| self.mode = RADIO_MODE |
|
|
| self.is_loading = False |
| self.last_error = None |
| self.last_refresh_at: Optional[float] = None |
|
|
| self.next_tracks = [] |
| self.next_rotation_ready = False |
| self.next_rotation_is_building = False |
| self.next_rotation_built_at: Optional[float] = None |
| self.next_rotation_target_at: Optional[str] = None |
| self.next_rotation_last_error = None |
| self.next_rotation_source = None |
| self.next_rotation_payload = None |
| self.approved_next_rotation_last_loaded_at = None |
| self.approved_next_rotation_last_error = None |
| self.approved_next_rotation_last_consumed_at = None |
| self.last_rotation_activated_at: Optional[float] = None |
| self.current_rotation_source = "initial" |
| self.current_rotation_target_at = None |
| self.current_rotation_activated_at = None |
| self.current_rotation_track_ids = [] |
|
|
| self.songs_since_last_jingle = 0 |
| self.last_jingle_id: Optional[str] = None |
| self.songs_since_last_segment = 0 |
| self.last_segment_id: Optional[str] = None |
| self.recently_played_songs: list[dict] = [] |
|
|
| self.runtime_config = self._default_runtime_config() |
| self.runtime_config_source = "defaults" |
| self.runtime_config_last_loaded_at = None |
| self.runtime_config_last_error = None |
| self.runtime_control_payload = {} |
|
|
| self.track_overrides = {} |
| self.track_overrides_last_loaded_at = None |
| self.track_overrides_last_error = None |
| self.cover_overrides_count = 0 |
| self.custom_tracks_last_loaded_at = None |
| self.custom_tracks_last_error = None |
|
|
| def _default_runtime_config(self) -> dict: |
| return { |
| "max_tracks": MAX_TRACKS, |
| "rotation_target_seconds": ROTATION_TARGET_SECONDS, |
| "report_hide_threshold": REPORT_HIDE_THRESHOLD, |
| "like_weight_per_like": LIKE_WEIGHT_PER_LIKE, |
| "like_weight_max": LIKE_WEIGHT_MAX, |
| "upvote_weight_per_upvote": UPVOTE_WEIGHT_PER_UPVOTE, |
| "upvote_weight_max": UPVOTE_WEIGHT_MAX, |
| "downvote_weight_per_downvote": DOWNVOTE_WEIGHT_PER_DOWNVOTE, |
| "downvote_weight_max": DOWNVOTE_WEIGHT_MAX, |
| "track_weight_min": TRACK_WEIGHT_MIN, |
| "track_weight_max": TRACK_WEIGHT_MAX, |
| "auto_rotation_min_effective_score_exclusive": AUTO_ROTATION_MIN_EFFECTIVE_SCORE_EXCLUSIVE, |
| "recent_track_quota": RECENT_TRACK_QUOTA, |
| "recent_track_window_hours": RECENT_TRACK_WINDOW_HOURS, |
| "recently_played_exclude_minutes": RECENTLY_PLAYED_EXCLUDE_MINUTES, |
| "recently_played_memory_max": RECENTLY_PLAYED_MEMORY_MAX, |
| "recently_played_min_candidates_after_filter": RECENTLY_PLAYED_MIN_CANDIDATES_AFTER_FILTER, |
| "meta_load_workers": META_LOAD_WORKERS, |
| "enable_jingles": ENABLE_JINGLES, |
| "enable_remote_jingles": ENABLE_REMOTE_JINGLES, |
| "min_songs_between_jingles": MIN_SONGS_BETWEEN_JINGLES, |
| "max_songs_between_jingles": MAX_SONGS_BETWEEN_JINGLES, |
| "jingle_probability_after_min": JINGLE_PROBABILITY_AFTER_MIN, |
| "enable_segments": ENABLE_SEGMENTS, |
| "min_songs_between_segments": MIN_SONGS_BETWEEN_SEGMENTS, |
| "max_songs_between_segments": MAX_SONGS_BETWEEN_SEGMENTS, |
| "segment_probability_after_min": SEGMENT_PROBABILITY_AFTER_MIN, |
| "enable_custom_tracks": ENABLE_CUSTOM_TRACKS, |
| "custom_track_quota": CUSTOM_TRACK_QUOTA, |
| "enable_song_mastering": ENABLE_SONG_MASTERING, |
| "song_audio_filter": SONG_AUDIO_FILTER, |
| } |
|
|
| def _cfg(self, key: str, fallback=None): |
| if fallback is None: |
| fallback = self._default_runtime_config().get(key) |
|
|
| return self.runtime_config.get(key, fallback) |
|
|
| def _coerce_runtime_config(self, custom_config: dict) -> dict: |
| defaults = self._default_runtime_config() |
| result = dict(defaults) |
|
|
| for key, default_value in defaults.items(): |
| if key not in custom_config: |
| continue |
|
|
| raw_value = custom_config.get(key) |
|
|
| if isinstance(default_value, bool): |
| value = safe_bool(raw_value, default_value) |
| elif isinstance(default_value, int) and not isinstance(default_value, bool): |
| value = safe_int(raw_value, default_value) |
| elif isinstance(default_value, float): |
| value = safe_float(raw_value, default_value) |
| elif isinstance(default_value, str): |
| value = str(raw_value or "") |
| else: |
| value = raw_value |
|
|
| if key in { |
| "recent_track_quota", |
| "jingle_probability_after_min", |
| "segment_probability_after_min", |
| "custom_track_quota", |
| }: |
| value = float(clamp_number(float(value), 0.0, 1.0)) |
|
|
| if key in { |
| "max_tracks", |
| "rotation_target_seconds", |
| "meta_load_workers", |
| "recent_track_window_hours", |
| "recently_played_exclude_minutes", |
| "recently_played_memory_max", |
| "recently_played_min_candidates_after_filter", |
| "min_songs_between_jingles", |
| "max_songs_between_jingles", |
| "min_songs_between_segments", |
| "max_songs_between_segments", |
| }: |
| value = int(clamp_number(int(value), 0, None)) |
|
|
| if key in { |
| "like_weight_per_like", |
| "like_weight_max", |
| "upvote_weight_per_upvote", |
| "upvote_weight_max", |
| "downvote_weight_per_downvote", |
| "downvote_weight_max", |
| "track_weight_min", |
| "track_weight_max", |
| }: |
| value = float(clamp_number(float(value), 0.0, None)) |
|
|
| result[key] = value |
|
|
| if result["max_songs_between_jingles"] < result["min_songs_between_jingles"]: |
| result["max_songs_between_jingles"] = result["min_songs_between_jingles"] |
|
|
| if result["max_songs_between_segments"] < result["min_songs_between_segments"]: |
| result["max_songs_between_segments"] = result["min_songs_between_segments"] |
|
|
| if result["track_weight_max"] < result["track_weight_min"]: |
| result["track_weight_max"] = result["track_weight_min"] |
|
|
| return result |
|
|
| def refresh_runtime_controls(self): |
| control_dir = Path(RADIO_CONTROL_DIR) |
| config_path = control_dir / RADIO_CONTROL_CONFIG_FILE |
| overrides_path = control_dir / RADIO_TRACK_OVERRIDES_FILE |
| now = time.time() |
|
|
| runtime_config = self._default_runtime_config() |
| runtime_config_source = "defaults" |
| runtime_config_error = None |
| runtime_payload = {} |
|
|
| try: |
| if config_path.exists(): |
| runtime_payload = json.loads(config_path.read_text(encoding="utf-8")) |
|
|
| enabled = safe_bool(runtime_payload.get("enabled", True), True) |
| use_custom_config = safe_bool(runtime_payload.get("use_custom_config", False), False) |
| custom_config = runtime_payload.get("config", {}) |
|
|
| if enabled and use_custom_config and isinstance(custom_config, dict): |
| runtime_config = self._coerce_runtime_config(custom_config) |
| runtime_config_source = "studio_custom" |
| else: |
| runtime_config_source = "defaults_disabled_or_not_requested" |
| else: |
| runtime_config_source = "defaults_no_config_file" |
| except Exception as e: |
| runtime_config = self._default_runtime_config() |
| runtime_config_source = "defaults_config_error" |
| runtime_config_error = str(e) |
| print(f"[radio-control] Could not load config {config_path}: {e}") |
|
|
| track_overrides = {} |
| overrides_error = None |
|
|
| try: |
| if overrides_path.exists(): |
| payload = json.loads(overrides_path.read_text(encoding="utf-8")) |
|
|
| if isinstance(payload, dict): |
| for song_id, override in payload.items(): |
| if not song_id or not isinstance(override, dict): |
| continue |
|
|
| track_overrides[str(song_id)] = { |
| "hidden": safe_bool(override.get("hidden", False), False), |
| "upvote_boost": max(0, safe_int(override.get("upvote_boost", 0), 0)), |
| "downvote_boost": max(0, safe_int(override.get("downvote_boost", 0), 0)), |
| "like_boost": max(0, safe_int(override.get("like_boost", 0), 0)), |
| "notes": str(override.get("notes") or ""), |
| "updated_at": override.get("updated_at"), |
| } |
| except Exception as e: |
| overrides_error = str(e) |
| print(f"[radio-control] Could not load track overrides {overrides_path}: {e}") |
|
|
| cover_overrides_count = 0 |
| cover_root = Path(COVER_OVERRIDES_DIR) |
| try: |
| if cover_root.exists(): |
| cover_overrides_count = len(list(cover_root.glob("*/thumb.png"))) |
| except Exception as e: |
| print(f"[cover-overrides] Could not count cover overrides: {e}") |
|
|
| with self.lock: |
| self.runtime_config = runtime_config |
| self.runtime_config_source = runtime_config_source |
| self.runtime_config_last_loaded_at = now |
| self.runtime_config_last_error = runtime_config_error |
| self.runtime_control_payload = runtime_payload if isinstance(runtime_payload, dict) else {} |
| self.track_overrides = track_overrides |
| self.track_overrides_last_loaded_at = now |
| self.track_overrides_last_error = overrides_error |
| self.cover_overrides_count = cover_overrides_count |
|
|
| print( |
| f"[radio-control] config_source={runtime_config_source} " |
| f"overrides={len(track_overrides)} covers={cover_overrides_count}" |
| ) |
|
|
| def _reload_track_overrides_unlocked(self): |
| """ |
| Reload only /radio-control/track_overrides.json while self.lock is |
| already held. |
| |
| This is intentionally lighter than refresh_runtime_controls(): it does |
| not reload the full runtime config, jingles, segments, custom tracks or |
| rotation pack. Its only purpose is to make Studio boosts/hidden flags |
| available when a queued song starts playing, without waiting for the |
| next full radio refresh. |
| """ |
| overrides_path = Path(RADIO_CONTROL_DIR) / RADIO_TRACK_OVERRIDES_FILE |
| now = time.time() |
|
|
| track_overrides = {} |
| overrides_error = None |
|
|
| try: |
| if overrides_path.exists(): |
| payload = json.loads(overrides_path.read_text(encoding="utf-8")) |
|
|
| if isinstance(payload, dict): |
| for song_id, override in payload.items(): |
| if not song_id or not isinstance(override, dict): |
| continue |
|
|
| track_overrides[str(song_id)] = { |
| "hidden": safe_bool(override.get("hidden", False), False), |
| "upvote_boost": max(0, safe_int(override.get("upvote_boost", 0), 0)), |
| "downvote_boost": max(0, safe_int(override.get("downvote_boost", 0), 0)), |
| "like_boost": max(0, safe_int(override.get("like_boost", 0), 0)), |
| "notes": str(override.get("notes") or ""), |
| "updated_at": override.get("updated_at"), |
| } |
| except Exception as e: |
| overrides_error = str(e) |
| print(f"[radio-control] Could not reload track overrides {overrides_path}: {e}") |
|
|
| self.track_overrides = track_overrides |
| self.track_overrides_last_loaded_at = now |
| self.track_overrides_last_error = overrides_error |
|
|
| def _track_override_unlocked(self, song_id: Optional[str]) -> dict: |
| if not song_id: |
| return {} |
|
|
| return dict(self.track_overrides.get(str(song_id), {})) |
|
|
| def _cover_override_path(self, song_id: Optional[str]) -> Optional[Path]: |
| if not song_id: |
| return None |
|
|
| path = Path(COVER_OVERRIDES_DIR) / str(song_id) / "thumb.png" |
|
|
| if path.exists() and path.is_file(): |
| return path |
|
|
| return None |
|
|
| def _apply_track_controls_unlocked(self, track: dict) -> dict: |
| if not track or track.get("is_jingle") or track.get("is_segment"): |
| return track |
|
|
| song_id = str(track.get("id") or "") |
| override = self._track_override_unlocked(song_id) |
|
|
| like_count = safe_int(track.get("like_count", 0), 0) |
| upvote_count = safe_int(track.get("upvote_count", 0), 0) |
| downvote_count = safe_int(track.get("downvote_count", 0), 0) |
| report_count = safe_int(track.get("report_count", 0), 0) |
|
|
| like_boost = max(0, safe_int(override.get("like_boost", 0), 0)) |
| upvote_boost = max(0, safe_int(override.get("upvote_boost", 0), 0)) |
| downvote_boost = max(0, safe_int(override.get("downvote_boost", 0), 0)) |
|
|
| track["studio_hidden"] = bool(override.get("hidden", False)) |
| track["studio_notes"] = override.get("notes", "") |
| track["studio_like_boost"] = like_boost |
| track["studio_upvote_boost"] = upvote_boost |
| track["studio_downvote_boost"] = downvote_boost |
| track["effective_like_count"] = like_count + like_boost |
| track["effective_upvote_count"] = upvote_count + upvote_boost |
| track["effective_downvote_count"] = downvote_count + downvote_boost |
| track["effective_report_count"] = report_count |
|
|
| cover_override = self._cover_override_path(song_id) |
|
|
| if cover_override: |
| track["thumb_url"] = f"/cover-overrides/{song_id}/thumb.png" |
| track["has_cover_override"] = True |
| else: |
| track["has_cover_override"] = False |
|
|
| return track |
|
|
| def _prune_recently_played_unlocked(self): |
| if not self.recently_played_songs: |
| return |
|
|
| now = time.time() |
| recently_played_exclude_minutes = int(self._cfg("recently_played_exclude_minutes", RECENTLY_PLAYED_EXCLUDE_MINUTES)) |
| recently_played_memory_max = int(self._cfg("recently_played_memory_max", RECENTLY_PLAYED_MEMORY_MAX)) |
|
|
| if recently_played_exclude_minutes > 0: |
| cutoff = now - (recently_played_exclude_minutes * 60) |
| self.recently_played_songs = [ |
| item |
| for item in self.recently_played_songs |
| if item.get("id") and float(item.get("played_at", 0)) >= cutoff |
| ] |
|
|
| if recently_played_memory_max > 0: |
| self.recently_played_songs = self.recently_played_songs[-recently_played_memory_max:] |
|
|
| def _remember_played_song_unlocked(self, song_id: Optional[str]): |
| if not song_id: |
| return |
|
|
| self.recently_played_songs.append({ |
| "id": str(song_id), |
| "played_at": time.time(), |
| }) |
| self._prune_recently_played_unlocked() |
|
|
| def _recently_played_exclusion_set_unlocked(self) -> set[str]: |
| recently_played_exclude_minutes = int(self._cfg("recently_played_exclude_minutes", RECENTLY_PLAYED_EXCLUDE_MINUTES)) |
|
|
| if recently_played_exclude_minutes <= 0: |
| return set() |
|
|
| self._prune_recently_played_unlocked() |
|
|
| now = time.time() |
| cutoff = now - (recently_played_exclude_minutes * 60) |
|
|
| return { |
| str(item.get("id")) |
| for item in self.recently_played_songs |
| if item.get("id") and float(item.get("played_at", 0)) >= cutoff |
| } |
|
|
| def _make_basic_track(self, song_id: str): |
| track = { |
| "id": song_id, |
| "title": f"Song {song_id}", |
| "description": "", |
| "lyrics": "", |
| "tags": [], |
| "duration": DEFAULT_DURATION, |
| "created_at": "", |
| "audio_url": f"{BUCKET_RESOLVE_URL}/songs/{song_id}/{song_id}.wav", |
| "stream_url": f"{BUCKET_RESOLVE_URL}/songs/{song_id}/{song_id}.wav", |
| "thumb_url": f"{BUCKET_RESOLVE_URL}/songs/{song_id}/thumb.png", |
| "meta_loaded": False, |
| "like_count": likes.get_count(song_id), |
| "upvote_count": likes.get_upvote_count(song_id), |
| "downvote_count": likes.get_downvote_count(song_id), |
| "report_count": likes.get_report_count(song_id), |
| "is_jingle": False, |
| "is_segment": False, |
| "is_custom_track": False, |
| } |
|
|
| return self._apply_track_controls_unlocked(track) |
|
|
| def _make_jingle_track(self, path: Path): |
| duration = probe_audio_duration(str(path), DEFAULT_JINGLE_DURATION) |
|
|
| return { |
| "id": path.stem, |
| "title": "HF Community Radio", |
| "description": "Station ID", |
| "lyrics": "", |
| "tags": ["jingle"], |
| "duration": duration, |
| "created_at": "", |
| "audio_url": f"/jingles/{path.name}", |
| "stream_url": str(path), |
| "thumb_url": "", |
| "meta_loaded": True, |
| "like_count": 0, |
| "upvote_count": 0, |
| "downvote_count": 0, |
| "report_count": 0, |
| "is_jingle": True, |
| "is_segment": False, |
| "is_custom_track": False, |
| "source_url": "", |
| } |
|
|
| def _make_remote_jingle_track(self, meta_path: Path): |
| meta = json.loads(meta_path.read_text(encoding="utf-8")) |
|
|
| if not asset_is_enabled(meta): |
| return None |
|
|
| audio_path = meta_path.parent / "audio.mp3" |
|
|
| if not audio_path.exists(): |
| print(f"[jingles] Missing remote jingle audio: {audio_path}") |
| return None |
|
|
| asset_id = meta.get("id") or meta_path.parent.name |
| kind = meta.get("kind") or "station_id" |
| duration = safe_float(meta.get("duration"), DEFAULT_JINGLE_DURATION) |
|
|
| return { |
| "id": str(asset_id), |
| "title": meta.get("title") or "HF Radio", |
| "description": meta.get("description") or "", |
| "lyrics": "", |
| "tags": ["jingle", str(kind)], |
| "duration": duration, |
| "created_at": meta.get("created_at", ""), |
| "updated_at": meta.get("updated_at", ""), |
| "audio_url": meta.get("audio_url") or "", |
| "stream_url": str(audio_path), |
| "thumb_url": "", |
| "meta_loaded": True, |
| "like_count": 0, |
| "upvote_count": 0, |
| "downvote_count": 0, |
| "report_count": 0, |
| "is_jingle": True, |
| "is_segment": False, |
| "is_custom_track": False, |
| "kind": kind, |
| "source_url": "", |
| } |
|
|
| def _make_segment_track(self, meta_path: Path): |
| meta = json.loads(meta_path.read_text(encoding="utf-8")) |
|
|
| if not asset_is_enabled(meta): |
| return None |
|
|
| if asset_is_expired(meta): |
| return None |
|
|
| audio_path = meta_path.parent / "audio.mp3" |
|
|
| if not audio_path.exists(): |
| print(f"[segments] Missing segment audio: {audio_path}") |
| return None |
|
|
| asset_id = meta.get("id") or meta_path.parent.name |
| kind = meta.get("kind") or "community_news" |
| duration = safe_float(meta.get("duration"), DEFAULT_SEGMENT_DURATION) |
|
|
| tags = ["community update", str(kind).replace("_", " ")] |
|
|
| return { |
| "id": str(asset_id), |
| "title": meta.get("title") or "Community Update", |
| "description": meta.get("description") or "", |
| "lyrics": "", |
| "tags": tags, |
| "duration": duration, |
| "created_at": meta.get("created_at", ""), |
| "updated_at": meta.get("updated_at", ""), |
| "expires_at": meta.get("expires_at"), |
| "audio_url": meta.get("audio_url") or "", |
| "stream_url": str(audio_path), |
| "thumb_url": "", |
| "meta_loaded": True, |
| "like_count": 0, |
| "upvote_count": 0, |
| "downvote_count": 0, |
| "report_count": 0, |
| "is_jingle": False, |
| "is_segment": True, |
| "is_custom_track": False, |
| "kind": kind, |
| "public_label": meta.get("public_label") or "Community Update", |
| "source_url": meta.get("source_url") or "", |
| } |
|
|
| def _make_custom_track(self, meta_path: Path): |
| meta = json.loads(meta_path.read_text(encoding="utf-8")) |
|
|
| if not asset_is_enabled(meta): |
| return None |
|
|
| audio_path = meta_path.parent / "audio.mp3" |
|
|
| if not audio_path.exists(): |
| print(f"[custom-tracks] Missing custom track audio: {audio_path}") |
| return None |
|
|
| asset_id = str(meta.get("id") or meta_path.parent.name) |
| duration = safe_float(meta.get("duration"), DEFAULT_CUSTOM_TRACK_DURATION) |
| tags = normalize_tags(meta.get("tags", [])) |
|
|
| if not tags: |
| tags = ["custom track"] |
|
|
| thumb_path = meta_path.parent / "thumb.png" |
| thumb_url = meta.get("thumb_url") or "" |
|
|
| if thumb_path.exists() and thumb_path.is_file(): |
| thumb_url = f"/custom-tracks/{asset_id}/thumb.png" |
|
|
| track = { |
| "id": asset_id, |
| "title": meta.get("title") or f"Custom Track {asset_id}", |
| "description": meta.get("description", ""), |
| "lyrics": meta.get("lyrics", ""), |
| "tags": tags, |
| "duration": duration, |
| "created_at": meta.get("created_at", ""), |
| "updated_at": meta.get("updated_at", ""), |
| "audio_url": meta.get("audio_url") or "", |
| "stream_url": str(audio_path), |
| "thumb_url": thumb_url, |
| "meta_loaded": True, |
| "like_count": likes.get_count(asset_id), |
| "upvote_count": likes.get_upvote_count(asset_id), |
| "downvote_count": likes.get_downvote_count(asset_id), |
| "report_count": likes.get_report_count(asset_id), |
| "is_jingle": False, |
| "is_segment": False, |
| "is_custom_track": True, |
| "type": meta.get("type") or "custom_track", |
| "source_model": meta.get("source_model") or "", |
| "provider": meta.get("provider") or "", |
| "prompt": meta.get("prompt") or "", |
| "quality_score": meta.get("quality_score"), |
| "quality_notes": meta.get("quality_notes") or "", |
| "internal_notes": meta.get("internal_notes") or "", |
| } |
|
|
| return self._apply_track_controls_unlocked(track) |
|
|
| def _song_is_hidden_by_reports(self, song_id: str) -> bool: |
| override = self._track_override_unlocked(song_id) |
|
|
| if override.get("hidden"): |
| return True |
|
|
| report_hide_threshold = int(self._cfg("report_hide_threshold", REPORT_HIDE_THRESHOLD)) |
|
|
| if report_hide_threshold <= 0: |
| return False |
|
|
| return likes.get_report_count(song_id) >= report_hide_threshold |
|
|
| def _effective_score_for_song_id_unlocked(self, song_id: Optional[str]) -> int: |
| """Return the editorial effective score used for auto-rotation eligibility. |
| |
| Keep this aligned with Studio's generated draft score contract: |
| likes + upvotes + Studio boosts - downvotes - Studio downvote penalty - reports. |
| """ |
| if not song_id: |
| return 0 |
|
|
| song_id = str(song_id) |
| override = self._track_override_unlocked(song_id) |
| if not isinstance(override, dict): |
| override = {} |
|
|
| like_count = likes.get_count(song_id) |
| upvote_count = likes.get_upvote_count(song_id) |
| downvote_count = likes.get_downvote_count(song_id) |
| report_count = likes.get_report_count(song_id) |
|
|
| like_boost = max(0, safe_int(override.get("like_boost", 0), 0)) |
| upvote_boost = max(0, safe_int(override.get("upvote_boost", 0), 0)) |
| downvote_boost = max(0, safe_int(override.get("downvote_boost", 0), 0)) |
|
|
| return like_count + upvote_count + like_boost + upvote_boost - downvote_count - downvote_boost - report_count |
|
|
| def _effective_score_for_track_unlocked(self, track: dict) -> int: |
| if not track: |
| return 0 |
| song_id = track.get("id") |
| if song_id: |
| return self._effective_score_for_song_id_unlocked(str(song_id)) |
| like_count = safe_int(track.get("effective_like_count", track.get("like_count", 0)), 0) |
| upvote_count = safe_int(track.get("effective_upvote_count", track.get("upvote_count", 0)), 0) |
| downvote_count = safe_int(track.get("effective_downvote_count", track.get("downvote_count", 0)), 0) |
| report_count = safe_int(track.get("effective_report_count", track.get("report_count", 0)), 0) |
| return like_count + upvote_count - downvote_count - report_count |
|
|
| def _track_is_below_auto_rotation_score_floor_unlocked(self, track_or_song_id) -> bool: |
| threshold = float(self._cfg( |
| "auto_rotation_min_effective_score_exclusive", |
| AUTO_ROTATION_MIN_EFFECTIVE_SCORE_EXCLUSIVE, |
| )) |
| if isinstance(track_or_song_id, dict): |
| score = self._effective_score_for_track_unlocked(track_or_song_id) |
| else: |
| score = self._effective_score_for_song_id_unlocked(str(track_or_song_id or "")) |
| return score <= threshold |
|
|
| def _feedback_weight_for_track(self, track: dict) -> float: |
| like_count = safe_int(track.get("effective_like_count", track.get("like_count", 0)), 0) |
| upvote_count = safe_int(track.get("effective_upvote_count", track.get("upvote_count", 0)), 0) |
| downvote_count = safe_int(track.get("effective_downvote_count", track.get("downvote_count", 0)), 0) |
|
|
| like_weight_per_like = float(self._cfg("like_weight_per_like", LIKE_WEIGHT_PER_LIKE)) |
| like_weight_max = float(self._cfg("like_weight_max", LIKE_WEIGHT_MAX)) |
| upvote_weight_per_upvote = float(self._cfg("upvote_weight_per_upvote", UPVOTE_WEIGHT_PER_UPVOTE)) |
| upvote_weight_max = float(self._cfg("upvote_weight_max", UPVOTE_WEIGHT_MAX)) |
| downvote_weight_per_downvote = float(self._cfg("downvote_weight_per_downvote", DOWNVOTE_WEIGHT_PER_DOWNVOTE)) |
| downvote_weight_max = float(self._cfg("downvote_weight_max", DOWNVOTE_WEIGHT_MAX)) |
| track_weight_min = float(self._cfg("track_weight_min", TRACK_WEIGHT_MIN)) |
| track_weight_max = float(self._cfg("track_weight_max", TRACK_WEIGHT_MAX)) |
|
|
| like_bonus = min(max(0, like_count) * like_weight_per_like, max(0.0, like_weight_max - 1.0)) |
| upvote_bonus = min(max(0, upvote_count) * upvote_weight_per_upvote, upvote_weight_max) |
| downvote_penalty = min(max(0, downvote_count) * downvote_weight_per_downvote, downvote_weight_max) |
|
|
| weight = 1.0 + like_bonus + upvote_bonus - downvote_penalty |
|
|
| return max(track_weight_min, min(track_weight_max, weight)) |
|
|
| def _weighted_pop_track(self, tracks: list[dict]) -> dict: |
| weights = [self._feedback_weight_for_track(track) for track in tracks] |
| total_weight = sum(weights) |
|
|
| if total_weight <= 0: |
| selected_index = random.randrange(len(tracks)) |
| else: |
| pick = random.uniform(0, total_weight) |
| cumulative = 0.0 |
| selected_index = len(tracks) - 1 |
|
|
| for i, weight in enumerate(weights): |
| cumulative += weight |
|
|
| if pick <= cumulative: |
| selected_index = i |
| break |
|
|
| return tracks.pop(selected_index) |
|
|
| def _track_duration_seconds(self, track: dict) -> float: |
| try: |
| duration = float(track.get("duration") or DEFAULT_DURATION) |
| except Exception: |
| duration = DEFAULT_DURATION |
|
|
| return max(0.0, duration) |
|
|
| def _track_is_recent(self, track: dict, now: Optional[datetime] = None) -> bool: |
| recent_track_quota = float(self._cfg("recent_track_quota", RECENT_TRACK_QUOTA)) |
| recent_track_window_hours = int(self._cfg("recent_track_window_hours", RECENT_TRACK_WINDOW_HOURS)) |
|
|
| if recent_track_quota <= 0 or recent_track_window_hours <= 0: |
| return False |
|
|
| created_at = parse_datetime(track.get("created_at")) |
|
|
| if created_at is None: |
| return False |
|
|
| now = now or datetime.now(timezone.utc) |
| recent_cutoff = now - timedelta(hours=recent_track_window_hours) |
|
|
| return created_at >= recent_cutoff |
|
|
| def _average_jingle_duration_unlocked(self) -> float: |
| if not self._cfg("enable_jingles", ENABLE_JINGLES) or not self.jingles: |
| return 0.0 |
|
|
| durations = [] |
|
|
| for jingle in self.jingles: |
| try: |
| duration = float(jingle.get("duration") or 0) |
| except Exception: |
| duration = 0 |
|
|
| if duration > 0: |
| durations.append(duration) |
|
|
| if not durations: |
| return DEFAULT_JINGLE_DURATION |
|
|
| return sum(durations) / len(durations) |
|
|
| def _expected_songs_per_jingle(self) -> int: |
| min_songs_between_jingles = int(self._cfg("min_songs_between_jingles", MIN_SONGS_BETWEEN_JINGLES)) |
| max_songs_between_jingles = int(self._cfg("max_songs_between_jingles", MAX_SONGS_BETWEEN_JINGLES)) |
|
|
| if min_songs_between_jingles <= 0 and max_songs_between_jingles <= 0: |
| return 1 |
|
|
| expected = round((min_songs_between_jingles + max_songs_between_jingles) / 2) |
| return max(1, expected) |
|
|
| def _estimated_jingle_seconds_for_song_count(self, song_count: int, average_jingle_duration: float) -> float: |
| if not self._cfg("enable_jingles", ENABLE_JINGLES) or average_jingle_duration <= 0: |
| return 0.0 |
|
|
| expected_songs_per_jingle = self._expected_songs_per_jingle() |
| estimated_jingle_count = song_count // expected_songs_per_jingle |
|
|
| return estimated_jingle_count * average_jingle_duration |
|
|
| def _estimated_rotation_seconds(self, tracks: list[dict], average_jingle_duration: float, current_remaining_seconds: Optional[float] = None) -> float: |
| song_seconds = 0.0 |
|
|
| for i, track in enumerate(tracks): |
| if i == 0 and current_remaining_seconds is not None: |
| song_seconds += current_remaining_seconds |
| continue |
|
|
| try: |
| song_seconds += float(track.get("duration") or DEFAULT_DURATION) |
| except Exception: |
| song_seconds += DEFAULT_DURATION |
|
|
| return song_seconds + self._estimated_jingle_seconds_for_song_count( |
| len(tracks), |
| average_jingle_duration, |
| ) |
|
|
| def _load_track_from_meta_path(self, meta_path: str, song_id: str) -> dict: |
| track = self._make_basic_track(song_id) |
|
|
| try: |
| with self.fs.open(meta_path, "r") as f: |
| meta = json.load(f) |
|
|
| track["title"] = meta.get("title") or track["title"] |
| track["description"] = meta.get("description", "") |
| track["lyrics"] = meta.get("lyrics", "") |
| track["tags"] = normalize_tags(meta.get("tags", [])) |
| track["created_at"] = meta.get("created_at", "") |
|
|
| duration = meta.get("duration") |
|
|
| try: |
| track["duration"] = float(duration) if duration is not None else DEFAULT_DURATION |
| except Exception: |
| track["duration"] = DEFAULT_DURATION |
|
|
| track["audio_url"] = meta.get("audio_url") or track["audio_url"] |
| track["stream_url"] = track["audio_url"] |
| track["thumb_url"] = meta.get("thumb_url") or track["thumb_url"] |
| track["meta_loaded"] = True |
|
|
| except Exception as e: |
| print(f"[radio] Could not load metadata for {song_id} during refresh: {e}") |
| track["meta_loaded"] = True |
|
|
| track["like_count"] = likes.get_count(song_id) |
| track["upvote_count"] = likes.get_upvote_count(song_id) |
| track["downvote_count"] = likes.get_downvote_count(song_id) |
| track["report_count"] = likes.get_report_count(song_id) |
| self._apply_track_controls_unlocked(track) |
|
|
| return track |
|
|
| def refresh_jingles(self): |
| if not self._cfg("enable_jingles", ENABLE_JINGLES): |
| with self.lock: |
| self.jingles = [] |
| self.current_jingle = None |
|
|
| if self.current_kind == "jingle": |
| self.current_kind = "song" |
|
|
| print("[jingles] Disabled") |
| return |
|
|
| new_jingles = [] |
|
|
| jingles_dir = Path(JINGLES_DIR) |
| paths = sorted(jingles_dir.glob(JINGLES_GLOB)) |
|
|
| for path in paths: |
| if not path.is_file(): |
| continue |
|
|
| try: |
| new_jingles.append(self._make_jingle_track(path)) |
| except Exception as e: |
| print(f"[jingles] Could not load local jingle {path}: {e}") |
|
|
| if self._cfg("enable_remote_jingles", ENABLE_REMOTE_JINGLES): |
| remote_jingles_dir = Path(REMOTE_JINGLES_DIR) |
|
|
| if remote_jingles_dir.exists(): |
| for meta_path in sorted(remote_jingles_dir.glob("*/meta.json")): |
| try: |
| jingle = self._make_remote_jingle_track(meta_path) |
|
|
| if jingle: |
| new_jingles.append(jingle) |
| except Exception as e: |
| print(f"[jingles] Could not load remote jingle {meta_path}: {e}") |
| else: |
| print(f"[jingles] Remote jingles mount not found: {remote_jingles_dir}") |
|
|
| with self.lock: |
| current_jingle_id = self.current_jingle.get("id") if self.current_jingle else None |
| self.jingles = new_jingles |
|
|
| if self.current_kind == "jingle": |
| matching_jingle = next( |
| ( |
| jingle |
| for jingle in self.jingles |
| if jingle["id"] == current_jingle_id |
| ), |
| None, |
| ) |
|
|
| if matching_jingle: |
| self.current_jingle = matching_jingle |
| else: |
| self.current_kind = "song" |
| self.current_jingle = None |
|
|
| print( |
| f"[jingles] Loaded {len(new_jingles)} jingles " |
| f"local_dir={JINGLES_DIR}/{JINGLES_GLOB} remote_dir={REMOTE_JINGLES_DIR}" |
| ) |
|
|
| def refresh_segments(self): |
| if not self._cfg("enable_segments", ENABLE_SEGMENTS): |
| with self.lock: |
| self.segments = [] |
| self.current_segment = None |
|
|
| if self.current_kind == "segment": |
| self.current_kind = "song" |
|
|
| print("[segments] Disabled") |
| return |
|
|
| new_segments = [] |
| remote_segments_dir = Path(REMOTE_SEGMENTS_DIR) |
|
|
| if remote_segments_dir.exists(): |
| for meta_path in sorted(remote_segments_dir.glob("*/meta.json")): |
| try: |
| segment = self._make_segment_track(meta_path) |
|
|
| if segment: |
| new_segments.append(segment) |
| except Exception as e: |
| print(f"[segments] Could not load segment {meta_path}: {e}") |
| else: |
| print(f"[segments] Remote segments mount not found: {remote_segments_dir}") |
|
|
| with self.lock: |
| current_segment_id = self.current_segment.get("id") if self.current_segment else None |
| self.segments = new_segments |
|
|
| if self.current_kind == "segment": |
| matching_segment = next( |
| ( |
| segment |
| for segment in self.segments |
| if segment["id"] == current_segment_id |
| ), |
| None, |
| ) |
|
|
| if matching_segment: |
| self.current_segment = matching_segment |
| else: |
| self.current_kind = "song" |
| self.current_segment = None |
|
|
| print(f"[segments] Loaded {len(new_segments)} enabled segments from {REMOTE_SEGMENTS_DIR}") |
|
|
| def refresh_custom_tracks(self): |
| if not bool(self._cfg("enable_custom_tracks", ENABLE_CUSTOM_TRACKS)): |
| with self.lock: |
| self.custom_tracks = [] |
| self.custom_tracks_last_loaded_at = time.time() |
| self.custom_tracks_last_error = None |
|
|
| print("[custom-tracks] Disabled") |
| return |
|
|
| new_custom_tracks = [] |
| custom_tracks_error = None |
| custom_tracks_dir = Path(CUSTOM_TRACKS_DIR) |
|
|
| try: |
| if custom_tracks_dir.exists(): |
| for meta_path in sorted(custom_tracks_dir.glob("*/meta.json")): |
| try: |
| track = self._make_custom_track(meta_path) |
|
|
| if not track: |
| continue |
|
|
| if track.get("studio_hidden"): |
| continue |
|
|
| if self._song_is_hidden_by_reports(track.get("id")): |
| continue |
|
|
| new_custom_tracks.append(track) |
| except Exception as e: |
| print(f"[custom-tracks] Could not load custom track {meta_path}: {e}") |
| else: |
| print(f"[custom-tracks] Mount not found: {custom_tracks_dir}") |
| except Exception as e: |
| custom_tracks_error = str(e) |
| print(f"[custom-tracks] Could not refresh custom tracks: {e}") |
|
|
| with self.lock: |
| self.custom_tracks = new_custom_tracks |
| self.custom_tracks_last_loaded_at = time.time() |
| self.custom_tracks_last_error = custom_tracks_error |
|
|
| print(f"[custom-tracks] Loaded {len(new_custom_tracks)} enabled custom tracks from {CUSTOM_TRACKS_DIR}") |
|
|
| def _meta_path_for_song(self, song_id: str): |
| return f"{BUCKET_FS_ROOT}/songs/{song_id}/meta.json" |
|
|
| def _load_meta_for_track_unlocked(self, track: dict): |
| if not track: |
| return track |
|
|
| if track.get("is_jingle") or track.get("is_segment"): |
| return track |
|
|
| if track.get("is_custom_track"): |
| track["like_count"] = likes.get_count(track.get("id")) |
| track["upvote_count"] = likes.get_upvote_count(track.get("id")) |
| track["downvote_count"] = likes.get_downvote_count(track.get("id")) |
| track["report_count"] = likes.get_report_count(track.get("id")) |
| self._apply_track_controls_unlocked(track) |
| return track |
|
|
| song_id = track["id"] |
|
|
| if not track.get("meta_loaded"): |
| meta_path = self._meta_path_for_song(song_id) |
|
|
| try: |
| with self.fs.open(meta_path, "r") as f: |
| meta = json.load(f) |
|
|
| track["title"] = meta.get("title") or track["title"] |
| track["description"] = meta.get("description", "") |
| track["lyrics"] = meta.get("lyrics", "") |
| track["tags"] = normalize_tags(meta.get("tags", [])) |
| track["created_at"] = meta.get("created_at", "") |
|
|
| duration = meta.get("duration") |
| try: |
| track["duration"] = float(duration) if duration is not None else DEFAULT_DURATION |
| except Exception: |
| track["duration"] = DEFAULT_DURATION |
|
|
| track["audio_url"] = meta.get("audio_url") or track["audio_url"] |
| track["stream_url"] = track["audio_url"] |
| track["thumb_url"] = meta.get("thumb_url") or track["thumb_url"] |
| track["meta_loaded"] = True |
|
|
| except Exception as e: |
| print(f"[radio] Could not load metadata for {song_id}: {e}") |
| track["meta_loaded"] = True |
|
|
| track["like_count"] = likes.get_count(song_id) |
| track["upvote_count"] = likes.get_upvote_count(song_id) |
| track["downvote_count"] = likes.get_downvote_count(song_id) |
| track["report_count"] = likes.get_report_count(song_id) |
| self._apply_track_controls_unlocked(track) |
|
|
| return track |
|
|
| def refresh_tracks(self, prebuild: bool = False, target_at: Optional[datetime] = None): |
| label = "prebuilding next rotation" if prebuild else "refreshing active rotation" |
| print(f"[radio] {label}: {BUCKET_ID}") |
| print(f"[radio] Bucket FS root: {BUCKET_FS_ROOT}") |
|
|
| with self.lock: |
| if prebuild: |
| self.next_rotation_is_building = True |
| self.next_rotation_last_error = None |
| self.next_rotation_target_at = target_at.isoformat() if target_at else next_utc_hour_iso() |
| else: |
| self.is_loading = True |
| self.last_error = None |
|
|
| runtime_config = dict(self.runtime_config) |
| average_jingle_duration = self._average_jingle_duration_unlocked() |
| custom_tracks_snapshot = [dict(track) for track in self.custom_tracks] |
|
|
| max_tracks = int(runtime_config.get("max_tracks", MAX_TRACKS)) |
| rotation_target_seconds = int(runtime_config.get("rotation_target_seconds", ROTATION_TARGET_SECONDS)) |
| recent_track_quota = float(runtime_config.get("recent_track_quota", RECENT_TRACK_QUOTA)) |
| custom_track_quota = float(runtime_config.get("custom_track_quota", CUSTOM_TRACK_QUOTA)) |
| recently_played_exclude_minutes = int(runtime_config.get("recently_played_exclude_minutes", RECENTLY_PLAYED_EXCLUDE_MINUTES)) |
| recently_played_min_candidates_after_filter = int(runtime_config.get("recently_played_min_candidates_after_filter", RECENTLY_PLAYED_MIN_CANDIDATES_AFTER_FILTER)) |
| meta_load_workers = int(runtime_config.get("meta_load_workers", META_LOAD_WORKERS)) |
|
|
| try: |
| glob_pattern = f"{BUCKET_FS_ROOT}/songs/*/meta.json" |
| print(f"[radio] Glob pattern: {glob_pattern}") |
|
|
| meta_paths = self.fs.glob(glob_pattern) |
|
|
| print(f"[radio] Found {len(meta_paths)} meta.json files") |
| print(f"[radio] First paths: {meta_paths[:5]}") |
|
|
| except Exception as e: |
| print(f"[radio] Could not list bucket {BUCKET_ID}: {e}") |
|
|
| with self.lock: |
| if prebuild: |
| self.next_rotation_is_building = False |
| self.next_rotation_last_error = str(e) |
| else: |
| self.is_loading = False |
| self.last_error = str(e) |
|
|
| if not self.tracks: |
| self.is_playing = False |
|
|
| return |
|
|
| song_meta_paths = {} |
|
|
| for meta_path in meta_paths: |
| try: |
| parts = meta_path.strip("/").split("/") |
| song_id = parts[-2] |
|
|
| if song_id: |
| song_meta_paths[song_id] = meta_path |
|
|
| except Exception as e: |
| print(f"[radio] Could not parse path {meta_path}: {e}") |
|
|
| all_song_ids = sorted(song_meta_paths.keys()) |
| all_custom_track_ids = {str(track.get("id")) for track in custom_tracks_snapshot if track.get("id")} |
|
|
| with self.lock: |
| old_track_id = self._get_current_song_id_unlocked() |
| old_current_kind = self.current_kind |
| old_started_at = self.started_at |
| recently_played_ids = self._recently_played_exclusion_set_unlocked() |
|
|
| before_filter_count = len(all_song_ids) |
|
|
| candidate_song_ids = [ |
| song_id |
| for song_id in all_song_ids |
| if not self._song_is_hidden_by_reports(song_id) |
| ] |
|
|
| filtered_hidden_count = before_filter_count - len(candidate_song_ids) |
|
|
| if filtered_hidden_count: |
| print( |
| f"[radio] Filtered out {filtered_hidden_count} tracks " |
| f"by reports or studio overrides" |
| ) |
|
|
| before_score_floor_count = len(candidate_song_ids) |
| candidate_song_ids = [ |
| song_id |
| for song_id in candidate_song_ids |
| if not self._track_is_below_auto_rotation_score_floor_unlocked(song_id) |
| ] |
| filtered_low_score_count = before_score_floor_count - len(candidate_song_ids) |
|
|
| if filtered_low_score_count: |
| threshold = float(runtime_config.get( |
| "auto_rotation_min_effective_score_exclusive", |
| AUTO_ROTATION_MIN_EFFECTIVE_SCORE_EXCLUSIVE, |
| )) |
| print( |
| f"[radio] Filtered out {filtered_low_score_count} low-score tracks " |
| f"effective_score <= {threshold:g}" |
| ) |
|
|
| if old_track_id: |
| recently_played_ids.discard(old_track_id) |
|
|
| recently_played_filtered_count = 0 |
|
|
| if recently_played_ids: |
| filtered_candidate_song_ids = [ |
| song_id |
| for song_id in candidate_song_ids |
| if song_id not in recently_played_ids |
| ] |
|
|
| if ( |
| len(filtered_candidate_song_ids) >= recently_played_min_candidates_after_filter |
| or len(filtered_candidate_song_ids) >= len(candidate_song_ids) |
| ): |
| recently_played_filtered_count = len(candidate_song_ids) - len(filtered_candidate_song_ids) |
| candidate_song_ids = filtered_candidate_song_ids |
|
|
| if recently_played_filtered_count: |
| print( |
| f"[radio] Filtered out {recently_played_filtered_count} recently played tracks " |
| f"window={recently_played_exclude_minutes}min " |
| f"remaining_candidates={len(candidate_song_ids)}" |
| ) |
| else: |
| print( |
| f"[radio] Skipped recently played exclusion because too few candidates would remain " |
| f"remaining={len(filtered_candidate_song_ids)} " |
| f"min={recently_played_min_candidates_after_filter} " |
| f"window={recently_played_exclude_minutes}min" |
| ) |
|
|
| preserve_current_track = ( |
| not prebuild |
| and old_current_kind == "song" |
| and old_track_id |
| and (old_track_id in all_song_ids or old_track_id in all_custom_track_ids) |
| ) |
|
|
| preserved_track = None |
| current_remaining_seconds = None |
|
|
| if preserve_current_track: |
| if old_track_id in song_meta_paths: |
| preserved_track = self._load_track_from_meta_path( |
| song_meta_paths[old_track_id], |
| old_track_id, |
| ) |
| else: |
| preserved_track = next( |
| (dict(track) for track in custom_tracks_snapshot if track.get("id") == old_track_id), |
| None, |
| ) |
|
|
| if preserved_track is None: |
| preserve_current_track = False |
| else: |
| self._apply_track_controls_unlocked(preserved_track) |
|
|
| duration = float((preserved_track or {}).get("duration") or DEFAULT_DURATION) |
|
|
| if old_started_at is not None: |
| current_elapsed = max(0.0, time.time() - old_started_at) |
| current_remaining_seconds = max(0.0, duration - current_elapsed) |
| else: |
| current_remaining_seconds = duration |
|
|
| if self._song_is_hidden_by_reports(old_track_id): |
| print( |
| f"[radio] Preserving currently playing hidden/reported track until it finishes: " |
| f"{old_track_id}" |
| ) |
|
|
| remaining_song_ids = [ |
| song_id |
| for song_id in candidate_song_ids |
| if not preserved_track or song_id != preserved_track["id"] |
| ] |
|
|
| candidate_tracks = [] |
|
|
| if remaining_song_ids: |
| worker_count = max(1, min(meta_load_workers, len(remaining_song_ids))) |
| print(f"[radio] Loading metadata for {len(remaining_song_ids)} candidate tracks with {worker_count} workers") |
|
|
| with ThreadPoolExecutor(max_workers=worker_count) as executor: |
| futures = { |
| executor.submit( |
| self._load_track_from_meta_path, |
| song_meta_paths[song_id], |
| song_id, |
| ): song_id |
| for song_id in remaining_song_ids |
| } |
|
|
| for future in as_completed(futures): |
| song_id = futures[future] |
|
|
| try: |
| candidate_tracks.append(future.result()) |
| except Exception as e: |
| print(f"[radio] Could not load candidate track {song_id}: {e}") |
|
|
| now = datetime.now(timezone.utc) |
| recent_candidates = [ |
| track |
| for track in candidate_tracks |
| if self._track_is_recent(track, now=now) |
| ] |
| regular_candidates = [ |
| track |
| for track in candidate_tracks |
| if not self._track_is_recent(track, now=now) |
| ] |
|
|
| recent_target_seconds = max(0.0, rotation_target_seconds * max(0.0, min(1.0, recent_track_quota))) |
| selected_tracks = [] |
| selected_recent_seconds = 0.0 |
| selected_recent_count = 0 |
|
|
| if preserved_track: |
| selected_tracks.append(preserved_track) |
|
|
| if self._track_is_recent(preserved_track, now=now): |
| selected_recent_seconds += self._track_duration_seconds(preserved_track) |
| selected_recent_count += 1 |
|
|
| custom_candidates = [ |
| dict(track) |
| for track in custom_tracks_snapshot |
| if track.get("id") |
| and not (preserved_track and track.get("id") == preserved_track.get("id")) |
| and not track.get("studio_hidden") |
| and not self._song_is_hidden_by_reports(track.get("id")) |
| and not self._track_is_below_auto_rotation_score_floor_unlocked(track) |
| and track.get("id") not in recently_played_ids |
| ] |
| custom_target_seconds = max(0.0, rotation_target_seconds * max(0.0, min(1.0, custom_track_quota))) |
| selected_custom_seconds = 0.0 |
| selected_custom_count = 0 |
|
|
| while custom_candidates and len(selected_tracks) < max_tracks and selected_custom_seconds < custom_target_seconds: |
| estimated_seconds = self._estimated_rotation_seconds( |
| selected_tracks, |
| average_jingle_duration, |
| current_remaining_seconds=current_remaining_seconds if preserved_track else None, |
| ) |
|
|
| if selected_tracks and estimated_seconds >= rotation_target_seconds: |
| break |
|
|
| track = self._weighted_pop_track(custom_candidates) |
| selected_tracks.append(track) |
| selected_custom_seconds += self._track_duration_seconds(track) |
| selected_custom_count += 1 |
|
|
| while ( |
| recent_candidates |
| and len(selected_tracks) < max_tracks |
| and selected_recent_seconds < recent_target_seconds |
| ): |
| estimated_seconds = self._estimated_rotation_seconds( |
| selected_tracks, |
| average_jingle_duration, |
| current_remaining_seconds=current_remaining_seconds if preserved_track else None, |
| ) |
|
|
| if selected_tracks and estimated_seconds >= rotation_target_seconds: |
| break |
|
|
| track = self._weighted_pop_track(recent_candidates) |
| selected_tracks.append(track) |
| selected_recent_seconds += self._track_duration_seconds(track) |
| selected_recent_count += 1 |
|
|
| remaining_tracks = regular_candidates + recent_candidates |
|
|
| while remaining_tracks and len(selected_tracks) < max_tracks: |
| estimated_seconds = self._estimated_rotation_seconds( |
| selected_tracks, |
| average_jingle_duration, |
| current_remaining_seconds=current_remaining_seconds if preserved_track else None, |
| ) |
|
|
| if selected_tracks and estimated_seconds >= rotation_target_seconds: |
| break |
|
|
| track = self._weighted_pop_track(remaining_tracks) |
| selected_tracks.append(track) |
|
|
| if self._track_is_recent(track, now=now): |
| selected_recent_seconds += self._track_duration_seconds(track) |
| selected_recent_count += 1 |
|
|
| if preserved_track: |
| preserved_id = preserved_track["id"] |
| non_preserved_tracks = [ |
| track |
| for track in selected_tracks |
| if track["id"] != preserved_id |
| ] |
| random.shuffle(non_preserved_tracks) |
| selected_tracks = [preserved_track] + non_preserved_tracks |
| else: |
| random.shuffle(selected_tracks) |
|
|
| estimated_seconds = self._estimated_rotation_seconds( |
| selected_tracks, |
| average_jingle_duration, |
| current_remaining_seconds=current_remaining_seconds if preserved_track else None, |
| ) |
|
|
| print( |
| f"[radio] Using {len(selected_tracks)} tracks for rotation pack " |
| f"estimated_seconds={estimated_seconds:.1f}/{rotation_target_seconds} " |
| f"max_tracks={max_tracks} avg_jingle={average_jingle_duration:.1f}s " |
| f"recent={selected_recent_count} tracks/{selected_recent_seconds:.1f}s " |
| f"recent_quota={recent_track_quota:.2f} " |
| f"recently_played_filtered={recently_played_filtered_count} " |
| f"hidden_filtered={filtered_hidden_count} " |
| f"low_score_filtered={filtered_low_score_count} " |
| f"custom={selected_custom_count} tracks/{selected_custom_seconds:.1f}s quota={custom_track_quota:.2f} " |
| f"available_custom={len(custom_tracks_snapshot)} " |
| f"track_overrides={len(self.track_overrides)} " |
| f"cover_overrides={self.cover_overrides_count} " |
| f"jingles={len(self.jingles)} segments={len(self.segments)}" |
| ) |
|
|
| new_tracks = selected_tracks |
|
|
| if prebuild: |
| with self.lock: |
| self.next_tracks = [dict(track) for track in new_tracks] |
| self.next_rotation_ready = False |
| self.next_rotation_is_building = False |
| self.next_rotation_built_at = time.time() |
| self.next_rotation_target_at = target_at.isoformat() if target_at else next_utc_hour_iso() |
| self.next_rotation_last_error = None |
| self.next_rotation_source = "auto_prebuild" |
| self.next_rotation_payload = None |
| print( |
| f"[radio] Prebuilt next rotation tracks={len(new_tracks)} " |
| f"target_at={self.next_rotation_target_at}" |
| ) |
| return |
|
|
| with self.lock: |
| live_current_kind = self.current_kind |
| live_current_jingle = self.current_jingle |
| live_current_segment = self.current_segment |
| live_started_at = self.started_at |
| live_is_playing = self.is_playing |
| live_track_id = self._get_current_song_id_unlocked() |
|
|
| live_track_snapshot = None |
|
|
| if live_track_id and self.current_index is not None and self.tracks: |
| try: |
| live_track_snapshot = dict(self.tracks[self.current_index]) |
| except Exception: |
| live_track_snapshot = None |
|
|
| |
| |
| if live_track_id: |
| existing_live_index = next( |
| ( |
| i |
| for i, track in enumerate(new_tracks) |
| if track.get("id") == live_track_id |
| ), |
| None, |
| ) |
|
|
| if existing_live_index is None and live_track_snapshot: |
| new_tracks = [ |
| live_track_snapshot, |
| *[ |
| track |
| for track in new_tracks |
| if track.get("id") != live_track_id |
| ], |
| ] |
|
|
| print( |
| f"[radio] Reinserted live track after refresh race: {live_track_id}" |
| ) |
|
|
| self.tracks = new_tracks |
|
|
| if not self.tracks: |
| self.current_index = None |
| self.started_at = None |
| self.is_playing = False |
| self.current_kind = "song" |
| self.current_jingle = None |
| self.current_segment = None |
|
|
| elif live_current_kind == "jingle" and live_current_jingle: |
| self.current_index = None |
| self.started_at = live_started_at or time.time() |
| self.is_playing = live_is_playing or True |
| self.current_kind = "jingle" |
| self.current_jingle = live_current_jingle |
| self.current_segment = None |
|
|
| print("[radio] Kept current live jingle after refresh") |
|
|
| elif live_current_kind == "segment" and live_current_segment: |
| self.current_index = None |
| self.started_at = live_started_at or time.time() |
| self.is_playing = live_is_playing or True |
| self.current_kind = "segment" |
| self.current_jingle = None |
| self.current_segment = live_current_segment |
|
|
| print("[radio] Kept current live segment after refresh") |
|
|
| elif live_current_kind == "song" and live_track_id: |
| matching_index = next( |
| ( |
| i |
| for i, track in enumerate(self.tracks) |
| if track.get("id") == live_track_id |
| ), |
| None, |
| ) |
|
|
| if matching_index is not None: |
| self.current_index = matching_index |
| self.started_at = live_started_at or time.time() |
| self.is_playing = live_is_playing or True |
| self.current_kind = "song" |
| self.current_jingle = None |
| self.current_segment = None |
|
|
| print(f"[radio] Kept current live track after refresh: {live_track_id}") |
|
|
| else: |
| self.current_index = 0 |
| self.started_at = time.time() |
| self.is_playing = True |
| self.current_kind = "song" |
| self.current_jingle = None |
| self.current_segment = None |
|
|
| print("[radio] Live track not found after refresh; switched to first track") |
|
|
| else: |
| self.current_index = 0 |
| self.started_at = time.time() |
| self.is_playing = True |
| self.current_kind = "song" |
| self.current_jingle = None |
| self.current_segment = None |
|
|
| if self.current_index is not None and self.tracks: |
| self._load_meta_for_track_unlocked(self.tracks[self.current_index]) |
|
|
| self.is_loading = False |
| self.last_error = None |
| self.last_refresh_at = time.time() |
|
|
| print(f"[radio] Loaded {len(new_tracks)} tracks") |
|
|
| def has_track(self, song_id: str) -> bool: |
| with self.lock: |
| return any(track["id"] == song_id for track in self.tracks) |
|
|
| def _choose_next_index_unlocked(self): |
| """ |
| The rotation pack is already selected, weighted, and shuffled during |
| refresh_tracks(). Playback should consume that pack sequentially. |
| |
| This avoids repeats inside the hour and makes the 60-minute rotation |
| pack meaningful. RADIO_MODE is kept for API/status compatibility, but |
| randomization now happens when the pack is built, not between songs. |
| """ |
| if not self.tracks: |
| return None |
|
|
| if self.current_index is None: |
| return 0 |
|
|
| return (self.current_index + 1) % len(self.tracks) |
|
|
| def _choose_jingle_unlocked(self): |
| if not self.jingles: |
| return None |
|
|
| if len(self.jingles) == 1: |
| return self.jingles[0] |
|
|
| choices = [ |
| jingle |
| for jingle in self.jingles |
| if jingle.get("id") != self.last_jingle_id |
| ] |
|
|
| return random.choice(choices or self.jingles) |
|
|
| def _choose_segment_unlocked(self): |
| if not self.segments: |
| return None |
|
|
| if len(self.segments) == 1: |
| return self.segments[0] |
|
|
| choices = [ |
| segment |
| for segment in self.segments |
| if segment.get("id") != self.last_segment_id |
| ] |
|
|
| return random.choice(choices or self.segments) |
|
|
| def _should_insert_jingle_unlocked(self): |
| if not self._cfg("enable_jingles", ENABLE_JINGLES): |
| return False |
|
|
| if not self.jingles: |
| return False |
|
|
| min_songs_between_jingles = int(self._cfg("min_songs_between_jingles", MIN_SONGS_BETWEEN_JINGLES)) |
| max_songs_between_jingles = int(self._cfg("max_songs_between_jingles", MAX_SONGS_BETWEEN_JINGLES)) |
| jingle_probability_after_min = float(self._cfg("jingle_probability_after_min", JINGLE_PROBABILITY_AFTER_MIN)) |
|
|
| if self.songs_since_last_jingle < min_songs_between_jingles: |
| return False |
|
|
| if self.songs_since_last_jingle >= max_songs_between_jingles: |
| return True |
|
|
| return random.random() < jingle_probability_after_min |
|
|
| def _should_insert_segment_unlocked(self): |
| if not self._cfg("enable_segments", ENABLE_SEGMENTS): |
| return False |
|
|
| if not self.segments: |
| return False |
|
|
| min_songs_between_segments = int(self._cfg("min_songs_between_segments", MIN_SONGS_BETWEEN_SEGMENTS)) |
| max_songs_between_segments = int(self._cfg("max_songs_between_segments", MAX_SONGS_BETWEEN_SEGMENTS)) |
| segment_probability_after_min = float(self._cfg("segment_probability_after_min", SEGMENT_PROBABILITY_AFTER_MIN)) |
|
|
| if self.songs_since_last_segment < min_songs_between_segments: |
| return False |
|
|
| if self.songs_since_last_segment >= max_songs_between_segments: |
| return True |
|
|
| return random.random() < segment_probability_after_min |
|
|
| def _start_jingle_unlocked(self): |
| jingle = self._choose_jingle_unlocked() |
|
|
| if not jingle: |
| return False |
|
|
| self.current_kind = "jingle" |
| self.current_jingle = jingle |
| self.current_segment = None |
| self.last_jingle_id = jingle["id"] |
| self.songs_since_last_jingle = 0 |
|
|
| print(f"[jingles] Inserted jingle: {jingle['id']} duration={jingle.get('duration')}") |
| return True |
|
|
| def _start_segment_unlocked(self): |
| segment = self._choose_segment_unlocked() |
|
|
| if not segment: |
| return False |
|
|
| self.current_kind = "segment" |
| self.current_jingle = None |
| self.current_segment = segment |
| self.last_segment_id = segment["id"] |
| self.songs_since_last_segment = 0 |
|
|
| print(f"[segments] Inserted segment: {segment['id']} duration={segment.get('duration')}") |
| return True |
|
|
| def _start_next_song_unlocked(self): |
| next_index = self._choose_next_index_unlocked() |
|
|
| if next_index is None: |
| self.current_index = None |
| self.current_kind = "song" |
| self.current_jingle = None |
| self.current_segment = None |
| self.is_playing = False |
| return |
|
|
| self.current_index = next_index |
| self.current_kind = "song" |
| self.current_jingle = None |
| self.current_segment = None |
| self.is_playing = True |
|
|
| next_track = self.tracks[self.current_index] |
|
|
| |
| |
| |
| self._reload_track_overrides_unlocked() |
| self._load_meta_for_track_unlocked(next_track) |
|
|
| print(f"[radio] Advanced to: {next_track['id']} - {next_track.get('title')}") |
|
|
| def _get_current_item_unlocked(self): |
| if self.current_kind == "jingle" and self.current_jingle: |
| return self.current_jingle |
|
|
| if self.current_kind == "segment" and self.current_segment: |
| return self.current_segment |
|
|
| if self.current_index is None or not self.tracks: |
| return None |
|
|
| if self.current_index >= len(self.tracks): |
| self.current_index = 0 |
|
|
| track = self.tracks[self.current_index] |
| self._load_meta_for_track_unlocked(track) |
| return track |
|
|
| def _get_current_song_id_unlocked(self) -> Optional[str]: |
| if self.current_index is None or not self.tracks: |
| return None |
|
|
| if self.current_index >= len(self.tracks): |
| return None |
|
|
| try: |
| track = self.tracks[self.current_index] |
| except Exception: |
| return None |
|
|
| if not track or track.get("is_jingle") or track.get("is_segment"): |
| return None |
|
|
| return track.get("id") |
|
|
| def _next_rotation_control_path(self) -> Path: |
| return Path(RADIO_CONTROL_DIR) / RADIO_NEXT_ROTATION_FILE |
|
|
| def _next_rotation_archive_dir(self) -> Path: |
| return Path(RADIO_CONTROL_DIR) / RADIO_NEXT_ROTATION_ARCHIVE_DIR |
|
|
| def _target_hour_key(self, value: Optional[datetime]) -> Optional[datetime]: |
| if value is None: |
| return None |
|
|
| return value.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0) |
|
|
| def _approved_next_rotation_for_target(self, target_at: datetime): |
| path = self._next_rotation_control_path() |
| target_key = self._target_hour_key(target_at) |
|
|
| if not path.exists(): |
| return None, [] |
|
|
| try: |
| payload = json.loads(path.read_text(encoding="utf-8")) |
|
|
| if not isinstance(payload, dict): |
| raise ValueError("next_rotation.json must contain a JSON object") |
|
|
| status = str(payload.get("status") or "").strip().lower() |
| if status != "approved": |
| raise ValueError(f"next_rotation.json status must be approved, got {status or 'empty'}") |
|
|
| raw_target = ( |
| payload.get("target_hour_utc") |
| or payload.get("target_at") |
| or payload.get("target_rotation_at") |
| or payload.get("target") |
| ) |
| payload_target = parse_datetime(raw_target) |
| payload_key = self._target_hour_key(payload_target) |
|
|
| if payload_key is None or target_key is None or payload_key != target_key: |
| raise ValueError( |
| f"next_rotation.json target mismatch: payload={raw_target} expected={target_at.isoformat()}" |
| ) |
|
|
| track_ids = payload.get("track_ids") |
|
|
| if not isinstance(track_ids, list): |
| tracks = payload.get("tracks", []) |
| if isinstance(tracks, list): |
| track_ids = [ |
| str(item.get("id") or item.get("song_id") or "").strip() |
| for item in tracks |
| if isinstance(item, dict) |
| ] |
| else: |
| track_ids = [] |
|
|
| clean_track_ids = [] |
| seen = set() |
|
|
| for track_id in track_ids: |
| track_id = str(track_id or "").strip() |
|
|
| if not track_id or track_id in seen: |
| continue |
|
|
| seen.add(track_id) |
| clean_track_ids.append(track_id) |
|
|
| if not clean_track_ids: |
| raise ValueError("next_rotation.json does not contain any usable track_ids") |
|
|
| self.approved_next_rotation_last_loaded_at = time.time() |
| self.approved_next_rotation_last_error = None |
| return payload, clean_track_ids |
|
|
| except Exception as e: |
| self.approved_next_rotation_last_loaded_at = time.time() |
| self.approved_next_rotation_last_error = str(e) |
| print(f"[next-rotation] Could not use approved rotation {path}: {e}") |
| return None, [] |
|
|
| def _archive_consumed_next_rotation_control_unlocked(self, activated_at: Optional[datetime] = None): |
| path = self._next_rotation_control_path() |
|
|
| if not path.exists(): |
| return |
|
|
| try: |
| archive_dir = self._next_rotation_archive_dir() |
| archive_dir.mkdir(parents=True, exist_ok=True) |
| stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") |
| archive_path = archive_dir / f"next_rotation_consumed_{stamp}.json" |
| consumed_at = (activated_at or datetime.now(timezone.utc)).replace(microsecond=0).isoformat() |
| payload = json.loads(path.read_text(encoding="utf-8")) |
| if not isinstance(payload, dict): |
| payload = {} |
| payload["status"] = "consumed" |
| payload["consumed_at"] = consumed_at |
| payload["activated_at"] = consumed_at |
| payload["activated_source"] = "studio_approved" |
| payload["current_rotation_track_ids"] = list(self.current_rotation_track_ids or []) |
| archive_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8") |
| path.unlink(missing_ok=True) |
| self.approved_next_rotation_last_consumed_at = time.time() |
| print(f"[next-rotation] Archived consumed approved rotation to {archive_path}") |
| except Exception as e: |
| self.approved_next_rotation_last_error = str(e) |
| print(f"[next-rotation] Could not archive consumed next rotation: {e}") |
|
|
| def prebuild_approved_next_rotation(self, target_at: datetime) -> bool: |
| with self.lock: |
| self.next_rotation_is_building = True |
| self.next_rotation_last_error = None |
| self.next_rotation_target_at = target_at.isoformat() |
|
|
| payload, track_ids = self._approved_next_rotation_for_target(target_at) |
|
|
| if not payload or not track_ids: |
| with self.lock: |
| self.next_rotation_is_building = False |
| return False |
|
|
| try: |
| glob_pattern = f"{BUCKET_FS_ROOT}/songs/*/meta.json" |
| meta_paths = self.fs.glob(glob_pattern) |
| song_meta_paths = {} |
|
|
| for meta_path in meta_paths: |
| try: |
| parts = meta_path.strip("/").split("/") |
| song_id = parts[-2] |
|
|
| if song_id: |
| song_meta_paths[song_id] = meta_path |
| except Exception as e: |
| print(f"[next-rotation] Could not parse path {meta_path}: {e}") |
|
|
| with self.lock: |
| custom_tracks_snapshot = [dict(track) for track in self.custom_tracks] |
|
|
| custom_tracks_by_id = { |
| str(track.get("id")): dict(track) |
| for track in custom_tracks_snapshot |
| if track.get("id") |
| } |
|
|
| selected_tracks = [] |
| missing_ids = [] |
| hidden_ids = [] |
|
|
| for track_id in track_ids: |
| if self._song_is_hidden_by_reports(track_id): |
| hidden_ids.append(track_id) |
| continue |
|
|
| if track_id in song_meta_paths: |
| track = self._load_track_from_meta_path(song_meta_paths[track_id], track_id) |
| elif track_id in custom_tracks_by_id: |
| track = dict(custom_tracks_by_id[track_id]) |
| self._apply_track_controls_unlocked(track) |
| else: |
| missing_ids.append(track_id) |
| continue |
|
|
| selected_tracks.append(track) |
|
|
| if not selected_tracks: |
| raise ValueError( |
| "approved next rotation resolved to zero playable tracks " |
| f"(missing={len(missing_ids)} hidden={len(hidden_ids)})" |
| ) |
|
|
| with self.lock: |
| self.next_tracks = selected_tracks |
| self.next_rotation_ready = False |
| self.next_rotation_is_building = False |
| self.next_rotation_built_at = time.time() |
| self.next_rotation_target_at = target_at.isoformat() |
| self.next_rotation_last_error = None |
| self.next_rotation_source = "studio_approved" |
| self.next_rotation_payload = payload |
|
|
| print( |
| f"[next-rotation] Prebuilt approved rotation tracks={len(selected_tracks)} " |
| f"target_at={target_at.isoformat()} missing={len(missing_ids)} hidden={len(hidden_ids)}" |
| ) |
|
|
| return True |
|
|
| except Exception as e: |
| with self.lock: |
| self.next_rotation_is_building = False |
| self.next_rotation_last_error = str(e) |
| self.approved_next_rotation_last_error = str(e) |
|
|
| print(f"[next-rotation] Approved prebuild failed: {e}") |
| return False |
|
|
| def mark_next_rotation_ready(self, target_at: Optional[datetime] = None): |
| with self.lock: |
| if self.next_tracks: |
| self.next_rotation_ready = True |
| self.next_rotation_target_at = target_at.isoformat() if target_at else next_utc_hour_iso() |
| print( |
| f"[radio] Next rotation marked ready " |
| f"tracks={len(self.next_tracks)} target_at={self.next_rotation_target_at}" |
| ) |
| return True |
|
|
| self.next_rotation_ready = False |
| print("[radio] No prebuilt next rotation available to mark ready") |
| return False |
|
|
| def _activate_next_rotation_unlocked(self) -> bool: |
| if not self.next_rotation_ready or not self.next_tracks: |
| return False |
|
|
| activated_source = self.next_rotation_source |
| activated_payload = dict(self.next_rotation_payload) if isinstance(self.next_rotation_payload, dict) else None |
| activated_target_at = self.next_rotation_target_at |
| activated_at_dt = datetime.now(timezone.utc).replace(microsecond=0) |
| self.tracks = [dict(track) for track in self.next_tracks] |
| self.current_rotation_source = activated_source or "prebuilt" |
| self.current_rotation_target_at = activated_target_at |
| self.current_rotation_activated_at = activated_at_dt.isoformat() |
| self.current_rotation_track_ids = [str(track.get("id") or "").strip() for track in self.tracks if str(track.get("id") or "").strip()] |
| self.next_tracks = [] |
| self.next_rotation_ready = False |
| self.next_rotation_is_building = False |
| self.next_rotation_source = None |
| self.next_rotation_payload = None |
| self.current_kind = "song" |
| self.current_jingle = None |
| self.current_segment = None |
| self.current_index = 0 if self.tracks else None |
| self.is_playing = bool(self.tracks) |
| self.started_at = time.time() if self.tracks else None |
| self.songs_since_last_jingle = 0 |
| self.songs_since_last_segment = 0 |
| self.last_rotation_activated_at = time.time() |
|
|
| if self.current_index is not None: |
| self._reload_track_overrides_unlocked() |
| self._load_meta_for_track_unlocked(self.tracks[self.current_index]) |
| print( |
| f"[radio] Activated prebuilt rotation " |
| f"tracks={len(self.tracks)} first={self.tracks[self.current_index].get('id')} " |
| f"source={activated_source or 'unknown'}" |
| ) |
| else: |
| print("[radio] Activated empty prebuilt rotation") |
|
|
| if activated_source == "studio_approved": |
| self._archive_consumed_next_rotation_control_unlocked(activated_at_dt) |
|
|
| return True |
|
|
| def _advance_one_item_unlocked(self): |
| if self.current_kind in {"jingle", "segment"}: |
| if self._activate_next_rotation_unlocked(): |
| return |
|
|
| self._start_next_song_unlocked() |
| return |
|
|
| current_item = self._get_current_item_unlocked() |
|
|
| if current_item and not current_item.get("is_jingle") and not current_item.get("is_segment"): |
| self._remember_played_song_unlocked(current_item.get("id")) |
|
|
| if self._activate_next_rotation_unlocked(): |
| return |
|
|
| self.songs_since_last_jingle += 1 |
| self.songs_since_last_segment += 1 |
|
|
| if self._should_insert_segment_unlocked(): |
| inserted = self._start_segment_unlocked() |
|
|
| if inserted: |
| return |
|
|
| if self._should_insert_jingle_unlocked(): |
| inserted = self._start_jingle_unlocked() |
|
|
| if inserted: |
| return |
|
|
| self._start_next_song_unlocked() |
|
|
| def _public_track_for_status_unlocked(self, item: Optional[dict]): |
| if not item: |
| return None |
|
|
| public_item = dict(item) |
|
|
| |
| |
| if public_item.get("is_jingle"): |
| public_item["id"] = "" |
| public_item["audio_url"] = "" |
| public_item["like_count"] = 0 |
| public_item["upvote_count"] = 0 |
| public_item["downvote_count"] = 0 |
| public_item["report_count"] = 0 |
| public_item["thumb_url"] = "" |
| public_item["source_url"] = "" |
| return public_item |
|
|
| if public_item.get("is_segment"): |
| public_item["id"] = "" |
| public_item["audio_url"] = "" |
| public_item["like_count"] = 0 |
| public_item["upvote_count"] = 0 |
| public_item["downvote_count"] = 0 |
| public_item["report_count"] = 0 |
| public_item["thumb_url"] = "" |
| return public_item |
|
|
| |
| |
| |
| |
| |
| |
| public_item["like_count"] = safe_int( |
| public_item.get("effective_like_count", public_item.get("like_count", 0)), |
| 0, |
| ) |
| public_item["upvote_count"] = safe_int( |
| public_item.get("effective_upvote_count", public_item.get("upvote_count", 0)), |
| 0, |
| ) |
| public_item["downvote_count"] = safe_int( |
| public_item.get("effective_downvote_count", public_item.get("downvote_count", 0)), |
| 0, |
| ) |
|
|
| |
| public_item["report_count"] = safe_int( |
| public_item.get("report_count", 0), |
| 0, |
| ) |
|
|
| return public_item |
|
|
| def get_current_stream_item(self): |
| with self.lock: |
| item = self._get_current_item_unlocked() |
|
|
| if not item: |
| return None, 0 |
|
|
| elapsed = 0 |
|
|
| if self.started_at is not None: |
| elapsed = max(0, time.time() - self.started_at) |
|
|
| duration = item.get("duration") or DEFAULT_DURATION |
|
|
| try: |
| duration = float(duration) |
| except Exception: |
| duration = DEFAULT_DURATION |
|
|
| if duration > 2: |
| elapsed = min(elapsed, duration - 1) |
|
|
| return dict(item), elapsed |
|
|
| def advance_if_current(self, item_id: str) -> bool: |
| with self.lock: |
| item = self._get_current_item_unlocked() |
|
|
| if not item: |
| return False |
|
|
| if item.get("id") != item_id: |
| return False |
|
|
| self.started_at = time.time() |
| self._advance_one_item_unlocked() |
| return True |
|
|
| def set_mode(self, mode: str): |
| if mode not in {"random", "loop"}: |
| mode = "random" |
|
|
| with self.lock: |
| self.mode = mode |
|
|
| def _status_unlocked(self): |
| item = self._get_current_item_unlocked() |
|
|
| elapsed = 0 |
|
|
| if item and self.started_at is not None: |
| elapsed = max(0, time.time() - self.started_at) |
|
|
| try: |
| duration = float(item.get("duration") or DEFAULT_DURATION) |
| except Exception: |
| duration = DEFAULT_DURATION |
|
|
| if duration > 0: |
| elapsed = min(elapsed, duration) |
|
|
| listener_snapshot = listeners.snapshot() |
|
|
| if item and item.get("is_jingle"): |
| now_playing_type = "jingle" |
| elif item and item.get("is_segment"): |
| now_playing_type = "segment" |
| else: |
| now_playing_type = "song" |
|
|
| runtime_config = dict(self.runtime_config) |
|
|
| return { |
| "bucket_id": BUCKET_ID, |
| "bucket_fs_root": BUCKET_FS_ROOT, |
| "bucket_resolve_url": BUCKET_RESOLVE_URL, |
| "mode": self.mode, |
| "track_count": len(self.tracks), |
| "jingle_count": len(self.jingles), |
| "segment_count": len(self.segments), |
| "custom_track_count": len(self.custom_tracks), |
| "now_playing_type": now_playing_type, |
| "current_index": self.current_index, |
| "elapsed": elapsed, |
| "is_playing": self.is_playing, |
| "is_loading": self.is_loading, |
| "last_error": self.last_error, |
| "last_refresh_at": self.last_refresh_at, |
| "next_refresh_at": next_utc_hour_iso(), |
| "next_rotation_prebuild_at": next_rotation_prebuild_at_iso(), |
| "rotation_prebuild_lead_seconds": ROTATION_PREBUILD_LEAD_SECONDS, |
| "refresh_mode": REFRESH_MODE, |
| "next_rotation_ready": self.next_rotation_ready, |
| "next_rotation_is_building": self.next_rotation_is_building, |
| "next_rotation_count": len(self.next_tracks), |
| "next_rotation_source": self.next_rotation_source, |
| "next_rotation_control_file": str(Path(RADIO_CONTROL_DIR) / RADIO_NEXT_ROTATION_FILE), |
| "approved_next_rotation_last_loaded_at": self.approved_next_rotation_last_loaded_at, |
| "approved_next_rotation_last_error": self.approved_next_rotation_last_error, |
| "approved_next_rotation_last_consumed_at": self.approved_next_rotation_last_consumed_at, |
| "next_rotation_built_at": self.next_rotation_built_at, |
| "next_rotation_target_at": self.next_rotation_target_at, |
| "next_rotation_last_error": self.next_rotation_last_error, |
| "last_rotation_activated_at": self.last_rotation_activated_at, |
| "current_rotation_source": self.current_rotation_source, |
| "current_rotation_target_at": self.current_rotation_target_at, |
| "current_rotation_activated_at": self.current_rotation_activated_at, |
| "current_rotation_track_ids": list(self.current_rotation_track_ids or []), |
| "runtime_config_source": self.runtime_config_source, |
| "runtime_config_last_loaded_at": self.runtime_config_last_loaded_at, |
| "runtime_config_last_error": self.runtime_config_last_error, |
| "runtime_config": runtime_config, |
| "radio_control_dir": RADIO_CONTROL_DIR, |
| "track_overrides_count": len(self.track_overrides), |
| "track_overrides_last_loaded_at": self.track_overrides_last_loaded_at, |
| "track_overrides_last_error": self.track_overrides_last_error, |
| "cover_overrides_dir": COVER_OVERRIDES_DIR, |
| "cover_overrides_count": self.cover_overrides_count, |
| "rotation_target_seconds": runtime_config.get("rotation_target_seconds"), |
| "max_tracks": runtime_config.get("max_tracks"), |
| "report_hide_threshold": runtime_config.get("report_hide_threshold"), |
| "like_weight_per_like": runtime_config.get("like_weight_per_like"), |
| "like_weight_max": runtime_config.get("like_weight_max"), |
| "upvote_weight_per_upvote": runtime_config.get("upvote_weight_per_upvote"), |
| "upvote_weight_max": runtime_config.get("upvote_weight_max"), |
| "downvote_weight_per_downvote": runtime_config.get("downvote_weight_per_downvote"), |
| "downvote_weight_max": runtime_config.get("downvote_weight_max"), |
| "track_weight_min": runtime_config.get("track_weight_min"), |
| "track_weight_max": runtime_config.get("track_weight_max"), |
| "auto_rotation_min_effective_score_exclusive": runtime_config.get("auto_rotation_min_effective_score_exclusive"), |
| "admin_report_usernames": sorted(ADMIN_REPORT_USERNAMES), |
| "admin_report_weight": ADMIN_REPORT_WEIGHT, |
| "recent_track_quota": runtime_config.get("recent_track_quota"), |
| "recent_track_window_hours": runtime_config.get("recent_track_window_hours"), |
| "recently_played_exclude_minutes": runtime_config.get("recently_played_exclude_minutes"), |
| "recently_played_memory_max": runtime_config.get("recently_played_memory_max"), |
| "recently_played_min_candidates_after_filter": runtime_config.get("recently_played_min_candidates_after_filter"), |
| "recently_played_count": len(self._recently_played_exclusion_set_unlocked()), |
| "meta_load_workers": runtime_config.get("meta_load_workers"), |
| "playback_order": "sequential_rotation_pack", |
| "enable_remote_jingles": runtime_config.get("enable_remote_jingles"), |
| "remote_jingles_dir": REMOTE_JINGLES_DIR, |
| "enable_segments": runtime_config.get("enable_segments"), |
| "remote_segments_dir": REMOTE_SEGMENTS_DIR, |
| "min_songs_between_jingles": runtime_config.get("min_songs_between_jingles"), |
| "max_songs_between_jingles": runtime_config.get("max_songs_between_jingles"), |
| "jingle_probability_after_min": runtime_config.get("jingle_probability_after_min"), |
| "songs_since_last_jingle": self.songs_since_last_jingle, |
| "min_songs_between_segments": runtime_config.get("min_songs_between_segments"), |
| "max_songs_between_segments": runtime_config.get("max_songs_between_segments"), |
| "segment_probability_after_min": runtime_config.get("segment_probability_after_min"), |
| "songs_since_last_segment": self.songs_since_last_segment, |
| "enable_custom_tracks": runtime_config.get("enable_custom_tracks"), |
| "custom_tracks_dir": CUSTOM_TRACKS_DIR, |
| "custom_track_quota": runtime_config.get("custom_track_quota"), |
| "custom_tracks_last_loaded_at": self.custom_tracks_last_loaded_at, |
| "custom_tracks_last_error": self.custom_tracks_last_error, |
| "enable_song_mastering": runtime_config.get("enable_song_mastering"), |
| "song_audio_filter": runtime_config.get("song_audio_filter") if runtime_config.get("enable_song_mastering") else "", |
| "likes_enabled": likes.last_error is None, |
| "likes_error": likes.last_error, |
| "active_listeners": listener_snapshot["active_listeners"], |
| "total_connections": listener_snapshot["total_connections"], |
| "listener_ttl_seconds": listener_snapshot["listener_ttl_seconds"], |
| "listener_profiles": listener_snapshot["listener_profiles"], |
| "signed_in_listener_count": listener_snapshot["signed_in_listener_count"], |
| "visible_listener_profile_count": listener_snapshot["visible_listener_profile_count"], |
| "track": self._public_track_for_status_unlocked(item), |
| } |
|
|
| def status(self): |
| with self.lock: |
| return self._status_unlocked() |
|
|
| def rotation_snapshot(self): |
| with self.lock: |
| item = self._get_current_item_unlocked() |
|
|
| if item and item.get("is_jingle"): |
| now_playing_type = "jingle" |
| elif item and item.get("is_segment"): |
| now_playing_type = "segment" |
| else: |
| now_playing_type = "song" |
|
|
| tracks = [] |
| for i, track in enumerate(self.tracks): |
| public_track = dict(track) |
| public_track["index"] = i |
| public_track["is_current"] = now_playing_type == "song" and i == self.current_index |
| tracks.append(public_track) |
|
|
| return { |
| "now_playing_type": now_playing_type, |
| "current_index": self.current_index, |
| "current_item": self._public_track_for_status_unlocked(item), |
| "tracks_count": len(self.tracks), |
| "jingles_count": len(self.jingles), |
| "segments_count": len(self.segments), |
| "custom_tracks_count": len(self.custom_tracks), |
| "tracks": tracks, |
| "jingles": [dict(jingle) for jingle in self.jingles], |
| "segments": [dict(segment) for segment in self.segments], |
| "custom_tracks": [dict(track) for track in self.custom_tracks], |
| "runtime_config_source": self.runtime_config_source, |
| "runtime_config": dict(self.runtime_config), |
| "track_overrides_count": len(self.track_overrides), |
| "cover_overrides_count": self.cover_overrides_count, |
| "last_refresh_at": self.last_refresh_at, |
| "next_refresh_at": next_utc_hour_iso(), |
| "next_rotation_prebuild_at": next_rotation_prebuild_at_iso(), |
| "next_rotation_ready": self.next_rotation_ready, |
| "next_rotation_is_building": self.next_rotation_is_building, |
| "next_rotation_count": len(self.next_tracks), |
| "next_rotation_source": self.next_rotation_source, |
| "next_rotation_built_at": self.next_rotation_built_at, |
| "next_rotation_target_at": self.next_rotation_target_at, |
| "last_rotation_activated_at": self.last_rotation_activated_at, |
| "current_rotation_source": self.current_rotation_source, |
| "current_rotation_target_at": self.current_rotation_target_at, |
| "current_rotation_activated_at": self.current_rotation_activated_at, |
| "current_rotation_track_ids": list(self.current_rotation_track_ids or []), |
| } |
|
|
|
|
| radio = RadioState() |
|
|
|
|
| class BroadcastEngine: |
| """ |
| Single global broadcast pipeline. |
| |
| One ffmpeg process produces the radio stream. |
| Each /stream.mp3 client subscribes to the shared chunks instead of spawning |
| its own ffmpeg process. |
| """ |
|
|
| def __init__(self, radio_state: RadioState): |
| self.radio = radio_state |
| self.lock = threading.Lock() |
| self.subscribers = {} |
| self.total_stream_connections = 0 |
|
|
| self.thread: Optional[threading.Thread] = None |
| self.running = False |
|
|
| self.current_item_id = None |
| self.current_item_type = None |
| self.current_item_started_at = None |
| self.current_process: Optional[subprocess.Popen] = None |
| self.current_process_pid = None |
| self.last_chunk_at = None |
| self.last_watchdog_restart_at = None |
| self.last_watchdog_reason = None |
| self.last_error = None |
|
|
| def start(self): |
| with self.lock: |
| if self.running and self.thread is not None and self.thread.is_alive(): |
| return |
|
|
| self.running = True |
|
|
| self.thread = threading.Thread( |
| target=self._run_loop, |
| daemon=True, |
| ) |
| self.thread.start() |
|
|
| print("[broadcast] Broadcast engine started") |
|
|
| def ensure_running(self): |
| with self.lock: |
| thread_alive = self.thread is not None and self.thread.is_alive() |
|
|
| if not thread_alive: |
| print("[broadcast-watchdog] broadcast thread is not alive; restarting") |
| self.start() |
|
|
| def snapshot(self): |
| now = time.time() |
|
|
| with self.lock: |
| last_chunk_age = None |
| if self.last_chunk_at is not None: |
| last_chunk_age = max(0.0, now - self.last_chunk_at) |
|
|
| current_item_age = None |
| if self.current_item_started_at is not None: |
| current_item_age = max(0.0, now - self.current_item_started_at) |
|
|
| return { |
| "broadcast_running": self.running, |
| "broadcast_thread_alive": self.thread is not None and self.thread.is_alive(), |
| "stream_subscriber_count": len(self.subscribers), |
| "total_stream_connections": self.total_stream_connections, |
| "broadcast_current_item_id": self.current_item_id, |
| "broadcast_current_item_type": self.current_item_type, |
| "broadcast_current_item_started_at": self.current_item_started_at, |
| "broadcast_current_item_age_seconds": current_item_age, |
| "broadcast_current_process_pid": self.current_process_pid, |
| "broadcast_last_chunk_at": self.last_chunk_at, |
| "broadcast_last_chunk_age_seconds": last_chunk_age, |
| "broadcast_last_watchdog_restart_at": self.last_watchdog_restart_at, |
| "broadcast_last_watchdog_reason": self.last_watchdog_reason, |
| "broadcast_last_error": self.last_error, |
| } |
|
|
| def subscribe(self): |
| subscriber_id = str(uuid4()) |
| subscriber_queue = queue.Queue(maxsize=BROADCAST_QUEUE_MAX_CHUNKS) |
|
|
| with self.lock: |
| self.subscribers[subscriber_id] = subscriber_queue |
| self.total_stream_connections += 1 |
| count = len(self.subscribers) |
|
|
| print(f"[broadcast] subscriber joined id={subscriber_id} active={count}") |
|
|
| return subscriber_id, subscriber_queue |
|
|
| def unsubscribe(self, subscriber_id: str): |
| with self.lock: |
| self.subscribers.pop(subscriber_id, None) |
| count = len(self.subscribers) |
|
|
| print(f"[broadcast] subscriber left id={subscriber_id} active={count}") |
|
|
| def _remove_slow_subscriber_unlocked(self, subscriber_id: str, subscriber_queue): |
| self.subscribers.pop(subscriber_id, None) |
|
|
| try: |
| while True: |
| subscriber_queue.get_nowait() |
| except queue.Empty: |
| pass |
|
|
| try: |
| subscriber_queue.put_nowait(None) |
| except Exception: |
| pass |
|
|
| print(f"[broadcast] removed slow subscriber id={subscriber_id}") |
|
|
| def _publish(self, chunk: bytes): |
| if not chunk: |
| return |
|
|
| with self.lock: |
| self.last_chunk_at = time.time() |
|
|
| for subscriber_id, subscriber_queue in list(self.subscribers.items()): |
| try: |
| subscriber_queue.put_nowait(chunk) |
| except queue.Full: |
| self._remove_slow_subscriber_unlocked(subscriber_id, subscriber_queue) |
|
|
| def kill_current_process(self, reason: str) -> bool: |
| process = None |
| pid = None |
|
|
| with self.lock: |
| process = self.current_process |
| pid = self.current_process_pid |
|
|
| if process is None: |
| return False |
|
|
| if process.poll() is not None: |
| return False |
|
|
| now = time.time() |
| self.last_watchdog_restart_at = now |
| self.last_watchdog_reason = reason |
| self.last_error = reason |
|
|
| print(f"[broadcast-watchdog] killing stalled ffmpeg pid={pid} reason={reason}") |
|
|
| try: |
| process.kill() |
| return True |
| except Exception as e: |
| with self.lock: |
| self.last_error = f"Could not kill stalled ffmpeg process: {e}" |
|
|
| print(f"[broadcast-watchdog] could not kill ffmpeg pid={pid}: {e}") |
| return False |
|
|
| def check_health(self): |
| self.ensure_running() |
|
|
| now = time.time() |
| reason = None |
|
|
| with self.lock: |
| process = self.current_process |
|
|
| if process is None or process.poll() is not None: |
| return |
|
|
| if self.current_item_id is None: |
| return |
|
|
| item_started_at = self.current_item_started_at |
| last_chunk_at = self.last_chunk_at |
| current_item_id = self.current_item_id |
| process_pid = self.current_process_pid |
|
|
| if last_chunk_at is None: |
| if item_started_at is not None and now - item_started_at > BROADCAST_PROCESS_START_TIMEOUT_SECONDS: |
| reason = ( |
| f"no audio chunks after {now - item_started_at:.1f}s " |
| f"for item={current_item_id} pid={process_pid}" |
| ) |
| else: |
| silence_seconds = now - last_chunk_at |
|
|
| if silence_seconds > BROADCAST_STALL_TIMEOUT_SECONDS: |
| reason = ( |
| f"no audio chunks for {silence_seconds:.1f}s " |
| f"on item={current_item_id} pid={process_pid}" |
| ) |
|
|
| if reason: |
| self.kill_current_process(reason) |
|
|
| def _item_type(self, item: dict) -> str: |
| if item.get("is_jingle"): |
| return "jingle" |
|
|
| if item.get("is_segment"): |
| return "segment" |
|
|
| return "song" |
|
|
| def _build_ffmpeg_command(self, item: dict, offset: float): |
| stream_url = item.get("stream_url") or item.get("audio_url") |
| item_type = self._item_type(item) |
|
|
| cmd = [ |
| "ffmpeg", |
| "-hide_banner", |
| "-loglevel", |
| "error", |
| "-nostdin", |
| "-re", |
| ] |
|
|
| if offset > STREAM_SEEK_EPSILON_SECONDS: |
| cmd.extend( |
| [ |
| "-ss", |
| str(max(0, offset)), |
| ] |
| ) |
|
|
| cmd.extend( |
| [ |
| "-i", |
| stream_url, |
| "-vn", |
| ] |
| ) |
|
|
| audio_filters = [] |
|
|
| if item_type == "jingle": |
| audio_filters.append(f"volume={JINGLE_VOLUME}") |
| elif item_type == "segment": |
| audio_filters.append(f"volume={SEGMENT_VOLUME}") |
| elif item_type == "song" and self.radio._cfg("enable_song_mastering", ENABLE_SONG_MASTERING): |
| song_audio_filter = str(self.radio._cfg("song_audio_filter", SONG_AUDIO_FILTER) or "").strip() |
| if song_audio_filter: |
| audio_filters.append(song_audio_filter) |
|
|
| if audio_filters: |
| cmd.extend(["-af", ",".join(audio_filters)]) |
|
|
| cmd.extend( |
| [ |
| "-ac", |
| "2", |
| "-ar", |
| STREAM_SAMPLE_RATE, |
| "-b:a", |
| STREAM_BITRATE, |
| "-map_metadata", |
| "-1", |
| "-write_xing", |
| "0", |
| "-id3v2_version", |
| "0", |
| "-f", |
| "mp3", |
| "pipe:1", |
| ] |
| ) |
|
|
| return cmd |
|
|
| def _run_loop(self): |
| force_next_offset_zero = False |
|
|
| while True: |
| item, offset = self.radio.get_current_stream_item() |
|
|
| if not item: |
| with self.lock: |
| self.current_item_id = None |
| self.current_item_type = None |
|
|
| time.sleep(BROADCAST_NO_TRACK_SLEEP_SECONDS) |
| continue |
|
|
| if force_next_offset_zero: |
| offset = 0 |
| force_next_offset_zero = False |
|
|
| item_id = item["id"] |
| item_type = self._item_type(item) |
| stream_url = item.get("stream_url") or item.get("audio_url") |
|
|
| with self.lock: |
| now = time.time() |
| self.current_item_id = item_id |
| self.current_item_type = item_type |
| self.current_item_started_at = now |
| self.current_process = None |
| self.current_process_pid = None |
| self.last_chunk_at = None |
| self.last_error = None |
|
|
| print(f"[broadcast] starting {item_type}={item_id} offset={offset:.2f}s source={stream_url}") |
|
|
| cmd = self._build_ffmpeg_command(item, offset) |
|
|
| process = None |
|
|
| try: |
| process = subprocess.Popen( |
| cmd, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| bufsize=0, |
| ) |
|
|
| assert process.stdout is not None |
|
|
| with self.lock: |
| self.current_process = process |
| self.current_process_pid = process.pid |
|
|
| while True: |
| chunk = process.stdout.read(16 * 1024) |
|
|
| if not chunk: |
| break |
|
|
| self._publish(chunk) |
|
|
| return_code = process.poll() |
|
|
| if return_code not in (0, None): |
| stderr_text = "" |
|
|
| try: |
| stderr_text = process.stderr.read().decode("utf-8", errors="replace") if process.stderr else "" |
| except Exception: |
| stderr_text = "" |
|
|
| print( |
| f"[broadcast] ffmpeg exited with code={return_code} " |
| f"item={item_id} stderr={stderr_text[:500]}" |
| ) |
|
|
| with self.lock: |
| existing_error = self.last_error |
| self.last_error = existing_error or f"ffmpeg exited with code {return_code}" |
|
|
| except Exception as e: |
| print(f"[broadcast] error while streaming item={item_id}: {e}") |
|
|
| with self.lock: |
| self.last_error = str(e) |
|
|
| time.sleep(1) |
|
|
| finally: |
| if process is not None: |
| try: |
| if process.poll() is None: |
| process.kill() |
| except Exception: |
| pass |
|
|
| try: |
| process.wait(timeout=2) |
| except Exception: |
| pass |
|
|
| with self.lock: |
| if self.current_process is process: |
| self.current_process = None |
| self.current_process_pid = None |
|
|
| advanced = self.radio.advance_if_current(item_id) |
|
|
| if advanced: |
| force_next_offset_zero = True |
| else: |
| |
| |
| |
| force_next_offset_zero = True |
|
|
|
|
| broadcast = BroadcastEngine(radio) |
|
|
|
|
| def broadcast_watchdog_loop(): |
| while True: |
| try: |
| broadcast.check_health() |
| except Exception as e: |
| print(f"[broadcast-watchdog] health check failed: {e}") |
|
|
| time.sleep(BROADCAST_WATCHDOG_INTERVAL_SECONDS) |
|
|
|
|
| def broadcast_stream_mp3(): |
| subscriber_id, subscriber_queue = broadcast.subscribe() |
|
|
| try: |
| while True: |
| try: |
| chunk = subscriber_queue.get(timeout=BROADCAST_SUBSCRIBER_TIMEOUT_SECONDS) |
| except queue.Empty: |
| yield b"" |
| continue |
|
|
| if chunk is None: |
| break |
|
|
| yield chunk |
|
|
| except GeneratorExit: |
| raise |
|
|
| finally: |
| broadcast.unsubscribe(subscriber_id) |
|
|
|
|
| def get_full_status(): |
| status = radio.status() |
| status.update(broadcast.snapshot()) |
| return status |
|
|
|
|
| def refresh_radio_content(label: str): |
| try: |
| print(f"[radio] Starting {label} playlist refresh") |
| radio.refresh_runtime_controls() |
| radio.refresh_jingles() |
| radio.refresh_segments() |
| radio.refresh_custom_tracks() |
| radio.refresh_tracks() |
| print(f"[radio] {label.capitalize()} playlist refresh complete") |
| except Exception as e: |
| print(f"[radio] {label.capitalize()} refresh failed: {e}") |
|
|
| with radio.lock: |
| radio.is_loading = False |
| radio.last_error = str(e) |
|
|
|
|
| def prebuild_radio_content(label: str, target_at: datetime): |
| try: |
| print( |
| f"[radio] Starting {label} next rotation prebuild " |
| f"for {target_at.replace(microsecond=0).isoformat()} UTC" |
| ) |
| radio.refresh_runtime_controls() |
| radio.refresh_jingles() |
| radio.refresh_segments() |
| radio.refresh_custom_tracks() |
|
|
| if radio.prebuild_approved_next_rotation(target_at): |
| print(f"[radio] {label.capitalize()} approved next rotation prebuild complete") |
| else: |
| print("[radio] No approved next rotation available; falling back to automatic prebuild") |
| radio.refresh_tracks(prebuild=True, target_at=target_at) |
| print(f"[radio] {label.capitalize()} automatic next rotation prebuild complete") |
| except Exception as e: |
| print(f"[radio] {label.capitalize()} prebuild failed: {e}") |
|
|
| with radio.lock: |
| radio.next_rotation_is_building = False |
| radio.next_rotation_last_error = str(e) |
|
|
|
|
| def periodic_refresh_loop(): |
| while True: |
| target_at = next_utc_hour() |
| prebuild_at = target_at - timedelta(seconds=max(0, ROTATION_PREBUILD_LEAD_SECONDS)) |
| sleep_until_prebuild = max(0.0, (prebuild_at - datetime.now(timezone.utc)).total_seconds()) |
|
|
| print( |
| "[radio] Next rotation prebuild scheduled at " |
| f"{prebuild_at.replace(microsecond=0).isoformat()} UTC " |
| f"for activation target {target_at.replace(microsecond=0).isoformat()} UTC" |
| ) |
|
|
| time.sleep(sleep_until_prebuild) |
| prebuild_radio_content("scheduled", target_at) |
|
|
| sleep_until_activation = max(0.0, (target_at - datetime.now(timezone.utc)).total_seconds()) |
| if sleep_until_activation > 0: |
| time.sleep(sleep_until_activation) |
|
|
| |
| |
| |
| |
| |
| |
| if radio.prebuild_approved_next_rotation(target_at): |
| print("[radio] Activation-time approved next rotation handoff complete") |
|
|
| if not radio.mark_next_rotation_ready(target_at): |
| print("[radio] Falling back to active hourly refresh because no prebuilt rotation is ready") |
| refresh_radio_content("hourly fallback") |
|
|
|
|
| @app.on_event("startup") |
| def startup(): |
| broadcast.start() |
|
|
| threading.Thread( |
| target=lambda: refresh_radio_content("initial"), |
| daemon=True, |
| ).start() |
|
|
| threading.Thread( |
| target=periodic_refresh_loop, |
| daemon=True, |
| ).start() |
|
|
| threading.Thread( |
| target=broadcast_watchdog_loop, |
| daemon=True, |
| ).start() |
|
|
|
|
| @app.get("/api/me") |
| def api_me(request: Request): |
| user = oauth_user_from_request(request) |
|
|
| if user is None: |
| return JSONResponse( |
| { |
| "authenticated": False, |
| "user": None, |
| } |
| ) |
|
|
| return JSONResponse( |
| { |
| "authenticated": True, |
| "user": user, |
| } |
| ) |
|
|
|
|
| @app.get("/stream.mp3") |
| def stream_mp3(): |
| headers = { |
| "Cache-Control": "no-cache, no-store, must-revalidate", |
| "Pragma": "no-cache", |
| "Expires": "0", |
| "Connection": "keep-alive", |
| "X-Content-Type-Options": "nosniff", |
| } |
|
|
| return StreamingResponse( |
| broadcast_stream_mp3(), |
| media_type="audio/mpeg", |
| headers=headers, |
| ) |
|
|
|
|
| @app.get("/cover-overrides/{song_id}/thumb.png") |
| def cover_override_thumb(song_id: str): |
| thumb_path = Path(COVER_OVERRIDES_DIR) / song_id / "thumb.png" |
|
|
| if not thumb_path.exists() or not thumb_path.is_file(): |
| raise HTTPException(status_code=404, detail="Cover override not found") |
|
|
| return FileResponse( |
| thumb_path, |
| media_type="image/png", |
| headers={ |
| "Cache-Control": "no-cache, no-store, must-revalidate", |
| }, |
| ) |
|
|
|
|
| @app.get("/custom-tracks/{track_id}/thumb.png") |
| def custom_track_thumb(track_id: str): |
| thumb_path = Path(CUSTOM_TRACKS_DIR) / track_id / "thumb.png" |
|
|
| if not thumb_path.exists() or not thumb_path.is_file(): |
| raise HTTPException(status_code=404, detail="Custom track thumbnail not found") |
|
|
| return FileResponse( |
| thumb_path, |
| media_type="image/png", |
| headers={ |
| "Cache-Control": "no-cache, no-store, must-revalidate", |
| }, |
| ) |
|
|
|
|
| @app.get("/api/status") |
| def api_status(): |
| return JSONResponse(get_full_status()) |
|
|
|
|
| @app.get("/api/rotation") |
| def api_rotation(): |
| return JSONResponse(radio.rotation_snapshot()) |
|
|
|
|
| @app.post("/api/listener/start/{listener_id}") |
| def api_listener_start(listener_id: str, request: Request): |
| user = oauth_user_from_request(request) |
| listeners.start(listener_id, user=user) |
|
|
| return JSONResponse( |
| { |
| "ok": True, |
| **listeners.snapshot(), |
| } |
| ) |
|
|
|
|
| @app.post("/api/listener/heartbeat/{listener_id}") |
| def api_listener_heartbeat(listener_id: str, request: Request): |
| user = oauth_user_from_request(request) |
| listeners.heartbeat(listener_id, user=user) |
|
|
| return JSONResponse( |
| { |
| "ok": True, |
| **listeners.snapshot(), |
| } |
| ) |
|
|
|
|
| @app.post("/api/listener/stop/{listener_id}") |
| def api_listener_stop(listener_id: str): |
| listeners.stop(listener_id) |
|
|
| return JSONResponse( |
| { |
| "ok": True, |
| **listeners.snapshot(), |
| } |
| ) |
|
|
|
|
| def require_signed_in_user(request: Request, action: str): |
| user = oauth_user_from_request(request) |
|
|
| if user is None: |
| raise HTTPException(status_code=401, detail=f"Sign in with Hugging Face to {action}") |
|
|
| return user |
|
|
|
|
| def ensure_track_in_playlist(song_id: str): |
| if not radio.has_track(song_id): |
| raise HTTPException(status_code=404, detail="Track not found in current radio playlist") |
|
|
|
|
| def feedback_response(song_id: str, result: dict, **extra): |
| payload = { |
| "ok": True, |
| "song_id": song_id, |
| "like_count": result.get("count", 0), |
| "upvote_count": result.get("upvote_count", 0), |
| "downvote_count": result.get("downvote_count", 0), |
| "report_count": result.get("report_count", 0), |
| "updated_at": result.get("updated_at"), |
| "status": get_full_status(), |
| } |
| payload.update(extra) |
| return JSONResponse(payload) |
|
|
|
|
| @app.post("/api/like/{song_id}") |
| def api_like(song_id: str, request: Request): |
| require_signed_in_user(request, "like tracks") |
| ensure_track_in_playlist(song_id) |
|
|
| try: |
| result = likes.increment(song_id) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Could not save like: {e}") |
|
|
| return feedback_response(song_id, result) |
|
|
|
|
| @app.post("/api/upvote/{song_id}") |
| def api_upvote(song_id: str, request: Request): |
| require_signed_in_user(request, "vote on tracks") |
| ensure_track_in_playlist(song_id) |
|
|
| try: |
| result = likes.upvote(song_id) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Could not save upvote: {e}") |
|
|
| return feedback_response(song_id, result) |
|
|
|
|
| @app.post("/api/unupvote/{song_id}") |
| def api_unupvote(song_id: str, request: Request): |
| require_signed_in_user(request, "undo votes on tracks") |
| ensure_track_in_playlist(song_id) |
|
|
| try: |
| result = likes.unupvote(song_id) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Could not undo upvote: {e}") |
|
|
| return feedback_response(song_id, result) |
|
|
|
|
| @app.post("/api/downvote/{song_id}") |
| def api_downvote(song_id: str, request: Request): |
| require_signed_in_user(request, "vote on tracks") |
| ensure_track_in_playlist(song_id) |
|
|
| try: |
| result = likes.downvote(song_id) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Could not save downvote: {e}") |
|
|
| return feedback_response(song_id, result) |
|
|
|
|
| @app.post("/api/undownvote/{song_id}") |
| def api_undownvote(song_id: str, request: Request): |
| require_signed_in_user(request, "undo votes on tracks") |
| ensure_track_in_playlist(song_id) |
|
|
| try: |
| result = likes.undownvote(song_id) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Could not undo downvote: {e}") |
|
|
| return feedback_response(song_id, result) |
|
|
|
|
| @app.post("/api/report/{song_id}") |
| def api_report(song_id: str, request: Request): |
| user = require_signed_in_user(request, "report tracks") |
|
|
| if not user.get("is_pro"): |
| raise HTTPException(status_code=403, detail="Only Hugging Face PRO users can report tracks") |
|
|
| ensure_track_in_playlist(song_id) |
|
|
| weight = report_weight_for_user(user) |
|
|
| try: |
| result = likes.report(song_id, weight=weight) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Could not save report: {e}") |
|
|
| return feedback_response( |
| song_id, |
| result, |
| reported_at=result.get("reported_at"), |
| report_weight=weight, |
| ) |
|
|
|
|
| @app.post("/api/unreport/{song_id}") |
| def api_unreport(song_id: str, request: Request): |
| user = require_signed_in_user(request, "undo reports") |
|
|
| if not user.get("is_pro"): |
| raise HTTPException(status_code=403, detail="Only Hugging Face PRO users can undo reports") |
|
|
| ensure_track_in_playlist(song_id) |
|
|
| weight = report_weight_for_user(user) |
|
|
| try: |
| result = likes.unreport(song_id, weight=weight) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Could not undo report: {e}") |
|
|
| return feedback_response( |
| song_id, |
| result, |
| unreported_at=result.get("unreported_at"), |
| report_weight=weight, |
| ) |
|
|
|
|
| @app.post("/api/refresh") |
| def api_refresh(): |
| def refresh_in_background(): |
| try: |
| radio.refresh_runtime_controls() |
| radio.refresh_jingles() |
| radio.refresh_segments() |
| radio.refresh_custom_tracks() |
| radio.refresh_tracks() |
| except Exception as e: |
| print(f"[radio] Background refresh failed: {e}") |
|
|
| with radio.lock: |
| radio.is_loading = False |
| radio.last_error = str(e) |
|
|
| threading.Thread(target=refresh_in_background, daemon=True).start() |
|
|
| return JSONResponse( |
| { |
| "ok": True, |
| "status": get_full_status(), |
| } |
| ) |
|
|
|
|
| def next_rotation_response_payload(): |
| with radio.lock: |
| return { |
| "ok": True, |
| "ready": radio.next_rotation_ready, |
| "is_building": radio.next_rotation_is_building, |
| "count": len(radio.next_tracks), |
| "built_at": radio.next_rotation_built_at, |
| "target_at": radio.next_rotation_target_at, |
| "source": radio.next_rotation_source, |
| "last_error": radio.next_rotation_last_error, |
| "approved_last_error": radio.approved_next_rotation_last_error, |
| "current_rotation_source": radio.current_rotation_source, |
| "current_rotation_target_at": radio.current_rotation_target_at, |
| "current_rotation_activated_at": radio.current_rotation_activated_at, |
| "tracks": [dict(track) for track in radio.next_tracks], |
| } |
|
|
|
|
| @app.get("/api/next-rotation") |
| def api_next_rotation(): |
| return JSONResponse(next_rotation_response_payload()) |
|
|
|
|
| |
| |
| |
| @app.get("/next-rotation-preview") |
| def api_next_rotation_preview_alias(): |
| return JSONResponse(next_rotation_response_payload()) |
|
|
|
|
| @app.get("/api/next-rotation-preview") |
| def api_next_rotation_preview_api_alias(): |
| return JSONResponse(next_rotation_response_payload()) |
|
|
|
|
| @app.post("/api/prebuild-next-rotation") |
| def api_prebuild_next_rotation(): |
| target_at = next_utc_hour() |
|
|
| def prebuild_in_background(): |
| prebuild_radio_content("manual", target_at) |
|
|
| threading.Thread(target=prebuild_in_background, daemon=True).start() |
|
|
| return JSONResponse( |
| { |
| "ok": True, |
| "target_at": target_at.isoformat(), |
| "prebuild_at": next_rotation_prebuild_at_iso(), |
| "status": get_full_status(), |
| } |
| ) |
|
|
|
|
| @app.post("/api/mode/{mode}") |
| def api_mode(mode: str): |
| radio.set_mode(mode) |
|
|
| return JSONResponse( |
| { |
| "ok": True, |
| "status": get_full_status(), |
| } |
| ) |
|
|
|
|
| @app.get("/api/tracks") |
| def api_tracks(): |
| with radio.lock: |
| tracks = [] |
|
|
| for track in radio.tracks: |
| item = dict(track) |
| item["like_count"] = likes.get_count(item["id"]) |
| item["upvote_count"] = likes.get_upvote_count(item["id"]) |
| item["downvote_count"] = likes.get_downvote_count(item["id"]) |
| item["report_count"] = likes.get_report_count(item["id"]) |
| radio._apply_track_controls_unlocked(item) |
| tracks.append(item) |
|
|
| return JSONResponse( |
| { |
| "count": len(tracks), |
| "tracks": tracks, |
| } |
| ) |
|
|
|
|
| @app.get("/api/likes") |
| def api_likes(): |
| return JSONResponse( |
| { |
| "likes": likes.snapshot(), |
| "likes_enabled": likes.last_error is None, |
| "likes_error": likes.last_error, |
| "likes_path": str(likes.likes_path), |
| } |
| ) |
|
|
|
|
| app.mount("/", StaticFiles(directory="static", html=True), name="static") |