# twin_frequency_trust.py
import numpy as np
import wave
from dataclasses import dataclass
from typing import Optional, Tuple, List, Dict
def _frame_hop_sampler(wav_path: str, frame_ms: float = 200.0, hop_ms: float = 100.0):
"""Yield mono float32 frames from a WAV file with overlap, normalized to [-1,1]."""
with wave.open(wav_path, 'rb') as wf:
n_channels = wf.getnchannels()
sampwidth = wf.getsampwidth()
framerate = wf.getframerate()
n_frames = wf.getnframes()
frame_size = int(framerate * frame_ms / 1000.0)
hop_size = int(framerate * hop_ms / 1000.0)
raw = wf.readframes(n_frames)
dtype = {1: np.int8, 2: np.int16, 3: np.int32, 4: np.int32}[sampwidth]
data = np.frombuffer(raw, dtype=dtype).astype(np.float32)
if n_channels > 1:
data = data.reshape(-1, n_channels).mean(axis=1)
max_abs = np.max(np.abs(data)) or 1.0
data = data / max_abs
for start in range(0, len(data) - frame_size + 1, hop_size):
frame = data[start:start + frame_size].copy()
yield frame, framerate
def _magnitude_spectrum(x: np.ndarray, samplerate: int, fft_size: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray]:
if fft_size is None:
target = max(512, int(2 ** np.ceil(np.log2(len(x)))))
fft_size = min(target, 16384)
if len(x) < fft_size:
pad = np.zeros(fft_size, dtype=np.float32)
pad[:len(x)] = x
xw = pad
else:
xw = x[:fft_size]
win = np.hanning(len(xw)).astype(np.float32)
xw = xw * win
X = np.fft.rfft(xw, n=fft_size)
mag = np.abs(X).astype(np.float32)
mag[0] = 0.0
mag = np.log1p(mag)
kernel = np.ones(5, dtype=np.float32) / 5.0
env = np.convolve(mag, kernel, mode='same') + 1e-6
mag_w = mag / env
norm = np.linalg.norm(mag_w) or 1.0
mag_n = mag_w / norm
freqs = np.fft.rfftfreq(fft_size, d=1.0 / samplerate).astype(np.float32)
return mag_n, freqs
def _find_peaks(mag: np.ndarray, freqs: np.ndarray, min_hz: float = 40.0, max_hz: float = 8000.0,
top_k: int = 10, threshold_quantile: float = 0.90) -> Tuple[np.ndarray, np.ndarray]:
mask = (freqs >= min_hz) & (freqs <= max_hz)
cand_mags = mag[mask]
cand_freqs = freqs[mask]
if cand_mags.size == 0:
return np.array([]), np.array([])
thresh = np.quantile(cand_mags, threshold_quantile)
idx = np.where(cand_mags >= thresh)[0]
order = np.argsort(cand_mags[idx])[::-1][:top_k]
sel_mags = cand_mags[idx][order]
sel_freqs = cand_freqs[idx][order]
return sel_freqs, sel_mags
@dataclass
class SpectralSignature:
    """Precomputed spectral fingerprint of a reference recording."""
    # FFT length that produced ref_vector; reused for test frames so bin
    # spacing matches (set to len(ref_vector) * 2 - 2 by the builder).
    fft_size: int
    # Sample rate (Hz) of the reference audio.
    samplerate: int
    # Averaged whitened log-magnitude spectrum, L2-normalized.
    ref_vector: np.ndarray
    # Frequencies (Hz) of the strongest reference peaks, descending magnitude.
    peak_freqs: np.ndarray
    # Magnitudes corresponding one-to-one with peak_freqs.
    peak_mags: np.ndarray
def build_reference_signature(wav_path: str, frame_ms: float = 400.0) -> SpectralSignature:
    """Build a SpectralSignature by averaging spectra of up to the first five
    non-overlapping frames of the reference WAV.

    Raises:
        ValueError: if the file yields no complete frames.
    """
    frames = list(_frame_hop_sampler(wav_path, frame_ms=frame_ms, hop_ms=frame_ms))
    if not frames:
        raise ValueError("No frames read from WAV.")
    spectra = []
    for frame, sr in frames[:5]:
        spectrum, freqs = _magnitude_spectrum(frame, sr)
        spectra.append(spectrum)
    # Average the per-frame spectra and re-normalize to unit length.
    ref_vec = np.stack(spectra, axis=0).mean(axis=0).astype(np.float32)
    ref_vec = ref_vec / (np.linalg.norm(ref_vec) or 1.0)
    peak_freqs, peak_mags = _find_peaks(ref_vec, freqs)
    # rfft output has n//2 + 1 bins, so the FFT length is 2 * (bins - 1).
    return SpectralSignature(fft_size=2 * (len(ref_vec) - 1), samplerate=sr,
                             ref_vector=ref_vec, peak_freqs=peak_freqs, peak_mags=peak_mags)
def spectral_cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity of two spectra; mismatched shapes are truncated to
    the shorter length first. Zero vectors yield 0.0 (norms fall back to 1)."""
    if a.shape != b.shape:
        cut = min(len(a), len(b))
        a, b = a[:cut], b[:cut]
    norm_a = np.linalg.norm(a) or 1.0
    norm_b = np.linalg.norm(b) or 1.0
    return float(np.dot(a, b) / (norm_a * norm_b))
def peak_overlap_score(freqs_a: np.ndarray, freqs_b: np.ndarray, tol_hz: float = 5.0) -> float:
    """Fraction of peaks in ``freqs_a`` that lie within ``tol_hz`` of some
    peak in ``freqs_b``. Returns 0.0 when either list is empty."""
    if len(freqs_a) == 0 or len(freqs_b) == 0:
        return 0.0
    # Pairwise |fa - fb| matrix; a row with any entry within tolerance is a hit.
    gaps = np.abs(np.asarray(freqs_a)[:, None] - np.asarray(freqs_b)[None, :])
    matched = int(np.count_nonzero(np.any(gaps <= tol_hz, axis=1)))
    return matched / max(1, len(freqs_a))
@dataclass
class TwinTrustConfig:
    """Tunable parameters for frame-by-frame trust scoring."""
    frame_ms: float = 200.0    # analysis frame length in milliseconds
    hop_ms: float = 100.0      # hop between successive frames in milliseconds
    min_hz: float = 40.0       # lower edge of the peak-search band
    max_hz: float = 8000.0     # upper edge of the peak-search band
    top_k_peaks: int = 10      # maximum peaks extracted per frame
    peak_tol_hz: float = 5.0   # frequency tolerance when matching peaks
    alpha_cosine: float = 0.7  # weight of cosine similarity in the trust blend
    alpha_peaks: float = 0.3   # weight of peak overlap in the trust blend
class TwinFrequencyTrust:
    """Scores audio frames against a reference SpectralSignature, blending
    spectral cosine similarity with peak-overlap into a single trust value."""

    def __init__(self, signature: SpectralSignature, cfg: Optional[TwinTrustConfig] = None):
        self.sig = signature
        self.cfg = TwinTrustConfig() if cfg is None else cfg

    def score_frame(self, frame: np.ndarray, samplerate: int) -> Dict[str, float]:
        """Score one audio frame; returns cosine, peak_overlap, and the
        weighted trust value as a dict of floats."""
        cfg = self.cfg
        # Use the signature's FFT length so bins align with the reference.
        mag, freqs = _magnitude_spectrum(frame, samplerate, fft_size=self.sig.fft_size)
        cosine = spectral_cosine_similarity(mag, self.sig.ref_vector)
        peaks, _ = _find_peaks(mag, freqs, min_hz=cfg.min_hz, max_hz=cfg.max_hz,
                               top_k=cfg.top_k_peaks)
        overlap = peak_overlap_score(peaks, self.sig.peak_freqs, tol_hz=cfg.peak_tol_hz)
        blended = cfg.alpha_cosine * cosine + cfg.alpha_peaks * overlap
        return {"cosine": float(cosine), "peak_overlap": float(overlap), "trust": float(blended)}

    def stream_score_wav(self, wav_path: str) -> List[Dict[str, float]]:
        """Score every overlapping frame of a WAV file, in order."""
        return [self.score_frame(frame, sr)
                for frame, sr in _frame_hop_sampler(wav_path,
                                                    frame_ms=self.cfg.frame_ms,
                                                    hop_ms=self.cfg.hop_ms)]
if __name__ == "__main__":
    # CLI: build a signature from --ref, then score --test frame by frame.
    import argparse, json
    cli = argparse.ArgumentParser(description="Twin Frequency Trust: real-time-ish spectral twin detection.")
    cli.add_argument("--ref", required=True, help="Path to reference WAV file.")
    cli.add_argument("--test", required=True, help="Path to test WAV file to score.")
    for flag, default in (("--frame_ms", 200.0), ("--hop_ms", 100.0), ("--peak_tol_hz", 5.0)):
        cli.add_argument(flag, type=float, default=default)
    opts = cli.parse_args()
    reference = build_reference_signature(opts.ref, frame_ms=400.0)
    settings = TwinTrustConfig(frame_ms=opts.frame_ms, hop_ms=opts.hop_ms, peak_tol_hz=opts.peak_tol_hz)
    scorer = TwinFrequencyTrust(reference, settings)
    results = scorer.stream_score_wav(opts.test)
    print(json.dumps(results[:10], indent=2))  # show first few frames