# twin_frequency_trust.py import numpy as np import wave from dataclasses import dataclass from typing import Optional, Tuple, List, Dict def _frame_hop_sampler(wav_path: str, frame_ms: float = 200.0, hop_ms: float = 100.0): """Yield mono float32 frames from a WAV file with overlap, normalized to [-1,1].""" with wave.open(wav_path, 'rb') as wf: n_channels = wf.getnchannels() sampwidth = wf.getsampwidth() framerate = wf.getframerate() n_frames = wf.getnframes() frame_size = int(framerate * frame_ms / 1000.0) hop_size = int(framerate * hop_ms / 1000.0) raw = wf.readframes(n_frames) dtype = {1: np.int8, 2: np.int16, 3: np.int32, 4: np.int32}[sampwidth] data = np.frombuffer(raw, dtype=dtype).astype(np.float32) if n_channels > 1: data = data.reshape(-1, n_channels).mean(axis=1) max_abs = np.max(np.abs(data)) or 1.0 data = data / max_abs for start in range(0, len(data) - frame_size + 1, hop_size): frame = data[start:start + frame_size].copy() yield frame, framerate def _magnitude_spectrum(x: np.ndarray, samplerate: int, fft_size: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray]: if fft_size is None: target = max(512, int(2 ** np.ceil(np.log2(len(x))))) fft_size = min(target, 16384) if len(x) < fft_size: pad = np.zeros(fft_size, dtype=np.float32) pad[:len(x)] = x xw = pad else: xw = x[:fft_size] win = np.hanning(len(xw)).astype(np.float32) xw = xw * win X = np.fft.rfft(xw, n=fft_size) mag = np.abs(X).astype(np.float32) mag[0] = 0.0 mag = np.log1p(mag) kernel = np.ones(5, dtype=np.float32) / 5.0 env = np.convolve(mag, kernel, mode='same') + 1e-6 mag_w = mag / env norm = np.linalg.norm(mag_w) or 1.0 mag_n = mag_w / norm freqs = np.fft.rfftfreq(fft_size, d=1.0 / samplerate).astype(np.float32) return mag_n, freqs def _find_peaks(mag: np.ndarray, freqs: np.ndarray, min_hz: float = 40.0, max_hz: float = 8000.0, top_k: int = 10, threshold_quantile: float = 0.90) -> Tuple[np.ndarray, np.ndarray]: mask = (freqs >= min_hz) & (freqs <= max_hz) cand_mags = mag[mask] cand_freqs = freqs[mask] if cand_mags.size == 0: return np.array([]), np.array([]) thresh = np.quantile(cand_mags, threshold_quantile) idx = np.where(cand_mags >= thresh)[0] order = np.argsort(cand_mags[idx])[::-1][:top_k] sel_mags = cand_mags[idx][order] sel_freqs = cand_freqs[idx][order] return sel_freqs, sel_mags @dataclass class SpectralSignature: fft_size: int samplerate: int ref_vector: np.ndarray peak_freqs: np.ndarray peak_mags: np.ndarray def build_reference_signature(wav_path: str, frame_ms: float = 400.0) -> SpectralSignature: frames = list(_frame_hop_sampler(wav_path, frame_ms=frame_ms, hop_ms=frame_ms)) if not frames: raise ValueError("No frames read from WAV.") n_avg = min(5, len(frames)) mags = [] for i in range(n_avg): frame, sr = frames[i] mag, freqs = _magnitude_spectrum(frame, sr) mags.append(mag) ref_vec = np.mean(np.stack(mags, axis=0), axis=0).astype(np.float32) ref_vec = ref_vec / (np.linalg.norm(ref_vec) or 1.0) peak_freqs, peak_mags = _find_peaks(ref_vec, freqs) return SpectralSignature(fft_size=len(ref_vec) * 2 - 2, samplerate=sr, ref_vector=ref_vec, peak_freqs=peak_freqs, peak_mags=peak_mags) def spectral_cosine_similarity(a: np.ndarray, b: np.ndarray) -> float: if a.shape != b.shape: n = min(len(a), len(b)) a = a[:n] b = b[:n] denom = (np.linalg.norm(a) or 1.0) * (np.linalg.norm(b) or 1.0) return float(np.dot(a, b) / denom) def peak_overlap_score(freqs_a: np.ndarray, freqs_b: np.ndarray, tol_hz: float = 5.0) -> float: if len(freqs_a) == 0 or len(freqs_b) == 0: return 0.0 hits = 0 for fa in freqs_a: if np.any(np.abs(freqs_b - fa) <= tol_hz): hits += 1 return hits / max(1, len(freqs_a)) @dataclass class TwinTrustConfig: frame_ms: float = 200.0 hop_ms: float = 100.0 min_hz: float = 40.0 max_hz: float = 8000.0 top_k_peaks: int = 10 peak_tol_hz: float = 5.0 alpha_cosine: float = 0.7 alpha_peaks: float = 0.3 class TwinFrequencyTrust: def __init__(self, signature: SpectralSignature, cfg: Optional[TwinTrustConfig] = None): self.sig = signature self.cfg = cfg or TwinTrustConfig() def score_frame(self, frame: np.ndarray, samplerate: int) -> Dict[str, float]: mag, freqs = _magnitude_spectrum(frame, samplerate, fft_size=self.sig.fft_size) cos = spectral_cosine_similarity(mag, self.sig.ref_vector) pf, pm = _find_peaks(mag, freqs, min_hz=self.cfg.min_hz, max_hz=self.cfg.max_hz, top_k=self.cfg.top_k_peaks) peak_score = peak_overlap_score(pf, self.sig.peak_freqs, tol_hz=self.cfg.peak_tol_hz) trust = self.cfg.alpha_cosine * cos + self.cfg.alpha_peaks * peak_score return {"cosine": float(cos), "peak_overlap": float(peak_score), "trust": float(trust)} def stream_score_wav(self, wav_path: str) -> List[Dict[str, float]]: scores = [] for frame, sr in _frame_hop_sampler(wav_path, frame_ms=self.cfg.frame_ms, hop_ms=self.cfg.hop_ms): s = self.score_frame(frame, sr) scores.append(s) return scores if __name__ == "__main__": import argparse, json parser = argparse.ArgumentParser(description="Twin Frequency Trust: real-time-ish spectral twin detection.") parser.add_argument("--ref", required=True, help="Path to reference WAV file.") parser.add_argument("--test", required=True, help="Path to test WAV file to score.") parser.add_argument("--frame_ms", type=float, default=200.0) parser.add_argument("--hop_ms", type=float, default=100.0) parser.add_argument("--peak_tol_hz", type=float, default=5.0) args = parser.parse_args() sig = build_reference_signature(args.ref, frame_ms=400.0) cfg = TwinTrustConfig(frame_ms=args.frame_ms, hop_ms=args.hop_ms, peak_tol_hz=args.peak_tol_hz) model = TwinFrequencyTrust(sig, cfg) scores = model.stream_score_wav(args.test) print(json.dumps(scores[:10], indent=2)) # show first few frames