Spaces:
Sleeping
Sleeping
| """ | |
| API routes for the transcription service. | |
| """ | |
| import logging | |
| import time | |
| from pathlib import Path | |
| import csv | |
| from fastapi import APIRouter, UploadFile, File, HTTPException, BackgroundTasks, Form | |
| from fastapi.responses import FileResponse | |
| from app.core.config import get_settings | |
| from app.schemas.models import TranscriptionResponse, HealthResponse | |
| from app.services.audio_processor import AudioProcessor, AudioProcessingError | |
| from app.services.transcription import TranscriptionService, AVAILABLE_MODELS | |
| from app.services.diarization import DiarizationService | |
| from app.services.processor import Processor | |
| from app.services.emo import EmotionService | |
# API router created for this module.
router = APIRouter()
# Application settings, fetched once when the module is imported.
settings = get_settings()
# Logger named after this module.
logger = logging.getLogger(__name__)
async def health_check():
    """Report service health.

    ``models_loaded`` is true only when both the transcription service and
    the diarization service report that they are loaded; ``device`` is the
    resolved device taken from the application settings.
    """
    all_models_ready = (
        TranscriptionService.is_loaded() and DiarizationService.is_loaded()
    )
    return HealthResponse(
        status="healthy",
        models_loaded=all_models_ready,
        device=settings.resolved_device,
    )
async def get_models():
    """Return the names of the available Whisper models and the default one."""
    model_names = [name for name in AVAILABLE_MODELS.keys()]
    return {
        "models": model_names,
        "default": settings.default_whisper_model,
    }
async def transcribe_audio(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Audio file to transcribe"),
    model: str = Form(default="PhoWhisper Lora Finetuned", description="Whisper model to use"),
    language: str = Form(default="vi", description="Language code"),
):
    """
    Upload and transcribe an audio file.

    Uses the diarize-first workflow (the pipeline itself is implemented by
    ``Processor.process_audio``, outside this module):

    1. Diarization to identify speakers
    2. Transcribe each speaker segment
    3. Predict emotion segments
    4. Return combined result

    Besides the JSON response, a TXT and a CSV transcript are written to
    ``settings.processed_dir`` and exposed through the ``download_txt`` /
    ``download_csv`` links in the response.

    Args:
        background_tasks: Used to schedule deletion of the uploaded file
            after a successful response.
        file: The uploaded audio file.
        model: Requested model name.
        language: Language code forwarded to the processor.

    Returns:
        A ``TranscriptionResponse`` describing segments, speakers, roles,
        the emotion timeline/changes and the download links.

    Raises:
        HTTPException: 400 when ``AudioProcessor.validate_file`` rejects the
            upload; 500 for any other failure during processing.
    """
    upload_path = None
    try:
        file_content = await file.read()
        source_name = file.filename or "audio.wav"

        # Validation failures are the client's fault -> 400, not 500.
        try:
            AudioProcessor.validate_file(source_name, len(file_content))
        except AudioProcessingError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e

        upload_path = await AudioProcessor.save_upload(file_content, source_name)

        # NOTE(review): `model` is only logged here; it is not forwarded to
        # Processor.process_audio. Confirm whether that is intentional.
        logger.info("Processing audio with model=%s, language=%s", model, language)
        result = await Processor.process_audio(
            audio_path=upload_path, language=language
        )

        # Output files are named after the upload; `.stem` drops any
        # directory part and the extension of the client-supplied name.
        base_name = Path(file.filename or "audio").stem
        txt_filename = f"{base_name}_output.txt"
        csv_filename = f"{base_name}_output.csv"
        txt_path = settings.processed_dir / txt_filename
        csv_path = settings.processed_dir / csv_filename

        txt_path.write_text(result.txt_content, encoding="utf-8")
        _write_segments_csv(csv_path, result.segments, result.roles or {})

        # Success path: the task runs after the response has been sent.
        background_tasks.add_task(cleanup_files, upload_path)

        return TranscriptionResponse(
            success=True,
            segments=[_segment_payload(seg) for seg in result.segments],
            speaker_count=result.speaker_count,
            speakers=result.speakers,
            duration=result.duration,
            processing_time=result.processing_time,
            roles=result.roles,
            emotion_timeline=[
                {"time": p.time, "emotion": p.emotion, "icon": p.icon}
                for p in (result.emotion_timeline or [])
            ],
            emotion_changes=[
                {
                    "time": c.time,
                    "emotion_from": c.emotion_from,
                    "emotion_to": c.emotion_to,
                    "icon_from": c.icon_from,
                    "icon_to": c.icon_to,
                }
                for c in (result.emotion_changes or [])
            ],
            download_txt=f"/api/download/{txt_filename}",
            download_csv=f"/api/download/{csv_filename}",
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Processing failed")
        # Tasks queued on `background_tasks` are only executed together with
        # the endpoint's own response; once we raise, the error response is
        # built elsewhere and they would never run. Remove the upload now.
        try:
            if upload_path and upload_path.exists():
                await AudioProcessor.cleanup_files(upload_path)
        except Exception:
            # Best effort: a failed cleanup must not mask the real error.
            logger.warning("Could not remove upload %s", upload_path, exc_info=True)
        raise HTTPException(
            status_code=500, detail=f"Processing failed: {str(e)}"
        ) from e


def _write_segments_csv(csv_path, segments, roles):
    """Write one CSV row per segment to ``csv_path`` (UTF-8 with BOM)."""
    with csv_path.open("w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(
            f,
            fieldnames=["start", "end", "speaker", "text", "emotion", "icon"],
        )
        writer.writeheader()
        for seg in segments:
            emotion = seg.emotion or ""
            # The icon is looked up from the emotion metadata table; segments
            # without an emotion get an empty icon.
            icon = (
                EmotionService.meta.get(emotion, {}).get("emoji", "")
                if emotion
                else ""
            )
            writer.writerow(
                {
                    "start": round(seg.start, 2),
                    "end": round(seg.end, 2),
                    # Prefer the mapped role label, fall back to the raw speaker id.
                    "speaker": roles.get(seg.speaker, seg.speaker),
                    "text": seg.text,
                    "emotion": emotion,
                    "icon": icon,
                }
            )


def _segment_payload(seg):
    """Build the response dict for one segment.

    Emotion and icon are only exposed for segments whose role is "KH"
    (presumably the customer role -- confirm); other roles get ``None``.
    """
    is_kh = seg.role == "KH"
    return {
        "start": seg.start,
        "end": seg.end,
        "speaker": seg.speaker,
        "role": seg.role,
        "text": seg.text,
        "emotion": seg.emotion if is_kh else None,
        "icon": seg.icon if is_kh else None,
    }
async def download_file(filename: str):
    """
    Download a generated transcript file.

    Supports: .txt, .csv files

    Args:
        filename: Bare name of a file inside ``settings.processed_dir``.

    Returns:
        A ``FileResponse`` that serves the file as an attachment.

    Raises:
        HTTPException: 400 when the name has a disallowed extension or
            contains path components; 404 when no such file exists.
    """
    # Security: only allow specific extensions and no path traversal.
    # `Path(filename).name != filename` additionally rejects anything the
    # current platform would treat as a directory or drive prefix
    # (e.g. backslashes or "C:" on Windows).
    has_allowed_suffix = filename.endswith((".txt", ".csv"))
    has_path_component = (
        "/" in filename
        or ".." in filename
        or Path(filename).name != filename
    )
    if not has_allowed_suffix or has_path_component:
        raise HTTPException(status_code=400, detail="Invalid filename")

    filepath = settings.processed_dir / filename
    # is_file() rather than exists(): a directory must not be served.
    if not filepath.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    # Only .txt and .csv can reach this point.
    if filename.endswith(".txt"):
        media_type = "text/plain; charset=utf-8"
    else:
        media_type = "text/csv; charset=utf-8"

    # Let FileResponse build Content-Disposition from `filename`: it emits an
    # attachment header and percent-encodes non-ASCII names, whereas a
    # hand-written header with such a name cannot be encoded as latin-1.
    return FileResponse(
        path=filepath,
        filename=filename,
        media_type=media_type,
    )
async def cleanup_files(*paths: Path):
    """Background task: pause for five seconds, then delete the given files.

    The deletion itself is delegated to ``AudioProcessor.cleanup_files``.
    """
    from asyncio import sleep as _pause

    # Short grace period before the temporary files are removed.
    await _pause(5)
    await AudioProcessor.cleanup_files(*paths)