# Source: voice-authenticity-openenv/scripts/extract_features.py
# Hosting-page header (not code): "Akki0404's picture"
# Last commit message: 2-step loop, harder adversarial, updated README, fix score calculation
# Commit: baa1f37
import numpy as np
import librosa
import parselmouth
from parselmouth.praat import call
import os
import warnings
# Silence every warning category for the whole process.
warnings.filterwarnings("ignore")
# Input folders: main() labels clips from REAL_DIR as 0 and from FAKE_DIR as 1.
REAL_DIR = "data/real"
FAKE_DIR = "data/fake"
# Folder that receives the .npy arrays written by main().
OUTPUT_DIR = "environment/data"
# Runs at import time, so the output folder exists before anything is saved.
os.makedirs(OUTPUT_DIR, exist_ok=True)
def extract_features(file_path):
    """
    Turn one audio file into a 48-value float32 descriptor.

    Layout of the returned vector:
        0-19   mean of each of the 20 MFCCs
        20-39  standard deviation of each of the 20 MFCCs
        40-41  mean zero-crossing rate, mean spectral centroid
        42-44  jitter (local), shimmer (local), mean harmonicity (HNR)
        45-47  mean spectral bandwidth, mean spectral roll-off, mean RMS

    Returns None when the clip is shorter than 1600 samples or when any
    step outside the Praat analysis raises; in the latter case an error
    line is printed.
    """
    try:
        # librosa resamples to 16 kHz and reads at most the first 5 seconds.
        signal, rate = librosa.load(file_path, sr=16000, duration=5.0)
        if len(signal) < 1600:  # under 0.1 s at 16 kHz: not enough to analyse
            return None

        # Per-coefficient statistics across frames.
        coeffs = librosa.feature.mfcc(y=signal, sr=rate, n_mfcc=20)
        coeff_mean = np.mean(coeffs, axis=1)
        coeff_std = np.std(coeffs, axis=1)

        # Frame-averaged scalar descriptors computed by librosa.
        crossing_rate = np.mean(librosa.feature.zero_crossing_rate(signal))
        centroid = np.mean(
            librosa.feature.spectral_centroid(y=signal, sr=rate))
        bandwidth = np.mean(
            librosa.feature.spectral_bandwidth(y=signal, sr=rate))
        rolloff = np.mean(
            librosa.feature.spectral_rolloff(y=signal, sr=rate))
        energy = np.mean(librosa.feature.rms(y=signal))

        # Voice-quality measures via Praat. Note that Praat opens the file
        # itself, so it does not see the resampled / truncated signal above.
        # Any failure here is tolerated and yields zeros for all three.
        try:
            sound = parselmouth.Sound(file_path)
            pulses = call(sound, "To PointProcess (periodic, cc)", 75, 500)
            measured = (
                call(pulses, "Get jitter (local)",
                     0, 0, 0.0001, 0.02, 1.3),
                call([sound, pulses], "Get shimmer (local)",
                     0, 0, 0.0001, 0.02, 1.3, 1.6),
                call(
                    call(sound, "To Harmonicity (cc)", 0.01, 75, 0.1, 1.0),
                    "Get mean", 0, 0,
                ),
            )
            # Undefined results (NaN / inf) are mapped to 0.
            voice_quality = [
                float(value) if np.isfinite(value) else 0.0
                for value in measured
            ]
        except Exception:
            voice_quality = [0.0, 0.0, 0.0]

        vector = np.concatenate([
            coeff_mean,                     # 0-19
            coeff_std,                      # 20-39
            [crossing_rate, centroid],      # 40-41
            voice_quality,                  # 42-44
            [bandwidth, rolloff, energy],   # 45-47
        ])
        return vector.astype(np.float32)
    except Exception as e:
        print(f" ERROR on {file_path}: {e}")
        return None
def process_directory(directory, label, desc):
    """
    Extract features from every audio file directly inside *directory*.

    Files are selected by extension (.wav, .flac, .mp3, matched
    case-insensitively) and processed in sorted name order so that repeated
    runs over the same folder yield the same sequence.

    Args:
        directory: folder to scan (not recursive).
        label: value recorded once for every successfully processed file.
        desc: human-readable name used in the progress output.

    Returns:
        (features_list, labels_list): the feature vectors returned by
        extract_features for the files that succeeded, and a list of
        *label* of the same length.

    Raises:
        OSError: propagated from os.listdir when *directory* cannot be read.
    """
    audio_extensions = (".wav", ".flac", ".mp3")
    # os.listdir gives no ordering guarantee; sort for reproducibility.
    files = sorted(
        f for f in os.listdir(directory)
        if f.lower().endswith(audio_extensions)
    )
    print(f"\nProcessing {desc}: {len(files)} files found")

    features_list = []
    failed = 0
    for position, fname in enumerate(files, start=1):
        feat = extract_features(os.path.join(directory, fname))
        if feat is None:
            failed += 1
        else:
            features_list.append(feat)
        # Report progress on every 50th file, whether or not it succeeded.
        if position % 50 == 0:
            print(f" {position}/{len(files)} done...")

    labels_list = [label] * len(features_list)
    print(f" Success: {len(features_list)}, Failed: {failed}")
    return features_list, labels_list
def add_compression_artifacts(features, strength=0.3):
    """
    Return a randomly degraded copy of a feature vector.

    The input is left untouched. Using the global NumPy random state, the
    copy is changed as follows (u = a fresh uniform draw from the range):
        20-39  scaled by 1 - strength * u,  u in [0.5, 1.0] (one per slot)
        42     scaled by 1 - strength * u,  u in [0.3, 0.7]
        43     scaled by 1 - strength * u,  u in [0.3, 0.7]
        44     scaled by 1 + strength * u,  u in [0.1, 0.4]
        45     scaled by 1 + strength * u,  u in [0.3, 0.8]
        46     scaled by 1 - strength * u,  u in [0.2, 0.6]
        47     increased by strength * u,   u in [0.1, 0.4]
    All other slots are copied unchanged.
    """
    out = features.copy()

    # Block of 20 slots shrunk element-wise.
    shrink = 1 - strength * np.random.uniform(0.5, 1.0, 20)
    out[20:40] *= shrink

    # (slot, direction, low, high): direction -1 shrinks, +1 grows.
    scalar_rules = (
        (42, -1, 0.3, 0.7),
        (43, -1, 0.3, 0.7),
        (44, +1, 0.1, 0.4),
        (45, +1, 0.3, 0.8),
        (46, -1, 0.2, 0.6),
    )
    for slot, direction, low, high in scalar_rules:
        out[slot] *= 1 + direction * strength * np.random.uniform(low, high)

    # The last slot gets an additive offset rather than a scale factor.
    out[47] += strength * np.random.uniform(0.1, 0.4)
    return out
def add_adversarial_perturbation(features, label):
    """
    Return a copy of *features* nudged toward the opposite class.

    The aim is overlapping distributions so that no single threshold
    separates the classes. The input vector is not modified; all randomness
    comes from the global NumPy random state.

    label == 1 (fake -> made to look real):
        slot 42 raised by U(0.010, 0.025), slot 43 raised by U(0.020, 0.060),
        slot 44 lowered by U(2.0, 5.0), slots 20-29 get N(0, 0.3) noise.
    label == 0 (real -> pushed toward the synthetic range):
        slots 42 and 43 each scaled by U(0.6, 0.85),
        slot 44 raised by U(0.5, 2.0).

    Afterwards, with probability 0.08, N(0, 0.5) noise is added to every
    slot. The label itself is never changed here.
    """
    perturbed = features.copy()

    if label == 0:
        # Real clip: damp jitter and shimmer, lift HNR.
        jitter_scale = np.random.uniform(0.6, 0.85)
        perturbed[42] *= jitter_scale
        shimmer_scale = np.random.uniform(0.6, 0.85)
        perturbed[43] *= shimmer_scale
        hnr_lift = np.random.uniform(0.5, 2.0)
        perturbed[44] += hnr_lift
    elif label == 1:
        # Fake clip: lift jitter and shimmer, drop HNR, roughen MFCC spread.
        jitter_lift = np.random.uniform(0.010, 0.025)
        perturbed[42] += jitter_lift
        shimmer_lift = np.random.uniform(0.020, 0.060)
        perturbed[43] += shimmer_lift
        hnr_drop = np.random.uniform(2.0, 5.0)
        perturbed[44] -= hnr_drop
        mfcc_wobble = np.random.normal(0, 0.3, 10)
        perturbed[20:30] += mfcc_wobble

    # Roughly 8% of samples are blurred across the whole vector to simulate
    # real-world ambiguity (feature noise; labels stay as they are).
    blur_sample = np.random.random() < 0.08
    if blur_sample:
        perturbed += np.random.normal(0, 0.5, len(perturbed))

    return perturbed
def main():
    """
    Run the feature-extraction pipeline and write the dataset files.

    Steps:
      1. Extract features from REAL_DIR (label 0) and FAKE_DIR (label 1).
      2. Draw one random permutation and reuse it for every task, so row i
         refers to the same clip in all saved arrays.
      3. Task 1: z-score the clean features; save them plus the raw
         features, the labels, and the mean/std used.
      4. Task 2: apply add_compression_artifacts to the raw features,
         normalise with the clean mean/std, save.
      5. Task 3: apply add_adversarial_perturbation to the raw features,
         normalise with the clean mean/std, save.
      6. Print a summary and a jitter/shimmer/HNR comparison for up to two
         samples.

    Raises:
        SystemExit: when no clip could be processed. Nothing is written in
            that case, so an existing dataset is not overwritten with empty
            arrays and NaN statistics.
    """
    banner = "=" * 50
    print(banner)
    print("Feature Extraction Pipeline")
    print(banner)

    real_feat, real_labels = process_directory(
        REAL_DIR, label=0, desc="REAL audio"
    )
    fake_feat, fake_labels = process_directory(
        FAKE_DIR, label=1, desc="FAKE audio"
    )

    raw_combined = real_feat + fake_feat
    raw_labels_combined = real_labels + fake_labels
    if not raw_combined:
        raise SystemExit(
            f"No usable audio found in {REAL_DIR!r} or {FAKE_DIR!r}; "
            "no output files were written."
        )

    # Idempotent; guards against the folder having been removed since import.
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    def save(filename, array):
        # Write one array into the output folder.
        np.save(os.path.join(OUTPUT_DIR, filename), array)

    def voice_summary(vector):
        # Format slots 42-44 (jitter, shimmer, HNR) for the sanity print-out.
        return (
            f"jitter={vector[42]:.4f} "
            f"shimmer={vector[43]:.4f} "
            f"hnr={vector[44]:.4f}"
        )

    # ── TASK 1: Clean features ──────────────────────────────
    all_features = np.array(raw_combined, dtype=np.float32)
    all_labels = np.array(raw_labels_combined, dtype=np.int32)
    idx = np.random.permutation(len(all_labels))
    all_features = all_features[idx]
    all_labels = all_labels[idx]

    mean = all_features.mean(axis=0)
    std = all_features.std(axis=0) + 1e-8  # epsilon avoids division by zero
    all_features_norm = (all_features - mean) / std

    save("features.npy", all_features_norm)
    # Raw unnormalized features are saved as well for the environment to use.
    save("features_raw.npy", all_features)
    save("labels.npy", all_labels)
    save("mean.npy", mean)
    save("std.npy", std)
    print(f"\nTask 1 (clean): {len(all_labels)} samples saved")

    # ── TASK 2: Compressed features ─────────────────────────
    compressed_features = np.array([
        add_compression_artifacts(f, strength=0.3)
        for f in raw_combined
    ], dtype=np.float32)
    compressed_features = compressed_features[idx]
    # Normalised with the clean statistics, not re-fitted.
    compressed_norm = (compressed_features - mean) / std
    save("features_compressed.npy", compressed_norm)
    save("labels_compressed.npy", all_labels)
    print(f"Task 2 (compressed): {len(all_labels)} samples saved")

    # ── TASK 3: Adversarial features ────────────────────────
    adversarial_features = np.array([
        add_adversarial_perturbation(f, l)
        for f, l in zip(raw_combined, raw_labels_combined)
    ], dtype=np.float32)
    adversarial_features = adversarial_features[idx]
    adversarial_norm = (adversarial_features - mean) / std
    save("features_adversarial.npy", adversarial_norm)
    save("labels_adversarial.npy", all_labels)
    print(f"Task 3 (adversarial): {len(all_labels)} samples saved")

    print(f"\n{banner}")
    print("DONE")
    print(f"Total samples : {len(all_labels)}")
    print(f"Real samples : {int((all_labels == 0).sum())}")
    print(f"Fake samples : {int((all_labels == 1).sum())}")
    print(f"Feature shape : {all_features_norm.shape}")
    print(banner)

    print("\nSanity check — jitter/shimmer/HNR comparison:")
    preview = zip(
        all_labels[:2],
        all_features[:2],
        compressed_features[:2],
        adversarial_features[:2],
    )
    for sample_label, clean, compressed, adversarial in preview:
        label_str = "REAL" if sample_label == 0 else "FAKE"
        print(f"\n [{label_str}]")
        print(f" Clean → {voice_summary(clean)}")
        print(f" Compressed → {voice_summary(compressed)}")
        print(f" Adversarial→ {voice_summary(adversarial)}")
if __name__ == "__main__":
main()