NormAssist / src /rag_engine.py
Dakoro's picture
Deploy NormAssist — 2026-03-30 16:14
4211d95
Raw
History Blame Contribute Delete
5.57 kB
"""Pipeline RAG complet : document → chunks → embeddings → retrieval → génération."""
from __future__ import annotations
import logging
from pathlib import Path
from src.config import SYSTEM_PROMPT, TOP_K
from src.document_processor import DocumentProcessor
from src.llm_provider import LLMProvider, get_llm_provider
from src.vector_store import VectorStore
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class RAGEngine:
    """Orchestrate the RAG pipeline end to end.

    The engine wires together three collaborators:

    * ``self.processor`` -- a ``DocumentProcessor`` used to process PDFs,
    * ``self.store`` -- a ``VectorStore`` used to index and search documents,
    * ``self.llm`` -- an ``LLMProvider`` used to generate the final answer.
    """

    # Name under which the vector store is loaded and saved.
    _INDEX_NAME = "default"
    # Maximum number of characters of a chunk shown as a source excerpt.
    _EXCERPT_MAX_CHARS = 200

    def __init__(self):
        """Create the pipeline components and load the persisted index."""
        self.processor = DocumentProcessor()
        self.store = VectorStore()
        self.llm: LLMProvider = get_llm_provider()
        # Try to load an existing index.
        self.store.load(self._INDEX_NAME)

    # ── Ingestion ────────────────────────────────────────────────────────
    def ingest_pdf(self, pdf_path: str | Path) -> str:
        """Ingest a single PDF: process it, add it to the store, then save.

        Args:
            pdf_path: Location of the PDF file to ingest.

        Returns:
            The processed document's ``summary``, or an "already indexed"
            notice when the store reports that nothing was added.
        """
        parsed = self.processor.process_pdf(pdf_path)
        added = self.store.add_document(parsed)
        self.store.save(self._INDEX_NAME)
        if added == 0:
            return f"ℹ️ «{parsed.filename}» était déjà indexé."
        return parsed.summary

    def ingest_directory(self, dir_path: str | Path) -> list[str]:
        """Ingest every PDF found directly inside a directory.

        Files are matched on their suffix case-insensitively (``.pdf``,
        ``.PDF``, ...) and processed in sorted order. A failure on one file
        is logged and reported in the result list; it does not stop the
        remaining files from being ingested.

        Args:
            dir_path: Directory to scan (not recursive).

        Returns:
            One status message per PDF, or a single warning message when no
            PDF is found.
        """
        dir_path = Path(dir_path)
        pdfs = []
        if dir_path.is_dir():
            pdfs = sorted(
                entry
                for entry in dir_path.iterdir()
                if entry.is_file() and entry.suffix.lower() == ".pdf"
            )
        if not pdfs:
            return ["⚠️ Aucun PDF trouvé dans ce répertoire."]

        results = []
        for pdf in pdfs:
            try:
                results.append(self.ingest_pdf(pdf))
            except Exception as e:
                # Deliberate best-effort: keep going, but leave a traceback
                # in the logs instead of silently swallowing the error.
                logger.exception("Failed to ingest %s", pdf)
                results.append(f"❌ Erreur sur {pdf.name} : {e}")
        return results

    # ── Question answering ───────────────────────────────────────────────
    def ask(self, question: str, top_k: int = TOP_K) -> dict:
        """Answer a question and return the answer together with its sources.

        Args:
            question: The user's question. ``None``, empty and
                whitespace-only values are rejected with a warning message.
            top_k: Number of results requested from the vector store.

        Returns:
            A dict with two keys: ``"answer"`` (the generated text or a
            warning message) and ``"sources"`` (a list of source dicts, empty
            when no retrieval was performed).
        """
        if not question or not question.strip():
            return {"answer": "⚠️ Veuillez poser une question.", "sources": []}

        # Defensive: a store without an index is treated like an empty one.
        index = getattr(self.store, "index", None)
        if index is None or index.ntotal == 0:
            return {
                "answer": (
                    "⚠️ Aucun document n'est indexé. "
                    "Veuillez d'abord uploader un PDF."
                ),
                "sources": [],
            }

        # 1. Retrieval
        results = self.store.search(question, top_k=top_k)

        # 2. Build the context
        context = self._build_context(results)

        # 3. Generation through the configured LLM provider
        user_message = (
            f"Contexte documentaire :\n\n{context}\n\n"
            f"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n"
            f"Question de l'utilisateur : {question}"
        )
        answer = self.llm.generate(SYSTEM_PROMPT, user_message)

        # 4. Format the sources
        sources = self._format_sources(results)
        return {"answer": answer, "sources": sources}

    # ── Context construction ─────────────────────────────────────────────
    @staticmethod
    def _build_context(results: list[dict]) -> str:
        """Format the retrieved chunks into the context given to the LLM.

        Args:
            results: Search hits; each one is read through its ``"metadata"``,
                ``"score"`` and ``"text"`` keys.

        Returns:
            The numbered excerpts joined by blank lines, or a fixed
            "no relevant passage" sentence when ``results`` is empty.
        """
        if not results:
            return "Aucun passage pertinent trouvé."
        sections = []
        for i, r in enumerate(results, 1):
            meta = r["metadata"]
            source = meta.get("source", "inconnu")
            page = meta.get("page", "?")
            score = r["score"]
            sections.append(
                f"━━━ Extrait {i} [Document: «{source}», Page {page}] "
                f"(pertinence: {score:.2f}) ━━━\n{r['text']}"
            )
        return "\n\n".join(sections)

    # ── Source formatting ────────────────────────────────────────────────
    @staticmethod
    def _format_sources(results: list[dict]) -> list[dict]:
        """Format the search hits for display in the UI.

        Args:
            results: Search hits; each one is read through its ``"metadata"``,
                ``"score"`` and ``"text"`` keys.

        Returns:
            One dict per hit with the keys ``"document"``, ``"page"``,
            ``"score"`` (rounded to 3 decimals) and ``"extrait"`` (the text,
            cut to ``_EXCERPT_MAX_CHARS`` characters plus ``"..."`` when it
            is longer than that).
        """
        limit = RAGEngine._EXCERPT_MAX_CHARS
        sources = []
        for r in results:
            meta = r["metadata"]
            text = r["text"]
            excerpt = text[:limit] + "..." if len(text) > limit else text
            sources.append(
                {
                    "document": meta.get("source", "inconnu"),
                    "page": meta.get("page", "?"),
                    # float() yields a plain Python float even if the store
                    # hands back a NumPy scalar, keeping the payload
                    # JSON-serialisable.
                    "score": round(float(r["score"]), 3),
                    "extrait": excerpt,
                }
            )
        return sources

    # ── Info ─────────────────────────────────────────────────────────────
    @property
    def status(self) -> dict:
        """Current state of the pipeline.

        Returns:
            A dict built from ``self.store.stats`` (document count and list,
            chunk and vector totals) plus the LLM's availability and name.
        """
        store_stats = self.store.stats
        return {
            "documents_indexés": len(store_stats["indexed_documents"]),
            "liste_documents": store_stats["indexed_documents"],
            "chunks_total": store_stats["total_chunks"],
            "vecteurs_total": store_stats["total_vectors"],
            "llm_disponible": self.llm.is_available(),
            "modèle_llm": self.llm.name,
        }