| |
| """ |
| fix_audio_processor.py |
| |
| Updates the audio processor to handle base64 padding issues. |
| Run this in your voice-detection-engine folder. |
| """ |
|
|
| import os |
|
|
# Embedded source for app/preprocessing/audio_processor.py.
# Fixes applied relative to the previous version:
#   * decode_base64 also normalizes URL-safe base64 ("-"/"_") before the
#     existing whitespace/data-URL/padding repairs.
#   * max_samples is coerced to int so the trim slice works even when
#     MAX_AUDIO_SECONDS is configured as a float.
#   * the soundfile fallback downmixes to mono BEFORE resampling:
#     sf.read returns (frames, channels) for multichannel input, and
#     librosa.resample operates along the last axis, which would
#     otherwise "resample" across channels instead of time.
content = '''"""
Voice Detection Engine - Audio Processor

Handles Base64 decoding, format conversion, resampling.
"""

import io
import logging
import base64
from typing import Optional

import numpy as np
import librosa
import soundfile as sf
from pydub import AudioSegment

from app.config import settings

logger = logging.getLogger("engine.audio_processor")


class AudioProcessor:
    """
    Process audio from Base64 to normalized numpy array.
    """

    def __init__(self):
        self.target_sr = settings.TARGET_SAMPLE_RATE
        self.max_seconds = settings.MAX_AUDIO_SECONDS
        # int() so the trim slice in process() works even if
        # MAX_AUDIO_SECONDS is a float (slicing rejects float indices).
        self.max_samples = int(self.target_sr * self.max_seconds)

    def decode_base64(self, audio_base64: str) -> bytes:
        """
        Decode base64 string to bytes with padding fix.

        Accepts plain base64, data URLs ("data:...;base64,..."), and
        URL-safe base64 ("-"/"_" in place of "+"/"/").
        """
        # Remove any whitespace
        audio_base64 = audio_base64.strip()

        # Remove data URL prefix if present
        if "," in audio_base64:
            audio_base64 = audio_base64.split(",", 1)[1]

        # Normalize the URL-safe alphabet to the standard one
        audio_base64 = audio_base64.replace("-", "+").replace("_", "/")

        # Fix padding - base64 must be divisible by 4
        missing_padding = len(audio_base64) % 4
        if missing_padding:
            audio_base64 += "=" * (4 - missing_padding)

        # Decode
        return base64.b64decode(audio_base64)

    def process(self, audio_bytes: bytes) -> np.ndarray:
        """
        Process raw audio bytes to normalized numpy array.

        Tries pydub, then soundfile, then librosa as decoders; raises
        ValueError only if every method fails.
        """
        logger.debug(f"Processing audio: {len(audio_bytes)} bytes")

        audio_array = None

        # Method 1: Try pydub
        try:
            audio_array = self._decode_with_pydub(audio_bytes)
            logger.debug("Decoded with pydub")
        except Exception as e:
            logger.debug(f"Pydub failed: {e}")

        # Method 2: Try soundfile
        if audio_array is None:
            try:
                audio_array = self._decode_with_soundfile(audio_bytes)
                logger.debug("Decoded with soundfile")
            except Exception as e:
                logger.debug(f"Soundfile failed: {e}")

        # Method 3: Try librosa
        if audio_array is None:
            try:
                audio_array = self._decode_with_librosa(audio_bytes)
                logger.debug("Decoded with librosa")
            except Exception as e:
                logger.debug(f"Librosa failed: {e}")

        if audio_array is None:
            raise ValueError("Failed to decode audio with any method")

        # Ensure mono
        if len(audio_array.shape) > 1:
            audio_array = np.mean(audio_array, axis=1)

        # Ensure float32
        audio_array = audio_array.astype(np.float32)

        # Normalize to [-1, 1]
        max_val = np.abs(audio_array).max()
        if max_val > 0:
            audio_array = audio_array / max_val

        # Trim to max duration
        if len(audio_array) > self.max_samples:
            audio_array = audio_array[:self.max_samples]

        logger.debug(f"Processed: {len(audio_array)} samples, {len(audio_array)/self.target_sr:.2f}s")

        return audio_array

    def _decode_with_pydub(self, audio_bytes: bytes) -> np.ndarray:
        # pydub handles channel/rate conversion itself; scale ints to [-1, 1]
        # by sample width (2 bytes -> int16, 4 bytes -> int32, else int8).
        audio_io = io.BytesIO(audio_bytes)
        audio_segment = AudioSegment.from_file(audio_io)
        audio_segment = audio_segment.set_channels(1)
        audio_segment = audio_segment.set_frame_rate(self.target_sr)
        samples = np.array(audio_segment.get_array_of_samples())
        sample_width = audio_segment.sample_width
        if sample_width == 2:
            samples = samples.astype(np.float32) / 32768.0
        elif sample_width == 4:
            samples = samples.astype(np.float32) / 2147483648.0
        else:
            samples = samples.astype(np.float32) / 128.0
        return samples

    def _decode_with_soundfile(self, audio_bytes: bytes) -> np.ndarray:
        audio_io = io.BytesIO(audio_bytes)
        audio_array, sr = sf.read(audio_io)
        # Downmix to mono BEFORE resampling: sf.read yields
        # (frames, channels) and librosa.resample works on the last axis.
        if audio_array.ndim > 1:
            audio_array = np.mean(audio_array, axis=1)
        if sr != self.target_sr:
            audio_array = librosa.resample(audio_array, orig_sr=sr, target_sr=self.target_sr)
        return audio_array

    def _decode_with_librosa(self, audio_bytes: bytes) -> np.ndarray:
        audio_io = io.BytesIO(audio_bytes)
        audio_array, sr = librosa.load(audio_io, sr=self.target_sr, mono=True)
        return audio_array
'''
|
|
| |
# Write the audio-processor module to disk, creating its package dir first.
processor_path = "app/preprocessing/audio_processor.py"
target_dir = os.path.dirname(processor_path)
os.makedirs(target_dir, exist_ok=True)

with open(processor_path, mode="w", encoding="utf-8", newline="\n") as out:
    out.write(content)

print(f"[OK] Updated {processor_path}")
print()
print("Now update the detector to use the new decode method...")
|
|
| |
# Embedded source for app/core/detector.py: the VoiceDetector pipeline.
# It calls AudioProcessor.decode_base64 (the padding-tolerant decoder)
# before AudioProcessor.process, and on any decode/feature failure it
# degrades to a neutral HUMAN/0.50 verdict instead of raising.
# NOTE: this is a runtime string literal written verbatim to disk -- do
# not edit its interior for style; changes here change the generated file.
detector_content = '''"""
Voice Detection Engine - Main Detector
"""

import logging
from typing import Dict, Any, List, Tuple
from dataclasses import dataclass

import numpy as np

from app.config import settings
from app.preprocessing.audio_processor import AudioProcessor
from app.models.embeddings import EmbeddingExtractor
from app.features.acoustic import AcousticFeatureExtractor

logger = logging.getLogger("engine.detector")


@dataclass
class RuleHit:
    name: str
    delta: float
    detail: str


class VoiceDetector:
    def __init__(self):
        logger.info("Initializing VoiceDetector...")
        self.audio_processor = AudioProcessor()
        self.embedding_extractor = EmbeddingExtractor()
        self.acoustic_extractor = AcousticFeatureExtractor()
        logger.info("VoiceDetector initialized")

    def warmup(self):
        logger.info("Warming up detector...")
        dummy_audio = np.zeros(settings.TARGET_SAMPLE_RATE, dtype=np.float32)
        self.embedding_extractor.warmup(dummy_audio)
        self.acoustic_extractor.extract(dummy_audio, settings.TARGET_SAMPLE_RATE)
        logger.info("Detector warmup complete")

    def analyze(self, audio_base64: str, language: str, request_id: str = "") -> Dict[str, Any]:
        logger.info(f"[{request_id}] Starting analysis for language: {language}")

        # Decode and Process Audio
        try:
            # Use the new decode method with padding fix
            audio_bytes = self.audio_processor.decode_base64(audio_base64)
            audio_array = self.audio_processor.process(audio_bytes)
            duration = len(audio_array) / settings.TARGET_SAMPLE_RATE

            logger.info(f"[{request_id}] Audio duration: {duration:.2f}s")

            if duration < settings.MIN_AUDIO_SECONDS:
                logger.warning(f"[{request_id}] Audio too short: {duration:.2f}s")
                return {
                    "classification": "HUMAN",
                    "confidence": 0.50,
                    "explanation": "Audio too short for reliable analysis."
                }

        except Exception as e:
            logger.error(f"[{request_id}] Audio processing failed: {e}")
            return {
                "classification": "HUMAN",
                "confidence": 0.50,
                "explanation": f"Audio processing failed: {str(e)[:100]}"
            }

        # Extract Features
        try:
            acoustic_features = self.acoustic_extractor.extract(audio_array, settings.TARGET_SAMPLE_RATE)
            embedding_features = self.embedding_extractor.extract(audio_array)
        except Exception as e:
            logger.error(f"[{request_id}] Feature extraction failed: {e}")
            return {
                "classification": "HUMAN",
                "confidence": 0.50,
                "explanation": "Feature extraction failed."
            }

        # Apply Heuristics
        score, rule_hits = self._apply_heuristics(acoustic_features, embedding_features, duration, request_id)

        # Determine Classification
        if score > 0.5:
            classification = "AI_GENERATED"
        else:
            classification = "HUMAN"

        confidence = abs(score - 0.5) * 2
        confidence = max(0.0, min(1.0, confidence))

        explanation = self._generate_explanation(classification, rule_hits, acoustic_features, embedding_features)

        logger.info(f"[{request_id}] Result: {classification} (score={score:.3f}, confidence={confidence:.3f})")

        return {
            "classification": classification,
            "confidence": round(confidence, 4),
            "explanation": explanation
        }

    def _apply_heuristics(self, acoustic: Dict, embeddings: Dict, duration: float, request_id: str) -> Tuple[float, List[RuleHit]]:
        score = 0.5
        rule_hits = []
        inc = settings.SCORE_INCREMENT
        dec = settings.SCORE_DECREMENT

        # Pitch Analysis
        pitch_std = acoustic.get("pitch_std", 30.0)
        pitch_range = acoustic.get("pitch_range", 80.0)

        if pitch_std < settings.PITCH_STD_LOW:
            score += inc
            rule_hits.append(RuleHit("low_pitch_std", inc, f"pitch_std={pitch_std:.1f}Hz"))
        elif pitch_std > settings.PITCH_STD_HIGH:
            score -= dec
            rule_hits.append(RuleHit("high_pitch_std", -dec, f"pitch_std={pitch_std:.1f}Hz"))

        if pitch_range < settings.PITCH_RANGE_LOW:
            score += inc
            rule_hits.append(RuleHit("low_pitch_range", inc, f"pitch_range={pitch_range:.1f}Hz"))
        elif pitch_range > settings.PITCH_RANGE_HIGH:
            score -= dec
            rule_hits.append(RuleHit("high_pitch_range", -dec, f"pitch_range={pitch_range:.1f}Hz"))

        # Jitter
        jitter = acoustic.get("jitter", 0.020)
        if jitter < settings.JITTER_LOW:
            score += inc
            rule_hits.append(RuleHit("low_jitter", inc, f"jitter={jitter:.4f}"))
        elif jitter > settings.JITTER_HIGH:
            score -= dec
            rule_hits.append(RuleHit("high_jitter", -dec, f"jitter={jitter:.4f}"))

        # Shimmer
        shimmer = acoustic.get("shimmer", 0.040)
        if shimmer < settings.SHIMMER_LOW:
            score += inc
            rule_hits.append(RuleHit("low_shimmer", inc, f"shimmer={shimmer:.4f}"))
        elif shimmer > settings.SHIMMER_HIGH:
            score -= dec
            rule_hits.append(RuleHit("high_shimmer", -dec, f"shimmer={shimmer:.4f}"))

        # Embedding variability
        wav2vec_var = embeddings.get("wav2vec_var_ratio", 0.50)
        whisper_var = embeddings.get("whisper_var_ratio", 0.50)

        if wav2vec_var < settings.EMBEDDING_VAR_LOW:
            score += inc
            rule_hits.append(RuleHit("low_wav2vec_var", inc, f"wav2vec_var={wav2vec_var:.3f}"))
        elif wav2vec_var > settings.EMBEDDING_VAR_HIGH:
            score -= dec
            rule_hits.append(RuleHit("high_wav2vec_var", -dec, f"wav2vec_var={wav2vec_var:.3f}"))

        if whisper_var < settings.EMBEDDING_VAR_LOW:
            score += inc
            rule_hits.append(RuleHit("low_whisper_var", inc, f"whisper_var={whisper_var:.3f}"))
        elif whisper_var > settings.EMBEDDING_VAR_HIGH:
            score -= dec
            rule_hits.append(RuleHit("high_whisper_var", -dec, f"whisper_var={whisper_var:.3f}"))

        score = max(0.0, min(1.0, score))
        return score, rule_hits

    def _generate_explanation(self, classification: str, rule_hits: List[RuleHit], acoustic: Dict, embeddings: Dict) -> str:
        if not rule_hits:
            if classification == "AI_GENERATED":
                return "Audio characteristics suggest synthetic generation."
            else:
                return "Audio characteristics suggest natural human speech."

        sorted_hits = sorted(rule_hits, key=lambda x: abs(x.delta), reverse=True)

        if classification == "AI_GENERATED":
            relevant = [h for h in sorted_hits if h.delta > 0]
            prefix = "Synthetic indicators"
        else:
            relevant = [h for h in sorted_hits if h.delta < 0]
            prefix = "Human speech indicators"

        if not relevant:
            relevant = sorted_hits[:3]

        details = [h.detail for h in relevant[:3]]
        return f"{prefix}: {'; '.join(details)}."
'''
|
|
# Write the detector module, then print the deployment checklist.
detector_path = "app/core/detector.py"
os.makedirs(os.path.dirname(detector_path), exist_ok=True)

with open(detector_path, mode="w", encoding="utf-8", newline="\n") as out:
    out.write(detector_content)

banner = "=" * 50
print(f"[OK] Updated {detector_path}")
print()
print(banner)
print("Now push to HuggingFace:")
for step in (" git add .", ' git commit -m "Fix base64 padding issue"', " git push"):
    print(step)
print(banner)