# snare_scout/src/scout.py
# (upload metadata: john221113, commit "Move files to src", 1a46553)
"""
Snare Scout v7.0 β€” Personalized Perceptual Search
Builds on v6.9 (Blended Perceptual + Gated Identity) with:
- Feedback collection (πŸ‘/πŸ‘Ž on results)
- Pairwise learning-to-rank (learns YOUR definition of "similar")
- Online training (improves immediately from votes)
- Generalization across whole library (learns channel weights, not clip IDs)
The system learns what similarity channels YOU trust:
- Do you care more about attack shape or spectral envelope?
- Do you prefer embedding similarity or acoustic features?
- What tradeoffs matter to YOUR ear?
This transfers to new samples automatically.
"""
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"
import io, sys, json, time, math, random, sqlite3, tempfile, subprocess, hashlib, uuid
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import numpy as np
import soundfile as sf
import librosa
import torch
from transformers import ClapModel, ClapProcessor
# v6.6 modules
try:
import preprocessing
import embeddings_mert
import embeddings_panns
import distance_metrics
V66_MODULES_AVAILABLE = True
except ImportError as e:
print(f"[scout] Warning: v6.6 modules not available: {e}")
V66_MODULES_AVAILABLE = False
DEFAULT_DB_PATH = os.path.join("library", "snare_scout.sqlite")
DEFAULT_MODEL_NAME = "laion/larger_clap_music"
os.makedirs("library", exist_ok=True)
CURRENT_INDEX_VERSION = 12 # Same as v6.9
# =============================================================================
# CONFIGURATION
# =============================================================================
# Identity boost (from v6.9)
IDENTITY_THRESHOLD = float(os.getenv("SCOUT_IDENTITY_THR", "0.88"))
IDENTITY_BOOST_WEIGHT = float(os.getenv("SCOUT_IDENTITY_BOOST", "0.35"))
# Personalization settings
PERSONALIZATION_MIN_PAIRS = int(os.getenv("SCOUT_MIN_PAIRS", "10")) # Min pairs before using
PERSONALIZATION_MAX_ALPHA = float(os.getenv("SCOUT_MAX_ALPHA", "0.7")) # Max blend weight
PERSONALIZATION_LEARNING_RATE = float(os.getenv("SCOUT_LR", "0.03"))
PERSONALIZATION_REGULARIZATION = float(os.getenv("SCOUT_REG", "0.0005"))
# Stage A weights (perceptual retrieval)
W_RICH = 0.55
W_PATCH = 0.25
W_MEL = 0.15
W_EMB_STAGE_A = 0.05
# Stage B weights (perceptual rerank)
W_PATCH_SHIFT_T = 0.12
W_PATCH_SHIFT_B = 0.08
W_TRANS_DESC = 0.05
# Stage C weights (identity score)
W_ID_FULL = 0.60
W_ID_TRANS = 0.25
W_ID_TAIL = 0.15
# =============================================================================
# FEATURE VECTOR FOR PERSONALIZATION
# =============================================================================
# These are the similarity channels we'll learn to weight
FEATURE_NAMES = [
"sim_rich", # Rich acoustic features
"sim_patch_multi", # Multiscale patches
"sim_mel", # Mel envelope
"sim_patch_shift_t", # Shift-tolerant trans
"sim_patch_shift_b", # Shift-tolerant tail
"sim_trans_desc", # Transient descriptor
"sim_emb_full", # PANNs full embedding
"sim_emb_trans", # MERT trans embedding
"sim_emb_tail", # MERT tail embedding
"bias" # Constant term
]
N_FEATURES = len(FEATURE_NAMES)
# Default weights (matches v6.9 behavior before any learning)
DEFAULT_WEIGHTS = np.array([
0.55, # rich
0.25, # patch_multi
0.15, # mel
0.12, # patch_shift_t
0.08, # patch_shift_b
0.05, # trans_desc
0.05, # emb_full (minimal in perceptual mode)
0.02, # emb_trans
0.02, # emb_tail
0.0 # bias
], dtype=np.float32)
# =============================================================================
# Slice timing
# =============================================================================
if V66_MODULES_AVAILABLE:
FULL_PRE = preprocessing.ONSET_PRE_MS / 1000.0
FULL_POST = preprocessing.ONSET_POST_MS / 1000.0
TRANS_POST = max(0.0, preprocessing.TRANS_END_MS / 1000.0 - FULL_PRE)
TAIL_START = max(0.0, preprocessing.TAIL_START_MS / 1000.0 - FULL_PRE)
TAIL_END = max(0.0, preprocessing.TAIL_END_MS / 1000.0 - FULL_PRE)
else:
FULL_PRE, FULL_POST = 0.015, 0.735
TRANS_POST = 0.070
TAIL_START, TAIL_END = 0.015, 0.635
# Feature dimensions
PATCH_N_MELS, PATCH_N_FRAMES = 32, 24
PATCH_DIM = PATCH_N_MELS * PATCH_N_FRAMES
SHIFT_FRAMES = 3
SHIFT_VARIANTS = [-SHIFT_FRAMES, 0, SHIFT_FRAMES]
SHIFTED_PATCH_DIM = PATCH_DIM * len(SHIFT_VARIANTS)
PATCH_SCALES = [16, 24, 32, 48]
MULTISCALE_PATCH_DIM = PATCH_N_MELS * sum(PATCH_SCALES)
TRANS_DESC_DIM = 8
# Rich features
N_MFCC = 20
MFCC_DIM = N_MFCC * 3
SPECTRAL_DIM = 13
ENVELOPE_DIM = 16
ATTACK_DECAY_DIM = 16
TEXTURE_DIM = 8
RICH_FEATURES_DIM = MFCC_DIM + SPECTRAL_DIM + ENVELOPE_DIM + ATTACK_DECAY_DIM + TEXTURE_DIM
# Embedding backends
EMB_FULL_BACKEND = os.getenv("SCOUT_EMB_FULL", "panns").strip().lower()
EMB_DETAIL_BACKEND = os.getenv("SCOUT_EMB_DETAIL", "mert").strip().lower()
FULL_MODE = os.getenv("SCOUT_FULL_MODE", "1").strip().lower() in ("1", "true", "yes", "on")
def explain_pipeline():
    """Return a Markdown summary of the v7.0 pipeline and the current personalization settings (for the UI)."""
    return f"""
**Snare Scout v7.0 β€” Personalized Perceptual Search**
**Base:** Blended perceptual + gated identity (v6.9)
**New:** Learning-to-rank from your feedback
**How it works:**
1. πŸ‘/πŸ‘Ž on results creates preference pairs
2. System learns which similarity channels YOU trust
3. Personalization blends in as you give more feedback
**Current settings:**
- Min pairs to activate: {PERSONALIZATION_MIN_PAIRS}
- Max personalization blend: {PERSONALIZATION_MAX_ALPHA:.0%}
- Learning rate: {PERSONALIZATION_LEARNING_RATE}
**Feature channels being learned:**
{', '.join(FEATURE_NAMES[:-1])}
(Index v{CURRENT_INDEX_VERSION})
"""
# =============================================================================
# Database
# =============================================================================
def _connect_db(db_path):
os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
con = sqlite3.connect(db_path, timeout=30)
con.execute("PRAGMA journal_mode=WAL")
con.execute("PRAGMA busy_timeout=10000")
return con
def init_db(db_path):
    """Create or migrate every table: the clips index plus the v7.0 feedback/personalization tables.

    Idempotent: uses CREATE TABLE IF NOT EXISTS and column-add migrations, so it
    is safe to call on every startup and on databases from older versions.
    """
    con = _connect_db(db_path)
    # Original tables
    con.execute("CREATE TABLE IF NOT EXISTS meta (key TEXT PRIMARY KEY, value TEXT)")
    con.execute("""CREATE TABLE IF NOT EXISTS videos (
        video_id TEXT PRIMARY KEY, title TEXT, status TEXT, last_error TEXT,
        updated_at REAL, index_ver INTEGER)""")
    con.execute("""CREATE TABLE IF NOT EXISTS clips (
        id INTEGER PRIMARY KEY, video_id TEXT, title TEXT, url TEXT, t0 REAL, t1 REAL,
        emb BLOB, mel BLOB, emb_t BLOB, emb_b BLOB, mel_t BLOB, mel_b BLOB,
        patch_t BLOB, patch_b BLOB, patch_t_shifted BLOB, patch_b_shifted BLOB,
        trans_desc BLOB, patch_multi BLOB, rich_features BLOB,
        index_ver INTEGER, created_at REAL)""")
    # v7.0 Personalization tables
    con.execute("""CREATE TABLE IF NOT EXISTS feedback_sessions (
        session_id TEXT PRIMARY KEY,
        created_at REAL,
        query_hash TEXT,
        mode TEXT,
        notes TEXT
    )""")
    con.execute("""CREATE TABLE IF NOT EXISTS feedback_votes (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT,
        clip_id INTEGER,
        vote INTEGER,
        rank_at_vote INTEGER,
        created_at REAL
    )""")
    con.execute("CREATE INDEX IF NOT EXISTS idx_votes_session ON feedback_votes(session_id)")
    con.execute("""CREATE TABLE IF NOT EXISTS feedback_pairs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT,
        pos_clip_id INTEGER,
        neg_clip_id INTEGER,
        created_at REAL
    )""")
    con.execute("""CREATE TABLE IF NOT EXISTS feedback_candidate_features (
        session_id TEXT,
        clip_id INTEGER,
        features BLOB,
        score REAL DEFAULT 0,
        rank INTEGER DEFAULT 0,
        PRIMARY KEY(session_id, clip_id)
    )""")
    con.execute("""CREATE TABLE IF NOT EXISTS personalization_model (
        key TEXT PRIMARY KEY,
        value BLOB
    )""")
    # Migration for clips table: add columns introduced after the table was first created.
    cols = {r[1] for r in con.execute("PRAGMA table_info(clips)").fetchall()}
    for col, t in [("emb", "BLOB"), ("mel", "BLOB"), ("emb_t", "BLOB"), ("emb_b", "BLOB"),
                   ("mel_t", "BLOB"), ("mel_b", "BLOB"), ("patch_t", "BLOB"), ("patch_b", "BLOB"),
                   ("patch_t_shifted", "BLOB"), ("patch_b_shifted", "BLOB"), ("trans_desc", "BLOB"),
                   ("patch_multi", "BLOB"), ("rich_features", "BLOB"), ("index_ver", "INTEGER"),
                   ("created_at", "REAL")]:
        if col not in cols:
            con.execute(f"ALTER TABLE clips ADD COLUMN {col} {t}")
    con.execute("CREATE INDEX IF NOT EXISTS idx_clips_ver ON clips(index_ver)")
    # Migration for feedback_candidate_features (v7.0+): pre-7.0 tables lack score/rank.
    try:
        fcf_cols = {r[1] for r in con.execute("PRAGMA table_info(feedback_candidate_features)").fetchall()}
        # Column names here never contain spaces, so no splitting is needed
        # (the old `col.split()[0]` dance was dead code).
        for col, t in [("score", "REAL DEFAULT 0"), ("rank", "INTEGER DEFAULT 0")]:
            if col not in fcf_cols:
                con.execute(f"ALTER TABLE feedback_candidate_features ADD COLUMN {col} {t}")
    except sqlite3.Error:
        # Narrowed from bare `except:`; best-effort migration only.
        pass
    con.commit()
    con.close()
def get_db_stats(db_path):
    """Return index and feedback counters for the UI.

    Key names are kept for backward compatibility: 'clips_v4' actually means
    "clips at CURRENT_INDEX_VERSION", whatever that version now is.
    """
    init_db(db_path)
    con = _connect_db(db_path)

    def one(sql, params=()):
        return con.execute(sql, params).fetchone()[0]

    total_clips = one("SELECT COUNT(*) FROM clips")
    current_clips = one("SELECT COUNT(*) FROM clips WHERE index_ver=?", (CURRENT_INDEX_VERSION,))
    ok_videos = one("SELECT COUNT(*) FROM videos WHERE status='ok'")
    all_videos = one("SELECT COUNT(*) FROM videos")
    # Personalization counters
    pair_count = one("SELECT COUNT(*) FROM feedback_pairs")
    vote_count = one("SELECT COUNT(*) FROM feedback_votes")
    session_count = one("SELECT COUNT(*) FROM feedback_sessions")
    con.close()
    return {
        "clips": total_clips, "clips_v4": current_clips, "clips_legacy": total_clips - current_clips,
        "videos_total": all_videos, "videos_ok": ok_videos,
        "videos_dead": all_videos - ok_videos,
        "feedback_pairs": pair_count,
        "feedback_votes": vote_count,
        "feedback_sessions": session_count
    }
def purge_legacy(db_path):
    """Delete clips whose index_ver differs from CURRENT_INDEX_VERSION; return how many were removed."""
    init_db(db_path)
    con = _connect_db(db_path)
    stale_count = con.execute(
        "SELECT COUNT(*) FROM clips WHERE COALESCE(index_ver,1)!=?",
        (CURRENT_INDEX_VERSION,)
    ).fetchone()[0]
    con.execute("DELETE FROM clips WHERE COALESCE(index_ver,1)!=?", (CURRENT_INDEX_VERSION,))
    con.commit()
    con.close()
    return int(stale_count)
# =============================================================================
# PERSONALIZATION MODEL
# =============================================================================
class PersonalizationModel:
    """
    Pairwise logistic regression over similarity channels.

    Each candidate is described by N_FEATURES channels (see FEATURE_NAMES).
    Thumbs up/down feedback yields (positive, negative) pairs; SGD nudges the
    channel weights so positives score above negatives. Weights persist in the
    `personalization_model` table of the scout database.
    """
    def __init__(self, db_path: str):
        self.db_path = db_path
        # float32 vector of length N_FEATURES; defaults mimic v6.9 behavior.
        self.weights = self._load_weights()
        # Lifetime count of recorded training pairs (drives get_blend_alpha).
        self.n_pairs_trained = self._count_pairs()
    def _load_weights(self) -> np.ndarray:
        """Load weights from DB or return defaults."""
        try:
            con = _connect_db(self.db_path)
            row = con.execute(
                "SELECT value FROM personalization_model WHERE key='weights'"
            ).fetchone()
            con.close()
            if row:
                return np.frombuffer(row[0], dtype=np.float32).copy()
        except Exception:
            # Narrowed from bare `except:` (which also caught KeyboardInterrupt);
            # a missing table/db is non-fatal, fall through to defaults.
            pass
        return DEFAULT_WEIGHTS.copy()
    def _save_weights(self):
        """Persist the current weights blob to the DB."""
        con = _connect_db(self.db_path)
        con.execute(
            "INSERT OR REPLACE INTO personalization_model (key, value) VALUES (?, ?)",
            ("weights", self.weights.astype(np.float32).tobytes())
        )
        con.commit()
        con.close()
    def _count_pairs(self) -> int:
        """Count total training pairs recorded so far."""
        try:
            con = _connect_db(self.db_path)
            n = con.execute("SELECT COUNT(*) FROM feedback_pairs").fetchone()[0]
            con.close()
            return n
        except Exception:
            # Narrowed from bare `except:`; table may not exist yet.
            return 0
    def get_blend_alpha(self) -> float:
        """
        How much to blend personalized scores vs base scores.
        Increases with more training data, capped at PERSONALIZATION_MAX_ALPHA.
        """
        if self.n_pairs_trained < PERSONALIZATION_MIN_PAIRS:
            return 0.0  # Not enough data yet
        # Gradually increase alpha as we get more pairs
        alpha = 0.15 + 0.002 * (self.n_pairs_trained - PERSONALIZATION_MIN_PAIRS)
        return min(PERSONALIZATION_MAX_ALPHA, alpha)
    def train_step(self, pos_features: np.ndarray, neg_features: np.ndarray):
        """
        Single SGD step for pairwise logistic regression.
        pos_features: feature vector for upvoted candidate
        neg_features: feature vector for downvoted candidate
        """
        d = pos_features - neg_features
        # Sigmoid of the score margin (logit clipped to avoid exp overflow)
        logit = np.dot(self.weights, d)
        p = 1.0 / (1.0 + np.exp(-np.clip(logit, -30, 30)))
        # Gradient update: want wΒ·d to be positive (pos ranks above neg)
        grad = (1.0 - p) * d - PERSONALIZATION_REGULARIZATION * self.weights
        self.weights += PERSONALIZATION_LEARNING_RATE * grad
        # Keep weights bounded
        self.weights = np.clip(self.weights, -5.0, 5.0)
    def train_on_pairs(self, pairs: List[Tuple[np.ndarray, np.ndarray]], epochs: int = 3):
        """Train on a batch of (pos, neg) pairs, reshuffling each epoch, then persist."""
        for _ in range(epochs):
            random.shuffle(pairs)
            for pos_f, neg_f in pairs:
                self.train_step(pos_f, neg_f)
        self._save_weights()
        self.n_pairs_trained = self._count_pairs()
    def score(self, features: np.ndarray) -> float:
        """Compute personalized score for a candidate."""
        return float(np.dot(self.weights, features))
    def score_batch(self, feature_matrix: np.ndarray) -> np.ndarray:
        """Compute personalized scores for multiple candidates (one row each)."""
        return feature_matrix @ self.weights
    def reset(self):
        """Reset to default weights and clear all feedback tables."""
        self.weights = DEFAULT_WEIGHTS.copy()
        con = _connect_db(self.db_path)
        con.execute("DELETE FROM feedback_pairs")
        con.execute("DELETE FROM feedback_votes")
        con.execute("DELETE FROM feedback_sessions")
        con.execute("DELETE FROM feedback_candidate_features")
        con.execute("DELETE FROM personalization_model")
        con.commit()
        con.close()
        self.n_pairs_trained = 0
    def get_weight_report(self) -> str:
        """Human-readable report of learned weights vs defaults."""
        lines = ["**Learned Weights:**"]
        for name, w, default in zip(FEATURE_NAMES, self.weights, DEFAULT_WEIGHTS):
            delta = w - default
            arrow = "↑" if delta > 0.01 else "↓" if delta < -0.01 else "="
            lines.append(f" {name}: {w:.3f} (default {default:.3f}) {arrow}")
        lines.append(f"\n**Training pairs:** {self.n_pairs_trained}")
        lines.append(f"**Blend alpha:** {self.get_blend_alpha():.2f}")
        return "\n".join(lines)
# Global personalization model (lazy loaded)
_PERSONALIZATION_MODEL = None
def get_personalization_model(db_path: str = DEFAULT_DB_PATH) -> PersonalizationModel:
    """Return the process-wide model, rebuilding it when db_path changes."""
    global _PERSONALIZATION_MODEL
    cached = _PERSONALIZATION_MODEL
    if cached is None or cached.db_path != db_path:
        cached = PersonalizationModel(db_path)
        _PERSONALIZATION_MODEL = cached
    return cached
# =============================================================================
# FEEDBACK MANAGEMENT
# =============================================================================
def create_feedback_session(db_path: str, query_hash: str, mode: str = "perceptual") -> str:
    """Insert a new feedback session row and return its short (12-char) random id."""
    new_id = str(uuid.uuid4())[:12]
    row = (new_id, time.time(), query_hash, mode)
    con = _connect_db(db_path)
    con.execute(
        "INSERT INTO feedback_sessions (session_id, created_at, query_hash, mode) VALUES (?, ?, ?, ?)",
        row
    )
    con.commit()
    con.close()
    return new_id
def store_candidate_features(db_path: str, session_id: str, clip_id: int,
                             features: np.ndarray, score: float = 0.0, rank: int = 0):
    """Upsert the feature vector (plus score and rank) for one candidate of a session."""
    blob = features.astype(np.float32).tobytes()
    con = _connect_db(db_path)
    con.execute(
        "INSERT OR REPLACE INTO feedback_candidate_features (session_id, clip_id, features, score, rank) VALUES (?, ?, ?, ?, ?)",
        (session_id, clip_id, blob, score, rank)
    )
    con.commit()
    con.close()
def store_candidate_features_batch(db_path: str, session_id: str,
                                   clip_ids: List[int], features_list: List[np.ndarray],
                                   scores: Optional[List[float]] = None,
                                   ranks: Optional[List[int]] = None):
    """Upsert feature vectors, scores, and ranks for many candidates in one transaction.

    scores defaults to 0.0 for every clip; ranks default to 1..N in list order.
    (Annotations fixed: these parameters accept None, so they are Optional.)
    """
    if scores is None:
        scores = [0.0] * len(clip_ids)
    if ranks is None:
        ranks = list(range(1, len(clip_ids) + 1))
    rows = [
        (session_id, clip_id, features.astype(np.float32).tobytes(), score, rank)
        for clip_id, features, score, rank in zip(clip_ids, features_list, scores, ranks)
    ]
    con = _connect_db(db_path)
    # executemany: one prepared statement for the whole batch instead of N round-trips.
    con.executemany(
        "INSERT OR REPLACE INTO feedback_candidate_features (session_id, clip_id, features, score, rank) VALUES (?, ?, ?, ?, ?)",
        rows
    )
    con.commit()
    con.close()
def record_vote(db_path: str, session_id: str, clip_id: int, vote: int, rank: int):
    """
    Record a vote (thumbs up = +1, thumbs down = -1) and train the model online.

    Training logic (robust learning-to-rank):
    - YES + NO: Create explicit pairs (strongest signal)
    - YES only: Create pairs against implicit negatives that are:
      * Ranked below the upvoted item
      * Have score at least MARGIN lower than the upvoted item
      * Were actually shown to the user
    - NO only: Store but don't train (no positive to learn from)
    Returns number of pairs created.
    """
    IMPLICIT_NEGATIVE_MARGIN = 0.10  # Score gap required for implicit negative
    MAX_IMPLICIT_NEGATIVES = 3  # Max implicit pairs per upvote
    con = _connect_db(db_path)
    # Store the raw vote first so the session history is complete
    con.execute(
        "INSERT INTO feedback_votes (session_id, clip_id, vote, rank_at_vote, created_at) VALUES (?, ?, ?, ?, ?)",
        (session_id, clip_id, vote, rank, time.time())
    )
    # Get all votes for this session (including the one just stored)
    votes = con.execute(
        "SELECT clip_id, vote, rank_at_vote FROM feedback_votes WHERE session_id=?",
        (session_id,)
    ).fetchall()
    upvoted = [(v[0], v[2]) for v in votes if v[1] > 0]  # (clip_id, rank)
    downvoted = [(v[0], v[2]) for v in votes if v[1] < 0]  # (clip_id, rank)
    # No upvotes = no training (downvote-only doesn't help)
    if not upvoted:
        con.commit()
        con.close()
        return 0
    # Get all candidates shown in this session WITH their scores
    all_candidates = con.execute(
        "SELECT clip_id, score, rank FROM feedback_candidate_features WHERE session_id=? ORDER BY rank",
        (session_id,)
    ).fetchall()
    # Build lookup: clip_id -> (score, rank)
    candidate_info = {r[0]: (r[1], r[2]) for r in all_candidates}
    voted_ids = {v[0] for v in votes}
    downvoted_ids = {v[0] for v in downvoted}  # NOTE(review): currently unused below
    new_pairs = []
    for pos_id, pos_vote_rank in upvoted:
        # Fall back to the rank recorded at vote time if the candidate row is missing
        pos_score, pos_orig_rank = candidate_info.get(pos_id, (0.0, pos_vote_rank))
        # Strategy 1: Explicit negatives (user downvoted) - strongest signal
        for neg_id, neg_vote_rank in downvoted:
            # De-dupe: only insert a (pos, neg) pair once per session
            existing = con.execute(
                "SELECT 1 FROM feedback_pairs WHERE session_id=? AND pos_clip_id=? AND neg_clip_id=?",
                (session_id, pos_id, neg_id)
            ).fetchone()
            if not existing:
                con.execute(
                    "INSERT INTO feedback_pairs (session_id, pos_clip_id, neg_clip_id, created_at) VALUES (?, ?, ?, ?)",
                    (session_id, pos_id, neg_id, time.time())
                )
                new_pairs.append((pos_id, neg_id))
        # Strategy 2: Implicit negatives (only if no explicit downvotes)
        # Use margin rule: item must be ranked below AND score gap >= MARGIN
        if not downvoted:
            implicit_negatives = []
            for cid, (cand_score, cand_rank) in candidate_info.items():
                # Skip if: already voted on, same as positive, or not meeting criteria
                if cid in voted_ids:
                    continue
                if cid == pos_id:
                    continue
                # Must be ranked below the upvoted item
                if cand_rank <= pos_orig_rank:
                    continue
                # Must have score at least MARGIN lower
                score_gap = pos_score - cand_score
                if score_gap < IMPLICIT_NEGATIVE_MARGIN:
                    continue
                # Good implicit negative candidate
                implicit_negatives.append((cid, cand_score, cand_rank, score_gap))
            # Sort by score gap (larger gap = more confident negative)
            implicit_negatives.sort(key=lambda x: -x[3])
            # Take top k
            for neg_id, neg_score, neg_rank, gap in implicit_negatives[:MAX_IMPLICIT_NEGATIVES]:
                existing = con.execute(
                    "SELECT 1 FROM feedback_pairs WHERE session_id=? AND pos_clip_id=? AND neg_clip_id=?",
                    (session_id, pos_id, neg_id)
                ).fetchone()
                if not existing:
                    con.execute(
                        "INSERT INTO feedback_pairs (session_id, pos_clip_id, neg_clip_id, created_at) VALUES (?, ?, ?, ?)",
                        (session_id, pos_id, neg_id, time.time())
                    )
                    new_pairs.append((pos_id, neg_id))
    con.commit()
    # Train on new pairs: look up the stored feature vectors and run online SGD
    if new_pairs:
        training_pairs = []
        for pos_id, neg_id in new_pairs:
            pos_row = con.execute(
                "SELECT features FROM feedback_candidate_features WHERE session_id=? AND clip_id=?",
                (session_id, pos_id)
            ).fetchone()
            neg_row = con.execute(
                "SELECT features FROM feedback_candidate_features WHERE session_id=? AND clip_id=?",
                (session_id, neg_id)
            ).fetchone()
            if pos_row and neg_row:
                pos_f = np.frombuffer(pos_row[0], dtype=np.float32)
                neg_f = np.frombuffer(neg_row[0], dtype=np.float32)
                training_pairs.append((pos_f, neg_f))
        if training_pairs:
            model = get_personalization_model(db_path)
            model.train_on_pairs(training_pairs, epochs=2)
    con.close()
    return len(new_pairs)
def remove_vote(db_path: str, session_id: str, clip_id: int):
    """
    Delete a user's vote on a clip plus any training pairs that vote generated.

    The model itself is not "untrained": removed pairs simply stop contributing
    to future training, and subsequent votes adjust the weights naturally.
    Returns the number of pairs removed.
    """
    con = _connect_db(db_path)
    latest = con.execute(
        "SELECT vote FROM feedback_votes WHERE session_id=? AND clip_id=? ORDER BY created_at DESC LIMIT 1",
        (session_id, clip_id)
    ).fetchone()
    if latest is None:
        con.close()
        return 0
    was_upvote = latest[0] > 0
    # Drop every vote row for this clip within the session
    con.execute(
        "DELETE FROM feedback_votes WHERE session_id=? AND clip_id=?",
        (session_id, clip_id)
    )
    # An upvote made this clip the positive side of pairs; a downvote the negative side.
    side_column = "pos_clip_id" if was_upvote else "neg_clip_id"
    cursor = con.execute(
        f"DELETE FROM feedback_pairs WHERE session_id=? AND {side_column}=?",
        (session_id, clip_id)
    )
    pairs_removed = cursor.rowcount
    con.commit()
    con.close()
    return pairs_removed
def get_feedback_stats(db_path: str) -> dict:
    """Summarize vote/pair/session counts plus the current personalization blend."""
    count_queries = {
        "total_pairs": "SELECT COUNT(*) FROM feedback_pairs",
        "total_votes": "SELECT COUNT(*) FROM feedback_votes",
        "upvotes": "SELECT COUNT(*) FROM feedback_votes WHERE vote > 0",
        "downvotes": "SELECT COUNT(*) FROM feedback_votes WHERE vote < 0",
        "sessions": "SELECT COUNT(*) FROM feedback_sessions",
    }
    con = _connect_db(db_path)
    stats = {key: con.execute(sql).fetchone()[0] for key, sql in count_queries.items()}
    con.close()
    model = get_personalization_model(db_path)
    alpha = model.get_blend_alpha()
    stats["blend_alpha"] = alpha
    stats["personalization_active"] = alpha > 0
    return stats
def reset_personalization(db_path: str):
    """Wipe the learned weights and all stored feedback for this database."""
    get_personalization_model(db_path).reset()
# =============================================================================
# Audio utilities
# =============================================================================
# Minimum samples needed for neural networks (0.5 sec at 48kHz)
MIN_AUDIO_SAMPLES = 24000
def _resample_mono(y, sr, target_sr=48000):
if y.ndim > 1:
y = np.mean(y, axis=1)
y = y.astype(np.float32)
peak = np.max(np.abs(y))
if peak > 1e-9:
y = y / peak
if sr != target_sr:
y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
return y, target_sr
def _pad_to_minimum(y, min_samples=MIN_AUDIO_SAMPLES):
    """Right-pad y with silence so neural backends see at least min_samples samples."""
    deficit = min_samples - len(y)
    if deficit <= 0:
        return y
    # Trailing zeros only: the onset must stay at its original position.
    return np.pad(y, (0, deficit), mode='constant')
def _is_too_short(y, min_samples=MIN_AUDIO_SAMPLES):
    """True when audio is under 10% of min_samples -- too short for padding to salvage."""
    threshold = min_samples // 10
    return len(y) < threshold
# =============================================================================
# CLAP embedder
# =============================================================================
@dataclass
class Embedder:
    """Bundle of a loaded CLAP model, its processor, and a per-prompt text-embedding cache."""
    model_name: str  # HF hub id the model was loaded from
    device: str  # "mps", "cuda", or "cpu"
    processor: ClapProcessor
    model: ClapModel
    text_cache: Dict[str, np.ndarray]  # prompt -> L2-normalized text embedding (filled by embed_texts)
_EMBEDDER_CACHE = {}
def get_embedder(model_name=DEFAULT_MODEL_NAME):
    """Load (or fetch from the process cache) the CLAP model/processor pair for model_name."""
    cached = _EMBEDDER_CACHE.get(model_name)
    if cached is not None:
        return cached
    print(f"[scout] Loading CLAP model: {model_name}...")
    if torch.backends.mps.is_available():
        device = "mps"
    elif torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"
    print(f"[scout] Using device: {device}")
    processor = ClapProcessor.from_pretrained(model_name)
    model = ClapModel.from_pretrained(model_name).to(device).eval()
    print(f"[scout] Model loaded and ready")
    embedder = Embedder(model_name, device, processor, model, {})
    _EMBEDDER_CACHE[model_name] = embedder
    return embedder
@torch.inference_mode()
def embed_audio(embedder, y, sr):
    """Return the L2-normalized CLAP audio embedding for y (any sr / length; resampled and padded internally)."""
    y, sr = _resample_mono(y, sr, 48000)
    # Pad short audio for CLAP
    y = _pad_to_minimum(y, MIN_AUDIO_SAMPLES)
    try:
        inputs = embedder.processor(audio=y, sampling_rate=sr, return_tensors="pt")
    except TypeError:
        # Narrowed from bare `except:`: only the kwarg-name mismatch should fall
        # through — older transformers releases spell the kwarg "audios".
        inputs = embedder.processor(audios=y, sampling_rate=sr, return_tensors="pt")
    inputs = {k: v.to(embedder.device) for k, v in inputs.items()}
    v = embedder.model.get_audio_features(**inputs).detach().float().cpu().numpy().reshape(-1)
    return (v / (np.linalg.norm(v) + 1e-9)).astype(np.float32)
@torch.inference_mode()
def embed_texts(embedder, texts):
    """Embed prompts with CLAP's text tower, caching each normalized vector on the embedder."""
    cache = embedder.text_cache
    missing = [t for t in texts if t not in cache]
    if missing:
        inputs = embedder.processor(text=missing, return_tensors="pt", padding=True)
        inputs = {k: v.to(embedder.device) for k, v in inputs.items()}
        vectors = embedder.model.get_text_features(**inputs).detach().float().cpu().numpy()
        vectors = vectors / (np.linalg.norm(vectors, axis=1, keepdims=True) + 1e-9)
        for prompt, vec in zip(missing, vectors):
            cache[prompt] = vec.astype(np.float32)
    return np.stack([cache[t] for t in texts])
# =============================================================================
# Embedding backends
# =============================================================================
def _embed_with_backend(backend: str, embedder, audio: np.ndarray, sr: int) -> np.ndarray:
    """Route to PANNs / MERT / CLAP depending on the backend name and module availability."""
    choice = (backend or "clap").lower()
    # Pad short audio to minimum length
    audio = _pad_to_minimum(audio, MIN_AUDIO_SAMPLES)
    if V66_MODULES_AVAILABLE:
        if choice == "panns":
            return embeddings_panns.embed_audio_panns(audio, sr)
        if choice == "mert" and embeddings_mert.is_mert_available():
            return embeddings_mert.embed_audio_mert(audio, sr)
    # Fallback: CLAP, which is always loaded.
    return embed_audio(embedder, audio, sr)
def embed_full(embedder, audio: np.ndarray, sr: int) -> np.ndarray:
    """Embed the full hit using the backend selected by SCOUT_EMB_FULL (default "panns")."""
    return _embed_with_backend(EMB_FULL_BACKEND, embedder, audio, sr)
def embed_detail(embedder, audio: np.ndarray, sr: int) -> np.ndarray:
    """Embed a detail slice (transient/tail) using the backend selected by SCOUT_EMB_DETAIL (default "mert")."""
    return _embed_with_backend(EMB_DETAIL_BACKEND, embedder, audio, sr)
def embed_matching_library(embedder, audio: np.ndarray, sr: int, target_dim: int) -> np.ndarray:
    """Pick the embedding backend whose output width matches target_dim
    (2048 = PANNs, 1024 = MERT, 512 = CLAP), padding short audio first.

    Raises ValueError for any other dimension.
    """
    # Pad short audio to minimum length
    audio = _pad_to_minimum(audio, MIN_AUDIO_SAMPLES)
    if target_dim == 2048 and V66_MODULES_AVAILABLE:
        return embeddings_panns.embed_audio_panns(audio, sr)
    if target_dim == 1024 and V66_MODULES_AVAILABLE and embeddings_mert.is_mert_available():
        return embeddings_mert.embed_audio_mert(audio, sr)
    if target_dim == 512:
        return embed_audio(embedder, audio, sr)
    raise ValueError(f"Unknown target dimension: {target_dim}")
# =============================================================================
# Feature extraction
# =============================================================================
def mel_shape(y, sr, n_mels=64):
    """Time-averaged log-mel spectrum (spectral envelope), zero-centered and L2-normalized."""
    y, sr = _resample_mono(y, sr, 48000)
    power = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=n_mels, fmax=14000)
    profile = librosa.power_to_db(power + 1e-10).mean(axis=1).astype(np.float32)
    profile = profile - np.mean(profile)
    return (profile / (np.linalg.norm(profile) + 1e-9)).astype(np.float32)
def mel_patch(y, sr, n_mels=PATCH_N_MELS, n_frames=PATCH_N_FRAMES):
    """Fixed-size (n_mels x n_frames) log-mel patch: the time axis is linearly
    warped to exactly n_frames columns, then the patch is flattened,
    zero-centered, and L2-normalized."""
    y, sr = _resample_mono(y, sr, 48000)
    spec = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=n_mels, fmax=14000, hop_length=256)
    log_spec = librosa.power_to_db(spec + 1e-10).astype(np.float32)
    n_cols = log_spec.shape[1]
    if n_cols <= 1:
        patch = np.zeros((n_mels, n_frames), np.float32)
    else:
        target_axis = np.linspace(0, 1, n_frames)
        source_axis = np.linspace(0, 1, n_cols)
        patch = np.array([np.interp(target_axis, source_axis, row) for row in log_spec])
    flat = patch.reshape(-1)
    flat = flat - np.mean(flat)
    return (flat / (np.linalg.norm(flat) + 1e-9)).astype(np.float32)
def mel_patch_shifted(y, sr):
    """Concatenation of mel patches for the clip time-shifted by -/0/+ SHIFT_FRAMES hops,
    giving the comparison some onset-alignment tolerance."""
    y, sr = _resample_mono(y, sr, 48000)
    variants = []
    for shift in SHIFT_VARIANTS:
        offset = shift * 256  # hop_length used by mel_patch
        if offset < 0:
            shifted = y[-offset:]  # drop samples from the front
        elif offset > 0:
            shifted = np.concatenate([np.zeros(offset, np.float32), y])  # delay with silence
        else:
            shifted = y
        if len(shifted) > 100:
            variants.append(mel_patch(shifted, sr))
        else:
            variants.append(np.zeros(PATCH_DIM, np.float32))
    stacked = np.concatenate(variants)
    return (stacked / (np.linalg.norm(stacked) + 1e-9)).astype(np.float32)
def mel_patch_multiscale(y, sr):
    """Concatenated mel patches computed at each time resolution in PATCH_SCALES, L2-normalized as one vector."""
    y, sr = _resample_mono(y, sr, 48000)
    stacked = np.concatenate([mel_patch(y, sr, n_frames=scale) for scale in PATCH_SCALES])
    return (stacked / (np.linalg.norm(stacked) + 1e-9)).astype(np.float32)
def transient_descriptor(y, sr):
    """8-dim hand-crafted transient fingerprint, zero-centered and L2-normalized.

    Channels: [0] steepest attack slope in the first 20 ms, [1] mean spectral
    centroid / sr, [2] mean spectral bandwidth / sr, [3] crest factor (capped
    at 10, scaled to [0,1]), [4] mean zero-crossing rate, [5] onset-strength
    mean/max ratio, [6] low-band (bottom quarter of STFT bins) energy share,
    [7] high-band (top quarter) energy share. Returns zeros for very short input.
    """
    y, sr = _resample_mono(y, sr, 48000)
    if len(y) < 512:
        return np.zeros(TRANS_DESC_DIM, np.float32)
    desc = np.zeros(TRANS_DESC_DIM, np.float32)
    env = np.abs(y)
    win = max(1, int(0.002 * sr))  # ~2 ms smoothing window
    if win > 1:
        env = np.convolve(env, np.ones(win) / win, 'same')
    attack_samples = int(0.02 * sr)  # first 20 ms = attack region
    if attack_samples < len(env):
        desc[0] = float(np.max(np.diff(env[:attack_samples])))
    desc[1] = float(np.mean(librosa.feature.spectral_centroid(y=y, sr=sr))) / sr
    desc[2] = float(np.mean(librosa.feature.spectral_bandwidth(y=y, sr=sr))) / sr
    rms = float(np.sqrt(np.mean(y ** 2))) + 1e-9
    desc[3] = min(float(np.max(np.abs(y))) / rms, 10) / 10  # crest factor, capped
    desc[4] = float(np.mean(librosa.feature.zero_crossing_rate(y)))
    onset = librosa.onset.onset_strength(y=y, sr=sr)
    desc[5] = float(np.mean(onset)) / (float(np.max(onset)) + 1e-9)
    S = np.abs(librosa.stft(y))
    n = S.shape[0]
    total = float(np.mean(S)) + 1e-9
    desc[6] = float(np.mean(S[:n // 4])) / total      # low-band energy share
    desc[7] = float(np.mean(S[n * 3 // 4:])) / total  # high-band energy share
    desc -= np.mean(desc)
    return (desc / (np.linalg.norm(desc) + 1e-9)).astype(np.float32)
# Rich features (condensed from v6.9)
def _compute_envelope(y, sr, hop=256):
env = np.array([np.sqrt(np.mean(y[i:i + hop] ** 2))
for i in range(0, max(1, len(y) - hop), hop)])
if len(env) < 2:
return np.zeros(50, np.float32)
win = max(1, len(env) // 20)
if win > 1:
env = np.convolve(env, np.ones(win) / win, 'same')
return (env / (np.max(env) + 1e-9)).astype(np.float32)
def extract_envelope_features(y, sr):
    """16-dim amplitude-envelope summary.

    [0] relative peak position, [1:13] envelope resampled to 12 points,
    [13] std, [14] mean, [15] frame-to-frame roughness. Best-effort: returns
    zeros when the audio is too short or any step fails.
    """
    y, sr = _resample_mono(y, sr, 48000)
    feats = np.zeros(ENVELOPE_DIM, np.float32)
    if len(y) < 256:
        return feats
    try:
        env = _compute_envelope(y, sr)
        if len(env) < 4:
            return feats
        peak_idx = np.argmax(env)
        feats[0] = peak_idx / len(env)  # where the peak sits in the clip
        env_resamp = np.interp(np.linspace(0, 1, 12), np.linspace(0, 1, len(env)), env)
        feats[1:13] = env_resamp
        feats[13] = float(np.std(env))
        feats[14] = float(np.mean(env))
        feats[15] = float(np.std(np.diff(env))) if len(env) > 1 else 0
    except Exception:
        # Narrowed from bare `except:`; deliberately best-effort.
        pass
    return feats.astype(np.float32)
def extract_attack_decay_features(y, sr):
    """16-dim attack/decay shape summary of the amplitude envelope.

    [0] relative peak position; attack side: [1] mean slope x10, [2] curvature
    vs a straight ramp, [3] attack spectral centroid / sr; decay side:
    [4]/[5] time to fall below 37% / 10% of peak, [6] log-decay slope x100,
    [7]/[8] mid- and tail-decay levels, [9] decay spectral centroid / sr.
    Remaining slots stay zero. Best-effort: zeros on short input or failure.
    """
    y, sr = _resample_mono(y, sr, 48000)
    feats = np.zeros(ATTACK_DECAY_DIM, np.float32)
    if len(y) < 512:
        return feats
    try:
        env = _compute_envelope(y, sr)
        if len(env) < 4:
            return feats
        peak_idx = np.argmax(env)
        feats[0] = peak_idx / len(env)
        if peak_idx > 1:
            attack = env[:peak_idx]
            feats[1] = float(np.mean(np.diff(attack))) * 10
            linear = np.linspace(0, env[peak_idx], len(attack))
            feats[2] = float(np.mean(attack - linear))  # >0 = convex attack
            attack_samples = min(int(0.02 * sr), len(y) // 2)
            if attack_samples > 100:
                try:
                    cent = librosa.feature.spectral_centroid(y=y[:attack_samples], sr=sr)
                    feats[3] = float(np.mean(cent)) / sr
                except Exception:  # narrowed from bare except
                    pass
        if peak_idx < len(env) - 2:
            decay = env[peak_idx:]
            thr_37 = env[peak_idx] * 0.37
            below = np.where(decay < thr_37)[0]
            feats[4] = below[0] / len(env) if len(below) > 0 else 1.0
            thr_10 = env[peak_idx] * 0.10
            below = np.where(decay < thr_10)[0]
            feats[5] = below[0] / len(env) if len(below) > 0 else 1.0
            if len(decay) > 5:
                log_decay = np.log(decay + 1e-9)
                try:
                    feats[6] = np.polyfit(np.arange(len(decay)), log_decay, 1)[0] * 100
                except Exception:  # polyfit can fail; narrowed from bare except
                    pass
            mid = len(decay) // 3
            if mid > 0:
                feats[7] = float(np.mean(decay[mid:2 * mid]))
            tail_start = 3 * len(decay) // 4
            if tail_start < len(decay):
                feats[8] = float(np.mean(decay[tail_start:]))
            # Map the envelope peak back to a sample index for the decay slice.
            decay_start = int(peak_idx * len(y) / len(env))
            decay_end = min(len(y), decay_start + len(y) // 2)
            if decay_end - decay_start > 256:
                try:
                    cent = librosa.feature.spectral_centroid(y=y[decay_start:decay_end], sr=sr)
                    feats[9] = float(np.mean(cent)) / sr
                except Exception:  # narrowed from bare except
                    pass
    except Exception:
        # Narrowed from bare `except:`; deliberately best-effort.
        pass
    return feats.astype(np.float32)
def extract_texture_features(y, sr):
    """Extract spectral texture descriptors (noisiness, harmonicity, crest).

    Slots: 0-1 spectral flatness mean/std, 2 zero-crossing rate,
    3-4 harmonic vs. percussive energy share (HPSS split), 5 bandwidth,
    6 crest factor (peak/RMS, clipped at 10 and rescaled to [0,1]),
    7 spectral rolloff. Frequency-valued features are normalized by sr.
    Returns zeros for audio shorter than 512 samples or on failure.
    """
    y, sr = _resample_mono(y, sr, 48000)
    feats = np.zeros(TEXTURE_DIM, np.float32)
    if len(y) < 512:
        return feats
    try:
        flat = librosa.feature.spectral_flatness(y=y)
        feats[0] = float(np.mean(flat))
        feats[1] = float(np.std(flat))
        zcr = librosa.feature.zero_crossing_rate(y)
        feats[2] = float(np.mean(zcr))
        try:
            h, p = librosa.effects.hpss(y)
            h_energy = float(np.sum(h ** 2))
            p_energy = float(np.sum(p ** 2))
            total = h_energy + p_energy + 1e-9
            feats[3] = h_energy / total
            feats[4] = p_energy / total
        except:
            # HPSS can fail on very short input; fall back to a neutral split.
            feats[3] = 0.5
            feats[4] = 0.5
        bw = librosa.feature.spectral_bandwidth(y=y, sr=sr)
        feats[5] = float(np.mean(bw)) / sr
        rms = float(np.sqrt(np.mean(y ** 2))) + 1e-9
        feats[6] = min(float(np.max(np.abs(y))) / rms, 10) / 10
        rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)
        feats[7] = float(np.mean(rolloff)) / sr
    except:
        pass
    return feats.astype(np.float32)
def extract_rich_features(y, sr):
    """Concatenate all hand-crafted acoustic features into one vector.

    Order: MFCC statistics (means, stds, delta means), spectral summary
    (centroid, bandwidth, rolloff, 7 contrast bands, flatness, onset
    strength mean/std), then the envelope, attack/decay and texture blocks.
    Each block independently degrades to zeros on failure, so the output
    always has the full combined dimensionality.
    """
    # Prefer the shared v6.6 canonicalization when available so the query
    # path matches the indexing path exactly.
    if V66_MODULES_AVAILABLE:
        y, sr = preprocessing.canonicalize_audio(y, sr)
    else:
        y, sr = _resample_mono(y, sr, 48000)
    try:
        mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=N_MFCC, hop_length=256)
        mfcc_delta = librosa.feature.delta(mfcc)
        mfcc_feats = np.concatenate([
            np.mean(mfcc, axis=1),
            np.std(mfcc, axis=1),
            np.mean(mfcc_delta, axis=1)
        ])
    except:
        mfcc_feats = np.zeros(MFCC_DIM, np.float32)
    try:
        cent = np.mean(librosa.feature.spectral_centroid(y=y, sr=sr, hop_length=256)) / sr
        bw = np.mean(librosa.feature.spectral_bandwidth(y=y, sr=sr, hop_length=256)) / sr
        rolloff = np.mean(librosa.feature.spectral_rolloff(y=y, sr=sr, hop_length=256)) / sr
        contrast = np.mean(librosa.feature.spectral_contrast(y=y, sr=sr, n_bands=6, hop_length=256), axis=1)
        flatness = np.mean(librosa.feature.spectral_flatness(y=y, hop_length=256))
        onset = librosa.onset.onset_strength(y=y, sr=sr, hop_length=256)
        spectral_feats = np.array([cent, bw, rolloff, *contrast, flatness, np.mean(onset), np.std(onset)], np.float32)
    except:
        spectral_feats = np.zeros(SPECTRAL_DIM, np.float32)
    envelope_feats = extract_envelope_features(y, sr)
    attack_decay_feats = extract_attack_decay_features(y, sr)
    texture_feats = extract_texture_features(y, sr)
    combined = np.concatenate([mfcc_feats, spectral_feats, envelope_feats, attack_decay_feats, texture_feats]).astype(np.float32)
    return combined
# =============================================================================
# Onset detection
# =============================================================================
def _superflux_env(y, sr, hop=256):
    """Compute a SuperFlux-style onset strength envelope, normalized to [0, 1].

    A log-mel spectrogram is max-filtered across adjacent mel bands (the
    two vstack shifts form a 3-band vertical max), then the positive
    temporal difference is summed per frame.
    """
    S = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128, fmax=14000, hop_length=hop, power=2.0)
    logS = librosa.power_to_db(S + 1e-10).astype(np.float32)
    # Spectrogram shifted one mel band down / up (edge rows repeated).
    up = np.vstack([logS[0:1], logS[:-1]])
    dn = np.vstack([logS[1:], logS[-1:]])
    flux = np.maximum(0.0, np.diff(np.maximum.reduce([logS, up, dn]), axis=1))
    # Leading zero restores frame alignment lost by the temporal diff.
    env = np.concatenate([[0.0], flux.sum(axis=0)]).astype(np.float32)
    return env / (np.max(env) + 1e-9)
def refine_onset_sample(y, sr, onset_samp):
    """Snap a coarse onset sample index onto the actual transient start.

    Builds a pre-emphasized magnitude envelope smoothed over ~2 ms, finds
    the steepest rise within +/-20 ms of the candidate, then walks back up
    to ~12 ms to the last point below 20% of that peak level. Returns a
    sample index clamped to [0, len(y) - 1]; 0 for empty input.
    """
    if y.size == 0:
        return 0
    # First-difference (0.97 pre-emphasis) highlights fast level changes.
    env = np.abs((y - 0.97 * np.concatenate(([0], y[:-1]))).astype(np.float32))
    win = max(1, int(0.002 * sr))
    if win > 1:
        env = np.convolve(env, np.ones(win) / win, 'same')
    d = np.diff(env, prepend=env[0])
    r = max(1, int(0.02 * sr))
    a, b = max(0, onset_samp - r), min(len(y) - 1, onset_samp + r)
    if b <= a + 2:
        # Search window collapsed (onset near an edge): keep the candidate.
        return max(0, min(len(y) - 1, onset_samp))
    peak = a + int(np.argmax(d[a:b]))
    if env[peak] <= 1e-8:
        # Effectively silent around the peak: keep the candidate.
        return max(0, min(len(y) - 1, onset_samp))
    s = max(0, peak - max(1, int(0.012 * sr)))
    pre = env[s:peak]
    if pre.size:
        # Back off to the last sub-threshold sample before the rise.
        below = np.where(pre < 0.2 * env[peak])[0]
        return s + int(below[-1]) if below.size else peak
    return peak
def detect_onsets_unified(y, sr, max_onsets=12):
    """Detect up to *max_onsets* hit onsets, returned as sample indices.

    Peaks are picked from the SuperFlux envelope, refined to sample
    accuracy, then de-duplicated into 30 ms buckets in order of descending
    onset strength. Always returns at least [0] so callers get one slice.
    """
    y, sr = _resample_mono(y, sr, 48000)
    hop = 256
    env = _superflux_env(y, sr, hop)
    peaks = librosa.util.peak_pick(env, pre_max=3, post_max=3, pre_avg=12, post_avg=12, delta=0.06, wait=max(1, int(0.06 * sr / hop)))
    if peaks.size == 0:
        return [0]
    # Strongest peaks first so the bucket de-dup keeps the best onset per 30 ms.
    order = np.argsort(-env[np.clip(peaks, 0, len(env) - 1)])
    out, used = [], set()
    for ii in order:
        s0 = refine_onset_sample(y, sr, int(librosa.frames_to_samples(int(peaks[ii]), hop_length=hop)))
        bucket = int(s0 / sr / 0.03)
        if bucket not in used:
            used.add(bucket)
            out.append(s0)
            if len(out) >= max_onsets:
                break
    return out if out else [0]
def find_hit_onsets(y, sr, max_hits=12):
    """Return onset times in seconds for up to *max_hits* detected hits."""
    y, sr = _resample_mono(y, sr, 48000)
    sample_indices = detect_onsets_unified(y, sr, max_onsets=max_hits)
    return [idx / sr for idx in sample_indices]
def _slice(y, sr, t0, t1):
s0 = max(0, int(t0 * sr))
s1 = min(len(y), int(t1 * sr))
return y[s0:s1] if s1 > s0 else np.zeros(0, np.float32)
def slice_views_from_onset(y, sr, onset_s):
    """Cut the three analysis views (full / transient / tail) around an onset.

    The onset time is clamped into [0, duration] before slicing.
    """
    y, sr = _resample_mono(y, sr, 48000)
    onset_s = min(max(0, onset_s), len(y) / sr)
    windows = {
        "full": (onset_s - FULL_PRE, onset_s + FULL_POST),
        "trans": (onset_s - FULL_PRE, onset_s + TRANS_POST),
        "tail": (onset_s + TAIL_START, onset_s + TAIL_END),
    }
    return {name: _slice(y, sr, a, b) for name, (a, b) in windows.items()}
def trim_to_first_hit(y, sr):
    """Trim audio to a pre/post window around the first detected hit."""
    y, sr = _resample_mono(y, sr, 48000)
    first = detect_onsets_unified(y, sr, 1)[0]
    lo = max(0, int(first - FULL_PRE * sr))
    hi = min(len(y), int(first + FULL_POST * sr))
    return y[lo:hi]
# =============================================================================
# YouTube utilities (condensed)
# =============================================================================
def _run(cmd, timeout=70):
try:
p = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
return p.returncode, p.stdout + p.stderr
except:
return 1, ""
def ytsearch_video_list(query, n=80, mode="relevance"):
    """Search YouTube via yt-dlp's flat-playlist mode.

    Returns a list of {"id", "title"} dicts; empty on any failure.
    mode="date" sorts by upload date, anything else by relevance.
    """
    prefix = "ytsearchdate" if mode == "date" else "ytsearch"
    cmd = [sys.executable, "-m", "yt_dlp", "--flat-playlist", "--dump-single-json", "--skip-download", "--socket-timeout", "10", f"{prefix}{n}:{query}"]
    rc, out = _run(cmd, 40)
    if rc != 0:
        return []
    try:
        entries = json.loads(out).get("entries", [])
        return [{"id": e["id"], "title": e.get("title", "")} for e in entries if e.get("id")]
    except:
        return []
def augment_query_list(queries, max_aug=2):
    """Expand each query with up to *max_aug* generic one-shot suffixes.

    Blank queries are skipped. Each original query is kept, followed by its
    suffixed variants; the final list is de-duplicated preserving first-seen
    order. The suffix shuffle is seeded per-query with a *stable* md5 digest
    (the previous builtin hash() seed is randomized per interpreter run via
    PYTHONHASHSEED, so variants differed between processes).
    """
    suffixes = ["one shot", "sample pack", "sound effect"]
    out = []
    for q in queries:
        if not q.strip():
            continue
        out.append(q)
        # Stable 32-bit seed derived from the query text itself.
        seed = int.from_bytes(hashlib.md5(q.encode("utf-8")).digest()[:4], "big")
        rng = random.Random(seed)
        s = suffixes[:]
        rng.shuffle(s)
        for x in s[:max_aug]:
            out.append(f"{q} {x}")
    return list(dict.fromkeys(out))
def download_wav_section(url, start, end, out_path, timeout=70):
    """Download [start, end] seconds of a video as 48 kHz mono WAV via yt-dlp.

    Returns (True, "") on apparent success, otherwise (False, combined log).
    NOTE(review): success is detected by scanning out_path's directory for
    *any* .wav file, not the specific output file — callers also re-scan the
    directory themselves, so this only works because each call targets a
    fresh temporary directory; confirm before reusing with shared paths.
    """
    rc, out = _run([sys.executable, "-m", "yt_dlp", "--no-playlist", "-f", "bestaudio[ext=m4a]/bestaudio/best", "--download-sections", f"*{max(0, start)}-{end}", "--force-keyframes-at-cuts", "-x", "--audio-format", "wav", "--postprocessor-args", "ExtractAudio:-ar 48000 -ac 1", "-o", out_path.replace(".wav", ".%(ext)s"), url], timeout)
    if rc == 0:
        for f in os.listdir(os.path.dirname(out_path)):
            if f.endswith(".wav"):
                return True, ""
    return False, out
_DUR_CACHE = {}
_TITLE_CACHE = {}
def get_video_duration_seconds(url):
    """Return the video duration in seconds (cached), or None on failure.

    On success the title is also cached as a side effect, which
    get_video_title() relies on.
    """
    if url in _DUR_CACHE:
        return _DUR_CACHE[url]
    rc, out = _run([sys.executable, "-m", "yt_dlp", "--dump-single-json", "--skip-download", "--no-playlist", url], 30)
    if rc == 0:
        try:
            data = json.loads(out)
            _DUR_CACHE[url] = float(data.get("duration", 0))
            _TITLE_CACHE[url] = data.get("title", "Unknown")
            return _DUR_CACHE[url]
        except (ValueError, TypeError):
            # Narrowed from a bare `except:`. ValueError covers JSON decode
            # errors and bad float strings; TypeError covers duration=None.
            pass
    return None
def get_video_title(url):
    """Return the video title, fetching metadata on a cache miss."""
    if url not in _TITLE_CACHE:
        # The duration fetch populates the title cache as a side effect.
        get_video_duration_seconds(url)
    return _TITLE_CACHE.get(url, "Unknown")
# =============================================================================
# Index building (same as v6.9)
# =============================================================================
def _video_status(con, vid):
row = con.execute("SELECT status, COALESCE(index_ver,1) FROM videos WHERE video_id=?", (vid,)).fetchone()
return (row[0], int(row[1] or 1)) if row else (None, 0)
def _mark_video(con, vid, title, status, err=""):
    """Upsert a video's indexing status row and commit immediately.

    last_error is truncated to 4 kB; index_ver is stamped with the current
    schema version so stale rows get re-indexed after an upgrade.
    """
    con.execute("INSERT INTO videos VALUES(?,?,?,?,?,?) ON CONFLICT(video_id) DO UPDATE SET title=excluded.title,status=excluded.status,last_error=excluded.last_error,updated_at=excluded.updated_at,index_ver=excluded.index_ver", (vid, title, status, err[:4000], time.time(), CURRENT_INDEX_VERSION))
    con.commit()
def _blob(v):
return sqlite3.Binary(v.astype(np.float16).tobytes()) if v is not None else None
def _insert_clip(con, vid, title, url, t0, t1, emb, mel, emb_t, emb_b, mel_t, mel_b, patch_t, patch_b, patch_t_shifted, patch_b_shifted, trans_desc, patch_multi, rich_features):
    """Insert one clip row, retrying with exponential backoff on DB locks.

    Feature vectors are stored as float16 BLOBs via _blob(). Retries up to
    6 times (0.15 s doubling each attempt) when the database is locked by a
    concurrent writer. Raises sqlite3.OperationalError if the lock persists
    through all retries (previously the row was silently dropped) or on any
    other operational failure.
    """
    last_lock_err = None
    for attempt in range(6):
        try:
            con.execute("INSERT INTO clips(video_id,title,url,t0,t1,emb,mel,emb_t,emb_b,mel_t,mel_b,patch_t,patch_b,patch_t_shifted,patch_b_shifted,trans_desc,patch_multi,rich_features,index_ver,created_at) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", (vid, title, url, t0, t1, _blob(emb), _blob(mel), _blob(emb_t), _blob(emb_b), _blob(mel_t), _blob(mel_b), _blob(patch_t), _blob(patch_b), _blob(patch_t_shifted), _blob(patch_b_shifted), _blob(trans_desc), _blob(patch_multi), _blob(rich_features), CURRENT_INDEX_VERSION, time.time()))
            return
        except sqlite3.OperationalError as e:
            if "locked" in str(e).lower():
                last_lock_err = e
                time.sleep(0.15 * (2 ** attempt))  # 0.15 s .. 4.8 s backoff
            else:
                raise
    # All retries exhausted: surface the lock error instead of losing data.
    raise last_lock_err
def build_library_round(embedder, db_path, queries, max_videos=200, secs_per_video=22, max_hits_per_video=12, windows_per_video=1, window_seed=1, include_intro=True, stop_event=None, progress_cb=None, max_results_per_query=400, pool_multiplier=3.0, include_recent=True, augment_queries=True):
    """Index videos with v7.0 features.

    Searches YouTube for `queries`, downloads short windows from candidate
    videos, detects hit onsets and stores one feature row per hit in the
    clips table. Returns the number of clips added.

    progress_cb signature: (scanned, total, ok, dead, added, label); during
    the search phase `scanned` is passed as a negative query index.
    """
    init_db(db_path)
    con = _connect_db(db_path)
    print(f"[indexing] v7.0 Personalized Perceptual Search")
    queries = queries[:15]
    q_list = augment_query_list(queries, max_aug=1) if augment_queries else list(queries)
    q_list = q_list[:30]
    # Oversample candidates (pool_multiplier) so dead/duplicate videos
    # still leave enough material to fill max_videos.
    per_q = max(20, min(max_results_per_query, int(math.ceil(max_videos * pool_multiplier / max(1, len(q_list))))))
    if progress_cb:
        progress_cb(0, 0, 0, 0, 0, "")
    candidates = []
    for i, q in enumerate(q_list):
        if stop_event and stop_event.is_set():
            break
        if progress_cb:
            # Negative index signals "still searching" to the caller's UI.
            progress_cb(-(i + 1), len(q_list), 0, 0, 0, q[:50])
        candidates.extend(ytsearch_video_list(q, per_q, "relevance"))
        if include_recent:
            candidates.extend(ytsearch_video_list(q, max(10, per_q // 2), "date"))
    # Order-preserving de-dup by video id (set.add returns None, so the
    # `or` expression is falsy exactly on first sight of an id).
    seen = set()
    dedup = [v for v in candidates if not (v["id"] in seen or seen.add(v["id"]))]
    random.shuffle(dedup)
    dedup = dedup[:max_videos]
    if progress_cb:
        progress_cb(0, len(dedup), 0, 0, 0, "")
    scanned = ok = dead = added = 0
    for v in dedup:
        if stop_event and stop_event.is_set():
            break
        scanned += 1
        vid, title, url = v["id"], v.get("title", ""), f"https://www.youtube.com/watch?v={v['id']}"
        if progress_cb:
            progress_cb(scanned, len(dedup), ok, dead, added, title[:60])
        status, ver = _video_status(con, vid)
        # Skip videos already processed under the current index schema.
        if status in ("ok", "dead") and ver >= CURRENT_INDEX_VERSION:
            continue
        if windows_per_video <= 1:
            windows = [(0, secs_per_video)]
        else:
            # Duration is not fetched here; assume 3 minutes and pick
            # non-overlapping random windows, deterministic per (vid, seed).
            assumed_dur = 180
            max_start = max(0, assumed_dur - secs_per_video - 1)
            rng = random.Random((hash(vid) & 0xFFFFFFFF) ^ window_seed)
            starts = [0.0] if include_intro else []
            for _ in range(50):
                if len(starts) >= windows_per_video:
                    break
                s = rng.uniform(0, max_start)
                if all(abs(s - x) > secs_per_video for x in starts):
                    starts.append(s)
            windows = [(s, s + secs_per_video) for s in sorted(starts)]
        hits, any_ok, last_log = 0, False, ""
        with tempfile.TemporaryDirectory() as td:
            for wi, (start, end) in enumerate(windows):
                if (stop_event and stop_event.is_set()) or hits >= max_hits_per_video:
                    break
                wav = os.path.join(td, f"s{wi}.wav")
                success, log = download_wav_section(url, start, end, wav)
                last_log = log
                # Re-scan the temp dir: yt-dlp may name the output differently.
                wav_file = next((os.path.join(td, f) for f in os.listdir(td) if f.endswith(".wav")), None)
                if not wav_file:
                    continue
                any_ok = True
                try:
                    y, sr = sf.read(wav_file, dtype="float32", always_2d=False)
                except:
                    continue
                y, sr = _resample_mono(y, sr, 48000)
                found_onsets = find_hit_onsets(y, sr, max_hits_per_video)
                for onset in found_onsets:
                    if hits >= max_hits_per_video:
                        break
                    views = slice_views_from_onset(y, sr, onset)
                    full, trans, tail = views["full"], views["trans"], views["tail"]
                    # Skip clips that are too short for neural networks
                    # MIN_AUDIO_SAMPLES = 24000 (0.5 sec at 48kHz)
                    if full.size < MIN_AUDIO_SAMPLES // 2 or trans.size < int(0.04 * sr):
                        continue
                    z = lambda d: np.zeros(d, np.float32)
                    try:
                        emb_full = embed_full(embedder, full, sr)
                    except:
                        continue
                    if FULL_MODE:
                        try:
                            emb_t = embed_detail(embedder, trans, sr) if trans.size > 100 else z(1024)
                            emb_b = embed_detail(embedder, tail, sr) if tail.size > 100 else z(1024)
                        except:
                            emb_t, emb_b = z(1024), z(1024)
                    else:
                        emb_t, emb_b = emb_full, z(len(emb_full))
                    try:
                        _insert_clip(con, vid, title, url, max(0, start + onset - FULL_PRE), start + onset + FULL_POST, emb_full, mel_shape(full, sr), emb_t, emb_b, mel_shape(trans, sr) if trans.size > 100 else z(64), mel_shape(tail, sr) if tail.size > 100 else z(64), mel_patch(trans, sr) if trans.size > 100 else z(PATCH_DIM), mel_patch(tail, sr) if tail.size > 100 else z(PATCH_DIM), mel_patch_shifted(trans, sr) if trans.size > 100 else z(SHIFTED_PATCH_DIM), mel_patch_shifted(tail, sr) if tail.size > 100 else z(SHIFTED_PATCH_DIM), transient_descriptor(trans, sr) if trans.size > 100 else z(TRANS_DESC_DIM), mel_patch_multiscale(full, sr), extract_rich_features(full, sr))
                        added += 1
                        hits += 1
                    except:
                        continue
                con.commit()
                try:
                    os.remove(wav_file)
                except:
                    pass
        if any_ok:
            ok += 1
            _mark_video(con, vid, title, "ok")
        else:
            dead += 1
            _mark_video(con, vid, title, "dead", last_log)
        if progress_cb:
            progress_cb(scanned, len(dedup), ok, dead, added, title[:60])
    con.close()
    return added
# =============================================================================
# Load library
# =============================================================================
def _safe_frombuf(blob, dim):
if blob is None:
return np.zeros(dim, np.float32)
nbytes = len(blob)
if nbytes == dim * 2:
arr = np.frombuffer(blob, dtype=np.float16).astype(np.float32, copy=False)
elif nbytes == dim * 4:
arr = np.frombuffer(blob, dtype=np.float32).astype(np.float32, copy=False)
else:
arr = np.frombuffer(blob, dtype=np.float16).astype(np.float32, copy=False)
if arr.size == dim:
return arr
out = np.zeros(dim, np.float32)
n = min(dim, arr.size)
out[:n] = arr[:n]
return out
def load_library_matrices(db_path, include_legacy=False):
    """Load all clip rows into L2-normalized numpy matrices keyed by channel.

    By default only rows stamped with CURRENT_INDEX_VERSION are loaded;
    include_legacy=True loads everything. Returns {"ids": empty array} for
    an empty library. Per-column embedding dimensions are inferred from the
    first non-empty blob (stored as float16, or float32 when the byte count
    is odd — see detect_dim).
    """
    init_db(db_path)
    con = _connect_db(db_path)
    sql = "SELECT id,title,url,t0,t1,emb,mel,emb_t,emb_b,mel_t,mel_b,patch_t,patch_b,patch_t_shifted,patch_b_shifted,trans_desc,patch_multi,rich_features,COALESCE(index_ver,1) FROM clips"
    if not include_legacy:
        sql += f" WHERE COALESCE(index_ver,1)={CURRENT_INDEX_VERSION}"
    rows = con.execute(sql).fetchall()
    con.close()
    if not rows:
        return {"ids": np.array([], np.int64)}
    def norm(mat):
        # Row-wise L2 normalization so dot products are cosine similarities.
        mat = mat.astype(np.float32, copy=False)
        return mat / (np.linalg.norm(mat, axis=1, keepdims=True) + 1e-9)
    def detect_dim(blob, default):
        # Blobs are float16 (2 bytes/elt); an odd byte count implies float32.
        if blob is None or len(blob) == 0:
            return default
        return len(blob) // 2 if len(blob) % 2 == 0 else len(blob) // 4
    def first_blob(col_idx):
        # First non-empty blob in the column, used for dimension detection.
        for r in rows:
            if r[col_idx] and len(r[col_idx]) > 0:
                return r[col_idx]
        return None
    emb_dim_full = detect_dim(first_blob(5), 512)
    emb_dim_t = detect_dim(first_blob(7), emb_dim_full)
    emb_dim_b = detect_dim(first_blob(8), emb_dim_full)
    rich_dim = detect_dim(first_blob(17), RICH_FEATURES_DIM)
    print(f"[scout] Dims: full={emb_dim_full}, trans={emb_dim_t}, tail={emb_dim_b}, rich={rich_dim}")
    print(f"[scout] βœ“ Loaded {len(rows)} clips")
    # Rich features are kept both raw (for per-dimension analysis) and
    # normalized (for cosine similarity in search).
    rich_raw = np.stack([_safe_frombuf(r[17], rich_dim) for r in rows]).astype(np.float32)
    return {
        "ids": np.array([r[0] for r in rows], np.int64),
        "titles": np.array([r[1] for r in rows], object),
        "urls": np.array([r[2] for r in rows], object),
        "t0s": np.array([r[3] for r in rows], np.float32),
        "t1s": np.array([r[4] for r in rows], np.float32),
        "vers": np.array([int(r[18] or 1) for r in rows], np.int32),
        "emb": norm(np.stack([_safe_frombuf(r[5], emb_dim_full) for r in rows])),
        "mel": norm(np.stack([_safe_frombuf(r[6], 64) for r in rows])),
        "emb_t": norm(np.stack([_safe_frombuf(r[7], emb_dim_t) for r in rows])),
        "emb_b": norm(np.stack([_safe_frombuf(r[8], emb_dim_b) for r in rows])),
        "mel_t": norm(np.stack([_safe_frombuf(r[9], 64) for r in rows])),
        "mel_b": norm(np.stack([_safe_frombuf(r[10], 64) for r in rows])),
        "patch_t": norm(np.stack([_safe_frombuf(r[11], PATCH_DIM) for r in rows])),
        "patch_b": norm(np.stack([_safe_frombuf(r[12], PATCH_DIM) for r in rows])),
        "patch_t_shifted": norm(np.stack([_safe_frombuf(r[13], SHIFTED_PATCH_DIM) for r in rows])),
        "patch_b_shifted": norm(np.stack([_safe_frombuf(r[14], SHIFTED_PATCH_DIM) for r in rows])),
        "trans_desc": norm(np.stack([_safe_frombuf(r[15], TRANS_DESC_DIM) for r in rows])),
        "patch_multi": norm(np.stack([_safe_frombuf(r[16], MULTISCALE_PATCH_DIM) for r in rows])),
        "rich_features_raw": rich_raw,
        "rich_features": norm(rich_raw),
        "emb_dim_full": emb_dim_full,
        "emb_dim_t": emb_dim_t,
        "emb_dim_b": emb_dim_b,
        "rich_dim": rich_dim,
    }
# =============================================================================
# SEARCH β€” v7.0 Personalized Perceptual
# =============================================================================
def _chunked_dot(mat, v, chunk=20000):
if mat.shape[1] != v.shape[0]:
raise ValueError(f"Dim mismatch: lib={mat.shape[1]} vs query={v.shape[0]}")
out = np.empty(mat.shape[0], dtype=np.float32)
for i in range(0, mat.shape[0], chunk):
j = min(mat.shape[0], i + chunk)
out[i:j] = mat[i:j] @ v
return out
def _best_shift_sim(q, lib):
    """Shift-tolerant cosine similarity between stacked patch variants.

    q has shape (n_variants * PATCH_DIM,); lib has shape
    (rows, n_variants * PATCH_DIM), where n_variants = len(SHIFT_VARIANTS).
    Both sides are split into their time-shifted patch variants and, per
    library row, the best cosine over variant pairs within +/-1 shift step
    is returned. Rows where every pairing fails stay at -1.0.
    """
    n = len(SHIFT_VARIANTS)
    lib_r = lib.reshape(-1, n, PATCH_DIM)
    q_r = q.reshape(n, PATCH_DIM)
    # Normalize each variant independently so comparisons are cosine sims.
    lib_n = lib_r / (np.linalg.norm(lib_r, axis=2, keepdims=True) + 1e-9)
    q_n = q_r / (np.linalg.norm(q_r, axis=1, keepdims=True) + 1e-9)
    best = np.full(lib.shape[0], -1.0, np.float32)
    for qi in range(n):
        for li in range(n):
            # Only compare variants at most one shift step apart.
            if abs(qi - li) <= 1:
                best = np.maximum(best, np.sum(q_n[qi] * lib_n[:, li, :], axis=1))
    return best
def search_library(embedder, query_bytes, lib, top_k=20, apply_negative_filter=False,
                   query_max_onsets=1, rerank_top_n=400, debug=False,
                   db_path=DEFAULT_DB_PATH, return_session_id=False,
                   exploration_inject=0, exploration_pool=300,
                   debug_clip_id=None):
    """
    v7.0 Personalized Perceptual Search.

    Stage A: Perceptual retrieval over the whole library
    Stage B: Perceptual rerank (shift-tolerant) of the top rerank_top_n
    Stage C: Identity boost (gated above IDENTITY_THRESHOLD)
    Stage D: Personalization blend (learned from feedback)

    query_bytes is a raw audio file; up to query_max_onsets onsets are
    tried and the one whose best match scores highest wins. A feedback
    session is created per call and candidate features are stored for
    later pairwise training. Returns the result list, and optionally the
    session_id when return_session_id=True.
    """
    if lib.get("ids") is None or len(lib["ids"]) == 0:
        return ([], None) if return_session_id else []
    # Load audio
    try:
        y, sr = sf.read(io.BytesIO(query_bytes), dtype="float32", always_2d=False)
        y, sr = _resample_mono(y, sr, 48000)
    except Exception as e:
        if debug:
            print(f"[search] Load failed: {e}")
        return ([], None) if return_session_id else []
    # Create session for feedback (hash only the first 10 kB for speed)
    query_hash = hashlib.md5(query_bytes[:10000]).hexdigest()[:12]
    session_id = create_feedback_session(db_path, query_hash)
    # Detect onsets
    onset_samples = detect_onsets_unified(y, sr, max_onsets=query_max_onsets)
    best_scores = None
    best_features = None
    best_sims = None  # Store similarity channels for personalization
    for onset_samp in onset_samples:
        onset_time = onset_samp / sr
        views = slice_views_from_onset(y, sr, onset_time)
        full, trans, tail = views["full"], views["trans"], views["tail"]
        # Skip if clip is too short
        if full.size < MIN_AUDIO_SAMPLES // 4:
            if debug:
                print(f"[search] Onset at {onset_time:.3f}s too short ({full.size} samples), skipping")
            continue
        # NOTE: Do NOT pad here - must match indexing pipeline
        # Embedding functions handle their own padding internally
        try:
            emb_full = embed_matching_library(embedder, full, sr, lib["emb_dim_full"])
            rich = extract_rich_features(full, sr)
            lib_rich_dim = lib["rich_features"].shape[1]
            # Pad/truncate so the query vector matches the library's rich dim.
            if len(rich) != lib_rich_dim:
                if len(rich) < lib_rich_dim:
                    rich = np.pad(rich, (0, lib_rich_dim - len(rich)))
                else:
                    rich = rich[:lib_rich_dim]
            rich_norm = rich / (np.linalg.norm(rich) + 1e-9)
            patch_multi = mel_patch_multiscale(full, sr)
            mel_full = mel_shape(full, sr)
            patch_t_sh = mel_patch_shifted(trans, sr) if trans.size > 100 else np.zeros(SHIFTED_PATCH_DIM, np.float32)
            patch_b_sh = mel_patch_shifted(tail, sr) if tail.size > 100 else np.zeros(SHIFTED_PATCH_DIM, np.float32)
            trans_d = transient_descriptor(trans, sr) if trans.size > 100 else np.zeros(TRANS_DESC_DIM, np.float32)
        except Exception as e:
            if debug:
                print(f"[search] Feature extraction failed: {e}")
            continue
        # Stage A: Perceptual retrieval
        sim_rich = _chunked_dot(lib["rich_features"], rich_norm)
        sim_patch = _chunked_dot(lib["patch_multi"], patch_multi)
        sim_mel = _chunked_dot(lib["mel"], mel_full)
        sim_emb = _chunked_dot(lib["emb"], emb_full)
        perceptual = W_RICH * sim_rich + W_PATCH * sim_patch + W_MEL * sim_mel + W_EMB_STAGE_A * sim_emb
        # Debug: show where specific clip ranks
        if debug_clip_id is not None:
            try:
                clip_idx = np.where(lib["ids"] == debug_clip_id)[0]
                if len(clip_idx) > 0:
                    idx = clip_idx[0]
                    stage_a_rank = int(np.sum(perceptual > perceptual[idx])) + 1
                    print(f"\n[DEBUG] Clip {debug_clip_id} Stage A analysis:")
                    print(f" Stage A rank: {stage_a_rank} / {len(perceptual)}")
                    print(f" Stage A score: {perceptual[idx]:.4f}")
                    print(f" Components:")
                    print(f" sim_rich: {sim_rich[idx]:.4f} (Γ— {W_RICH} = {W_RICH * sim_rich[idx]:.4f})")
                    print(f" sim_patch: {sim_patch[idx]:.4f} (Γ— {W_PATCH} = {W_PATCH * sim_patch[idx]:.4f})")
                    print(f" sim_mel: {sim_mel[idx]:.4f} (Γ— {W_MEL} = {W_MEL * sim_mel[idx]:.4f})")
                    print(f" sim_emb: {sim_emb[idx]:.4f} (Γ— {W_EMB_STAGE_A} = {W_EMB_STAGE_A * sim_emb[idx]:.4f})")
                    print(f" Top 5 Stage A scores: {sorted(perceptual, reverse=True)[:5]}")
                    print(f" In top {rerank_top_n}? {'YES' if stage_a_rank <= rerank_top_n else 'NO'}")
                else:
                    print(f"[DEBUG] Clip {debug_clip_id} not found in library!")
            except Exception as e:
                print(f"[DEBUG] Error: {e}")
        # Keep the onset whose best library match scores highest.
        if best_scores is None or np.max(perceptual) > np.max(best_scores):
            best_scores = perceptual
            best_features = {
                'full': full, 'trans': trans, 'tail': tail,
                'emb_full': emb_full,
                'patch_t_shifted': patch_t_sh,
                'patch_b_shifted': patch_b_sh,
                'trans_desc': trans_d,
            }
            best_sims = {
                'sim_rich': sim_rich,
                'sim_patch': sim_patch,
                'sim_mel': sim_mel,
                'sim_emb': sim_emb,
            }
    if best_scores is None:
        return ([], session_id) if return_session_id else []
    scores = best_scores.copy()
    # Stage B: Perceptual rerank (only the shortlist gets the bonus)
    shortlist_idx = np.argsort(-scores)[:min(rerank_top_n, len(scores))]
    sim_patch_t = _best_shift_sim(best_features['patch_t_shifted'], lib["patch_t_shifted"][shortlist_idx])
    sim_patch_b = _best_shift_sim(best_features['patch_b_shifted'], lib["patch_b_shifted"][shortlist_idx])
    sim_trans = lib["trans_desc"][shortlist_idx] @ best_features['trans_desc']
    rerank_bonus = W_PATCH_SHIFT_T * sim_patch_t + W_PATCH_SHIFT_B * sim_patch_b + W_TRANS_DESC * sim_trans
    scores[shortlist_idx] += rerank_bonus
    # Stage C: Identity boost (gated)
    identity_scores = np.zeros(len(shortlist_idx), np.float32)
    if FULL_MODE and best_features is not None:
        trans, tail = best_features['trans'], best_features['tail']
        if trans.size > 100 and tail.size > 100:
            try:
                emb_trans = embed_matching_library(embedder, trans, sr, lib["emb_dim_t"])
                emb_tail = embed_matching_library(embedder, tail, sr, lib["emb_dim_b"])
                sim_full = lib["emb"][shortlist_idx] @ best_features['emb_full']
                sim_trans_emb = lib["emb_t"][shortlist_idx] @ emb_trans
                sim_tail_emb = lib["emb_b"][shortlist_idx] @ emb_tail
                identity_scores = W_ID_FULL * sim_full + W_ID_TRANS * sim_trans_emb + W_ID_TAIL * sim_tail_emb
                # Only near-duplicates above the threshold receive the boost.
                boost = np.maximum(0.0, identity_scores - IDENTITY_THRESHOLD)
                scores[shortlist_idx] += IDENTITY_BOOST_WEIGHT * boost
            except:
                pass
    # Stage D: Personalization
    model = get_personalization_model(db_path)
    alpha = model.get_blend_alpha()
    if alpha > 0:
        # Build feature matrix for personalization
        n_short = len(shortlist_idx)
        feature_matrix = np.zeros((n_short, N_FEATURES), np.float32)
        feature_matrix[:, 0] = best_sims['sim_rich'][shortlist_idx]
        feature_matrix[:, 1] = best_sims['sim_patch'][shortlist_idx]
        feature_matrix[:, 2] = best_sims['sim_mel'][shortlist_idx]
        feature_matrix[:, 3] = sim_patch_t
        feature_matrix[:, 4] = sim_patch_b
        feature_matrix[:, 5] = sim_trans
        feature_matrix[:, 6] = best_sims['sim_emb'][shortlist_idx]
        # Trans/tail embedding sims
        # NOTE(review): `'emb_trans' in dir()` probes the local namespace to
        # check whether Stage C succeeded — fragile; confirm before refactoring.
        if FULL_MODE and 'emb_trans' in dir() and 'emb_tail' in dir():
            feature_matrix[:, 7] = lib["emb_t"][shortlist_idx] @ emb_trans
            feature_matrix[:, 8] = lib["emb_b"][shortlist_idx] @ emb_tail
        else:
            feature_matrix[:, 7] = identity_scores * 0.4
            feature_matrix[:, 8] = identity_scores * 0.3
        feature_matrix[:, 9] = 1.0  # Bias
        # Compute personalized scores
        personal_scores = model.score_batch(feature_matrix)
        # Blend
        base_scores = scores[shortlist_idx]
        blended = (1 - alpha) * base_scores + alpha * personal_scores
        scores[shortlist_idx] = blended
        if debug:
            print(f"[search] Personalization: alpha={alpha:.2f}, pairs={model.n_pairs_trained}")
    # Store candidate features for feedback
    final_idx = np.argsort(-scores)[:min(top_k + exploration_inject, len(scores))]
    # Exploration: optionally inject a few from deeper in the ranking (disabled by default)
    if exploration_inject > 0 and len(scores) > exploration_pool:
        explore_pool_idx = np.argsort(-scores)[top_k:exploration_pool]
        if len(explore_pool_idx) >= exploration_inject:
            explore_idx = np.random.choice(explore_pool_idx, exploration_inject, replace=False)
            # Add exploration items but keep everything sorted by score
            final_idx = np.concatenate([final_idx[:top_k], explore_idx])
    # ALWAYS sort by score (highest first) - deterministic results
    final_idx = final_idx[np.argsort(-scores[final_idx])][:top_k]
    # Store features for feedback
    clip_ids = [int(lib["ids"][i]) for i in final_idx]
    features_list = []
    candidate_scores = []
    candidate_ranks = []
    for rank, i in enumerate(final_idx):
        # Build feature vector
        feat = np.zeros(N_FEATURES, np.float32)
        feat[0] = best_sims['sim_rich'][i]
        feat[1] = best_sims['sim_patch'][i]
        feat[2] = best_sims['sim_mel'][i]
        # Get shift-tolerant sims (need to recompute for non-shortlist items)
        if i in shortlist_idx:
            idx_in_short = np.where(shortlist_idx == i)[0][0]
            feat[3] = sim_patch_t[idx_in_short]
            feat[4] = sim_patch_b[idx_in_short]
            feat[5] = sim_trans[idx_in_short]
            feat[7] = feature_matrix[idx_in_short, 7] if alpha > 0 else 0
            feat[8] = feature_matrix[idx_in_short, 8] if alpha > 0 else 0
        else:
            feat[3] = feat[4] = feat[5] = feat[7] = feat[8] = 0
        feat[6] = best_sims['sim_emb'][i]
        feat[9] = 1.0
        features_list.append(feat)
        candidate_scores.append(float(scores[i]))
        candidate_ranks.append(rank + 1)
    store_candidate_features_batch(db_path, session_id, clip_ids, features_list,
                                   scores=candidate_scores, ranks=candidate_ranks)
    # Build results
    results = [{
        "id": int(lib["ids"][i]),
        "score": float(scores[i]),
        "title": str(lib["titles"][i]),
        "url": str(lib["urls"][i]),
        "t0": float(lib["t0s"][i]),
        "t1": float(lib["t1s"][i]),
        "ver": int(lib["vers"][i]) if "vers" in lib else 1,
        "rank": rank + 1,
        "session_id": session_id,
    } for rank, i in enumerate(final_idx)]
    if return_session_id:
        return results, session_id
    return results
# Compatibility
def search_library_v66(embedder, query_bytes, lib, **kwargs):
    """Backward-compatible alias for the v6.6 API; delegates to search_library."""
    return search_library(embedder, query_bytes, lib, **kwargs)
def debug_search_for_clip(embedder, query_bytes, lib, clip_id, db_path=DEFAULT_DB_PATH):
    """
    Debug why a specific clip isn't ranking well.

    Runs a verbose search (top_k=100, rerank_top_n=2000) with per-stage
    diagnostics for *clip_id*, then reports whether and where the clip
    landed in the final results. Returns the result list (None when the
    clip is not in the loaded library).

    Usage:
        embedder = scout.get_embedder()
        lib = scout.load_library_matrices(db_path)
        with open("your_sample.wav", "rb") as f:
            query = f.read()
        scout.debug_search_for_clip(embedder, query, lib, clip_id=12345)
    """
    print(f"\n{'='*60}")
    print(f"DEBUGGING CLIP {clip_id}")
    print(f"{'='*60}")
    # Check clip exists
    if clip_id not in lib["ids"]:
        print(f"ERROR: Clip {clip_id} not in loaded library!")
        print(f"Library has {len(lib['ids'])} clips, IDs range from {lib['ids'].min()} to {lib['ids'].max()}")
        return
    idx = np.where(lib["ids"] == clip_id)[0][0]
    print(f"Clip title: {lib['titles'][idx]}")
    print(f"Clip t0: {lib['t0s'][idx]:.2f}s")
    # Run search with debug
    results = search_library(
        embedder, query_bytes, lib,
        top_k=100,
        debug=True,
        db_path=db_path,
        debug_clip_id=clip_id,
        rerank_top_n=2000  # Increase to see if it helps
    )
    # Check if clip appeared in results
    found = False
    for r in results:
        if r["id"] == clip_id:
            print(f"\nβœ“ Clip FOUND in results at rank #{r['rank']} with score {r['score']:.4f}")
            found = True
            break
    if not found:
        print(f"\nβœ— Clip NOT in top {len(results)} results")
    print(f"{'='*60}\n")
    return results
# =============================================================================
# Preview / Deep index
# =============================================================================
def fetch_preview_wav_bytes(url, t0, pre=0.1, post=0.9, timeout=70):
    """Download a short preview window around t0 and return WAV bytes.

    Returns (wav_bytes, error_log). On any failure a silent clip of the
    requested length is returned instead of raising, so UI playback never
    breaks; error_log is non-empty in that case.
    """
    start, end = max(0, t0 - pre), t0 + post
    with tempfile.TemporaryDirectory() as td:
        ok, log = download_wav_section(url, start, end, os.path.join(td, "p.wav"), timeout)
        # Re-scan the temp dir; yt-dlp may name the output differently.
        wav = next((os.path.join(td, f) for f in os.listdir(td) if f.endswith(".wav")), None)
        if not wav:
            # Download failed: synthesize silence of the same duration.
            bio = io.BytesIO()
            sf.write(bio, np.zeros(int((end - start) * 48000), np.float32), 48000, format="WAV")
            return bio.getvalue(), log or "failed"
        try:
            y, sr = sf.read(wav, dtype="float32", always_2d=False)
            y, sr = _resample_mono(y, sr, 48000)
            bio = io.BytesIO()
            sf.write(bio, y, sr, format="WAV")
            return bio.getvalue(), ""
        except Exception as e:
            # Decode failed: same silent fallback, with the decode error.
            bio = io.BytesIO()
            sf.write(bio, np.zeros(int((end - start) * 48000), np.float32), 48000, format="WAV")
            return bio.getvalue(), str(e)
def get_video_clip_count(db_path, url):
    """Count indexed clips for the video referenced by a YouTube URL.

    Returns 0 for non-YouTube URLs, unparseable URLs, or database errors.
    """
    vid = None
    if "youtube.com" in url or "youtu.be" in url:
        # Mirror the ID parsing in deep_index_video: strip both query
        # separators and whitespace so forms like "v=ID?x" can't leak junk.
        if "v=" in url:
            vid = url.split("v=")[1].split("&")[0].split("?")[0].strip()
        elif "youtu.be/" in url:
            vid = url.split("youtu.be/")[1].split("?")[0].split("&")[0].strip()
    if not vid:
        return 0
    try:
        con = _connect_db(db_path)
        count = con.execute("SELECT COUNT(*) FROM clips WHERE video_id=?", (vid,)).fetchone()[0]
        con.close()
        return count
    except Exception:
        # Best-effort: a missing/locked database reports zero clips.
        return 0
def deep_index_video(embedder, db_path, url, max_hits=100, window_secs=30, progress_cb=None):
    """Exhaustively index a single video: every window, every detected hit.

    Existing clips for the video are deleted first (full re-index).
    Windows overlap by 2 s so hits on window edges are not lost; indexing
    stops early after max_hits clips or 3 consecutive failed downloads.
    Returns (clips_added, human-readable message).

    progress_cb signature: (window_index, total_windows, clips_added).
    """
    init_db(db_path)
    con = _connect_db(db_path)
    # Extract the video id from either youtube.com/watch?v= or youtu.be/ forms.
    vid = None
    if "v=" in url:
        vid = url.split("v=")[1].split("&")[0].split("?")[0].strip()
    elif "youtu.be/" in url:
        vid = url.split("youtu.be/")[1].split("?")[0].split("&")[0].strip()
    if not vid or len(vid) < 5:
        return 0, "Invalid YouTube URL"
    url = f"https://www.youtube.com/watch?v={vid}"
    dur = get_video_duration_seconds(url) or 600
    title = get_video_title(url)
    existing = con.execute("SELECT COUNT(*) FROM clips WHERE video_id=?", (vid,)).fetchone()[0]
    if existing > 0:
        # Full re-index: drop anything previously stored for this video.
        con.execute("DELETE FROM clips WHERE video_id=?", (vid,))
        con.commit()
    windows = []
    start = 0.0
    while start < dur:
        windows.append((start, min(start + window_secs, dur)))
        start += window_secs - 2
    total_added = 0
    consecutive_failures = 0
    with tempfile.TemporaryDirectory() as td:
        for wi, (start, end) in enumerate(windows):
            if progress_cb:
                progress_cb(wi + 1, len(windows), total_added)
            if total_added >= max_hits or consecutive_failures >= 3:
                break
            wav_path = os.path.join(td, f"seg_{wi}.wav")
            success, log = download_wav_section(url, start, end, wav_path, timeout=90)
            wav_file = next((os.path.join(td, f) for f in os.listdir(td) if f.endswith(".wav")), None)
            if not wav_file:
                consecutive_failures += 1
                continue
            consecutive_failures = 0
            try:
                y, sr = sf.read(wav_file, dtype="float32", always_2d=False)
            except:
                continue
            y, sr = _resample_mono(y, sr, 48000)
            onsets = find_hit_onsets(y, sr, max_hits=max_hits - total_added)
            for onset in onsets:
                if total_added >= max_hits:
                    break
                views = slice_views_from_onset(y, sr, onset)
                full, trans, tail = views["full"], views["trans"], views["tail"]
                # Skip clips that are too short for neural networks
                if full.size < MIN_AUDIO_SAMPLES // 2:
                    continue
                t0 = max(0, start + onset - FULL_PRE)
                t1 = start + onset + FULL_POST
                # Windows overlap by 2 s: skip hits already stored within 50 ms.
                if con.execute("SELECT 1 FROM clips WHERE video_id=? AND ABS(t0-?)<0.05", (vid, t0)).fetchone():
                    continue
                z = lambda d: np.zeros(d, np.float32)
                try:
                    emb_full = embed_full(embedder, full, sr)
                except:
                    continue
                if FULL_MODE:
                    try:
                        emb_t = embed_detail(embedder, trans, sr) if trans.size > 100 else z(1024)
                        emb_b = embed_detail(embedder, tail, sr) if tail.size > 100 else z(1024)
                    except:
                        emb_t, emb_b = z(1024), z(1024)
                else:
                    emb_t, emb_b = emb_full, z(len(emb_full))
                _insert_clip(con, vid, title, url, t0, t1, emb_full, mel_shape(full, sr), emb_t, emb_b, mel_shape(trans, sr) if trans.size > 100 else z(64), mel_shape(tail, sr) if tail.size > 100 else z(64), mel_patch(trans, sr) if trans.size > 100 else z(PATCH_DIM), mel_patch(tail, sr) if tail.size > 100 else z(PATCH_DIM), mel_patch_shifted(trans, sr) if trans.size > 100 else z(SHIFTED_PATCH_DIM), mel_patch_shifted(tail, sr) if tail.size > 100 else z(SHIFTED_PATCH_DIM), transient_descriptor(trans, sr) if trans.size > 100 else z(TRANS_DESC_DIM), mel_patch_multiscale(full, sr), extract_rich_features(full, sr))
                total_added += 1
            con.commit()
            try:
                os.remove(wav_file)
            except:
                pass
    _mark_video(con, vid, title, "ok")
    con.close()
    return total_added, f"Added {total_added} clips from '{title}'"
# Compatibility
_NEGATIVE_PROMPTS = ["doorbell", "alarm", "siren", "phone ringing", "beep tone"]
_POSITIVE_PROMPTS = ["rimshot", "snare drum hit", "hand clap", "door knock", "metal hit"]
def smart_queries_from_sample(embedder, query_bytes):
    """Return a static fallback query list; both arguments are ignored."""
    base_queries = ["percussion one shot", "drum sample"]
    return base_queries * 4