#!/usr/bin/env python3
"""
fix_audio_processor.py

Updates the audio processor to handle base64 padding issues.
Run this in your voice-detection-engine folder.

Running the script overwrites two files, resolved relative to the current
working directory:

* app/preprocessing/audio_processor.py
* app/core/detector.py

Importing the module has no side effects; the files are only written when
the script is executed directly.
"""

import os

# Destination paths, relative to the directory the script is run from.
AUDIO_PROCESSOR_PATH = "app/preprocessing/audio_processor.py"
DETECTOR_PATH = "app/core/detector.py"

# Complete replacement source for AUDIO_PROCESSOR_PATH.
content = '''"""
Voice Detection Engine - Audio Processor
Handles Base64 decoding, format conversion, resampling.
"""

import io
import logging
import base64
from typing import Optional

import numpy as np
import librosa
import soundfile as sf
from pydub import AudioSegment

from app.config import settings

logger = logging.getLogger("engine.audio_processor")


class AudioProcessor:
    """
    Process audio from Base64 to normalized numpy array.
    """

    def __init__(self):
        self.target_sr = settings.TARGET_SAMPLE_RATE
        self.max_seconds = settings.MAX_AUDIO_SECONDS
        # int() keeps the value usable as a slice bound even if
        # MAX_AUDIO_SECONDS is configured as a float.
        self.max_samples = int(self.target_sr * self.max_seconds)

    def decode_base64(self, audio_base64: str) -> bytes:
        """
        Decode base64 string to bytes with padding fix.

        Accepts an optional data URL prefix ("data:...;base64,") and
        tolerates whitespace anywhere in the payload.
        """
        # Remove data URL prefix if present (a comma is never part of the
        # base64 alphabet, so splitting on it is safe)
        if "," in audio_base64:
            audio_base64 = audio_base64.split(",", 1)[1]

        # Remove ALL whitespace, not only leading/trailing: line-wrapped
        # base64 contains embedded newlines, which would otherwise be
        # counted when computing the padding below.
        audio_base64 = "".join(audio_base64.split())

        # Fix padding - base64 must be divisible by 4
        missing_padding = len(audio_base64) % 4
        if missing_padding:
            audio_base64 += "=" * (4 - missing_padding)

        # Decode
        return base64.b64decode(audio_base64)

    def process(self, audio_bytes: bytes) -> np.ndarray:
        """
        Process raw audio bytes to normalized numpy array.

        Tries pydub, then soundfile, then librosa; raises ValueError if
        none of them can decode the data.
        """
        logger.debug(f"Processing audio: {len(audio_bytes)} bytes")

        audio_array = None

        # Method 1: Try pydub
        try:
            audio_array = self._decode_with_pydub(audio_bytes)
            logger.debug("Decoded with pydub")
        except Exception as e:
            logger.debug(f"Pydub failed: {e}")

        # Method 2: Try soundfile
        if audio_array is None:
            try:
                audio_array = self._decode_with_soundfile(audio_bytes)
                logger.debug("Decoded with soundfile")
            except Exception as e:
                logger.debug(f"Soundfile failed: {e}")

        # Method 3: Try librosa
        if audio_array is None:
            try:
                audio_array = self._decode_with_librosa(audio_bytes)
                logger.debug("Decoded with librosa")
            except Exception as e:
                logger.debug(f"Librosa failed: {e}")

        if audio_array is None:
            raise ValueError("Failed to decode audio with any method")

        # Ensure mono
        if len(audio_array.shape) > 1:
            audio_array = np.mean(audio_array, axis=1)

        # Ensure float32
        audio_array = audio_array.astype(np.float32)

        # Normalize to [-1, 1]
        max_val = np.abs(audio_array).max()
        if max_val > 0:
            audio_array = audio_array / max_val

        # Trim to max duration
        if len(audio_array) > self.max_samples:
            audio_array = audio_array[:self.max_samples]

        logger.debug(f"Processed: {len(audio_array)} samples, {len(audio_array)/self.target_sr:.2f}s")

        return audio_array

    def _decode_with_pydub(self, audio_bytes: bytes) -> np.ndarray:
        audio_io = io.BytesIO(audio_bytes)
        audio_segment = AudioSegment.from_file(audio_io)

        audio_segment = audio_segment.set_channels(1)
        audio_segment = audio_segment.set_frame_rate(self.target_sr)

        samples = np.array(audio_segment.get_array_of_samples())

        # Scale integer PCM to floats by the full-scale value of its width
        sample_width = audio_segment.sample_width
        if sample_width == 2:
            samples = samples.astype(np.float32) / 32768.0
        elif sample_width == 4:
            samples = samples.astype(np.float32) / 2147483648.0
        else:
            samples = samples.astype(np.float32) / 128.0

        return samples

    def _decode_with_soundfile(self, audio_bytes: bytes) -> np.ndarray:
        audio_io = io.BytesIO(audio_bytes)
        audio_array, sr = sf.read(audio_io)

        # soundfile returns multi-channel audio as (frames, channels).
        # Downmix before resampling: librosa.resample works along the last
        # axis, which would otherwise be the channel axis instead of time.
        if audio_array.ndim > 1:
            audio_array = np.mean(audio_array, axis=1)

        if sr != self.target_sr:
            audio_array = librosa.resample(audio_array, orig_sr=sr, target_sr=self.target_sr)

        return audio_array

    def _decode_with_librosa(self, audio_bytes: bytes) -> np.ndarray:
        audio_io = io.BytesIO(audio_bytes)
        audio_array, sr = librosa.load(audio_io, sr=self.target_sr, mono=True)
        return audio_array
'''

# Complete replacement source for DETECTOR_PATH.
detector_content = '''"""
Voice Detection Engine - Main Detector
"""

import logging
from typing import Dict, Any, List, Tuple
from dataclasses import dataclass

import numpy as np

from app.config import settings
from app.preprocessing.audio_processor import AudioProcessor
from app.models.embeddings import EmbeddingExtractor
from app.features.acoustic import AcousticFeatureExtractor

logger = logging.getLogger("engine.detector")


@dataclass
class RuleHit:
    name: str
    delta: float
    detail: str


class VoiceDetector:

    def __init__(self):
        logger.info("Initializing VoiceDetector...")
        self.audio_processor = AudioProcessor()
        self.embedding_extractor = EmbeddingExtractor()
        self.acoustic_extractor = AcousticFeatureExtractor()
        logger.info("VoiceDetector initialized")

    def warmup(self):
        logger.info("Warming up detector...")
        dummy_audio = np.zeros(settings.TARGET_SAMPLE_RATE, dtype=np.float32)
        self.embedding_extractor.warmup(dummy_audio)
        self.acoustic_extractor.extract(dummy_audio, settings.TARGET_SAMPLE_RATE)
        logger.info("Detector warmup complete")

    def analyze(self, audio_base64: str, language: str, request_id: str = "") -> Dict[str, Any]:
        logger.info(f"[{request_id}] Starting analysis for language: {language}")

        # Decode and Process Audio
        try:
            # Use the new decode method with padding fix
            audio_bytes = self.audio_processor.decode_base64(audio_base64)
            audio_array = self.audio_processor.process(audio_bytes)

            duration = len(audio_array) / settings.TARGET_SAMPLE_RATE
            logger.info(f"[{request_id}] Audio duration: {duration:.2f}s")

            if duration < settings.MIN_AUDIO_SECONDS:
                logger.warning(f"[{request_id}] Audio too short: {duration:.2f}s")
                return {
                    "classification": "HUMAN",
                    "confidence": 0.50,
                    "explanation": "Audio too short for reliable analysis."
                }

        except Exception as e:
            logger.error(f"[{request_id}] Audio processing failed: {e}")
            return {
                "classification": "HUMAN",
                "confidence": 0.50,
                "explanation": f"Audio processing failed: {str(e)[:100]}"
            }

        # Extract Features
        try:
            acoustic_features = self.acoustic_extractor.extract(audio_array, settings.TARGET_SAMPLE_RATE)
            embedding_features = self.embedding_extractor.extract(audio_array)
        except Exception as e:
            logger.error(f"[{request_id}] Feature extraction failed: {e}")
            return {
                "classification": "HUMAN",
                "confidence": 0.50,
                "explanation": "Feature extraction failed."
            }

        # Apply Heuristics
        score, rule_hits = self._apply_heuristics(acoustic_features, embedding_features, duration, request_id)

        # Determine Classification
        if score > 0.5:
            classification = "AI_GENERATED"
        else:
            classification = "HUMAN"

        # Confidence is the distance of the score from the 0.5 decision
        # boundary, rescaled to [0, 1]
        confidence = abs(score - 0.5) * 2
        confidence = max(0.0, min(1.0, confidence))

        explanation = self._generate_explanation(classification, rule_hits, acoustic_features, embedding_features)

        logger.info(f"[{request_id}] Result: {classification} (score={score:.3f}, confidence={confidence:.3f})")

        return {
            "classification": classification,
            "confidence": round(confidence, 4),
            "explanation": explanation
        }

    def _apply_heuristics(self, acoustic: Dict, embeddings: Dict, duration: float, request_id: str) -> Tuple[float, List[RuleHit]]:
        score = 0.5
        rule_hits = []

        inc = settings.SCORE_INCREMENT
        dec = settings.SCORE_DECREMENT

        # Pitch Analysis
        pitch_std = acoustic.get("pitch_std", 30.0)
        pitch_range = acoustic.get("pitch_range", 80.0)

        if pitch_std < settings.PITCH_STD_LOW:
            score += inc
            rule_hits.append(RuleHit("low_pitch_std", inc, f"pitch_std={pitch_std:.1f}Hz"))
        elif pitch_std > settings.PITCH_STD_HIGH:
            score -= dec
            rule_hits.append(RuleHit("high_pitch_std", -dec, f"pitch_std={pitch_std:.1f}Hz"))

        if pitch_range < settings.PITCH_RANGE_LOW:
            score += inc
            rule_hits.append(RuleHit("low_pitch_range", inc, f"pitch_range={pitch_range:.1f}Hz"))
        elif pitch_range > settings.PITCH_RANGE_HIGH:
            score -= dec
            rule_hits.append(RuleHit("high_pitch_range", -dec, f"pitch_range={pitch_range:.1f}Hz"))

        # Jitter
        jitter = acoustic.get("jitter", 0.020)
        if jitter < settings.JITTER_LOW:
            score += inc
            rule_hits.append(RuleHit("low_jitter", inc, f"jitter={jitter:.4f}"))
        elif jitter > settings.JITTER_HIGH:
            score -= dec
            rule_hits.append(RuleHit("high_jitter", -dec, f"jitter={jitter:.4f}"))

        # Shimmer
        shimmer = acoustic.get("shimmer", 0.040)
        if shimmer < settings.SHIMMER_LOW:
            score += inc
            rule_hits.append(RuleHit("low_shimmer", inc, f"shimmer={shimmer:.4f}"))
        elif shimmer > settings.SHIMMER_HIGH:
            score -= dec
            rule_hits.append(RuleHit("high_shimmer", -dec, f"shimmer={shimmer:.4f}"))

        # Embedding variability
        wav2vec_var = embeddings.get("wav2vec_var_ratio", 0.50)
        whisper_var = embeddings.get("whisper_var_ratio", 0.50)

        if wav2vec_var < settings.EMBEDDING_VAR_LOW:
            score += inc
            rule_hits.append(RuleHit("low_wav2vec_var", inc, f"wav2vec_var={wav2vec_var:.3f}"))
        elif wav2vec_var > settings.EMBEDDING_VAR_HIGH:
            score -= dec
            rule_hits.append(RuleHit("high_wav2vec_var", -dec, f"wav2vec_var={wav2vec_var:.3f}"))

        if whisper_var < settings.EMBEDDING_VAR_LOW:
            score += inc
            rule_hits.append(RuleHit("low_whisper_var", inc, f"whisper_var={whisper_var:.3f}"))
        elif whisper_var > settings.EMBEDDING_VAR_HIGH:
            score -= dec
            rule_hits.append(RuleHit("high_whisper_var", -dec, f"whisper_var={whisper_var:.3f}"))

        score = max(0.0, min(1.0, score))

        return score, rule_hits

    def _generate_explanation(self, classification: str, rule_hits: List[RuleHit], acoustic: Dict, embeddings: Dict) -> str:
        if not rule_hits:
            if classification == "AI_GENERATED":
                return "Audio characteristics suggest synthetic generation."
            else:
                return "Audio characteristics suggest natural human speech."

        sorted_hits = sorted(rule_hits, key=lambda x: abs(x.delta), reverse=True)

        if classification == "AI_GENERATED":
            relevant = [h for h in sorted_hits if h.delta > 0]
            prefix = "Synthetic indicators"
        else:
            relevant = [h for h in sorted_hits if h.delta < 0]
            prefix = "Human speech indicators"

        # No hit supports the final label: fall back to the strongest hits
        if not relevant:
            relevant = sorted_hits[:3]

        details = [h.detail for h in relevant[:3]]
        return f"{prefix}: {'; '.join(details)}."
'''


def _write_file(filepath, text):
    """Write *text* to *filepath* as UTF-8 with LF line endings.

    Missing parent directories are created first. An existing file at
    *filepath* is overwritten.
    """
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    # newline="\n" stops Windows from translating LF to CRLF on write.
    with open(filepath, "w", encoding="utf-8", newline="\n") as f:
        f.write(text)
    print(f"[OK] Updated {filepath}")


def main():
    """Write both generated modules and print the follow-up git steps."""
    # Write file
    _write_file(AUDIO_PROCESSOR_PATH, content)
    print()
    print("Now update the detector to use the new decode method...")

    # Also update detector.py
    _write_file(DETECTOR_PATH, detector_content)
    print()
    print("=" * 50)
    print("Now push to HuggingFace:")
    print(" git add .")
    print(' git commit -m "Fix base64 padding issue"')
    print(" git push")
    print("=" * 50)


if __name__ == "__main__":
    main()