Spaces:
Running
Running
File size: 7,239 Bytes
857b1b2 9640624 857b1b2 a8d398e 919be0d 857b1b2 919be0d 857b1b2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 | """
API routes for the transcription service.
"""
import logging
import time
from pathlib import Path
import csv
from fastapi import APIRouter, UploadFile, File, HTTPException, BackgroundTasks, Form
from fastapi.responses import FileResponse
from app.core.config import get_settings
from app.schemas.models import TranscriptionResponse, HealthResponse
from app.services.audio_processor import AudioProcessor, AudioProcessingError
from app.services.transcription import TranscriptionService, AVAILABLE_MODELS
from app.services.diarization import DiarizationService
from app.services.processor import Processor
from app.services.emo import EmotionService
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Application settings, fetched once when this module is imported.
settings = get_settings()
# Router on which the endpoints defined below are registered.
router = APIRouter()
@router.get("/api/health", response_model=HealthResponse)
async def health_check():
    """Report service status, model readiness and the resolved device.

    Returns:
        HealthResponse with a fixed "healthy" status, a flag that is true
        only when both the transcription and diarization services report
        being loaded, and the device taken from the settings.
    """
    # `and` short-circuits: the diarization service is only queried when the
    # transcription service already reports loaded.
    all_models_ready = (
        TranscriptionService.is_loaded() and DiarizationService.is_loaded()
    )
    return HealthResponse(
        status="healthy",
        models_loaded=all_models_ready,
        device=settings.resolved_device,
    )
@router.get("/api/models")
async def get_models():
    """Return the selectable model names and the configured default.

    Returns:
        A dict with "models" (the keys of AVAILABLE_MODELS as a list) and
        "default" (the default model name from the settings).
    """
    model_names = list(AVAILABLE_MODELS.keys())
    default_model = settings.default_whisper_model
    return {"models": model_names, "default": default_model}
def _write_segments_csv(csv_path, segments, roles):
    """Write one CSV row per transcript segment to *csv_path*.

    Args:
        csv_path: Destination path object (must support ``.open``).
        segments: Iterable of segment objects exposing ``start``, ``end``,
            ``speaker``, ``text`` and ``emotion``.
        roles: Mapping from speaker label to display role; speakers missing
            from the mapping are written unchanged.
    """
    # utf-8-sig writes a BOM at the start of the file.
    with csv_path.open("w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(
            f,
            fieldnames=["start", "end", "speaker", "text", "emotion", "icon"],
        )
        writer.writeheader()
        for seg in segments:
            emotion = seg.emotion or ""
            # The icon is looked up from the emotion metadata; an unknown or
            # missing emotion yields an empty cell.
            icon = (
                EmotionService.meta.get(emotion, {}).get("emoji", "")
                if emotion
                else ""
            )
            writer.writerow(
                {
                    "start": round(seg.start, 2),
                    "end": round(seg.end, 2),
                    "speaker": roles.get(seg.speaker, seg.speaker),
                    "text": seg.text,
                    "emotion": emotion,
                    "icon": icon,
                }
            )


@router.post("/api/transcribe", response_model=TranscriptionResponse)
async def transcribe_audio(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Audio file to transcribe"),
    model: str = Form(default="PhoWhisper Lora Finetuned", description="Whisper model to use"),
    language: str = Form(default="vi", description="Language code"),
):
    """
    Upload and transcribe an audio file.

    Steps performed here:
    1. Read and validate the upload, then save it to disk.
    2. Delegate processing to ``Processor.process_audio``.
    3. Write ``<name>_output.txt`` and ``<name>_output.csv`` into the
       processed directory.
    4. Schedule removal of the uploaded file and return the result,
       including download links for both generated files.

    Args:
        background_tasks: Used to schedule deletion of the upload after a
            successful response.
        file: The uploaded audio file.
        model: Requested model name.
        language: Language code forwarded to the processor.

    Returns:
        TranscriptionResponse describing segments, speakers, emotion
        timeline/changes and download URLs.

    Raises:
        HTTPException: 400 when validation fails, 500 for any other failure.
    """
    upload_path = None
    try:
        file_content = await file.read()
        source_name = file.filename or "audio.wav"

        try:
            AudioProcessor.validate_file(source_name, len(file_content))
        except AudioProcessingError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e

        upload_path = await AudioProcessor.save_upload(file_content, source_name)

        # NOTE(review): `model` is only logged; it is not forwarded to
        # Processor.process_audio. Confirm whether that is intended.
        logger.info("Processing audio with model=%s, language=%s", model, language)
        result = await Processor.process_audio(
            audio_path=upload_path, language=language
        )

        # Output files are named after the uploaded file's stem.
        base_name = Path(source_name).stem
        txt_filename = f"{base_name}_output.txt"
        csv_filename = f"{base_name}_output.csv"
        txt_path = settings.processed_dir / txt_filename
        csv_path = settings.processed_dir / csv_filename

        txt_path.write_text(result.txt_content, encoding="utf-8")
        _write_segments_csv(csv_path, result.segments, result.roles or {})

        # Runs after the successful response has been sent.
        background_tasks.add_task(cleanup_files, upload_path)

        # Emotion and icon are only exposed for segments whose role is "KH";
        # all other segments carry None for both fields.
        segments = [
            {
                "start": seg.start,
                "end": seg.end,
                "speaker": seg.speaker,
                "role": seg.role,
                "text": seg.text,
                "emotion": seg.emotion if seg.role == "KH" else None,
                "icon": seg.icon if seg.role == "KH" else None,
            }
            for seg in result.segments
        ]

        return TranscriptionResponse(
            success=True,
            segments=segments,
            speaker_count=result.speaker_count,
            speakers=result.speakers,
            duration=result.duration,
            processing_time=result.processing_time,
            roles=result.roles,
            emotion_timeline=[
                {"time": p.time, "emotion": p.emotion, "icon": p.icon}
                for p in (result.emotion_timeline or [])
            ],
            emotion_changes=[
                {
                    "time": c.time,
                    "emotion_from": c.emotion_from,
                    "emotion_to": c.emotion_to,
                    "icon_from": c.icon_from,
                    "icon_to": c.icon_to,
                }
                for c in (result.emotion_changes or [])
            ],
            download_txt=f"/api/download/{txt_filename}",
            download_csv=f"/api/download/{csv_filename}",
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Processing failed")
        # Clean up right here: tasks added to `background_tasks` are attached
        # to the response returned by this function, so they do not run when
        # the request ends by raising an exception.
        if upload_path and upload_path.exists():
            try:
                await AudioProcessor.cleanup_files(upload_path)
            except Exception:
                # Best effort; never let cleanup hide the original failure.
                logger.exception("Failed to remove upload %s", upload_path)
        raise HTTPException(
            status_code=500, detail=f"Processing failed: {str(e)}"
        ) from e
@router.get("/api/download/{filename}")
async def download_file(filename: str):
    """
    Download a generated transcript file.

    Supports: .txt, .csv files

    Args:
        filename: Bare name of a file inside the processed directory.

    Returns:
        FileResponse that serves the file as an attachment.

    Raises:
        HTTPException: 400 when the name has a disallowed extension or
            contains path components, 404 when no such file exists.
    """
    # Security: only allow specific extensions and reject anything that is
    # not a bare file name (separators of either flavour, parent references,
    # or drive-qualified names).
    is_bare_name = (
        "/" not in filename
        and "\\" not in filename
        and ".." not in filename
        and Path(filename).name == filename
    )
    if not filename.endswith((".txt", ".csv")) or not is_bare_name:
        raise HTTPException(status_code=400, detail="Invalid filename")

    filepath = settings.processed_dir / filename
    if not filepath.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    # Only the two allowed extensions can reach this point.
    if filename.endswith(".txt"):
        media_type = "text/plain; charset=utf-8"
    else:
        media_type = "text/csv; charset=utf-8"

    # Passing `filename` lets FileResponse build the Content-Disposition
    # header itself, including the percent-encoded form needed for
    # non-ASCII names; a hand-written header would override that and cannot
    # carry characters outside latin-1.
    return FileResponse(
        path=filepath,
        filename=filename,
        media_type=media_type,
    )
async def cleanup_files(*paths: Path):
    """Background task that removes temporary files after a short delay.

    Args:
        *paths: Files to hand to ``AudioProcessor.cleanup_files``.
    """
    from asyncio import sleep

    # Pause briefly before deleting anything.
    grace_period_seconds = 5
    await sleep(grace_period_seconds)
    await AudioProcessor.cleanup_files(*paths)
|