Spaces:
Running
Running
| from typing import TypedDict, Annotated, Sequence | |
| from langchain_core.messages import BaseMessage | |
| from langgraph.graph import StateGraph, END | |
| from langgraph.graph.message import add_messages | |
| from neo4j_utils import unified_search | |
| from typing import TypedDict, Sequence, List, Dict, Optional, Annotated | |
| from config import llm | |
| import re | |
| from evaluations.correctness import correctness | |
| class GraphState(TypedDict): | |
| messages: Annotated[Sequence[BaseMessage], add_messages] | |
| query: str | |
| relevant_docs: List[Dict[str, Optional[Dict[str, float]]]] # Résultats de la recherche hybride | |
| neo4j_results: list # Résultats de la recherche Neo4j | |
| response: str | |
| score: Optional[float] | |
| evaluation_explanation: Optional[str] | |
| k: int | |
| alpha: float | |
| similarity_threshold: float | |
| def retrieve_unified(state: GraphState) -> dict: | |
| """Récupération unifiée : recherche hybride (Pinecone + BM25) + Neo4j.""" | |
| # Appeler la fonction unified_search | |
| unified_results = unified_search(state["query"], | |
| alpha=state.get("alpha"), | |
| k=state.get("k"), | |
| similarity_threshold=state.get("similarity_threshold")) | |
| # Séparer les résultats hybrides et Neo4j | |
| hybrid_results = [result for result in unified_results if result["type"] == "text"] | |
| neo4j_results = [result for result in unified_results if result["type"] == "entity"] | |
| return { | |
| "relevant_docs": hybrid_results, | |
| "neo4j_results": neo4j_results | |
| } | |
| def generate_response(state: GraphState) -> dict: | |
| """Génération de réponse en combinant informations sémantiques, mots-clés et données Neo4j.""" | |
| context = "\n\n".join([doc["content"] for doc in state["relevant_docs"] if doc["type"] == "text"]) | |
| neo4j_context = "\n\n".join([ | |
| str(record["content"]) for record in state["neo4j_results"] | |
| if not any(keyword in str(record["content"]) for keyword in ["MATCH", "RETURN", "CREATE", "MERGE", "WHERE", "SET"]) | |
| ]) | |
| prompt = f""" | |
| Vous êtes un expert en analyse de texte et en données structurées. | |
| Votre tâche consiste à répondre à la question de l'utilisateur en utilisant les informations pertinentes fournies. | |
| Intégrez à la fois les éléments sémantiques (contexte, sens), les mots-clés exacts et les données structurées pour fournir une réponse fluide et cohérente. | |
| **Instructions supplémentaires** : | |
| - Utilisez les mots-clés pertinents de manière naturelle dans votre réponse. | |
| - Expliquez les concepts en vous appuyant sur le contexte sémantique. | |
| - Intégrez les données structurées (entités, relations) de manière claire et concise. | |
| - Évitez de générer des reponses redondantes, faites une fusion entre les élements sémantiques et les elements structurées. | |
| - Ne mentionnez pas explicitement les termes "recherche sémantique", "recherche par mots-clés" ou "base de données Neo4j". | |
| - **Ne fournissez pas la requête Cypher dans la réponse.** | |
| **Informations pertinentes trouvées (texte)** : | |
| {context} | |
| **Informations pertinentes trouvées (données structurées)** : | |
| {neo4j_context} | |
| **Question de l'utilisateur** : | |
| {state["query"]} | |
| **Réponse :** | |
| [Fournissez une réponse cohérente qui intègre à la fois les éléments sémantiques, les mots-clés pertinents et les données structurées sans les distinguer explicitement.] | |
| """ | |
| response = llm.invoke(prompt) | |
| # **Nettoyage avancé : suppression des requêtes Cypher dans la réponse** | |
| response_cleaned = re.sub(r"(MATCH|RETURN|CREATE|MERGE|WHERE|SET)[\s\S]*?[.;]", "", response.content, flags=re.IGNORECASE).strip() | |
| return {"response": response_cleaned} | |
| def evaluate_response(state: GraphState) -> dict: | |
| """Évalue la réponse générée en comparant avec la vérité terrain (LangSmith).""" | |
| inputs = {"question": state["query"]} | |
| outputs = {"answer": state["response"]} | |
| try: | |
| score = correctness(inputs, outputs) | |
| explanation = f"La réponse a obtenu un score de {score}/10. Voici l'explication de l'évaluation..." | |
| except Exception as e: | |
| score = 0 | |
| explanation = f"Erreur lors de l'évaluation : {str(e)}" | |
| return {"score": score, "evaluation_explanation": explanation} | |
| def post_process_response(state: GraphState) -> dict: | |
| """Nettoie et valide la réponse.""" | |
| response = state["response"].strip() | |
| # Vérifier si la réponse est pertinente | |
| if not response or response.lower() in ["je ne sais pas", "i don't know"]: | |
| response = "Désolé, je n'ai pas trouvé d'informations pertinentes pour votre question." | |
| evaluation = evaluate_response(state) | |
| return { | |
| "response": response, | |
| "score": evaluation["score"], # Ajout du score | |
| "evaluation_explanation": evaluation["evaluation_explanation"] # Explication | |
| } | |
| # Construction du graphe | |
| graph_builder = StateGraph(GraphState) | |
| # Ajouter les nœuds | |
| graph_builder.add_node("evaluate", evaluate_response) | |
| graph_builder.add_node("retrieve", retrieve_unified) | |
| graph_builder.add_node("generate", generate_response) | |
| graph_builder.add_node("post_process", post_process_response) | |
| # Définir les transitions | |
| graph_builder.set_entry_point("retrieve") | |
| graph_builder.add_edge("retrieve", "generate") | |
| graph_builder.add_edge("generate", "evaluate") | |
| graph_builder.add_edge("evaluate", "post_process") | |
| graph_builder.add_edge("post_process", END) | |
| # Compiler le graphe | |
| agent = graph_builder.compile() |