Shaankar39's picture
init: Vaani CAVP engine (CPU, accuracy-first — Whisper large-v3, spaCy trf)
7d5f092
"""CONTRASTIVE ACOUSTIC VOICE PROFILING — FastAPI Engine
Main orchestration server. 10 processing layers:
1. TRANSCRIPTION — Whisper
2. FEATURE EXTRACTION — Parselmouth, librosa, OpenSMILE
3. AI CLASSIFICATION — Wav2Vec 2.0, SpeechBrain, langdetect
4. NLP — spaCy, NLTK
5. PHONEME ANALYSIS — Formant extraction, vowel space, accuracy scoring
6. MORPHEME BOUNDARY — Cognitive load, emotional stress, codeswitching
7. PROSODIC PROFILING — Rhythm, intonation, stress patterns
8. CONNECTED SPEECH — Assimilation, elision, linking
9. VOICE QUALITY — HNR, breathiness, nasality, register
10. L1 INTERFERENCE — Bhojpuri, Hindi, Bangla, Odia detection + CIF model
"""
import asyncio
import logging
import os
import time
import uuid
from pathlib import Path
from typing import Any, Dict
from fastapi import FastAPI, File, Form, HTTPException, UploadFile, Request, Security, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import APIKeyHeader
from pydantic import BaseModel
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from starlette.responses import JSONResponse
from config import WHISPER_MODEL, WHISPER_DEVICE, TORCH_DEVICE, SPACY_MODEL, UPLOAD_DIR, ENGINE_API_KEY
from utils.serializers import to_dict
# Maximum upload size: 50 MB
MAX_UPLOAD_BYTES = 50 * 1024 * 1024
# Maximum time for a single analysis pipeline run (seconds)
ANALYSIS_TIMEOUT = int(os.getenv("ANALYSIS_TIMEOUT", "300")) # 5 minutes
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info("Engine config: device=%s, whisper_model=%s, spacy=%s", TORCH_DEVICE, WHISPER_MODEL, SPACY_MODEL)
# ── Rate Limiting ────────────────────────────────────────────────────────
limiter = Limiter(key_func=get_remote_address)
app = FastAPI(
title="Contrastive Acoustic Voice Profiling Engine",
version="1.0.0",
)
app.state.limiter = limiter
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
return JSONResponse(
status_code=429,
content={"detail": "Rate limit exceeded. Please try again later."},
)
# ── API Key Authentication ───────────────────────────────────────────────
_api_key_header = APIKeyHeader(name="X-Engine-API-Key", auto_error=False)
async def verify_engine_api_key(api_key: str = Security(_api_key_header)) -> str:
"""Verify the shared secret between NestJS server and this engine."""
if not ENGINE_API_KEY:
# In production, refuse unauthenticated requests
env = os.getenv("ENVIRONMENT", os.getenv("NODE_ENV", "development"))
if env == "production":
raise HTTPException(status_code=500, detail="ENGINE_API_KEY is not configured — refusing to run unauthenticated in production")
logger.warning("ENGINE_API_KEY not set — engine is unauthenticated (development mode only)")
return ""
if not api_key or api_key != ENGINE_API_KEY:
raise HTTPException(status_code=403, detail="Invalid or missing engine API key")
return api_key
_cors_env = os.getenv("CORS_ORIGINS", "")
_cors_origins = [o.strip() for o in _cors_env.split(",") if o.strip()] if _cors_env else [
"http://localhost:3001",
"http://localhost:5173",
"http://127.0.0.1:3001",
"http://127.0.0.1:5173",
]
app.add_middleware(
CORSMiddleware,
allow_origins=_cors_origins,
allow_methods=["GET", "POST"],
allow_headers=["Authorization", "Content-Type", "X-Engine-API-Key"],
)
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# ── Stale upload cleanup (safety net) ────────────────────────────────────
# Runs hourly to catch any uploads not cleaned up by endpoint finally blocks
_STALE_FILE_AGE_SECONDS = 3600 # 1 hour
async def _cleanup_stale_uploads():
"""Periodically delete upload files older than 1 hour (safety net)."""
while True:
await asyncio.sleep(3600)
try:
now = time.time()
count = 0
for f in UPLOAD_DIR.iterdir():
if f.is_file() and (now - f.stat().st_mtime) > _STALE_FILE_AGE_SECONDS:
f.unlink(missing_ok=True)
count += 1
if count:
logger.info("Stale upload cleanup: removed %d orphaned files", count)
except Exception as exc:
logger.warning("Stale upload cleanup error: %s", exc)
@app.on_event("startup")
async def _start_cleanup_task():
asyncio.create_task(_cleanup_stale_uploads())
# ── Pydantic models ──────────────────────────────────────────────────────
class HealthResponse(BaseModel):
status: str
class AnalysisRequest(BaseModel):
gender: str = "neutral"
language: str = "en"
opensmile_features: bool = False
run_speechbrain: bool = False
# ── Helpers ───────────────────────────────────────────────────────────────
async def _save_upload(upload: UploadFile) -> Path:
import subprocess
# Use UUID filename to eliminate path traversal risk entirely
ext = Path(upload.filename).suffix.lower() if upload.filename else ".wav"
allowed_ext = {".wav", ".mp3", ".ogg", ".webm", ".flac", ".m4a", ".opus"}
if ext not in allowed_ext:
ext = ".wav"
unique_name = f"{uuid.uuid4()}{ext}"
dest = UPLOAD_DIR / unique_name
content = await upload.read()
if not content:
raise HTTPException(400, "Uploaded file is empty")
if len(content) > MAX_UPLOAD_BYTES:
raise HTTPException(413, f"File too large (max {MAX_UPLOAD_BYTES // (1024*1024)}MB)")
dest.write_bytes(content)
# Convert non-wav formats to wav using ffmpeg (Praat/Parselmouth requires wav)
if dest.suffix.lower() in {".webm", ".ogg", ".m4a", ".mp3", ".flac", ".opus"}:
wav_dest = dest.with_suffix(".wav")
try:
proc = subprocess.run(
["ffmpeg", "-y", "-i", str(dest), "-ar", "16000", "-ac", "1", str(wav_dest)],
capture_output=True, timeout=120,
)
if proc.returncode != 0:
stderr = proc.stderr.decode(errors="replace")
logger.error("ffmpeg conversion failed: %s", stderr)
raise HTTPException(500, "Audio conversion failed")
# Validate converted file
if not wav_dest.exists() or wav_dest.stat().st_size < 44:
raise HTTPException(500, "Audio conversion produced invalid output")
dest.unlink(missing_ok=True)
dest = wav_dest
except subprocess.TimeoutExpired:
dest.unlink(missing_ok=True)
raise HTTPException(500, "Audio conversion timed out")
except FileNotFoundError:
raise HTTPException(500, "ffmpeg not installed — cannot convert audio")
return dest
def _run_full_pipeline(audio_path: Path, gender: str, run_opensmile: bool, run_sb: bool, l1_language: str = "auto") -> dict[str, Any]:
"""Run the full processing pipeline synchronously (called via to_thread)."""
from modules.transcription import transcribe, get_word_timestamps
from modules.feature_extraction import extract_parselmouth, extract_librosa, extract_opensmile
from modules.ai_classification import classify_phonemes, classify_speechbrain, detect_language
from modules.nlp_layer import analyze_morphology, analyze_syntax, analyze_phoneme_inventory
from modules.phoneme_analysis import analyze_phonemes
from modules.morpheme_boundary import analyze_morpheme_boundaries
from modules.prosodic_profiling import profile_prosody
from modules.connected_speech import analyze_connected_speech
from modules.voice_quality import profile_voice_quality
from modules.forced_alignment import forced_align, alignment_to_phoneme_spans
from modules.voicesauce import analyze_voicesauce, voicesauce_to_dict
from modules.audeep import analyze_audeep, audeep_to_dict
results: dict[str, Any] = {}
t0 = time.time()
# ── Layer 1: Transcription ────────────────────────────────────────
logger.info("Layer 1: Transcription")
transcription = transcribe(audio_path, WHISPER_MODEL, WHISPER_DEVICE)
word_ts = get_word_timestamps(audio_path, WHISPER_MODEL, WHISPER_DEVICE)
results["transcription"] = {
"text": transcription.text,
"language": transcription.language,
"language_probability": transcription.language_probability,
"duration_seconds": transcription.duration_seconds,
"segments": [{"start": s.start, "end": s.end, "text": s.text} for s in transcription.segments],
"word_timestamps": word_ts,
}
# ── Layer 1b: Forced Alignment (WebMAUS + MFA) ────────────────────
logger.info("Layer 1b: Forced Alignment (MFA → WebMAUS fallback)")
alignment = None
try:
alignment = forced_align(
audio_path=audio_path,
transcript=transcription.text,
language=transcription.language or "en",
)
if alignment.success:
results["forced_alignment"] = {
"source": alignment.source,
"num_phones": len(alignment.phones),
"phones": alignment_to_phoneme_spans(alignment),
"words": alignment.words,
"textgrid_path": alignment.textgrid_path,
}
logger.info("Forced alignment: %d phones via %s", len(alignment.phones), alignment.source)
else:
results["forced_alignment"] = {
"source": "none",
"error": alignment.error,
"num_phones": 0,
"phones": [],
}
logger.warning("Forced alignment unavailable: %s", alignment.error)
except Exception as exc:
logger.warning("Forced alignment failed: %s", exc)
results["forced_alignment"] = {"source": "none", "error": str(exc), "num_phones": 0, "phones": []}
# ── Layer 2: Feature Extraction ───────────────────────────────────
logger.info("Layer 2: Feature Extraction")
praat = extract_parselmouth(audio_path, gender)
lib = extract_librosa(audio_path)
results["feature_extraction"] = {
"parselmouth": to_dict(praat),
"librosa": to_dict(lib),
}
if run_opensmile:
osm = extract_opensmile(audio_path)
if osm:
results["feature_extraction"]["opensmile"] = to_dict(osm)
# ── Layer 2b: VoiceSauce Spectral Measures ────────────────────────
logger.info("Layer 2b: VoiceSauce Spectral Analysis")
try:
vs_result = analyze_voicesauce(audio_path, gender)
results["voicesauce"] = voicesauce_to_dict(vs_result)
logger.info("VoiceSauce: phonation=%s, breathiness=%.2f, creak=%.2f",
vs_result.phonation_type, vs_result.breathiness_index, vs_result.creak_index)
except Exception as exc:
logger.warning("VoiceSauce analysis failed: %s", exc)
results["voicesauce"] = {"source": "voicesauce", "error": str(exc)}
# ── Layer 3: AI Classification ────────────────────────────────────
logger.info("Layer 3: AI Classification")
wav2vec = None
lang_detect = None
try:
wav2vec = classify_phonemes(audio_path)
except Exception as exc:
logger.warning("Wav2Vec classification failed: %s", exc)
try:
lang_detect = detect_language(transcription.text)
except Exception as exc:
logger.warning("Language detection failed: %s", exc)
results["ai_classification"] = {
"wav2vec": to_dict(wav2vec) if wav2vec else None,
"language_detection": to_dict(lang_detect) if lang_detect else None,
}
if run_sb:
try:
sb = classify_speechbrain(audio_path)
results["ai_classification"]["speechbrain"] = to_dict(sb)
except Exception as exc:
logger.warning("SpeechBrain classification failed: %s", exc)
# ── Layer 3b: auDeep Emotional Representations ────────────────────
logger.info("Layer 3b: auDeep Deep Emotional Analysis")
try:
audeep_result = analyze_audeep(audio_path)
results["audeep"] = audeep_to_dict(audeep_result)
logger.info("auDeep: %s (%.2f) via %s, V=%.2f A=%.2f D=%.2f",
audeep_result.primary_emotion, audeep_result.emotion_confidence,
audeep_result.model_type,
audeep_result.valence, audeep_result.arousal, audeep_result.dominance)
except Exception as exc:
logger.warning("auDeep analysis failed: %s", exc)
results["audeep"] = {"source": "audeep", "error": str(exc)}
# ── Layer 4: NLP ──────────────────────────────────────────────────
logger.info("Layer 4: NLP")
morph = analyze_morphology(transcription.text, SPACY_MODEL)
syntax = analyze_syntax(transcription.text)
phoneme_inv = None
if wav2vec:
phoneme_seq = [p.phoneme for p in wav2vec.phonemes]
phoneme_inv = analyze_phoneme_inventory(phoneme_seq)
results["nlp"] = {
"morphology": to_dict(morph),
"syntax": to_dict(syntax),
"phoneme_inventory": to_dict(phoneme_inv) if phoneme_inv else None,
}
formant_dict = to_dict(praat.formants)
pitch_dict = to_dict(praat.pitch)
vq_dict = to_dict(praat.voice_quality)
librosa_dict = to_dict(lib)
# ── Layer 5: Phoneme Analysis ──────────────────────────────────
logger.info("Layer 5: Phoneme Analysis")
# Prefer forced alignment phones over Wav2Vec CTC (more precise boundaries)
phoneme_spans = []
fa_phones = results.get("forced_alignment", {}).get("phones", [])
if fa_phones:
phoneme_spans = fa_phones
logger.info("Using forced alignment phonemes (%d phones) for downstream analysis", len(phoneme_spans))
elif wav2vec:
phoneme_spans = [to_dict(p) for p in wav2vec.phonemes]
logger.info("Using Wav2Vec CTC phonemes (%d phones) for downstream analysis", len(phoneme_spans))
pa = analyze_phonemes(phoneme_spans, formant_dict, word_ts)
results["phoneme_analysis"] = to_dict(pa)
# ── Layer 6: Morpheme Boundary + Cognitive Load ────────────────
logger.info("Layer 6: Morpheme Boundary + Cognitive Load")
morpheme_list = [to_dict(m) for m in syntax.morphemes] if syntax.morphemes else []
mb = analyze_morpheme_boundaries(
word_timestamps=word_ts,
transcript=transcription.text,
pitch_data=pitch_dict,
voice_quality=vq_dict,
duration_seconds=transcription.duration_seconds,
morphemes=morpheme_list,
)
results["morpheme_boundary"] = to_dict(mb)
# ── Layer 7: Prosodic Profiling ────────────────────────────────
logger.info("Layer 7: Prosodic Profiling")
pp = profile_prosody(
word_timestamps=word_ts,
pitch_data=pitch_dict,
duration_seconds=transcription.duration_seconds,
total_pause_ms=mb.total_pause_time_ms,
)
results["prosodic_profile"] = to_dict(pp)
# ── Layer 8: Connected Speech ──────────────────────────────────
logger.info("Layer 8: Connected Speech")
cs = analyze_connected_speech(
word_timestamps=word_ts,
phoneme_spans=phoneme_spans,
transcript=transcription.text,
formant_trajectories=formant_dict,
)
results["connected_speech"] = to_dict(cs)
# ── Layer 9: Voice Quality ─────────────────────────────────────
logger.info("Layer 9: Voice Quality")
vq = profile_voice_quality(
formant_data=formant_dict,
pitch_data=pitch_dict,
voice_quality_data=vq_dict,
librosa_features=librosa_dict,
audio_path=str(audio_path),
)
results["voice_quality"] = to_dict(vq)
# ── Layer 10: L1 Interference Detection (Bhojpuri/Hindi/Bangla/Odia)
logger.info("Layer 10: L1 Interference Detection")
from modules.l1_targets import detect_l1_interference, resolve_l1_code
# Try langdetect result first, then Whisper's detected language as fallback
detected_iso = None
if lang_detect and hasattr(lang_detect, "language"):
detected_iso = lang_detect.language
elif isinstance(lang_detect, dict):
detected_iso = lang_detect.get("language")
if not detected_iso and transcription.language:
detected_iso = transcription.language
l1_code = resolve_l1_code(detected_iso, l1_language)
logger.info("L1 resolution: detected_iso=%s, explicit=%s, resolved=%s", detected_iso, l1_language, l1_code)
rhythm_dict = to_dict(pp.rhythm) if hasattr(pp, "rhythm") else results.get("prosodic_profile", {}).get("rhythm", {})
intonation_dict = to_dict(pp.intonation) if hasattr(pp, "intonation") else results.get("prosodic_profile", {}).get("intonation", {})
nasality_dict = (results.get("voice_quality", {}) or {}).get("nasality", {}) or {}
l1_result = detect_l1_interference(
l1_code=l1_code,
formant_data=formant_dict,
pitch_data=pitch_dict,
rhythm_data=rhythm_dict,
phoneme_spans=phoneme_spans,
intonation_data=intonation_dict,
nasality_data=nasality_dict,
)
results["l1_interference"] = l1_result
results["bhojpuri_interference"] = l1_result # backwards compat
results["l1_language"] = l1_code
results["l1_display_name"] = l1_result.get("l1_display_name", "Bhojpuri")
# ── CIF Model — Contrastive Interference Index ─────────────────
logger.info("Computing CIF Model")
from modules.cif_model import compute_cif
cif_result = compute_cif(results, l1_code=l1_code)
results["cif_analysis"] = cif_result
results["processing_time_ms"] = round((time.time() - t0) * 1000, 2)
return results
# ── Routes ────────────────────────────────────────────────────────────────
@app.get("/health")
async def health():
return {"status": "ok", "device": TORCH_DEVICE, "whisper_model": WHISPER_MODEL}
ALLOWED_L1 = {"auto", "bho", "hin", "ben", "ori", "tam", "tel"}
@app.post("/api/analyze", dependencies=[Depends(verify_engine_api_key)], response_model=None)
@limiter.limit("10/minute")
async def analyze(
request: Request,
audio: UploadFile = File(...),
gender: str = Form("neutral"),
run_opensmile: str = Form("false"),
run_speechbrain: str = Form("false"),
l1_language: str = Form("auto"),
) -> dict[str, Any]:
"""Full voice profile analysis for a single audio file."""
if not audio.filename:
raise HTTPException(400, "No audio file provided")
allowed = {".wav", ".mp3", ".ogg", ".webm", ".flac", ".m4a"}
ext = Path(audio.filename).suffix.lower()
if ext not in allowed:
raise HTTPException(400, f"Unsupported format: {ext}")
if l1_language not in ALLOWED_L1:
raise HTTPException(400, f"Invalid l1_language: must be one of {sorted(ALLOWED_L1)}")
audio_path = await _save_upload(audio)
timed_out = False
try:
result = await asyncio.wait_for(
asyncio.to_thread(
_run_full_pipeline,
audio_path,
gender,
run_opensmile.lower() == "true",
run_speechbrain.lower() == "true",
l1_language,
),
timeout=ANALYSIS_TIMEOUT,
)
return {"status": "ok", "profile": result}
except asyncio.TimeoutError:
timed_out = True
logger.error("Analysis timed out after %ds", ANALYSIS_TIMEOUT)
raise HTTPException(504, f"Analysis timed out after {ANALYSIS_TIMEOUT}s")
except Exception as exc:
logger.exception("Analysis failed")
raise HTTPException(500, "Analysis failed")
finally:
if not timed_out:
audio_path.unlink(missing_ok=True)
@app.post("/api/contrastive", dependencies=[Depends(verify_engine_api_key)], response_model=None)
@limiter.limit("5/minute")
async def contrastive_compare(
request: Request,
audio_a: UploadFile = File(...),
audio_b: UploadFile = File(...),
gender: str = Form("neutral"),
label_a: str = Form("sample_a"),
label_b: str = Form("sample_b"),
l1_language: str = Form("auto"),
) -> dict[str, Any]:
"""Compare two audio samples contrastively (e.g., L1 vs L2)."""
if l1_language not in ALLOWED_L1:
raise HTTPException(400, f"Invalid l1_language: must be one of {sorted(ALLOWED_L1)}")
from modules.contrastive import compare_profiles
path_a = await _save_upload(audio_a)
path_b = await _save_upload(audio_b)
timed_out = False
try:
# Run sequentially — GPU models (Whisper, Wav2Vec) are not thread-safe
# Double timeout since two full pipelines run
profile_a = await asyncio.wait_for(
asyncio.to_thread(_run_full_pipeline, path_a, gender, False, False, l1_language),
timeout=ANALYSIS_TIMEOUT,
)
profile_b = await asyncio.wait_for(
asyncio.to_thread(_run_full_pipeline, path_b, gender, False, False, l1_language),
timeout=ANALYSIS_TIMEOUT,
)
comparison = compare_profiles(profile_a, profile_b, label_a, label_b)
return {
"status": "ok",
"profile_a": profile_a,
"profile_b": profile_b,
"contrastive_report": to_dict(comparison),
}
except asyncio.TimeoutError:
timed_out = True
logger.error("Contrastive analysis timed out after %ds", ANALYSIS_TIMEOUT)
raise HTTPException(504, f"Analysis timed out after {ANALYSIS_TIMEOUT}s")
except HTTPException:
raise
except Exception as exc:
logger.exception("Contrastive analysis failed")
raise HTTPException(500, "Contrastive analysis failed")
finally:
if not timed_out:
path_a.unlink(missing_ok=True)
path_b.unlink(missing_ok=True)
@app.post("/api/transcribe", dependencies=[Depends(verify_engine_api_key)], response_model=None)
@limiter.limit("15/minute")
async def transcribe_only(
request: Request,
audio: UploadFile = File(...),
) -> dict[str, Any]:
"""Transcribe audio only (lightweight endpoint)."""
from modules.transcription import transcribe, get_word_timestamps
audio_path = await _save_upload(audio)
timed_out = False
try:
async def _do_transcribe():
result = await asyncio.to_thread(transcribe, audio_path, WHISPER_MODEL, WHISPER_DEVICE)
words = await asyncio.to_thread(get_word_timestamps, audio_path, WHISPER_MODEL, WHISPER_DEVICE)
return result, words
result, words = await asyncio.wait_for(_do_transcribe(), timeout=ANALYSIS_TIMEOUT)
return {
"status": "ok",
"text": result.text,
"language": result.language,
"segments": [{"start": s.start, "end": s.end, "text": s.text} for s in result.segments],
"word_timestamps": words,
}
except asyncio.TimeoutError:
timed_out = True
logger.error("Transcription timed out after %ds", ANALYSIS_TIMEOUT)
raise HTTPException(504, f"Transcription timed out after {ANALYSIS_TIMEOUT}s")
except HTTPException:
raise
except Exception as exc:
logger.exception("Transcription failed")
raise HTTPException(500, "Transcription failed")
finally:
if not timed_out:
audio_path.unlink(missing_ok=True)
@app.post("/api/features", dependencies=[Depends(verify_engine_api_key)], response_model=None)
@limiter.limit("15/minute")
async def features_only(
request: Request,
audio: UploadFile = File(...),
gender: str = Form("neutral"),
) -> dict[str, Any]:
"""Extract acoustic features only."""
from modules.feature_extraction import extract_parselmouth, extract_librosa
audio_path = await _save_upload(audio)
timed_out = False
try:
async def _do_features():
praat = await asyncio.to_thread(extract_parselmouth, audio_path, gender)
lib = await asyncio.to_thread(extract_librosa, audio_path)
return praat, lib
praat, lib = await asyncio.wait_for(_do_features(), timeout=ANALYSIS_TIMEOUT)
return {
"status": "ok",
"parselmouth": to_dict(praat),
"librosa": to_dict(lib),
}
except asyncio.TimeoutError:
timed_out = True
logger.error("Feature extraction timed out after %ds", ANALYSIS_TIMEOUT)
raise HTTPException(504, f"Feature extraction timed out after {ANALYSIS_TIMEOUT}s")
except HTTPException:
raise
except Exception as exc:
logger.exception("Feature extraction failed")
raise HTTPException(500, "Feature extraction failed")
finally:
if not timed_out:
audio_path.unlink(missing_ok=True)
@app.post("/api/report", dependencies=[Depends(verify_engine_api_key)], response_model=None)
@limiter.limit("5/minute")
async def generate_report(
request: Request,
audio: UploadFile = File(...),
gender: str = Form("neutral"),
student_name: str = Form("Student"),
student_id: str = Form(""),
l1_language: str = Form("auto"),
) -> Any:
"""Generate a PDF diagnostic report for parents."""
if l1_language not in ALLOWED_L1:
raise HTTPException(400, f"Invalid l1_language: must be one of {sorted(ALLOWED_L1)}")
from fastapi.responses import Response
from modules.report_generator import generate_pdf_report
audio_path = await _save_upload(audio)
timed_out = False
try:
async def _do_report():
profile = await asyncio.to_thread(
_run_full_pipeline, audio_path, gender, False, False, l1_language,
)
pdf_bytes = await asyncio.to_thread(
generate_pdf_report, profile, audio_path, student_name, student_id,
)
return pdf_bytes
pdf_bytes = await asyncio.wait_for(_do_report(), timeout=ANALYSIS_TIMEOUT)
import re
safe_id = re.sub(r'[^a-zA-Z0-9_-]', '', student_id or 'report')[:50]
filename = f"voice_report_{safe_id}_{int(time.time())}.pdf"
return Response(
content=pdf_bytes,
media_type="application/pdf",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
except asyncio.TimeoutError:
timed_out = True
logger.error("Report generation timed out after %ds", ANALYSIS_TIMEOUT)
raise HTTPException(504, f"Report generation timed out after {ANALYSIS_TIMEOUT}s")
except HTTPException:
raise
except Exception as exc:
logger.exception("Report generation failed")
raise HTTPException(500, "Report generation failed")
finally:
if not timed_out:
audio_path.unlink(missing_ok=True)
@app.post("/api/export/elan", dependencies=[Depends(verify_engine_api_key)], response_model=None)
@limiter.limit("5/minute")
async def export_elan(
request: Request,
audio: UploadFile = File(...),
gender: str = Form("neutral"),
speaker_id: str = Form("anonymous"),
student_name: str = Form("Student"),
l1_language: str = Form("auto"),
session_id: str = Form(""),
) -> Any:
"""Run full analysis and export results as ELAN .eaf annotation file."""
if l1_language not in ALLOWED_L1:
raise HTTPException(400, f"Invalid l1_language: must be one of {sorted(ALLOWED_L1)}")
from fastapi.responses import Response
from modules.elan_export import export_eaf
audio_path = await _save_upload(audio)
timed_out = False
try:
async def _do_elan():
profile = await asyncio.to_thread(
_run_full_pipeline, audio_path, gender, False, False, l1_language,
)
eaf_xml, _ = await asyncio.to_thread(
export_eaf,
profile=profile,
audio_path=audio_path,
speaker_id=speaker_id,
student_name=student_name,
language="en",
l1_language=l1_language if l1_language != "auto" else "bho",
session_id=session_id or None,
)
return eaf_xml
eaf_xml = await asyncio.wait_for(_do_elan(), timeout=ANALYSIS_TIMEOUT)
import re
safe_id = re.sub(r'[^a-zA-Z0-9_-]', '', speaker_id or 'export')[:50]
filename = f"annotation_{safe_id}_{int(time.time())}.eaf"
return Response(
content=eaf_xml.encode("utf-8"),
media_type="application/xml",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
except asyncio.TimeoutError:
timed_out = True
logger.error("ELAN export timed out after %ds", ANALYSIS_TIMEOUT)
raise HTTPException(504, f"ELAN export timed out after {ANALYSIS_TIMEOUT}s")
except HTTPException:
raise
except Exception as exc:
logger.exception("ELAN export failed")
raise HTTPException(500, "ELAN export failed")
finally:
if not timed_out:
audio_path.unlink(missing_ok=True)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)