Spaces:
Sleeping
Sleeping
| # Importations des bibliothèques standards | |
| import os | |
| import re | |
| # Importations des bibliothèques tierces | |
| from dotenv import load_dotenv | |
| from langchain_core.prompts import ChatPromptTemplate, PromptTemplate | |
| from langchain_core.messages import SystemMessage, HumanMessage | |
| try: | |
| from langchain_community.document_loaders import PyPDFLoader | |
| except ImportError: | |
| raise ImportError("Le package PyPDF n'est pas installé. Veuillez l'installer avec 'pip install pypdf'") | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_community.vectorstores import Chroma | |
| from langchain_core.output_parsers import StrOutputParser | |
| from langchain_core.runnables import RunnablePassthrough, RunnableWithMessageHistory | |
| from langchain_core.chat_history import InMemoryChatMessageHistory | |
| # Chargement des variables d'environnement | |
| load_dotenv() | |
| # Récupération des clés API depuis les variables d'environnement | |
| GROQ_API_KEY = os.getenv('GROQ_API_KEY') | |
| GEMINI_API_KEY = os.getenv('GEMINI_API_KEY') # Non utilisé dans ce script | |
| HF_API_KEY = os.getenv('HF_API_KEY') # Non utilisé directement ici mais nécessaire pour certains modèles HuggingFace | |
| # Vérification de la présence des clés API requises | |
| if not GROQ_API_KEY: | |
| raise ValueError("La clé API GROQ n'est pas définie dans le fichier .env") | |
| try: | |
| # Initialisation du modèle de langage Groq | |
| from langchain_groq import ChatGroq | |
| llm = ChatGroq( | |
| model="llama-3.1-8b-instant", | |
| temperature=0, | |
| api_key=GROQ_API_KEY | |
| ) | |
| except ImportError: | |
| raise ImportError("Le package langchain_groq n'est pas installé. Veuillez l'installer avec 'pip install langchain-groq'") | |
| # Dictionnaire pour stocker l'historique des sessions | |
| store = {} | |
| def clean_text(text): | |
| """ | |
| Nettoie le texte en effectuant les opérations suivantes : | |
| 1. Supprime les retours à la ligne et les chiffres isolés | |
| 2. Supprime les balises HTML et les caractères spéciaux | |
| 3. Convertit le texte en minuscules | |
| Args: | |
| text (str): Texte à nettoyer | |
| Returns: | |
| str: Texte nettoyé | |
| """ | |
| # Suppression des retours à la ligne et des chiffres isolés | |
| text = re.sub(r"\n\d*\n|\n", " ", str(text)) | |
| # Suppression des balises HTML et des caractères spéciaux | |
| # On conserve uniquement les caractères alphanumériques et les espaces | |
| text = re.sub(r"<[^>]+>|[^\w\s]", "", text) | |
| return text.lower().strip() | |
| def load_documents(path="."): | |
| """ | |
| Charge et traite les documents PDF présents dans le répertoire spécifié. | |
| Args: | |
| path (str): Chemin vers le répertoire contenant les fichiers PDF | |
| Returns: | |
| list: Liste des documents chargés et traités | |
| """ | |
| if not os.path.exists(path): | |
| raise FileNotFoundError(f"Le répertoire {path} n'existe pas") | |
| big_doc = [] | |
| try: | |
| # Liste tous les fichiers PDF dans le répertoire | |
| pdf_files = [f for f in os.listdir(path) if f.lower().endswith('.pdf')] | |
| except Exception as e: | |
| print(f"Erreur lors de la lecture du répertoire {path}: {str(e)}") | |
| return big_doc | |
| if not pdf_files: | |
| print(f"Aucun fichier PDF trouvé dans le répertoire {os.path.abspath(path)}") | |
| return big_doc | |
| # Configuration du découpeur de texte avec une taille de morceau de 1000 caractères | |
| # et un chevauchement de 200 caractères pour préserver le contexte | |
| splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=1000, | |
| chunk_overlap=200 | |
| ) | |
| # Traitement de chaque fichier PDF | |
| for file in pdf_files: | |
| try: | |
| file_path = os.path.abspath(os.path.join(path, file)) | |
| print(f"\nTraitement du fichier : {file_path}") | |
| # Vérification de la lecture du fichier | |
| if not os.path.isfile(file_path): | |
| print(f" - Erreur: Le fichier {file_path} n'existe pas ou n'est pas accessible") | |
| continue | |
| # Vérification des permissions | |
| if not os.access(file_path, os.R_OK): | |
| print(f" - Erreur: Pas les permissions de lecture sur le fichier {file_path}") | |
| continue | |
| try: | |
| # Chargement et découpage du PDF | |
| loader = PyPDFLoader(file_path) | |
| documents = loader.load_and_split(text_splitter=splitter) | |
| # Nettoyage du contenu de chaque document | |
| for doc in documents: | |
| if hasattr(doc, 'page_content'): | |
| doc.page_content = clean_text(doc.page_content) | |
| else: | |
| print(f" - Avertissement: Le document ne contient pas d'attribut page_content") | |
| continue | |
| big_doc.extend(documents) | |
| print(f" - Succès: {len(documents)} morceaux extraits") | |
| except Exception as e: | |
| print(f" - Erreur lors du traitement du PDF: {str(e)}") | |
| # Essai avec un autre lecteur PDF si disponible | |
| try: | |
| from PyPDF2 import PdfReader | |
| with open(file_path, 'rb') as f: | |
| reader = PdfReader(f) | |
| print(f" - Le fichier contient {len(reader.pages)} pages") | |
| except Exception as e2: | |
| print(f" - Échec de la lecture alternative du PDF: {str(e2)}") | |
| continue | |
| except Exception as e: | |
| print(f" - Erreur inattendue: {str(e)}") | |
| continue | |
| return big_doc | |
| # Chargement des documents | |
| print("Chargement des documents...") | |
| big_doc = load_documents() | |
| print(f"{len(big_doc)} morceaux de documents chargés") | |
| if not big_doc: | |
| print("\nAucun document valide n'a pu être chargé.") | |
| print("Vérifiez que :") | |
| print("1. Des fichiers PDF sont présents dans le répertoire") | |
| print("2. Vous avez les permissions de lecture sur les fichiers") | |
| print("3. Les fichiers PDF ne sont pas corrompus") | |
| print(f"Répertoire actuel: {os.getcwd()}") | |
| print("Fichiers PDF détectés:", [f for f in os.listdir('.') if f.lower().endswith('.pdf')]) | |
| # Ne pas lever d'erreur pour permettre le débogage | |
| print("\nLe script continue avec une liste de documents vide.") | |
| print("Les fonctionnalités nécessitant des documents ne seront pas disponibles.") | |
| # Initialisation du modèle d'embedding | |
| print("Initialisation du modèle d'embedding...") | |
| try: | |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings | |
| import os | |
| # Gemini Developer API | |
| embeddings = GoogleGenerativeAIEmbeddings(model="gemini-embedding-001", api_key=os.getenv("GEMINI_API_KEY")) | |
| except Exception as e: | |
| raise RuntimeError(f"Erreur lors du chargement du modèle d'embedding: {str(e)}") | |
| print("Assurez-vous d'avoir installé les dépendances avec : pip install sentence-transformers") | |
| # Création du vectorstore avec Chroma | |
| print("Création du vectorstore...") | |
| vectorstore = Chroma.from_documents(big_doc, embeddings) | |
| # Configuration du récupérateur de documents | |
| retriever = vectorstore.as_retriever( | |
| search_type="similarity", # Utilisation de la similarité cosinus | |
| search_kwargs={"k": 3} # Nombre de documents à récupérer | |
| ) | |
| from langchain_core.prompts import PromptTemplate | |
| # Définition du template pour le prompt RAG | |
| rag_template = """ | |
| Vous êtes un assistant utile qui répond aux questions en vous basant sur le contexte fourni. | |
| Contexte : | |
| {context} | |
| Question : {question} | |
| Instructions : | |
| - Répondez uniquement en français | |
| - Si la réponse ne se trouve pas dans le contexte, dites-le clairement | |
| - Soyez concis et précis dans vos réponses | |
| Réponse :""" | |
| # Création du prompt à partir du template | |
| rag_prompt = PromptTemplate.from_template(rag_template) | |
| # Configuration de la chaîne RAG de base | |
| rag_chain = ({ | |
| "context": retriever, # Récupère le contexte pertinent | |
| "question": RunnablePassthrough() # Passe la question directement | |
| }) | rag_prompt | llm | StrOutputParser() | |
| # Exemple d'utilisation simple | |
| if __name__ == "__main__": | |
| # Exemple de question de test | |
| test_question = "Quel est le sujet principal des documents ?" | |
| print(f"\nQuestion de test : {test_question}") | |
| print("-" * 50) | |
| # Affichage de la réponse en temps réel | |
| for chunk in rag_chain.stream(test_question): | |
| print(chunk, end="", flush=True) | |
| def format_docs(docs): | |
| """ | |
| Formate une liste de documents en une seule chaîne de caractères. | |
| Args: | |
| docs (list): Liste d'objets Document | |
| Returns: | |
| str: Contenu des documents concaténés avec des sauts de ligne | |
| """ | |
| return "\n\n".join(d.page_content for d in docs) | |
| # Définition du template pour la chaîne avec historique | |
| rag_prompt_with_history = PromptTemplate( | |
| input_variables=["context", "question", "history"], | |
| template=""" | |
| Vous êtes un assistant utile qui répond aux questions en tenant compte de l'historique de la conversation. | |
| Historique de la conversation : | |
| {history} | |
| Contexte : | |
| {context} | |
| Question : {question} | |
| Instructions : | |
| - Répondez en français | |
| - Tenez compte de l'historique de la conversation | |
| - Si la réponse ne se trouve pas dans le contexte, dites-le clairement | |
| - Soyez concis et précis dans vos réponses | |
| Réponse :""" | |
| ) | |
| # Configuration de la chaîne de base avec historique | |
| base_chain = ( | |
| { | |
| "context": lambda x: format_docs(retriever.invoke(x["question"])), | |
| "question": lambda x: x["question"], | |
| "history": lambda x: x.get("history", "") | |
| } | |
| | rag_prompt_with_history | |
| | llm | |
| | StrOutputParser() | |
| ) | |
| def get_session_history(session_id: str) -> InMemoryChatMessageHistory: | |
| """ | |
| Récupère ou crée un historique de conversation pour une session donnée. | |
| Args: | |
| session_id (str): Identifiant unique de la session | |
| Returns: | |
| InMemoryChatMessageHistory: Historique des messages de la session | |
| """ | |
| if session_id not in store: | |
| store[session_id] = InMemoryChatMessageHistory() | |
| return store[session_id] | |
| # Création de la chaîne avec gestion de l'historique | |
| rag_chain_with_history = RunnableWithMessageHistory( | |
| base_chain, | |
| get_session_history, | |
| input_messages_key="question", | |
| history_messages_key="history", | |
| ) | |
| def ask_question(question: str, session_id: str = "default") -> str: | |
| """ | |
| Pose une question au système RAG avec gestion de l'historique. | |
| Args: | |
| question (str): Question à poser | |
| session_id (str): Identifiant de session pour l'historique | |
| Returns: | |
| str: Réponse générée par le modèle | |
| """ | |
| print(f"\nQuestion: {question}") | |
| print("-" * 50) | |
| # Configuration de l'historique pour cette session | |
| config = {"configurable": {"session_id": session_id}} | |
| # Obtention de la réponse avec historique | |
| response = rag_chain_with_history.invoke( | |
| {"question": question}, | |
| config=config | |
| ) | |
| print(f"\nRéponse: {response}") | |
| return response | |
| if __name__ == "__main__": | |
| # Exemple d'utilisation avec historique | |
| print("\n\n=== Test avec historique de conversation ===") | |
| # Première question | |
| ask_question("Quel est le sujet principal des documents ?", "session1") | |
| # Deuxième question qui peut faire référence à la première | |
| ask_question("Peux-tu me donner plus de détails ?", "session1") | |