"""API routes for the transcription service."""

import asyncio
import csv
import logging
import time
from pathlib import Path

from fastapi import APIRouter, UploadFile, File, HTTPException, BackgroundTasks, Form
from fastapi.responses import FileResponse

from app.core.config import get_settings
from app.schemas.models import TranscriptionResponse, HealthResponse
from app.services.audio_processor import AudioProcessor, AudioProcessingError
from app.services.transcription import TranscriptionService, AVAILABLE_MODELS
from app.services.diarization import DiarizationService
from app.services.processor import Processor
from app.services.emo import EmotionService

logger = logging.getLogger(__name__)
settings = get_settings()

router = APIRouter()

# Column order of the generated CSV transcript.
_CSV_FIELDS = ["start", "end", "speaker", "text", "emotion", "icon"]

# Extensions that may be served by the download endpoint, with the media type
# sent for each. Anything not listed here is rejected with a 400.
_DOWNLOAD_MEDIA_TYPES = {
    ".txt": "text/plain; charset=utf-8",
    ".csv": "text/csv; charset=utf-8",
}


@router.get("/api/health", response_model=HealthResponse)
async def health_check() -> HealthResponse:
    """Report service health.

    ``models_loaded`` is true only when both the transcription and the
    diarization services report themselves as loaded.
    """
    return HealthResponse(
        status="healthy",
        models_loaded=TranscriptionService.is_loaded() and DiarizationService.is_loaded(),
        device=settings.resolved_device,
    )


@router.get("/api/models")
async def get_models():
    """Return the names of the available Whisper models and the configured default."""
    return {
        "models": list(AVAILABLE_MODELS.keys()),
        "default": settings.default_whisper_model,
    }


def _write_csv(csv_path: Path, result) -> None:
    """Write the segments of ``result`` to ``csv_path`` as a CSV transcript.

    Speaker labels are mapped through ``result.roles`` when a mapping exists;
    the icon column is looked up from ``EmotionService.meta``.
    """
    roles = result.roles or {}
    # "utf-8-sig" prepends a BOM (presumably so spreadsheet apps detect UTF-8).
    with csv_path.open("w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=_CSV_FIELDS)
        writer.writeheader()
        for seg in result.segments:
            emotion = seg.emotion or ""
            icon = EmotionService.meta.get(emotion, {}).get("emoji", "") if emotion else ""
            writer.writerow(
                {
                    "start": round(seg.start, 2),
                    "end": round(seg.end, 2),
                    "speaker": roles.get(seg.speaker, seg.speaker),
                    "text": seg.text,
                    "emotion": emotion,
                    "icon": icon,
                }
            )


def _build_response_segments(result) -> list:
    """Convert the segments of ``result`` into plain dicts for the API response.

    Emotion and icon are only exposed for segments whose role is "KH";
    every other role gets ``None`` for both fields.
    """
    segments = []
    for seg in result.segments:
        # NOTE(review): "KH" presumably marks the customer role - confirm.
        expose_emotion = seg.role == "KH"
        segments.append(
            {
                "start": seg.start,
                "end": seg.end,
                "speaker": seg.speaker,
                "role": seg.role,
                "text": seg.text,
                "emotion": seg.emotion if expose_emotion else None,
                "icon": seg.icon if expose_emotion else None,
            }
        )
    return segments


async def _discard_upload(upload_path) -> None:
    """Best-effort removal of a saved upload after a failed request.

    Errors during removal are logged and swallowed so they never mask the
    error that is being reported to the client.
    """
    if not upload_path or not upload_path.exists():
        return
    try:
        await AudioProcessor.cleanup_files(upload_path)
    except Exception:
        logger.warning("Failed to remove upload %s", upload_path, exc_info=True)


@router.post("/api/transcribe", response_model=TranscriptionResponse)
async def transcribe_audio(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Audio file to transcribe"),
    model: str = Form(default="PhoWhisper Lora Finetuned", description="Whisper model to use"),
    language: str = Form(default="vi", description="Language code"),
) -> TranscriptionResponse:
    """Upload and transcribe an audio file.

    The upload is validated, saved, and handed to ``Processor.process_audio``
    (described by the original author as a diarize-first workflow:
    diarization, per-speaker transcription, emotion prediction). The result
    is written to ``<stem>_output.txt`` and ``<stem>_output.csv`` in
    ``settings.processed_dir`` and returned together with download links.

    Raises:
        HTTPException: 400 when ``AudioProcessor.validate_file`` rejects the
            upload; 500 when any later step fails.
    """
    upload_path = None

    try:
        file_content = await file.read()
        original_name = file.filename or "audio.wav"

        try:
            AudioProcessor.validate_file(original_name, len(file_content))
        except AudioProcessingError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e

        upload_path = await AudioProcessor.save_upload(file_content, original_name)

        # NOTE(review): ``model`` is only logged here; it is not forwarded to
        # Processor.process_audio. Confirm whether that is intentional.
        logger.info("Processing audio with model=%s, language=%s", model, language)
        result = await Processor.process_audio(
            audio_path=upload_path,
            language=language,
        )

        # NOTE(review): output names depend only on the upload's stem, so two
        # uploads with the same name overwrite each other's transcripts.
        base_name = Path(original_name).stem
        txt_filename = f"{base_name}_output.txt"
        csv_filename = f"{base_name}_output.csv"

        (settings.processed_dir / txt_filename).write_text(
            result.txt_content, encoding="utf-8"
        )
        _write_csv(settings.processed_dir / csv_filename, result)

        # The raw upload is no longer needed once the transcripts exist.
        background_tasks.add_task(cleanup_files, upload_path)

        return TranscriptionResponse(
            success=True,
            segments=_build_response_segments(result),
            speaker_count=result.speaker_count,
            speakers=result.speakers,
            duration=result.duration,
            processing_time=result.processing_time,
            roles=result.roles,
            emotion_timeline=[
                {"time": p.time, "emotion": p.emotion, "icon": p.icon}
                for p in (result.emotion_timeline or [])
            ],
            emotion_changes=[
                {
                    "time": c.time,
                    "emotion_from": c.emotion_from,
                    "emotion_to": c.emotion_to,
                    "icon_from": c.icon_from,
                    "icon_to": c.icon_to,
                }
                for c in (result.emotion_changes or [])
            ],
            download_txt=f"/api/download/{txt_filename}",
            download_csv=f"/api/download/{csv_filename}",
        )

    except HTTPException:
        # Injected background tasks only run for the response returned by the
        # endpoint, not for error responses, so clean up right here.
        await _discard_upload(upload_path)
        raise
    except Exception as e:
        logger.exception("Processing failed")
        await _discard_upload(upload_path)
        raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}") from e


@router.get("/api/download/{filename}")
async def download_file(filename: str) -> FileResponse:
    """Download a generated transcript file.

    Supports: .txt, .csv files located directly in ``settings.processed_dir``.

    Raises:
        HTTPException: 400 for a disallowed extension or a name containing
            path separators or ``..``; 404 when no such file exists.
    """
    media_type = next(
        (mt for ext, mt in _DOWNLOAD_MEDIA_TYPES.items() if filename.endswith(ext)),
        None,
    )
    # Security: only allow known extensions and plain file names (no
    # traversal, no separators of either flavour).
    if media_type is None or "/" in filename or "\\" in filename or ".." in filename:
        raise HTTPException(status_code=400, detail="Invalid filename")

    filepath = settings.processed_dir / filename
    if not filepath.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    # Passing ``filename`` makes FileResponse emit an attachment
    # Content-Disposition header itself, including correct encoding of
    # non-ASCII names; a hand-built header would fail to encode those.
    return FileResponse(
        path=filepath,
        filename=filename,
        media_type=media_type,
    )


async def cleanup_files(*paths: Path) -> None:
    """Background task that removes temporary files after a short delay.

    Waits five seconds and then delegates to ``AudioProcessor.cleanup_files``.
    """
    # Wait a bit before cleanup
    await asyncio.sleep(5)
    await AudioProcessor.cleanup_files(*paths)