Spaces:
Running
Running
| """Pipeline RAG complet : document → chunks → embeddings → retrieval → génération.""" | |
| from __future__ import annotations | |
| import logging | |
| from pathlib import Path | |
| from src.config import SYSTEM_PROMPT, TOP_K | |
| from src.document_processor import DocumentProcessor | |
| from src.llm_provider import LLMProvider, get_llm_provider | |
| from src.vector_store import VectorStore | |
| logger = logging.getLogger(__name__) | |
class RAGEngine:
    """Orchestrate the RAG pipeline end to end.

    The engine ties together three collaborators created in ``__init__``:
    a ``DocumentProcessor`` (PDF parsing), a ``VectorStore`` (indexing and
    similarity search) and an LLM provider (answer generation).
    """

    # Name under which the vector index is loaded and saved.
    INDEX_NAME = "default"

    def __init__(self):
        """Create the pipeline components and try to load an existing index."""
        self.processor = DocumentProcessor()
        self.store = VectorStore()
        self.llm: LLMProvider = get_llm_provider()
        # Try to load a previously saved index.
        self.store.load(self.INDEX_NAME)

    # ── Ingestion ────────────────────────────────────────────────────────

    def ingest_pdf(self, pdf_path: str | Path) -> str:
        """Ingest one PDF: parse → chunk → embed → index.

        Args:
            pdf_path: Location of the PDF file to ingest.

        Returns:
            The parsed document's ``summary``, or an informational message
            when the store reports that zero items were added (the document
            was already indexed).
        """
        parsed = self.processor.process_pdf(pdf_path)
        added = self.store.add_document(parsed)
        self.store.save(self.INDEX_NAME)
        if added == 0:
            return f"ℹ️ «{parsed.filename}» était déjà indexé."
        return parsed.summary

    def ingest_directory(self, dir_path: str | Path) -> list[str]:
        """Ingest every ``*.pdf`` file located directly in a directory.

        Files are handled in sorted order. A failure on one file is logged
        and reported in the returned list; the remaining files are still
        processed.

        Args:
            dir_path: Directory to scan (non-recursive).

        Returns:
            One message per PDF (summary or error text), or a single warning
            message when no PDF is found.
        """
        pdfs = sorted(Path(dir_path).glob("*.pdf"))
        if not pdfs:
            return ["⚠️ Aucun PDF trouvé dans ce répertoire."]

        results = []
        for pdf in pdfs:
            try:
                results.append(self.ingest_pdf(pdf))
            except Exception as e:
                # Best-effort batch: keep going, but do not lose the traceback.
                logger.exception("Failed to ingest %s", pdf)
                results.append(f"❌ Erreur sur {pdf.name} : {e}")
        return results

    # ── Question answering ───────────────────────────────────────────────

    def ask(self, question: str, top_k: int = TOP_K) -> dict:
        """Answer a question and return the answer together with its sources.

        Args:
            question: The user's question. Empty, blank or ``None`` input
                yields a warning message instead of a query.
            top_k: Number of results requested from the vector store.

        Returns:
            A dict with two keys: ``"answer"`` (the generated text or a
            warning message) and ``"sources"`` (a list of source dicts as
            produced by ``_format_sources``; empty for warnings).
        """
        # Guard against None as well as blank strings.
        if not question or not question.strip():
            return {"answer": "⚠️ Veuillez poser une question.", "sources": []}

        if self.store.index.ntotal == 0:
            return {
                "answer": (
                    "⚠️ Aucun document n'est indexé. "
                    "Veuillez d'abord uploader un PDF."
                ),
                "sources": [],
            }

        # 1. Retrieval
        results = self.store.search(question, top_k=top_k)

        # 2. Build the context handed to the LLM
        context = self._build_context(results)

        # 3. Generation through the configured LLM provider
        user_message = (
            f"Contexte documentaire :\n\n{context}\n\n"
            "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n"
            f"Question de l'utilisateur : {question}"
        )
        answer = self.llm.generate(SYSTEM_PROMPT, user_message)

        # 4. Format the sources for display
        sources = self._format_sources(results)
        return {"answer": answer, "sources": sources}

    # ── Context construction ─────────────────────────────────────────────

    @staticmethod
    def _build_context(results: list[dict]) -> str:
        """Format the retrieved chunks into a context string for the LLM.

        Declared as a static method because it takes no ``self`` yet is
        invoked through the instance (``self._build_context(...)``).

        Args:
            results: Search hits; each one is read through its
                ``"metadata"``, ``"score"`` and ``"text"`` keys.

        Returns:
            The numbered excerpts separated by blank lines, or a fallback
            sentence when ``results`` is empty.
        """
        if not results:
            return "Aucun passage pertinent trouvé."

        sections = []
        for i, r in enumerate(results, 1):
            meta = r["metadata"]
            source = meta.get("source", "inconnu")
            page = meta.get("page", "?")
            score = r["score"]
            sections.append(
                f"━━━ Extrait {i} [Document: «{source}», Page {page}] "
                f"(pertinence: {score:.2f}) ━━━\n{r['text']}"
            )
        return "\n\n".join(sections)

    # ── Source formatting ────────────────────────────────────────────────

    @staticmethod
    def _format_sources(results: list[dict]) -> list[dict]:
        """Format the search hits for display in the UI.

        Declared as a static method because it takes no ``self`` yet is
        invoked through the instance (``self._format_sources(...)``).

        Args:
            results: Search hits; each one is read through its
                ``"metadata"``, ``"score"`` and ``"text"`` keys.

        Returns:
            One dict per hit with the keys ``"document"``, ``"page"``,
            ``"score"`` (rounded to 3 decimals) and ``"extrait"`` (the text,
            cut to 200 characters plus ``"..."`` when longer).
        """
        sources = []
        for r in results:
            meta = r["metadata"]
            text = r["text"]
            # Only append the ellipsis when the text was actually truncated.
            excerpt = text[:200] + "..." if len(text) > 200 else text
            sources.append(
                {
                    "document": meta.get("source", "inconnu"),
                    "page": meta.get("page", "?"),
                    "score": round(r["score"], 3),
                    "extrait": excerpt,
                }
            )
        return sources

    # ── Info ─────────────────────────────────────────────────────────────

    def status(self) -> dict:
        """Return the current state of the pipeline.

        Returns:
            A dict combining the vector store's ``stats`` (indexed documents,
            chunk and vector totals) with the LLM provider's availability
            and name.
        """
        store_stats = self.store.stats
        return {
            "documents_indexés": len(store_stats["indexed_documents"]),
            "liste_documents": store_stats["indexed_documents"],
            "chunks_total": store_stats["total_chunks"],
            "vecteurs_total": store_stats["total_vectors"],
            "llm_disponible": self.llm.is_available(),
            "modèle_llm": self.llm.name,
        }