import os
import time
import json
import uuid
from pathlib import Path
from typing import Iterable, Optional, Sequence, Union

import requests
from dotenv import load_dotenv, find_dotenv
from flask import Flask, Blueprint, request, jsonify, current_app, send_from_directory
# Note: we avoid creating a Flask app at module import time
from flask_cors import CORS
from TTS.api import TTS

# --- S3 (optional) ---
try:
    import boto3
    from botocore.exceptions import NoCredentialsError, ClientError
except Exception:
    boto3 = None
    NoCredentialsError = ClientError = Exception  # fallbacks so type names exist

# RAG imports
try:
    from .rag_backend import IngestBody, ingest_documents, ingest_pdfs_from_folder
    from .rag_llm import (
        LLMBody,
        llm_generate,
        ExplainBody,
        llm_explain,
        FollowupBody,
        get_vectorstore,
        get_vectorstore_for,
        llm_followups,
    )
except ImportError:
    # Fallback when running as: python ragg/app.py
    from rag_backend import IngestBody, ingest_documents, ingest_pdfs_from_folder
    from rag_llm import (
        LLMBody,
        llm_generate,
        ExplainBody,
        llm_explain,
        FollowupBody,
        get_vectorstore,
        get_vectorstore_for,
        llm_followups,
    )

# OpenAI client (no secret logs)
import openai
from openai import OpenAI


def xtts_speak_to_file(
    text: str,
    out_file: Optional[Union[str, Path]] = None,
    reference_dir: Optional[Union[str, Path]] = "trim",
    reference_files: Optional[Sequence[Union[str, Path]]] = None,
    language: str = "en",
    patterns: Iterable[str] = ("*.wav", "*.mp3", "*.flac"),
) -> Path:
    """
    Generate a WAV using XTTS v2 with reference audios; caches the model.

    Args:
        text: Text to synthesize.
        out_file: Target WAV path; a random name is generated when omitted.
        reference_dir: Folder scanned for reference voice files when
            ``reference_files`` is not given.
        reference_files: Explicit reference audio paths (take precedence).
        language: Language code passed to XTTS.
        patterns: Glob patterns used when scanning ``reference_dir``.

    Returns:
        Path to the written WAV file.

    Raises:
        FileNotFoundError: If no reference audio could be located.
        RuntimeError: If XTTS synthesis fails.
    """
    speakers: list[str] = []
    if reference_files:
        speakers.extend(str(Path(p)) for p in reference_files)
    if (not speakers) and reference_dir:
        vdir = Path(reference_dir)
        for pat in patterns:
            speakers.extend(str(p) for p in vdir.glob(pat))
    # De-duplicate while preserving order.
    speakers = list(dict.fromkeys(speakers))
    if not speakers:
        raise FileNotFoundError(
            f"No reference audio files found. Checked: "
            f"{reference_files or []} and/or {reference_dir}"
        )

    # Lazily build and cache the XTTS model on the function object.
    if not hasattr(xtts_speak_to_file, "_model") or xtts_speak_to_file._model is None:
        import sys, builtins, torch
        from torch.serialization import add_safe_globals

        # --- XTTS internal classes that must be allow-listed ---
        from TTS.tts.configs.xtts_config import XttsConfig
        from TTS.tts.models.xtts import XttsAudioConfig, XttsArgs
        from TTS.config.shared_configs import BaseDatasetConfig

        # Prevent interactive prompts / stdin crashes on Hugging Face
        sys.stdin = open(os.devnull)
        builtins.input = lambda *a, **kw: ""
        os.environ["COQUI_TOS_AGREED"] = "1"

        # Allowlist all required XTTS classes for PyTorch 2.6+
        add_safe_globals([XttsConfig, XttsAudioConfig, BaseDatasetConfig, XttsArgs])

        xtts_speak_to_file._model = TTS(
            model_name="tts_models/multilingual/multi-dataset/xtts_v2",
            gpu=False,
            progress_bar=False,
        )

    tts = xtts_speak_to_file._model
    out_path = Path(out_file) if out_file else Path(f"xtts_{uuid.uuid4().hex}.wav")
    out_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        tts.tts_to_file(
            text=text,
            speaker_wav=speakers,
            language=language,
            file_path=str(out_path),
        )
    except Exception as e:
        raise RuntimeError(f"XTTS synthesis failed: {e}") from e
    return out_path


# ------------------------------------------------------------
# Load environment
# ------------------------------------------------------------
load_dotenv(find_dotenv())
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Optional: version log (safe), but do NOT print the API key
try:
    print(f"openai package version: {openai.__version__}")
except Exception:
    pass

# --- S3 config ---
S3_BUCKET = os.getenv("S3_BUCKET", "").strip()
AWS_REGION = os.getenv("AWS_REGION", "ap-south-1").strip()
S3_PREFIX = os.getenv("S3_PREFIX", "audio/").strip()
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", "").strip()
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", "").strip()

_s3_client = None
if boto3 and S3_BUCKET and AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY:
    try:
        _s3_client = boto3.client(
            "s3",
            region_name=AWS_REGION,
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        )
    except Exception:
        # Misconfigured credentials/region: run without S3; callers fall back.
        _s3_client = None


def _upload_to_s3(file_path: Union[str, Path]) -> Optional[str]:
    """
    Upload the file to S3 and return a presigned URL (24h).
    If S3 is not configured, returns None (caller will fallback).
    """
    if not _s3_client or not S3_BUCKET:
        return None
    try:
        file_path = str(file_path)
        key = f"{S3_PREFIX}{Path(file_path).name}"
        _s3_client.upload_file(file_path, S3_BUCKET, key)
        url = _s3_client.generate_presigned_url(
            "get_object",
            Params={"Bucket": S3_BUCKET, "Key": key},
            ExpiresIn=24 * 3600,
        )
        return url
    except (NoCredentialsError, ClientError) as e:
        try:
            current_app.logger.error(f"S3 upload failed: {e}")
        except Exception:
            # No Flask app context available (e.g. called outside a request).
            print(f"S3 upload failed: {e}")
        return None


# Media and voice references.
# BASE_DIR assumes this file lives one level below the project root (ragg/app.py).
BASE_DIR = Path(__file__).resolve().parent.parent
MEDIA_ROOT = Path(os.getenv("MEDIA_ROOT", str(BASE_DIR / "media")))
AUDIO_DIR = MEDIA_ROOT / "audio"
AUDIO_DIR.mkdir(parents=True, exist_ok=True)
XTTS_REF_DIR = os.getenv("XTTS_REF_DIR", str(BASE_DIR / "trim"))  # reference voice files

# ------------------------------------------------------------
# Blueprint (mounted at /rag by the main app)
# ------------------------------------------------------------
rag_bp = Blueprint("rag", __name__)


# BUGFIX: the route rule previously had no placeholder ("/audio/") while the
# view function requires a `filename` argument, so Flask could never invoke it.
@rag_bp.route("/audio/<path:filename>", methods=["GET"])
def rag_serve_audio(filename: str):
    """Serve a generated WAV file from AUDIO_DIR."""
    return send_from_directory(AUDIO_DIR, filename, mimetype="audio/wav", conditional=True)


# D-ID config (set in .env / HF Secrets)
DID_API_KEY = os.getenv("DID_API_KEY", "")
DID_SOURCE_IMAGE_URL = os.getenv("DID_SOURCE_IMAGE_URL", "")
DID_VOICE_ID = os.getenv("DID_VOICE_ID", "en-US-JennyNeural")

# Default folder for /ingest-pdfs
PDF_DEFAULT_FOLDER = os.getenv("RAG_PDF_DIR", "./pdfs")


# Optional: add CORS headers (the main app should still enable CORS globally)
@rag_bp.after_app_request
def add_cors_headers(resp):
    """Attach CORS headers for the local Angular dev server origins."""
    origin = request.headers.get("Origin")
    # Allow local Angular during dev; main app may add more origins
    if origin in ("http://localhost:4200", "http://127.0.0.1:4200"):
        resp.headers["Access-Control-Allow-Origin"] = origin
        resp.headers["Vary"] = "Origin"
        resp.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-User"
        resp.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"
    return resp


# ------------------------------------------------------------
# Helpers
# ------------------------------------------------------------
def user_to_db_level(username: str | None) -> str | None:
    """Map a login username to a vectorstore level ('low'/'mid'/'high'), else None."""
    if not username:
        return None
    # Dict dispatch instead of an if-chain; unknown usernames map to None.
    return {
        "lowergrade": "low",
        "midgrade": "mid",
        "highergrade": "high",
    }.get(username.strip().lower())


def extract_username_from_request(req) -> str | None:
    """Read the username from the X-User header, falling back to the JSON body."""
    hdr = req.headers.get("X-User")
    if hdr:
        return hdr
    data = req.get_json(silent=True) or {}
    return data.get("username")


# --- D-ID helpers ---
def _did_create_talk(text: str):
    """
    Create a D-ID 'talk' for *text*.

    Returns:
        (talk_id, None) on success, or (None, (message, http_status)) on failure.
    """
    if not DID_API_KEY:
        return None, ("DID_API_KEY not set on the server", 500)
    if not DID_SOURCE_IMAGE_URL:
        return None, ("DID_SOURCE_IMAGE_URL not set on the server", 500)
    payload = {
        "script": {
            "type": "text",
            "input": text,
            "provider": {"type": "microsoft", "voice_id": DID_VOICE_ID},
        },
        "source_url": DID_SOURCE_IMAGE_URL,
        "config": {"fluent": True, "pad_audio": 0},
    }
    try:
        r = requests.post("https://api.d-id.com/talks", json=payload, auth=(DID_API_KEY, ""))
        if r.status_code not in (200, 201):
            return None, (f"D-ID create error: {r.text}", 502)
        talk_id = r.json().get("id")
        if not talk_id:
            return None, ("D-ID did not return a talk id", 502)
        return talk_id, None
    except Exception as e:
        current_app.logger.exception("D-ID create failed: %s", e)
        return None, ("D-ID create failed", 502)


def _did_poll_talk(talk_id: str, timeout_sec: int = 60, interval_sec: float = 2.0):
    """
    Poll a D-ID talk until it is done or *timeout_sec* elapses.

    Returns:
        (result_url, None) on success, or (None, (message, http_status)) on failure.
    """
    deadline = time.time() + timeout_sec
    url = f"https://api.d-id.com/talks/{talk_id}"
    try:
        while time.time() < deadline:
            r = requests.get(url, auth=(DID_API_KEY, ""))
            if r.status_code != 200:
                return None, (f"D-ID poll error: {r.text}", 502)
            data = r.json()
            status = data.get("status")
            if status == "done":
                # Some responses nest the URL under "result".
                return data.get("result_url") or data.get("result", {}).get("url"), None
            if status == "error":
                return None, (f"D-ID generation failed: {data.get('error')}", 502)
            time.sleep(interval_sec)
        return None, ("Timed out waiting for the video", 504)
    except Exception as e:
        current_app.logger.exception("D-ID poll failed: %s", e)
        return None, ("D-ID poll failed", 502)


# ------------------------------------------------------------
# Endpoints (NOTE: no "/rag" prefix here; the blueprint adds it)
# ------------------------------------------------------------
@rag_bp.route("/ingest", methods=["POST", "OPTIONS"])
def rag_ingest():
    """Ingest documents described by the JSON request body."""
    if request.method == "OPTIONS":
        return ("", 204)
    body = IngestBody(**(request.json or {}))
    result = ingest_documents(body)
    return jsonify(result)


@rag_bp.route("/ingest-pdfs", methods=["POST", "OPTIONS"])
def rag_ingest_pdfs():
    """Ingest every PDF in a folder (defaults to RAG_PDF_DIR)."""
    if request.method == "OPTIONS":
        return ("", 204)
    data = request.json or {}
    folder = data.get("folder", PDF_DEFAULT_FOLDER)
    result = ingest_pdfs_from_folder(
        folder,
        subject=data.get("subject"),
        grade=data.get("grade"),
        chapter=data.get("chapter"),
    )
    return jsonify(result)


@rag_bp.route("/generate-questions", methods=["POST", "OPTIONS"])
def rag_generate_questions():
    """Generate questions; db_level defaults to the level mapped from the login user."""
    if request.method == "OPTIONS":
        return ("", 204)
    data = request.json or {}
    username = extract_username_from_request(request)
    mapped_level = user_to_db_level(username)
    # Only fill db_level when the caller did not provide one explicitly.
    if not data.get("db_level"):
        data["db_level"] = mapped_level
    body = LLMBody(**data)
    result = llm_generate(body)
    return jsonify(result)

# NOTE: a large commented-out draft of /explain-grammar was deleted here;
# the live implementation is defined below.
@rag_bp.route("/explain-grammar", methods=["POST", "OPTIONS"])
def rag_explain_grammar():
    """
    Explain a grammar question via RAG; optionally attach XTTS audio and a
    D-ID video URL to the response when the caller sets ``synthesize_audio`` /
    ``synthesize_video`` in the JSON body.
    """
    if request.method == "OPTIONS":
        return ("", 204)
    data = request.get_json(force=True) or {}

    # --- Extract username and db_level ---
    username = extract_username_from_request(request)
    db_level = user_to_db_level(username)

    body = ExplainBody(
        question=(data.get("question") or "").strip(),
        model=data.get("model", "gpt-4o-mini"),
        db_level=db_level,
        source_ids=data.get("source_ids") or [],
    )

    # --- 1) Run LLM / RAG explanation ---
    result_raw = llm_explain(body)

    # --- 2) Normalize + extract answer safely ---
    result_dict = None
    answer_text = ""
    try:
        if isinstance(result_raw, dict):
            result_dict = dict(result_raw)
        elif hasattr(result_raw, "model_dump"):
            result_dict = result_raw.model_dump()  # pydantic v2
        elif hasattr(result_raw, "dict"):
            result_dict = result_raw.dict()  # pydantic v1
        elif isinstance(result_raw, str):
            result_dict = {"answer": result_raw}
        else:
            result_dict = {"answer": str(result_raw)}
        answer_text = (
            result_dict.get("answer")
            or result_dict.get("response")
            or result_dict.get("text")
            or ""
        ).strip()
    except Exception as e:
        current_app.logger.exception("Failed to normalize llm_explain result: %s", e)
        return jsonify({"error": "Internal error normalizing LLM response"}), 500

    # --- 3) Optional: synthesize TTS audio ---
    try:
        if data.get("synthesize_audio"):
            try:
                out_name = f"explain_{uuid.uuid4().hex}.wav"
                wav_path = xtts_speak_to_file(
                    text=answer_text or result_dict.get("answer", ""),
                    out_file=AUDIO_DIR / out_name,
                    reference_dir=XTTS_REF_DIR,
                    reference_files=None,
                    language=data.get("language", "en"),
                )
                # CONSISTENCY FIX: mirror /synthesize-audio — serve locally in
                # dev; when deployed, prefer an S3 presigned URL, then the
                # public SPACE_URL. Previously this route always used
                # request.host_url regardless of environment.
                if "localhost" in request.host_url or "127.0.0.1" in request.host_url:
                    base = request.host_url.rstrip("/")
                    result_dict["audio_url"] = f"{base}/rag/audio/{wav_path.name}"
                else:
                    s3_url = _upload_to_s3(str(wav_path))
                    if s3_url:
                        result_dict["audio_url"] = s3_url
                    else:
                        base = os.getenv("SPACE_URL", "https://pykara-py-learn-backend.hf.space")
                        result_dict["audio_url"] = f"{base}/rag/audio/{wav_path.name}"
            except FileNotFoundError as e:
                # Missing reference audio is non-fatal: return the text answer anyway.
                current_app.logger.error("XTTS reference audio missing: %s", e)
            except Exception as e:
                current_app.logger.exception("XTTS synthesis during explain-grammar failed: %s", e)
    except Exception:
        current_app.logger.exception("Unexpected error while attempting inline synthesis")

    # --- 4) Optional: synthesize video (D-ID) ---
    try:
        if data.get("synthesize_video"):
            if not DID_API_KEY or not DID_SOURCE_IMAGE_URL:
                current_app.logger.error("D-ID not configured for inline explain-grammar video synthesis")
            else:
                try:
                    talk_id, err = _did_create_talk(answer_text or result_dict.get("answer", ""))
                    if err:
                        current_app.logger.error(
                            "D-ID create error during explain-grammar: %s",
                            err[0] if isinstance(err, tuple) else err,
                        )
                    else:
                        video_url, err = _did_poll_talk(talk_id, timeout_sec=120, interval_sec=2.0)
                        if err:
                            current_app.logger.error(
                                "D-ID poll error during explain-grammar: %s",
                                err[0] if isinstance(err, tuple) else err,
                            )
                        elif video_url:
                            result_dict["video_url"] = video_url
                except Exception as e:
                    current_app.logger.exception("D-ID inline synthesis failed during explain-grammar: %s", e)
    except Exception:
        current_app.logger.exception("Unexpected error while attempting inline video synthesis")

    # --- Final response ---
    return jsonify(result_dict), 200


@rag_bp.route("/suggest-followups", methods=["POST", "OPTIONS"])
def rag_suggest_followups():
    """Suggest N follow-up questions based on the previous Q/A pair."""
    if request.method == "OPTIONS":
        return ("", 204)
    data = request.get_json(force=True) or {}
    username = extract_username_from_request(request)
    db_level = user_to_db_level(username)
    body = FollowupBody(
        last_question=(data.get("last_question") or "").strip(),
        last_answer=(data.get("last_answer") or "").strip(),
        n=int(data.get("n", 5)),
        model=data.get("model", "gpt-4o-mini"),
        db_level=db_level,
        source_ids=data.get("source_ids") or [],
    )
    result = llm_followups(body)
    return jsonify(result)


@rag_bp.get("/_diag")
def rag_diag():
    """Report Chroma paths and per-level collection counts for debugging."""
    try:
        from .rag_llm import CHROMA_DIR, CHROMA_ROOT, get_vectorstore, get_vectorstore_for
    except ImportError:
        from rag_llm import CHROMA_DIR, CHROMA_ROOT, get_vectorstore, get_vectorstore_for

    def _count(vs):
        """Handle both LangChain vectorstores and chromadb collection objects."""
        if vs is None:
            return None
        # chromadb.Collection exposes .count() directly
        if hasattr(vs, "count") and callable(vs.count):
            try:
                return vs.count()
            except Exception:
                return None
        # LangChain vectorstore wraps a private _collection
        if hasattr(vs, "_collection"):
            try:
                return vs._collection.count()  # type: ignore
            except Exception:
                try:
                    return vs._client.get_collection(vs._collection.name).count()  # type: ignore
                except Exception:
                    return None
        return None

    # load each level safely
    low_vs = get_vectorstore_for("low")
    mid_vs = get_vectorstore_for("mid")
    high_vs = get_vectorstore_for("high")

    info = {
        "env_seen": {"CHROMA_DIR": CHROMA_DIR, "CHROMA_ROOT": CHROMA_ROOT},
        "low_dir": {
            "path": os.path.join(CHROMA_ROOT, "low"),
            "exists": os.path.isdir(os.path.join(CHROMA_ROOT, "low")),
        },
        "counts_default": _count(get_vectorstore()),
        "counts_low": _count(low_vs),
        "counts_mid": _count(mid_vs),
        "counts_high": _count(high_vs),
    }
    return jsonify(info), 200


@rag_bp.route("/search", methods=["POST", "OPTIONS"])
def rag_search():
    """Similarity search (k=5) in the level-specific vectorstore."""
    if request.method == "OPTIONS":
        return ("", 204)
    data = request.json or {}
    q = (data.get("q") or "").strip()
    if not q:
        return jsonify({"results": []})
    # derive db_level from login, unless explicitly provided
    username = extract_username_from_request(request)
    db_level = data.get("db_level") or user_to_db_level(username)
    vs = get_vectorstore_for(db_level)
    hits = vs.similarity_search_with_score(q, k=5)
    out = [
        {
            "distance": float(dist),
            "snippet": doc.page_content[:200],
            "source_path": os.path.normpath(doc.metadata.get("source_path", "")),
            "page": doc.metadata.get("page_1based"),
        }
        for doc, dist in hits
    ]
    return jsonify({"results": out})


def generate_questions_from_vectorstore():
    """
    Debug helper: pull the top-5 grammar chunks from the default vectorstore
    and ask OpenAI for five questions. Prints progress to stdout.
    """
    try:
        vectorstore = get_vectorstore()
        query_text = "important content related to grammar"
        results = vectorstore.similarity_search_with_score(query_text, k=5)
        print(f"Vectorstore query returned {len(results)} results")
        content = "\n".join([doc.page_content for doc, _ in results])
        print(f"Retrieved content: {content[:500]}...")
        if not content:
            return {"error": "No content retrieved from vectorstore. Please ingest PDFs first."}
        prompt = f"Generate 5 important questions based on the following content: {content}"
        response = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=150,
        )
        response_text = response.choices[0].message.content.strip()
        print(f"Processed OpenAI response: {response_text}")
        return response_text
    except Exception as e:
        print(f"Error during OpenAI API call: {e}")
        return {"error": f"Failed to call OpenAI: {str(e)}"}
@rag_bp.route("/generate-questions-from-chroma", methods=["POST", "OPTIONS"])
def generate_questions_from_chroma():
    """Generate five questions from the default Chroma collection via OpenAI."""

    def _generate_questions_from_vectorstore():
        # Quiet variant of module-level generate_questions_from_vectorstore
        # (same logic, without the debug prints).
        try:
            vectorstore = get_vectorstore()
            query_text = "important content related to grammar"
            results = vectorstore.similarity_search_with_score(query_text, k=5)
            content = "\n".join(doc.page_content for doc, _ in results)
            if not content:
                return {"error": "No content retrieved from vectorstore. Please ingest PDFs first."}
            prompt = f"Generate 5 important questions based on the following content: {content}"
            response = openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=150,
            )
            return response.choices[0].message.content.strip()
        except Exception as e:
            return {"error": f"Failed to call OpenAI: {str(e)}"}

    generated = _generate_questions_from_vectorstore()
    return jsonify({"generated_questions": generated})


@rag_bp.get("/health")
def health():
    """Liveness probe."""
    return {"status": "ok"}, 200


@rag_bp.route("/synthesize-audio", methods=["POST", "OPTIONS"])
def rag_synthesize_audio():
    """
    Synthesize text to WAV on demand using XTTS and return a public URL.
    Body: { "text": "...", "language": "en", "reference_files": ["trim/foo.wav", ...] }
    """
    if request.method == "OPTIONS":
        return ("", 204)

    data = request.get_json(force=True) or {}
    text = (data.get("text") or "").strip()
    if not text:
        return jsonify({"error": "No text provided"}), 400

    language = data.get("language", "en")
    reference_files = data.get("reference_files")  # optional list of paths

    try:
        out_name = f"synth_{uuid.uuid4().hex}.wav"
        wav_path = xtts_speak_to_file(
            text=text,
            out_file=AUDIO_DIR / out_name,
            reference_dir=XTTS_REF_DIR,
            reference_files=reference_files,
            language=language,
        )
        # Local: serve static file
        if "localhost" in request.host_url or "127.0.0.1" in request.host_url:
            base = request.host_url.rstrip("/")
            audio_url = f"{base}/rag/audio/{wav_path.name}"
        else:
            # Deployed: try S3 first; fallback to SPACE_URL
            s3_url = _upload_to_s3(str(wav_path))
            if s3_url:
                audio_url = s3_url
            else:
                base = os.getenv("SPACE_URL", "https://pykara-py-learn-backend.hf.space")
                audio_url = f"{base}/rag/audio/{wav_path.name}"
        return jsonify({"audio_url": audio_url, "file": wav_path.name}), 200
    except FileNotFoundError as e:
        # Server-side misconfiguration: reference voice files are absent.
        current_app.logger.error("XTTS references missing: %s", e)
        return jsonify({"error": "XTTS reference audio files not found on server"}), 500
    except Exception as e:
        # BUGFIX: the original try had two `except Exception` clauses; the
        # second (logger-based) was unreachable. Merged into one handler that
        # logs the traceback and still returns the error detail.
        current_app.logger.exception("XTTS synthesis error: %s", e)
        return jsonify({"error": "Synthesis failed", "detail": str(e)}), 500


@rag_bp.route("/synthesize-video", methods=["POST", "OPTIONS"])
def rag_synthesize_video():
    """
    Synthesize a short video on-demand using the D-ID service and return the
    public video URL. Body: { "text": "..." }
    """
    if request.method == "OPTIONS":
        return ("", 204)

    data = request.get_json(force=True) or {}
    text = (data.get("text") or "").strip()
    if not text:
        return jsonify({"error": "No text provided"}), 400

    # Quick config check
    if not DID_API_KEY or not DID_SOURCE_IMAGE_URL:
        current_app.logger.error("D-ID not configured (DID_API_KEY or DID_SOURCE_IMAGE_URL missing)")
        return jsonify({"error": "D-ID not configured on server"}), 500

    try:
        # Create talk (calls D-ID /talks)
        talk_id, err = _did_create_talk(text)
        if err:
            # _did_create_talk returns (None, (msg, status))
            current_app.logger.error("D-ID create error: %s", err[0])
            return jsonify({"error": err[0]}), err[1]

        # Poll for result URL
        video_url, err = _did_poll_talk(talk_id, timeout_sec=120, interval_sec=2.0)
        if err:
            current_app.logger.error("D-ID poll error: %s", err[0])
            return jsonify({"error": err[0]}), err[1]

        if not video_url:
            current_app.logger.error("D-ID did not return a video URL for talk %s", talk_id)
            return jsonify({"error": "D-ID did not return a video URL"}), 502

        return jsonify({"video_url": video_url}), 200
    except Exception as e:
        current_app.logger.exception("Unexpected error generating D-ID video: %s", e)
        return jsonify({"error": "Internal server error generating video"}), 500


# ------------------------------------------------------------
# Local runner (DEV ONLY)
# ------------------------------------------------------------
if __name__ == "__main__":
    # Allow this module to run as a standalone server on port 7000 for local dev
    from flask import Flask
    from flask_cors import CORS

    app = Flask(__name__)
    # CORS for local dev (the production app sets CORS globally in verification.py)
    CORS(
        app,
        resources={r"/rag/*": {"origins": ["http://localhost:4200", "http://127.0.0.1:4200"]}},
        supports_credentials=True,
        allow_headers=["Content-Type", "Authorization", "X-User"],
        methods=["GET", "POST", "OPTIONS"],
    )
    # Ensure Chroma dir exists (use CHROMA_DIR if set)
    os.makedirs(os.getenv("CHROMA_DIR", "./chroma"), exist_ok=True)
    # Mount blueprint at /rag and run
    app.register_blueprint(rag_bp, url_prefix="/rag")
    app.run(host="0.0.0.0", port=7000, debug=True)