|
|
import io
|
|
|
import os
|
|
|
import sys
|
|
|
import json
|
|
|
import time
|
|
|
import tempfile
|
|
|
import logging
|
|
|
from pathlib import Path
|
|
|
from typing import Optional
|
|
|
|
|
|
from fastapi import FastAPI, UploadFile, File, Form, Query, Response, HTTPException
|
|
|
from fastapi.responses import FileResponse, StreamingResponse, JSONResponse, PlainTextResponse
|
|
|
from pydub import AudioSegment
|
|
|
|
|
|
|
|
|
from ovos_tts_plugin_matxa_multispeaker_cat import MatxaCatalanTTSPlugin
|
|
|
|
|
|
|
|
|
|
|
|
from generate_tts import (
|
|
|
parse_srt_ad_only,
|
|
|
mix_segments_on_timeline,
|
|
|
build_ad_track_from_srt,
|
|
|
ffmpeg_extract_audio_mp4_to_mp3,
|
|
|
mix_two_audios_simultaneous,
|
|
|
ffmpeg_mux_video_with_audio,
|
|
|
)
|
|
|
|
|
|
# Human-readable timestamp captured at import time; reported by /version.
APP_STARTED_AT = time.strftime("%Y-%m-%d %H:%M:%S")

# Reuse uvicorn's logger so our messages appear in the server's log stream.
logger = logging.getLogger("uvicorn")

logger.setLevel(logging.INFO)


app = FastAPI(title="Veureu TTS (Matxa)")


# Lazily-initialized process-wide TTS engine; populated on first get_tts() call.
_TTS = None
|
|
|
def get_tts():
    """Return the process-wide Matxa TTS engine, creating it on first use.

    Model files are resolved relative to this source file's directory.
    """
    global _TTS
    if _TTS is not None:
        return _TTS

    root_dir = Path(__file__).parent
    engine_config = {
        "mel_model_path": str(root_dir / "matcha_multispeaker_cat_all_opset_15_10_steps.onnx"),
        "vocos_model_path": str(root_dir / "mel_spec_22khz_cat.onnx"),
        "vocoder_config_path": str(root_dir / "config.yaml"),
    }
    _TTS = MatxaCatalanTTSPlugin(config=engine_config)
    return _TTS
|
|
|
|
|
|
|
|
|
|
|
|
def _export_bytes(path: Path) -> bytes:
|
|
|
with open(path, "rb") as f:
|
|
|
return f.read()
|
|
|
|
|
|
def _ensure_fmt(fmt: str) -> str:
|
|
|
fmt = (fmt or "mp3").lower()
|
|
|
if fmt not in {"mp3", "wav"}:
|
|
|
raise HTTPException(400, detail="formato debe ser mp3 o wav")
|
|
|
return fmt
|
|
|
|
|
|
def _synthesize_text(texto: str, voice: str, fmt: str) -> tuple[bytes, str]:
    """
    Synthesize *texto* to audio and return (bytes, content_type).

    A short fade-in/out plus leading/trailing silence is applied to soften
    the start and end of the clip.
    """
    fmt = _ensure_fmt(fmt)
    engine = get_tts()
    with tempfile.TemporaryDirectory(prefix="matxa_txt_") as workdir:
        raw_wav = Path(workdir) / "out.wav"
        engine.get_tts(texto, str(raw_wav), voice=voice)

        audio = AudioSegment.from_wav(raw_wav)
        audio = audio.fade_in(8).fade_out(8)
        audio = AudioSegment.silent(duration=60) + audio + AudioSegment.silent(duration=80)

        if fmt == "wav":
            target = Path(workdir) / "out.wav"
            audio.export(target, format="wav")
            return _export_bytes(target), "audio/wav"
        target = Path(workdir) / "out.mp3"
        audio.export(target, format="mp3")
        return _export_bytes(target), "audio/mpeg"
|
|
|
|
|
|
def _zip_paths(paths: list[Path]) -> tuple[bytes, str]:
|
|
|
import zipfile
|
|
|
bio = io.BytesIO()
|
|
|
with zipfile.ZipFile(bio, "w", compression=zipfile.ZIP_DEFLATED) as z:
|
|
|
for p in paths:
|
|
|
z.write(p, arcname=p.name)
|
|
|
bio.seek(0)
|
|
|
return bio.read(), "application/zip"
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/", response_class=PlainTextResponse)
def root(logs: Optional[str] = None):
    """Liveness root endpoint; the *logs* query parameter is accepted but unused."""
    return "OK"
|
|
|
|
|
|
@app.get("/health", response_class=PlainTextResponse)
@app.get("/healthz", response_class=PlainTextResponse)
def health():
    """Health-check endpoint for orchestrators and load balancers."""
    return "ok"
|
|
|
|
|
|
@app.get("/version")
def version():
    """Report the Python runtime version and when this process started."""
    return {"python": sys.version, "app_started_at": APP_STARTED_AT}
|
|
|
|
|
|
@app.get("/info")
def info():
    """Return basic diagnostics: ffmpeg/espeak-ng version lines and the cwd."""
    ffmpeg_line = os.popen("ffmpeg -version | head -n 1").read().strip()
    espeak_line = os.popen("espeak-ng --version 2>/dev/null | head -n 1").read().strip()
    return {
        "ffmpeg": ffmpeg_line,
        "espeak_ng": espeak_line,
        "cwd": str(Path.cwd()),
    }
|
|
|
|
|
|
@app.get("/diag/ffmpeg", response_class=PlainTextResponse)
def diag_ffmpeg():
    """Expose the first lines of `ffmpeg -version` for deployment debugging."""
    return os.popen("ffmpeg -version | head -n 3").read()
|
|
|
|
|
|
@app.get("/diag/espeak", response_class=PlainTextResponse)
def diag_espeak():
    """Report where espeak-ng is installed, or a fallback note when not found."""
    located = os.popen("whereis espeak-ng").read()
    return located or "no whereis output"
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/tts")
def tts_get(
    texto: str = Query(..., description="Texto a sintetizar"),
    formato: str = Query("mp3", description="mp3 | wav"),
    voice: str = Query("central/grau")
):
    """Synthesize *texto* and return the audio inline (mp3 or wav)."""
    data, ctype = _synthesize_text(texto, voice, formato)
    filename = f"tts.{formato.lower()}"
    # BUG FIX: the computed filename was previously ignored — the header
    # carried a literal "(unknown)" placeholder instead of the real name.
    headers = {"Content-Disposition": f'inline; filename="{filename}"'}
    return Response(content=data, media_type=ctype, headers=headers)
|
|
|
|
|
|
@app.post("/tts/text")
def tts_post(
    texto: str = Form(...),
    formato: str = Form("mp3"),
    voice: str = Form("central/grau")
):
    """Form-based variant of /tts: synthesize *texto* and return the audio inline."""
    data, ctype = _synthesize_text(texto, voice, formato)
    filename = f"tts.{formato.lower()}"
    # BUG FIX: the computed filename was previously ignored — the header
    # carried a literal "(unknown)" placeholder instead of the real name.
    headers = {"Content-Disposition": f'inline; filename="{filename}"'}
    return Response(content=data, media_type=ctype, headers=headers)
|
|
|
|
|
|
|
|
|
@app.post("/tts/text_long")
def tts_from_long_text(
    texto: str = Form(...),
    voice: str = Form("central/grau"),
    formato: str = Form("mp3")
):
    """
    Receive a long text, split it into chunks (<~490 chars, preferring sentence
    boundaries), synthesize each chunk and return one concatenated audio file.
    """
    fmt = _ensure_fmt(formato)
    tts = get_tts()

    # Split the text into engine-sized chunks, breaking on '. ' where possible.
    # NOTE(review): a single sentence longer than 490 chars still becomes one
    # oversized chunk — presumably rare; verify against the engine's limit.
    chunks = []
    if len(texto) > 490:
        logger.info(f"Texto largo detectado ({len(texto)} chars). Dividiendo en fragmentos...")
        current_chunk = ""
        for sentence in texto.split('. '):
            if len(current_chunk) + len(sentence) + 1 < 490:
                current_chunk += sentence + '. '
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = sentence + '. '
        if current_chunk:
            chunks.append(current_chunk.strip())
    else:
        chunks.append(texto)

    if not chunks:
        raise HTTPException(status_code=400, detail="El texto no pudo ser procesado en fragmentos.")

    audio_segments = []
    with tempfile.TemporaryDirectory(prefix="matxa_long_txt_") as td:
        for i, chunk in enumerate(chunks):
            chunk = chunk.strip()
            if not chunk:
                continue

            wav_path = Path(td) / f"chunk_{i}.wav"
            try:
                tts.get_tts(chunk, str(wav_path), voice=voice)
                segment = AudioSegment.from_wav(wav_path)
                # Short fades plus padding silence avoid clicks between chunks.
                segment = segment.fade_in(8).fade_out(8)
                segment = AudioSegment.silent(duration=60) + segment + AudioSegment.silent(duration=80)
                audio_segments.append(segment)
            except Exception as e:
                logger.error(f"Error al generar audio para el fragmento {i}: {e}")
                # Chain the original exception so the root cause is preserved.
                raise HTTPException(status_code=500, detail=f"Error en la síntesis del fragmento {i}: {chunk[:50]}...") from e

        if not audio_segments:
            raise HTTPException(status_code=500, detail="No se pudo generar ningún fragmento de audio.")

        # Concatenate every synthesized chunk into one audio stream in memory.
        final_audio = sum(audio_segments, AudioSegment.empty())
        buffer = io.BytesIO()
        final_audio.export(buffer, format=fmt)
        buffer.seek(0)

        content_type = "audio/wav" if fmt == "wav" else "audio/mpeg"
        filename = f"tts_long.{fmt}"
        # BUG FIX: the computed filename was previously ignored — the header
        # carried a literal "(unknown)" placeholder instead of the real name.
        headers = {"Content-Disposition": f'inline; filename="{filename}"'}
        return Response(content=buffer.read(), media_type=content_type, headers=headers)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/tts/srt_ad_audio")
def tts_ad_audio_from_srt(
    srt: UploadFile = File(..., description="Archivo .srt con líneas (AD): ..."),
    voice: str = Form("central/grau"),
    ad_format: str = Form("mp3"),
):
    """Generate only the audio-description track (une_ad.mp3/une_ad.wav) from an SRT."""
    ad_format = _ensure_fmt(ad_format)

    with tempfile.TemporaryDirectory(prefix="matxa_srt_ad_only_") as td:
        td = Path(td)
        srt_path = td / "input.srt"
        srt_path.write_bytes(srt.file.read())

        ad_out = td / f"une_ad.{ad_format}"
        _ = build_ad_track_from_srt(str(srt_path), output_path=str(ad_out), voice=voice)

        data = ad_out.read_bytes()
        content_type = "audio/wav" if ad_format == "wav" else "audio/mpeg"
        filename = f"une_ad.{ad_format}"
        # BUG FIX: the computed filename was previously ignored — the header
        # carried a literal "(unknown)" placeholder instead of the real name.
        headers = {"Content-Disposition": f'inline; filename="{filename}"'}
        return Response(content=data, media_type=content_type, headers=headers)
|
|
|
|
|
|
|
|
|
@app.post("/tts/srt")
def tts_from_srt(
    srt: UploadFile = File(..., description="Archivo .srt con líneas (AD): ..."),
    video: UploadFile | None = File(None, description="Vídeo .mp4 para extraer el audio original"),
    original_audio: UploadFile | None = File(None, description="Audio original (mp3/wav) para mezclar con la AD"),
    voice: str = Form("central/grau"),
    ad_format: str = Form("mp3"),
    include_final_mp4: int = Form(0, description="1 para devolver también un MP4 remux con la mezcla"),
):
    """
    Return a ZIP containing:
    - ad_master.(mp3|wav)
    - mix_original_plus_ad.mp3 (if video or original_audio was provided)
    - video_con_ad.mp4 (if include_final_mp4=1 and a video was provided)
    """
    ad_format = _ensure_fmt(ad_format)

    with tempfile.TemporaryDirectory(prefix="matxa_srt_") as td:
        td = Path(td)

        srt_path = td / "input.srt"
        srt_path.write_bytes(srt.file.read())

        ad_out = td / f"ad_master.{ad_format}"

        # Ensure the TTS engine is initialized before the long build step.
        get_tts()
        _ = build_ad_track_from_srt(str(srt_path), output_path=str(ad_out), voice=voice)
        out_paths = [ad_out]

        # BUG FIX: persist the uploaded video once, up front. Previously it was
        # only written when original_audio was absent, so a request providing
        # BOTH original_audio and video with include_final_mp4=1 tried to mux
        # against a video.mp4 that was never written to disk.
        vid_path = None
        if video is not None:
            if not (video.filename or "").lower().endswith(".mp4"):
                raise HTTPException(400, detail="video debe ser .mp4")
            vid_path = td / "video.mp4"
            vid_path.write_bytes(video.file.read())

        # Obtain the original audio: prefer the uploaded audio file, otherwise
        # extract the audio track from the uploaded video.
        mix_path = None
        ori_path = None
        if original_audio is not None:
            ext = Path(original_audio.filename or "").suffix.lower()
            if ext not in {".mp3", ".wav"}:
                raise HTTPException(400, detail="original_audio debe ser .mp3 o .wav")
            ori_path = td / f"original{ext}"
            ori_path.write_bytes(original_audio.file.read())
        elif vid_path is not None:
            ori_path = td / "original.mp3"
            ffmpeg_extract_audio_mp4_to_mp3(str(vid_path), str(ori_path))

        if ori_path is not None:
            # The mixer expects an mp3 AD track; convert when the master is wav.
            if ad_out.suffix.lower() == ".wav":
                ad_mp3 = td / "ad_master.mp3"
                AudioSegment.from_wav(ad_out).export(ad_mp3, format="mp3")
            else:
                ad_mp3 = ad_out
            mix_path = td / "mix_original_plus_ad.mp3"
            mix_two_audios_simultaneous(str(ori_path), str(ad_mp3), str(mix_path))
            out_paths.append(mix_path)

        # Optionally remux the original video with the mixed audio track.
        if include_final_mp4 and vid_path is not None and mix_path is not None:
            final_mp4 = td / "video_con_ad.mp4"
            ffmpeg_mux_video_with_audio(str(vid_path), str(mix_path), str(final_mp4))
            out_paths.append(final_mp4)

        zip_bytes, zip_ctype = _zip_paths(out_paths)
        headers = {"Content-Disposition": 'attachment; filename="tts_ad_assets.zip"'}
        return Response(content=zip_bytes, media_type=zip_ctype, headers=headers)
|
|
|
|