import io import base64 import librosa import numpy as np import soundfile as sf from fastapi import HTTPException from app.config import settings import logging import tempfile import os logger = logging.getLogger(__name__) def decode_base64_audio(base64_string: str) -> io.BytesIO: """Decodes a Base64 string into a BytesIO object.""" try: if "base64," in base64_string: base64_string = base64_string.split("base64,")[1] audio_data = base64.b64decode(base64_string) return io.BytesIO(audio_data) except Exception as e: raise HTTPException(status_code=400, detail=f"Invalid Base64 audio: {str(e)}") def compute_audio_profile(y: np.ndarray, sr: int) -> dict: """ Compute a technical profile of the audio sample. Returns metadata useful for quality assessment and forensic analysis. """ duration = len(y) / sr # RMS energy (simple vector op) rms = float(np.sqrt(np.mean(y ** 2))) # Optimization: Skip expensive spectral SNR calculation here # (It requires STFT/RMS framing which takes ~100ms) # The forensic module can compute detailed SNR if needed. snr_db = 0.0 # Clipping detection — samples at or near ±1.0 clip_threshold = 0.999 # Vectorized fast check clipping_ratio = float(np.mean(np.abs(y) > clip_threshold)) clipping_detected = clipping_ratio > 0.001 # Silence ratio (vectorized) silence_threshold = rms * 0.1 silence_ratio = float(np.mean(np.abs(y) < silence_threshold)) return { "duration_sec": round(duration, 2), "snr_db": round(snr_db, 1), # Placeholder, computed later if needed "clipping_detected": clipping_detected, "silence_ratio": round(silence_ratio, 3), "rms_energy": round(rms, 4), "sample_rate": sr, } def segment_audio(y: np.ndarray, sr: int, segment_sec: float = 5.0, overlap_sec: float = 1.0) -> list: """ Split audio into overlapping segments for per-segment analysis. Short audio (< segment_sec) is returned as a single segment. """ segment_len = int(segment_sec * sr) hop_len = int((segment_sec - overlap_sec) * sr) if len(y) <= segment_len: return [y] segments = [] start = 0 while start < len(y): end = min(start + segment_len, len(y)) seg = y[start:end] # Only include if at least 1 second long if len(seg) >= sr: segments.append(seg) start += hop_len return segments if segments else [y] def preprocess_audio(audio_file: io.BytesIO): """ Clean and standardized preprocessing for AI detection. Focuses on natural signal preservation to avoid false AI classifications. Returns: (audio_array, audio_profile_dict) """ try: # Save to temporary file for librosa with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as tmp_file: tmp_file.write(audio_file.read()) tmp_path = tmp_file.name try: # Load audio at 16kHz (Standard for Wav2Vec2) y, sr = librosa.load(tmp_path, sr=settings.SAMPLE_RATE) # Ensure mono if len(y.shape) > 1: y = librosa.to_mono(y) # Reject extremely short audio if len(y) < sr * 0.3: raise HTTPException( status_code=400, detail="Audio too short. Minimum 0.3 seconds required." ) # 1. Basic Silence Trimming (Safer threshold) y_trimmed, _ = librosa.effects.trim(y, top_db=40) if len(y_trimmed) > sr * 0.1: y = y_trimmed # 2. Gentle Peak Normalization # Preserves natural dynamics which models use for detection peak = np.max(np.abs(y)) if peak > 0: y = y / peak # 3. Time Clamping — max 30 seconds max_duration = 30 if len(y) > sr * max_duration: y = y[:sr * max_duration] # 4. Compute audio profile profile = compute_audio_profile(y, sr) logger.info( f"Preprocessing complete: {profile['duration_sec']}s, " f"SNR={profile['snr_db']}dB, " f"clipping={'YES' if profile['clipping_detected'] else 'NO'}" ) return y, profile finally: if os.path.exists(tmp_path): os.unlink(tmp_path) except HTTPException: raise except Exception as e: raise HTTPException(status_code=400, detail=f"Error processing audio file: {str(e)}")