testpush / langgraphe_app.py
Bachir00's picture
langGraphe code
20d5dab
from langchain_groq import ChatGroq
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langgraph.graph import StateGraph, END
from typing import TypedDict, Sequence, Annotated, Union
from langchain_core.messages import BaseMessage
from dotenv import load_dotenv
from langchain_core.tools import tool
import os
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
import asyncio
from src.agents.researcher_agent import ResearcherAgent
from src.agents.content_extractor_agent import ContentExtractorAgent
from src.agents.summarizer_agent import SummarizerAgent
from src.agents.global_synthesizer_agent import GlobalSynthesizerAgent
from src.models.research_models import ResearchQuery
# ============================================================================
# VOS AGENTS EXISTANTS (ne pas modifier)
# ============================================================================
researcher_agent = ResearcherAgent()
content_extractor_agent = ContentExtractorAgent()
summarizer_agent = SummarizerAgent()
global_synthesizer_agent = GlobalSynthesizerAgent()
# ============================================================================
# OUTIL QUI ENCAPSULE VOTRE PIPELINE COMPLET
# ============================================================================
@tool
def research_complete_pipeline(topic: str, max_results: Union[int, str] = 2) -> str:
"""Exécute un pipeline de recherche complet sur un sujet donné.
Ce tool encapsule 4 agents qui travaillent ensemble :
1. ResearcherAgent : recherche web et extraction de mots-clés
2. ContentExtractorAgent : extraction du contenu des pages
3. SummarizerAgent : création de résumés détaillés
4. GlobalSynthesizerAgent : synthèse globale finale
Args:
topic: Le sujet de recherche (ex: "impact de l'IA sur l'emploi")
max_results: Nombre de sources à analyser (2-10, défaut: 2)
Returns:
Un rapport complet au format texte avec résumé exécutif et analyse détaillée
"""
# Conversion et validation
if isinstance(max_results, str):
try:
max_results = int(max_results)
except ValueError:
max_results = 2
max_results = max(2, min(max_results, 10))
async def run_pipeline():
print(f"\n{'='*60}")
print(f"🚀 DÉMARRAGE DU PIPELINE DE RECHERCHE")
print(f"📋 Sujet: {topic}")
print(f"📊 Sources à analyser: {max_results}")
print(f"{'='*60}\n")
# ÉTAPE 1: Recherche
print("🔍 [1/4] Recherche web en cours...")
query = ResearchQuery(
topic=topic,
keywords=await researcher_agent.extract_keywords_with_llm(topic),
max_results=max_results,
search_depth="basic"
)
research_data = await researcher_agent.process(query)
print(f"✅ Trouvé {research_data.total_found} sources")
# ÉTAPE 2: Extraction
print("\n📄 [2/4] Extraction du contenu...")
extraction_data = await content_extractor_agent.process_from_research_output(
research_output=research_data
)
print(f"✅ Extrait {extraction_data.successful_extractions} documents")
# ÉTAPE 3: Résumés
print("\n📝 [3/4] Création des résumés...")
summarization_data = await summarizer_agent.process_from_extraction_result(
extraction_result=extraction_data
)
print(f"✅ Généré {summarization_data.total_documents} résumés")
# ÉTAPE 4: Synthèse globale
print("\n🎯 [4/4] Synthèse globale...")
global_synthesis = await global_synthesizer_agent.process_from_summarization_output(
summarization_output=summarization_data
)
print(f"✅ Rapport final généré ({global_synthesis.final_report.word_count} mots)")
print(f"\n{'='*60}")
print("✨ PIPELINE TERMINÉ AVEC SUCCÈS")
print(f"{'='*60}\n")
# Retourner le rapport en format markdown
return global_synthesis.formatted_outputs.get('markdown',
global_synthesis.formatted_outputs.get('text',
str(global_synthesis))
)
return asyncio.run(run_pipeline())
# ============================================================================
# CONFIGURATION DU LLM ET DU GRAPHE
# ============================================================================
# État du graphe
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], add_messages]
# Chargement des variables d'environnement
load_dotenv()
api_key = os.getenv("GROQ_API_KEY")
if not api_key:
raise ValueError("GROQ_API_KEY non définie dans .env")
# Configuration du modèle avec l'outil
tools = [research_complete_pipeline]
model = ChatGroq(
model="llama-3.1-8b-instant",
temperature=0.3, # Bas pour plus de cohérence
max_tokens=2048*2,
api_key=api_key
).bind_tools(tools)
# ============================================================================
# NŒUDS DU GRAPHE
# ============================================================================
def model_call(state: AgentState) -> AgentState:
"""Nœud qui appelle le LLM pour décider quoi faire"""
system_prompt = SystemMessage(content="""Tu es un assistant de recherche intelligent.
🎯 TON RÔLE:
Tu aides les utilisateurs à obtenir des résumés et analyses sur n'importe quel sujet.
🔧 TON OUTIL:
Tu as accès à un outil puissant appelé 'research_complete_pipeline' qui :
- Effectue des recherches web automatiques
- Extrait et analyse le contenu
- Génère des résumés détaillés
- Produit une synthèse globale complète
📋 QUAND L'UTILISER:
Utilise cet outil quand l'utilisateur demande :
- Un résumé sur un sujet
- Des informations sur un topic
- Une analyse d'un domaine
- Une recherche documentée
💡 COMMENT L'UTILISER:
- Identifie le sujet principal de la demande
- Appelle research_complete_pipeline avec le sujet en français clair
- Utilise max_results=2 pour une recherche standard
✅ EXEMPLES:
User: "Résume l'impact de l'IA sur l'emploi"
→ Appelle: research_complete_pipeline(topic="impact de l'intelligence artificielle sur le marché de l'emploi", max_results=2)
User: "Fais-moi une analyse complète sur le changement climatique"
→ Appelle: research_complete_pipeline(topic="changement climatique", max_results=3)
⚠️ IMPORTANT:
- N'essaie PAS de faire la recherche toi-même
- Utilise TOUJOURS l'outil pour les demandes de recherche
- Le résultat de l'outil est déjà un rapport complet formaté
- Tu peux présenter le résultat directement à l'utilisateur
"""
)
messages = state["messages"]
response = model.invoke([system_prompt] + messages)
return {"messages": [response]}
def should_continue(state: AgentState) -> str:
"""Décide si on continue avec des outils ou si on termine"""
messages = state["messages"]
last_message = messages[-1]
# Si le dernier message a des appels d'outils, continuer
if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
return "continue"
else:
return "end"
# ============================================================================
# CONSTRUCTION DU GRAPHE LANGGRAPH
# ============================================================================
# Créer le graphe
graph = StateGraph(AgentState)
# Ajouter les nœuds
graph.add_node("llm", model_call)
tool_node = ToolNode(tools=tools)
graph.add_node("tools", tool_node)
# Définir le point d'entrée
graph.set_entry_point("llm")
# Ajouter les transitions conditionnelles
graph.add_conditional_edges(
"llm",
should_continue,
{
"continue": "tools",
"end": END,
},
)
# Après l'exécution des outils, retourner au LLM pour présenter les résultats
graph.add_edge("tools", "llm")
# Compiler le graphe
app = graph.compile()