File size: 6,414 Bytes
ed1b365
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# twin_frequency_trust.py
import numpy as np
import wave
from dataclasses import dataclass
from typing import Optional, Tuple, List, Dict

def _frame_hop_sampler(wav_path: str, frame_ms: float = 200.0, hop_ms: float = 100.0):
    """Yield overlapping mono float32 frames from a PCM WAV file.

    The whole file is read, decoded according to its sample width, down-mixed
    to mono by averaging channels, and peak-normalized to [-1, 1] (an all-zero
    signal is left as zeros). Frames of ``frame_ms`` milliseconds are then
    produced every ``hop_ms`` milliseconds; a trailing partial frame is dropped.

    Args:
        wav_path: Path of the WAV file to read.
        frame_ms: Frame length in milliseconds.
        hop_ms: Step between consecutive frame starts in milliseconds.

    Yields:
        ``(frame, framerate)`` tuples, where ``frame`` is a float32 copy of the
        samples and ``framerate`` is the file's sample rate in Hz.

    Raises:
        ValueError: If the frame or hop length rounds down to zero samples, or
            the sample width is not 1, 2, 3 or 4 bytes.
    """
    # Read everything up front so the file handle is released before the first
    # yield instead of staying open for as long as the consumer iterates.
    with wave.open(wav_path, 'rb') as wf:
        n_channels = wf.getnchannels()
        sampwidth = wf.getsampwidth()
        framerate = wf.getframerate()
        raw = wf.readframes(wf.getnframes())

    frame_size = int(framerate * frame_ms / 1000.0)
    hop_size = int(framerate * hop_ms / 1000.0)
    if frame_size <= 0 or hop_size <= 0:
        raise ValueError(
            "frame_ms and hop_ms must each span at least one sample "
            f"(got frame_ms={frame_ms}, hop_ms={hop_ms} at {framerate} Hz)."
        )

    # Drop any truncated trailing sample frame so the reshapes below cannot fail.
    bytes_per_frame = sampwidth * n_channels
    if bytes_per_frame > 0:
        raw = raw[:len(raw) - (len(raw) % bytes_per_frame)]

    if sampwidth == 1:
        # 8-bit WAV PCM is unsigned with the zero level at 128.
        data = np.frombuffer(raw, dtype=np.uint8).astype(np.float32) - 128.0
    elif sampwidth == 2:
        data = np.frombuffer(raw, dtype='<i2').astype(np.float32)
    elif sampwidth == 3:
        # 24-bit samples are packed little-endian in 3 bytes; assemble them
        # into int32 and sign-extend from bit 23.
        triples = np.frombuffer(raw, dtype=np.uint8).reshape(-1, 3).astype(np.int32)
        packed = triples[:, 0] | (triples[:, 1] << 8) | (triples[:, 2] << 16)
        packed = np.where(packed & 0x800000, packed - (1 << 24), packed)
        data = packed.astype(np.float32)
    elif sampwidth == 4:
        data = np.frombuffer(raw, dtype='<i4').astype(np.float32)
    else:
        raise ValueError(f"Unsupported WAV sample width: {sampwidth} bytes.")

    if n_channels > 1:
        data = data.reshape(-1, n_channels).mean(axis=1)
    if data.size == 0:
        # Nothing to frame; np.max would raise on an empty array.
        return

    max_abs = float(np.max(np.abs(data))) or 1.0
    data = data / max_abs

    for start in range(0, len(data) - frame_size + 1, hop_size):
        yield data[start:start + frame_size].copy(), framerate

def _magnitude_spectrum(x: np.ndarray, samplerate: int, fft_size: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray]:
    """Return a whitened, unit-norm log-magnitude spectrum and its bin frequencies.

    Args:
        x: One frame of samples.
        samplerate: Sample rate in Hz, used to label the FFT bins.
        fft_size: FFT length. When omitted, the next power of two at or above
            ``len(x)`` is used, clamped to the range [512, 16384].

    Returns:
        ``(spectrum, freqs)``, both float32 with ``fft_size // 2 + 1`` entries.
    """
    n_samples = len(x)
    if fft_size is None:
        next_pow2 = int(2 ** np.ceil(np.log2(n_samples)))
        fft_size = min(max(512, next_pow2), 16384)

    # Fit the frame to exactly fft_size samples: truncate if long, zero-pad if short.
    if n_samples >= fft_size:
        segment = x[:fft_size]
    else:
        segment = np.zeros(fft_size, dtype=np.float32)
        segment[:n_samples] = x

    # The Hann taper spans the fitted segment, i.e. it is applied after padding.
    segment = segment * np.hanning(len(segment)).astype(np.float32)

    spectrum = np.abs(np.fft.rfft(segment, n=fft_size)).astype(np.float32)
    spectrum[0] = 0.0  # discard the DC bin
    log_mag = np.log1p(spectrum)

    # Whiten by dividing by a 5-bin moving average; the epsilon avoids 0/0.
    smoothing = np.ones(5, dtype=np.float32) / 5.0
    envelope = np.convolve(log_mag, smoothing, mode='same') + 1e-6
    whitened = log_mag / envelope

    scale = np.linalg.norm(whitened) or 1.0
    bin_freqs = np.fft.rfftfreq(fft_size, d=1.0 / samplerate).astype(np.float32)
    return whitened / scale, bin_freqs

def _find_peaks(mag: np.ndarray, freqs: np.ndarray, min_hz: float = 40.0, max_hz: float = 8000.0,
                top_k: int = 10, threshold_quantile: float = 0.90) -> Tuple[np.ndarray, np.ndarray]:
    """Pick the strongest spectral bins inside a frequency band.

    Bins are restricted to ``[min_hz, max_hz]``, filtered to those at or above
    the ``threshold_quantile`` quantile of the in-band magnitudes, and the
    ``top_k`` largest are returned in descending magnitude order. No
    local-maximum test is applied, so adjacent bins may both be selected.

    Returns:
        ``(frequencies, magnitudes)`` of the selected bins; two empty arrays
        when no bin falls inside the band.
    """
    in_band = (freqs >= min_hz) & (freqs <= max_hz)
    band_mags, band_freqs = mag[in_band], freqs[in_band]
    if band_mags.size == 0:
        return np.array([]), np.array([])

    cutoff = np.quantile(band_mags, threshold_quantile)
    strong = np.flatnonzero(band_mags >= cutoff)
    # Rank the surviving bins from largest to smallest and keep the first top_k.
    ranked = strong[np.argsort(band_mags[strong])[::-1][:top_k]]
    return band_freqs[ranked], band_mags[ranked]

@dataclass
class SpectralSignature:
    """Reference spectral fingerprint produced by build_reference_signature."""
    # FFT length matching ref_vector; scoring passes it to _magnitude_spectrum
    # so test frames are analysed at the same resolution as the reference.
    fft_size: int
    # Sample rate (Hz) reported by the reference WAV file.
    samplerate: int
    # Unit-norm average of the reference frames' whitened spectra.
    ref_vector: np.ndarray
    # Frequencies (Hz) of the strongest bins found in ref_vector.
    peak_freqs: np.ndarray
    # Magnitudes of those bins, in the same order as peak_freqs.
    peak_mags: np.ndarray

def build_reference_signature(wav_path: str, frame_ms: float = 400.0, max_frames: int = 5) -> SpectralSignature:
    """Build a reference spectral signature from the start of a WAV file.

    The file is cut into non-overlapping frames of ``frame_ms`` milliseconds.
    The spectra of the first ``max_frames`` frames are averaged and
    re-normalized to unit length, and the strongest bins of that average are
    recorded as the signature's peaks.

    Args:
        wav_path: Path of the reference WAV file.
        frame_ms: Frame length in milliseconds (the hop equals the frame length).
        max_frames: Maximum number of leading frames to average.

    Returns:
        The resulting SpectralSignature.

    Raises:
        ValueError: If ``max_frames`` is below 1 or the file yields no frames.
    """
    if max_frames < 1:
        raise ValueError("max_frames must be at least 1.")

    sampler = _frame_hop_sampler(wav_path, frame_ms=frame_ms, hop_ms=frame_ms)
    mags = []
    sr = None
    freqs = None
    try:
        # Stop as soon as enough frames are collected instead of materializing
        # every frame of the file just to use the first few.
        for frame, sr in sampler:
            mag, freqs = _magnitude_spectrum(frame, sr)
            mags.append(mag)
            if len(mags) >= max_frames:
                break
    finally:
        # Closing the generator releases whatever it still holds after an early exit.
        sampler.close()

    if not mags:
        raise ValueError("No frames read from WAV.")

    ref_vec = np.mean(np.stack(mags, axis=0), axis=0).astype(np.float32)
    ref_vec = ref_vec / (np.linalg.norm(ref_vec) or 1.0)
    peak_freqs, peak_mags = _find_peaks(ref_vec, freqs)
    # An rfft of length N has N // 2 + 1 bins, so an even N is recovered as 2 * bins - 2.
    return SpectralSignature(fft_size=len(ref_vec) * 2 - 2, samplerate=sr,
                             ref_vector=ref_vec, peak_freqs=peak_freqs, peak_mags=peak_mags)

def spectral_cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity of two spectra.

    When the shapes differ, both vectors are truncated to the shorter length.
    A zero-norm vector is treated as having norm 1, so the result is 0.0
    rather than a division by zero.
    """
    if a.shape != b.shape:
        common = min(len(a), len(b))
        a, b = a[:common], b[:common]
    norm_a = np.linalg.norm(a) or 1.0
    norm_b = np.linalg.norm(b) or 1.0
    return float(np.dot(a, b) / (norm_a * norm_b))

def peak_overlap_score(freqs_a: np.ndarray, freqs_b: np.ndarray, tol_hz: float = 5.0) -> float:
    """Return the fraction of ``freqs_a`` lying within ``tol_hz`` of some ``freqs_b`` entry.

    The score is asymmetric: it is normalized by the number of peaks in
    ``freqs_a``. It is 0.0 when either input is empty.
    """
    n_a = len(freqs_a)
    if n_a == 0 or len(freqs_b) == 0:
        return 0.0
    matched = sum(1 for fa in freqs_a if np.any(np.abs(freqs_b - fa) <= tol_hz))
    return matched / max(1, n_a)

@dataclass
class TwinTrustConfig:
    """Tunable parameters read by TwinFrequencyTrust when scoring frames."""
    # Frame length and hop (milliseconds) handed to _frame_hop_sampler.
    frame_ms: float = 200.0
    hop_ms: float = 100.0
    # Frequency band (Hz) and maximum number of bins handed to _find_peaks.
    min_hz: float = 40.0
    max_hz: float = 8000.0
    top_k_peaks: int = 10
    # Matching tolerance (Hz) handed to peak_overlap_score.
    peak_tol_hz: float = 5.0
    # Weights of the cosine and peak-overlap terms in the combined trust score.
    alpha_cosine: float = 0.7
    alpha_peaks: float = 0.3

class TwinFrequencyTrust:
    """Score audio frames against a reference spectral signature."""

    def __init__(self, signature: SpectralSignature, cfg: Optional[TwinTrustConfig] = None):
        """Store the reference signature and the config (defaults when omitted)."""
        self.sig = signature
        self.cfg = cfg or TwinTrustConfig()

    def score_frame(self, frame: np.ndarray, samplerate: int) -> Dict[str, float]:
        """Score one frame against the reference signature.

        Returns:
            A dict with ``"cosine"`` (spectral cosine similarity),
            ``"peak_overlap"`` (fraction of the frame's peaks matched by a
            reference peak) and ``"trust"`` (their weighted sum using
            ``alpha_cosine`` and ``alpha_peaks``).
        """
        settings = self.cfg
        reference = self.sig

        # Analyse at the reference FFT size so both spectra have the same bins.
        spectrum, bin_freqs = _magnitude_spectrum(frame, samplerate, fft_size=reference.fft_size)
        cosine = spectral_cosine_similarity(spectrum, reference.ref_vector)

        frame_peaks, _ = _find_peaks(
            spectrum,
            bin_freqs,
            min_hz=settings.min_hz,
            max_hz=settings.max_hz,
            top_k=settings.top_k_peaks,
        )
        overlap = peak_overlap_score(frame_peaks, reference.peak_freqs, tol_hz=settings.peak_tol_hz)

        combined = settings.alpha_cosine * cosine + settings.alpha_peaks * overlap
        return {
            "cosine": float(cosine),
            "peak_overlap": float(overlap),
            "trust": float(combined),
        }

    def stream_score_wav(self, wav_path: str) -> List[Dict[str, float]]:
        """Score every frame of a WAV file, in order, using the configured frame and hop."""
        sampler = _frame_hop_sampler(wav_path, frame_ms=self.cfg.frame_ms, hop_ms=self.cfg.hop_ms)
        return [self.score_frame(frame, sr) for frame, sr in sampler]

if __name__ == "__main__":
    import argparse
    import json

    # Command-line entry point: build a signature from --ref, score --test
    # frame by frame, and print the first ten frame scores as JSON.
    cli = argparse.ArgumentParser(
        description="Twin Frequency Trust: real-time-ish spectral twin detection."
    )
    cli.add_argument("--ref", required=True, help="Path to reference WAV file.")
    cli.add_argument("--test", required=True, help="Path to test WAV file to score.")
    for flag, default in (("--frame_ms", 200.0), ("--hop_ms", 100.0), ("--peak_tol_hz", 5.0)):
        cli.add_argument(flag, type=float, default=default)
    opts = cli.parse_args()

    # The reference is always analysed with 400 ms frames, independent of --frame_ms.
    reference = build_reference_signature(opts.ref, frame_ms=400.0)
    trust_cfg = TwinTrustConfig(
        frame_ms=opts.frame_ms,
        hop_ms=opts.hop_ms,
        peak_tol_hz=opts.peak_tol_hz,
    )
    frame_scores = TwinFrequencyTrust(reference, trust_cfg).stream_score_wav(opts.test)
    print(json.dumps(frame_scores[:10], indent=2))  # show first few frames