LearnBetter_RAG / rag.py
FredyHoundayi's picture
Initial commit: Add AI e-learning platform with RAG-based quiz generator
5b7647c
# Importations des bibliothèques standards
import os
import re
# Importations des bibliothèques tierces
from dotenv import load_dotenv
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_core.messages import SystemMessage, HumanMessage
try:
from langchain_community.document_loaders import PyPDFLoader
except ImportError:
raise ImportError("Le package PyPDF n'est pas installé. Veuillez l'installer avec 'pip install pypdf'")
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableWithMessageHistory
from langchain_core.chat_history import InMemoryChatMessageHistory
# Chargement des variables d'environnement
load_dotenv()
# Récupération des clés API depuis les variables d'environnement
GROQ_API_KEY = os.getenv('GROQ_API_KEY')
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY') # Non utilisé dans ce script
HF_API_KEY = os.getenv('HF_API_KEY') # Non utilisé directement ici mais nécessaire pour certains modèles HuggingFace
# Vérification de la présence des clés API requises
if not GROQ_API_KEY:
raise ValueError("La clé API GROQ n'est pas définie dans le fichier .env")
try:
# Initialisation du modèle de langage Groq
from langchain_groq import ChatGroq
llm = ChatGroq(
model="llama-3.1-8b-instant",
temperature=0,
api_key=GROQ_API_KEY
)
except ImportError:
raise ImportError("Le package langchain_groq n'est pas installé. Veuillez l'installer avec 'pip install langchain-groq'")
# Dictionnaire pour stocker l'historique des sessions
store = {}
def clean_text(text):
"""
Nettoie le texte en effectuant les opérations suivantes :
1. Supprime les retours à la ligne et les chiffres isolés
2. Supprime les balises HTML et les caractères spéciaux
3. Convertit le texte en minuscules
Args:
text (str): Texte à nettoyer
Returns:
str: Texte nettoyé
"""
# Suppression des retours à la ligne et des chiffres isolés
text = re.sub(r"\n\d*\n|\n", " ", str(text))
# Suppression des balises HTML et des caractères spéciaux
# On conserve uniquement les caractères alphanumériques et les espaces
text = re.sub(r"<[^>]+>|[^\w\s]", "", text)
return text.lower().strip()
def load_documents(path="."):
"""
Charge et traite les documents PDF présents dans le répertoire spécifié.
Args:
path (str): Chemin vers le répertoire contenant les fichiers PDF
Returns:
list: Liste des documents chargés et traités
"""
if not os.path.exists(path):
raise FileNotFoundError(f"Le répertoire {path} n'existe pas")
big_doc = []
try:
# Liste tous les fichiers PDF dans le répertoire
pdf_files = [f for f in os.listdir(path) if f.lower().endswith('.pdf')]
except Exception as e:
print(f"Erreur lors de la lecture du répertoire {path}: {str(e)}")
return big_doc
if not pdf_files:
print(f"Aucun fichier PDF trouvé dans le répertoire {os.path.abspath(path)}")
return big_doc
# Configuration du découpeur de texte avec une taille de morceau de 1000 caractères
# et un chevauchement de 200 caractères pour préserver le contexte
splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
# Traitement de chaque fichier PDF
for file in pdf_files:
try:
file_path = os.path.abspath(os.path.join(path, file))
print(f"\nTraitement du fichier : {file_path}")
# Vérification de la lecture du fichier
if not os.path.isfile(file_path):
print(f" - Erreur: Le fichier {file_path} n'existe pas ou n'est pas accessible")
continue
# Vérification des permissions
if not os.access(file_path, os.R_OK):
print(f" - Erreur: Pas les permissions de lecture sur le fichier {file_path}")
continue
try:
# Chargement et découpage du PDF
loader = PyPDFLoader(file_path)
documents = loader.load_and_split(text_splitter=splitter)
# Nettoyage du contenu de chaque document
for doc in documents:
if hasattr(doc, 'page_content'):
doc.page_content = clean_text(doc.page_content)
else:
print(f" - Avertissement: Le document ne contient pas d'attribut page_content")
continue
big_doc.extend(documents)
print(f" - Succès: {len(documents)} morceaux extraits")
except Exception as e:
print(f" - Erreur lors du traitement du PDF: {str(e)}")
# Essai avec un autre lecteur PDF si disponible
try:
from PyPDF2 import PdfReader
with open(file_path, 'rb') as f:
reader = PdfReader(f)
print(f" - Le fichier contient {len(reader.pages)} pages")
except Exception as e2:
print(f" - Échec de la lecture alternative du PDF: {str(e2)}")
continue
except Exception as e:
print(f" - Erreur inattendue: {str(e)}")
continue
return big_doc
# Chargement des documents
print("Chargement des documents...")
big_doc = load_documents()
print(f"{len(big_doc)} morceaux de documents chargés")
if not big_doc:
print("\nAucun document valide n'a pu être chargé.")
print("Vérifiez que :")
print("1. Des fichiers PDF sont présents dans le répertoire")
print("2. Vous avez les permissions de lecture sur les fichiers")
print("3. Les fichiers PDF ne sont pas corrompus")
print(f"Répertoire actuel: {os.getcwd()}")
print("Fichiers PDF détectés:", [f for f in os.listdir('.') if f.lower().endswith('.pdf')])
# Ne pas lever d'erreur pour permettre le débogage
print("\nLe script continue avec une liste de documents vide.")
print("Les fonctionnalités nécessitant des documents ne seront pas disponibles.")
# Initialisation du modèle d'embedding
print("Initialisation du modèle d'embedding...")
try:
from langchain_google_genai import GoogleGenerativeAIEmbeddings
import os
# Gemini Developer API
embeddings = GoogleGenerativeAIEmbeddings(model="gemini-embedding-001", api_key=os.getenv("GEMINI_API_KEY"))
except Exception as e:
raise RuntimeError(f"Erreur lors du chargement du modèle d'embedding: {str(e)}")
print("Assurez-vous d'avoir installé les dépendances avec : pip install sentence-transformers")
# Création du vectorstore avec Chroma
print("Création du vectorstore...")
vectorstore = Chroma.from_documents(big_doc, embeddings)
# Configuration du récupérateur de documents
retriever = vectorstore.as_retriever(
search_type="similarity", # Utilisation de la similarité cosinus
search_kwargs={"k": 3} # Nombre de documents à récupérer
)
from langchain_core.prompts import PromptTemplate
# Définition du template pour le prompt RAG
rag_template = """
Vous êtes un assistant utile qui répond aux questions en vous basant sur le contexte fourni.
Contexte :
{context}
Question : {question}
Instructions :
- Répondez uniquement en français
- Si la réponse ne se trouve pas dans le contexte, dites-le clairement
- Soyez concis et précis dans vos réponses
Réponse :"""
# Création du prompt à partir du template
rag_prompt = PromptTemplate.from_template(rag_template)
# Configuration de la chaîne RAG de base
rag_chain = ({
"context": retriever, # Récupère le contexte pertinent
"question": RunnablePassthrough() # Passe la question directement
}) | rag_prompt | llm | StrOutputParser()
# Exemple d'utilisation simple
if __name__ == "__main__":
# Exemple de question de test
test_question = "Quel est le sujet principal des documents ?"
print(f"\nQuestion de test : {test_question}")
print("-" * 50)
# Affichage de la réponse en temps réel
for chunk in rag_chain.stream(test_question):
print(chunk, end="", flush=True)
def format_docs(docs):
"""
Formate une liste de documents en une seule chaîne de caractères.
Args:
docs (list): Liste d'objets Document
Returns:
str: Contenu des documents concaténés avec des sauts de ligne
"""
return "\n\n".join(d.page_content for d in docs)
# Définition du template pour la chaîne avec historique
rag_prompt_with_history = PromptTemplate(
input_variables=["context", "question", "history"],
template="""
Vous êtes un assistant utile qui répond aux questions en tenant compte de l'historique de la conversation.
Historique de la conversation :
{history}
Contexte :
{context}
Question : {question}
Instructions :
- Répondez en français
- Tenez compte de l'historique de la conversation
- Si la réponse ne se trouve pas dans le contexte, dites-le clairement
- Soyez concis et précis dans vos réponses
Réponse :"""
)
# Configuration de la chaîne de base avec historique
base_chain = (
{
"context": lambda x: format_docs(retriever.invoke(x["question"])),
"question": lambda x: x["question"],
"history": lambda x: x.get("history", "")
}
| rag_prompt_with_history
| llm
| StrOutputParser()
)
def get_session_history(session_id: str) -> InMemoryChatMessageHistory:
"""
Récupère ou crée un historique de conversation pour une session donnée.
Args:
session_id (str): Identifiant unique de la session
Returns:
InMemoryChatMessageHistory: Historique des messages de la session
"""
if session_id not in store:
store[session_id] = InMemoryChatMessageHistory()
return store[session_id]
# Création de la chaîne avec gestion de l'historique
rag_chain_with_history = RunnableWithMessageHistory(
base_chain,
get_session_history,
input_messages_key="question",
history_messages_key="history",
)
def ask_question(question: str, session_id: str = "default") -> str:
"""
Pose une question au système RAG avec gestion de l'historique.
Args:
question (str): Question à poser
session_id (str): Identifiant de session pour l'historique
Returns:
str: Réponse générée par le modèle
"""
print(f"\nQuestion: {question}")
print("-" * 50)
# Configuration de l'historique pour cette session
config = {"configurable": {"session_id": session_id}}
# Obtention de la réponse avec historique
response = rag_chain_with_history.invoke(
{"question": question},
config=config
)
print(f"\nRéponse: {response}")
return response
if __name__ == "__main__":
# Exemple d'utilisation avec historique
print("\n\n=== Test avec historique de conversation ===")
# Première question
ask_question("Quel est le sujet principal des documents ?", "session1")
# Deuxième question qui peut faire référence à la première
ask_question("Peux-tu me donner plus de détails ?", "session1")