""" Snare Scout v7.0 โ€” Personalized Perceptual Search Builds on v6.9 (Blended Perceptual + Gated Identity) with: - Feedback collection (๐Ÿ‘/๐Ÿ‘Ž on results) - Pairwise learning-to-rank (learns YOUR definition of "similar") - Online training (improves immediately from votes) - Generalization across whole library (learns channel weights, not clip IDs) The system learns what similarity channels YOU trust: - Do you care more about attack shape or spectral envelope? - Do you prefer embedding similarity or acoustic features? - What tradeoffs matter to YOUR ear? This transfers to new samples automatically. """ import os os.environ["TOKENIZERS_PARALLELISM"] = "false" import io, sys, json, time, math, random, sqlite3, tempfile, subprocess, hashlib, uuid from dataclasses import dataclass from typing import Dict, List, Optional, Tuple import numpy as np import soundfile as sf import librosa import torch from transformers import ClapModel, ClapProcessor # v6.6 modules try: import preprocessing import embeddings_mert import embeddings_panns import distance_metrics V66_MODULES_AVAILABLE = True except ImportError as e: print(f"[scout] Warning: v6.6 modules not available: {e}") V66_MODULES_AVAILABLE = False DEFAULT_DB_PATH = os.path.join("library", "snare_scout.sqlite") DEFAULT_MODEL_NAME = "laion/larger_clap_music" os.makedirs("library", exist_ok=True) CURRENT_INDEX_VERSION = 12 # Same as v6.9 # ============================================================================= # CONFIGURATION # ============================================================================= # Identity boost (from v6.9) IDENTITY_THRESHOLD = float(os.getenv("SCOUT_IDENTITY_THR", "0.88")) IDENTITY_BOOST_WEIGHT = float(os.getenv("SCOUT_IDENTITY_BOOST", "0.35")) # Personalization settings PERSONALIZATION_MIN_PAIRS = int(os.getenv("SCOUT_MIN_PAIRS", "10")) # Min pairs before using PERSONALIZATION_MAX_ALPHA = float(os.getenv("SCOUT_MAX_ALPHA", "0.7")) # Max blend weight PERSONALIZATION_LEARNING_RATE = float(os.getenv("SCOUT_LR", "0.03")) PERSONALIZATION_REGULARIZATION = float(os.getenv("SCOUT_REG", "0.0005")) # Stage A weights (perceptual retrieval) W_RICH = 0.55 W_PATCH = 0.25 W_MEL = 0.15 W_EMB_STAGE_A = 0.05 # Stage B weights (perceptual rerank) W_PATCH_SHIFT_T = 0.12 W_PATCH_SHIFT_B = 0.08 W_TRANS_DESC = 0.05 # Stage C weights (identity score) W_ID_FULL = 0.60 W_ID_TRANS = 0.25 W_ID_TAIL = 0.15 # ============================================================================= # FEATURE VECTOR FOR PERSONALIZATION # ============================================================================= # These are the similarity channels we'll learn to weight FEATURE_NAMES = [ "sim_rich", # Rich acoustic features "sim_patch_multi", # Multiscale patches "sim_mel", # Mel envelope "sim_patch_shift_t", # Shift-tolerant trans "sim_patch_shift_b", # Shift-tolerant tail "sim_trans_desc", # Transient descriptor "sim_emb_full", # PANNs full embedding "sim_emb_trans", # MERT trans embedding "sim_emb_tail", # MERT tail embedding "bias" # Constant term ] N_FEATURES = len(FEATURE_NAMES) # Default weights (matches v6.9 behavior before any learning) DEFAULT_WEIGHTS = np.array([ 0.55, # rich 0.25, # patch_multi 0.15, # mel 0.12, # patch_shift_t 0.08, # patch_shift_b 0.05, # trans_desc 0.05, # emb_full (minimal in perceptual mode) 0.02, # emb_trans 0.02, # emb_tail 0.0 # bias ], dtype=np.float32) # ============================================================================= # Slice timing # ============================================================================= if V66_MODULES_AVAILABLE: FULL_PRE = preprocessing.ONSET_PRE_MS / 1000.0 FULL_POST = preprocessing.ONSET_POST_MS / 1000.0 TRANS_POST = max(0.0, preprocessing.TRANS_END_MS / 1000.0 - FULL_PRE) TAIL_START = max(0.0, preprocessing.TAIL_START_MS / 1000.0 - FULL_PRE) TAIL_END = max(0.0, preprocessing.TAIL_END_MS / 1000.0 - FULL_PRE) else: FULL_PRE, FULL_POST = 0.015, 0.735 TRANS_POST = 0.070 TAIL_START, TAIL_END = 0.015, 0.635 # Feature dimensions PATCH_N_MELS, PATCH_N_FRAMES = 32, 24 PATCH_DIM = PATCH_N_MELS * PATCH_N_FRAMES SHIFT_FRAMES = 3 SHIFT_VARIANTS = [-SHIFT_FRAMES, 0, SHIFT_FRAMES] SHIFTED_PATCH_DIM = PATCH_DIM * len(SHIFT_VARIANTS) PATCH_SCALES = [16, 24, 32, 48] MULTISCALE_PATCH_DIM = PATCH_N_MELS * sum(PATCH_SCALES) TRANS_DESC_DIM = 8 # Rich features N_MFCC = 20 MFCC_DIM = N_MFCC * 3 SPECTRAL_DIM = 13 ENVELOPE_DIM = 16 ATTACK_DECAY_DIM = 16 TEXTURE_DIM = 8 RICH_FEATURES_DIM = MFCC_DIM + SPECTRAL_DIM + ENVELOPE_DIM + ATTACK_DECAY_DIM + TEXTURE_DIM # Embedding backends EMB_FULL_BACKEND = os.getenv("SCOUT_EMB_FULL", "panns").strip().lower() EMB_DETAIL_BACKEND = os.getenv("SCOUT_EMB_DETAIL", "mert").strip().lower() FULL_MODE = os.getenv("SCOUT_FULL_MODE", "1").strip().lower() in ("1", "true", "yes", "on") def explain_pipeline(): return f""" **Snare Scout v7.0 โ€” Personalized Perceptual Search** **Base:** Blended perceptual + gated identity (v6.9) **New:** Learning-to-rank from your feedback **How it works:** 1. ๐Ÿ‘/๐Ÿ‘Ž on results creates preference pairs 2. System learns which similarity channels YOU trust 3. Personalization blends in as you give more feedback **Current settings:** - Min pairs to activate: {PERSONALIZATION_MIN_PAIRS} - Max personalization blend: {PERSONALIZATION_MAX_ALPHA:.0%} - Learning rate: {PERSONALIZATION_LEARNING_RATE} **Feature channels being learned:** {', '.join(FEATURE_NAMES[:-1])} (Index v{CURRENT_INDEX_VERSION}) """ # ============================================================================= # Database # ============================================================================= def _connect_db(db_path): os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True) con = sqlite3.connect(db_path, timeout=30) con.execute("PRAGMA journal_mode=WAL") con.execute("PRAGMA busy_timeout=10000") return con def init_db(db_path): con = _connect_db(db_path) # Original tables con.execute("CREATE TABLE IF NOT EXISTS meta (key TEXT PRIMARY KEY, value TEXT)") con.execute("""CREATE TABLE IF NOT EXISTS videos ( video_id TEXT PRIMARY KEY, title TEXT, status TEXT, last_error TEXT, updated_at REAL, index_ver INTEGER)""") con.execute("""CREATE TABLE IF NOT EXISTS clips ( id INTEGER PRIMARY KEY, video_id TEXT, title TEXT, url TEXT, t0 REAL, t1 REAL, emb BLOB, mel BLOB, emb_t BLOB, emb_b BLOB, mel_t BLOB, mel_b BLOB, patch_t BLOB, patch_b BLOB, patch_t_shifted BLOB, patch_b_shifted BLOB, trans_desc BLOB, patch_multi BLOB, rich_features BLOB, index_ver INTEGER, created_at REAL)""") # v7.0 Personalization tables con.execute("""CREATE TABLE IF NOT EXISTS feedback_sessions ( session_id TEXT PRIMARY KEY, created_at REAL, query_hash TEXT, mode TEXT, notes TEXT )""") con.execute("""CREATE TABLE IF NOT EXISTS feedback_votes ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT, clip_id INTEGER, vote INTEGER, rank_at_vote INTEGER, created_at REAL )""") con.execute("CREATE INDEX IF NOT EXISTS idx_votes_session ON feedback_votes(session_id)") con.execute("""CREATE TABLE IF NOT EXISTS feedback_pairs ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT, pos_clip_id INTEGER, neg_clip_id INTEGER, created_at REAL )""") con.execute("""CREATE TABLE IF NOT EXISTS feedback_candidate_features ( session_id TEXT, clip_id INTEGER, features BLOB, score REAL DEFAULT 0, rank INTEGER DEFAULT 0, PRIMARY KEY(session_id, clip_id) )""") con.execute("""CREATE TABLE IF NOT EXISTS personalization_model ( key TEXT PRIMARY KEY, value BLOB )""") # Migration for clips table cols = {r[1] for r in con.execute("PRAGMA table_info(clips)").fetchall()} for col, t in [("emb", "BLOB"), ("mel", "BLOB"), ("emb_t", "BLOB"), ("emb_b", "BLOB"), ("mel_t", "BLOB"), ("mel_b", "BLOB"), ("patch_t", "BLOB"), ("patch_b", "BLOB"), ("patch_t_shifted", "BLOB"), ("patch_b_shifted", "BLOB"), ("trans_desc", "BLOB"), ("patch_multi", "BLOB"), ("rich_features", "BLOB"), ("index_ver", "INTEGER"), ("created_at", "REAL")]: if col not in cols: con.execute(f"ALTER TABLE clips ADD COLUMN {col} {t}") con.execute("CREATE INDEX IF NOT EXISTS idx_clips_ver ON clips(index_ver)") # Migration for feedback_candidate_features (v7.0+) try: fcf_cols = {r[1] for r in con.execute("PRAGMA table_info(feedback_candidate_features)").fetchall()} for col, t in [("score", "REAL DEFAULT 0"), ("rank", "INTEGER DEFAULT 0")]: col_name = col.split()[0] if " " in col else col if col_name not in fcf_cols: con.execute(f"ALTER TABLE feedback_candidate_features ADD COLUMN {col} {t}") except: pass # Table might not exist yet con.commit() con.close() def get_db_stats(db_path): init_db(db_path) con = _connect_db(db_path) clips = con.execute("SELECT COUNT(*) FROM clips").fetchone()[0] clips_cur = con.execute("SELECT COUNT(*) FROM clips WHERE index_ver=?", (CURRENT_INDEX_VERSION,)).fetchone()[0] videos_ok = con.execute("SELECT COUNT(*) FROM videos WHERE status='ok'").fetchone()[0] videos_total = con.execute("SELECT COUNT(*) FROM videos").fetchone()[0] # Personalization stats n_pairs = con.execute("SELECT COUNT(*) FROM feedback_pairs").fetchone()[0] n_votes = con.execute("SELECT COUNT(*) FROM feedback_votes").fetchone()[0] n_sessions = con.execute("SELECT COUNT(*) FROM feedback_sessions").fetchone()[0] con.close() return { "clips": clips, "clips_v4": clips_cur, "clips_legacy": clips - clips_cur, "videos_total": videos_total, "videos_ok": videos_ok, "videos_dead": videos_total - videos_ok, "feedback_pairs": n_pairs, "feedback_votes": n_votes, "feedback_sessions": n_sessions } def purge_legacy(db_path): init_db(db_path) con = _connect_db(db_path) n = con.execute("SELECT COUNT(*) FROM clips WHERE COALESCE(index_ver,1)!=?", (CURRENT_INDEX_VERSION,)).fetchone()[0] con.execute("DELETE FROM clips WHERE COALESCE(index_ver,1)!=?", (CURRENT_INDEX_VERSION,)) con.commit() con.close() return int(n) # ============================================================================= # PERSONALIZATION MODEL # ============================================================================= class PersonalizationModel: """ Pairwise logistic regression for learning user preferences. Learns weights for similarity channels based on ๐Ÿ‘/๐Ÿ‘Ž feedback. """ def __init__(self, db_path: str): self.db_path = db_path self.weights = self._load_weights() self.n_pairs_trained = self._count_pairs() def _load_weights(self) -> np.ndarray: """Load weights from DB or return defaults.""" try: con = _connect_db(self.db_path) row = con.execute( "SELECT value FROM personalization_model WHERE key='weights'" ).fetchone() con.close() if row: return np.frombuffer(row[0], dtype=np.float32).copy() except: pass return DEFAULT_WEIGHTS.copy() def _save_weights(self): """Save weights to DB.""" con = _connect_db(self.db_path) con.execute( "INSERT OR REPLACE INTO personalization_model (key, value) VALUES (?, ?)", ("weights", self.weights.astype(np.float32).tobytes()) ) con.commit() con.close() def _count_pairs(self) -> int: """Count total training pairs.""" try: con = _connect_db(self.db_path) n = con.execute("SELECT COUNT(*) FROM feedback_pairs").fetchone()[0] con.close() return n except: return 0 def get_blend_alpha(self) -> float: """ How much to blend personalized scores vs base scores. Increases with more training data. """ if self.n_pairs_trained < PERSONALIZATION_MIN_PAIRS: return 0.0 # Not enough data yet # Gradually increase alpha as we get more pairs alpha = 0.15 + 0.002 * (self.n_pairs_trained - PERSONALIZATION_MIN_PAIRS) return min(PERSONALIZATION_MAX_ALPHA, alpha) def train_step(self, pos_features: np.ndarray, neg_features: np.ndarray): """ Single SGD step for pairwise logistic regression. pos_features: feature vector for upvoted candidate neg_features: feature vector for downvoted candidate """ d = pos_features - neg_features # Sigmoid logit = np.dot(self.weights, d) p = 1.0 / (1.0 + np.exp(-np.clip(logit, -30, 30))) # Gradient update: want wยทd to be positive (pos ranks above neg) grad = (1.0 - p) * d - PERSONALIZATION_REGULARIZATION * self.weights self.weights += PERSONALIZATION_LEARNING_RATE * grad # Keep weights bounded self.weights = np.clip(self.weights, -5.0, 5.0) def train_on_pairs(self, pairs: List[Tuple[np.ndarray, np.ndarray]], epochs: int = 3): """Train on a batch of pairs.""" for _ in range(epochs): random.shuffle(pairs) for pos_f, neg_f in pairs: self.train_step(pos_f, neg_f) self._save_weights() self.n_pairs_trained = self._count_pairs() def score(self, features: np.ndarray) -> float: """Compute personalized score for a candidate.""" return float(np.dot(self.weights, features)) def score_batch(self, feature_matrix: np.ndarray) -> np.ndarray: """Compute personalized scores for multiple candidates.""" return feature_matrix @ self.weights def reset(self): """Reset to default weights and clear all feedback.""" self.weights = DEFAULT_WEIGHTS.copy() con = _connect_db(self.db_path) con.execute("DELETE FROM feedback_pairs") con.execute("DELETE FROM feedback_votes") con.execute("DELETE FROM feedback_sessions") con.execute("DELETE FROM feedback_candidate_features") con.execute("DELETE FROM personalization_model") con.commit() con.close() self.n_pairs_trained = 0 def get_weight_report(self) -> str: """Human-readable report of learned weights.""" lines = ["**Learned Weights:**"] for name, w, default in zip(FEATURE_NAMES, self.weights, DEFAULT_WEIGHTS): delta = w - default arrow = "โ†‘" if delta > 0.01 else "โ†“" if delta < -0.01 else "=" lines.append(f" {name}: {w:.3f} (default {default:.3f}) {arrow}") lines.append(f"\n**Training pairs:** {self.n_pairs_trained}") lines.append(f"**Blend alpha:** {self.get_blend_alpha():.2f}") return "\n".join(lines) # Global personalization model (lazy loaded) _PERSONALIZATION_MODEL = None def get_personalization_model(db_path: str = DEFAULT_DB_PATH) -> PersonalizationModel: global _PERSONALIZATION_MODEL if _PERSONALIZATION_MODEL is None or _PERSONALIZATION_MODEL.db_path != db_path: _PERSONALIZATION_MODEL = PersonalizationModel(db_path) return _PERSONALIZATION_MODEL # ============================================================================= # FEEDBACK MANAGEMENT # ============================================================================= def create_feedback_session(db_path: str, query_hash: str, mode: str = "perceptual") -> str: """Create a new feedback session for a search query.""" session_id = str(uuid.uuid4())[:12] con = _connect_db(db_path) con.execute( "INSERT INTO feedback_sessions (session_id, created_at, query_hash, mode) VALUES (?, ?, ?, ?)", (session_id, time.time(), query_hash, mode) ) con.commit() con.close() return session_id def store_candidate_features(db_path: str, session_id: str, clip_id: int, features: np.ndarray, score: float = 0.0, rank: int = 0): """Store feature vector and score for a candidate in a session.""" con = _connect_db(db_path) con.execute( "INSERT OR REPLACE INTO feedback_candidate_features (session_id, clip_id, features, score, rank) VALUES (?, ?, ?, ?, ?)", (session_id, clip_id, features.astype(np.float32).tobytes(), score, rank) ) con.commit() con.close() def store_candidate_features_batch(db_path: str, session_id: str, clip_ids: List[int], features_list: List[np.ndarray], scores: List[float] = None, ranks: List[int] = None): """Store feature vectors, scores, and ranks for multiple candidates.""" con = _connect_db(db_path) if scores is None: scores = [0.0] * len(clip_ids) if ranks is None: ranks = list(range(1, len(clip_ids) + 1)) for clip_id, features, score, rank in zip(clip_ids, features_list, scores, ranks): con.execute( "INSERT OR REPLACE INTO feedback_candidate_features (session_id, clip_id, features, score, rank) VALUES (?, ?, ?, ?, ?)", (session_id, clip_id, features.astype(np.float32).tobytes(), score, rank) ) con.commit() con.close() def record_vote(db_path: str, session_id: str, clip_id: int, vote: int, rank: int): """ Record a vote (๐Ÿ‘ = +1, ๐Ÿ‘Ž = -1). Training logic (robust learning-to-rank): - YES + NO: Create explicit pairs (strongest signal) - YES only: Create pairs against implicit negatives that are: * Ranked below the upvoted item * Have score at least MARGIN lower than the upvoted item * Were actually shown to the user - NO only: Store but don't train (no positive to learn from) Returns number of pairs created. """ IMPLICIT_NEGATIVE_MARGIN = 0.10 # Score gap required for implicit negative MAX_IMPLICIT_NEGATIVES = 3 # Max implicit pairs per upvote con = _connect_db(db_path) # Store vote con.execute( "INSERT INTO feedback_votes (session_id, clip_id, vote, rank_at_vote, created_at) VALUES (?, ?, ?, ?, ?)", (session_id, clip_id, vote, rank, time.time()) ) # Get all votes for this session votes = con.execute( "SELECT clip_id, vote, rank_at_vote FROM feedback_votes WHERE session_id=?", (session_id,) ).fetchall() upvoted = [(v[0], v[2]) for v in votes if v[1] > 0] # (clip_id, rank) downvoted = [(v[0], v[2]) for v in votes if v[1] < 0] # (clip_id, rank) # No upvotes = no training (downvote-only doesn't help) if not upvoted: con.commit() con.close() return 0 # Get all candidates shown in this session WITH their scores all_candidates = con.execute( "SELECT clip_id, score, rank FROM feedback_candidate_features WHERE session_id=? ORDER BY rank", (session_id,) ).fetchall() # Build lookup: clip_id -> (score, rank) candidate_info = {r[0]: (r[1], r[2]) for r in all_candidates} voted_ids = {v[0] for v in votes} downvoted_ids = {v[0] for v in downvoted} new_pairs = [] for pos_id, pos_vote_rank in upvoted: pos_score, pos_orig_rank = candidate_info.get(pos_id, (0.0, pos_vote_rank)) # Strategy 1: Explicit negatives (user clicked ๐Ÿ‘Ž) - strongest signal for neg_id, neg_vote_rank in downvoted: existing = con.execute( "SELECT 1 FROM feedback_pairs WHERE session_id=? AND pos_clip_id=? AND neg_clip_id=?", (session_id, pos_id, neg_id) ).fetchone() if not existing: con.execute( "INSERT INTO feedback_pairs (session_id, pos_clip_id, neg_clip_id, created_at) VALUES (?, ?, ?, ?)", (session_id, pos_id, neg_id, time.time()) ) new_pairs.append((pos_id, neg_id)) # Strategy 2: Implicit negatives (only if no explicit downvotes) # Use margin rule: item must be ranked below AND score gap >= MARGIN if not downvoted: implicit_negatives = [] for cid, (cand_score, cand_rank) in candidate_info.items(): # Skip if: already voted on, same as positive, or not meeting criteria if cid in voted_ids: continue if cid == pos_id: continue # Must be ranked below the upvoted item if cand_rank <= pos_orig_rank: continue # Must have score at least MARGIN lower score_gap = pos_score - cand_score if score_gap < IMPLICIT_NEGATIVE_MARGIN: continue # Good implicit negative candidate implicit_negatives.append((cid, cand_score, cand_rank, score_gap)) # Sort by score gap (larger gap = more confident negative) implicit_negatives.sort(key=lambda x: -x[3]) # Take top k for neg_id, neg_score, neg_rank, gap in implicit_negatives[:MAX_IMPLICIT_NEGATIVES]: existing = con.execute( "SELECT 1 FROM feedback_pairs WHERE session_id=? AND pos_clip_id=? AND neg_clip_id=?", (session_id, pos_id, neg_id) ).fetchone() if not existing: con.execute( "INSERT INTO feedback_pairs (session_id, pos_clip_id, neg_clip_id, created_at) VALUES (?, ?, ?, ?)", (session_id, pos_id, neg_id, time.time()) ) new_pairs.append((pos_id, neg_id)) con.commit() # Train on new pairs if new_pairs: training_pairs = [] for pos_id, neg_id in new_pairs: pos_row = con.execute( "SELECT features FROM feedback_candidate_features WHERE session_id=? AND clip_id=?", (session_id, pos_id) ).fetchone() neg_row = con.execute( "SELECT features FROM feedback_candidate_features WHERE session_id=? AND clip_id=?", (session_id, neg_id) ).fetchone() if pos_row and neg_row: pos_f = np.frombuffer(pos_row[0], dtype=np.float32) neg_f = np.frombuffer(neg_row[0], dtype=np.float32) training_pairs.append((pos_f, neg_f)) if training_pairs: model = get_personalization_model(db_path) model.train_on_pairs(training_pairs, epochs=2) con.close() return len(new_pairs) def remove_vote(db_path: str, session_id: str, clip_id: int): """ Remove a vote and any training pairs it created. Returns number of pairs removed. """ con = _connect_db(db_path) # Get the vote being removed vote_row = con.execute( "SELECT vote FROM feedback_votes WHERE session_id=? AND clip_id=? ORDER BY created_at DESC LIMIT 1", (session_id, clip_id) ).fetchone() if not vote_row: con.close() return 0 removed_vote = vote_row[0] # Remove the vote(s) for this clip in this session con.execute( "DELETE FROM feedback_votes WHERE session_id=? AND clip_id=?", (session_id, clip_id) ) # Remove any pairs involving this clip pairs_removed = 0 if removed_vote > 0: # Was upvoted - remove pairs where this was the positive result = con.execute( "DELETE FROM feedback_pairs WHERE session_id=? AND pos_clip_id=?", (session_id, clip_id) ) pairs_removed = result.rowcount else: # Was downvoted - remove pairs where this was the negative result = con.execute( "DELETE FROM feedback_pairs WHERE session_id=? AND neg_clip_id=?", (session_id, clip_id) ) pairs_removed = result.rowcount con.commit() con.close() # Note: We don't "untrain" the model - the pairs are just removed from future training # The model will naturally adjust as more votes come in return pairs_removed def get_feedback_stats(db_path: str) -> dict: """Get feedback statistics.""" con = _connect_db(db_path) n_pairs = con.execute("SELECT COUNT(*) FROM feedback_pairs").fetchone()[0] n_votes = con.execute("SELECT COUNT(*) FROM feedback_votes").fetchone()[0] n_up = con.execute("SELECT COUNT(*) FROM feedback_votes WHERE vote > 0").fetchone()[0] n_down = con.execute("SELECT COUNT(*) FROM feedback_votes WHERE vote < 0").fetchone()[0] n_sessions = con.execute("SELECT COUNT(*) FROM feedback_sessions").fetchone()[0] con.close() model = get_personalization_model(db_path) return { "total_pairs": n_pairs, "total_votes": n_votes, "upvotes": n_up, "downvotes": n_down, "sessions": n_sessions, "blend_alpha": model.get_blend_alpha(), "personalization_active": model.get_blend_alpha() > 0 } def reset_personalization(db_path: str): """Reset all personalization data.""" model = get_personalization_model(db_path) model.reset() # ============================================================================= # Audio utilities # ============================================================================= # Minimum samples needed for neural networks (0.5 sec at 48kHz) MIN_AUDIO_SAMPLES = 24000 def _resample_mono(y, sr, target_sr=48000): if y.ndim > 1: y = np.mean(y, axis=1) y = y.astype(np.float32) peak = np.max(np.abs(y)) if peak > 1e-9: y = y / peak if sr != target_sr: y = librosa.resample(y, orig_sr=sr, target_sr=target_sr) return y, target_sr def _pad_to_minimum(y, min_samples=MIN_AUDIO_SAMPLES): """Pad audio to minimum length required by neural networks.""" if len(y) >= min_samples: return y # Pad with zeros (silence) at the end return np.pad(y, (0, min_samples - len(y)), mode='constant') def _is_too_short(y, min_samples=MIN_AUDIO_SAMPLES): """Check if audio is too short even for padding to help.""" # If it's less than 10% of minimum, it's probably not a real hit return len(y) < min_samples // 10 # ============================================================================= # CLAP embedder # ============================================================================= @dataclass class Embedder: model_name: str device: str processor: ClapProcessor model: ClapModel text_cache: Dict[str, np.ndarray] _EMBEDDER_CACHE = {} def get_embedder(model_name=DEFAULT_MODEL_NAME): if model_name in _EMBEDDER_CACHE: return _EMBEDDER_CACHE[model_name] print(f"[scout] Loading CLAP model: {model_name}...") device = "mps" if torch.backends.mps.is_available() else "cuda" if torch.cuda.is_available() else "cpu" print(f"[scout] Using device: {device}") processor = ClapProcessor.from_pretrained(model_name) model = ClapModel.from_pretrained(model_name).to(device).eval() print(f"[scout] Model loaded and ready") embedder = Embedder(model_name, device, processor, model, {}) _EMBEDDER_CACHE[model_name] = embedder return embedder @torch.inference_mode() def embed_audio(embedder, y, sr): y, sr = _resample_mono(y, sr, 48000) # Pad short audio for CLAP y = _pad_to_minimum(y, MIN_AUDIO_SAMPLES) try: inputs = embedder.processor(audio=y, sampling_rate=sr, return_tensors="pt") except: inputs = embedder.processor(audios=y, sampling_rate=sr, return_tensors="pt") inputs = {k: v.to(embedder.device) for k, v in inputs.items()} v = embedder.model.get_audio_features(**inputs).detach().float().cpu().numpy().reshape(-1) return (v / (np.linalg.norm(v) + 1e-9)).astype(np.float32) @torch.inference_mode() def embed_texts(embedder, texts): new = [t for t in texts if t not in embedder.text_cache] if new: inputs = embedder.processor(text=new, return_tensors="pt", padding=True) inputs = {k: v.to(embedder.device) for k, v in inputs.items()} arr = embedder.model.get_text_features(**inputs).detach().float().cpu().numpy() arr = arr / (np.linalg.norm(arr, axis=1, keepdims=True) + 1e-9) for t, v in zip(new, arr): embedder.text_cache[t] = v.astype(np.float32) return np.stack([embedder.text_cache[t] for t in texts]) # ============================================================================= # Embedding backends # ============================================================================= def _embed_with_backend(backend: str, embedder, audio: np.ndarray, sr: int) -> np.ndarray: backend = (backend or "clap").lower() # Pad short audio to minimum length audio = _pad_to_minimum(audio, MIN_AUDIO_SAMPLES) if backend == "panns" and V66_MODULES_AVAILABLE: return embeddings_panns.embed_audio_panns(audio, sr) if backend == "mert" and V66_MODULES_AVAILABLE and embeddings_mert.is_mert_available(): return embeddings_mert.embed_audio_mert(audio, sr) return embed_audio(embedder, audio, sr) def embed_full(embedder, audio: np.ndarray, sr: int) -> np.ndarray: return _embed_with_backend(EMB_FULL_BACKEND, embedder, audio, sr) def embed_detail(embedder, audio: np.ndarray, sr: int) -> np.ndarray: return _embed_with_backend(EMB_DETAIL_BACKEND, embedder, audio, sr) def embed_matching_library(embedder, audio: np.ndarray, sr: int, target_dim: int) -> np.ndarray: """Auto-select backend based on target dimension, with padding for short audio.""" # Pad short audio to minimum length audio = _pad_to_minimum(audio, MIN_AUDIO_SAMPLES) if target_dim == 2048 and V66_MODULES_AVAILABLE: return embeddings_panns.embed_audio_panns(audio, sr) elif target_dim == 1024 and V66_MODULES_AVAILABLE and embeddings_mert.is_mert_available(): return embeddings_mert.embed_audio_mert(audio, sr) elif target_dim == 512: return embed_audio(embedder, audio, sr) raise ValueError(f"Unknown target dimension: {target_dim}") # ============================================================================= # Feature extraction # ============================================================================= def mel_shape(y, sr, n_mels=64): y, sr = _resample_mono(y, sr, 48000) S = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=n_mels, fmax=14000) v = librosa.power_to_db(S + 1e-10).mean(axis=1).astype(np.float32) v -= np.mean(v) return (v / (np.linalg.norm(v) + 1e-9)).astype(np.float32) def mel_patch(y, sr, n_mels=PATCH_N_MELS, n_frames=PATCH_N_FRAMES): y, sr = _resample_mono(y, sr, 48000) S = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=n_mels, fmax=14000, hop_length=256) logS = librosa.power_to_db(S + 1e-10).astype(np.float32) T = logS.shape[1] if T <= 1: out = np.zeros((n_mels, n_frames), np.float32) else: out = np.array([np.interp(np.linspace(0, 1, n_frames), np.linspace(0, 1, T), logS[i]) for i in range(n_mels)]) v = out.reshape(-1) v -= np.mean(v) return (v / (np.linalg.norm(v) + 1e-9)).astype(np.float32) def mel_patch_shifted(y, sr): y, sr = _resample_mono(y, sr, 48000) patches = [] for shift in SHIFT_VARIANTS: s = shift * 256 if s < 0: ys = y[abs(s):] elif s > 0: ys = np.concatenate([np.zeros(s, np.float32), y]) else: ys = y patches.append(mel_patch(ys, sr) if len(ys) > 100 else np.zeros(PATCH_DIM, np.float32)) combined = np.concatenate(patches) return (combined / (np.linalg.norm(combined) + 1e-9)).astype(np.float32) def mel_patch_multiscale(y, sr): y, sr = _resample_mono(y, sr, 48000) patches = [mel_patch(y, sr, n_frames=n) for n in PATCH_SCALES] combined = np.concatenate(patches) return (combined / (np.linalg.norm(combined) + 1e-9)).astype(np.float32) def transient_descriptor(y, sr): y, sr = _resample_mono(y, sr, 48000) if len(y) < 512: return np.zeros(TRANS_DESC_DIM, np.float32) desc = np.zeros(TRANS_DESC_DIM, np.float32) env = np.abs(y) win = max(1, int(0.002 * sr)) if win > 1: env = np.convolve(env, np.ones(win) / win, 'same') attack_samples = int(0.02 * sr) if attack_samples < len(env): desc[0] = float(np.max(np.diff(env[:attack_samples]))) desc[1] = float(np.mean(librosa.feature.spectral_centroid(y=y, sr=sr))) / sr desc[2] = float(np.mean(librosa.feature.spectral_bandwidth(y=y, sr=sr))) / sr rms = float(np.sqrt(np.mean(y ** 2))) + 1e-9 desc[3] = min(float(np.max(np.abs(y))) / rms, 10) / 10 desc[4] = float(np.mean(librosa.feature.zero_crossing_rate(y))) onset = librosa.onset.onset_strength(y=y, sr=sr) desc[5] = float(np.mean(onset)) / (float(np.max(onset)) + 1e-9) S = np.abs(librosa.stft(y)) n = S.shape[0] total = float(np.mean(S)) + 1e-9 desc[6] = float(np.mean(S[:n // 4])) / total desc[7] = float(np.mean(S[n * 3 // 4:])) / total desc -= np.mean(desc) return (desc / (np.linalg.norm(desc) + 1e-9)).astype(np.float32) # Rich features (condensed from v6.9) def _compute_envelope(y, sr, hop=256): env = np.array([np.sqrt(np.mean(y[i:i + hop] ** 2)) for i in range(0, max(1, len(y) - hop), hop)]) if len(env) < 2: return np.zeros(50, np.float32) win = max(1, len(env) // 20) if win > 1: env = np.convolve(env, np.ones(win) / win, 'same') return (env / (np.max(env) + 1e-9)).astype(np.float32) def extract_envelope_features(y, sr): y, sr = _resample_mono(y, sr, 48000) feats = np.zeros(ENVELOPE_DIM, np.float32) if len(y) < 256: return feats try: env = _compute_envelope(y, sr) if len(env) < 4: return feats peak_idx = np.argmax(env) feats[0] = peak_idx / len(env) env_resamp = np.interp(np.linspace(0, 1, 12), np.linspace(0, 1, len(env)), env) feats[1:13] = env_resamp feats[13] = float(np.std(env)) feats[14] = float(np.mean(env)) feats[15] = float(np.std(np.diff(env))) if len(env) > 1 else 0 except: pass return feats.astype(np.float32) def extract_attack_decay_features(y, sr): y, sr = _resample_mono(y, sr, 48000) feats = np.zeros(ATTACK_DECAY_DIM, np.float32) if len(y) < 512: return feats try: env = _compute_envelope(y, sr) if len(env) < 4: return feats peak_idx = np.argmax(env) feats[0] = peak_idx / len(env) if peak_idx > 1: attack = env[:peak_idx] feats[1] = float(np.mean(np.diff(attack))) * 10 linear = np.linspace(0, env[peak_idx], len(attack)) feats[2] = float(np.mean(attack - linear)) attack_samples = min(int(0.02 * sr), len(y) // 2) if attack_samples > 100: try: cent = librosa.feature.spectral_centroid(y=y[:attack_samples], sr=sr) feats[3] = float(np.mean(cent)) / sr except: pass if peak_idx < len(env) - 2: decay = env[peak_idx:] thr_37 = env[peak_idx] * 0.37 below = np.where(decay < thr_37)[0] feats[4] = below[0] / len(env) if len(below) > 0 else 1.0 thr_10 = env[peak_idx] * 0.10 below = np.where(decay < thr_10)[0] feats[5] = below[0] / len(env) if len(below) > 0 else 1.0 if len(decay) > 5: log_decay = np.log(decay + 1e-9) try: feats[6] = np.polyfit(np.arange(len(decay)), log_decay, 1)[0] * 100 except: pass mid = len(decay) // 3 if mid > 0: feats[7] = float(np.mean(decay[mid:2 * mid])) tail_start = 3 * len(decay) // 4 if tail_start < len(decay): feats[8] = float(np.mean(decay[tail_start:])) decay_start = int(peak_idx * len(y) / len(env)) decay_end = min(len(y), decay_start + len(y) // 2) if decay_end - decay_start > 256: try: cent = librosa.feature.spectral_centroid(y=y[decay_start:decay_end], sr=sr) feats[9] = float(np.mean(cent)) / sr except: pass except: pass return feats.astype(np.float32) def extract_texture_features(y, sr): y, sr = _resample_mono(y, sr, 48000) feats = np.zeros(TEXTURE_DIM, np.float32) if len(y) < 512: return feats try: flat = librosa.feature.spectral_flatness(y=y) feats[0] = float(np.mean(flat)) feats[1] = float(np.std(flat)) zcr = librosa.feature.zero_crossing_rate(y) feats[2] = float(np.mean(zcr)) try: h, p = librosa.effects.hpss(y) h_energy = float(np.sum(h ** 2)) p_energy = float(np.sum(p ** 2)) total = h_energy + p_energy + 1e-9 feats[3] = h_energy / total feats[4] = p_energy / total except: feats[3] = 0.5 feats[4] = 0.5 bw = librosa.feature.spectral_bandwidth(y=y, sr=sr) feats[5] = float(np.mean(bw)) / sr rms = float(np.sqrt(np.mean(y ** 2))) + 1e-9 feats[6] = min(float(np.max(np.abs(y))) / rms, 10) / 10 rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr) feats[7] = float(np.mean(rolloff)) / sr except: pass return feats.astype(np.float32) def extract_rich_features(y, sr): if V66_MODULES_AVAILABLE: y, sr = preprocessing.canonicalize_audio(y, sr) else: y, sr = _resample_mono(y, sr, 48000) try: mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=N_MFCC, hop_length=256) mfcc_delta = librosa.feature.delta(mfcc) mfcc_feats = np.concatenate([ np.mean(mfcc, axis=1), np.std(mfcc, axis=1), np.mean(mfcc_delta, axis=1) ]) except: mfcc_feats = np.zeros(MFCC_DIM, np.float32) try: cent = np.mean(librosa.feature.spectral_centroid(y=y, sr=sr, hop_length=256)) / sr bw = np.mean(librosa.feature.spectral_bandwidth(y=y, sr=sr, hop_length=256)) / sr rolloff = np.mean(librosa.feature.spectral_rolloff(y=y, sr=sr, hop_length=256)) / sr contrast = np.mean(librosa.feature.spectral_contrast(y=y, sr=sr, n_bands=6, hop_length=256), axis=1) flatness = np.mean(librosa.feature.spectral_flatness(y=y, hop_length=256)) onset = librosa.onset.onset_strength(y=y, sr=sr, hop_length=256) spectral_feats = np.array([cent, bw, rolloff, *contrast, flatness, np.mean(onset), np.std(onset)], np.float32) except: spectral_feats = np.zeros(SPECTRAL_DIM, np.float32) envelope_feats = extract_envelope_features(y, sr) attack_decay_feats = extract_attack_decay_features(y, sr) texture_feats = extract_texture_features(y, sr) combined = np.concatenate([mfcc_feats, spectral_feats, envelope_feats, attack_decay_feats, texture_feats]).astype(np.float32) return combined # ============================================================================= # Onset detection # ============================================================================= def _superflux_env(y, sr, hop=256): S = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128, fmax=14000, hop_length=hop, power=2.0) logS = librosa.power_to_db(S + 1e-10).astype(np.float32) up = np.vstack([logS[0:1], logS[:-1]]) dn = np.vstack([logS[1:], logS[-1:]]) flux = np.maximum(0.0, np.diff(np.maximum.reduce([logS, up, dn]), axis=1)) env = np.concatenate([[0.0], flux.sum(axis=0)]).astype(np.float32) return env / (np.max(env) + 1e-9) def refine_onset_sample(y, sr, onset_samp): if y.size == 0: return 0 env = np.abs((y - 0.97 * np.concatenate(([0], y[:-1]))).astype(np.float32)) win = max(1, int(0.002 * sr)) if win > 1: env = np.convolve(env, np.ones(win) / win, 'same') d = np.diff(env, prepend=env[0]) r = max(1, int(0.02 * sr)) a, b = max(0, onset_samp - r), min(len(y) - 1, onset_samp + r) if b <= a + 2: return max(0, min(len(y) - 1, onset_samp)) peak = a + int(np.argmax(d[a:b])) if env[peak] <= 1e-8: return max(0, min(len(y) - 1, onset_samp)) s = max(0, peak - max(1, int(0.012 * sr))) pre = env[s:peak] if pre.size: below = np.where(pre < 0.2 * env[peak])[0] return s + int(below[-1]) if below.size else peak return peak def detect_onsets_unified(y, sr, max_onsets=12): y, sr = _resample_mono(y, sr, 48000) hop = 256 env = _superflux_env(y, sr, hop) peaks = librosa.util.peak_pick(env, pre_max=3, post_max=3, pre_avg=12, post_avg=12, delta=0.06, wait=max(1, int(0.06 * sr / hop))) if peaks.size == 0: return [0] order = np.argsort(-env[np.clip(peaks, 0, len(env) - 1)]) out, used = [], set() for ii in order: s0 = refine_onset_sample(y, sr, int(librosa.frames_to_samples(int(peaks[ii]), hop_length=hop))) bucket = int(s0 / sr / 0.03) if bucket not in used: used.add(bucket) out.append(s0) if len(out) >= max_onsets: break return out if out else [0] def find_hit_onsets(y, sr, max_hits=12): y, sr = _resample_mono(y, sr, 48000) return [s / sr for s in detect_onsets_unified(y, sr, max_onsets=max_hits)] def _slice(y, sr, t0, t1): s0 = max(0, int(t0 * sr)) s1 = min(len(y), int(t1 * sr)) return y[s0:s1] if s1 > s0 else np.zeros(0, np.float32) def slice_views_from_onset(y, sr, onset_s): y, sr = _resample_mono(y, sr, 48000) onset_s = max(0, min(onset_s, len(y) / sr)) return { "full": _slice(y, sr, onset_s - FULL_PRE, onset_s + FULL_POST), "trans": _slice(y, sr, onset_s - FULL_PRE, onset_s + TRANS_POST), "tail": _slice(y, sr, onset_s + TAIL_START, onset_s + TAIL_END) } def trim_to_first_hit(y, sr): y, sr = _resample_mono(y, sr, 48000) s = detect_onsets_unified(y, sr, 1)[0] return y[max(0, int(s - FULL_PRE * sr)):min(len(y), int(s + FULL_POST * sr))] # ============================================================================= # YouTube utilities (condensed) # ============================================================================= def _run(cmd, timeout=70): try: p = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) return p.returncode, p.stdout + p.stderr except: return 1, "" def ytsearch_video_list(query, n=80, mode="relevance"): prefix = "ytsearchdate" if mode == "date" else "ytsearch" rc, out = _run([sys.executable, "-m", "yt_dlp", "--flat-playlist", "--dump-single-json", "--skip-download", "--socket-timeout", "10", f"{prefix}{n}:{query}"], 40) if rc != 0: return [] try: return [{"id": e["id"], "title": e.get("title", "")} for e in json.loads(out).get("entries", []) if e.get("id")] except: return [] def augment_query_list(queries, max_aug=2): suffixes = ["one shot", "sample pack", "sound effect"] out = [] for q in queries: if not q.strip(): continue out.append(q) rng = random.Random(hash(q) & 0xFFFFFFFF) s = suffixes[:] rng.shuffle(s) for x in s[:max_aug]: out.append(f"{q} {x}") return list(dict.fromkeys(out)) def download_wav_section(url, start, end, out_path, timeout=70): rc, out = _run([sys.executable, "-m", "yt_dlp", "--no-playlist", "-f", "bestaudio[ext=m4a]/bestaudio/best", "--download-sections", f"*{max(0, start)}-{end}", "--force-keyframes-at-cuts", "-x", "--audio-format", "wav", "--postprocessor-args", "ExtractAudio:-ar 48000 -ac 1", "-o", out_path.replace(".wav", ".%(ext)s"), url], timeout) if rc == 0: for f in os.listdir(os.path.dirname(out_path)): if f.endswith(".wav"): return True, "" return False, out _DUR_CACHE = {} _TITLE_CACHE = {} def get_video_duration_seconds(url): if url in _DUR_CACHE: return _DUR_CACHE[url] rc, out = _run([sys.executable, "-m", "yt_dlp", "--dump-single-json", "--skip-download", "--no-playlist", url], 30) if rc == 0: try: data = json.loads(out) _DUR_CACHE[url] = float(data.get("duration", 0)) _TITLE_CACHE[url] = data.get("title", "Unknown") return _DUR_CACHE[url] except: pass return None def get_video_title(url): if url in _TITLE_CACHE: return _TITLE_CACHE[url] get_video_duration_seconds(url) return _TITLE_CACHE.get(url, "Unknown") # ============================================================================= # Index building (same as v6.9) # ============================================================================= def _video_status(con, vid): row = con.execute("SELECT status, COALESCE(index_ver,1) FROM videos WHERE video_id=?", (vid,)).fetchone() return (row[0], int(row[1] or 1)) if row else (None, 0) def _mark_video(con, vid, title, status, err=""): con.execute("INSERT INTO videos VALUES(?,?,?,?,?,?) ON CONFLICT(video_id) DO UPDATE SET title=excluded.title,status=excluded.status,last_error=excluded.last_error,updated_at=excluded.updated_at,index_ver=excluded.index_ver", (vid, title, status, err[:4000], time.time(), CURRENT_INDEX_VERSION)) con.commit() def _blob(v): return sqlite3.Binary(v.astype(np.float16).tobytes()) if v is not None else None def _insert_clip(con, vid, title, url, t0, t1, emb, mel, emb_t, emb_b, mel_t, mel_b, patch_t, patch_b, patch_t_shifted, patch_b_shifted, trans_desc, patch_multi, rich_features): for attempt in range(6): try: con.execute("INSERT INTO clips(video_id,title,url,t0,t1,emb,mel,emb_t,emb_b,mel_t,mel_b,patch_t,patch_b,patch_t_shifted,patch_b_shifted,trans_desc,patch_multi,rich_features,index_ver,created_at) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", (vid, title, url, t0, t1, _blob(emb), _blob(mel), _blob(emb_t), _blob(emb_b), _blob(mel_t), _blob(mel_b), _blob(patch_t), _blob(patch_b), _blob(patch_t_shifted), _blob(patch_b_shifted), _blob(trans_desc), _blob(patch_multi), _blob(rich_features), CURRENT_INDEX_VERSION, time.time())) return except sqlite3.OperationalError as e: if "locked" in str(e).lower(): time.sleep(0.15 * (2 ** attempt)) else: raise def build_library_round(embedder, db_path, queries, max_videos=200, secs_per_video=22, max_hits_per_video=12, windows_per_video=1, window_seed=1, include_intro=True, stop_event=None, progress_cb=None, max_results_per_query=400, pool_multiplier=3.0, include_recent=True, augment_queries=True): """Index videos with v7.0 features.""" init_db(db_path) con = _connect_db(db_path) print(f"[indexing] v7.0 Personalized Perceptual Search") queries = queries[:15] q_list = augment_query_list(queries, max_aug=1) if augment_queries else list(queries) q_list = q_list[:30] per_q = max(20, min(max_results_per_query, int(math.ceil(max_videos * pool_multiplier / max(1, len(q_list)))))) if progress_cb: progress_cb(0, 0, 0, 0, 0, "") candidates = [] for i, q in enumerate(q_list): if stop_event and stop_event.is_set(): break if progress_cb: progress_cb(-(i + 1), len(q_list), 0, 0, 0, q[:50]) candidates.extend(ytsearch_video_list(q, per_q, "relevance")) if include_recent: candidates.extend(ytsearch_video_list(q, max(10, per_q // 2), "date")) seen = set() dedup = [v for v in candidates if not (v["id"] in seen or seen.add(v["id"]))] random.shuffle(dedup) dedup = dedup[:max_videos] if progress_cb: progress_cb(0, len(dedup), 0, 0, 0, "") scanned = ok = dead = added = 0 for v in dedup: if stop_event and stop_event.is_set(): break scanned += 1 vid, title, url = v["id"], v.get("title", ""), f"https://www.youtube.com/watch?v={v['id']}" if progress_cb: progress_cb(scanned, len(dedup), ok, dead, added, title[:60]) status, ver = _video_status(con, vid) if status in ("ok", "dead") and ver >= CURRENT_INDEX_VERSION: continue if windows_per_video <= 1: windows = [(0, secs_per_video)] else: assumed_dur = 180 max_start = max(0, assumed_dur - secs_per_video - 1) rng = random.Random((hash(vid) & 0xFFFFFFFF) ^ window_seed) starts = [0.0] if include_intro else [] for _ in range(50): if len(starts) >= windows_per_video: break s = rng.uniform(0, max_start) if all(abs(s - x) > secs_per_video for x in starts): starts.append(s) windows = [(s, s + secs_per_video) for s in sorted(starts)] hits, any_ok, last_log = 0, False, "" with tempfile.TemporaryDirectory() as td: for wi, (start, end) in enumerate(windows): if (stop_event and stop_event.is_set()) or hits >= max_hits_per_video: break wav = os.path.join(td, f"s{wi}.wav") success, log = download_wav_section(url, start, end, wav) last_log = log wav_file = next((os.path.join(td, f) for f in os.listdir(td) if f.endswith(".wav")), None) if not wav_file: continue any_ok = True try: y, sr = sf.read(wav_file, dtype="float32", always_2d=False) except: continue y, sr = _resample_mono(y, sr, 48000) found_onsets = find_hit_onsets(y, sr, max_hits_per_video) for onset in found_onsets: if hits >= max_hits_per_video: break views = slice_views_from_onset(y, sr, onset) full, trans, tail = views["full"], views["trans"], views["tail"] # Skip clips that are too short for neural networks # MIN_AUDIO_SAMPLES = 24000 (0.5 sec at 48kHz) if full.size < MIN_AUDIO_SAMPLES // 2 or trans.size < int(0.04 * sr): continue z = lambda d: np.zeros(d, np.float32) try: emb_full = embed_full(embedder, full, sr) except: continue if FULL_MODE: try: emb_t = embed_detail(embedder, trans, sr) if trans.size > 100 else z(1024) emb_b = embed_detail(embedder, tail, sr) if tail.size > 100 else z(1024) except: emb_t, emb_b = z(1024), z(1024) else: emb_t, emb_b = emb_full, z(len(emb_full)) try: _insert_clip(con, vid, title, url, max(0, start + onset - FULL_PRE), start + onset + FULL_POST, emb_full, mel_shape(full, sr), emb_t, emb_b, mel_shape(trans, sr) if trans.size > 100 else z(64), mel_shape(tail, sr) if tail.size > 100 else z(64), mel_patch(trans, sr) if trans.size > 100 else z(PATCH_DIM), mel_patch(tail, sr) if tail.size > 100 else z(PATCH_DIM), mel_patch_shifted(trans, sr) if trans.size > 100 else z(SHIFTED_PATCH_DIM), mel_patch_shifted(tail, sr) if tail.size > 100 else z(SHIFTED_PATCH_DIM), transient_descriptor(trans, sr) if trans.size > 100 else z(TRANS_DESC_DIM), mel_patch_multiscale(full, sr), extract_rich_features(full, sr)) added += 1 hits += 1 except: continue con.commit() try: os.remove(wav_file) except: pass if any_ok: ok += 1 _mark_video(con, vid, title, "ok") else: dead += 1 _mark_video(con, vid, title, "dead", last_log) if progress_cb: progress_cb(scanned, len(dedup), ok, dead, added, title[:60]) con.close() return added # ============================================================================= # Load library # ============================================================================= def _safe_frombuf(blob, dim): if blob is None: return np.zeros(dim, np.float32) nbytes = len(blob) if nbytes == dim * 2: arr = np.frombuffer(blob, dtype=np.float16).astype(np.float32, copy=False) elif nbytes == dim * 4: arr = np.frombuffer(blob, dtype=np.float32).astype(np.float32, copy=False) else: arr = np.frombuffer(blob, dtype=np.float16).astype(np.float32, copy=False) if arr.size == dim: return arr out = np.zeros(dim, np.float32) n = min(dim, arr.size) out[:n] = arr[:n] return out def load_library_matrices(db_path, include_legacy=False): init_db(db_path) con = _connect_db(db_path) sql = "SELECT id,title,url,t0,t1,emb,mel,emb_t,emb_b,mel_t,mel_b,patch_t,patch_b,patch_t_shifted,patch_b_shifted,trans_desc,patch_multi,rich_features,COALESCE(index_ver,1) FROM clips" if not include_legacy: sql += f" WHERE COALESCE(index_ver,1)={CURRENT_INDEX_VERSION}" rows = con.execute(sql).fetchall() con.close() if not rows: return {"ids": np.array([], np.int64)} def norm(mat): mat = mat.astype(np.float32, copy=False) return mat / (np.linalg.norm(mat, axis=1, keepdims=True) + 1e-9) def detect_dim(blob, default): if blob is None or len(blob) == 0: return default return len(blob) // 2 if len(blob) % 2 == 0 else len(blob) // 4 def first_blob(col_idx): for r in rows: if r[col_idx] and len(r[col_idx]) > 0: return r[col_idx] return None emb_dim_full = detect_dim(first_blob(5), 512) emb_dim_t = detect_dim(first_blob(7), emb_dim_full) emb_dim_b = detect_dim(first_blob(8), emb_dim_full) rich_dim = detect_dim(first_blob(17), RICH_FEATURES_DIM) print(f"[scout] Dims: full={emb_dim_full}, trans={emb_dim_t}, tail={emb_dim_b}, rich={rich_dim}") print(f"[scout] โœ“ Loaded {len(rows)} clips") rich_raw = np.stack([_safe_frombuf(r[17], rich_dim) for r in rows]).astype(np.float32) return { "ids": np.array([r[0] for r in rows], np.int64), "titles": np.array([r[1] for r in rows], object), "urls": np.array([r[2] for r in rows], object), "t0s": np.array([r[3] for r in rows], np.float32), "t1s": np.array([r[4] for r in rows], np.float32), "vers": np.array([int(r[18] or 1) for r in rows], np.int32), "emb": norm(np.stack([_safe_frombuf(r[5], emb_dim_full) for r in rows])), "mel": norm(np.stack([_safe_frombuf(r[6], 64) for r in rows])), "emb_t": norm(np.stack([_safe_frombuf(r[7], emb_dim_t) for r in rows])), "emb_b": norm(np.stack([_safe_frombuf(r[8], emb_dim_b) for r in rows])), "mel_t": norm(np.stack([_safe_frombuf(r[9], 64) for r in rows])), "mel_b": norm(np.stack([_safe_frombuf(r[10], 64) for r in rows])), "patch_t": norm(np.stack([_safe_frombuf(r[11], PATCH_DIM) for r in rows])), "patch_b": norm(np.stack([_safe_frombuf(r[12], PATCH_DIM) for r in rows])), "patch_t_shifted": norm(np.stack([_safe_frombuf(r[13], SHIFTED_PATCH_DIM) for r in rows])), "patch_b_shifted": norm(np.stack([_safe_frombuf(r[14], SHIFTED_PATCH_DIM) for r in rows])), "trans_desc": norm(np.stack([_safe_frombuf(r[15], TRANS_DESC_DIM) for r in rows])), "patch_multi": norm(np.stack([_safe_frombuf(r[16], MULTISCALE_PATCH_DIM) for r in rows])), "rich_features_raw": rich_raw, "rich_features": norm(rich_raw), "emb_dim_full": emb_dim_full, "emb_dim_t": emb_dim_t, "emb_dim_b": emb_dim_b, "rich_dim": rich_dim, } # ============================================================================= # SEARCH โ€” v7.0 Personalized Perceptual # ============================================================================= def _chunked_dot(mat, v, chunk=20000): if mat.shape[1] != v.shape[0]: raise ValueError(f"Dim mismatch: lib={mat.shape[1]} vs query={v.shape[0]}") out = np.empty(mat.shape[0], dtype=np.float32) for i in range(0, mat.shape[0], chunk): j = min(mat.shape[0], i + chunk) out[i:j] = mat[i:j] @ v return out def _best_shift_sim(q, lib): n = len(SHIFT_VARIANTS) lib_r = lib.reshape(-1, n, PATCH_DIM) q_r = q.reshape(n, PATCH_DIM) lib_n = lib_r / (np.linalg.norm(lib_r, axis=2, keepdims=True) + 1e-9) q_n = q_r / (np.linalg.norm(q_r, axis=1, keepdims=True) + 1e-9) best = np.full(lib.shape[0], -1.0, np.float32) for qi in range(n): for li in range(n): if abs(qi - li) <= 1: best = np.maximum(best, np.sum(q_n[qi] * lib_n[:, li, :], axis=1)) return best def search_library(embedder, query_bytes, lib, top_k=20, apply_negative_filter=False, query_max_onsets=1, rerank_top_n=400, debug=False, db_path=DEFAULT_DB_PATH, return_session_id=False, exploration_inject=0, exploration_pool=300, debug_clip_id=None): """ v7.0 Personalized Perceptual Search Stage A: Perceptual retrieval Stage B: Perceptual rerank (shift-tolerant) Stage C: Identity boost (gated) Stage D: Personalization blend (learned from feedback) Returns results, and optionally a session_id for feedback. """ if lib.get("ids") is None or len(lib["ids"]) == 0: return ([], None) if return_session_id else [] # Load audio try: y, sr = sf.read(io.BytesIO(query_bytes), dtype="float32", always_2d=False) y, sr = _resample_mono(y, sr, 48000) except Exception as e: if debug: print(f"[search] Load failed: {e}") return ([], None) if return_session_id else [] # Create session for feedback query_hash = hashlib.md5(query_bytes[:10000]).hexdigest()[:12] session_id = create_feedback_session(db_path, query_hash) # Detect onsets onset_samples = detect_onsets_unified(y, sr, max_onsets=query_max_onsets) best_scores = None best_features = None best_sims = None # Store similarity channels for personalization for onset_samp in onset_samples: onset_time = onset_samp / sr views = slice_views_from_onset(y, sr, onset_time) full, trans, tail = views["full"], views["trans"], views["tail"] # Skip if clip is too short if full.size < MIN_AUDIO_SAMPLES // 4: if debug: print(f"[search] Onset at {onset_time:.3f}s too short ({full.size} samples), skipping") continue # NOTE: Do NOT pad here - must match indexing pipeline # Embedding functions handle their own padding internally try: emb_full = embed_matching_library(embedder, full, sr, lib["emb_dim_full"]) rich = extract_rich_features(full, sr) lib_rich_dim = lib["rich_features"].shape[1] if len(rich) != lib_rich_dim: if len(rich) < lib_rich_dim: rich = np.pad(rich, (0, lib_rich_dim - len(rich))) else: rich = rich[:lib_rich_dim] rich_norm = rich / (np.linalg.norm(rich) + 1e-9) patch_multi = mel_patch_multiscale(full, sr) mel_full = mel_shape(full, sr) patch_t_sh = mel_patch_shifted(trans, sr) if trans.size > 100 else np.zeros(SHIFTED_PATCH_DIM, np.float32) patch_b_sh = mel_patch_shifted(tail, sr) if tail.size > 100 else np.zeros(SHIFTED_PATCH_DIM, np.float32) trans_d = transient_descriptor(trans, sr) if trans.size > 100 else np.zeros(TRANS_DESC_DIM, np.float32) except Exception as e: if debug: print(f"[search] Feature extraction failed: {e}") continue # Stage A: Perceptual retrieval sim_rich = _chunked_dot(lib["rich_features"], rich_norm) sim_patch = _chunked_dot(lib["patch_multi"], patch_multi) sim_mel = _chunked_dot(lib["mel"], mel_full) sim_emb = _chunked_dot(lib["emb"], emb_full) perceptual = W_RICH * sim_rich + W_PATCH * sim_patch + W_MEL * sim_mel + W_EMB_STAGE_A * sim_emb # Debug: show where specific clip ranks if debug_clip_id is not None: try: clip_idx = np.where(lib["ids"] == debug_clip_id)[0] if len(clip_idx) > 0: idx = clip_idx[0] stage_a_rank = int(np.sum(perceptual > perceptual[idx])) + 1 print(f"\n[DEBUG] Clip {debug_clip_id} Stage A analysis:") print(f" Stage A rank: {stage_a_rank} / {len(perceptual)}") print(f" Stage A score: {perceptual[idx]:.4f}") print(f" Components:") print(f" sim_rich: {sim_rich[idx]:.4f} (ร— {W_RICH} = {W_RICH * sim_rich[idx]:.4f})") print(f" sim_patch: {sim_patch[idx]:.4f} (ร— {W_PATCH} = {W_PATCH * sim_patch[idx]:.4f})") print(f" sim_mel: {sim_mel[idx]:.4f} (ร— {W_MEL} = {W_MEL * sim_mel[idx]:.4f})") print(f" sim_emb: {sim_emb[idx]:.4f} (ร— {W_EMB_STAGE_A} = {W_EMB_STAGE_A * sim_emb[idx]:.4f})") print(f" Top 5 Stage A scores: {sorted(perceptual, reverse=True)[:5]}") print(f" In top {rerank_top_n}? {'YES' if stage_a_rank <= rerank_top_n else 'NO'}") else: print(f"[DEBUG] Clip {debug_clip_id} not found in library!") except Exception as e: print(f"[DEBUG] Error: {e}") if best_scores is None or np.max(perceptual) > np.max(best_scores): best_scores = perceptual best_features = { 'full': full, 'trans': trans, 'tail': tail, 'emb_full': emb_full, 'patch_t_shifted': patch_t_sh, 'patch_b_shifted': patch_b_sh, 'trans_desc': trans_d, } best_sims = { 'sim_rich': sim_rich, 'sim_patch': sim_patch, 'sim_mel': sim_mel, 'sim_emb': sim_emb, } if best_scores is None: return ([], session_id) if return_session_id else [] scores = best_scores.copy() # Stage B: Perceptual rerank shortlist_idx = np.argsort(-scores)[:min(rerank_top_n, len(scores))] sim_patch_t = _best_shift_sim(best_features['patch_t_shifted'], lib["patch_t_shifted"][shortlist_idx]) sim_patch_b = _best_shift_sim(best_features['patch_b_shifted'], lib["patch_b_shifted"][shortlist_idx]) sim_trans = lib["trans_desc"][shortlist_idx] @ best_features['trans_desc'] rerank_bonus = W_PATCH_SHIFT_T * sim_patch_t + W_PATCH_SHIFT_B * sim_patch_b + W_TRANS_DESC * sim_trans scores[shortlist_idx] += rerank_bonus # Stage C: Identity boost (gated) identity_scores = np.zeros(len(shortlist_idx), np.float32) if FULL_MODE and best_features is not None: trans, tail = best_features['trans'], best_features['tail'] if trans.size > 100 and tail.size > 100: try: emb_trans = embed_matching_library(embedder, trans, sr, lib["emb_dim_t"]) emb_tail = embed_matching_library(embedder, tail, sr, lib["emb_dim_b"]) sim_full = lib["emb"][shortlist_idx] @ best_features['emb_full'] sim_trans_emb = lib["emb_t"][shortlist_idx] @ emb_trans sim_tail_emb = lib["emb_b"][shortlist_idx] @ emb_tail identity_scores = W_ID_FULL * sim_full + W_ID_TRANS * sim_trans_emb + W_ID_TAIL * sim_tail_emb boost = np.maximum(0.0, identity_scores - IDENTITY_THRESHOLD) scores[shortlist_idx] += IDENTITY_BOOST_WEIGHT * boost except: pass # Stage D: Personalization model = get_personalization_model(db_path) alpha = model.get_blend_alpha() if alpha > 0: # Build feature matrix for personalization n_short = len(shortlist_idx) feature_matrix = np.zeros((n_short, N_FEATURES), np.float32) feature_matrix[:, 0] = best_sims['sim_rich'][shortlist_idx] feature_matrix[:, 1] = best_sims['sim_patch'][shortlist_idx] feature_matrix[:, 2] = best_sims['sim_mel'][shortlist_idx] feature_matrix[:, 3] = sim_patch_t feature_matrix[:, 4] = sim_patch_b feature_matrix[:, 5] = sim_trans feature_matrix[:, 6] = best_sims['sim_emb'][shortlist_idx] # Trans/tail embedding sims if FULL_MODE and 'emb_trans' in dir() and 'emb_tail' in dir(): feature_matrix[:, 7] = lib["emb_t"][shortlist_idx] @ emb_trans feature_matrix[:, 8] = lib["emb_b"][shortlist_idx] @ emb_tail else: feature_matrix[:, 7] = identity_scores * 0.4 feature_matrix[:, 8] = identity_scores * 0.3 feature_matrix[:, 9] = 1.0 # Bias # Compute personalized scores personal_scores = model.score_batch(feature_matrix) # Blend base_scores = scores[shortlist_idx] blended = (1 - alpha) * base_scores + alpha * personal_scores scores[shortlist_idx] = blended if debug: print(f"[search] Personalization: alpha={alpha:.2f}, pairs={model.n_pairs_trained}") # Store candidate features for feedback final_idx = np.argsort(-scores)[:min(top_k + exploration_inject, len(scores))] # Exploration: optionally inject a few from deeper in the ranking (disabled by default) if exploration_inject > 0 and len(scores) > exploration_pool: explore_pool_idx = np.argsort(-scores)[top_k:exploration_pool] if len(explore_pool_idx) >= exploration_inject: explore_idx = np.random.choice(explore_pool_idx, exploration_inject, replace=False) # Add exploration items but keep everything sorted by score final_idx = np.concatenate([final_idx[:top_k], explore_idx]) # ALWAYS sort by score (highest first) - deterministic results final_idx = final_idx[np.argsort(-scores[final_idx])][:top_k] # Store features for feedback clip_ids = [int(lib["ids"][i]) for i in final_idx] features_list = [] candidate_scores = [] candidate_ranks = [] for rank, i in enumerate(final_idx): # Build feature vector feat = np.zeros(N_FEATURES, np.float32) feat[0] = best_sims['sim_rich'][i] feat[1] = best_sims['sim_patch'][i] feat[2] = best_sims['sim_mel'][i] # Get shift-tolerant sims (need to recompute for non-shortlist items) if i in shortlist_idx: idx_in_short = np.where(shortlist_idx == i)[0][0] feat[3] = sim_patch_t[idx_in_short] feat[4] = sim_patch_b[idx_in_short] feat[5] = sim_trans[idx_in_short] feat[7] = feature_matrix[idx_in_short, 7] if alpha > 0 else 0 feat[8] = feature_matrix[idx_in_short, 8] if alpha > 0 else 0 else: feat[3] = feat[4] = feat[5] = feat[7] = feat[8] = 0 feat[6] = best_sims['sim_emb'][i] feat[9] = 1.0 features_list.append(feat) candidate_scores.append(float(scores[i])) candidate_ranks.append(rank + 1) store_candidate_features_batch(db_path, session_id, clip_ids, features_list, scores=candidate_scores, ranks=candidate_ranks) # Build results results = [{ "id": int(lib["ids"][i]), "score": float(scores[i]), "title": str(lib["titles"][i]), "url": str(lib["urls"][i]), "t0": float(lib["t0s"][i]), "t1": float(lib["t1s"][i]), "ver": int(lib["vers"][i]) if "vers" in lib else 1, "rank": rank + 1, "session_id": session_id, } for rank, i in enumerate(final_idx)] if return_session_id: return results, session_id return results # Compatibility def search_library_v66(embedder, query_bytes, lib, **kwargs): return search_library(embedder, query_bytes, lib, **kwargs) def debug_search_for_clip(embedder, query_bytes, lib, clip_id, db_path=DEFAULT_DB_PATH): """ Debug why a specific clip isn't ranking well. Usage: embedder = scout.get_embedder() lib = scout.load_library_matrices(db_path) with open("your_sample.wav", "rb") as f: query = f.read() scout.debug_search_for_clip(embedder, query, lib, clip_id=12345) """ print(f"\n{'='*60}") print(f"DEBUGGING CLIP {clip_id}") print(f"{'='*60}") # Check clip exists if clip_id not in lib["ids"]: print(f"ERROR: Clip {clip_id} not in loaded library!") print(f"Library has {len(lib['ids'])} clips, IDs range from {lib['ids'].min()} to {lib['ids'].max()}") return idx = np.where(lib["ids"] == clip_id)[0][0] print(f"Clip title: {lib['titles'][idx]}") print(f"Clip t0: {lib['t0s'][idx]:.2f}s") # Run search with debug results = search_library( embedder, query_bytes, lib, top_k=100, debug=True, db_path=db_path, debug_clip_id=clip_id, rerank_top_n=2000 # Increase to see if it helps ) # Check if clip appeared in results found = False for r in results: if r["id"] == clip_id: print(f"\nโœ“ Clip FOUND in results at rank #{r['rank']} with score {r['score']:.4f}") found = True break if not found: print(f"\nโœ— Clip NOT in top {len(results)} results") print(f"{'='*60}\n") return results # ============================================================================= # Preview / Deep index # ============================================================================= def fetch_preview_wav_bytes(url, t0, pre=0.1, post=0.9, timeout=70): start, end = max(0, t0 - pre), t0 + post with tempfile.TemporaryDirectory() as td: ok, log = download_wav_section(url, start, end, os.path.join(td, "p.wav"), timeout) wav = next((os.path.join(td, f) for f in os.listdir(td) if f.endswith(".wav")), None) if not wav: bio = io.BytesIO() sf.write(bio, np.zeros(int((end - start) * 48000), np.float32), 48000, format="WAV") return bio.getvalue(), log or "failed" try: y, sr = sf.read(wav, dtype="float32", always_2d=False) y, sr = _resample_mono(y, sr, 48000) bio = io.BytesIO() sf.write(bio, y, sr, format="WAV") return bio.getvalue(), "" except Exception as e: bio = io.BytesIO() sf.write(bio, np.zeros(int((end - start) * 48000), np.float32), 48000, format="WAV") return bio.getvalue(), str(e) def get_video_clip_count(db_path, url): vid = None if "youtube.com" in url or "youtu.be" in url: if "v=" in url: vid = url.split("v=")[1].split("&")[0] elif "youtu.be/" in url: vid = url.split("youtu.be/")[1].split("?")[0] if not vid: return 0 try: con = _connect_db(db_path) count = con.execute("SELECT COUNT(*) FROM clips WHERE video_id=?", (vid,)).fetchone()[0] con.close() return count except: return 0 def deep_index_video(embedder, db_path, url, max_hits=100, window_secs=30, progress_cb=None): init_db(db_path) con = _connect_db(db_path) vid = None if "v=" in url: vid = url.split("v=")[1].split("&")[0].split("?")[0].strip() elif "youtu.be/" in url: vid = url.split("youtu.be/")[1].split("?")[0].split("&")[0].strip() if not vid or len(vid) < 5: return 0, "Invalid YouTube URL" url = f"https://www.youtube.com/watch?v={vid}" dur = get_video_duration_seconds(url) or 600 title = get_video_title(url) existing = con.execute("SELECT COUNT(*) FROM clips WHERE video_id=?", (vid,)).fetchone()[0] if existing > 0: con.execute("DELETE FROM clips WHERE video_id=?", (vid,)) con.commit() windows = [] start = 0.0 while start < dur: windows.append((start, min(start + window_secs, dur))) start += window_secs - 2 total_added = 0 consecutive_failures = 0 with tempfile.TemporaryDirectory() as td: for wi, (start, end) in enumerate(windows): if progress_cb: progress_cb(wi + 1, len(windows), total_added) if total_added >= max_hits or consecutive_failures >= 3: break wav_path = os.path.join(td, f"seg_{wi}.wav") success, log = download_wav_section(url, start, end, wav_path, timeout=90) wav_file = next((os.path.join(td, f) for f in os.listdir(td) if f.endswith(".wav")), None) if not wav_file: consecutive_failures += 1 continue consecutive_failures = 0 try: y, sr = sf.read(wav_file, dtype="float32", always_2d=False) except: continue y, sr = _resample_mono(y, sr, 48000) onsets = find_hit_onsets(y, sr, max_hits=max_hits - total_added) for onset in onsets: if total_added >= max_hits: break views = slice_views_from_onset(y, sr, onset) full, trans, tail = views["full"], views["trans"], views["tail"] # Skip clips that are too short for neural networks if full.size < MIN_AUDIO_SAMPLES // 2: continue t0 = max(0, start + onset - FULL_PRE) t1 = start + onset + FULL_POST if con.execute("SELECT 1 FROM clips WHERE video_id=? AND ABS(t0-?)<0.05", (vid, t0)).fetchone(): continue z = lambda d: np.zeros(d, np.float32) try: emb_full = embed_full(embedder, full, sr) except: continue if FULL_MODE: try: emb_t = embed_detail(embedder, trans, sr) if trans.size > 100 else z(1024) emb_b = embed_detail(embedder, tail, sr) if tail.size > 100 else z(1024) except: emb_t, emb_b = z(1024), z(1024) else: emb_t, emb_b = emb_full, z(len(emb_full)) _insert_clip(con, vid, title, url, t0, t1, emb_full, mel_shape(full, sr), emb_t, emb_b, mel_shape(trans, sr) if trans.size > 100 else z(64), mel_shape(tail, sr) if tail.size > 100 else z(64), mel_patch(trans, sr) if trans.size > 100 else z(PATCH_DIM), mel_patch(tail, sr) if tail.size > 100 else z(PATCH_DIM), mel_patch_shifted(trans, sr) if trans.size > 100 else z(SHIFTED_PATCH_DIM), mel_patch_shifted(tail, sr) if tail.size > 100 else z(SHIFTED_PATCH_DIM), transient_descriptor(trans, sr) if trans.size > 100 else z(TRANS_DESC_DIM), mel_patch_multiscale(full, sr), extract_rich_features(full, sr)) total_added += 1 con.commit() try: os.remove(wav_file) except: pass _mark_video(con, vid, title, "ok") con.close() return total_added, f"Added {total_added} clips from '{title}'" # Compatibility _NEGATIVE_PROMPTS = ["doorbell", "alarm", "siren", "phone ringing", "beep tone"] _POSITIVE_PROMPTS = ["rimshot", "snare drum hit", "hand clap", "door knock", "metal hit"] def smart_queries_from_sample(embedder, query_bytes): return ["percussion one shot", "drum sample"] * 4