Spaces:
Sleeping
Sleeping
| import io | |
| import base64 | |
| import librosa | |
| import numpy as np | |
| import soundfile as sf | |
| from fastapi import HTTPException | |
| from app.config import settings | |
| import logging | |
| import tempfile | |
| import os | |
| logger = logging.getLogger(__name__) | |
def decode_base64_audio(base64_string: str) -> io.BytesIO:
    """Decode a Base64 string (optionally a data URL) into a BytesIO object.

    Args:
        base64_string: Raw Base64 text, or a data URL such as
            ``data:audio/mp3;base64,<payload>``. Whitespace and line breaks
            inside the payload are tolerated.

    Returns:
        A BytesIO positioned at offset 0 containing the decoded bytes.

    Raises:
        HTTPException: 400 if the input is not a string or is not valid Base64.
    """
    try:
        # Drop a data-URL prefix: keep only what follows the "base64," marker.
        _, marker, payload = base64_string.partition("base64,")
        if marker:
            base64_string = payload

        # Line-wrapped Base64 is legitimate; remove whitespace before the
        # strict decode so only genuinely invalid characters are rejected.
        compact = "".join(base64_string.split())

        # validate=True makes b64decode raise on non-alphabet characters
        # instead of silently discarding them and returning corrupt bytes.
        audio_data = base64.b64decode(compact, validate=True)
        return io.BytesIO(audio_data)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid Base64 audio: {str(e)}") from e
def compute_audio_profile(y: np.ndarray, sr: int) -> dict:
    """
    Compute a lightweight technical profile of an audio signal.

    Args:
        y: Audio samples. The clipping check compares absolute values
            against 0.999, so it is only meaningful for signals scaled to
            roughly [-1.0, 1.0].
        sr: Sample rate in Hz (must be non-zero; used as a divisor).

    Returns:
        dict with keys:
            duration_sec: len(y) / sr, rounded to 2 decimals.
            snr_db: always 0.0 here (placeholder; not computed by this function).
            clipping_detected: True if more than 0.1% of samples exceed 0.999
                in absolute value.
            silence_ratio: fraction of samples whose magnitude is below 10% of
                the RMS level; 1.0 for an empty or all-zero signal.
            rms_energy: RMS of the signal, rounded to 4 decimals.
            sample_rate: the ``sr`` argument, unchanged.
    """
    clip_threshold = 0.999  # samples at or near +/-1.0 count as clipped
    duration = len(y) / sr
    magnitude = np.abs(y)

    if magnitude.size == 0:
        # np.mean of an empty array yields NaN (plus a RuntimeWarning);
        # report an empty signal as zero-energy, fully silent instead.
        rms = 0.0
        clipping_ratio = 0.0
        silence_ratio = 1.0
    else:
        rms = float(np.sqrt(np.mean(y ** 2)))
        clipping_ratio = float(np.mean(magnitude > clip_threshold))
        if rms == 0.0:
            # The relative threshold collapses to 0 for an all-zero signal and
            # "magnitude < 0" is never true, so handle pure silence explicitly.
            silence_ratio = 1.0
        else:
            silence_threshold = rms * 0.1
            silence_ratio = float(np.mean(magnitude < silence_threshold))

    # The spectral SNR estimate is intentionally skipped here because it is
    # comparatively expensive; the value below is only a placeholder.
    snr_db = 0.0

    return {
        "duration_sec": round(duration, 2),
        "snr_db": round(snr_db, 1),  # Placeholder, computed later if needed
        "clipping_detected": clipping_ratio > 0.001,
        "silence_ratio": round(silence_ratio, 3),
        "rms_energy": round(rms, 4),
        "sample_rate": sr,
    }
def segment_audio(y: np.ndarray, sr: int, segment_sec: float = 5.0,
                  overlap_sec: float = 1.0) -> list:
    """
    Split audio into overlapping segments for per-segment analysis.

    Args:
        y: Audio samples (any sliceable sequence with a length).
        sr: Sample rate in Hz.
        segment_sec: Length of each segment in seconds.
        overlap_sec: Overlap between consecutive segments in seconds; must be
            smaller than ``segment_sec`` whenever the audio actually needs
            splitting.

    Returns:
        A list of slices of ``y``. Audio no longer than one segment is
        returned as ``[y]``. Windows start every ``segment_sec - overlap_sec``
        seconds; trailing windows shorter than one second are dropped. If
        every window is dropped, ``[y]`` is returned.

    Raises:
        ValueError: if the audio needs splitting but the segment length or the
            hop between segments is not positive (the latter would otherwise
            loop forever).
    """
    segment_len = int(segment_sec * sr)

    # Short audio is analysed as a single segment.
    if len(y) <= segment_len:
        return [y]

    if segment_len <= 0:
        raise ValueError(
            f"segment_sec ({segment_sec}) must yield a positive segment length"
        )

    hop_len = int((segment_sec - overlap_sec) * sr)
    if hop_len <= 0:
        # A zero or negative hop would never advance the window start.
        raise ValueError(
            f"overlap_sec ({overlap_sec}) must be smaller than "
            f"segment_sec ({segment_sec}) by at least one sample"
        )

    # Slicing clamps at the end of y, so the last windows may be shorter;
    # keep only those that are at least one second (sr samples) long.
    windows = (y[start:start + segment_len] for start in range(0, len(y), hop_len))
    segments = [seg for seg in windows if len(seg) >= sr]

    return segments if segments else [y]
def preprocess_audio(audio_file: io.BytesIO):
    """
    Load, clean and profile an uploaded audio clip for AI detection.

    Processing is deliberately light (trim, peak-normalize, clamp) so the
    natural signal is preserved and not pushed toward false AI classifications.

    Args:
        audio_file: Binary stream holding the encoded audio; it is read from
            its current position.

    Returns:
        (audio_array, audio_profile_dict): the processed samples at
        ``settings.SAMPLE_RATE`` and the dict from ``compute_audio_profile``.

    Raises:
        HTTPException: 400 if the audio is shorter than 0.3 seconds or if
            loading/processing fails for any other reason.
    """
    tmp_path = None
    try:
        # librosa is handed a filesystem path, so spill the upload to disk.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as tmp_file:
            # Record the path before writing so a failed write is still
            # cleaned up in the finally block (the file uses delete=False).
            tmp_path = tmp_file.name
            tmp_file.write(audio_file.read())

        # Resample to the configured rate while loading.
        y, sr = librosa.load(tmp_path, sr=settings.SAMPLE_RATE)

        # Ensure mono
        if y.ndim > 1:
            y = librosa.to_mono(y)

        # Reject extremely short audio
        if len(y) < sr * 0.3:
            raise HTTPException(
                status_code=400,
                detail="Audio too short. Minimum 0.3 seconds required."
            )

        # 1. Basic silence trimming; keep the untrimmed signal if trimming
        #    would leave 0.1 seconds or less.
        y_trimmed, _ = librosa.effects.trim(y, top_db=40)
        if len(y_trimmed) > sr * 0.1:
            y = y_trimmed

        # 2. Peak normalization: a single global gain, so relative dynamics
        #    are preserved. Skipped for an all-zero signal.
        # NOTE(review): this runs before the 30 s clamp, so the peak may lie
        # in the discarded tail — confirm this order is intended.
        peak = np.max(np.abs(y))
        if peak > 0:
            y = y / peak

        # 3. Time clamping — max 30 seconds
        max_duration = 30
        max_samples = int(sr * max_duration)
        if len(y) > max_samples:
            y = y[:max_samples]

        # 4. Compute audio profile
        profile = compute_audio_profile(y, sr)

        logger.info(
            "Preprocessing complete: %ss, SNR=%sdB, clipping=%s",
            profile['duration_sec'],
            profile['snr_db'],
            'YES' if profile['clipping_detected'] else 'NO',
        )
        return y, profile
    except HTTPException:
        # Already a well-formed client error; pass it through untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Error processing audio file: {str(e)}") from e
    finally:
        # Best-effort cleanup: a failure to delete the temp file must not
        # replace a successful result (or the real error) with a new one.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except FileNotFoundError:
                pass
            except OSError:
                logger.warning(
                    "Could not remove temporary audio file %s", tmp_path, exc_info=True
                )