# ragg/app.py — RAG backend blueprint (upstream commit 9dbf137, "fix")
import os
import time
import json
import requests
from dotenv import load_dotenv, find_dotenv
from flask import Flask, Blueprint, request, jsonify, current_app, send_from_directory
# Note: we avoid creating a Flask app at module import time
import uuid
from pathlib import Path
from typing import Iterable, Optional, Sequence, Union
from flask_cors import CORS
import requests
from TTS.api import TTS
# --- S3 (added) ---
try:
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
except Exception:
boto3 = None
NoCredentialsError = ClientError = Exception # fallbacks so type names exist
# RAG imports
try:
from .rag_backend import IngestBody, ingest_documents, ingest_pdfs_from_folder
from .rag_llm import (
LLMBody,
llm_generate,
ExplainBody,
llm_explain,
FollowupBody,
get_vectorstore,
get_vectorstore_for, # ← add this
llm_followups,
)
except ImportError:
# Fallback when running as: python ragg/app.py
from rag_backend import IngestBody, ingest_documents, ingest_pdfs_from_folder
from rag_llm import (
LLMBody,
llm_generate,
ExplainBody,
llm_explain,
FollowupBody,
get_vectorstore,
get_vectorstore_for, # ← add this
llm_followups,
)
# OpenAI client (no secret logs)
import openai
from openai import OpenAI
def xtts_speak_to_file(
    text: str,
    out_file: Optional[Union[str, Path]] = None,
    reference_dir: Optional[Union[str, Path]] = "trim",
    reference_files: Optional[Sequence[Union[str, Path]]] = None,
    language: str = "en",
    patterns: Iterable[str] = ("*.wav", "*.mp3", "*.flac"),
) -> Path:
    """
    Generate a WAV using XTTS v2 with reference audios; caches the model.

    Args:
        text: Text to synthesize.
        out_file: Target WAV path; a uuid-named file in the CWD if omitted.
        reference_dir: Folder scanned (non-recursively, via ``patterns``) for
            reference voice clips when ``reference_files`` is not provided.
        reference_files: Explicit reference clip paths; takes precedence.
        language: Language code forwarded to XTTS.
        patterns: Glob patterns used when scanning ``reference_dir``.

    Returns:
        Path to the written WAV file.

    Raises:
        FileNotFoundError: If no reference audio files can be found.
        RuntimeError: If XTTS synthesis itself fails.
    """
    # Collect speaker reference clips: explicit files first, then folder scan.
    speakers: list[str] = []
    if reference_files:
        speakers.extend(str(Path(p)) for p in reference_files)
    if (not speakers) and reference_dir:
        vdir = Path(reference_dir)
        for pat in patterns:
            speakers.extend(str(p) for p in vdir.glob(pat))
    # De-duplicate while preserving order (dict preserves insertion order).
    speakers = list(dict.fromkeys(speakers))
    if not speakers:
        raise FileNotFoundError(
            f"No reference audio files found. Checked: "
            f"{reference_files or []} and/or {reference_dir}"
        )
    # Lazily build and cache the XTTS model as a function attribute so the
    # expensive model load happens at most once per process.
    if not hasattr(xtts_speak_to_file, "_model") or xtts_speak_to_file._model is None:
        import sys, builtins, torch
        from torch.serialization import add_safe_globals
        # --- XTTS internal classes that must be allow-listed ---
        from TTS.tts.configs.xtts_config import XttsConfig
        from TTS.tts.models.xtts import XttsAudioConfig, XttsArgs
        from TTS.config.shared_configs import BaseDatasetConfig
        # Prevent interactive prompts / stdin crashes on Hugging Face.
        # NOTE(review): this replaces process-wide stdin and builtins.input and
        # never restores them — confirm that is intended for this deployment.
        sys.stdin = open(os.devnull)
        builtins.input = lambda *a, **kw: ""
        os.environ["COQUI_TOS_AGREED"] = "1"
        # Allowlist all required XTTS classes for PyTorch 2.6+ safe loading.
        add_safe_globals([XttsConfig, XttsAudioConfig, BaseDatasetConfig, XttsArgs])
        # Initialize the XTTS model safely
        xtts_speak_to_file._model = TTS(
            model_name="tts_models/multilingual/multi-dataset/xtts_v2",
            gpu=False,
            progress_bar=False,
        )
    tts = xtts_speak_to_file._model
    out_path = Path(out_file) if out_file else Path(f"xtts_{uuid.uuid4().hex}.wav")
    out_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        tts.tts_to_file(
            text=text,
            speaker_wav=speakers,
            language=language,
            file_path=str(out_path),
        )
    except Exception as e:
        raise RuntimeError(f"XTTS synthesis failed: {e}") from e
    return out_path
# ------------------------------------------------------------
# Load environment
# ------------------------------------------------------------
load_dotenv(find_dotenv())
# OpenAI client built from the environment; the key itself is never logged.
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Optional: version log (safe), but do NOT print the API key
try:
    print(f"openai package version: {openai.__version__}")
except Exception:
    pass
# --- S3 config (added) ---
# All values come from the environment; empty strings mean "not configured".
S3_BUCKET = os.getenv("S3_BUCKET", "").strip()
AWS_REGION = os.getenv("AWS_REGION", "ap-south-1").strip()
S3_PREFIX = os.getenv("S3_PREFIX", "audio/").strip()
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", "").strip()
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", "").strip()
# Module-level S3 client; stays None when boto3 is unavailable or any
# credential piece is missing, in which case callers fall back to local serving.
_s3_client = None
if boto3 and S3_BUCKET and AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY:
    try:
        _s3_client = boto3.client(
            "s3",
            region_name=AWS_REGION,
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        )
    except Exception as _e:
        # Client construction failed; leave _s3_client unset so S3 is skipped.
        _s3_client = None
def _upload_to_s3(file_path: Union[str, Path]) -> Optional[str]:
    """
    Upload ``file_path`` to the configured S3 bucket and return a presigned
    GET URL valid for 24 hours.

    Returns ``None`` when S3 is not configured or the upload/signing fails,
    so the caller can fall back to serving the file locally.
    """
    if not (_s3_client and S3_BUCKET):
        return None
    path_str = str(file_path)
    object_key = f"{S3_PREFIX}{Path(path_str).name}"
    try:
        _s3_client.upload_file(path_str, S3_BUCKET, object_key)
        return _s3_client.generate_presigned_url(
            "get_object",
            Params={"Bucket": S3_BUCKET, "Key": object_key},
            ExpiresIn=24 * 3600,
        )
    except (NoCredentialsError, ClientError) as e:
        # Prefer the app logger; outside an app context fall back to stdout.
        try:
            current_app.logger.error(f"S3 upload failed: {e}")
        except Exception:
            print(f"S3 upload failed: {e}")
        return None
# Media and voice references
# MEDIA_ROOT = Path(os.getenv("MEDIA_ROOT", "./media"))
# AUDIO_DIR = MEDIA_ROOT / "audio"
# AUDIO_DIR.mkdir(parents=True, exist_ok=True)
# XTTS_REF_DIR = os.getenv("XTTS_REF_DIR", "./trim")  # folder with your reference audios
BASE_DIR = Path(__file__).resolve().parent.parent  # if app.py is top-level; if it's ragg/app.py use .parent.parent
MEDIA_ROOT = Path(os.getenv("MEDIA_ROOT", str(BASE_DIR / "media")))
AUDIO_DIR = MEDIA_ROOT / "audio"
AUDIO_DIR.mkdir(parents=True, exist_ok=True)  # created eagerly at import time
XTTS_REF_DIR = os.getenv("XTTS_REF_DIR", str(BASE_DIR / "trim"))  # reference voice files
# D-ID config (optional)
# ------------------------------------------------------------
# Blueprint (mounted at /rag by the main app)
# ------------------------------------------------------------
rag_bp = Blueprint("rag", __name__)
@rag_bp.route("/audio/<path:filename>", methods=["GET"])
def rag_serve_audio(filename: str):
    """Serve a synthesized WAV from AUDIO_DIR (conditional=True enables conditional/range responses)."""
    return send_from_directory(AUDIO_DIR, filename, mimetype="audio/wav", conditional=True)
# D-ID config (set in .env / HF Secrets)
DID_API_KEY = os.getenv("DID_API_KEY", "")
DID_SOURCE_IMAGE_URL = os.getenv("DID_SOURCE_IMAGE_URL", "")  # avatar image the video animates
DID_VOICE_ID = os.getenv("DID_VOICE_ID", "en-US-JennyNeural")
# Default folder for /ingest-pdfs
PDF_DEFAULT_FOLDER = os.getenv("RAG_PDF_DIR", "./pdfs")
# Optional: add CORS headers (the main app should still enable CORS globally)
@rag_bp.after_app_request
def add_cors_headers(resp):
    """Attach CORS headers for the local Angular dev origins only."""
    origin = request.headers.get("Origin")
    # Allow local Angular during dev; the main app may add more origins.
    allowed_dev_origins = ("http://localhost:4200", "http://127.0.0.1:4200")
    if origin in allowed_dev_origins:
        resp.headers["Access-Control-Allow-Origin"] = origin
        resp.headers["Vary"] = "Origin"
        resp.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-User"
        resp.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"
    return resp
# ------------------------------------------------------------
# Helpers
# ------------------------------------------------------------
def user_to_db_level(username: str | None) -> str | None:
if not username:
return None
u = username.strip().lower()
if u == "lowergrade":
return "low"
if u == "midgrade":
return "mid"
if u == "highergrade":
return "high"
return None
def extract_username_from_request(req) -> str | None:
hdr = req.headers.get("X-User")
if hdr:
return hdr
data = req.get_json(silent=True) or {}
return data.get("username")
# --- D-ID helpers ---
def _did_create_talk(text: str):
    """
    Ask D-ID to create a "talk" (talking-head video) for ``text``.

    Returns ``(talk_id, None)`` on success, or ``(None, (message, status))``
    suitable for building an HTTP error response.
    """
    if not DID_API_KEY:
        return None, ("DID_API_KEY not set on the server", 500)
    if not DID_SOURCE_IMAGE_URL:
        return None, ("DID_SOURCE_IMAGE_URL not set on the server", 500)
    payload = {
        "script": {
            "type": "text",
            "input": text,
            "provider": {"type": "microsoft", "voice_id": DID_VOICE_ID},
        },
        "source_url": DID_SOURCE_IMAGE_URL,
        "config": {"fluent": True, "pad_audio": 0},
    }
    try:
        resp = requests.post("https://api.d-id.com/talks", json=payload, auth=(DID_API_KEY, ""))
        if resp.status_code not in (200, 201):
            return None, (f"D-ID create error: {resp.text}", 502)
        talk_id = resp.json().get("id")
        if talk_id:
            return talk_id, None
        return None, ("D-ID did not return a talk id", 502)
    except Exception as e:
        current_app.logger.exception("D-ID create failed: %s", e)
        return None, ("D-ID create failed", 502)
def _did_poll_talk(talk_id: str, timeout_sec: int = 60, interval_sec: float = 2.0):
    """
    Poll D-ID until the talk finishes, fails, or ``timeout_sec`` elapses.

    Returns ``(result_url, None)`` on success, or ``(None, (message, status))``.
    """
    poll_url = f"https://api.d-id.com/talks/{talk_id}"
    deadline = time.time() + timeout_sec
    try:
        while time.time() < deadline:
            resp = requests.get(poll_url, auth=(DID_API_KEY, ""))
            if resp.status_code != 200:
                return None, (f"D-ID poll error: {resp.text}", 502)
            payload = resp.json()
            state = payload.get("status")
            if state == "done":
                # Older/newer API variants put the URL in different places.
                return payload.get("result_url") or payload.get("result", {}).get("url"), None
            if state == "error":
                return None, (f"D-ID generation failed: {payload.get('error')}", 502)
            time.sleep(interval_sec)
        return None, ("Timed out waiting for the video", 504)
    except Exception as e:
        current_app.logger.exception("D-ID poll failed: %s", e)
        return None, ("D-ID poll failed", 502)
# ------------------------------------------------------------
# Endpoints (NOTE: no "/rag" prefix here; the blueprint adds it)
# ------------------------------------------------------------
@rag_bp.route("/ingest", methods=["POST", "OPTIONS"])
def rag_ingest():
    """Ingest the documents described by the JSON body into the vector store."""
    if request.method == "OPTIONS":
        return ("", 204)
    payload = request.json or {}
    return jsonify(ingest_documents(IngestBody(**payload)))
@rag_bp.route("/ingest-pdfs", methods=["POST", "OPTIONS"])
def rag_ingest_pdfs():
    """Ingest all PDFs from a folder (default PDF_DEFAULT_FOLDER) into the vector store."""
    if request.method == "OPTIONS":
        return ("", 204)
    payload = request.json or {}
    result = ingest_pdfs_from_folder(
        payload.get("folder", PDF_DEFAULT_FOLDER),
        subject=payload.get("subject"),
        grade=payload.get("grade"),
        chapter=payload.get("chapter"),
    )
    return jsonify(result)
@rag_bp.route("/generate-questions", methods=["POST", "OPTIONS"])
def rag_generate_questions():
    """Generate questions via the RAG LLM; db_level defaults to the logged-in user's level."""
    if request.method == "OPTIONS":
        return ("", 204)
    payload = request.json or {}
    if not payload.get("db_level"):
        # Derive the vector-DB level from the login when not given explicitly.
        payload["db_level"] = user_to_db_level(extract_username_from_request(request))
    return jsonify(llm_generate(LLMBody(**payload)))
# @rag_bp.route("/explain-grammar", methods=["POST", "OPTIONS"])
# @rag_bp.route("/explain-grammar", methods=["POST", "OPTIONS"])
# def rag_explain_grammar():
# if request.method == "OPTIONS":
# return ("", 204)
# data = request.get_json(force=True) or {}
# # --- Extract username and db_level ---
# username = extract_username_from_request(request)
# db_level = user_to_db_level(username)
# # --- MAIN BODY (your preferred structure) ---
# body = ExplainBody(
# question=(data.get("question") or "").strip(),
# model=data.get("model", "gpt-4o-mini"),
# db_level=db_level,
# source_ids=data.get("source_ids") or []
# )
# # --- 1) Run LLM / RAG explanation ---
# result_raw = llm_explain(body)
# # --- 2) Normalize + extract answer safely ---
# result_dict = None
# answer_text = ""
# try:
# if isinstance(result_raw, dict):
# result_dict = dict(result_raw)
# elif hasattr(result_raw, "model_dump"):
# result_dict = result_raw.model_dump()
# elif hasattr(result_raw, "dict"):
# result_dict = result_raw.dict()
# elif isinstance(result_raw, str):
# result_dict = {"answer": result_raw}
# else:
# result_dict = {"answer": str(result_raw)}
# answer_text = (
# result_dict.get("answer")
# or result_dict.get("response")
# or result_dict.get("text")
# or ""
# ).strip()
# except Exception as e:
# current_app.logger.exception("Failed to normalize llm_explain result: %s", e)
# return jsonify({"error": "Internal error normalizing LLM response"}), 500
# # --- 3) Optional: synthesize TTS audio ---
# try:
# if data.get("synthesize_audio"):
# try:
# out_name = f"explain_{uuid.uuid4().hex}.wav"
# wav_path = xtts_speak_to_file(
# text=answer_text or result_dict.get("answer", ""),
# out_file=AUDIO_DIR / out_name,
# reference_dir=XTTS_REF_DIR,
# reference_files=None,
# language=data.get("language", "en"),
# )
# # Local: serve from /rag/audio/*
# if "localhost" in request.host_url or "127.0.0.1" in request.host_url:
# base = request.host_url.rstrip("/")
# result_dict["audio_url"] = f"{base}/rag/audio/{wav_path.name}"
# else:
# # Deployed: try S3 first; fallback to public SPACE_URL if set
# s3_url = _upload_to_s3(str(wav_path))
# if s3_url:
# result_dict["audio_url"] = s3_url
# else:
# base = os.getenv("SPACE_URL", "https://pykara-py-learn-backend.hf.space")
# result_dict["audio_url"] = f"{base}/rag/audio/{wav_path.name}"
# except FileNotFoundError as e:
# current_app.logger.error("XTTS reference audio missing: %s", e)
# except Exception as e:
# current_app.logger.exception("XTTS synthesis during explain-grammar failed: %s", e)
# except Exception:
# current_app.logger.exception("Unexpected error while attempting inline synthesis")
# # --- 4) Optional: synthesize video (D-ID) ---
# try:
# if data.get("synthesize_video"):
# if not DID_API_KEY or not DID_SOURCE_IMAGE_URL:
# current_app.logger.error("D-ID not configured for inline explain-grammar video synthesis")
# else:
# try:
# talk_id, err = _did_create_talk(answer_text or result_dict.get("answer", ""))
# if err:
# current_app.logger.error(
# "D-ID create error during explain-grammar: %s",
# err[0] if isinstance(err, tuple) else err,
# )
# else:
# video_url, err = _did_poll_talk(talk_id, timeout_sec=120, interval_sec=2.0)
# if err:
# current_app.logger.error(
# "D-ID poll error during explain-grammar: %s",
# err[0] if isinstance(err, tuple) else err,
# )
# else:
# if video_url:
# result_dict["video_url"] = video_url
# except Exception as e:
# current_app.logger.exception("D-ID inline synthesis failed during explain-grammar: %s", e)
# except Exception:
# current_app.logger.exception("Unexpected error while attempting inline video synthesis")
# # --- Final response ---
# return jsonify(result_dict), 200
@rag_bp.route("/explain-grammar", methods=["POST", "OPTIONS"])
def rag_explain_grammar():
    """
    Explain a grammar question using the RAG LLM.

    JSON body: question (str), model (default "gpt-4o-mini"),
    source_ids (list, optional), synthesize_audio (bool, optional),
    synthesize_video (bool, optional), language (default "en").

    Returns the normalized LLM result dict, optionally augmented with
    "audio_url" (XTTS) and/or "video_url" (D-ID). Synthesis failures are
    logged and swallowed so the text answer is still returned.
    """
    if request.method == "OPTIONS":
        return ("", 204)
    data = request.get_json(force=True) or {}
    # --- Extract username and db_level ---
    username = extract_username_from_request(request)
    db_level = user_to_db_level(username)
    # --- MAIN BODY (your preferred structure) ---
    body = ExplainBody(
        question=(data.get("question") or "").strip(),
        model=data.get("model", "gpt-4o-mini"),
        db_level=db_level,
        source_ids=data.get("source_ids") or []
    )
    # --- 1) Run LLM / RAG explanation ---
    result_raw = llm_explain(body)
    # --- 2) Normalize + extract answer safely ---
    # llm_explain may return a dict, a pydantic model (v2 `model_dump` or
    # v1 `dict`), or a plain string; coerce every shape to a plain dict.
    result_dict = None
    answer_text = ""
    try:
        if isinstance(result_raw, dict):
            result_dict = dict(result_raw)
        elif hasattr(result_raw, "model_dump"):
            result_dict = result_raw.model_dump()
        elif hasattr(result_raw, "dict"):
            result_dict = result_raw.dict()
        elif isinstance(result_raw, str):
            result_dict = {"answer": result_raw}
        else:
            result_dict = {"answer": str(result_raw)}
        # The answer text may live under several keys depending on the backend.
        answer_text = (
            result_dict.get("answer")
            or result_dict.get("response")
            or result_dict.get("text")
            or ""
        ).strip()
    except Exception as e:
        current_app.logger.exception("Failed to normalize llm_explain result: %s", e)
        return jsonify({"error": "Internal error normalizing LLM response"}), 500
    # --- 3) Optional: synthesize TTS audio ---
    # NOTE(review): unlike /synthesize-audio, this path always builds the URL
    # from request.host_url and never uploads to S3 — confirm that is intended.
    try:
        if data.get("synthesize_audio"):
            try:
                out_name = f"explain_{uuid.uuid4().hex}.wav"
                wav_path = xtts_speak_to_file(
                    text=answer_text or result_dict.get("answer", ""),
                    out_file=AUDIO_DIR / out_name,
                    reference_dir=XTTS_REF_DIR,
                    reference_files=None,
                    language=data.get("language", "en"),
                )
                base = request.host_url.rstrip("/")
                result_dict["audio_url"] = f"{base}/rag/audio/{wav_path.name}"
            except FileNotFoundError as e:
                # Missing reference clips: log and continue without audio.
                current_app.logger.error("XTTS reference audio missing: %s", e)
            except Exception as e:
                current_app.logger.exception("XTTS synthesis during explain-grammar failed: %s", e)
    except Exception:
        current_app.logger.exception("Unexpected error while attempting inline synthesis")
    # --- 4) Optional: synthesize video (D-ID) ---
    try:
        if data.get("synthesize_video"):
            if not DID_API_KEY or not DID_SOURCE_IMAGE_URL:
                current_app.logger.error("D-ID not configured for inline explain-grammar video synthesis")
            else:
                try:
                    talk_id, err = _did_create_talk(answer_text or result_dict.get("answer", ""))
                    if err:
                        current_app.logger.error(
                            "D-ID create error during explain-grammar: %s",
                            err[0] if isinstance(err, tuple) else err,
                        )
                    else:
                        video_url, err = _did_poll_talk(talk_id, timeout_sec=120, interval_sec=2.0)
                        if err:
                            current_app.logger.error(
                                "D-ID poll error during explain-grammar: %s",
                                err[0] if isinstance(err, tuple) else err,
                            )
                        else:
                            if video_url:
                                result_dict["video_url"] = video_url
                except Exception as e:
                    current_app.logger.exception("D-ID inline synthesis failed during explain-grammar: %s", e)
    except Exception:
        current_app.logger.exception("Unexpected error while attempting inline video synthesis")
    # --- Final response ---
    return jsonify(result_dict), 200
# @rag_bp.route("/suggest-followups", methods=["POST", "OPTIONS"])
@rag_bp.route("/suggest-followups", methods=["POST", "OPTIONS"])
def rag_suggest_followups():
    """Suggest follow-up questions for the last Q/A pair via the RAG LLM."""
    if request.method == "OPTIONS":
        return ("", 204)
    payload = request.get_json(force=True) or {}
    level = user_to_db_level(extract_username_from_request(request))
    body = FollowupBody(
        last_question=(payload.get("last_question") or "").strip(),
        last_answer=(payload.get("last_answer") or "").strip(),
        n=int(payload.get("n", 5)),
        model=payload.get("model", "gpt-4o-mini"),
        db_level=level,
        source_ids=payload.get("source_ids") or []  # same source filter as explain-grammar
    )
    return jsonify(llm_followups(body))
# @rag_bp.get("/_diag")
@rag_bp.get("/_diag")
def rag_diag():
    """Diagnostics: report Chroma paths and per-level document counts."""
    # Import here (not module-level) to avoid circular imports.
    try:
        from .rag_llm import CHROMA_DIR, CHROMA_ROOT, get_vectorstore, get_vectorstore_for
    except ImportError:
        from rag_llm import CHROMA_DIR, CHROMA_ROOT, get_vectorstore, get_vectorstore_for
    import os
    from flask import jsonify

    def _count(vs):
        """Count documents in either a chromadb collection or a LangChain vectorstore."""
        if vs is None:
            return None
        # chromadb.Collection exposes a callable .count()
        if hasattr(vs, "count") and callable(vs.count):
            try:
                return vs.count()
            except Exception:
                return None
        # LangChain vectorstores wrap the collection in ._collection
        if hasattr(vs, "_collection"):
            try:
                return vs._collection.count()  # type: ignore
            except Exception:
                try:
                    return vs._client.get_collection(vs._collection.name).count()  # type: ignore
                except Exception:
                    return None
        return None

    # Load each level safely before assembling the report.
    low_vs = get_vectorstore_for("low")
    mid_vs = get_vectorstore_for("mid")
    high_vs = get_vectorstore_for("high")
    low_dir = os.path.join(CHROMA_ROOT, "low")
    info = {
        "env_seen": {"CHROMA_DIR": CHROMA_DIR, "CHROMA_ROOT": CHROMA_ROOT},
        "low_dir": {"path": low_dir, "exists": os.path.isdir(low_dir)},
        "counts_default": _count(get_vectorstore()),
        "counts_low": _count(low_vs),
        "counts_mid": _count(mid_vs),
        "counts_high": _count(high_vs),
    }
    return jsonify(info), 200
# def rag_diag():
# # minimal imports here to avoid circulars
# try:
# from .rag_llm import CHROMA_DIR, CHROMA_ROOT, get_vectorstore, get_vectorstore_for
# except ImportError:
# from rag_llm import CHROMA_DIR, CHROMA_ROOT, get_vectorstore, get_vectorstore_for
#
# import os
# from flask import jsonify
#
# def _count(vs):
# try:
# return vs._collection.count()
# except Exception:
# try:
# return vs._client.get_collection(vs._collection.name).count()
# except Exception:
# return None
#
# info = {
# "env_seen": {"CHROMA_DIR": CHROMA_DIR, "CHROMA_ROOT": CHROMA_ROOT},
# "low_dir": {
# "path": os.path.join(CHROMA_ROOT, "low"),
# "exists": os.path.isdir(os.path.join(CHROMA_ROOT, "low")),
# },
# "counts_default": _count(get_vectorstore()),
# "counts_low": _count(get_vectorstore_for("low")),
# "counts_mid": _count(get_vectorstore_for("mid")),
# "counts_high": _count(get_vectorstore_for("high")),
# }
# return jsonify(info), 200
@rag_bp.route("/search", methods=["POST", "OPTIONS"])
def rag_search():
    """Similarity-search the level-appropriate vector store for the query 'q' (top 5)."""
    if request.method == "OPTIONS":
        return ("", 204)
    payload = request.json or {}
    query = (payload.get("q") or "").strip()
    if not query:
        return jsonify({"results": []})
    # An explicit db_level in the body wins; otherwise derive it from the login.
    level = payload.get("db_level") or user_to_db_level(extract_username_from_request(request))
    store = get_vectorstore_for(level)
    results = [
        {
            "distance": float(dist),
            "snippet": doc.page_content[:200],
            "source_path": os.path.normpath(doc.metadata.get("source_path", "")),
            "page": doc.metadata.get("page_1based"),
        }
        for doc, dist in store.similarity_search_with_score(query, k=5)
    ]
    return jsonify({"results": results})
def generate_questions_from_vectorstore():
    """
    Pull grammar-related content from the default vector store and ask OpenAI
    for 5 questions about it.

    Returns the generated question text on success, or an ``{"error": ...}``
    dict when retrieval or the OpenAI call fails.
    """
    try:
        vectorstore = get_vectorstore()
        query_text = "important content related to grammar"
        results = vectorstore.similarity_search_with_score(query_text, k=5)
        print(f"Vectorstore query returned {len(results)} results")
        content = "\n".join(doc.page_content for doc, _ in results)
        print(f"Retrieved content: {content[:500]}...")
        if not content:
            return {"error": "No content retrieved from vectorstore. Please ingest PDFs first."}
        prompt = f"Generate 5 important questions based on the following content: {content}"
        completion = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=150,
        )
        response_text = completion.choices[0].message.content.strip()
        print(f"Processed OpenAI response: {response_text}")
        return response_text
    except Exception as e:
        print(f"Error during OpenAI API call: {e}")
        return {"error": f"Failed to call OpenAI: {str(e)}"}
@rag_bp.route("/generate-questions-from-chroma", methods=["POST", "OPTIONS"])
def generate_questions_from_chroma():
    """
    Generate 5 questions from the default Chroma vector store.

    Returns: { "generated_questions": <str or {"error": ...}> }

    Fixes:
    - Listing OPTIONS in `methods` disables Flask's automatic OPTIONS handling,
      so CORS preflight requests previously ran the full OpenAI generation;
      short-circuit OPTIONS like every other endpoint in this blueprint.
    - The endpoint re-implemented generate_questions_from_vectorstore as an
      identical nested function; delegate to the module-level helper instead
      so the retrieval/generation logic lives in one place.
    """
    if request.method == "OPTIONS":
        return ("", 204)
    generated = generate_questions_from_vectorstore()
    return jsonify({"generated_questions": generated})
@rag_bp.get("/health")
def health():
    """Liveness probe: always answers {"status": "ok"} with HTTP 200."""
    status_payload = {"status": "ok"}
    return status_payload, 200
@rag_bp.route("/synthesize-audio", methods=["POST", "OPTIONS"])
def rag_synthesize_audio():
    """
    Synthesize text to WAV on demand using XTTS and return a public URL.

    Body: { "text": "...", "language": "en",
            "reference_files": ["trim/foo.wav", ...] }   # reference_files optional

    Returns: { "audio_url": str, "file": str } on success.
    Locally the file is served from /rag/audio/<name>; when deployed it is
    uploaded to S3 (presigned URL) with SPACE_URL as a last-resort fallback.

    Fix: the original had two identical `except Exception` clauses — the
    second (logger-based) was unreachable dead code. Merged into one handler
    and reinstated the specific FileNotFoundError case for missing reference
    clips (a configuration error, per the commented-out handler's intent).
    """
    if request.method == "OPTIONS":
        return ("", 204)
    data = request.get_json(force=True) or {}
    text = (data.get("text") or "").strip()
    if not text:
        return jsonify({"error": "No text provided"}), 400
    language = data.get("language", "en")
    reference_files = data.get("reference_files")  # optional list of paths
    try:
        out_name = f"synth_{uuid.uuid4().hex}.wav"
        wav_path = xtts_speak_to_file(
            text=text,
            out_file=AUDIO_DIR / out_name,
            reference_dir=XTTS_REF_DIR,
            reference_files=reference_files,
            language=language,
        )
        # Local dev: serve the static file from this server.
        if "localhost" in request.host_url or "127.0.0.1" in request.host_url:
            base = request.host_url.rstrip("/")
            audio_url = f"{base}/rag/audio/{wav_path.name}"
        else:
            # Deployed: try S3 first; fallback to SPACE_URL.
            s3_url = _upload_to_s3(str(wav_path))
            if s3_url:
                audio_url = s3_url
            else:
                base = os.getenv("SPACE_URL", "https://pykara-py-learn-backend.hf.space")
                audio_url = f"{base}/rag/audio/{wav_path.name}"
        return jsonify({"audio_url": audio_url, "file": wav_path.name}), 200
    except FileNotFoundError as e:
        current_app.logger.error("XTTS references missing: %s", e)
        return jsonify({"error": "XTTS reference audio files not found on server"}), 500
    except Exception as e:
        current_app.logger.exception("XTTS synthesis error: %s", e)
        return jsonify({"error": "Synthesis failed", "detail": str(e)}), 500
@rag_bp.route("/synthesize-video", methods=["POST", "OPTIONS"])
def rag_synthesize_video():
    """
    Synthesize a short video on-demand using the D-ID service and return the
    public video URL.  Body: { "text": "..." }
    """
    if request.method == "OPTIONS":
        return ("", 204)
    payload = request.get_json(force=True) or {}
    text = (payload.get("text") or "").strip()
    if not text:
        return jsonify({"error": "No text provided"}), 400
    # Config check before touching the network.
    if not (DID_API_KEY and DID_SOURCE_IMAGE_URL):
        current_app.logger.error("D-ID not configured (DID_API_KEY or DID_SOURCE_IMAGE_URL missing)")
        return jsonify({"error": "D-ID not configured on server"}), 500
    try:
        # Step 1: create the talk (D-ID /talks).
        talk_id, err = _did_create_talk(text)
        if err:
            # _did_create_talk returns (None, (msg, status)) on failure.
            current_app.logger.error("D-ID create error: %s", err[0])
            return jsonify({"error": err[0]}), err[1]
        # Step 2: poll until the rendered video URL is available.
        video_url, err = _did_poll_talk(talk_id, timeout_sec=120, interval_sec=2.0)
        if err:
            current_app.logger.error("D-ID poll error: %s", err[0])
            return jsonify({"error": err[0]}), err[1]
        if not video_url:
            current_app.logger.error("D-ID did not return a video URL for talk %s", talk_id)
            return jsonify({"error": "D-ID did not return a video URL"}), 502
        return jsonify({"video_url": video_url}), 200
    except Exception as e:
        current_app.logger.exception("Unexpected error generating D-ID video: %s", e)
        return jsonify({"error": "Internal server error generating video"}), 500
# ------------------------------------------------------------
# Local runner (DEV ONLY)
# ------------------------------------------------------------
if __name__ == "__main__":
    # Allow this module to run as a standalone server on port 7000 for local dev
    from flask import Flask
    from flask_cors import CORS
    app = Flask(__name__)
    # CORS for local dev (the production app sets CORS globally in verification.py)
    CORS(
        app,
        resources={r"/rag/*": {"origins": ["http://localhost:4200", "http://127.0.0.1:4200"]}},
        supports_credentials=True,
        allow_headers=["Content-Type", "Authorization", "X-User"],
        methods=["GET", "POST", "OPTIONS"],
    )
    # Ensure Chroma dir exists (use CHROMA_DIR if set)
    os.makedirs(os.getenv("CHROMA_DIR", "./chroma"), exist_ok=True)
    # Mount blueprint at /rag and run
    app.register_blueprint(rag_bp, url_prefix="/rag")
    # debug=True enables the reloader/debugger — dev only, never for production
    app.run(host="0.0.0.0", port=7000, debug=True)