HF-Radio / app.py
fffiloni's picture
Upload 3 files
2c38e03 verified
import json
import os
import urllib.error
import urllib.request
import queue
import random
import shutil
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
from uuid import uuid4
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from huggingface_hub import (
HfFileSystem,
attach_huggingface_oauth,
parse_huggingface_oauth,
)
# ---------------------------------------------------------------------------
# Runtime configuration
#
# Unless noted otherwise, each setting is read once at import time from the
# environment variable of the same name and falls back to the default shown.
# The int()/float() conversions are not guarded, so a malformed value raises
# at import time.
# ---------------------------------------------------------------------------

# Bucket, radio mode and rotation sizing/timing.
BUCKET_ID = os.environ.get("BUCKET_ID", "victor/ace-step-community")
RADIO_MODE = os.environ.get("RADIO_MODE", "random")
DEFAULT_DURATION = float(os.environ.get("DEFAULT_DURATION", "180"))
MAX_TRACKS = int(os.environ.get("MAX_TRACKS", "150"))
ROTATION_TARGET_SECONDS = int(os.environ.get("ROTATION_TARGET_SECONDS", "3600"))
ROTATION_PREBUILD_LEAD_SECONDS = int(os.environ.get("ROTATION_PREBUILD_LEAD_SECONDS", "600"))
# Fixed label; not configurable through the environment.
REFRESH_MODE = "prebuild_before_top_of_hour_utc"

# OAuth userinfo endpoint and how long a fetched payload is reused.
OAUTH_USERINFO_URL = os.environ.get("OAUTH_USERINFO_URL", "https://huggingface.co/oauth/userinfo")
OAUTH_USERINFO_CACHE_TTL_SECONDS = int(os.environ.get("OAUTH_USERINFO_CACHE_TTL_SECONDS", "300"))

# Stream encoding and broadcast tuning (consumed by code outside this section).
STREAM_BITRATE = os.environ.get("STREAM_BITRATE", "128k")
STREAM_SAMPLE_RATE = os.environ.get("STREAM_SAMPLE_RATE", "44100")
STREAM_SEEK_EPSILON_SECONDS = float(os.environ.get("STREAM_SEEK_EPSILON_SECONDS", "0.08"))
BROADCAST_QUEUE_MAX_CHUNKS = int(os.environ.get("BROADCAST_QUEUE_MAX_CHUNKS", "256"))
BROADCAST_NO_TRACK_SLEEP_SECONDS = float(os.environ.get("BROADCAST_NO_TRACK_SLEEP_SECONDS", "1.0"))
BROADCAST_SUBSCRIBER_TIMEOUT_SECONDS = float(os.environ.get("BROADCAST_SUBSCRIBER_TIMEOUT_SECONDS", "15.0"))
BROADCAST_STALL_TIMEOUT_SECONDS = float(os.environ.get("BROADCAST_STALL_TIMEOUT_SECONDS", "25.0"))
BROADCAST_WATCHDOG_INTERVAL_SECONDS = float(os.environ.get("BROADCAST_WATCHDOG_INTERVAL_SECONDS", "5.0"))
BROADCAST_PROCESS_START_TIMEOUT_SECONDS = float(os.environ.get("BROADCAST_PROCESS_START_TIMEOUT_SECONDS", "20.0"))

# Feedback persistence (used to construct the LikeStore) and report handling.
LIKES_DIR = os.environ.get("LIKES_DIR", "/likes")
LIKES_FILE = os.environ.get("LIKES_FILE", "likes.json")
REPORT_HIDE_THRESHOLD = int(os.environ.get("REPORT_HIDE_THRESHOLD", "2"))
# Comma-separated usernames; each entry is stripped and lower-cased, blanks are dropped.
ADMIN_REPORT_USERNAMES = {
username.strip().lower()
for username in os.environ.get("ADMIN_REPORT_USERNAMES", "fffiloni").split(",")
if username.strip()
}
ADMIN_REPORT_WEIGHT = int(os.environ.get("ADMIN_REPORT_WEIGHT", "2"))

# Track weighting parameters derived from likes / upvotes / downvotes.
LIKE_WEIGHT_PER_LIKE = float(os.environ.get("LIKE_WEIGHT_PER_LIKE", "0.35"))
LIKE_WEIGHT_MAX = float(os.environ.get("LIKE_WEIGHT_MAX", "2.0"))
UPVOTE_WEIGHT_PER_UPVOTE = float(os.environ.get("UPVOTE_WEIGHT_PER_UPVOTE", "0.12"))
UPVOTE_WEIGHT_MAX = float(os.environ.get("UPVOTE_WEIGHT_MAX", "0.6"))
DOWNVOTE_WEIGHT_PER_DOWNVOTE = float(os.environ.get("DOWNVOTE_WEIGHT_PER_DOWNVOTE", "0.15"))
DOWNVOTE_WEIGHT_MAX = float(os.environ.get("DOWNVOTE_WEIGHT_MAX", "0.7"))
TRACK_WEIGHT_MIN = float(os.environ.get("TRACK_WEIGHT_MIN", "0.25"))
# When TRACK_WEIGHT_MAX is not set, it defaults to the value of LIKE_WEIGHT_MAX.
TRACK_WEIGHT_MAX = float(os.environ.get("TRACK_WEIGHT_MAX", str(LIKE_WEIGHT_MAX)))
AUTO_ROTATION_MIN_EFFECTIVE_SCORE_EXCLUSIVE = float(os.environ.get("AUTO_ROTATION_MIN_EFFECTIVE_SCORE_EXCLUSIVE", "-5"))

# Recent-track quota and recently-played exclusion settings.
RECENT_TRACK_QUOTA = float(os.environ.get("RECENT_TRACK_QUOTA", "0.15"))
RECENT_TRACK_WINDOW_HOURS = int(os.environ.get("RECENT_TRACK_WINDOW_HOURS", "72"))
RECENTLY_PLAYED_EXCLUDE_MINUTES = int(os.environ.get("RECENTLY_PLAYED_EXCLUDE_MINUTES", "90"))
RECENTLY_PLAYED_MEMORY_MAX = int(os.environ.get("RECENTLY_PLAYED_MEMORY_MAX", "200"))
RECENTLY_PLAYED_MIN_CANDIDATES_AFTER_FILTER = int(os.environ.get("RECENTLY_PLAYED_MIN_CANDIDATES_AFTER_FILTER", "30"))
META_LOAD_WORKERS = int(os.environ.get("META_LOAD_WORKERS", "16"))
# Passed to ListenerTracker as its heartbeat TTL.
LISTENER_TTL_SECONDS = int(os.environ.get("LISTENER_TTL_SECONDS", "45"))

# Jingles. Boolean flags are enabled only by the string "true" (case-insensitive).
ENABLE_JINGLES = os.environ.get("ENABLE_JINGLES", "true").lower() == "true"
JINGLES_DIR = os.environ.get("JINGLES_DIR", "static/jingles")
JINGLES_GLOB = os.environ.get("JINGLES_GLOB", "jingle_hf_radio_*.wav")
DEFAULT_JINGLE_DURATION = float(os.environ.get("DEFAULT_JINGLE_DURATION", "6"))
JINGLE_VOLUME = float(os.environ.get("JINGLE_VOLUME", "2.00"))
ENABLE_REMOTE_JINGLES = os.environ.get("ENABLE_REMOTE_JINGLES", "true").lower() == "true"
REMOTE_JINGLES_DIR = os.environ.get("REMOTE_JINGLES_DIR", "/remote-jingles")

# Segments.
ENABLE_SEGMENTS = os.environ.get("ENABLE_SEGMENTS", "true").lower() == "true"
REMOTE_SEGMENTS_DIR = os.environ.get("REMOTE_SEGMENTS_DIR", "/remote-segments")
DEFAULT_SEGMENT_DURATION = float(os.environ.get("DEFAULT_SEGMENT_DURATION", "30"))
SEGMENT_VOLUME = float(os.environ.get("SEGMENT_VOLUME", "2.00"))
MIN_SONGS_BETWEEN_SEGMENTS = int(os.environ.get("MIN_SONGS_BETWEEN_SEGMENTS", "12"))
MAX_SONGS_BETWEEN_SEGMENTS = int(os.environ.get("MAX_SONGS_BETWEEN_SEGMENTS", "18"))
SEGMENT_PROBABILITY_AFTER_MIN = float(os.environ.get("SEGMENT_PROBABILITY_AFTER_MIN", "0.20"))

# Song mastering; the default filter string is a chain of three equalizer stages.
ENABLE_SONG_MASTERING = os.environ.get("ENABLE_SONG_MASTERING", "true").lower() == "true"
SONG_AUDIO_FILTER = os.environ.get(
"SONG_AUDIO_FILTER",
"equalizer=f=95:t=q:w=0.9:g=1.8,equalizer=f=3500:t=q:w=1.0:g=-0.8,equalizer=f=8500:t=q:w=0.8:g=-2.2",
)
MIN_SONGS_BETWEEN_JINGLES = int(os.environ.get("MIN_SONGS_BETWEEN_JINGLES", "6"))
MAX_SONGS_BETWEEN_JINGLES = int(os.environ.get("MAX_SONGS_BETWEEN_JINGLES", "10"))
JINGLE_PROBABILITY_AFTER_MIN = float(os.environ.get("JINGLE_PROBABILITY_AFTER_MIN", "0.25"))

# Control directory and the file names read from it (config, per-track overrides, rotation).
RADIO_CONTROL_DIR = os.environ.get("RADIO_CONTROL_DIR", "/radio-control")
RADIO_CONTROL_CONFIG_FILE = os.environ.get("RADIO_CONTROL_CONFIG_FILE", "config.json")
RADIO_TRACK_OVERRIDES_FILE = os.environ.get("RADIO_TRACK_OVERRIDES_FILE", "track_overrides.json")
RADIO_NEXT_ROTATION_FILE = os.environ.get("RADIO_NEXT_ROTATION_FILE", "next_rotation.json")
RADIO_NEXT_ROTATION_ARCHIVE_DIR = os.environ.get("RADIO_NEXT_ROTATION_ARCHIVE_DIR", "next_rotation_archive")
COVER_OVERRIDES_DIR = os.environ.get("COVER_OVERRIDES_DIR", "/cover-overrides")

# Custom tracks.
ENABLE_CUSTOM_TRACKS = os.environ.get("ENABLE_CUSTOM_TRACKS", "true").lower() == "true"
CUSTOM_TRACKS_DIR = os.environ.get("CUSTOM_TRACKS_DIR", "/custom-tracks")
DEFAULT_CUSTOM_TRACK_DURATION = float(os.environ.get("DEFAULT_CUSTOM_TRACK_DURATION", "180"))
CUSTOM_TRACK_QUOTA = float(os.environ.get("CUSTOM_TRACK_QUOTA", "0.10"))

# Locations derived from BUCKET_ID.
BUCKET_FS_ROOT = f"hf://buckets/{BUCKET_ID}"
BUCKET_RESOLVE_URL = f"https://huggingface.co/buckets/{BUCKET_ID}/resolve"

# The ASGI application; Hugging Face OAuth support is attached right after creation.
app = FastAPI(title="HF Bucket Radio Stream")
attach_huggingface_oauth(app)
def normalize_tags(raw_tags):
    """Turn a raw tag value into a clean list of tag strings.

    A string has every ``#`` removed, treats ``;`` and ``|`` as commas and is
    then split on commas. A list has each element stringified. In both cases
    entries are stripped and blank ones dropped. Any other input yields ``[]``.
    """
    if isinstance(raw_tags, list):
        stringified = (str(entry).strip() for entry in raw_tags)
        return [tag for tag in stringified if tag]

    if not isinstance(raw_tags, str):
        return []

    unified = raw_tags.replace("#", "")
    for separator in (";", "|"):
        unified = unified.replace(separator, ",")

    pieces = (piece.strip() for piece in unified.split(","))
    return [piece for piece in pieces if piece]
def probe_audio_duration(path_or_url: str, fallback: float) -> float:
    """Ask ``ffprobe`` for the duration of an audio source, in seconds.

    Returns ``fallback`` when ffprobe cannot be run, exits non-zero, takes
    longer than 10 seconds, prints something that is not a number, or reports
    a duration that is not strictly positive. Failures are logged, not raised.
    """
    ffprobe_args = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        path_or_url,
    ]
    try:
        completed = subprocess.run(
            ffprobe_args,
            capture_output=True,
            text=True,
            timeout=10,
            check=True,
        )
        seconds = float(completed.stdout.strip())
    except Exception as e:
        print(f"[audio] Could not probe duration for {path_or_url}: {e}")
        return fallback

    return seconds if seconds > 0 else fallback
def parse_datetime(value):
    """Convert a datetime or ISO-8601 string into an aware UTC datetime.

    Falsy input and unparseable text return ``None``. A trailing ``Z`` is
    accepted as a UTC marker. Naive values are taken to be UTC already.
    """
    if not value:
        return None

    if not isinstance(value, datetime):
        try:
            text = str(value).strip()
            if text.endswith("Z"):
                # fromisoformat() on older Pythons does not accept the "Z" suffix.
                text = f"{text[:-1]}+00:00"
            value = datetime.fromisoformat(text)
        except Exception:
            return None

    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)
def asset_is_enabled(meta: dict) -> bool:
    """Return the truthiness of ``meta["enabled"]``; a missing key counts as enabled."""
    flag = meta.get("enabled", True)
    return True if flag else False
def asset_is_expired(meta: dict, now: Optional[datetime] = None) -> bool:
    """Tell whether ``meta["expires_at"]`` lies at or before ``now``.

    A missing or unparseable ``expires_at`` means the asset never expires.
    ``now`` defaults to the current UTC time.
    """
    deadline = parse_datetime(meta.get("expires_at"))
    if deadline is None:
        return False

    reference = now if now else datetime.now(timezone.utc)
    return deadline <= reference
def safe_float(value, fallback: float) -> float:
    """Parse ``value`` as a strictly positive float, else return ``fallback``.

    Zero, negative numbers, NaN and anything ``float()`` rejects all yield
    ``fallback``.
    """
    try:
        number = float(value)
    except Exception:
        return fallback
    return number if number > 0 else fallback
def safe_int(value, fallback: int) -> int:
    """Return ``int(value)``, or ``fallback`` when the conversion raises."""
    try:
        number = int(value)
    except Exception:
        number = fallback
    return number
def safe_bool(value, fallback: bool) -> bool:
    """Interpret ``value`` as a boolean.

    ``None`` yields ``fallback``. Strings are true only when, stripped and
    lower-cased, they are one of ``1/true/yes/y/on``; any other string is
    false (not ``fallback``). Everything else goes through ``bool()``.
    """
    if value is None:
        return fallback
    if isinstance(value, str):
        return value.strip().lower() in {"1", "true", "yes", "y", "on"}
    return value if isinstance(value, bool) else bool(value)
def clamp_number(value, minimum=None, maximum=None):
    """Clamp ``value`` into ``[minimum, maximum]``; a ``None`` bound is ignored.

    The lower bound is applied first, so when ``minimum > maximum`` the result
    is ``maximum``. On a tie the bound itself is returned rather than
    ``value``, which matters when the two differ in type (``0`` vs ``0.0``).
    """
    lower_bounded = value if minimum is None or value > minimum else minimum
    if maximum is None or lower_bounded < maximum:
        return lower_bounded
    return maximum
def next_utc_hour() -> datetime:
    """Return the next top-of-the-hour instant, as an aware UTC datetime."""
    current = datetime.now(timezone.utc)
    top_of_current_hour = current.replace(minute=0, second=0, microsecond=0)
    return top_of_current_hour + timedelta(hours=1)
def seconds_until_next_utc_hour() -> float:
    """Return the seconds remaining until the next UTC top of the hour (never negative)."""
    remaining = next_utc_hour() - datetime.now(timezone.utc)
    return max(0.0, remaining.total_seconds())
def next_utc_hour_iso() -> str:
    """Return the next UTC top of the hour as an ISO-8601 string."""
    upcoming_hour = next_utc_hour()
    return upcoming_hour.isoformat()
def next_rotation_prebuild_at() -> datetime:
    """Return the next UTC hour minus ROTATION_PREBUILD_LEAD_SECONDS.

    A negative lead is treated as zero, so the result is never later than the
    hour boundary itself.
    """
    upcoming_hour = next_utc_hour()
    lead_seconds = max(0, ROTATION_PREBUILD_LEAD_SECONDS)
    return upcoming_hour - timedelta(seconds=lead_seconds)
def next_rotation_prebuild_at_iso() -> str:
    """Return ``next_rotation_prebuild_at()`` formatted as an ISO-8601 string."""
    prebuild_moment = next_rotation_prebuild_at()
    return prebuild_moment.isoformat()
# Cache of OAuth userinfo payloads used by fetch_oauth_userinfo().
# Maps access token -> {"fetched_at": <time.time() value>, "payload": <dict>}.
# Reads and writes of the dict are done while holding the lock.
oauth_userinfo_cache_lock = threading.Lock()
oauth_userinfo_cache = {}
def oauth_bool(value) -> bool:
    """Interpret an OAuth claim as a boolean.

    Strings are true only when, stripped and lower-cased, they are one of
    ``1/true/yes/y/on``. Every other value goes through ``bool()``.
    """
    if isinstance(value, str):
        return value.strip().lower() in {"1", "true", "yes", "y", "on"}
    # bool() returns real booleans unchanged, so they need no separate branch.
    return bool(value)
def fetch_oauth_userinfo(access_token: Optional[str]) -> dict:
    """Fetch the OAuth userinfo payload for ``access_token``, with caching.

    A payload fetched less than OAUTH_USERINFO_CACHE_TTL_SECONDS ago is
    returned from the in-memory cache (as a copy). Otherwise the endpoint at
    OAUTH_USERINFO_URL is queried with a 5 second timeout.

    Returns ``{}`` for an empty token, for any request/decoding failure (which
    is logged, not raised, and not cached) and for a non-object JSON response.
    """
    if not access_token:
        return {}

    now = time.time()
    with oauth_userinfo_cache_lock:
        cached = oauth_userinfo_cache.get(access_token)
        if cached and now - cached.get("fetched_at", 0) < OAUTH_USERINFO_CACHE_TTL_SECONDS:
            return dict(cached.get("payload", {}))

    request = urllib.request.Request(
        OAUTH_USERINFO_URL,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/json",
        },
        method="GET",
    )
    try:
        with urllib.request.urlopen(request, timeout=5) as response:
            payload = json.loads(response.read().decode("utf-8"))
    except Exception as e:
        print(f"[oauth] Could not fetch OAuth userinfo endpoint: {e}")
        return {}

    if not isinstance(payload, dict):
        payload = {}

    with oauth_userinfo_cache_lock:
        # The cache is keyed by bearer token and nothing else ever removes
        # entries, so drop everything past its TTL before adding a new one.
        # Otherwise the dict grows with every token ever seen and keeps those
        # tokens in memory for the lifetime of the process.
        stale_tokens = [
            token
            for token, entry in oauth_userinfo_cache.items()
            if now - entry.get("fetched_at", 0) >= OAUTH_USERINFO_CACHE_TTL_SECONDS
        ]
        for token in stale_tokens:
            oauth_userinfo_cache.pop(token, None)

        oauth_userinfo_cache[access_token] = {
            "fetched_at": now,
            "payload": dict(payload),
        }
    return payload
def extract_oauth_field(user_info, *field_names, default=None):
    """Read the first available field from a dict or attribute-style object.

    Field names are tried in order. For a dict, a key that is present wins
    immediately, even when its value is ``None``. For other objects the first
    attribute whose value is not ``None`` wins. As a last resort the object's
    ``__dict__`` is checked for any of the names; otherwise ``default``.
    """
    is_mapping = isinstance(user_info, dict)
    for name in field_names:
        if is_mapping and name in user_info:
            return user_info.get(name, default)
        attribute = getattr(user_info, name, None)
        if attribute is not None:
            return attribute

    instance_vars = getattr(user_info, "__dict__", {}) or {}
    for name in field_names:
        try:
            return instance_vars[name]
        except KeyError:
            continue
    return default
def oauth_user_from_request(request: Request):
    """
    Build a compact public profile for a visitor signed in with Hugging Face.

    Returns None for anonymous visitors and when the OAuth session cannot be
    parsed.

    Each profile field is read from the session's user info first and from the
    `/oauth/userinfo` endpoint payload (fetched with the session access token)
    second. The PRO flag is looked up under both `is_pro` (Python
    OAuthUserInfo) and `isPro` (JS/OIDC userinfo payload). A missing or false
    session value defers to the endpoint, because the installed
    huggingface_hub version may not hydrate the field correctly.
    """
    try:
        oauth_info = parse_huggingface_oauth(request)
    except Exception as e:
        print(f"[oauth] Could not parse OAuth info: {e}")
        return None

    if oauth_info is None or oauth_info.user_info is None:
        return None

    session_user = oauth_info.user_info
    endpoint_user = fetch_oauth_userinfo(extract_oauth_field(oauth_info, "access_token"))

    def session_then_endpoint(field_name):
        # The session value wins when truthy; otherwise use the endpoint value.
        return (
            extract_oauth_field(session_user, field_name)
            or extract_oauth_field(endpoint_user, field_name)
        )

    username = (
        session_then_endpoint("preferred_username")
        or session_then_endpoint("name")
        or "unknown"
    )

    pro_flag = extract_oauth_field(session_user, "is_pro", "isPro", default=None)
    if pro_flag is None:
        pro_flag = extract_oauth_field(endpoint_user, "is_pro", "isPro", default=False)
    elif not oauth_bool(pro_flag):
        # Some older hub versions hydrate unknown HF-specific fields as False.
        # If the raw userinfo endpoint has an opinion, trust the endpoint.
        endpoint_flag = extract_oauth_field(endpoint_user, "is_pro", "isPro", default=None)
        if endpoint_flag is not None:
            pro_flag = endpoint_flag
    is_pro = oauth_bool(pro_flag)

    profile = {
        "id": session_then_endpoint("sub"),
        "username": username,
        "avatar_url": session_then_endpoint("picture"),
        "profile_url": session_then_endpoint("profile") or f"https://huggingface.co/{username}",
        "is_pro": is_pro,
        "isPro": is_pro,
        "pro_status_source": "oauth_userinfo_endpoint" if endpoint_user else "oauth_session",
    }
    return profile
def report_weight_for_user(user: Optional[dict]) -> int:
    """Return how much one report from ``user`` counts.

    Anonymous and regular users weigh 1. A user whose username (stripped,
    lower-cased, leading ``@`` removed) is in ADMIN_REPORT_USERNAMES weighs
    ADMIN_REPORT_WEIGHT, but never less than 1.
    """
    if user:
        handle = str(user.get("username") or "").strip().lower().lstrip("@")
        if handle in ADMIN_REPORT_USERNAMES:
            return max(1, ADMIN_REPORT_WEIGHT)
    return 1
class ListenerTracker:
    """
    Heartbeat-based registry of logical listeners coming from the web UI.

    Both anonymous and signed-in HF users are counted; signed-in users are
    additionally exposed with avatar metadata. A listener that has not been
    seen for more than `ttl_seconds` is dropped on the next operation.
    Direct /stream.mp3 listeners are handled separately by BroadcastEngine.
    """

    def __init__(self, ttl_seconds: int = 45):
        self.lock = threading.Lock()
        self.listeners = {}
        self.total_sessions = 0
        self.ttl_seconds = ttl_seconds

    def _cleanup_unlocked(self):
        """Drop listeners whose last heartbeat is older than the TTL. Lock must be held."""
        current_time = time.time()
        stale_ids = [
            key
            for key, entry in self.listeners.items()
            if current_time - entry.get("last_seen", 0) > self.ttl_seconds
        ]
        if not stale_ids:
            return

        for key in stale_ids:
            self.listeners.pop(key, None)
        print(
            f"[listeners] cleaned up expired={len(stale_ids)} "
            f"active={len(self.listeners)}"
        )

    def _touch_unlocked(self, listener_id: str, user: Optional[dict], keep_known_user: bool):
        """Create or refresh one listener entry, then expire stale ones. Lock must be held."""
        seen_at = time.time()
        existing = self.listeners.get(listener_id)
        known = existing if existing is not None else {}

        self.listeners[listener_id] = {
            "started_at": known.get("started_at", seen_at),
            "last_seen": seen_at,
            "user": (user or known.get("user")) if keep_known_user else user,
        }
        if existing is None:
            self.total_sessions += 1
        self._cleanup_unlocked()

    def start(self, listener_id: str, user: Optional[dict] = None):
        """Register (or re-register) a listener; the stored user is replaced by `user`."""
        with self.lock:
            self._touch_unlocked(listener_id, user, keep_known_user=False)
            active_count = len(self.listeners)
            session_count = self.total_sessions

        print(
            f"[listeners] start active={active_count} "
            f"total_sessions={session_count} "
            f"user={user.get('username') if user else 'anonymous'}"
        )

    def heartbeat(self, listener_id: str, user: Optional[dict] = None):
        """Refresh a listener's last-seen time; a previously known user is kept if `user` is empty."""
        with self.lock:
            self._touch_unlocked(listener_id, user, keep_known_user=True)

    def stop(self, listener_id: str):
        """Remove a listener (unknown ids are ignored)."""
        with self.lock:
            self.listeners.pop(listener_id, None)
            self._cleanup_unlocked()
            active_count = len(self.listeners)

        print(f"[listeners] stop active={active_count}")

    def snapshot(self):
        """Return listener counts plus up to 8 distinct signed-in profiles."""
        with self.lock:
            self._cleanup_unlocked()

            # One profile per user, keyed by id (or username when id is empty),
            # keeping the first session encountered for each user.
            profiles_by_identity = {}
            for entry in self.listeners.values():
                profile = entry.get("user")
                if not profile:
                    continue
                identity = profile.get("id") or profile.get("username")
                if identity in profiles_by_identity:
                    continue
                profiles_by_identity[identity] = {
                    "username": profile.get("username"),
                    "avatar_url": profile.get("avatar_url"),
                    "profile_url": profile.get("profile_url"),
                }

            signed_in = list(profiles_by_identity.values())
            signed_in_total = len(signed_in)

            # Random but stable for one minute, to avoid UI flicker every /api/status refresh.
            minute_seed = int(time.time() // 60)
            random.Random(minute_seed).shuffle(signed_in)
            visible = signed_in[:8]

            return {
                "active_listeners": len(self.listeners),
                "total_connections": self.total_sessions,
                "listener_ttl_seconds": self.ttl_seconds,
                "listener_profiles": visible,
                "signed_in_listener_count": signed_in_total,
                "visible_listener_profile_count": len(visible),
            }
# Module-level tracker for web UI listeners; entries not refreshed within
# LISTENER_TTL_SECONDS are dropped by the tracker's cleanup.
listeners = ListenerTracker(ttl_seconds=LISTENER_TTL_SECONDS)
class LikeStore:
    """Thread-safe, JSON-file-backed store of per-song feedback counters.

    ``data`` maps a song id to a dict holding ``count`` (likes),
    ``upvote_count``, ``downvote_count`` and ``report_count`` plus ISO-8601
    UTC timestamps. Every mutation rewrites the whole file through a
    temporary file followed by ``os.replace``.

    ``last_error`` holds the message of the most recent load/save failure, or
    ``None`` after a successful operation.
    """

    def __init__(self, likes_dir: str, likes_file: str):
        self.lock = threading.Lock()
        self.likes_dir = Path(likes_dir)
        self.likes_path = self.likes_dir / likes_file
        self.tmp_path = self.likes_dir / f".{likes_file}.tmp"
        self.data = {}
        self.last_error = None
        self.load()

    def load(self):
        """(Re)load feedback from disk, creating the directory and file if missing.

        Never raises: on any failure the store falls back to an empty dict and
        records the message in ``last_error``.
        """
        with self.lock:
            try:
                self.likes_dir.mkdir(parents=True, exist_ok=True)
                if self.likes_path.exists():
                    with open(self.likes_path, "r", encoding="utf-8") as f:
                        loaded = json.load(f)
                    # Valid JSON is not necessarily an object. A list or scalar
                    # root would make every later `.get(song_id)` blow up, so
                    # treat it like any other unreadable file.
                    if not isinstance(loaded, dict):
                        raise ValueError(
                            f"expected a JSON object at top level, got {type(loaded).__name__}"
                        )
                    self.data = loaded
                else:
                    self.data = {}
                    self._save_unlocked()
                self.last_error = None
                print(f"[likes] Loaded {len(self.data)} feedback entries from {self.likes_path}")
            except Exception as e:
                self.last_error = str(e)
                self.data = {}
                print(f"[likes] Could not load feedback: {e}")

    def _save_unlocked(self):
        """Write ``data`` to disk via temp file + rename. Lock must be held."""
        payload = json.dumps(self.data, indent=2, ensure_ascii=False)
        with open(self.tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(self.tmp_path, self.likes_path)

    def _item_unlocked(self, song_id: str) -> dict:
        """Return a copy of the song's entry with all four counters present as ints."""
        item = dict(self.data.get(song_id, {}))
        for counter in ("count", "upvote_count", "downvote_count", "report_count"):
            item[counter] = int(item.get(counter, 0))
        return item

    def _public_result_unlocked(self, song_id: str, item: dict, **extra) -> dict:
        """Build the dict returned to callers; ``extra`` keys are appended last."""
        payload = {
            "song_id": song_id,
            "count": int(item.get("count", 0)),
            "upvote_count": int(item.get("upvote_count", 0)),
            "downvote_count": int(item.get("downvote_count", 0)),
            "report_count": int(item.get("report_count", 0)),
            "updated_at": item.get("updated_at"),
        }
        payload.update(extra)
        return payload

    def _read_counter(self, song_id: str, counter: str) -> int:
        """Return one counter for a song, 0 when the song or counter is unknown."""
        with self.lock:
            return int(self.data.get(song_id, {}).get(counter, 0))

    def _apply_change(
        self,
        song_id: str,
        counter: str,
        delta: int,
        failure_message: str,
        *,
        stamp_field: Optional[str] = None,
        clear_on_zero: Optional[str] = None,
        extra: Optional[dict] = None,
    ) -> dict:
        """Shared implementation of every counter mutation.

        Adds ``delta`` to ``counter`` (decrements are floored at 0), stamps
        ``updated_at`` and, if given, ``stamp_field`` with the current UTC
        time, removes ``clear_on_zero`` when the counter reaches 0, then
        persists. ``stamp_field`` and ``extra`` are echoed in the result.

        Raises whatever the save raised, after recording it in ``last_error``.
        The in-memory change is kept in that case.
        """
        now = datetime.now(timezone.utc).isoformat()
        with self.lock:
            item = self._item_unlocked(song_id)
            updated = item[counter] + delta
            item[counter] = max(0, updated) if delta < 0 else updated
            if stamp_field:
                item[stamp_field] = now
            item["updated_at"] = now
            if clear_on_zero and item[counter] == 0:
                item.pop(clear_on_zero, None)
            self.data[song_id] = item

            try:
                self._save_unlocked()
                self.last_error = None
            except Exception as e:
                self.last_error = str(e)
                print(f"[likes] {failure_message}: {e}")
                raise

            result_extra = {stamp_field: now} if stamp_field else {}
            result_extra.update(extra or {})
            return self._public_result_unlocked(song_id, item, **result_extra)

    def get_count(self, song_id: str) -> int:
        """Return the like count of a song."""
        return self._read_counter(song_id, "count")

    def get_upvote_count(self, song_id: str) -> int:
        """Return the upvote count of a song."""
        return self._read_counter(song_id, "upvote_count")

    def get_downvote_count(self, song_id: str) -> int:
        """Return the downvote count of a song."""
        return self._read_counter(song_id, "downvote_count")

    def get_report_count(self, song_id: str) -> int:
        """Return the (weighted) report count of a song."""
        return self._read_counter(song_id, "report_count")

    def increment(self, song_id: str) -> dict:
        """Add one like."""
        return self._apply_change(song_id, "count", 1, "Could not save like")

    def upvote(self, song_id: str) -> dict:
        """Add one upvote."""
        return self._apply_change(song_id, "upvote_count", 1, "Could not save upvote")

    def unupvote(self, song_id: str) -> dict:
        """Remove one upvote, never going below zero."""
        return self._apply_change(song_id, "upvote_count", -1, "Could not undo upvote")

    def downvote(self, song_id: str) -> dict:
        """Add one downvote."""
        return self._apply_change(song_id, "downvote_count", 1, "Could not save downvote")

    def undownvote(self, song_id: str) -> dict:
        """Remove one downvote, never going below zero."""
        return self._apply_change(song_id, "downvote_count", -1, "Could not undo downvote")

    def report(self, song_id: str, weight: int = 1) -> dict:
        """Add ``weight`` (at least 1) reports and stamp ``reported_at``."""
        weight = max(1, int(weight or 1))
        return self._apply_change(
            song_id,
            "report_count",
            weight,
            "Could not save report",
            stamp_field="reported_at",
            extra={"report_weight": weight},
        )

    def unreport(self, song_id: str, weight: int = 1) -> dict:
        """Remove ``weight`` (at least 1) reports; ``reported_at`` is cleared at zero."""
        weight = max(1, int(weight or 1))
        return self._apply_change(
            song_id,
            "report_count",
            -weight,
            "Could not save unreport",
            stamp_field="unreported_at",
            clear_on_zero="reported_at",
            extra={"report_weight": weight},
        )

    def snapshot(self) -> dict:
        """Return a shallow copy of all feedback entries."""
        with self.lock:
            return dict(self.data)
# Module-level feedback store. Constructing it loads LIKES_DIR/LIKES_FILE,
# creating the directory and an empty file when they do not exist yet.
likes = LikeStore(LIKES_DIR, LIKES_FILE)
class RadioState:
"""
Holds the logical radio state.
In V2, this class no longer produces one stream per listener.
The BroadcastEngine is responsible for audio output and advances the radio
when the current audio item finishes.
"""
def __init__(self):
    """Initialise an empty radio state; nothing is loaded from disk or the bucket here."""
    # Guards the mutable state below (methods suffixed `_unlocked` expect it to be held).
    self.lock = threading.Lock()
    self.fs = HfFileSystem()

    # Loaded audio items, by category.
    self.tracks = []
    self.jingles = []
    self.segments = []
    self.custom_tracks = []

    # What is currently on air.
    self.current_index: Optional[int] = None
    self.current_kind = "song"
    self.current_jingle: Optional[dict] = None
    self.current_segment: Optional[dict] = None
    self.started_at: Optional[float] = None
    self.is_playing = False
    self.mode = RADIO_MODE

    # Loading / refresh status.
    self.is_loading = False
    self.last_error = None
    self.last_refresh_at: Optional[float] = None

    # Next rotation (prebuilt or approved) bookkeeping.
    self.next_tracks = []
    self.next_rotation_ready = False
    self.next_rotation_is_building = False
    self.next_rotation_built_at: Optional[float] = None
    self.next_rotation_target_at: Optional[str] = None
    self.next_rotation_last_error = None
    self.next_rotation_source = None
    self.next_rotation_payload = None
    self.approved_next_rotation_last_loaded_at = None
    self.approved_next_rotation_last_error = None
    self.approved_next_rotation_last_consumed_at = None

    # Currently active rotation.
    self.last_rotation_activated_at: Optional[float] = None
    self.current_rotation_source = "initial"
    self.current_rotation_target_at = None
    self.current_rotation_activated_at = None
    self.current_rotation_track_ids = []

    # Counters used to space out jingles and segments between songs.
    self.songs_since_last_jingle = 0
    self.last_jingle_id: Optional[str] = None
    self.songs_since_last_segment = 0
    self.last_segment_id: Optional[str] = None

    # Play history; entries are dicts read via their "id" and "played_at" keys.
    self.recently_played_songs: list[dict] = []

    # Runtime configuration; starts from the module defaults until
    # refresh_runtime_controls() replaces it.
    self.runtime_config = self._default_runtime_config()
    self.runtime_config_source = "defaults"
    self.runtime_config_last_loaded_at = None
    self.runtime_config_last_error = None
    self.runtime_control_payload = {}

    # Per-track overrides and cover overrides read from the control directories.
    self.track_overrides = {}
    self.track_overrides_last_loaded_at = None
    self.track_overrides_last_error = None
    self.cover_overrides_count = 0

    # Custom tracks load status.
    self.custom_tracks_last_loaded_at = None
    self.custom_tracks_last_error = None
def _default_runtime_config(self) -> dict:
    """Return a fresh dict of runtime settings taken from the module-level constants.

    The value types here (bool / int / float / str) are what
    `_coerce_runtime_config` uses to decide how to parse custom values.
    """
    return dict(
        max_tracks=MAX_TRACKS,
        rotation_target_seconds=ROTATION_TARGET_SECONDS,
        report_hide_threshold=REPORT_HIDE_THRESHOLD,
        like_weight_per_like=LIKE_WEIGHT_PER_LIKE,
        like_weight_max=LIKE_WEIGHT_MAX,
        upvote_weight_per_upvote=UPVOTE_WEIGHT_PER_UPVOTE,
        upvote_weight_max=UPVOTE_WEIGHT_MAX,
        downvote_weight_per_downvote=DOWNVOTE_WEIGHT_PER_DOWNVOTE,
        downvote_weight_max=DOWNVOTE_WEIGHT_MAX,
        track_weight_min=TRACK_WEIGHT_MIN,
        track_weight_max=TRACK_WEIGHT_MAX,
        auto_rotation_min_effective_score_exclusive=AUTO_ROTATION_MIN_EFFECTIVE_SCORE_EXCLUSIVE,
        recent_track_quota=RECENT_TRACK_QUOTA,
        recent_track_window_hours=RECENT_TRACK_WINDOW_HOURS,
        recently_played_exclude_minutes=RECENTLY_PLAYED_EXCLUDE_MINUTES,
        recently_played_memory_max=RECENTLY_PLAYED_MEMORY_MAX,
        recently_played_min_candidates_after_filter=RECENTLY_PLAYED_MIN_CANDIDATES_AFTER_FILTER,
        meta_load_workers=META_LOAD_WORKERS,
        enable_jingles=ENABLE_JINGLES,
        enable_remote_jingles=ENABLE_REMOTE_JINGLES,
        min_songs_between_jingles=MIN_SONGS_BETWEEN_JINGLES,
        max_songs_between_jingles=MAX_SONGS_BETWEEN_JINGLES,
        jingle_probability_after_min=JINGLE_PROBABILITY_AFTER_MIN,
        enable_segments=ENABLE_SEGMENTS,
        min_songs_between_segments=MIN_SONGS_BETWEEN_SEGMENTS,
        max_songs_between_segments=MAX_SONGS_BETWEEN_SEGMENTS,
        segment_probability_after_min=SEGMENT_PROBABILITY_AFTER_MIN,
        enable_custom_tracks=ENABLE_CUSTOM_TRACKS,
        custom_track_quota=CUSTOM_TRACK_QUOTA,
        enable_song_mastering=ENABLE_SONG_MASTERING,
        song_audio_filter=SONG_AUDIO_FILTER,
    )
def _cfg(self, key: str, fallback=None):
if fallback is None:
fallback = self._default_runtime_config().get(key)
return self.runtime_config.get(key, fallback)
def _coerce_runtime_config(self, custom_config: dict) -> dict:
    """Merge a custom config over the defaults, coercing and clamping values.

    Only keys that exist in the defaults are considered. Each custom value is
    parsed according to the type of its default (bool, int, float or str);
    values that cannot be parsed keep the default. Afterwards:

    * quota / probability settings are clamped to [0.0, 1.0];
    * count / duration settings and weight settings are floored at 0;
    * each "max" setting is raised to its "min" counterpart when lower.

    Returns a new dict; neither `custom_config` nor the defaults are mutated.
    """
    import math

    def coerce_float(raw, default):
        # `safe_float` only accepts strictly positive numbers, which made it
        # impossible to configure 0 (e.g. a quota of 0 to switch a feature
        # off) or another negative auto-rotation score threshold: both fell
        # back to the default. Accept any finite number here and let the
        # clamps below enforce the per-key ranges.
        try:
            parsed = float(raw)
        except (TypeError, ValueError, OverflowError):
            return default
        return parsed if math.isfinite(parsed) else default

    unit_interval_keys = {
        "recent_track_quota",
        "jingle_probability_after_min",
        "segment_probability_after_min",
        "custom_track_quota",
    }
    non_negative_int_keys = {
        "max_tracks",
        "rotation_target_seconds",
        "meta_load_workers",
        "recent_track_window_hours",
        "recently_played_exclude_minutes",
        "recently_played_memory_max",
        "recently_played_min_candidates_after_filter",
        "min_songs_between_jingles",
        "max_songs_between_jingles",
        "min_songs_between_segments",
        "max_songs_between_segments",
    }
    non_negative_float_keys = {
        "like_weight_per_like",
        "like_weight_max",
        "upvote_weight_per_upvote",
        "upvote_weight_max",
        "downvote_weight_per_downvote",
        "downvote_weight_max",
        "track_weight_min",
        "track_weight_max",
    }

    defaults = self._default_runtime_config()
    result = dict(defaults)

    for key, default_value in defaults.items():
        if key not in custom_config:
            continue
        raw_value = custom_config.get(key)

        # bool must be tested before int because bool is a subclass of int.
        if isinstance(default_value, bool):
            value = safe_bool(raw_value, default_value)
        elif isinstance(default_value, int):
            value = safe_int(raw_value, default_value)
        elif isinstance(default_value, float):
            value = coerce_float(raw_value, default_value)
        elif isinstance(default_value, str):
            value = str(raw_value or "")
        else:
            value = raw_value

        if key in unit_interval_keys:
            value = float(clamp_number(float(value), 0.0, 1.0))
        if key in non_negative_int_keys:
            value = int(clamp_number(int(value), 0, None))
        if key in non_negative_float_keys:
            value = float(clamp_number(float(value), 0.0, None))

        result[key] = value

    # Keep every (min, max) pair consistent by lifting max up to min.
    for min_key, max_key in (
        ("min_songs_between_jingles", "max_songs_between_jingles"),
        ("min_songs_between_segments", "max_songs_between_segments"),
        ("track_weight_min", "track_weight_max"),
    ):
        if result[max_key] < result[min_key]:
            result[max_key] = result[min_key]

    return result
@staticmethod
def _read_track_overrides_file(overrides_path):
    """Parse the track overrides JSON file into normalised per-song dicts.

    Returns `(track_overrides, error)`. `error` is None on success (including
    when the file does not exist) and the exception message otherwise. Entries
    with an empty song id or a non-dict value are skipped, and a non-dict JSON
    root yields no overrides. Boost values are coerced to non-negative ints.
    """
    track_overrides = {}
    try:
        if overrides_path.exists():
            payload = json.loads(overrides_path.read_text(encoding="utf-8"))
            if isinstance(payload, dict):
                for song_id, override in payload.items():
                    if not song_id or not isinstance(override, dict):
                        continue
                    track_overrides[str(song_id)] = {
                        "hidden": safe_bool(override.get("hidden", False), False),
                        "upvote_boost": max(0, safe_int(override.get("upvote_boost", 0), 0)),
                        "downvote_boost": max(0, safe_int(override.get("downvote_boost", 0), 0)),
                        "like_boost": max(0, safe_int(override.get("like_boost", 0), 0)),
                        "notes": str(override.get("notes") or ""),
                        "updated_at": override.get("updated_at"),
                    }
    except Exception as e:
        return track_overrides, str(e)
    return track_overrides, None

def refresh_runtime_controls(self):
    """Reload runtime config, track overrides and the cover override count.

    All file reads happen before the lock is taken; the results are then
    published in one step under `self.lock`. No failure is raised: a config
    error falls back to the defaults, an overrides error yields the overrides
    parsed so far (normally none), and both are recorded in the matching
    `*_last_error` attribute.
    """
    control_dir = Path(RADIO_CONTROL_DIR)
    config_path = control_dir / RADIO_CONTROL_CONFIG_FILE
    overrides_path = control_dir / RADIO_TRACK_OVERRIDES_FILE
    now = time.time()

    runtime_config = self._default_runtime_config()
    runtime_config_source = "defaults"
    runtime_config_error = None
    runtime_payload = {}
    try:
        if config_path.exists():
            runtime_payload = json.loads(config_path.read_text(encoding="utf-8"))
            enabled = safe_bool(runtime_payload.get("enabled", True), True)
            use_custom_config = safe_bool(runtime_payload.get("use_custom_config", False), False)
            custom_config = runtime_payload.get("config", {})
            # The custom config applies only when it is both enabled and
            # explicitly requested, and is a JSON object.
            if enabled and use_custom_config and isinstance(custom_config, dict):
                runtime_config = self._coerce_runtime_config(custom_config)
                runtime_config_source = "studio_custom"
            else:
                runtime_config_source = "defaults_disabled_or_not_requested"
        else:
            runtime_config_source = "defaults_no_config_file"
    except Exception as e:
        runtime_config = self._default_runtime_config()
        runtime_config_source = "defaults_config_error"
        runtime_config_error = str(e)
        print(f"[radio-control] Could not load config {config_path}: {e}")

    track_overrides, overrides_error = self._read_track_overrides_file(overrides_path)
    if overrides_error is not None:
        print(f"[radio-control] Could not load track overrides {overrides_path}: {overrides_error}")

    cover_overrides_count = 0
    cover_root = Path(COVER_OVERRIDES_DIR)
    try:
        if cover_root.exists():
            cover_overrides_count = sum(1 for _ in cover_root.glob("*/thumb.png"))
    except Exception as e:
        print(f"[cover-overrides] Could not count cover overrides: {e}")

    with self.lock:
        self.runtime_config = runtime_config
        self.runtime_config_source = runtime_config_source
        self.runtime_config_last_loaded_at = now
        self.runtime_config_last_error = runtime_config_error
        self.runtime_control_payload = runtime_payload if isinstance(runtime_payload, dict) else {}
        self.track_overrides = track_overrides
        self.track_overrides_last_loaded_at = now
        self.track_overrides_last_error = overrides_error
        self.cover_overrides_count = cover_overrides_count

    print(
        f"[radio-control] config_source={runtime_config_source} "
        f"overrides={len(track_overrides)} covers={cover_overrides_count}"
    )

def _reload_track_overrides_unlocked(self):
    """
    Reload only /radio-control/track_overrides.json while self.lock is
    already held.

    This is intentionally lighter than refresh_runtime_controls(): it does
    not reload the full runtime config, jingles, segments, custom tracks or
    rotation pack. Its only purpose is to make Studio boosts/hidden flags
    available when a queued song starts playing, without waiting for the
    next full radio refresh.
    """
    overrides_path = Path(RADIO_CONTROL_DIR) / RADIO_TRACK_OVERRIDES_FILE
    now = time.time()

    track_overrides, overrides_error = self._read_track_overrides_file(overrides_path)
    if overrides_error is not None:
        print(f"[radio-control] Could not reload track overrides {overrides_path}: {overrides_error}")

    self.track_overrides = track_overrides
    self.track_overrides_last_loaded_at = now
    self.track_overrides_last_error = overrides_error
def _track_override_unlocked(self, song_id: Optional[str]) -> dict:
if not song_id:
return {}
return dict(self.track_overrides.get(str(song_id), {}))
def _cover_override_path(self, song_id: Optional[str]) -> Optional[Path]:
if not song_id:
return None
path = Path(COVER_OVERRIDES_DIR) / str(song_id) / "thumb.png"
if path.exists() and path.is_file():
return path
return None
def _apply_track_controls_unlocked(self, track: dict) -> dict:
if not track or track.get("is_jingle") or track.get("is_segment"):
return track
song_id = str(track.get("id") or "")
override = self._track_override_unlocked(song_id)
like_count = safe_int(track.get("like_count", 0), 0)
upvote_count = safe_int(track.get("upvote_count", 0), 0)
downvote_count = safe_int(track.get("downvote_count", 0), 0)
report_count = safe_int(track.get("report_count", 0), 0)
like_boost = max(0, safe_int(override.get("like_boost", 0), 0))
upvote_boost = max(0, safe_int(override.get("upvote_boost", 0), 0))
downvote_boost = max(0, safe_int(override.get("downvote_boost", 0), 0))
track["studio_hidden"] = bool(override.get("hidden", False))
track["studio_notes"] = override.get("notes", "")
track["studio_like_boost"] = like_boost
track["studio_upvote_boost"] = upvote_boost
track["studio_downvote_boost"] = downvote_boost
track["effective_like_count"] = like_count + like_boost
track["effective_upvote_count"] = upvote_count + upvote_boost
track["effective_downvote_count"] = downvote_count + downvote_boost
track["effective_report_count"] = report_count
cover_override = self._cover_override_path(song_id)
if cover_override:
track["thumb_url"] = f"/cover-overrides/{song_id}/thumb.png"
track["has_cover_override"] = True
else:
track["has_cover_override"] = False
return track
def _prune_recently_played_unlocked(self):
if not self.recently_played_songs:
return
now = time.time()
recently_played_exclude_minutes = int(self._cfg("recently_played_exclude_minutes", RECENTLY_PLAYED_EXCLUDE_MINUTES))
recently_played_memory_max = int(self._cfg("recently_played_memory_max", RECENTLY_PLAYED_MEMORY_MAX))
if recently_played_exclude_minutes > 0:
cutoff = now - (recently_played_exclude_minutes * 60)
self.recently_played_songs = [
item
for item in self.recently_played_songs
if item.get("id") and float(item.get("played_at", 0)) >= cutoff
]
if recently_played_memory_max > 0:
self.recently_played_songs = self.recently_played_songs[-recently_played_memory_max:]
def _remember_played_song_unlocked(self, song_id: Optional[str]):
if not song_id:
return
self.recently_played_songs.append({
"id": str(song_id),
"played_at": time.time(),
})
self._prune_recently_played_unlocked()
def _recently_played_exclusion_set_unlocked(self) -> set[str]:
    """Return the ids played within the configured exclusion window.

    An empty set is returned when ``recently_played_exclude_minutes`` is
    zero or negative. Otherwise the history is pruned first and the ids of
    the entries still inside the window are collected as strings.
    """
    window_minutes = int(self._cfg("recently_played_exclude_minutes", RECENTLY_PLAYED_EXCLUDE_MINUTES))
    if window_minutes <= 0:
        return set()

    self._prune_recently_played_unlocked()
    oldest_allowed = time.time() - (window_minutes * 60)

    excluded = set()
    for entry in self.recently_played_songs:
        entry_id = entry.get("id")
        if entry_id and float(entry.get("played_at", 0)) >= oldest_allowed:
            excluded.add(str(entry_id))
    return excluded
def _make_basic_track(self, song_id: str):
    """Build a placeholder track dict for a bucket song.

    Title, audio URL and thumbnail URL are derived from ``song_id`` alone,
    ``meta_loaded`` is False, and the feedback counts are read from
    ``likes``. The dict is passed through _apply_track_controls_unlocked()
    before being returned.
    """
    song_base_url = f"{BUCKET_RESOLVE_URL}/songs/{song_id}"
    wav_url = f"{song_base_url}/{song_id}.wav"

    placeholder = {
        "id": song_id,
        "title": f"Song {song_id}",
        "description": "",
        "lyrics": "",
        "tags": [],
        "duration": DEFAULT_DURATION,
        "created_at": "",
        "audio_url": wav_url,
        "stream_url": wav_url,
        "thumb_url": f"{song_base_url}/thumb.png",
        "meta_loaded": False,
    }
    # Feedback counters, added in the same order as the other builders.
    placeholder["like_count"] = likes.get_count(song_id)
    placeholder["upvote_count"] = likes.get_upvote_count(song_id)
    placeholder["downvote_count"] = likes.get_downvote_count(song_id)
    placeholder["report_count"] = likes.get_report_count(song_id)
    placeholder.update({
        "is_jingle": False,
        "is_segment": False,
        "is_custom_track": False,
    })
    return self._apply_track_controls_unlocked(placeholder)
def _make_jingle_track(self, path: Path):
    """Build the track dict for a local jingle file.

    The duration comes from probe_audio_duration() with
    DEFAULT_JINGLE_DURATION as its second argument. The file stem is the
    id, the file is exposed under /jingles/<name>, and all feedback counts
    are zero.
    """
    local_file = str(path)
    measured_duration = probe_audio_duration(local_file, DEFAULT_JINGLE_DURATION)

    jingle = {
        "id": path.stem,
        "title": "HF Community Radio",
        "description": "Station ID",
        "lyrics": "",
        "tags": ["jingle"],
        "duration": measured_duration,
        "created_at": "",
        "audio_url": f"/jingles/{path.name}",
        "stream_url": local_file,
        "thumb_url": "",
        "meta_loaded": True,
    }
    # Jingles never carry listener feedback.
    for counter in ("like_count", "upvote_count", "downvote_count", "report_count"):
        jingle[counter] = 0
    jingle["is_jingle"] = True
    jingle["is_segment"] = False
    jingle["is_custom_track"] = False
    jingle["source_url"] = ""
    return jingle
def _make_remote_jingle_track(self, meta_path: Path):
    """Build the track dict for a remote jingle described by ``meta_path``.

    Returns None when asset_is_enabled() rejects the metadata or when the
    sibling ``audio.mp3`` is missing (the latter is logged). Errors from
    reading or parsing the metadata file propagate to the caller.
    """
    meta = json.loads(meta_path.read_text(encoding="utf-8"))
    if not asset_is_enabled(meta):
        return None

    asset_dir = meta_path.parent
    audio_file = asset_dir / "audio.mp3"
    if not audio_file.exists():
        print(f"[jingles] Missing remote jingle audio: {audio_file}")
        return None

    # Fall back to the folder name / a default kind when the metadata is silent.
    jingle_id = meta.get("id") or asset_dir.name
    jingle_kind = meta.get("kind") or "station_id"
    seconds = safe_float(meta.get("duration"), DEFAULT_JINGLE_DURATION)

    jingle = {
        "id": str(jingle_id),
        "title": meta.get("title") or "HF Radio",
        "description": meta.get("description") or "",
        "lyrics": "",
        "tags": ["jingle", str(jingle_kind)],
        "duration": seconds,
        "created_at": meta.get("created_at", ""),
        "updated_at": meta.get("updated_at", ""),
        "audio_url": meta.get("audio_url") or "",
        "stream_url": str(audio_file),
        "thumb_url": "",
        "meta_loaded": True,
    }
    for counter in ("like_count", "upvote_count", "downvote_count", "report_count"):
        jingle[counter] = 0
    jingle.update({
        "is_jingle": True,
        "is_segment": False,
        "is_custom_track": False,
        "kind": jingle_kind,
        "source_url": "",
    })
    return jingle
def _make_segment_track(self, meta_path: Path):
    """Build the track dict for a spoken segment described by ``meta_path``.

    Returns None when the asset is disabled, expired, or its sibling
    ``audio.mp3`` is missing (the latter is logged). Errors from reading or
    parsing the metadata file propagate to the caller.
    """
    meta = json.loads(meta_path.read_text(encoding="utf-8"))
    if not asset_is_enabled(meta):
        return None
    if asset_is_expired(meta):
        return None

    asset_dir = meta_path.parent
    audio_file = asset_dir / "audio.mp3"
    if not audio_file.exists():
        print(f"[segments] Missing segment audio: {audio_file}")
        return None

    segment_id = meta.get("id") or asset_dir.name
    segment_kind = meta.get("kind") or "community_news"
    seconds = safe_float(meta.get("duration"), DEFAULT_SEGMENT_DURATION)
    # The second tag is the kind with underscores turned into spaces.
    segment_tags = ["community update", str(segment_kind).replace("_", " ")]

    segment = {
        "id": str(segment_id),
        "title": meta.get("title") or "Community Update",
        "description": meta.get("description") or "",
        "lyrics": "",
        "tags": segment_tags,
        "duration": seconds,
        "created_at": meta.get("created_at", ""),
        "updated_at": meta.get("updated_at", ""),
        "expires_at": meta.get("expires_at"),
        "audio_url": meta.get("audio_url") or "",
        "stream_url": str(audio_file),
        "thumb_url": "",
        "meta_loaded": True,
    }
    for counter in ("like_count", "upvote_count", "downvote_count", "report_count"):
        segment[counter] = 0
    segment.update({
        "is_jingle": False,
        "is_segment": True,
        "is_custom_track": False,
        "kind": segment_kind,
        "public_label": meta.get("public_label") or "Community Update",
        "source_url": meta.get("source_url") or "",
    })
    return segment
def _make_custom_track(self, meta_path: Path):
    """Build the track dict for a custom track described by ``meta_path``.

    Returns None when the asset is disabled or its sibling ``audio.mp3`` is
    missing (the latter is logged). A local ``thumb.png`` next to the
    metadata takes precedence over the metadata's ``thumb_url``. Feedback
    counts are read from ``likes`` and the result is passed through
    _apply_track_controls_unlocked().
    """
    meta = json.loads(meta_path.read_text(encoding="utf-8"))
    if not asset_is_enabled(meta):
        return None

    asset_dir = meta_path.parent
    audio_file = asset_dir / "audio.mp3"
    if not audio_file.exists():
        print(f"[custom-tracks] Missing custom track audio: {audio_file}")
        return None

    asset_id = str(meta.get("id") or asset_dir.name)
    seconds = safe_float(meta.get("duration"), DEFAULT_CUSTOM_TRACK_DURATION)
    track_tags = normalize_tags(meta.get("tags", [])) or ["custom track"]

    local_thumb = asset_dir / "thumb.png"
    if local_thumb.exists() and local_thumb.is_file():
        cover_url = f"/custom-tracks/{asset_id}/thumb.png"
    else:
        cover_url = meta.get("thumb_url") or ""

    custom = {
        "id": asset_id,
        "title": meta.get("title") or f"Custom Track {asset_id}",
        "description": meta.get("description", ""),
        "lyrics": meta.get("lyrics", ""),
        "tags": track_tags,
        "duration": seconds,
        "created_at": meta.get("created_at", ""),
        "updated_at": meta.get("updated_at", ""),
        "audio_url": meta.get("audio_url") or "",
        "stream_url": str(audio_file),
        "thumb_url": cover_url,
        "meta_loaded": True,
    }
    custom["like_count"] = likes.get_count(asset_id)
    custom["upvote_count"] = likes.get_upvote_count(asset_id)
    custom["downvote_count"] = likes.get_downvote_count(asset_id)
    custom["report_count"] = likes.get_report_count(asset_id)
    custom.update({
        "is_jingle": False,
        "is_segment": False,
        "is_custom_track": True,
        "type": meta.get("type") or "custom_track",
        "source_model": meta.get("source_model") or "",
        "provider": meta.get("provider") or "",
        "prompt": meta.get("prompt") or "",
        "quality_score": meta.get("quality_score"),
        "quality_notes": meta.get("quality_notes") or "",
        "internal_notes": meta.get("internal_notes") or "",
    })
    return self._apply_track_controls_unlocked(custom)
def _song_is_hidden_by_reports(self, song_id: str) -> bool:
override = self._track_override_unlocked(song_id)
if override.get("hidden"):
return True
report_hide_threshold = int(self._cfg("report_hide_threshold", REPORT_HIDE_THRESHOLD))
if report_hide_threshold <= 0:
return False
return likes.get_report_count(song_id) >= report_hide_threshold
def _effective_score_for_song_id_unlocked(self, song_id: Optional[str]) -> int:
"""Return the editorial effective score used for auto-rotation eligibility.
Keep this aligned with Studio's generated draft score contract:
likes + upvotes + Studio boosts - downvotes - Studio downvote penalty - reports.
"""
if not song_id:
return 0
song_id = str(song_id)
override = self._track_override_unlocked(song_id)
if not isinstance(override, dict):
override = {}
like_count = likes.get_count(song_id)
upvote_count = likes.get_upvote_count(song_id)
downvote_count = likes.get_downvote_count(song_id)
report_count = likes.get_report_count(song_id)
like_boost = max(0, safe_int(override.get("like_boost", 0), 0))
upvote_boost = max(0, safe_int(override.get("upvote_boost", 0), 0))
downvote_boost = max(0, safe_int(override.get("downvote_boost", 0), 0))
return like_count + upvote_count + like_boost + upvote_boost - downvote_count - downvote_boost - report_count
def _effective_score_for_track_unlocked(self, track: dict) -> int:
if not track:
return 0
song_id = track.get("id")
if song_id:
return self._effective_score_for_song_id_unlocked(str(song_id))
like_count = safe_int(track.get("effective_like_count", track.get("like_count", 0)), 0)
upvote_count = safe_int(track.get("effective_upvote_count", track.get("upvote_count", 0)), 0)
downvote_count = safe_int(track.get("effective_downvote_count", track.get("downvote_count", 0)), 0)
report_count = safe_int(track.get("effective_report_count", track.get("report_count", 0)), 0)
return like_count + upvote_count - downvote_count - report_count
def _track_is_below_auto_rotation_score_floor_unlocked(self, track_or_song_id) -> bool:
    """Tell whether a track's effective score is at or below the floor.

    Accepts either a track dict or a song id. The floor is the
    ``auto_rotation_min_effective_score_exclusive`` setting; it is
    exclusive, so a score equal to it is rejected as well.
    """
    floor = float(self._cfg(
        "auto_rotation_min_effective_score_exclusive",
        AUTO_ROTATION_MIN_EFFECTIVE_SCORE_EXCLUSIVE,
    ))
    score = (
        self._effective_score_for_track_unlocked(track_or_song_id)
        if isinstance(track_or_song_id, dict)
        else self._effective_score_for_song_id_unlocked(str(track_or_song_id or ""))
    )
    return score <= floor
def _feedback_weight_for_track(self, track: dict) -> float:
    """Return the selection weight of ``track`` derived from its feedback.

    The weight starts at 1.0, gains a capped bonus for likes and upvotes,
    loses a capped penalty for downvotes, and is finally clamped to
    [track_weight_min, track_weight_max]. The ``effective_*`` counts are
    preferred over the raw ones.
    """
    def count(effective_key, raw_key):
        return safe_int(track.get(effective_key, track.get(raw_key, 0)), 0)

    def setting(key, default):
        return float(self._cfg(key, default))

    likes_seen = count("effective_like_count", "like_count")
    upvotes_seen = count("effective_upvote_count", "upvote_count")
    downvotes_seen = count("effective_downvote_count", "downvote_count")

    per_like = setting("like_weight_per_like", LIKE_WEIGHT_PER_LIKE)
    like_ceiling = setting("like_weight_max", LIKE_WEIGHT_MAX)
    per_upvote = setting("upvote_weight_per_upvote", UPVOTE_WEIGHT_PER_UPVOTE)
    upvote_cap = setting("upvote_weight_max", UPVOTE_WEIGHT_MAX)
    per_downvote = setting("downvote_weight_per_downvote", DOWNVOTE_WEIGHT_PER_DOWNVOTE)
    downvote_cap = setting("downvote_weight_max", DOWNVOTE_WEIGHT_MAX)
    floor = setting("track_weight_min", TRACK_WEIGHT_MIN)
    ceiling = setting("track_weight_max", TRACK_WEIGHT_MAX)

    # The like cap is taken as (like_weight_max - 1.0), floored at zero;
    # the upvote and downvote caps are used as-is.
    like_cap = max(0.0, like_ceiling - 1.0)
    bonus_from_likes = min(max(0, likes_seen) * per_like, like_cap)
    bonus_from_upvotes = min(max(0, upvotes_seen) * per_upvote, upvote_cap)
    penalty_from_downvotes = min(max(0, downvotes_seen) * per_downvote, downvote_cap)

    raw_weight = 1.0 + bonus_from_likes + bonus_from_upvotes - penalty_from_downvotes
    return max(floor, min(ceiling, raw_weight))
def _weighted_pop_track(self, tracks: list[dict]) -> dict:
weights = [self._feedback_weight_for_track(track) for track in tracks]
total_weight = sum(weights)
if total_weight <= 0:
selected_index = random.randrange(len(tracks))
else:
pick = random.uniform(0, total_weight)
cumulative = 0.0
selected_index = len(tracks) - 1
for i, weight in enumerate(weights):
cumulative += weight
if pick <= cumulative:
selected_index = i
break
return tracks.pop(selected_index)
def _track_duration_seconds(self, track: dict) -> float:
try:
duration = float(track.get("duration") or DEFAULT_DURATION)
except Exception:
duration = DEFAULT_DURATION
return max(0.0, duration)
def _track_is_recent(self, track: dict, now: Optional[datetime] = None) -> bool:
    """Tell whether ``track`` was created inside the recent-track window.

    Always False when ``recent_track_quota`` or
    ``recent_track_window_hours`` is not positive, or when ``created_at``
    cannot be parsed. ``now`` defaults to the current UTC time.
    """
    quota = float(self._cfg("recent_track_quota", RECENT_TRACK_QUOTA))
    window_hours = int(self._cfg("recent_track_window_hours", RECENT_TRACK_WINDOW_HOURS))
    if quota <= 0 or window_hours <= 0:
        return False

    created = parse_datetime(track.get("created_at"))
    if created is None:
        return False

    reference = now or datetime.now(timezone.utc)
    return created >= reference - timedelta(hours=window_hours)
def _average_jingle_duration_unlocked(self) -> float:
    """Return the mean duration of the loaded jingles.

    0.0 when jingles are disabled or none are loaded. Only jingles with a
    positive, float-convertible duration are averaged; when there are none,
    DEFAULT_JINGLE_DURATION is returned.
    """
    if not self._cfg("enable_jingles", ENABLE_JINGLES) or not self.jingles:
        return 0.0

    usable = []
    for jingle in self.jingles:
        try:
            seconds = float(jingle.get("duration") or 0)
        except Exception:
            # An unreadable duration is ignored rather than averaged.
            continue
        if seconds > 0:
            usable.append(seconds)

    if usable:
        return sum(usable) / len(usable)
    return DEFAULT_JINGLE_DURATION
def _expected_songs_per_jingle(self) -> int:
    """Return the expected number of songs played between two jingles.

    This is the rounded midpoint of ``min_songs_between_jingles`` and
    ``max_songs_between_jingles``, never below 1. When both settings are
    zero or negative the answer is 1.
    """
    lower = int(self._cfg("min_songs_between_jingles", MIN_SONGS_BETWEEN_JINGLES))
    upper = int(self._cfg("max_songs_between_jingles", MAX_SONGS_BETWEEN_JINGLES))
    if lower <= 0 and upper <= 0:
        return 1
    midpoint = round((lower + upper) / 2)
    return midpoint if midpoint > 1 else 1
def _estimated_jingle_seconds_for_song_count(self, song_count: int, average_jingle_duration: float) -> float:
    """Estimate the jingle airtime that ``song_count`` songs will bring.

    0.0 when jingles are disabled or the average duration is not positive.
    Otherwise one jingle is assumed per _expected_songs_per_jingle() songs
    (integer division).
    """
    jingles_enabled = self._cfg("enable_jingles", ENABLE_JINGLES)
    if not jingles_enabled or average_jingle_duration <= 0:
        return 0.0
    jingle_slots = song_count // self._expected_songs_per_jingle()
    return jingle_slots * average_jingle_duration
def _estimated_rotation_seconds(self, tracks: list[dict], average_jingle_duration: float, current_remaining_seconds: Optional[float] = None) -> float:
song_seconds = 0.0
for i, track in enumerate(tracks):
if i == 0 and current_remaining_seconds is not None:
song_seconds += current_remaining_seconds
continue
try:
song_seconds += float(track.get("duration") or DEFAULT_DURATION)
except Exception:
song_seconds += DEFAULT_DURATION
return song_seconds + self._estimated_jingle_seconds_for_song_count(
len(tracks),
average_jingle_duration,
)
def _load_track_from_meta_path(self, meta_path: str, song_id: str) -> dict:
    """Build a full track dict for ``song_id`` from its metadata file.

    Starts from _make_basic_track() and overlays the fields read from
    ``meta_path`` through self.fs. If loading fails the error is logged,
    the fields already copied are kept, and ``meta_loaded`` is still set to
    True. Feedback counts are then refreshed and the track controls
    re-applied.
    """
    track = self._make_basic_track(song_id)
    try:
        with self.fs.open(meta_path, "r") as handle:
            meta = json.load(handle)
        track["title"] = meta.get("title") or track["title"]
        track["description"] = meta.get("description", "")
        track["lyrics"] = meta.get("lyrics", "")
        track["tags"] = normalize_tags(meta.get("tags", []))
        track["created_at"] = meta.get("created_at", "")
        raw_duration = meta.get("duration")
        try:
            parsed_duration = DEFAULT_DURATION if raw_duration is None else float(raw_duration)
        except Exception:
            parsed_duration = DEFAULT_DURATION
        track["duration"] = parsed_duration
        # The stream URL always mirrors the resolved audio URL.
        resolved_audio = meta.get("audio_url") or track["audio_url"]
        track["audio_url"] = resolved_audio
        track["stream_url"] = resolved_audio
        track["thumb_url"] = meta.get("thumb_url") or track["thumb_url"]
        track["meta_loaded"] = True
    except Exception as exc:
        print(f"[radio] Could not load metadata for {song_id} during refresh: {exc}")
        track["meta_loaded"] = True

    track["like_count"] = likes.get_count(song_id)
    track["upvote_count"] = likes.get_upvote_count(song_id)
    track["downvote_count"] = likes.get_downvote_count(song_id)
    track["report_count"] = likes.get_report_count(song_id)
    self._apply_track_controls_unlocked(track)
    return track
def refresh_jingles(self):
    """Rebuild self.jingles from the local and remote jingle folders.

    When jingles are disabled the list is emptied and any jingle that is
    currently playing is dropped. Otherwise local files matching
    JINGLES_GLOB are loaded, followed by remote ``*/meta.json`` assets when
    remote jingles are enabled. A jingle that is currently playing is
    re-pointed at its freshly loaded dict, or dropped if it is gone.
    """
    if not self._cfg("enable_jingles", ENABLE_JINGLES):
        with self.lock:
            self.jingles = []
            self.current_jingle = None
            if self.current_kind == "jingle":
                self.current_kind = "song"
        print("[jingles] Disabled")
        return

    loaded = []

    for local_path in sorted(Path(JINGLES_DIR).glob(JINGLES_GLOB)):
        if not local_path.is_file():
            continue
        try:
            loaded.append(self._make_jingle_track(local_path))
        except Exception as exc:
            print(f"[jingles] Could not load local jingle {local_path}: {exc}")

    if self._cfg("enable_remote_jingles", ENABLE_REMOTE_JINGLES):
        remote_root = Path(REMOTE_JINGLES_DIR)
        if not remote_root.exists():
            print(f"[jingles] Remote jingles mount not found: {remote_root}")
        else:
            for meta_file in sorted(remote_root.glob("*/meta.json")):
                try:
                    remote_jingle = self._make_remote_jingle_track(meta_file)
                    if remote_jingle:
                        loaded.append(remote_jingle)
                except Exception as exc:
                    print(f"[jingles] Could not load remote jingle {meta_file}: {exc}")

    with self.lock:
        playing_id = self.current_jingle.get("id") if self.current_jingle else None
        self.jingles = loaded
        if self.current_kind == "jingle":
            # Look for the playing jingle in the new list.
            replacement = None
            for candidate in self.jingles:
                if candidate["id"] == playing_id:
                    replacement = candidate
                    break
            if replacement:
                self.current_jingle = replacement
            else:
                self.current_kind = "song"
                self.current_jingle = None
    print(
        f"[jingles] Loaded {len(loaded)} jingles "
        f"local_dir={JINGLES_DIR}/{JINGLES_GLOB} remote_dir={REMOTE_JINGLES_DIR}"
    )
def refresh_segments(self):
    """Rebuild self.segments from the remote segments folder.

    When segments are disabled the list is emptied and any segment that is
    currently playing is dropped. Otherwise every ``*/meta.json`` under
    REMOTE_SEGMENTS_DIR is loaded; assets that _make_segment_track()
    rejects are skipped. A segment that is currently playing is re-pointed
    at its freshly loaded dict, or dropped if it is gone.
    """
    if not self._cfg("enable_segments", ENABLE_SEGMENTS):
        with self.lock:
            self.segments = []
            self.current_segment = None
            if self.current_kind == "segment":
                self.current_kind = "song"
        print("[segments] Disabled")
        return

    loaded = []
    segments_root = Path(REMOTE_SEGMENTS_DIR)
    if not segments_root.exists():
        print(f"[segments] Remote segments mount not found: {segments_root}")
    else:
        for meta_file in sorted(segments_root.glob("*/meta.json")):
            try:
                built = self._make_segment_track(meta_file)
                if built:
                    loaded.append(built)
            except Exception as exc:
                print(f"[segments] Could not load segment {meta_file}: {exc}")

    with self.lock:
        playing_id = self.current_segment.get("id") if self.current_segment else None
        self.segments = loaded
        if self.current_kind == "segment":
            # Look for the playing segment in the new list.
            replacement = None
            for candidate in self.segments:
                if candidate["id"] == playing_id:
                    replacement = candidate
                    break
            if replacement:
                self.current_segment = replacement
            else:
                self.current_kind = "song"
                self.current_segment = None
    print(f"[segments] Loaded {len(loaded)} enabled segments from {REMOTE_SEGMENTS_DIR}")
def refresh_custom_tracks(self):
    """Rebuild self.custom_tracks from the custom tracks folder.

    When custom tracks are disabled the list is emptied. Otherwise every
    ``*/meta.json`` under CUSTOM_TRACKS_DIR is loaded, skipping tracks that
    are disabled, hidden by an override, or hidden by reports. The load
    time and any folder-level error are stored alongside the list.
    """
    if not bool(self._cfg("enable_custom_tracks", ENABLE_CUSTOM_TRACKS)):
        with self.lock:
            self.custom_tracks = []
            self.custom_tracks_last_loaded_at = time.time()
            self.custom_tracks_last_error = None
        print("[custom-tracks] Disabled")
        return

    loaded = []
    failure = None
    tracks_root = Path(CUSTOM_TRACKS_DIR)
    try:
        if not tracks_root.exists():
            print(f"[custom-tracks] Mount not found: {tracks_root}")
        else:
            for meta_file in sorted(tracks_root.glob("*/meta.json")):
                try:
                    built = self._make_custom_track(meta_file)
                    if (
                        built
                        and not built.get("studio_hidden")
                        and not self._song_is_hidden_by_reports(built.get("id"))
                    ):
                        loaded.append(built)
                except Exception as exc:
                    # One bad asset must not abort the whole scan.
                    print(f"[custom-tracks] Could not load custom track {meta_file}: {exc}")
    except Exception as exc:
        failure = str(exc)
        print(f"[custom-tracks] Could not refresh custom tracks: {exc}")

    with self.lock:
        self.custom_tracks = loaded
        self.custom_tracks_last_loaded_at = time.time()
        self.custom_tracks_last_error = failure
    print(f"[custom-tracks] Loaded {len(loaded)} enabled custom tracks from {CUSTOM_TRACKS_DIR}")
def _meta_path_for_song(self, song_id: str):
    """Return the bucket path of the ``meta.json`` belonging to ``song_id``."""
    songs_root = f"{BUCKET_FS_ROOT}/songs"
    return f"{songs_root}/{song_id}/meta.json"
def _load_meta_for_track_unlocked(self, track: dict):
if not track:
return track
if track.get("is_jingle") or track.get("is_segment"):
return track
if track.get("is_custom_track"):
track["like_count"] = likes.get_count(track.get("id"))
track["upvote_count"] = likes.get_upvote_count(track.get("id"))
track["downvote_count"] = likes.get_downvote_count(track.get("id"))
track["report_count"] = likes.get_report_count(track.get("id"))
self._apply_track_controls_unlocked(track)
return track
song_id = track["id"]
if not track.get("meta_loaded"):
meta_path = self._meta_path_for_song(song_id)
try:
with self.fs.open(meta_path, "r") as f:
meta = json.load(f)
track["title"] = meta.get("title") or track["title"]
track["description"] = meta.get("description", "")
track["lyrics"] = meta.get("lyrics", "")
track["tags"] = normalize_tags(meta.get("tags", []))
track["created_at"] = meta.get("created_at", "")
duration = meta.get("duration")
try:
track["duration"] = float(duration) if duration is not None else DEFAULT_DURATION
except Exception:
track["duration"] = DEFAULT_DURATION
track["audio_url"] = meta.get("audio_url") or track["audio_url"]
track["stream_url"] = track["audio_url"]
track["thumb_url"] = meta.get("thumb_url") or track["thumb_url"]
track["meta_loaded"] = True
except Exception as e:
print(f"[radio] Could not load metadata for {song_id}: {e}")
track["meta_loaded"] = True
track["like_count"] = likes.get_count(song_id)
track["upvote_count"] = likes.get_upvote_count(song_id)
track["downvote_count"] = likes.get_downvote_count(song_id)
track["report_count"] = likes.get_report_count(song_id)
self._apply_track_controls_unlocked(track)
return track
def refresh_tracks(self, prebuild: bool = False, target_at: Optional[datetime] = None):
    """Build a rotation pack from the bucket and install or stage it.

    Steps, in order:
    1. Mark the refresh as running and snapshot the runtime config, the
       average jingle duration and the custom tracks under self.lock.
    2. List ``songs/*/meta.json`` in the bucket. On failure the error is
       recorded and the method returns.
    3. Drop songs hidden by reports/overrides, songs at or below the
       auto-rotation score floor and, if enough candidates remain,
       recently played songs.
    4. When not prebuilding, keep the currently playing song at the head
       of the pack.
    5. Load candidate metadata in a thread pool, then fill the pack:
       custom-track quota first, recent-track quota second, everything
       else last, each by weighted random pick, until the estimated
       airtime reaches ``rotation_target_seconds`` or ``max_tracks``.
    6. Shuffle (the preserved track stays first).
    7. ``prebuild=True``: store a copy in self.next_tracks and return.
       Otherwise swap self.tracks under the lock, re-resolving what is
       live at that moment.

    Args:
        prebuild: stage the pack as the next rotation instead of making it
            the active one.
        target_at: when the staged rotation is meant to go live; defaults
            to next_utc_hour_iso() when omitted.

    NOTE(review): there is no try/finally around the body, so an
    unexpected exception after step 1 leaves ``is_loading`` (or
    ``next_rotation_is_building``) set.
    """
    label = "prebuilding next rotation" if prebuild else "refreshing active rotation"
    print(f"[radio] {label}: {BUCKET_ID}")
    print(f"[radio] Bucket FS root: {BUCKET_FS_ROOT}")
    # Step 1: flag the refresh as running and snapshot shared state.
    with self.lock:
        if prebuild:
            self.next_rotation_is_building = True
            self.next_rotation_last_error = None
            self.next_rotation_target_at = target_at.isoformat() if target_at else next_utc_hour_iso()
        else:
            self.is_loading = True
            self.last_error = None
        runtime_config = dict(self.runtime_config)
        average_jingle_duration = self._average_jingle_duration_unlocked()
        custom_tracks_snapshot = [dict(track) for track in self.custom_tracks]
    # Settings are read from the snapshot, falling back to the module defaults.
    max_tracks = int(runtime_config.get("max_tracks", MAX_TRACKS))
    rotation_target_seconds = int(runtime_config.get("rotation_target_seconds", ROTATION_TARGET_SECONDS))
    recent_track_quota = float(runtime_config.get("recent_track_quota", RECENT_TRACK_QUOTA))
    custom_track_quota = float(runtime_config.get("custom_track_quota", CUSTOM_TRACK_QUOTA))
    recently_played_exclude_minutes = int(runtime_config.get("recently_played_exclude_minutes", RECENTLY_PLAYED_EXCLUDE_MINUTES))
    recently_played_min_candidates_after_filter = int(runtime_config.get("recently_played_min_candidates_after_filter", RECENTLY_PLAYED_MIN_CANDIDATES_AFTER_FILTER))
    meta_load_workers = int(runtime_config.get("meta_load_workers", META_LOAD_WORKERS))
    # Step 2: list every song metadata file in the bucket.
    try:
        glob_pattern = f"{BUCKET_FS_ROOT}/songs/*/meta.json"
        print(f"[radio] Glob pattern: {glob_pattern}")
        meta_paths = self.fs.glob(glob_pattern)
        print(f"[radio] Found {len(meta_paths)} meta.json files")
        print(f"[radio] First paths: {meta_paths[:5]}")
    except Exception as e:
        print(f"[radio] Could not list bucket {BUCKET_ID}: {e}")
        with self.lock:
            if prebuild:
                self.next_rotation_is_building = False
                self.next_rotation_last_error = str(e)
            else:
                self.is_loading = False
                self.last_error = str(e)
                # With nothing loaded there is nothing to play.
                if not self.tracks:
                    self.is_playing = False
        return
    # The song id is the folder that contains meta.json.
    song_meta_paths = {}
    for meta_path in meta_paths:
        try:
            parts = meta_path.strip("/").split("/")
            song_id = parts[-2]
            if song_id:
                song_meta_paths[song_id] = meta_path
        except Exception as e:
            print(f"[radio] Could not parse path {meta_path}: {e}")
    all_song_ids = sorted(song_meta_paths.keys())
    all_custom_track_ids = {str(track.get("id")) for track in custom_tracks_snapshot if track.get("id")}
    # Capture what is playing at the start of the refresh.
    with self.lock:
        old_track_id = self._get_current_song_id_unlocked()
        old_current_kind = self.current_kind
        old_started_at = self.started_at
        recently_played_ids = self._recently_played_exclusion_set_unlocked()
    # Step 3a: drop songs hidden by an override or by reports.
    before_filter_count = len(all_song_ids)
    candidate_song_ids = [
        song_id
        for song_id in all_song_ids
        if not self._song_is_hidden_by_reports(song_id)
    ]
    filtered_hidden_count = before_filter_count - len(candidate_song_ids)
    if filtered_hidden_count:
        print(
            f"[radio] Filtered out {filtered_hidden_count} tracks "
            f"by reports or studio overrides"
        )
    # Step 3b: drop songs at or below the auto-rotation score floor.
    before_score_floor_count = len(candidate_song_ids)
    candidate_song_ids = [
        song_id
        for song_id in candidate_song_ids
        if not self._track_is_below_auto_rotation_score_floor_unlocked(song_id)
    ]
    filtered_low_score_count = before_score_floor_count - len(candidate_song_ids)
    if filtered_low_score_count:
        threshold = float(runtime_config.get(
            "auto_rotation_min_effective_score_exclusive",
            AUTO_ROTATION_MIN_EFFECTIVE_SCORE_EXCLUSIVE,
        ))
        print(
            f"[radio] Filtered out {filtered_low_score_count} low-score tracks "
            f"effective_score <= {threshold:g}"
        )
    # The song currently on air is never treated as "recently played".
    if old_track_id:
        recently_played_ids.discard(old_track_id)
    # Step 3c: drop recently played songs, unless too few would remain.
    recently_played_filtered_count = 0
    if recently_played_ids:
        filtered_candidate_song_ids = [
            song_id
            for song_id in candidate_song_ids
            if song_id not in recently_played_ids
        ]
        if (
            len(filtered_candidate_song_ids) >= recently_played_min_candidates_after_filter
            or len(filtered_candidate_song_ids) >= len(candidate_song_ids)
        ):
            recently_played_filtered_count = len(candidate_song_ids) - len(filtered_candidate_song_ids)
            candidate_song_ids = filtered_candidate_song_ids
            if recently_played_filtered_count:
                print(
                    f"[radio] Filtered out {recently_played_filtered_count} recently played tracks "
                    f"window={recently_played_exclude_minutes}min "
                    f"remaining_candidates={len(candidate_song_ids)}"
                )
        else:
            print(
                f"[radio] Skipped recently played exclusion because too few candidates would remain "
                f"remaining={len(filtered_candidate_song_ids)} "
                f"min={recently_played_min_candidates_after_filter} "
                f"window={recently_played_exclude_minutes}min"
            )
    # Step 4: an active refresh keeps the song that is on air; a prebuild does not.
    preserve_current_track = (
        not prebuild
        and old_current_kind == "song"
        and old_track_id
        and (old_track_id in all_song_ids or old_track_id in all_custom_track_ids)
    )
    preserved_track = None
    current_remaining_seconds = None
    if preserve_current_track:
        if old_track_id in song_meta_paths:
            preserved_track = self._load_track_from_meta_path(
                song_meta_paths[old_track_id],
                old_track_id,
            )
        else:
            preserved_track = next(
                (dict(track) for track in custom_tracks_snapshot if track.get("id") == old_track_id),
                None,
            )
        if preserved_track is None:
            preserve_current_track = False
        else:
            self._apply_track_controls_unlocked(preserved_track)
        # Only the unplayed part of the preserved song counts towards the target.
        duration = float((preserved_track or {}).get("duration") or DEFAULT_DURATION)
        if old_started_at is not None:
            current_elapsed = max(0.0, time.time() - old_started_at)
            current_remaining_seconds = max(0.0, duration - current_elapsed)
        else:
            current_remaining_seconds = duration
        if self._song_is_hidden_by_reports(old_track_id):
            print(
                f"[radio] Preserving currently playing hidden/reported track until it finishes: "
                f"{old_track_id}"
            )
    remaining_song_ids = [
        song_id
        for song_id in candidate_song_ids
        if not preserved_track or song_id != preserved_track["id"]
    ]
    # Step 5a: load candidate metadata concurrently.
    candidate_tracks = []
    if remaining_song_ids:
        worker_count = max(1, min(meta_load_workers, len(remaining_song_ids)))
        print(f"[radio] Loading metadata for {len(remaining_song_ids)} candidate tracks with {worker_count} workers")
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            futures = {
                executor.submit(
                    self._load_track_from_meta_path,
                    song_meta_paths[song_id],
                    song_id,
                ): song_id
                for song_id in remaining_song_ids
            }
            for future in as_completed(futures):
                song_id = futures[future]
                try:
                    candidate_tracks.append(future.result())
                except Exception as e:
                    print(f"[radio] Could not load candidate track {song_id}: {e}")
    now = datetime.now(timezone.utc)
    recent_candidates = [
        track
        for track in candidate_tracks
        if self._track_is_recent(track, now=now)
    ]
    regular_candidates = [
        track
        for track in candidate_tracks
        if not self._track_is_recent(track, now=now)
    ]
    # The quota is clamped to [0, 1] before being turned into seconds.
    recent_target_seconds = max(0.0, rotation_target_seconds * max(0.0, min(1.0, recent_track_quota)))
    selected_tracks = []
    selected_recent_seconds = 0.0
    selected_recent_count = 0
    if preserved_track:
        selected_tracks.append(preserved_track)
        if self._track_is_recent(preserved_track, now=now):
            selected_recent_seconds += self._track_duration_seconds(preserved_track)
            selected_recent_count += 1
    custom_candidates = [
        dict(track)
        for track in custom_tracks_snapshot
        if track.get("id")
        and not (preserved_track and track.get("id") == preserved_track.get("id"))
        and not track.get("studio_hidden")
        and not self._song_is_hidden_by_reports(track.get("id"))
        and not self._track_is_below_auto_rotation_score_floor_unlocked(track)
        and track.get("id") not in recently_played_ids
    ]
    custom_target_seconds = max(0.0, rotation_target_seconds * max(0.0, min(1.0, custom_track_quota)))
    selected_custom_seconds = 0.0
    selected_custom_count = 0
    # Step 5b: fill the custom-track quota.
    while custom_candidates and len(selected_tracks) < max_tracks and selected_custom_seconds < custom_target_seconds:
        estimated_seconds = self._estimated_rotation_seconds(
            selected_tracks,
            average_jingle_duration,
            current_remaining_seconds=current_remaining_seconds if preserved_track else None,
        )
        if selected_tracks and estimated_seconds >= rotation_target_seconds:
            break
        track = self._weighted_pop_track(custom_candidates)
        selected_tracks.append(track)
        selected_custom_seconds += self._track_duration_seconds(track)
        selected_custom_count += 1
    # Step 5c: fill the recent-track quota.
    while (
        recent_candidates
        and len(selected_tracks) < max_tracks
        and selected_recent_seconds < recent_target_seconds
    ):
        estimated_seconds = self._estimated_rotation_seconds(
            selected_tracks,
            average_jingle_duration,
            current_remaining_seconds=current_remaining_seconds if preserved_track else None,
        )
        if selected_tracks and estimated_seconds >= rotation_target_seconds:
            break
        track = self._weighted_pop_track(recent_candidates)
        selected_tracks.append(track)
        selected_recent_seconds += self._track_duration_seconds(track)
        selected_recent_count += 1
    # Step 5d: fill the rest from everything still unpicked.
    remaining_tracks = regular_candidates + recent_candidates
    while remaining_tracks and len(selected_tracks) < max_tracks:
        estimated_seconds = self._estimated_rotation_seconds(
            selected_tracks,
            average_jingle_duration,
            current_remaining_seconds=current_remaining_seconds if preserved_track else None,
        )
        if selected_tracks and estimated_seconds >= rotation_target_seconds:
            break
        track = self._weighted_pop_track(remaining_tracks)
        selected_tracks.append(track)
        if self._track_is_recent(track, now=now):
            selected_recent_seconds += self._track_duration_seconds(track)
            selected_recent_count += 1
    # Step 6: shuffle, keeping the preserved track at index 0.
    if preserved_track:
        preserved_id = preserved_track["id"]
        non_preserved_tracks = [
            track
            for track in selected_tracks
            if track["id"] != preserved_id
        ]
        random.shuffle(non_preserved_tracks)
        selected_tracks = [preserved_track] + non_preserved_tracks
    else:
        random.shuffle(selected_tracks)
    estimated_seconds = self._estimated_rotation_seconds(
        selected_tracks,
        average_jingle_duration,
        current_remaining_seconds=current_remaining_seconds if preserved_track else None,
    )
    print(
        f"[radio] Using {len(selected_tracks)} tracks for rotation pack "
        f"estimated_seconds={estimated_seconds:.1f}/{rotation_target_seconds} "
        f"max_tracks={max_tracks} avg_jingle={average_jingle_duration:.1f}s "
        f"recent={selected_recent_count} tracks/{selected_recent_seconds:.1f}s "
        f"recent_quota={recent_track_quota:.2f} "
        f"recently_played_filtered={recently_played_filtered_count} "
        f"hidden_filtered={filtered_hidden_count} "
        f"low_score_filtered={filtered_low_score_count} "
        f"custom={selected_custom_count} tracks/{selected_custom_seconds:.1f}s quota={custom_track_quota:.2f} "
        f"available_custom={len(custom_tracks_snapshot)} "
        f"track_overrides={len(self.track_overrides)} "
        f"cover_overrides={self.cover_overrides_count} "
        f"jingles={len(self.jingles)} segments={len(self.segments)}"
    )
    new_tracks = selected_tracks
    # Step 7a: a prebuild only stages the pack; playback is untouched.
    if prebuild:
        with self.lock:
            self.next_tracks = [dict(track) for track in new_tracks]
            # NOTE(review): the ready flag is cleared here, not set; presumably
            # another code path marks the staged rotation ready - confirm.
            self.next_rotation_ready = False
            self.next_rotation_is_building = False
            self.next_rotation_built_at = time.time()
            self.next_rotation_target_at = target_at.isoformat() if target_at else next_utc_hour_iso()
            self.next_rotation_last_error = None
            self.next_rotation_source = "auto_prebuild"
            self.next_rotation_payload = None
            print(
                f"[radio] Prebuilt next rotation tracks={len(new_tracks)} "
                f"target_at={self.next_rotation_target_at}"
            )
        return
    # Step 7b: install the pack, re-reading the live playback state.
    with self.lock:
        live_current_kind = self.current_kind
        live_current_jingle = self.current_jingle
        live_current_segment = self.current_segment
        live_started_at = self.started_at
        live_is_playing = self.is_playing
        live_track_id = self._get_current_song_id_unlocked()
        live_track_snapshot = None
        if live_track_id and self.current_index is not None and self.tracks:
            try:
                live_track_snapshot = dict(self.tracks[self.current_index])
            except Exception:
                live_track_snapshot = None
        # The radio may have advanced while this refresh was loading metadata.
        # Preserve the real current song, not the stale song captured at refresh start.
        if live_track_id:
            existing_live_index = next(
                (
                    i
                    for i, track in enumerate(new_tracks)
                    if track.get("id") == live_track_id
                ),
                None,
            )
            if existing_live_index is None and live_track_snapshot:
                new_tracks = [
                    live_track_snapshot,
                    *[
                        track
                        for track in new_tracks
                        if track.get("id") != live_track_id
                    ],
                ]
                print(
                    f"[radio] Reinserted live track after refresh race: {live_track_id}"
                )
        self.tracks = new_tracks
        # NOTE(review): in the branches below, "live_is_playing or True" is
        # always True, so playback is flagged active whatever the old value was.
        if not self.tracks:
            self.current_index = None
            self.started_at = None
            self.is_playing = False
            self.current_kind = "song"
            self.current_jingle = None
            self.current_segment = None
        elif live_current_kind == "jingle" and live_current_jingle:
            self.current_index = None
            self.started_at = live_started_at or time.time()
            self.is_playing = live_is_playing or True
            self.current_kind = "jingle"
            self.current_jingle = live_current_jingle
            self.current_segment = None
            print("[radio] Kept current live jingle after refresh")
        elif live_current_kind == "segment" and live_current_segment:
            self.current_index = None
            self.started_at = live_started_at or time.time()
            self.is_playing = live_is_playing or True
            self.current_kind = "segment"
            self.current_jingle = None
            self.current_segment = live_current_segment
            print("[radio] Kept current live segment after refresh")
        elif live_current_kind == "song" and live_track_id:
            matching_index = next(
                (
                    i
                    for i, track in enumerate(self.tracks)
                    if track.get("id") == live_track_id
                ),
                None,
            )
            if matching_index is not None:
                self.current_index = matching_index
                self.started_at = live_started_at or time.time()
                self.is_playing = live_is_playing or True
                self.current_kind = "song"
                self.current_jingle = None
                self.current_segment = None
                print(f"[radio] Kept current live track after refresh: {live_track_id}")
            else:
                self.current_index = 0
                self.started_at = time.time()
                self.is_playing = True
                self.current_kind = "song"
                self.current_jingle = None
                self.current_segment = None
                print("[radio] Live track not found after refresh; switched to first track")
        else:
            # Nothing was on air: start from the top of the new pack.
            self.current_index = 0
            self.started_at = time.time()
            self.is_playing = True
            self.current_kind = "song"
            self.current_jingle = None
            self.current_segment = None
        if self.current_index is not None and self.tracks:
            self._load_meta_for_track_unlocked(self.tracks[self.current_index])
        self.is_loading = False
        self.last_error = None
        self.last_refresh_at = time.time()
    print(f"[radio] Loaded {len(new_tracks)} tracks")
def has_track(self, song_id: str) -> bool:
    """Return True when the active rotation contains a track with this id.

    Tracks are looked up with ``.get("id")`` (as the rest of the class does)
    so that a single malformed track without an ``id`` key cannot turn a
    simple membership check into a ``KeyError``.
    """
    with self.lock:
        return any(track.get("id") == song_id for track in self.tracks)
def _choose_next_index_unlocked(self):
"""
The rotation pack is already selected, weighted, and shuffled during
refresh_tracks(). Playback should consume that pack sequentially.
This avoids repeats inside the hour and makes the 60-minute rotation
pack meaningful. RADIO_MODE is kept for API/status compatibility, but
randomization now happens when the pack is built, not between songs.
"""
if not self.tracks:
return None
if self.current_index is None:
return 0
return (self.current_index + 1) % len(self.tracks)
def _choose_jingle_unlocked(self):
if not self.jingles:
return None
if len(self.jingles) == 1:
return self.jingles[0]
choices = [
jingle
for jingle in self.jingles
if jingle.get("id") != self.last_jingle_id
]
return random.choice(choices or self.jingles)
def _choose_segment_unlocked(self):
if not self.segments:
return None
if len(self.segments) == 1:
return self.segments[0]
choices = [
segment
for segment in self.segments
if segment.get("id") != self.last_segment_id
]
return random.choice(choices or self.segments)
def _should_insert_jingle_unlocked(self):
    """Decide whether a jingle should play before the next song.

    Never below the configured minimum number of songs since the last
    jingle, always at or above the maximum, and with the configured
    probability in between. Returns False when jingles are disabled or none
    are loaded.
    """
    enabled = self._cfg("enable_jingles", ENABLE_JINGLES)
    if not (enabled and self.jingles):
        return False

    lower_bound = int(self._cfg("min_songs_between_jingles", MIN_SONGS_BETWEEN_JINGLES))
    upper_bound = int(self._cfg("max_songs_between_jingles", MAX_SONGS_BETWEEN_JINGLES))
    chance = float(self._cfg("jingle_probability_after_min", JINGLE_PROBABILITY_AFTER_MIN))

    songs_played = self.songs_since_last_jingle
    if songs_played < lower_bound:
        return False
    # The random draw only happens when the maximum has not forced a jingle.
    return songs_played >= upper_bound or random.random() < chance
def _should_insert_segment_unlocked(self):
    """Decide whether a segment should play before the next song.

    Never below the configured minimum number of songs since the last
    segment, always at or above the maximum, and with the configured
    probability in between. Returns False when segments are disabled or none
    are loaded.
    """
    enabled = self._cfg("enable_segments", ENABLE_SEGMENTS)
    if not (enabled and self.segments):
        return False

    lower_bound = int(self._cfg("min_songs_between_segments", MIN_SONGS_BETWEEN_SEGMENTS))
    upper_bound = int(self._cfg("max_songs_between_segments", MAX_SONGS_BETWEEN_SEGMENTS))
    chance = float(self._cfg("segment_probability_after_min", SEGMENT_PROBABILITY_AFTER_MIN))

    songs_played = self.songs_since_last_segment
    if songs_played < lower_bound:
        return False
    # The random draw only happens when the maximum has not forced a segment.
    return songs_played >= upper_bound or random.random() < chance
def _start_jingle_unlocked(self):
jingle = self._choose_jingle_unlocked()
if not jingle:
return False
self.current_kind = "jingle"
self.current_jingle = jingle
self.current_segment = None
self.last_jingle_id = jingle["id"]
self.songs_since_last_jingle = 0
print(f"[jingles] Inserted jingle: {jingle['id']} duration={jingle.get('duration')}")
return True
def _start_segment_unlocked(self):
segment = self._choose_segment_unlocked()
if not segment:
return False
self.current_kind = "segment"
self.current_jingle = None
self.current_segment = segment
self.last_segment_id = segment["id"]
self.songs_since_last_segment = 0
print(f"[segments] Inserted segment: {segment['id']} duration={segment.get('duration')}")
return True
def _start_next_song_unlocked(self):
next_index = self._choose_next_index_unlocked()
if next_index is None:
self.current_index = None
self.current_kind = "song"
self.current_jingle = None
self.current_segment = None
self.is_playing = False
return
self.current_index = next_index
self.current_kind = "song"
self.current_jingle = None
self.current_segment = None
self.is_playing = True
next_track = self.tracks[self.current_index]
# Studio boosts can be edited while this track is already queued.
# Reload track_overrides.json exactly when the song starts, so queued
# Studio changes are reflected without waiting for the next full refresh.
self._reload_track_overrides_unlocked()
self._load_meta_for_track_unlocked(next_track)
print(f"[radio] Advanced to: {next_track['id']} - {next_track.get('title')}")
def _get_current_item_unlocked(self):
if self.current_kind == "jingle" and self.current_jingle:
return self.current_jingle
if self.current_kind == "segment" and self.current_segment:
return self.current_segment
if self.current_index is None or not self.tracks:
return None
if self.current_index >= len(self.tracks):
self.current_index = 0
track = self.tracks[self.current_index]
self._load_meta_for_track_unlocked(track)
return track
def _get_current_song_id_unlocked(self) -> Optional[str]:
if self.current_index is None or not self.tracks:
return None
if self.current_index >= len(self.tracks):
return None
try:
track = self.tracks[self.current_index]
except Exception:
return None
if not track or track.get("is_jingle") or track.get("is_segment"):
return None
return track.get("id")
def _next_rotation_control_path(self) -> Path:
    """Return the path of the next-rotation control file inside the control dir."""
    control_dir = Path(RADIO_CONTROL_DIR)
    return control_dir.joinpath(RADIO_NEXT_ROTATION_FILE)
def _next_rotation_archive_dir(self) -> Path:
    """Return the directory, inside the control dir, where consumed rotations are archived."""
    control_dir = Path(RADIO_CONTROL_DIR)
    return control_dir.joinpath(RADIO_NEXT_ROTATION_ARCHIVE_DIR)
def _target_hour_key(self, value: Optional[datetime]) -> Optional[datetime]:
if value is None:
return None
return value.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0)
def _approved_next_rotation_for_target(self, target_at: datetime):
path = self._next_rotation_control_path()
target_key = self._target_hour_key(target_at)
if not path.exists():
return None, []
try:
payload = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(payload, dict):
raise ValueError("next_rotation.json must contain a JSON object")
status = str(payload.get("status") or "").strip().lower()
if status != "approved":
raise ValueError(f"next_rotation.json status must be approved, got {status or 'empty'}")
raw_target = (
payload.get("target_hour_utc")
or payload.get("target_at")
or payload.get("target_rotation_at")
or payload.get("target")
)
payload_target = parse_datetime(raw_target)
payload_key = self._target_hour_key(payload_target)
if payload_key is None or target_key is None or payload_key != target_key:
raise ValueError(
f"next_rotation.json target mismatch: payload={raw_target} expected={target_at.isoformat()}"
)
track_ids = payload.get("track_ids")
if not isinstance(track_ids, list):
tracks = payload.get("tracks", [])
if isinstance(tracks, list):
track_ids = [
str(item.get("id") or item.get("song_id") or "").strip()
for item in tracks
if isinstance(item, dict)
]
else:
track_ids = []
clean_track_ids = []
seen = set()
for track_id in track_ids:
track_id = str(track_id or "").strip()
if not track_id or track_id in seen:
continue
seen.add(track_id)
clean_track_ids.append(track_id)
if not clean_track_ids:
raise ValueError("next_rotation.json does not contain any usable track_ids")
self.approved_next_rotation_last_loaded_at = time.time()
self.approved_next_rotation_last_error = None
return payload, clean_track_ids
except Exception as e:
self.approved_next_rotation_last_loaded_at = time.time()
self.approved_next_rotation_last_error = str(e)
print(f"[next-rotation] Could not use approved rotation {path}: {e}")
return None, []
def _archive_consumed_next_rotation_control_unlocked(self, activated_at: Optional[datetime] = None):
path = self._next_rotation_control_path()
if not path.exists():
return
try:
archive_dir = self._next_rotation_archive_dir()
archive_dir.mkdir(parents=True, exist_ok=True)
stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
archive_path = archive_dir / f"next_rotation_consumed_{stamp}.json"
consumed_at = (activated_at or datetime.now(timezone.utc)).replace(microsecond=0).isoformat()
payload = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(payload, dict):
payload = {}
payload["status"] = "consumed"
payload["consumed_at"] = consumed_at
payload["activated_at"] = consumed_at
payload["activated_source"] = "studio_approved"
payload["current_rotation_track_ids"] = list(self.current_rotation_track_ids or [])
archive_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")
path.unlink(missing_ok=True)
self.approved_next_rotation_last_consumed_at = time.time()
print(f"[next-rotation] Archived consumed approved rotation to {archive_path}")
except Exception as e:
self.approved_next_rotation_last_error = str(e)
print(f"[next-rotation] Could not archive consumed next rotation: {e}")
def prebuild_approved_next_rotation(self, target_at: datetime) -> bool:
    """Stage the studio-approved rotation for ``target_at`` as the next rotation.

    Reads the approved control file, resolves each listed id against the
    bucket's ``songs/*/meta.json`` files first and the loaded custom tracks
    second, skips ids hidden by reports, and stores the result in
    ``next_tracks`` with source "studio_approved". The rotation is staged
    but not marked ready here.

    Returns True when at least one track was staged; False when there is no
    usable approved rotation or when resolution fails (the error is recorded
    in the next-rotation error fields and logged).
    """
    with self.lock:
        self.next_rotation_is_building = True
        self.next_rotation_last_error = None
        self.next_rotation_target_at = target_at.isoformat()

    payload, track_ids = self._approved_next_rotation_for_target(target_at)
    if not (payload and track_ids):
        with self.lock:
            self.next_rotation_is_building = False
        return False

    try:
        # Index every song meta.json in the bucket by its parent folder name.
        meta_path_by_song = {}
        for meta_path in self.fs.glob(f"{BUCKET_FS_ROOT}/songs/*/meta.json"):
            try:
                song_id = meta_path.strip("/").split("/")[-2]
                if song_id:
                    meta_path_by_song[song_id] = meta_path
            except Exception as e:
                print(f"[next-rotation] Could not parse path {meta_path}: {e}")

        # Copy the custom tracks under the lock; resolve them outside it.
        with self.lock:
            custom_copies = [dict(entry) for entry in self.custom_tracks]
        custom_by_id = {}
        for entry in custom_copies:
            if entry.get("id"):
                custom_by_id[str(entry.get("id"))] = dict(entry)

        playable = []
        missing = []
        hidden = []
        for track_id in track_ids:
            if self._song_is_hidden_by_reports(track_id):
                hidden.append(track_id)
                continue
            if track_id in meta_path_by_song:
                resolved = self._load_track_from_meta_path(meta_path_by_song[track_id], track_id)
            elif track_id in custom_by_id:
                resolved = dict(custom_by_id[track_id])
                self._apply_track_controls_unlocked(resolved)
            else:
                missing.append(track_id)
                continue
            playable.append(resolved)

        if not playable:
            raise ValueError(
                "approved next rotation resolved to zero playable tracks "
                f"(missing={len(missing)} hidden={len(hidden)})"
            )

        with self.lock:
            self.next_tracks = playable
            # Readiness is flipped separately by mark_next_rotation_ready().
            self.next_rotation_ready = False
            self.next_rotation_is_building = False
            self.next_rotation_built_at = time.time()
            self.next_rotation_target_at = target_at.isoformat()
            self.next_rotation_last_error = None
            self.next_rotation_source = "studio_approved"
            self.next_rotation_payload = payload

        print(
            f"[next-rotation] Prebuilt approved rotation tracks={len(playable)} "
            f"target_at={target_at.isoformat()} missing={len(missing)} hidden={len(hidden)}"
        )
        return True
    except Exception as e:
        with self.lock:
            self.next_rotation_is_building = False
            self.next_rotation_last_error = str(e)
            self.approved_next_rotation_last_error = str(e)
        print(f"[next-rotation] Approved prebuild failed: {e}")
        return False
def mark_next_rotation_ready(self, target_at: Optional[datetime] = None):
    """Flag the staged next rotation as ready for activation.

    Returns True when there are staged tracks (recording ``target_at``, or
    the next UTC hour when it is omitted, as the target). Returns False and
    clears the ready flag when nothing is staged.
    """
    with self.lock:
        if not self.next_tracks:
            self.next_rotation_ready = False
            print("[radio] No prebuilt next rotation available to mark ready")
            return False

        self.next_rotation_ready = True
        self.next_rotation_target_at = target_at.isoformat() if target_at else next_utc_hour_iso()
        print(
            f"[radio] Next rotation marked ready "
            f"tracks={len(self.next_tracks)} target_at={self.next_rotation_target_at}"
        )
        return True
def _activate_next_rotation_unlocked(self) -> bool:
if not self.next_rotation_ready or not self.next_tracks:
return False
activated_source = self.next_rotation_source
activated_payload = dict(self.next_rotation_payload) if isinstance(self.next_rotation_payload, dict) else None
activated_target_at = self.next_rotation_target_at
activated_at_dt = datetime.now(timezone.utc).replace(microsecond=0)
self.tracks = [dict(track) for track in self.next_tracks]
self.current_rotation_source = activated_source or "prebuilt"
self.current_rotation_target_at = activated_target_at
self.current_rotation_activated_at = activated_at_dt.isoformat()
self.current_rotation_track_ids = [str(track.get("id") or "").strip() for track in self.tracks if str(track.get("id") or "").strip()]
self.next_tracks = []
self.next_rotation_ready = False
self.next_rotation_is_building = False
self.next_rotation_source = None
self.next_rotation_payload = None
self.current_kind = "song"
self.current_jingle = None
self.current_segment = None
self.current_index = 0 if self.tracks else None
self.is_playing = bool(self.tracks)
self.started_at = time.time() if self.tracks else None
self.songs_since_last_jingle = 0
self.songs_since_last_segment = 0
self.last_rotation_activated_at = time.time()
if self.current_index is not None:
self._reload_track_overrides_unlocked()
self._load_meta_for_track_unlocked(self.tracks[self.current_index])
print(
f"[radio] Activated prebuilt rotation "
f"tracks={len(self.tracks)} first={self.tracks[self.current_index].get('id')} "
f"source={activated_source or 'unknown'}"
)
else:
print("[radio] Activated empty prebuilt rotation")
if activated_source == "studio_approved":
self._archive_consumed_next_rotation_control_unlocked(activated_at_dt)
return True
def _advance_one_item_unlocked(self):
if self.current_kind in {"jingle", "segment"}:
if self._activate_next_rotation_unlocked():
return
self._start_next_song_unlocked()
return
current_item = self._get_current_item_unlocked()
if current_item and not current_item.get("is_jingle") and not current_item.get("is_segment"):
self._remember_played_song_unlocked(current_item.get("id"))
if self._activate_next_rotation_unlocked():
return
self.songs_since_last_jingle += 1
self.songs_since_last_segment += 1
if self._should_insert_segment_unlocked():
inserted = self._start_segment_unlocked()
if inserted:
return
if self._should_insert_jingle_unlocked():
inserted = self._start_jingle_unlocked()
if inserted:
return
self._start_next_song_unlocked()
def _public_track_for_status_unlocked(self, item: Optional[dict]):
if not item:
return None
public_item = dict(item)
# Jingles and segments are programming/editorial items, not normal
# public songs. Keep their public counters hidden / neutral.
if public_item.get("is_jingle"):
public_item["id"] = ""
public_item["audio_url"] = ""
public_item["like_count"] = 0
public_item["upvote_count"] = 0
public_item["downvote_count"] = 0
public_item["report_count"] = 0
public_item["thumb_url"] = ""
public_item["source_url"] = ""
return public_item
if public_item.get("is_segment"):
public_item["id"] = ""
public_item["audio_url"] = ""
public_item["like_count"] = 0
public_item["upvote_count"] = 0
public_item["downvote_count"] = 0
public_item["report_count"] = 0
public_item["thumb_url"] = ""
return public_item
# Normal songs: expose effective counters in the public status.
# effective_* = public votes + Studio boosts / penalties.
#
# This means that if a track is currently queued and receives a Studio
# boost, the boosted value will be shown when that track becomes
# Now Playing, without mutating the underlying public vote store.
public_item["like_count"] = safe_int(
public_item.get("effective_like_count", public_item.get("like_count", 0)),
0,
)
public_item["upvote_count"] = safe_int(
public_item.get("effective_upvote_count", public_item.get("upvote_count", 0)),
0,
)
public_item["downvote_count"] = safe_int(
public_item.get("effective_downvote_count", public_item.get("downvote_count", 0)),
0,
)
# Reports are not boosted. Keep the real report count.
public_item["report_count"] = safe_int(
public_item.get("report_count", 0),
0,
)
return public_item
def get_current_stream_item(self):
    """Return ``(item_copy, elapsed_seconds)`` for the item currently on air.

    Returns ``(None, 0)`` when nothing is playing. The elapsed time is
    measured from ``started_at`` and, for items longer than two seconds,
    capped at one second before the end of the item.
    """
    with self.lock:
        current = self._get_current_item_unlocked()
        if not current:
            return None, 0

        began = self.started_at
        position = max(0, time.time() - began) if began is not None else 0

        raw_length = current.get("duration") or DEFAULT_DURATION
        try:
            length = float(raw_length)
        except Exception:
            # Unparseable duration metadata: fall back to the default.
            length = DEFAULT_DURATION

        if length > 2:
            position = min(position, length - 1)
        return dict(current), position
def advance_if_current(self, item_id: str) -> bool:
    """Advance playback, but only if ``item_id`` is still the current item.

    Returns False without changing anything when nothing is playing or when
    the current item has a different id. Otherwise the start time is reset
    to now, playback advances by one item, and True is returned.
    """
    with self.lock:
        current = self._get_current_item_unlocked()
        if not current or current.get("id") != item_id:
            return False
        self.started_at = time.time()
        self._advance_one_item_unlocked()
        return True
def set_mode(self, mode: str):
    """Store the radio mode; anything other than "random" or "loop" becomes "random"."""
    accepted = mode if mode in {"random", "loop"} else "random"
    with self.lock:
        self.mode = accepted
def _status_unlocked(self):
    """Build the full status dictionary for the radio state.

    Combines the current item and its elapsed time, rotation and
    next-rotation bookkeeping, a copy of the runtime configuration (also
    flattened into top-level keys), override/custom-track statistics, like
    store health and a listener snapshot.
    """
    current = self._get_current_item_unlocked()

    elapsed = 0
    if current and self.started_at is not None:
        elapsed = max(0, time.time() - self.started_at)
        try:
            length = float(current.get("duration") or DEFAULT_DURATION)
        except Exception:
            length = DEFAULT_DURATION
        if length > 0:
            elapsed = min(elapsed, length)

    audience = listeners.snapshot()

    now_playing_type = "song"
    if current and current.get("is_jingle"):
        now_playing_type = "jingle"
    elif current and current.get("is_segment"):
        now_playing_type = "segment"

    # Work on a copy so the returned dict never aliases the live config.
    cfg = dict(self.runtime_config)

    return {
        # --- bucket / playback ---
        "bucket_id": BUCKET_ID,
        "bucket_fs_root": BUCKET_FS_ROOT,
        "bucket_resolve_url": BUCKET_RESOLVE_URL,
        "mode": self.mode,
        "track_count": len(self.tracks),
        "jingle_count": len(self.jingles),
        "segment_count": len(self.segments),
        "custom_track_count": len(self.custom_tracks),
        "now_playing_type": now_playing_type,
        "current_index": self.current_index,
        "elapsed": elapsed,
        "is_playing": self.is_playing,
        "is_loading": self.is_loading,
        "last_error": self.last_error,
        # --- refresh schedule / next rotation ---
        "last_refresh_at": self.last_refresh_at,
        "next_refresh_at": next_utc_hour_iso(),
        "next_rotation_prebuild_at": next_rotation_prebuild_at_iso(),
        "rotation_prebuild_lead_seconds": ROTATION_PREBUILD_LEAD_SECONDS,
        "refresh_mode": REFRESH_MODE,
        "next_rotation_ready": self.next_rotation_ready,
        "next_rotation_is_building": self.next_rotation_is_building,
        "next_rotation_count": len(self.next_tracks),
        "next_rotation_source": self.next_rotation_source,
        "next_rotation_control_file": str(Path(RADIO_CONTROL_DIR) / RADIO_NEXT_ROTATION_FILE),
        "approved_next_rotation_last_loaded_at": self.approved_next_rotation_last_loaded_at,
        "approved_next_rotation_last_error": self.approved_next_rotation_last_error,
        "approved_next_rotation_last_consumed_at": self.approved_next_rotation_last_consumed_at,
        "next_rotation_built_at": self.next_rotation_built_at,
        "next_rotation_target_at": self.next_rotation_target_at,
        "next_rotation_last_error": self.next_rotation_last_error,
        # --- current rotation ---
        "last_rotation_activated_at": self.last_rotation_activated_at,
        "current_rotation_source": self.current_rotation_source,
        "current_rotation_target_at": self.current_rotation_target_at,
        "current_rotation_activated_at": self.current_rotation_activated_at,
        "current_rotation_track_ids": list(self.current_rotation_track_ids or []),
        # --- runtime config and overrides ---
        "runtime_config_source": self.runtime_config_source,
        "runtime_config_last_loaded_at": self.runtime_config_last_loaded_at,
        "runtime_config_last_error": self.runtime_config_last_error,
        "runtime_config": cfg,
        "radio_control_dir": RADIO_CONTROL_DIR,
        "track_overrides_count": len(self.track_overrides),
        "track_overrides_last_loaded_at": self.track_overrides_last_loaded_at,
        "track_overrides_last_error": self.track_overrides_last_error,
        "cover_overrides_dir": COVER_OVERRIDES_DIR,
        "cover_overrides_count": self.cover_overrides_count,
        # --- selection weights ---
        "rotation_target_seconds": cfg.get("rotation_target_seconds"),
        "max_tracks": cfg.get("max_tracks"),
        "report_hide_threshold": cfg.get("report_hide_threshold"),
        "like_weight_per_like": cfg.get("like_weight_per_like"),
        "like_weight_max": cfg.get("like_weight_max"),
        "upvote_weight_per_upvote": cfg.get("upvote_weight_per_upvote"),
        "upvote_weight_max": cfg.get("upvote_weight_max"),
        "downvote_weight_per_downvote": cfg.get("downvote_weight_per_downvote"),
        "downvote_weight_max": cfg.get("downvote_weight_max"),
        "track_weight_min": cfg.get("track_weight_min"),
        "track_weight_max": cfg.get("track_weight_max"),
        "auto_rotation_min_effective_score_exclusive": cfg.get("auto_rotation_min_effective_score_exclusive"),
        "admin_report_usernames": sorted(ADMIN_REPORT_USERNAMES),
        "admin_report_weight": ADMIN_REPORT_WEIGHT,
        "recent_track_quota": cfg.get("recent_track_quota"),
        "recent_track_window_hours": cfg.get("recent_track_window_hours"),
        "recently_played_exclude_minutes": cfg.get("recently_played_exclude_minutes"),
        "recently_played_memory_max": cfg.get("recently_played_memory_max"),
        "recently_played_min_candidates_after_filter": cfg.get("recently_played_min_candidates_after_filter"),
        "recently_played_count": len(self._recently_played_exclusion_set_unlocked()),
        "meta_load_workers": cfg.get("meta_load_workers"),
        "playback_order": "sequential_rotation_pack",
        # --- jingles / segments / custom tracks ---
        "enable_remote_jingles": cfg.get("enable_remote_jingles"),
        "remote_jingles_dir": REMOTE_JINGLES_DIR,
        "enable_segments": cfg.get("enable_segments"),
        "remote_segments_dir": REMOTE_SEGMENTS_DIR,
        "min_songs_between_jingles": cfg.get("min_songs_between_jingles"),
        "max_songs_between_jingles": cfg.get("max_songs_between_jingles"),
        "jingle_probability_after_min": cfg.get("jingle_probability_after_min"),
        "songs_since_last_jingle": self.songs_since_last_jingle,
        "min_songs_between_segments": cfg.get("min_songs_between_segments"),
        "max_songs_between_segments": cfg.get("max_songs_between_segments"),
        "segment_probability_after_min": cfg.get("segment_probability_after_min"),
        "songs_since_last_segment": self.songs_since_last_segment,
        "enable_custom_tracks": cfg.get("enable_custom_tracks"),
        "custom_tracks_dir": CUSTOM_TRACKS_DIR,
        "custom_track_quota": cfg.get("custom_track_quota"),
        "custom_tracks_last_loaded_at": self.custom_tracks_last_loaded_at,
        "custom_tracks_last_error": self.custom_tracks_last_error,
        # --- mastering ---
        "enable_song_mastering": cfg.get("enable_song_mastering"),
        "song_audio_filter": cfg.get("song_audio_filter") if cfg.get("enable_song_mastering") else "",
        # --- likes / listeners ---
        "likes_enabled": likes.last_error is None,
        "likes_error": likes.last_error,
        "active_listeners": audience["active_listeners"],
        "total_connections": audience["total_connections"],
        "listener_ttl_seconds": audience["listener_ttl_seconds"],
        "listener_profiles": audience["listener_profiles"],
        "signed_in_listener_count": audience["signed_in_listener_count"],
        "visible_listener_profile_count": audience["visible_listener_profile_count"],
        "track": self._public_track_for_status_unlocked(current),
    }
def status(self):
    """Return the status dictionary, built while holding the state lock."""
    with self.lock:
        report = self._status_unlocked()
    return report
def rotation_snapshot(self):
    """Return a detailed snapshot of the rotation and its bookkeeping.

    Every track is copied and annotated with its ``index`` and an
    ``is_current`` flag (true only while a song is playing and the index
    matches). Jingles, segments, custom tracks and the runtime config are
    returned as copies as well.
    """
    with self.lock:
        current = self._get_current_item_unlocked()

        now_playing_type = "song"
        if current and current.get("is_jingle"):
            now_playing_type = "jingle"
        elif current and current.get("is_segment"):
            now_playing_type = "segment"

        song_on_air = now_playing_type == "song"
        annotated_tracks = []
        for position, entry in enumerate(self.tracks):
            annotated = dict(entry)
            annotated["index"] = position
            annotated["is_current"] = song_on_air and position == self.current_index
            annotated_tracks.append(annotated)

        return {
            "now_playing_type": now_playing_type,
            "current_index": self.current_index,
            "current_item": self._public_track_for_status_unlocked(current),
            "tracks_count": len(self.tracks),
            "jingles_count": len(self.jingles),
            "segments_count": len(self.segments),
            "custom_tracks_count": len(self.custom_tracks),
            "tracks": annotated_tracks,
            "jingles": [dict(entry) for entry in self.jingles],
            "segments": [dict(entry) for entry in self.segments],
            "custom_tracks": [dict(entry) for entry in self.custom_tracks],
            "runtime_config_source": self.runtime_config_source,
            "runtime_config": dict(self.runtime_config),
            "track_overrides_count": len(self.track_overrides),
            "cover_overrides_count": self.cover_overrides_count,
            "last_refresh_at": self.last_refresh_at,
            "next_refresh_at": next_utc_hour_iso(),
            "next_rotation_prebuild_at": next_rotation_prebuild_at_iso(),
            "next_rotation_ready": self.next_rotation_ready,
            "next_rotation_is_building": self.next_rotation_is_building,
            "next_rotation_count": len(self.next_tracks),
            "next_rotation_source": self.next_rotation_source,
            "next_rotation_built_at": self.next_rotation_built_at,
            "next_rotation_target_at": self.next_rotation_target_at,
            "last_rotation_activated_at": self.last_rotation_activated_at,
            "current_rotation_source": self.current_rotation_source,
            "current_rotation_target_at": self.current_rotation_target_at,
            "current_rotation_activated_at": self.current_rotation_activated_at,
            "current_rotation_track_ids": list(self.current_rotation_track_ids or []),
        }
# Module-level RadioState instance; the broadcast engine and the helper
# functions below all operate on this one object.
radio = RadioState()
class BroadcastEngine:
    """
    Single global broadcast pipeline.

    One ffmpeg process produces the radio stream. Each /stream.mp3 client
    subscribes to the shared chunks instead of spawning its own ffmpeg
    process.

    A background thread (see ``_run_loop``) plays one item at a time and
    publishes the encoded chunks to every subscriber queue. ``check_health``
    is meant to be polled by a watchdog: it restarts a dead thread and kills
    an ffmpeg process that has stopped producing audio.
    """

    # Upper bound on how much ffmpeg stderr is retained per item for logging.
    # Everything beyond this is still read (to keep the pipe empty) but dropped.
    _STDERR_CAPTURE_LIMIT_BYTES = 64 * 1024

    def __init__(self, radio_state: RadioState):
        self.radio = radio_state
        # Guards every mutable attribute below.
        self.lock = threading.Lock()
        # subscriber_id -> queue.Queue of encoded chunks (None = end of stream).
        self.subscribers = {}
        self.total_stream_connections = 0
        self.thread: Optional[threading.Thread] = None
        self.running = False
        self.current_item_id = None
        self.current_item_type = None
        self.current_item_started_at = None
        self.current_process: Optional[subprocess.Popen] = None
        self.current_process_pid = None
        self.last_chunk_at = None
        self.last_watchdog_restart_at = None
        self.last_watchdog_reason = None
        self.last_error = None

    def start(self):
        """Start the broadcast thread unless one is already alive."""
        with self.lock:
            if self.running and self.thread is not None and self.thread.is_alive():
                return
            self.running = True
            self.thread = threading.Thread(
                target=self._run_loop,
                daemon=True,
            )
            self.thread.start()
        print("[broadcast] Broadcast engine started")

    def ensure_running(self):
        """Restart the broadcast thread if it has died."""
        with self.lock:
            thread_alive = self.thread is not None and self.thread.is_alive()
        # start() takes the lock itself, so it must be called outside it.
        if not thread_alive:
            print("[broadcast-watchdog] broadcast thread is not alive; restarting")
            self.start()

    def snapshot(self):
        """Return a dict describing the engine's current state for status output."""
        now = time.time()
        with self.lock:
            last_chunk_age = None
            if self.last_chunk_at is not None:
                last_chunk_age = max(0.0, now - self.last_chunk_at)
            current_item_age = None
            if self.current_item_started_at is not None:
                current_item_age = max(0.0, now - self.current_item_started_at)
            return {
                "broadcast_running": self.running,
                "broadcast_thread_alive": self.thread is not None and self.thread.is_alive(),
                "stream_subscriber_count": len(self.subscribers),
                "total_stream_connections": self.total_stream_connections,
                "broadcast_current_item_id": self.current_item_id,
                "broadcast_current_item_type": self.current_item_type,
                "broadcast_current_item_started_at": self.current_item_started_at,
                "broadcast_current_item_age_seconds": current_item_age,
                "broadcast_current_process_pid": self.current_process_pid,
                "broadcast_last_chunk_at": self.last_chunk_at,
                "broadcast_last_chunk_age_seconds": last_chunk_age,
                "broadcast_last_watchdog_restart_at": self.last_watchdog_restart_at,
                "broadcast_last_watchdog_reason": self.last_watchdog_reason,
                "broadcast_last_error": self.last_error,
            }

    def subscribe(self):
        """Register a new listener and return ``(subscriber_id, queue)``."""
        subscriber_id = str(uuid4())
        subscriber_queue = queue.Queue(maxsize=BROADCAST_QUEUE_MAX_CHUNKS)
        with self.lock:
            self.subscribers[subscriber_id] = subscriber_queue
            self.total_stream_connections += 1
            count = len(self.subscribers)
        print(f"[broadcast] subscriber joined id={subscriber_id} active={count}")
        return subscriber_id, subscriber_queue

    def unsubscribe(self, subscriber_id: str):
        """Remove a listener; unknown ids are ignored."""
        with self.lock:
            self.subscribers.pop(subscriber_id, None)
            count = len(self.subscribers)
        print(f"[broadcast] subscriber left id={subscriber_id} active={count}")

    def _remove_slow_subscriber_unlocked(self, subscriber_id: str, subscriber_queue):
        """Drop a subscriber whose queue is full. Caller must hold ``self.lock``.

        The queue is emptied and a ``None`` sentinel is enqueued so the
        consuming generator ends instead of waiting for more audio.
        """
        self.subscribers.pop(subscriber_id, None)
        try:
            while True:
                subscriber_queue.get_nowait()
        except queue.Empty:
            pass
        try:
            subscriber_queue.put_nowait(None)
        except Exception:
            # Best effort: the consumer will still stop receiving chunks.
            pass
        print(f"[broadcast] removed slow subscriber id={subscriber_id}")

    def _publish(self, chunk: bytes):
        """Fan one encoded chunk out to every subscriber queue."""
        if not chunk:
            return
        with self.lock:
            self.last_chunk_at = time.time()
            # Iterate over a copy: slow subscribers are removed while looping.
            for subscriber_id, subscriber_queue in list(self.subscribers.items()):
                try:
                    subscriber_queue.put_nowait(chunk)
                except queue.Full:
                    self._remove_slow_subscriber_unlocked(subscriber_id, subscriber_queue)

    def kill_current_process(self, reason: str) -> bool:
        """Kill the running ffmpeg process and record why.

        Returns True when a kill was issued, False when there is no running
        process or the kill itself failed.
        """
        process = None
        pid = None
        with self.lock:
            process = self.current_process
            pid = self.current_process_pid
            if process is None:
                return False
            if process.poll() is not None:
                return False
            now = time.time()
            self.last_watchdog_restart_at = now
            self.last_watchdog_reason = reason
            self.last_error = reason
        print(f"[broadcast-watchdog] killing stalled ffmpeg pid={pid} reason={reason}")
        try:
            process.kill()
            return True
        except Exception as e:
            with self.lock:
                self.last_error = f"Could not kill stalled ffmpeg process: {e}"
            print(f"[broadcast-watchdog] could not kill ffmpeg pid={pid}: {e}")
            return False

    def check_health(self):
        """Watchdog tick: keep the thread alive and kill a stalled ffmpeg.

        ffmpeg counts as stalled when it has produced no chunk within the
        start timeout after the item began, or when the gap since the last
        chunk exceeds the stall timeout.
        """
        self.ensure_running()
        now = time.time()
        reason = None
        with self.lock:
            process = self.current_process
            if process is None or process.poll() is not None:
                return
            if self.current_item_id is None:
                return
            item_started_at = self.current_item_started_at
            last_chunk_at = self.last_chunk_at
            current_item_id = self.current_item_id
            process_pid = self.current_process_pid
        if last_chunk_at is None:
            if item_started_at is not None and now - item_started_at > BROADCAST_PROCESS_START_TIMEOUT_SECONDS:
                reason = (
                    f"no audio chunks after {now - item_started_at:.1f}s "
                    f"for item={current_item_id} pid={process_pid}"
                )
        else:
            silence_seconds = now - last_chunk_at
            if silence_seconds > BROADCAST_STALL_TIMEOUT_SECONDS:
                reason = (
                    f"no audio chunks for {silence_seconds:.1f}s "
                    f"on item={current_item_id} pid={process_pid}"
                )
        # kill_current_process() takes the lock itself, so call it outside.
        if reason:
            self.kill_current_process(reason)

    def _item_type(self, item: dict) -> str:
        """Classify an item as "jingle", "segment" or "song" from its flags."""
        if item.get("is_jingle"):
            return "jingle"
        if item.get("is_segment"):
            return "segment"
        return "song"

    def _build_ffmpeg_command(self, item: dict, offset: float):
        """Build the ffmpeg argv that encodes ``item`` as MP3 to stdout.

        The input is read at native rate (``-re``) and sought to ``offset``
        when that exceeds the seek epsilon. Jingles and segments get a
        volume filter; songs get the configured mastering filter when song
        mastering is enabled.
        """
        stream_url = item.get("stream_url") or item.get("audio_url")
        item_type = self._item_type(item)
        cmd = [
            "ffmpeg",
            "-hide_banner",
            "-loglevel",
            "error",
            "-nostdin",
            "-re",
        ]
        if offset > STREAM_SEEK_EPSILON_SECONDS:
            cmd.extend(
                [
                    "-ss",
                    str(max(0, offset)),
                ]
            )
        cmd.extend(
            [
                "-i",
                stream_url,
                "-vn",
            ]
        )
        audio_filters = []
        if item_type == "jingle":
            audio_filters.append(f"volume={JINGLE_VOLUME}")
        elif item_type == "segment":
            audio_filters.append(f"volume={SEGMENT_VOLUME}")
        elif item_type == "song" and self.radio._cfg("enable_song_mastering", ENABLE_SONG_MASTERING):
            song_audio_filter = str(self.radio._cfg("song_audio_filter", SONG_AUDIO_FILTER) or "").strip()
            if song_audio_filter:
                audio_filters.append(song_audio_filter)
        if audio_filters:
            cmd.extend(["-af", ",".join(audio_filters)])
        cmd.extend(
            [
                "-ac",
                "2",
                "-ar",
                STREAM_SAMPLE_RATE,
                "-b:a",
                STREAM_BITRATE,
                "-map_metadata",
                "-1",
                "-write_xing",
                "0",
                "-id3v2_version",
                "0",
                "-f",
                "mp3",
                "pipe:1",
            ]
        )
        return cmd

    def _drain_stderr(self, stream, captured: list):
        """Read ffmpeg's stderr until EOF so its pipe can never fill up.

        Runs on a helper thread for the lifetime of one ffmpeg process. Up to
        ``_STDERR_CAPTURE_LIMIT_BYTES`` are appended to ``captured`` for later
        logging; the rest is read and discarded.
        """
        kept = 0
        try:
            while True:
                data = stream.read(4096)
                if not data:
                    break
                if kept < self._STDERR_CAPTURE_LIMIT_BYTES:
                    captured.append(data)
                    kept += len(data)
        except Exception:
            # The pipe can be torn down underneath us when the process is
            # killed; diagnostics are best effort only.
            pass

    def _run_loop(self):
        """Broadcast thread body: play the radio's current item, forever.

        For each item an ffmpeg process is spawned, its stdout is published
        chunk by chunk, and once it ends the radio state is advanced if it
        is still on that item.
        """
        force_next_offset_zero = False
        while True:
            item, offset = self.radio.get_current_stream_item()
            if not item:
                with self.lock:
                    self.current_item_id = None
                    self.current_item_type = None
                time.sleep(BROADCAST_NO_TRACK_SLEEP_SECONDS)
                continue
            if force_next_offset_zero:
                offset = 0
                force_next_offset_zero = False
            item_id = item["id"]
            item_type = self._item_type(item)
            stream_url = item.get("stream_url") or item.get("audio_url")
            with self.lock:
                now = time.time()
                self.current_item_id = item_id
                self.current_item_type = item_type
                self.current_item_started_at = now
                self.current_process = None
                self.current_process_pid = None
                self.last_chunk_at = None
                self.last_error = None
            print(f"[broadcast] starting {item_type}={item_id} offset={offset:.2f}s source={stream_url}")
            cmd = self._build_ffmpeg_command(item, offset)
            process = None
            stderr_thread = None
            stderr_chunks = []
            try:
                process = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    bufsize=0,
                )
                assert process.stdout is not None
                # Drain stderr concurrently. If it were only read after
                # stdout reached EOF, a chatty ffmpeg could fill the stderr
                # pipe buffer and block, stalling the stream until the
                # watchdog killed it.
                if process.stderr is not None:
                    stderr_thread = threading.Thread(
                        target=self._drain_stderr,
                        args=(process.stderr, stderr_chunks),
                        daemon=True,
                    )
                    stderr_thread.start()
                with self.lock:
                    self.current_process = process
                    self.current_process_pid = process.pid
                while True:
                    chunk = process.stdout.read(16 * 1024)
                    if not chunk:
                        break
                    self._publish(chunk)
                # stdout hit EOF, so ffmpeg is exiting. Wait briefly for the
                # real exit code: polling right away can still return None
                # and would let a failure go unlogged.
                try:
                    return_code = process.wait(timeout=2)
                except subprocess.TimeoutExpired:
                    return_code = process.poll()
                if return_code not in (0, None):
                    if stderr_thread is not None:
                        stderr_thread.join(timeout=1)
                    stderr_text = b"".join(list(stderr_chunks)).decode("utf-8", errors="replace")
                    print(
                        f"[broadcast] ffmpeg exited with code={return_code} "
                        f"item={item_id} stderr={stderr_text[:500]}"
                    )
                    with self.lock:
                        # Keep a more specific reason (e.g. from the
                        # watchdog) if one was already recorded.
                        existing_error = self.last_error
                        self.last_error = existing_error or f"ffmpeg exited with code {return_code}"
            except Exception as e:
                print(f"[broadcast] error while streaming item={item_id}: {e}")
                with self.lock:
                    self.last_error = str(e)
                time.sleep(1)
            finally:
                if process is not None:
                    try:
                        if process.poll() is None:
                            process.kill()
                    except Exception:
                        pass
                    try:
                        process.wait(timeout=2)
                    except Exception:
                        pass
                if stderr_thread is not None:
                    # The pipe reaches EOF once the process is gone; a short
                    # join keeps helper threads from piling up.
                    stderr_thread.join(timeout=1)
                with self.lock:
                    if self.current_process is process:
                        self.current_process = None
                        self.current_process_pid = None
            # Advance only if the radio is still on this item. Either way the
            # next item starts from its beginning: after an advance it is a
            # fresh item, and if the state changed underneath us (refresh or
            # manual update) a seek based on the old timing would be stale.
            self.radio.advance_if_current(item_id)
            force_next_offset_zero = True
# Module-level broadcast engine bound to the `radio` state above; the
# watchdog loop and the stream generator below both use this one object.
broadcast = BroadcastEngine(radio)
def broadcast_watchdog_loop():
    """Run the broadcast health check forever at a fixed interval.

    A failing health check is logged and never propagates, so the loop keeps
    running; the pause between checks is
    ``BROADCAST_WATCHDOG_INTERVAL_SECONDS``.
    """
    while True:
        try:
            broadcast.check_health()
        except Exception as exc:
            print(f"[broadcast-watchdog] health check failed: {exc}")
        # Sleep after every attempt, successful or not.
        time.sleep(BROADCAST_WATCHDOG_INTERVAL_SECONDS)
def broadcast_stream_mp3():
    """Yield broadcast chunks for one newly registered subscriber.

    Subscribes to the broadcast engine, then forwards every chunk taken from
    the subscriber queue. When no chunk arrives within
    ``BROADCAST_SUBSCRIBER_TIMEOUT_SECONDS`` an empty bytes object is yielded
    instead (presumably a keep-alive for the HTTP response -- confirm). A
    ``None`` item ends the stream. The subscriber is always unregistered when
    the generator finishes or is closed.
    """
    subscriber_id, chunk_queue = broadcast.subscribe()
    try:
        while True:
            try:
                data = chunk_queue.get(timeout=BROADCAST_SUBSCRIBER_TIMEOUT_SECONDS)
            except queue.Empty:
                # Nothing arrived in time: emit an empty chunk and keep waiting.
                data = b""
            if data is None:
                # None is the end-of-stream sentinel.
                return
            yield data
    finally:
        # Runs on normal end, on errors, and when the consumer closes the
        # generator (GeneratorExit propagates through untouched).
        broadcast.unsubscribe(subscriber_id)
def get_full_status():
    """Return the radio status dict updated with the broadcast snapshot.

    Keys from ``broadcast.snapshot()`` overwrite same-named keys from
    ``radio.status()``.
    """
    combined = radio.status()
    engine_state = broadcast.snapshot()
    combined.update(engine_state)
    return combined
def refresh_radio_content(label: str):
    """Reload every radio content source, logging progress under ``label``.

    Runs the runtime-control, jingle, segment, custom-track and track
    refreshes in that order. Any exception is logged, clears
    ``radio.is_loading`` and is stored in ``radio.last_error``; it is never
    re-raised.
    """
    title = label.capitalize()
    try:
        print(f"[radio] Starting {label} playlist refresh")
        refresh_steps = (
            radio.refresh_runtime_controls,
            radio.refresh_jingles,
            radio.refresh_segments,
            radio.refresh_custom_tracks,
            radio.refresh_tracks,
        )
        for step in refresh_steps:
            step()
        print(f"[radio] {title} playlist refresh complete")
    except Exception as exc:
        print(f"[radio] {title} refresh failed: {exc}")
        with radio.lock:
            radio.is_loading = False
            radio.last_error = str(exc)
def prebuild_radio_content(label: str, target_at: datetime):
    """Prepare the next rotation for ``target_at``, logging under ``label``.

    Refreshes runtime controls, jingles, segments and custom tracks, then
    tries the approved next rotation. When none is available it falls back to
    an automatic prebuild via ``radio.refresh_tracks(prebuild=True, ...)``.
    Any exception is logged, clears ``radio.next_rotation_is_building`` and is
    stored in ``radio.next_rotation_last_error``; it is never re-raised.
    """
    title = label.capitalize()
    try:
        target_label = target_at.replace(microsecond=0).isoformat()
        print(f"[radio] Starting {label} next rotation prebuild for {target_label} UTC")
        preparation_steps = (
            radio.refresh_runtime_controls,
            radio.refresh_jingles,
            radio.refresh_segments,
            radio.refresh_custom_tracks,
        )
        for step in preparation_steps:
            step()
        if not radio.prebuild_approved_next_rotation(target_at):
            print("[radio] No approved next rotation available; falling back to automatic prebuild")
            radio.refresh_tracks(prebuild=True, target_at=target_at)
            print(f"[radio] {title} automatic next rotation prebuild complete")
        else:
            print(f"[radio] {title} approved next rotation prebuild complete")
    except Exception as exc:
        print(f"[radio] {title} prebuild failed: {exc}")
        with radio.lock:
            radio.next_rotation_is_building = False
            radio.next_rotation_last_error = str(exc)
def periodic_refresh_loop():
    """Drive the hourly rotation schedule forever.

    Each cycle:
      1. computes the next UTC hour as the activation target,
      2. sleeps until ``ROTATION_PREBUILD_LEAD_SECONDS`` before that target
         and runs the scheduled prebuild,
      3. sleeps until the target, re-checks for an approved rotation, and
         marks the prebuilt rotation ready,
      4. falls back to a full refresh when no prebuilt rotation is ready.

    The function is started as a daemon thread, so an exception escaping a
    cycle would end scheduling for the rest of the process lifetime. Each
    cycle is therefore guarded: failures are logged and the loop moves on to
    the next cycle.
    """
    while True:
        try:
            target_at = next_utc_hour()
            prebuild_at = target_at - timedelta(seconds=max(0, ROTATION_PREBUILD_LEAD_SECONDS))
            sleep_until_prebuild = max(0.0, (prebuild_at - datetime.now(timezone.utc)).total_seconds())
            print(
                "[radio] Next rotation prebuild scheduled at "
                f"{prebuild_at.replace(microsecond=0).isoformat()} UTC "
                f"for activation target {target_at.replace(microsecond=0).isoformat()} UTC"
            )
            time.sleep(sleep_until_prebuild)
            prebuild_radio_content("scheduled", target_at)
            sleep_until_activation = max(0.0, (target_at - datetime.now(timezone.utc)).total_seconds())
            if sleep_until_activation > 0:
                time.sleep(sleep_until_activation)
            # Final handoff check: an approved Studio rotation may have been written
            # just after the scheduled prebuild pass or close to the approval
            # deadline. Re-check the control file at activation time before marking
            # the already-prebuilt queue as ready. Target matching still happens in
            # prebuild_approved_next_rotation(), so stale/future approvals are never
            # consumed for the wrong hour.
            if radio.prebuild_approved_next_rotation(target_at):
                print("[radio] Activation-time approved next rotation handoff complete")
            if not radio.mark_next_rotation_ready(target_at):
                print("[radio] Falling back to active hourly refresh because no prebuilt rotation is ready")
                refresh_radio_content("hourly fallback")
        except Exception as e:
            # Keep the scheduler thread alive; the next cycle recomputes its
            # own target hour. The short pause avoids a tight failure loop.
            print(f"[radio] Rotation scheduler cycle failed: {e}")
            time.sleep(1)
@app.on_event("startup")
def startup():
    """Start the broadcast engine and launch the background daemon threads.

    The threads are started in this order: initial content refresh, periodic
    rotation loop, broadcast watchdog.
    """
    broadcast.start()
    background_jobs = (
        lambda: refresh_radio_content("initial"),
        periodic_refresh_loop,
        broadcast_watchdog_loop,
    )
    for job in background_jobs:
        threading.Thread(target=job, daemon=True).start()
@app.get("/api/me")
def api_me(request: Request):
    """Report the user resolved from the request's OAuth data.

    ``authenticated`` is False and ``user`` is None when
    ``oauth_user_from_request`` returns None.
    """
    user = oauth_user_from_request(request)
    body = {
        "authenticated": user is not None,
        "user": user,
    }
    return JSONResponse(body)
@app.get("/stream.mp3")
def stream_mp3():
    """Stream the shared broadcast as ``audio/mpeg`` with caching disabled."""
    return StreamingResponse(
        broadcast_stream_mp3(),
        media_type="audio/mpeg",
        headers={
            # Caches must not store or replay the response.
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "Pragma": "no-cache",
            "Expires": "0",
            "Connection": "keep-alive",
            "X-Content-Type-Options": "nosniff",
        },
    )
@app.get("/cover-overrides/{song_id}/thumb.png")
def cover_override_thumb(song_id: str):
    """Serve the override cover thumbnail stored under ``song_id``.

    Raises:
        HTTPException: 404 when ``song_id`` is not a plain directory name or
            when no ``thumb.png`` file exists for it.
    """
    # song_id comes straight from the URL. Refuse ".." and anything that is
    # not a single path component so the lookup stays inside
    # COVER_OVERRIDES_DIR.
    if song_id == ".." or Path(song_id).name != song_id:
        raise HTTPException(status_code=404, detail="Cover override not found")
    thumb_path = Path(COVER_OVERRIDES_DIR) / song_id / "thumb.png"
    # is_file() is already False for a missing path, so one check is enough.
    if not thumb_path.is_file():
        raise HTTPException(status_code=404, detail="Cover override not found")
    return FileResponse(
        thumb_path,
        media_type="image/png",
        headers={
            "Cache-Control": "no-cache, no-store, must-revalidate",
        },
    )
@app.get("/custom-tracks/{track_id}/thumb.png")
def custom_track_thumb(track_id: str):
    """Serve the thumbnail stored under ``track_id`` for a custom track.

    Raises:
        HTTPException: 404 when ``track_id`` is not a plain directory name or
            when no ``thumb.png`` file exists for it.
    """
    # track_id comes straight from the URL. Refuse ".." and anything that is
    # not a single path component so the lookup stays inside
    # CUSTOM_TRACKS_DIR.
    if track_id == ".." or Path(track_id).name != track_id:
        raise HTTPException(status_code=404, detail="Custom track thumbnail not found")
    thumb_path = Path(CUSTOM_TRACKS_DIR) / track_id / "thumb.png"
    # is_file() is already False for a missing path, so one check is enough.
    if not thumb_path.is_file():
        raise HTTPException(status_code=404, detail="Custom track thumbnail not found")
    return FileResponse(
        thumb_path,
        media_type="image/png",
        headers={
            "Cache-Control": "no-cache, no-store, must-revalidate",
        },
    )
@app.get("/api/status")
def api_status():
    """Return the combined radio and broadcast status as JSON."""
    status = get_full_status()
    return JSONResponse(status)
@app.get("/api/rotation")
def api_rotation():
    """Return the radio's rotation snapshot as JSON."""
    snapshot = radio.rotation_snapshot()
    return JSONResponse(snapshot)
@app.post("/api/listener/start/{listener_id}")
def api_listener_start(listener_id: str, request: Request):
    """Register the start of a listener session and return the listener snapshot.

    The user resolved from the request (possibly None) is passed along.
    """
    listeners.start(listener_id, user=oauth_user_from_request(request))
    body = {"ok": True}
    body.update(listeners.snapshot())
    return JSONResponse(body)
@app.post("/api/listener/heartbeat/{listener_id}")
def api_listener_heartbeat(listener_id: str, request: Request):
    """Record a listener heartbeat and return the listener snapshot.

    The user resolved from the request (possibly None) is passed along.
    """
    listeners.heartbeat(listener_id, user=oauth_user_from_request(request))
    body = {"ok": True}
    body.update(listeners.snapshot())
    return JSONResponse(body)
@app.post("/api/listener/stop/{listener_id}")
def api_listener_stop(listener_id: str):
    """Stop the given listener session and return the listener snapshot."""
    listeners.stop(listener_id)
    body = {"ok": True}
    body.update(listeners.snapshot())
    return JSONResponse(body)
def require_signed_in_user(request: Request, action: str):
    """Return the user resolved from ``request`` or reject the call.

    Args:
        request: Incoming request to resolve the OAuth user from.
        action: Phrase completing "Sign in with Hugging Face to ..." in the
            error detail.

    Raises:
        HTTPException: 401 when no user is resolved.
    """
    user = oauth_user_from_request(request)
    if user is not None:
        return user
    raise HTTPException(status_code=401, detail=f"Sign in with Hugging Face to {action}")
def ensure_track_in_playlist(song_id: str):
    """Raise a 404 ``HTTPException`` unless the radio has the track ``song_id``."""
    if radio.has_track(song_id):
        return
    raise HTTPException(status_code=404, detail="Track not found in current radio playlist")
def feedback_response(song_id: str, result: dict, **extra):
    """Build the JSON response shared by the feedback endpoints.

    Args:
        song_id: Track the feedback applies to.
        result: Dict returned by the likes store; counters missing from it
            default to 0 and ``updated_at`` defaults to None.
        **extra: Additional fields merged last, so they override same-named
            base fields.
    """
    counters = {
        "like_count": result.get("count", 0),
        "upvote_count": result.get("upvote_count", 0),
        "downvote_count": result.get("downvote_count", 0),
        "report_count": result.get("report_count", 0),
    }
    body = {
        "ok": True,
        "song_id": song_id,
        **counters,
        "updated_at": result.get("updated_at"),
        "status": get_full_status(),
        **extra,
    }
    return JSONResponse(body)
@app.post("/api/like/{song_id}")
def api_like(song_id: str, request: Request):
    """Add a like to a playlist track on behalf of a signed-in user.

    Raises:
        HTTPException: 401 without a signed-in user, 404 when the track is
            not in the playlist, 500 when the like cannot be saved.
    """
    require_signed_in_user(request, "like tracks")
    ensure_track_in_playlist(song_id)
    try:
        outcome = likes.increment(song_id)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Could not save like: {exc}")
    else:
        return feedback_response(song_id, outcome)
@app.post("/api/upvote/{song_id}")
def api_upvote(song_id: str, request: Request):
    """Record an upvote on a playlist track for a signed-in user.

    Raises:
        HTTPException: 401 without a signed-in user, 404 when the track is
            not in the playlist, 500 when the upvote cannot be saved.
    """
    require_signed_in_user(request, "vote on tracks")
    ensure_track_in_playlist(song_id)
    try:
        outcome = likes.upvote(song_id)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Could not save upvote: {exc}")
    else:
        return feedback_response(song_id, outcome)
@app.post("/api/unupvote/{song_id}")
def api_unupvote(song_id: str, request: Request):
    """Undo an upvote on a playlist track for a signed-in user.

    Raises:
        HTTPException: 401 without a signed-in user, 404 when the track is
            not in the playlist, 500 when the change cannot be saved.
    """
    require_signed_in_user(request, "undo votes on tracks")
    ensure_track_in_playlist(song_id)
    try:
        outcome = likes.unupvote(song_id)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Could not undo upvote: {exc}")
    else:
        return feedback_response(song_id, outcome)
@app.post("/api/downvote/{song_id}")
def api_downvote(song_id: str, request: Request):
    """Record a downvote on a playlist track for a signed-in user.

    Raises:
        HTTPException: 401 without a signed-in user, 404 when the track is
            not in the playlist, 500 when the downvote cannot be saved.
    """
    require_signed_in_user(request, "vote on tracks")
    ensure_track_in_playlist(song_id)
    try:
        outcome = likes.downvote(song_id)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Could not save downvote: {exc}")
    else:
        return feedback_response(song_id, outcome)
@app.post("/api/undownvote/{song_id}")
def api_undownvote(song_id: str, request: Request):
    """Undo a downvote on a playlist track for a signed-in user.

    Raises:
        HTTPException: 401 without a signed-in user, 404 when the track is
            not in the playlist, 500 when the change cannot be saved.
    """
    require_signed_in_user(request, "undo votes on tracks")
    ensure_track_in_playlist(song_id)
    try:
        outcome = likes.undownvote(song_id)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Could not undo downvote: {exc}")
    else:
        return feedback_response(song_id, outcome)
@app.post("/api/report/{song_id}")
def api_report(song_id: str, request: Request):
    """Record a weighted report against a playlist track.

    Only users whose ``is_pro`` entry is truthy may report. The weight comes
    from ``report_weight_for_user`` and is echoed back in the response.

    Raises:
        HTTPException: 401 without a signed-in user, 403 for non-PRO users,
            404 when the track is not in the playlist, 500 when the report
            cannot be saved.
    """
    reporter = require_signed_in_user(request, "report tracks")
    is_pro = reporter.get("is_pro")
    if not is_pro:
        raise HTTPException(status_code=403, detail="Only Hugging Face PRO users can report tracks")
    ensure_track_in_playlist(song_id)
    weight = report_weight_for_user(reporter)
    try:
        outcome = likes.report(song_id, weight=weight)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Could not save report: {exc}")
    extras = {
        "reported_at": outcome.get("reported_at"),
        "report_weight": weight,
    }
    return feedback_response(song_id, outcome, **extras)
@app.post("/api/unreport/{song_id}")
def api_unreport(song_id: str, request: Request):
    """Undo a weighted report against a playlist track.

    Only users whose ``is_pro`` entry is truthy may undo reports. The weight
    comes from ``report_weight_for_user`` and is echoed back in the response.

    Raises:
        HTTPException: 401 without a signed-in user, 403 for non-PRO users,
            404 when the track is not in the playlist, 500 when the change
            cannot be saved.
    """
    reporter = require_signed_in_user(request, "undo reports")
    is_pro = reporter.get("is_pro")
    if not is_pro:
        raise HTTPException(status_code=403, detail="Only Hugging Face PRO users can undo reports")
    ensure_track_in_playlist(song_id)
    weight = report_weight_for_user(reporter)
    try:
        outcome = likes.unreport(song_id, weight=weight)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Could not undo report: {exc}")
    extras = {
        "unreported_at": outcome.get("unreported_at"),
        "report_weight": weight,
    }
    return feedback_response(song_id, outcome, **extras)
@app.post("/api/refresh")
def api_refresh():
    """Start a full content refresh in the background and return at once.

    The refresh runs on a daemon thread through ``refresh_radio_content``,
    the same helper used by the startup and hourly-fallback refreshes, so the
    refresh steps and the error bookkeeping live in one place. The returned
    status is read right after the thread starts and so may not yet reflect
    the refresh.
    """
    threading.Thread(
        target=refresh_radio_content,
        args=("background",),
        daemon=True,
    ).start()
    return JSONResponse(
        {
            "ok": True,
            "status": get_full_status(),
        }
    )
def next_rotation_response_payload():
    """Snapshot the next-rotation state under ``radio.lock``.

    Returns a dict with the readiness and build flags, timestamps and sources
    for the next and current rotations, the last errors, and a copy of each
    queued track dict.
    """
    with radio.lock:
        queued = radio.next_tracks
        payload = {
            "ok": True,
            "ready": radio.next_rotation_ready,
            "is_building": radio.next_rotation_is_building,
            "count": len(queued),
            "built_at": radio.next_rotation_built_at,
            "target_at": radio.next_rotation_target_at,
            "source": radio.next_rotation_source,
            "last_error": radio.next_rotation_last_error,
            "approved_last_error": radio.approved_next_rotation_last_error,
            "current_rotation_source": radio.current_rotation_source,
            "current_rotation_target_at": radio.current_rotation_target_at,
            "current_rotation_activated_at": radio.current_rotation_activated_at,
            # Copy each entry so the payload holds its own dicts.
            "tracks": [dict(entry) for entry in queued],
        }
    return payload
@app.get("/api/next-rotation")
def api_next_rotation():
    """Return the next-rotation snapshot as JSON."""
    payload = next_rotation_response_payload()
    return JSONResponse(payload)
# Compatibility aliases for older Studio frontends / cached clients.
# The canonical endpoint is /api/next-rotation, but these prevent stale
# clients from hitting a 404 after the prebuild succeeds.
@app.get("/next-rotation-preview")
def api_next_rotation_preview_alias():
    """Return the same next-rotation snapshot as ``/api/next-rotation``."""
    payload = next_rotation_response_payload()
    return JSONResponse(payload)
@app.get("/api/next-rotation-preview")
def api_next_rotation_preview_api_alias():
    """Return the same next-rotation snapshot as ``/api/next-rotation``."""
    payload = next_rotation_response_payload()
    return JSONResponse(payload)
@app.post("/api/prebuild-next-rotation")
def api_prebuild_next_rotation():
    """Start a manual prebuild of the next rotation in the background.

    The target is the next UTC hour, fixed before the daemon thread starts.
    The response reports that target, the scheduled prebuild time and the
    current status.
    """
    target_at = next_utc_hour()
    worker = threading.Thread(
        target=prebuild_radio_content,
        args=("manual", target_at),
        daemon=True,
    )
    worker.start()
    body = {
        "ok": True,
        "target_at": target_at.isoformat(),
        "prebuild_at": next_rotation_prebuild_at_iso(),
        "status": get_full_status(),
    }
    return JSONResponse(body)
@app.post("/api/mode/{mode}")
def api_mode(mode: str):
    """Pass ``mode`` to ``radio.set_mode`` and return the resulting status."""
    radio.set_mode(mode)
    body = {
        "ok": True,
        "status": get_full_status(),
    }
    return JSONResponse(body)
@app.get("/api/tracks")
def api_tracks():
    """List the current radio tracks with feedback counters attached.

    Each track dict is copied, given its like, upvote, downvote and report
    counts, and then passed through ``radio._apply_track_controls_unlocked``.
    The whole pass runs while holding ``radio.lock``.
    """
    with radio.lock:
        enriched = []
        for source in radio.tracks:
            entry = dict(source)
            entry.update(
                like_count=likes.get_count(entry["id"]),
                upvote_count=likes.get_upvote_count(entry["id"]),
                downvote_count=likes.get_downvote_count(entry["id"]),
                report_count=likes.get_report_count(entry["id"]),
            )
            radio._apply_track_controls_unlocked(entry)
            enriched.append(entry)
    body = {
        "count": len(enriched),
        "tracks": enriched,
    }
    return JSONResponse(body)
@app.get("/api/likes")
def api_likes():
    """Return the likes snapshot plus the likes store's health fields.

    ``likes_enabled`` is True exactly when the store has no recorded error.
    """
    body = dict(
        likes=likes.snapshot(),
        likes_enabled=likes.last_error is None,
        likes_error=likes.last_error,
        likes_path=str(likes.likes_path),
    )
    return JSONResponse(body)
# Serve the frontend from the local "static" directory at the site root.
# html=True enables HTML mode (e.g. index.html for directory paths). This
# mount is registered after the API routes above.
app.mount("/", StaticFiles(directory="static", html=True), name="static")