#!/usr/bin/env python3
"""
fix_audio_processor.py
Updates the audio processor to handle base64 padding issues.
Run this in your voice-detection-engine folder.
"""
import os
# Regenerated source for app/preprocessing/audio_processor.py.
# BUGFIX in the soundfile fallback: sf.read returns (frames, channels) for
# multi-channel audio, but librosa.resample resamples the LAST axis, so a
# stereo clip at a non-target rate was "resampled" across the channel axis.
# The fallback now downmixes to mono before resampling.
content = '''"""
Voice Detection Engine - Audio Processor
Handles Base64 decoding, format conversion, resampling.
"""
import io
import logging
import base64
import numpy as np
import librosa
import soundfile as sf
from pydub import AudioSegment
from app.config import settings

logger = logging.getLogger("engine.audio_processor")


class AudioProcessor:
    """
    Process audio from Base64 to a normalized mono float32 numpy array.
    """

    def __init__(self):
        self.target_sr = settings.TARGET_SAMPLE_RATE
        self.max_seconds = settings.MAX_AUDIO_SECONDS
        self.max_samples = self.target_sr * self.max_seconds

    def decode_base64(self, audio_base64: str) -> bytes:
        """
        Decode a base64 string to bytes, tolerating surrounding whitespace,
        a data-URL prefix, and missing '=' padding.
        """
        # Remove any whitespace
        audio_base64 = audio_base64.strip()
        # Remove data URL prefix if present (e.g. "data:audio/wav;base64,...")
        if "," in audio_base64:
            audio_base64 = audio_base64.split(",", 1)[1]
        # Fix padding - base64 length must be divisible by 4
        missing_padding = len(audio_base64) % 4
        if missing_padding:
            audio_base64 += "=" * (4 - missing_padding)
        # Decode
        return base64.b64decode(audio_base64)

    def process(self, audio_bytes: bytes) -> np.ndarray:
        """
        Process raw audio bytes to a normalized numpy array.

        Tries pydub, then soundfile, then librosa as decoders; raises
        ValueError only if all three fail.
        """
        logger.debug(f"Processing audio: {len(audio_bytes)} bytes")
        audio_array = None
        # Method 1: Try pydub
        try:
            audio_array = self._decode_with_pydub(audio_bytes)
            logger.debug("Decoded with pydub")
        except Exception as e:
            logger.debug(f"Pydub failed: {e}")
        # Method 2: Try soundfile
        if audio_array is None:
            try:
                audio_array = self._decode_with_soundfile(audio_bytes)
                logger.debug("Decoded with soundfile")
            except Exception as e:
                logger.debug(f"Soundfile failed: {e}")
        # Method 3: Try librosa
        if audio_array is None:
            try:
                audio_array = self._decode_with_librosa(audio_bytes)
                logger.debug("Decoded with librosa")
            except Exception as e:
                logger.debug(f"Librosa failed: {e}")
        if audio_array is None:
            raise ValueError("Failed to decode audio with any method")
        # Ensure mono
        if len(audio_array.shape) > 1:
            audio_array = np.mean(audio_array, axis=1)
        # Ensure float32
        audio_array = audio_array.astype(np.float32)
        # Normalize to [-1, 1]
        max_val = np.abs(audio_array).max()
        if max_val > 0:
            audio_array = audio_array / max_val
        # Trim to max duration
        if len(audio_array) > self.max_samples:
            audio_array = audio_array[:self.max_samples]
        logger.debug(f"Processed: {len(audio_array)} samples, {len(audio_array)/self.target_sr:.2f}s")
        return audio_array

    def _decode_with_pydub(self, audio_bytes: bytes) -> np.ndarray:
        # pydub handles container formats via ffmpeg; downmix and resample
        # before converting to a float array.
        audio_io = io.BytesIO(audio_bytes)
        audio_segment = AudioSegment.from_file(audio_io)
        audio_segment = audio_segment.set_channels(1)
        audio_segment = audio_segment.set_frame_rate(self.target_sr)
        samples = np.array(audio_segment.get_array_of_samples())
        sample_width = audio_segment.sample_width
        # Scale integer PCM to [-1, 1] based on sample width (bytes/sample).
        if sample_width == 2:
            samples = samples.astype(np.float32) / 32768.0
        elif sample_width == 4:
            samples = samples.astype(np.float32) / 2147483648.0
        else:
            samples = samples.astype(np.float32) / 128.0
        return samples

    def _decode_with_soundfile(self, audio_bytes: bytes) -> np.ndarray:
        audio_io = io.BytesIO(audio_bytes)
        audio_array, sr = sf.read(audio_io)
        # sf.read returns shape (frames, channels) for multi-channel audio;
        # librosa.resample operates on the last axis, so downmix to mono
        # FIRST or stereo input would be resampled across channels.
        if audio_array.ndim > 1:
            audio_array = np.mean(audio_array, axis=1)
        if sr != self.target_sr:
            audio_array = librosa.resample(audio_array, orig_sr=sr, target_sr=self.target_sr)
        return audio_array

    def _decode_with_librosa(self, audio_bytes: bytes) -> np.ndarray:
        # librosa.load resamples and downmixes in one call.
        audio_io = io.BytesIO(audio_bytes)
        audio_array, sr = librosa.load(audio_io, sr=self.target_sr, mono=True)
        return audio_array
'''
# Persist the regenerated audio processor module, creating the
# package directory first if it does not exist yet.
dest_path = "app/preprocessing/audio_processor.py"
dest_dir = os.path.dirname(dest_path)
os.makedirs(dest_dir, exist_ok=True)
with open(dest_path, "w", encoding="utf-8", newline="\n") as out_file:
    out_file.write(content)
print(f"[OK] Updated {dest_path}")
print()
print("Now update the detector to use the new decode method...")
# Also update detector.py
detector_content = '''"""
Voice Detection Engine - Main Detector
"""
import logging
from typing import Dict, Any, List, Tuple
from dataclasses import dataclass
import numpy as np
from app.config import settings
from app.preprocessing.audio_processor import AudioProcessor
from app.models.embeddings import EmbeddingExtractor
from app.features.acoustic import AcousticFeatureExtractor
logger = logging.getLogger("engine.detector")
@dataclass
class RuleHit:
name: str
delta: float
detail: str
class VoiceDetector:
def __init__(self):
logger.info("Initializing VoiceDetector...")
self.audio_processor = AudioProcessor()
self.embedding_extractor = EmbeddingExtractor()
self.acoustic_extractor = AcousticFeatureExtractor()
logger.info("VoiceDetector initialized")
def warmup(self):
logger.info("Warming up detector...")
dummy_audio = np.zeros(settings.TARGET_SAMPLE_RATE, dtype=np.float32)
self.embedding_extractor.warmup(dummy_audio)
self.acoustic_extractor.extract(dummy_audio, settings.TARGET_SAMPLE_RATE)
logger.info("Detector warmup complete")
def analyze(self, audio_base64: str, language: str, request_id: str = "") -> Dict[str, Any]:
logger.info(f"[{request_id}] Starting analysis for language: {language}")
# Decode and Process Audio
try:
# Use the new decode method with padding fix
audio_bytes = self.audio_processor.decode_base64(audio_base64)
audio_array = self.audio_processor.process(audio_bytes)
duration = len(audio_array) / settings.TARGET_SAMPLE_RATE
logger.info(f"[{request_id}] Audio duration: {duration:.2f}s")
if duration < settings.MIN_AUDIO_SECONDS:
logger.warning(f"[{request_id}] Audio too short: {duration:.2f}s")
return {
"classification": "HUMAN",
"confidence": 0.50,
"explanation": "Audio too short for reliable analysis."
}
except Exception as e:
logger.error(f"[{request_id}] Audio processing failed: {e}")
return {
"classification": "HUMAN",
"confidence": 0.50,
"explanation": f"Audio processing failed: {str(e)[:100]}"
}
# Extract Features
try:
acoustic_features = self.acoustic_extractor.extract(audio_array, settings.TARGET_SAMPLE_RATE)
embedding_features = self.embedding_extractor.extract(audio_array)
except Exception as e:
logger.error(f"[{request_id}] Feature extraction failed: {e}")
return {
"classification": "HUMAN",
"confidence": 0.50,
"explanation": "Feature extraction failed."
}
# Apply Heuristics
score, rule_hits = self._apply_heuristics(acoustic_features, embedding_features, duration, request_id)
# Determine Classification
if score > 0.5:
classification = "AI_GENERATED"
else:
classification = "HUMAN"
confidence = abs(score - 0.5) * 2
confidence = max(0.0, min(1.0, confidence))
explanation = self._generate_explanation(classification, rule_hits, acoustic_features, embedding_features)
logger.info(f"[{request_id}] Result: {classification} (score={score:.3f}, confidence={confidence:.3f})")
return {
"classification": classification,
"confidence": round(confidence, 4),
"explanation": explanation
}
def _apply_heuristics(self, acoustic: Dict, embeddings: Dict, duration: float, request_id: str) -> Tuple[float, List[RuleHit]]:
score = 0.5
rule_hits = []
inc = settings.SCORE_INCREMENT
dec = settings.SCORE_DECREMENT
# Pitch Analysis
pitch_std = acoustic.get("pitch_std", 30.0)
pitch_range = acoustic.get("pitch_range", 80.0)
if pitch_std < settings.PITCH_STD_LOW:
score += inc
rule_hits.append(RuleHit("low_pitch_std", inc, f"pitch_std={pitch_std:.1f}Hz"))
elif pitch_std > settings.PITCH_STD_HIGH:
score -= dec
rule_hits.append(RuleHit("high_pitch_std", -dec, f"pitch_std={pitch_std:.1f}Hz"))
if pitch_range < settings.PITCH_RANGE_LOW:
score += inc
rule_hits.append(RuleHit("low_pitch_range", inc, f"pitch_range={pitch_range:.1f}Hz"))
elif pitch_range > settings.PITCH_RANGE_HIGH:
score -= dec
rule_hits.append(RuleHit("high_pitch_range", -dec, f"pitch_range={pitch_range:.1f}Hz"))
# Jitter
jitter = acoustic.get("jitter", 0.020)
if jitter < settings.JITTER_LOW:
score += inc
rule_hits.append(RuleHit("low_jitter", inc, f"jitter={jitter:.4f}"))
elif jitter > settings.JITTER_HIGH:
score -= dec
rule_hits.append(RuleHit("high_jitter", -dec, f"jitter={jitter:.4f}"))
# Shimmer
shimmer = acoustic.get("shimmer", 0.040)
if shimmer < settings.SHIMMER_LOW:
score += inc
rule_hits.append(RuleHit("low_shimmer", inc, f"shimmer={shimmer:.4f}"))
elif shimmer > settings.SHIMMER_HIGH:
score -= dec
rule_hits.append(RuleHit("high_shimmer", -dec, f"shimmer={shimmer:.4f}"))
# Embedding variability
wav2vec_var = embeddings.get("wav2vec_var_ratio", 0.50)
whisper_var = embeddings.get("whisper_var_ratio", 0.50)
if wav2vec_var < settings.EMBEDDING_VAR_LOW:
score += inc
rule_hits.append(RuleHit("low_wav2vec_var", inc, f"wav2vec_var={wav2vec_var:.3f}"))
elif wav2vec_var > settings.EMBEDDING_VAR_HIGH:
score -= dec
rule_hits.append(RuleHit("high_wav2vec_var", -dec, f"wav2vec_var={wav2vec_var:.3f}"))
if whisper_var < settings.EMBEDDING_VAR_LOW:
score += inc
rule_hits.append(RuleHit("low_whisper_var", inc, f"whisper_var={whisper_var:.3f}"))
elif whisper_var > settings.EMBEDDING_VAR_HIGH:
score -= dec
rule_hits.append(RuleHit("high_whisper_var", -dec, f"whisper_var={whisper_var:.3f}"))
score = max(0.0, min(1.0, score))
return score, rule_hits
def _generate_explanation(self, classification: str, rule_hits: List[RuleHit], acoustic: Dict, embeddings: Dict) -> str:
if not rule_hits:
if classification == "AI_GENERATED":
return "Audio characteristics suggest synthetic generation."
else:
return "Audio characteristics suggest natural human speech."
sorted_hits = sorted(rule_hits, key=lambda x: abs(x.delta), reverse=True)
if classification == "AI_GENERATED":
relevant = [h for h in sorted_hits if h.delta > 0]
prefix = "Synthetic indicators"
else:
relevant = [h for h in sorted_hits if h.delta < 0]
prefix = "Human speech indicators"
if not relevant:
relevant = sorted_hits[:3]
details = [h.detail for h in relevant[:3]]
return f"{prefix}: {'; '.join(details)}."
'''
# Persist the regenerated detector module, then show the git commands
# needed to publish the fix.
detector_path = "app/core/detector.py"
os.makedirs(os.path.dirname(detector_path), exist_ok=True)
with open(detector_path, "w", encoding="utf-8", newline="\n") as out_file:
    out_file.write(detector_content)
print(f"[OK] Updated {detector_path}")
print()
banner = "=" * 50
print(banner)
print("Now push to HuggingFace:")
print(" git add .")
print(' git commit -m "Fix base64 padding issue"')
print(" git push")
print(banner)