# NOTE: the original capture began with a Hugging Face Spaces status banner
# ("Spaces: Running") — scrape residue, replaced with this comment so the
# module parses as Python.
| import os | |
| import time | |
| import json | |
| import requests | |
| from dotenv import load_dotenv, find_dotenv | |
| from flask import Flask, Blueprint, request, jsonify, current_app, send_from_directory | |
| # Note: we avoid creating a Flask app at module import time | |
| import uuid | |
| from pathlib import Path | |
| from typing import Iterable, Optional, Sequence, Union | |
| from flask_cors import CORS | |
| import requests | |
| from TTS.api import TTS | |
| # --- S3 (added) --- | |
| try: | |
| import boto3 | |
| from botocore.exceptions import NoCredentialsError, ClientError | |
| except Exception: | |
| boto3 = None | |
| NoCredentialsError = ClientError = Exception # fallbacks so type names exist | |
| # RAG imports | |
| try: | |
| from .rag_backend import IngestBody, ingest_documents, ingest_pdfs_from_folder | |
| from .rag_llm import ( | |
| LLMBody, | |
| llm_generate, | |
| ExplainBody, | |
| llm_explain, | |
| FollowupBody, | |
| get_vectorstore, | |
| get_vectorstore_for, # ← add this | |
| llm_followups, | |
| ) | |
| except ImportError: | |
| # Fallback when running as: python ragg/app.py | |
| from rag_backend import IngestBody, ingest_documents, ingest_pdfs_from_folder | |
| from rag_llm import ( | |
| LLMBody, | |
| llm_generate, | |
| ExplainBody, | |
| llm_explain, | |
| FollowupBody, | |
| get_vectorstore, | |
| get_vectorstore_for, # ← add this | |
| llm_followups, | |
| ) | |
| # OpenAI client (no secret logs) | |
| import openai | |
| from openai import OpenAI | |
def xtts_speak_to_file(
    text: str,
    out_file: Optional[Union[str, Path]] = None,
    reference_dir: Optional[Union[str, Path]] = "trim",
    reference_files: Optional[Sequence[Union[str, Path]]] = None,
    language: str = "en",
    patterns: Iterable[str] = ("*.wav", "*.mp3", "*.flac"),
) -> Path:
    """
    Generate a WAV using XTTS v2 with reference audios; caches the model.

    Args:
        text: Text to synthesize.
        out_file: Destination path; defaults to a uuid-named WAV in the CWD.
        reference_dir: Folder scanned for voice-cloning reference audio when
            ``reference_files`` is not given.
        reference_files: Explicit reference audio paths (take precedence).
        language: XTTS language code.
        patterns: Glob patterns used when scanning ``reference_dir``.

    Returns:
        Path to the written WAV file.

    Raises:
        FileNotFoundError: If no reference audio can be found.
        RuntimeError: If XTTS synthesis itself fails (original cause chained).
    """
    # Collect reference voices: explicit files first, then a directory scan.
    speakers: list[str] = []
    if reference_files:
        speakers.extend(str(Path(p)) for p in reference_files)
    if (not speakers) and reference_dir:
        vdir = Path(reference_dir)
        for pat in patterns:
            speakers.extend(str(p) for p in vdir.glob(pat))
    # De-duplicate while preserving order (dict.fromkeys keeps first occurrence).
    speakers = list(dict.fromkeys(speakers))
    if not speakers:
        raise FileNotFoundError(
            f"No reference audio files found. Checked: "
            f"{reference_files or []} and/or {reference_dir}"
        )
    # Lazily build the (expensive) XTTS model once and cache it on the
    # function object itself; subsequent calls reuse it.
    if not hasattr(xtts_speak_to_file, "_model") or xtts_speak_to_file._model is None:
        import sys, builtins, torch
        from torch.serialization import add_safe_globals
        # --- XTTS internal classes that must be allow-listed ---
        from TTS.tts.configs.xtts_config import XttsConfig
        from TTS.tts.models.xtts import XttsAudioConfig, XttsArgs
        from TTS.config.shared_configs import BaseDatasetConfig
        # Prevent interactive prompts / stdin crashes on Hugging Face.
        # NOTE(review): this replaces sys.stdin process-wide and the devnull
        # handle is never closed — acceptable for a long-lived server, but
        # confirm it doesn't interfere with other stdin users.
        sys.stdin = open(os.devnull)
        builtins.input = lambda *a, **kw: ""
        os.environ["COQUI_TOS_AGREED"] = "1"
        # Allowlist all required XTTS classes for PyTorch 2.6+ safe loading.
        add_safe_globals([XttsConfig, XttsAudioConfig, BaseDatasetConfig, XttsArgs])
        # Initialize the XTTS model safely (CPU-only, no progress bar).
        xtts_speak_to_file._model = TTS(
            model_name="tts_models/multilingual/multi-dataset/xtts_v2",
            gpu=False,
            progress_bar=False,
        )
    tts = xtts_speak_to_file._model
    out_path = Path(out_file) if out_file else Path(f"xtts_{uuid.uuid4().hex}.wav")
    out_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        tts.tts_to_file(
            text=text,
            speaker_wav=speakers,
            language=language,
            file_path=str(out_path),
        )
    except Exception as e:
        # Wrap any backend failure in a uniform error, keeping the cause chained.
        raise RuntimeError(f"XTTS synthesis failed: {e}") from e
    return out_path
# ------------------------------------------------------------
# Load environment
# ------------------------------------------------------------
load_dotenv(find_dotenv())

# OpenAI client built once at import time; the key stays in the environment
# and is never logged.
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Optional: version log (safe), but do NOT print the API key
try:
    print(f"openai package version: {openai.__version__}")
except Exception:
    pass

# --- S3 config (added) ---
S3_BUCKET = os.getenv("S3_BUCKET", "").strip()
AWS_REGION = os.getenv("AWS_REGION", "ap-south-1").strip()
S3_PREFIX = os.getenv("S3_PREFIX", "audio/").strip()  # key prefix for uploaded WAVs
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", "").strip()
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", "").strip()

# Build the S3 client only when boto3 imported AND a full credential set is
# present; otherwise _upload_to_s3() degrades gracefully by returning None.
_s3_client = None
if boto3 and S3_BUCKET and AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY:
    try:
        _s3_client = boto3.client(
            "s3",
            region_name=AWS_REGION,
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        )
    except Exception as _e:
        _s3_client = None
def _upload_to_s3(file_path: Union[str, Path]) -> Optional[str]:
    """
    Upload ``file_path`` to the configured S3 bucket and return a presigned
    GET URL valid for 24 hours.

    Returns None when S3 is not configured or when the upload / signing
    fails, so callers can fall back to serving the file locally.
    """
    if not _s3_client or not S3_BUCKET:
        return None
    try:
        file_path = str(file_path)
        key = f"{S3_PREFIX}{Path(file_path).name}"
        _s3_client.upload_file(file_path, S3_BUCKET, key)
        url = _s3_client.generate_presigned_url(
            "get_object",
            Params={"Bucket": S3_BUCKET, "Key": key},
            ExpiresIn=24 * 3600,
        )
        return url
    except (NoCredentialsError, ClientError) as e:
        # Log via the app logger when inside an app context; plain print otherwise.
        try:
            current_app.logger.error(f"S3 upload failed: {e}")
        except Exception:
            print(f"S3 upload failed: {e}")
        return None
    except Exception as e:
        # BUG FIX: upload_file raises boto3.exceptions.S3UploadFailedError,
        # which is NOT a ClientError — without this clause the exception
        # escaped and broke the documented "return None on failure" contract.
        try:
            current_app.logger.error(f"S3 upload failed: {e}")
        except Exception:
            print(f"S3 upload failed: {e}")
        return None
# Media and voice references
# (previous hard-coded relative paths, kept for reference)
# MEDIA_ROOT = Path(os.getenv("MEDIA_ROOT", "./media"))
# AUDIO_DIR = MEDIA_ROOT / "audio"
# AUDIO_DIR.mkdir(parents=True, exist_ok=True)
# XTTS_REF_DIR = os.getenv("XTTS_REF_DIR", "./trim")  # folder with your reference audios
BASE_DIR = Path(__file__).resolve().parent.parent  # if app.py is top-level; if it's ragg/app.py use .parent.parent
MEDIA_ROOT = Path(os.getenv("MEDIA_ROOT", str(BASE_DIR / "media")))
AUDIO_DIR = MEDIA_ROOT / "audio"
AUDIO_DIR.mkdir(parents=True, exist_ok=True)  # created eagerly at import time
XTTS_REF_DIR = os.getenv("XTTS_REF_DIR", str(BASE_DIR / "trim"))  # reference voice files
# D-ID config (optional)
# ------------------------------------------------------------
# Blueprint (mounted at /rag by the main app)
# ------------------------------------------------------------
# NOTE(review): no @rag_bp.route decorators are visible in this module (only
# commented-out ones) — presumably routes are registered elsewhere or were
# lost in extraction; confirm before deploying, or the blueprint serves nothing.
rag_bp = Blueprint("rag", __name__)
def rag_serve_audio(filename: str):
    """Serve a previously synthesized WAV from AUDIO_DIR (supports range/conditional requests)."""
    return send_from_directory(
        AUDIO_DIR,
        filename,
        mimetype="audio/wav",
        conditional=True,
    )
# D-ID config (set in .env / HF Secrets)
DID_API_KEY = os.getenv("DID_API_KEY", "")              # empty string disables D-ID endpoints
DID_SOURCE_IMAGE_URL = os.getenv("DID_SOURCE_IMAGE_URL", "")  # avatar image for the talking head
DID_VOICE_ID = os.getenv("DID_VOICE_ID", "en-US-JennyNeural")  # Microsoft TTS voice used by D-ID
# Default folder for /ingest-pdfs
PDF_DEFAULT_FOLDER = os.getenv("RAG_PDF_DIR", "./pdfs")
| # Optional: add CORS headers (the main app should still enable CORS globally) | |
def add_cors_headers(resp):
    """Attach CORS headers for the local Angular dev origins (mirrors the main app's CORS)."""
    origin = request.headers.get("Origin")
    # Only the local dev origins are echoed back; others get no CORS headers.
    if origin in ("http://localhost:4200", "http://127.0.0.1:4200"):
        resp.headers["Access-Control-Allow-Origin"] = origin
        resp.headers["Vary"] = "Origin"
        resp.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-User"
        resp.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"
    return resp
| # ------------------------------------------------------------ | |
| # Helpers | |
| # ------------------------------------------------------------ | |
| def user_to_db_level(username: str | None) -> str | None: | |
| if not username: | |
| return None | |
| u = username.strip().lower() | |
| if u == "lowergrade": | |
| return "low" | |
| if u == "midgrade": | |
| return "mid" | |
| if u == "highergrade": | |
| return "high" | |
| return None | |
| def extract_username_from_request(req) -> str | None: | |
| hdr = req.headers.get("X-User") | |
| if hdr: | |
| return hdr | |
| data = req.get_json(silent=True) or {} | |
| return data.get("username") | |
| # --- D-ID helpers --- | |
def _did_create_talk(text: str):
    """Create a D-ID "talk" for *text*.

    Returns (talk_id, None) on success, or (None, (message, http_status))
    on any configuration or API failure.
    """
    if not DID_API_KEY:
        return None, ("DID_API_KEY not set on the server", 500)
    if not DID_SOURCE_IMAGE_URL:
        return None, ("DID_SOURCE_IMAGE_URL not set on the server", 500)
    payload = {
        "script": {
            "type": "text",
            "input": text,
            "provider": {"type": "microsoft", "voice_id": DID_VOICE_ID},
        },
        "source_url": DID_SOURCE_IMAGE_URL,
        "config": {"fluent": True, "pad_audio": 0},
    }
    try:
        resp = requests.post("https://api.d-id.com/talks", json=payload, auth=(DID_API_KEY, ""))
        if resp.status_code not in (200, 201):
            return None, (f"D-ID create error: {resp.text}", 502)
        talk_id = resp.json().get("id")
        if talk_id:
            return talk_id, None
        return None, ("D-ID did not return a talk id", 502)
    except Exception as e:
        current_app.logger.exception("D-ID create failed: %s", e)
        return None, ("D-ID create failed", 502)
def _did_poll_talk(talk_id: str, timeout_sec: int = 60, interval_sec: float = 2.0):
    """Poll a D-ID talk until it finishes.

    Returns (result_url, None) when the talk reaches "done", or
    (None, (message, http_status)) on API error / failure / timeout.
    """
    deadline = time.time() + timeout_sec
    url = f"https://api.d-id.com/talks/{talk_id}"
    try:
        while time.time() < deadline:
            resp = requests.get(url, auth=(DID_API_KEY, ""))
            if resp.status_code != 200:
                return None, (f"D-ID poll error: {resp.text}", 502)
            payload = resp.json()
            state = payload.get("status")
            if state == "done":
                # Newer API responses use result_url; older ones nest it under result.url.
                return payload.get("result_url") or payload.get("result", {}).get("url"), None
            if state == "error":
                return None, (f"D-ID generation failed: {payload.get('error')}", 502)
            time.sleep(interval_sec)
        return None, ("Timed out waiting for the video", 504)
    except Exception as e:
        current_app.logger.exception("D-ID poll failed: %s", e)
        return None, ("D-ID poll failed", 502)
| # ------------------------------------------------------------ | |
| # Endpoints (NOTE: no "/rag" prefix here; the blueprint adds it) | |
| # ------------------------------------------------------------ | |
def rag_ingest():
    """Ingest raw documents described by the JSON body (IngestBody schema)."""
    if request.method == "OPTIONS":
        return ("", 204)
    payload = request.json or {}
    return jsonify(ingest_documents(IngestBody(**payload)))
def rag_ingest_pdfs():
    """Ingest every PDF in a folder (defaults to RAG_PDF_DIR) with optional tags."""
    if request.method == "OPTIONS":
        return ("", 204)
    payload = request.json or {}
    result = ingest_pdfs_from_folder(
        payload.get("folder", PDF_DEFAULT_FOLDER),
        subject=payload.get("subject"),
        grade=payload.get("grade"),
        chapter=payload.get("chapter"),
    )
    return jsonify(result)
def rag_generate_questions():
    """Generate questions; db_level defaults from the logged-in user when absent."""
    if request.method == "OPTIONS":
        return ("", 204)
    payload = request.json or {}
    # The login-derived level fills db_level only if the caller omitted it.
    mapped_level = user_to_db_level(extract_username_from_request(request))
    if not payload.get("db_level"):
        payload["db_level"] = mapped_level
    return jsonify(llm_generate(LLMBody(**payload)))
| # @rag_bp.route("/explain-grammar", methods=["POST", "OPTIONS"]) | |
| # @rag_bp.route("/explain-grammar", methods=["POST", "OPTIONS"]) | |
| # def rag_explain_grammar(): | |
| # if request.method == "OPTIONS": | |
| # return ("", 204) | |
| # data = request.get_json(force=True) or {} | |
| # # --- Extract username and db_level --- | |
| # username = extract_username_from_request(request) | |
| # db_level = user_to_db_level(username) | |
| # # --- MAIN BODY (your preferred structure) --- | |
| # body = ExplainBody( | |
| # question=(data.get("question") or "").strip(), | |
| # model=data.get("model", "gpt-4o-mini"), | |
| # db_level=db_level, | |
| # source_ids=data.get("source_ids") or [] | |
| # ) | |
| # # --- 1) Run LLM / RAG explanation --- | |
| # result_raw = llm_explain(body) | |
| # # --- 2) Normalize + extract answer safely --- | |
| # result_dict = None | |
| # answer_text = "" | |
| # try: | |
| # if isinstance(result_raw, dict): | |
| # result_dict = dict(result_raw) | |
| # elif hasattr(result_raw, "model_dump"): | |
| # result_dict = result_raw.model_dump() | |
| # elif hasattr(result_raw, "dict"): | |
| # result_dict = result_raw.dict() | |
| # elif isinstance(result_raw, str): | |
| # result_dict = {"answer": result_raw} | |
| # else: | |
| # result_dict = {"answer": str(result_raw)} | |
| # answer_text = ( | |
| # result_dict.get("answer") | |
| # or result_dict.get("response") | |
| # or result_dict.get("text") | |
| # or "" | |
| # ).strip() | |
| # except Exception as e: | |
| # current_app.logger.exception("Failed to normalize llm_explain result: %s", e) | |
| # return jsonify({"error": "Internal error normalizing LLM response"}), 500 | |
| # # --- 3) Optional: synthesize TTS audio --- | |
| # try: | |
| # if data.get("synthesize_audio"): | |
| # try: | |
| # out_name = f"explain_{uuid.uuid4().hex}.wav" | |
| # wav_path = xtts_speak_to_file( | |
| # text=answer_text or result_dict.get("answer", ""), | |
| # out_file=AUDIO_DIR / out_name, | |
| # reference_dir=XTTS_REF_DIR, | |
| # reference_files=None, | |
| # language=data.get("language", "en"), | |
| # ) | |
| # # Local: serve from /rag/audio/* | |
| # if "localhost" in request.host_url or "127.0.0.1" in request.host_url: | |
| # base = request.host_url.rstrip("/") | |
| # result_dict["audio_url"] = f"{base}/rag/audio/{wav_path.name}" | |
| # else: | |
| # # Deployed: try S3 first; fallback to public SPACE_URL if set | |
| # s3_url = _upload_to_s3(str(wav_path)) | |
| # if s3_url: | |
| # result_dict["audio_url"] = s3_url | |
| # else: | |
| # base = os.getenv("SPACE_URL", "https://pykara-py-learn-backend.hf.space") | |
| # result_dict["audio_url"] = f"{base}/rag/audio/{wav_path.name}" | |
| # except FileNotFoundError as e: | |
| # current_app.logger.error("XTTS reference audio missing: %s", e) | |
| # except Exception as e: | |
| # current_app.logger.exception("XTTS synthesis during explain-grammar failed: %s", e) | |
| # except Exception: | |
| # current_app.logger.exception("Unexpected error while attempting inline synthesis") | |
| # # --- 4) Optional: synthesize video (D-ID) --- | |
| # try: | |
| # if data.get("synthesize_video"): | |
| # if not DID_API_KEY or not DID_SOURCE_IMAGE_URL: | |
| # current_app.logger.error("D-ID not configured for inline explain-grammar video synthesis") | |
| # else: | |
| # try: | |
| # talk_id, err = _did_create_talk(answer_text or result_dict.get("answer", "")) | |
| # if err: | |
| # current_app.logger.error( | |
| # "D-ID create error during explain-grammar: %s", | |
| # err[0] if isinstance(err, tuple) else err, | |
| # ) | |
| # else: | |
| # video_url, err = _did_poll_talk(talk_id, timeout_sec=120, interval_sec=2.0) | |
| # if err: | |
| # current_app.logger.error( | |
| # "D-ID poll error during explain-grammar: %s", | |
| # err[0] if isinstance(err, tuple) else err, | |
| # ) | |
| # else: | |
| # if video_url: | |
| # result_dict["video_url"] = video_url | |
| # except Exception as e: | |
| # current_app.logger.exception("D-ID inline synthesis failed during explain-grammar: %s", e) | |
| # except Exception: | |
| # current_app.logger.exception("Unexpected error while attempting inline video synthesis") | |
| # # --- Final response --- | |
| # return jsonify(result_dict), 200 | |
def rag_explain_grammar():
    """
    POST /rag/explain-grammar

    Body: { "question": str, "model"?: str, "source_ids"?: list,
            "synthesize_audio"?: bool, "synthesize_video"?: bool,
            "language"?: str }

    Runs the RAG explanation, then optionally attaches "audio_url" (XTTS)
    and/or "video_url" (D-ID) to the response. Media synthesis failures are
    logged but never fail the text response.
    """
    if request.method == "OPTIONS":
        return ("", 204)
    data = request.get_json(force=True) or {}

    # Difficulty level is derived from the logged-in username.
    username = extract_username_from_request(request)
    db_level = user_to_db_level(username)

    body = ExplainBody(
        question=(data.get("question") or "").strip(),
        model=data.get("model", "gpt-4o-mini"),
        db_level=db_level,
        source_ids=data.get("source_ids") or [],
    )

    # --- 1) Run LLM / RAG explanation ---
    result_raw = llm_explain(body)

    # --- 2) Normalize the result into a plain dict; pull out the answer text ---
    result_dict = None
    answer_text = ""
    try:
        if isinstance(result_raw, dict):
            result_dict = dict(result_raw)
        elif hasattr(result_raw, "model_dump"):  # pydantic v2 models
            result_dict = result_raw.model_dump()
        elif hasattr(result_raw, "dict"):  # pydantic v1 models
            result_dict = result_raw.dict()
        elif isinstance(result_raw, str):
            result_dict = {"answer": result_raw}
        else:
            result_dict = {"answer": str(result_raw)}
        answer_text = (
            result_dict.get("answer")
            or result_dict.get("response")
            or result_dict.get("text")
            or ""
        ).strip()
    except Exception as e:
        current_app.logger.exception("Failed to normalize llm_explain result: %s", e)
        return jsonify({"error": "Internal error normalizing LLM response"}), 500

    # --- 3) Optional: synthesize TTS audio (best-effort) ---
    try:
        if data.get("synthesize_audio"):
            try:
                out_name = f"explain_{uuid.uuid4().hex}.wav"
                wav_path = xtts_speak_to_file(
                    text=answer_text or result_dict.get("answer", ""),
                    out_file=AUDIO_DIR / out_name,
                    reference_dir=XTTS_REF_DIR,
                    reference_files=None,
                    language=data.get("language", "en"),
                )
                # CONSISTENCY FIX: mirror rag_synthesize_audio — on a deployed
                # host, prefer an S3 presigned URL (falling back to SPACE_URL)
                # instead of always building a host-relative URL.
                if "localhost" in request.host_url or "127.0.0.1" in request.host_url:
                    base = request.host_url.rstrip("/")
                    result_dict["audio_url"] = f"{base}/rag/audio/{wav_path.name}"
                else:
                    s3_url = _upload_to_s3(str(wav_path))
                    if s3_url:
                        result_dict["audio_url"] = s3_url
                    else:
                        base = os.getenv("SPACE_URL", "https://pykara-py-learn-backend.hf.space")
                        result_dict["audio_url"] = f"{base}/rag/audio/{wav_path.name}"
            except FileNotFoundError as e:
                current_app.logger.error("XTTS reference audio missing: %s", e)
            except Exception as e:
                current_app.logger.exception("XTTS synthesis during explain-grammar failed: %s", e)
    except Exception:
        current_app.logger.exception("Unexpected error while attempting inline synthesis")

    # --- 4) Optional: synthesize video via D-ID (best-effort) ---
    try:
        if data.get("synthesize_video"):
            if not DID_API_KEY or not DID_SOURCE_IMAGE_URL:
                current_app.logger.error("D-ID not configured for inline explain-grammar video synthesis")
            else:
                try:
                    talk_id, err = _did_create_talk(answer_text or result_dict.get("answer", ""))
                    if err:
                        current_app.logger.error(
                            "D-ID create error during explain-grammar: %s",
                            err[0] if isinstance(err, tuple) else err,
                        )
                    else:
                        video_url, err = _did_poll_talk(talk_id, timeout_sec=120, interval_sec=2.0)
                        if err:
                            current_app.logger.error(
                                "D-ID poll error during explain-grammar: %s",
                                err[0] if isinstance(err, tuple) else err,
                            )
                        elif video_url:
                            result_dict["video_url"] = video_url
                except Exception as e:
                    current_app.logger.exception("D-ID inline synthesis failed during explain-grammar: %s", e)
    except Exception:
        current_app.logger.exception("Unexpected error while attempting inline video synthesis")

    # --- Final response ---
    return jsonify(result_dict), 200
| # @rag_bp.route("/suggest-followups", methods=["POST", "OPTIONS"]) | |
def rag_suggest_followups():
    """Suggest follow-up questions for the previous Q/A turn, level-aware."""
    if request.method == "OPTIONS":
        return ("", 204)
    payload = request.get_json(force=True) or {}
    db_level = user_to_db_level(extract_username_from_request(request))
    body = FollowupBody(
        last_question=(payload.get("last_question") or "").strip(),
        last_answer=(payload.get("last_answer") or "").strip(),
        n=int(payload.get("n", 5)),
        model=payload.get("model", "gpt-4o-mini"),
        db_level=db_level,
        source_ids=payload.get("source_ids") or [],
    )
    return jsonify(llm_followups(body))
| # @rag_bp.get("/_diag") | |
def rag_diag():
    """Diagnostics: report Chroma paths and document counts for each level store."""
    # Imports kept local to avoid circular imports at module load.
    try:
        from .rag_llm import CHROMA_DIR, CHROMA_ROOT, get_vectorstore, get_vectorstore_for
    except ImportError:
        from rag_llm import CHROMA_DIR, CHROMA_ROOT, get_vectorstore, get_vectorstore_for
    import os
    from flask import jsonify

    def _count(vs):
        """Best-effort document count for chromadb collections and LangChain stores."""
        if vs is None:
            return None
        # A raw chromadb.Collection exposes count() directly.
        if hasattr(vs, "count") and callable(vs.count):
            try:
                return vs.count()
            except Exception:
                return None
        # LangChain vectorstores keep the collection behind _collection.
        if hasattr(vs, "_collection"):
            try:
                return vs._collection.count()  # type: ignore
            except Exception:
                try:
                    return vs._client.get_collection(vs._collection.name).count()  # type: ignore
                except Exception:
                    return None
        return None

    # Load each level's store up front, then count them.
    low_vs = get_vectorstore_for("low")
    mid_vs = get_vectorstore_for("mid")
    high_vs = get_vectorstore_for("high")
    low_path = os.path.join(CHROMA_ROOT, "low")
    info = {
        "env_seen": {"CHROMA_DIR": CHROMA_DIR, "CHROMA_ROOT": CHROMA_ROOT},
        "low_dir": {"path": low_path, "exists": os.path.isdir(low_path)},
        "counts_default": _count(get_vectorstore()),
        "counts_low": _count(low_vs),
        "counts_mid": _count(mid_vs),
        "counts_high": _count(high_vs),
    }
    return jsonify(info), 200
# Removed: an older commented-out rag_diag variant. It produced the same
# diagnostics payload, but its _count helper only understood LangChain
# vectorstores (no chromadb.Collection support). See version control.
def rag_search():
    """Similarity search: return the top-5 hits for the query string "q"."""
    if request.method == "OPTIONS":
        return ("", 204)
    payload = request.json or {}
    query = (payload.get("q") or "").strip()
    if not query:
        return jsonify({"results": []})
    # Level comes from the logged-in user unless the body overrides it.
    username = extract_username_from_request(request)
    level = payload.get("db_level") or user_to_db_level(username)
    store = get_vectorstore_for(level)
    hits = store.similarity_search_with_score(query, k=5)
    results = [
        {
            "distance": float(score),
            "snippet": doc.page_content[:200],
            "source_path": os.path.normpath(doc.metadata.get("source_path", "")),
            "page": doc.metadata.get("page_1based"),
        }
        for doc, score in hits
    ]
    return jsonify({"results": results})
def generate_questions_from_vectorstore():
    """Pull grammar content from the default store and ask OpenAI for 5 questions.

    Returns the model's text on success, or {"error": ...} on failure /
    empty store. Debug prints are intentional (used during development).
    """
    try:
        store = get_vectorstore()
        results = store.similarity_search_with_score("important content related to grammar", k=5)
        print(f"Vectorstore query returned {len(results)} results")
        content = "\n".join(doc.page_content for doc, _ in results)
        print(f"Retrieved content: {content[:500]}...")
        if not content:
            return {"error": "No content retrieved from vectorstore. Please ingest PDFs first."}
        prompt = f"Generate 5 important questions based on the following content: {content}"
        completion = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=150,
        )
        response_text = completion.choices[0].message.content.strip()
        print(f"Processed OpenAI response: {response_text}")
        return response_text
    except Exception as e:
        print(f"Error during OpenAI API call: {e}")
        return {"error": f"Failed to call OpenAI: {str(e)}"}
def generate_questions_from_chroma():
    """Endpoint-style wrapper: generate questions and wrap them in a JSON payload."""
    def _make_questions():
        # Same pipeline as generate_questions_from_vectorstore, but silent
        # (no debug prints) — kept nested so the wrapper stays self-contained.
        try:
            store = get_vectorstore()
            hits = store.similarity_search_with_score("important content related to grammar", k=5)
            content = "\n".join(doc.page_content for doc, _ in hits)
            if not content:
                return {"error": "No content retrieved from vectorstore. Please ingest PDFs first."}
            prompt = f"Generate 5 important questions based on the following content: {content}"
            completion = openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=150,
            )
            return completion.choices[0].message.content.strip()
        except Exception as e:
            return {"error": f"Failed to call OpenAI: {str(e)}"}

    return jsonify({"generated_questions": _make_questions()})
def health():
    """Lightweight liveness probe: always reports OK with HTTP 200."""
    payload = {"status": "ok"}
    return payload, 200
def rag_synthesize_audio():
    """
    Synthesize text to WAV on demand using XTTS and return a public URL.
    Body: { "text": "...", "language": "en", "reference_files": ["trim/foo.wav", ...] }
    """
    if request.method == "OPTIONS":
        return ("", 204)
    data = request.get_json(force=True) or {}
    text = (data.get("text") or "").strip()
    if not text:
        return jsonify({"error": "No text provided"}), 400
    language = data.get("language", "en")
    reference_files = data.get("reference_files")  # optional list of paths
    try:
        out_name = f"synth_{uuid.uuid4().hex}.wav"
        wav_path = xtts_speak_to_file(
            text=text,
            out_file=AUDIO_DIR / out_name,
            reference_dir=XTTS_REF_DIR,
            reference_files=reference_files,
            language=language,
        )
        # Local dev: serve the static file straight from /rag/audio/.
        if "localhost" in request.host_url or "127.0.0.1" in request.host_url:
            base = request.host_url.rstrip("/")
            audio_url = f"{base}/rag/audio/{wav_path.name}"
        else:
            # Deployed: prefer an S3 presigned URL; fall back to SPACE_URL.
            s3_url = _upload_to_s3(str(wav_path))
            if s3_url:
                audio_url = s3_url
            else:
                base = os.getenv("SPACE_URL", "https://pykara-py-learn-backend.hf.space")
                audio_url = f"{base}/rag/audio/{wav_path.name}"
        return jsonify({"audio_url": audio_url, "file": wav_path.name}), 200
    except FileNotFoundError as e:
        # Missing reference audio is a server misconfiguration — report it
        # distinctly (this branch existed only as a comment in the original).
        current_app.logger.error("XTTS references missing: %s", e)
        return jsonify({"error": "XTTS reference audio files not found on server"}), 500
    except Exception as e:
        # BUG FIX: the original had two `except Exception` clauses on this
        # try; the second (logger + generic 500) was unreachable dead code.
        # Merged into one handler that both logs and returns the detail.
        import traceback
        current_app.logger.exception("XTTS synthesis error: %s", e)
        print("=== XTTS DEBUG ERROR ===")
        print(traceback.format_exc())
        print("========================")
        return jsonify({"error": "Synthesis failed", "detail": str(e)}), 500
def rag_synthesize_video():
    """
    Synthesize a short video on-demand using the D-ID service and return the public video URL.
    Body: { "text": "..." }
    """
    if request.method == "OPTIONS":
        return ("", 204)
    payload = request.get_json(force=True) or {}
    text = (payload.get("text") or "").strip()
    if not text:
        return jsonify({"error": "No text provided"}), 400
    # Bail out early when the D-ID credentials / avatar are not configured.
    if not DID_API_KEY or not DID_SOURCE_IMAGE_URL:
        current_app.logger.error("D-ID not configured (DID_API_KEY or DID_SOURCE_IMAGE_URL missing)")
        return jsonify({"error": "D-ID not configured on server"}), 500
    try:
        # Step 1: create the talk (calls D-ID /talks).
        talk_id, err = _did_create_talk(text)
        if err:
            current_app.logger.error("D-ID create error: %s", err[0])
            return jsonify({"error": err[0]}), err[1]
        # Step 2: poll until the rendered video URL is available.
        video_url, err = _did_poll_talk(talk_id, timeout_sec=120, interval_sec=2.0)
        if err:
            current_app.logger.error("D-ID poll error: %s", err[0])
            return jsonify({"error": err[0]}), err[1]
        if not video_url:
            current_app.logger.error("D-ID did not return a video URL for talk %s", talk_id)
            return jsonify({"error": "D-ID did not return a video URL"}), 502
        return jsonify({"video_url": video_url}), 200
    except Exception as e:
        current_app.logger.exception("Unexpected error generating D-ID video: %s", e)
        return jsonify({"error": "Internal server error generating video"}), 500
# ------------------------------------------------------------
# Local runner (DEV ONLY)
# ------------------------------------------------------------
if __name__ == "__main__":
    # Allow this module to run as a standalone server on port 7000 for local dev
    from flask import Flask
    from flask_cors import CORS
    app = Flask(__name__)
    # CORS for local dev (the production app sets CORS globally in verification.py)
    CORS(
        app,
        resources={r"/rag/*": {"origins": ["http://localhost:4200", "http://127.0.0.1:4200"]}},
        supports_credentials=True,
        allow_headers=["Content-Type", "Authorization", "X-User"],
        methods=["GET", "POST", "OPTIONS"],
    )
    # Ensure Chroma dir exists (use CHROMA_DIR if set)
    os.makedirs(os.getenv("CHROMA_DIR", "./chroma"), exist_ok=True)
    # Mount blueprint at /rag and run.
    # NOTE(review): rag_bp has no visible route registrations in this module —
    # confirm routes are attached elsewhere, or this dev server serves nothing.
    app.register_blueprint(rag_bp, url_prefix="/rag")
    # debug=True is for local development only; never enable in production.
    app.run(host="0.0.0.0", port=7000, debug=True)