"""End-to-end RAG pipeline: document → chunks → embeddings → retrieval → generation."""

from __future__ import annotations

import logging
from pathlib import Path

from src.config import SYSTEM_PROMPT, TOP_K
from src.document_processor import DocumentProcessor
from src.llm_provider import LLMProvider, get_llm_provider
from src.vector_store import VectorStore

logger = logging.getLogger(__name__)


class RAGEngine:
    """Orchestrate the RAG pipeline end to end.

    Wires together a ``DocumentProcessor`` (PDF parsing), a ``VectorStore``
    (indexing and search) and an ``LLMProvider`` (answer generation).
    """

    def __init__(self):
        """Build the pipeline components and try to load the "default" index."""
        self.processor = DocumentProcessor()
        self.store = VectorStore()
        self.llm: LLMProvider = get_llm_provider()
        # Attempt to load an existing index; the result of load() is not
        # inspected here.
        self.store.load("default")

    # ── Ingestion ────────────────────────────────────────────────────────

    def ingest_pdf(self, pdf_path: str | Path) -> str:
        """Ingest a single PDF and persist the index.

        Args:
            pdf_path: Path of the PDF handed to the document processor.

        Returns:
            The parsed document's ``summary``, or an informational message
            when the store reports that nothing was added.
        """
        parsed = self.processor.process_pdf(pdf_path)
        added = self.store.add_document(parsed)
        self.store.save("default")
        # The store reports 0 additions; the message treats that as
        # "already indexed".
        if added == 0:
            return f"ℹ️ «{parsed.filename}» était déjà indexé."
        return parsed.summary

    def ingest_directory(self, dir_path: str | Path) -> list[str]:
        """Ingest every PDF found directly inside a directory.

        The ``.pdf`` extension is matched case-insensitively so files such as
        ``REPORT.PDF`` are picked up on case-sensitive filesystems too.
        Sub-directories are not traversed.

        Args:
            dir_path: Directory to scan.

        Returns:
            One message per PDF (summary, "already indexed" notice, or error
            text), or a single warning message when no PDF is found.
        """
        dir_path = Path(dir_path)
        pdfs = sorted(
            entry
            for entry in dir_path.glob("*")
            if entry.name.lower().endswith(".pdf")
        )
        if not pdfs:
            return ["⚠️ Aucun PDF trouvé dans ce répertoire."]

        results = []
        for pdf in pdfs:
            try:
                results.append(self.ingest_pdf(pdf))
            except Exception as e:
                # Best effort: one bad file must not abort the batch, but keep
                # the traceback in the logs instead of dropping it.
                logger.exception("Failed to ingest %s", pdf)
                results.append(f"❌ Erreur sur {pdf.name} : {e}")
        return results

    # ── Question answering ───────────────────────────────────────────────

    def ask(self, question: str, top_k: int = TOP_K) -> dict:
        """Answer a question from the indexed documents.

        Args:
            question: The user's question. ``None``, empty or
                whitespace-only input yields a warning answer.
            top_k: Number of results requested from the vector store.

        Returns:
            A dict with the keys ``"answer"`` (str) and ``"sources"``
            (list of dicts as produced by ``_format_sources``).
        """
        if not question or not question.strip():
            return {"answer": "⚠️ Veuillez poser une question.", "sources": []}

        if self.store.index.ntotal == 0:
            return {
                "answer": (
                    "⚠️ Aucun document n'est indexé. "
                    "Veuillez d'abord uploader un PDF."
                ),
                "sources": [],
            }

        # 1. Retrieval
        results = self.store.search(question, top_k=top_k)

        # 2. Build the context
        context = self._build_context(results)

        # 3. Generation through the LLM provider (local or cloud)
        user_message = (
            f"Contexte documentaire :\n\n{context}\n\n"
            f"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n"
            f"Question de l'utilisateur : {question}"
        )
        answer = self.llm.generate(SYSTEM_PROMPT, user_message)

        # 4. Format the sources
        sources = self._format_sources(results)

        return {"answer": answer, "sources": sources}

    # ── Context construction ─────────────────────────────────────────────

    @staticmethod
    def _build_context(results: list[dict]) -> str:
        """Format the retrieved chunks into a context string for the LLM.

        Args:
            results: Search hits; each must provide ``"metadata"`` (dict),
                ``"score"`` (number) and ``"text"`` (str).

        Returns:
            The numbered extracts joined by blank lines, or a fixed fallback
            sentence when ``results`` is empty.
        """
        if not results:
            return "Aucun passage pertinent trouvé."

        sections = []
        for i, r in enumerate(results, 1):
            meta = r["metadata"]
            source = meta.get("source", "inconnu")
            page = meta.get("page", "?")
            score = r["score"]
            sections.append(
                f"━━━ Extrait {i} [Document: «{source}», Page {page}] "
                f"(pertinence: {score:.2f}) ━━━\n{r['text']}"
            )
        return "\n\n".join(sections)

    # ── Source formatting ────────────────────────────────────────────────

    @staticmethod
    def _format_sources(results: list[dict]) -> list[dict]:
        """Format the search hits for display in the UI.

        Args:
            results: Search hits; each must provide ``"metadata"`` (dict),
                ``"score"`` (number) and ``"text"`` (str).

        Returns:
            One dict per hit with the keys ``"document"``, ``"page"``,
            ``"score"`` (rounded to 3 decimals) and ``"extrait"`` (the text,
            cut to 200 characters plus ``"..."`` when longer).
        """
        sources = []
        for r in results:
            meta = r["metadata"]
            text = r["text"]
            # Only excerpts longer than 200 characters are truncated.
            if len(text) > 200:
                excerpt = text[:200] + "..."
            else:
                excerpt = text
            sources.append(
                {
                    "document": meta.get("source", "inconnu"),
                    "page": meta.get("page", "?"),
                    "score": round(r["score"], 3),
                    "extrait": excerpt,
                }
            )
        return sources

    # ── Info ─────────────────────────────────────────────────────────────

    @property
    def status(self) -> dict:
        """Return the current pipeline state.

        Combines the vector store's ``stats`` with the LLM provider's
        availability and name.
        """
        store_stats = self.store.stats
        return {
            "documents_indexés": len(store_stats["indexed_documents"]),
            "liste_documents": store_stats["indexed_documents"],
            "chunks_total": store_stats["total_chunks"],
            "vecteurs_total": store_stats["total_vectors"],
            "llm_disponible": self.llm.is_available(),
            "modèle_llm": self.llm.name,
        }