datavid / app.py
datacipen's picture
Update app.py
2aa465c verified
raw
history blame
25 kB
"""
Application Chainlit pour l'Agent Collaboratif LangGraph
========================================================
Intégration complète avec:
- Chainlit 2.8.1
- Official Data Layer (PostgreSQL/Supabase)
- LangSmith monitoring
- Starters avec icônes
- Chain of Thought visible
- Style personnalisé (dark theme)
"""
import os
import json
import asyncio
from typing import Dict, Any, List, Optional
import chainlit as cl
from chainlit.types import ThreadDict, Starter
#from langsmith import Client
#from langsmith.run_helpers import traceable
from langsmith import traceable
# Import du module agent (votre code existant)
# On suppose que le code est dans agent_collaboratif_avid.py
from agent_collaboratif_avid import (
run_collaborative_agent,
retriever_manager,
PINECONE_INDEX_NAME,
OPENAI_MODEL_NAME,
SIMILARITY_TOP_K,
MAX_VALIDATION_LOOPS
)
import bcrypt
from chainlit.data.sql_alchemy import SQLAlchemyDataLayer
#from sql_alchemy import SQLAlchemyDataLayer
from supabase import create_client, Client
from supabase.lib.client_options import ClientOptions
#from langfuse.langchain import CallbackHandler
#import getpass
#os.environ['LANGFUSE_SECRET_KEY'] = getpass.getpass("Enter your Secret key: ")
#os.environ['LANGFUSE_PUBLIC_KEY'] = getpass.getpass("Enter your Public key: ")
#os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"
#langfuse_handler = CallbackHandler()
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_ANON_KEY = os.environ.get("SUPABASE_ANON_KEY")
CONNINFO = os.environ.get("CONNINFO")
url: str = SUPABASE_URL
key: str = SUPABASE_ANON_KEY
supabase: Client = create_client(url, key)
#url: str = "https://urtvfyesitnmwrouarze.supabase.co"
#key: str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InVydHZmeWVzaXRubXdyb3VhcnplIiwicm9sZSI6ImFub24iLCJpYXQiOjE3NTk5NTgyNzIsImV4cCI6MjA3NTUzNDI3Mn0.HvZVdlyyck6iI6L5XiPupPZ8voLt-u4NCKAqgwHl6dE"
#supabase: Client = create_client(url, key, options=ClientOptions(auto_refresh_token=False,persist_session=True))
#@cl.data_layer
#def get_data_layer():
# return SQLAlchemyDataLayer(conninfo="postgresql+asyncpg://postgres.urtvfyesitnmwrouarze:2VlIHUmI3qVhJpcb@aws-1-eu-north-1.pooler.supabase.com:5432/postgres", storage_provider=supabase)
@cl.data_layer
def get_data_layer():
return SQLAlchemyDataLayer(conninfo=CONNINFO, storage_provider=supabase)
# =============================================================================
# CONFIGURATION DE L'AUTHENTIFICATION (Optionnel)
# =============================================================================
@cl.password_auth_callback
def auth_callback(username: str, password: str) -> Optional[cl.User]:
"""
Callback d'authentification (optionnel).
À configurer selon vos besoins.
"""
# Exemple simple (à remplacer par votre logique)
auth = json.loads(os.environ.get("CHAINLIT_AUTH_LOGIN"))
auth_iter = iter(auth)
while True:
# item will be "end" if iteration is complete
connexion = next(auth_iter, "end")
if bcrypt.checkpw(username.encode('utf-8'), bcrypt.hashpw(connexion['ident'].encode('utf-8'), bcrypt.gensalt())) == True and bcrypt.checkpw(password.encode('utf-8'), bcrypt.hashpw(connexion['pwd'].encode('utf-8'), bcrypt.gensalt())) == True:
print("OK")
return cl.User(
identifier=connexion['ident'],
metadata={"role": connexion['role'], "provider": "credentials"}
)
if connexion == "end":
break
return None
# =============================================================================
# CONFIGURATION LANGSMITH
# =============================================================================
LANGCHAIN_API_KEY = os.environ.get("LANGCHAIN_API_KEY")
LANGSMITH_PROJECT = os.environ.get("LANGSMITH_PROJECT")
if LANGCHAIN_API_KEY:
os.environ["LANGCHAIN_TRACING_V2"] = "true"
#os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_API_KEY"] = LANGCHAIN_API_KEY
os.environ["LANGCHAIN_PROJECT"] = LANGSMITH_PROJECT
#langsmith_client = Client()
print(f"✅ LangSmith activé - Projet: {LANGSMITH_PROJECT}")
else:
print("⚠️ LANGCHAIN_API_KEY non définie - Monitoring désactivé")
langsmith_client = None
# =============================================================================
# FONCTIONS AUXILIAIRES POUR L'AFFICHAGE
# =============================================================================
async def send_cot_step(step_name: str, content: str, status: str = "running"):
"""Envoie une étape du Chain of Thought."""
step = cl.Step(
name=step_name,
type="tool",
show_input=True
)
step.output = content
if status == "done":
step.is_error = False
elif status == "error":
step.is_error = True
await step.send()
return step
async def display_query_analysis(analysis: Dict[str, Any]):
"""Affiche l'analyse de la requête."""
content = f"""**Bases identifiées:** {', '.join(analysis.get('databases_to_query', []))}
**Priorités:**
{json.dumps(analysis.get('priorities', {}), indent=2, ensure_ascii=False)}
**Résumé:** {analysis.get('analysis_summary', 'N/A')}
"""
await send_cot_step("🔍 Analyse de la requête", content, "done")
async def display_collection(info_list: List[Dict[str, Any]]):
"""Affiche les informations collectées."""
content_parts = []
for info in info_list:
content_parts.append(f"""
**📦 Base:** {info['database']}
**Catégorie:** {info['category']}
**Priorité:** {info['priority']}
**Résultats:** {info['results_count']}
""")
content = "\n".join(content_parts)
await send_cot_step("📊 Collecte d'informations", content, "done")
async def display_validation(validation: Dict[str, Any], iteration: int):
"""Affiche les résultats de validation."""
content = f"""**Itération:** {iteration}
**Score de confiance:** {validation.get('confidence_score', 0)}%
**Validé:** {'✅ Oui' if validation.get('is_valid') else '❌ Non'}
**Hallucinations détectées:** {len(validation.get('hallucinations_detected', []))}
"""
if validation.get('hallucinations_detected'):
content += "\n**Problèmes:**\n"
for hall in validation['hallucinations_detected']:
content += f"- {hall}\n"
status = "done" if validation.get('is_valid') else "error"
await send_cot_step(f"✅ Validation (#{iteration})", content, status)
async def display_similar_info(similar_info: List[Dict[str, Any]]):
"""Affiche les informations similaires."""
if not similar_info:
return
# Regrouper par base
grouped = {}
for item in similar_info:
db = item['database']
if db not in grouped:
grouped[db] = []
grouped[db].append(item)
elements = []
for db_name, items in grouped.items():
content_parts = [f"### 📚 {db_name.upper()}\n"]
content_parts.append(f"**Catégorie:** {items[0]['category']}")
content_parts.append(f"**Résultats:** {len(items)}\n")
for idx, item in enumerate(items[:3], 1): # Limiter à 3 par base
score = item.get('score', 'N/A')
content_parts.append(f"**{idx}. Score:** {score}")
content_preview = item['content'][:200]
if len(item['content']) > 200:
content_preview += "..."
content_parts.append(f"**Contenu:** {content_preview}\n")
# Créer un élément Chainlit
element = cl.Text(
content="\n".join(content_parts),
display="side"
)
elements.append(element)
if elements:
await cl.Message(
content="💡 **Informations similaires trouvées dans d'autres bases**",
elements=elements
).send()
async def display_web_search_results(web_search_results: List[Dict[str, Any]]):
"""Affiche les résultats de recherche web."""
if not web_search_results:
return
elements = []
content_parts = []
content_parts.append(f"### 🌐 Résultats de la recherche web\n")
content_parts.append(f"**Nombre de résultats:** {len(web_search_results)}\n")
for idx, item in enumerate(web_search_results[:5], 1): # Limiter à 5 résultats
content_parts.append(f"**{idx}. Titre:** {item['title']}")
content_parts.append(f"**Lien:** {item['markdown_link']}")
content_parts.append(f"**Résumé:** {item['summary']}\n")
element = cl.Text(
content="\n".join(content_parts),
display="side"
)
elements.append(element)
if elements:
await cl.Message(
content="🌐 **Informations trouvées sur le web**",
elements=elements
).send()
# =============================================================================
# FONCTIONS D'AFFICHAGE STREAMING PAR NŒUD
# =============================================================================
async def stream_response(content: str, msg: cl.Message, chunk_size: int = 50):
"""Stream du contenu progressivement dans un message."""
for i in range(0, len(content), chunk_size):
chunk = content[i:i + chunk_size]
msg.content += chunk
await msg.update()
await asyncio.sleep(0.25) # Petit délai pour un effet visuel
async def display_node_update(node_name: str, state: Dict[str, Any]):
"""Affiche les mises à jour d'état après l'exécution d'un nœud."""
if node_name == "analyze_query":
if state.get("query_analysis"):
await display_query_analysis(state["query_analysis"])
elif node_name == "collect_information":
if state.get("collected_information"):
await display_collection(state["collected_information"])
elif node_name == "generate_response":
if state.get("final_response"):
content = f"**Réponse générée** ({len(state['final_response'])} caractères)\n\nLa réponse complète sera affichée à la fin du workflow."
await send_cot_step("✏️ Génération de la réponse", content, "done")
elif node_name == "validate_response":
if state.get("validation_results"):
iteration = state.get("iteration_count", len(state["validation_results"]))
last_validation = state["validation_results"][-1]
await display_validation(last_validation, iteration)
elif node_name == "refine_response":
content = f"**Itération:** {state.get('iteration_count', 0)}\n**Correction en cours...**"
await send_cot_step("⚙️ Refinement", content, "done")
elif node_name == "collect_similar_information":
if state.get("additional_information"):
await display_similar_info(state["additional_information"])
# =============================================================================
# FONCTION PRINCIPALE TRACÉE AVEC LANGSMITH
# =============================================================================
@traceable(name="agent_collaboratif_query", project_name=LANGSMITH_PROJECT)
async def process_query_with_tracing(query: str, thread_id: str) -> Dict[str, Any]:
"""Traite la requête avec traçage LangSmith et streaming en temps réel."""
# Import du workflow
from agent_collaboratif_avid import AgentState, create_agent_workflow
from langchain_core.messages import HumanMessage
app = create_agent_workflow()
initial_state = {
"messages": [HumanMessage(content=query)],
"user_query": query,
"query_analysis": {},
"collected_information": [],
"validation_results": [],
"final_response": "",
"iteration_count": 0,
"errors": [],
"additional_information": [],
"similar_info_response":"",
"web_search_results": []
}
# Message de démarrage
await send_cot_step("🔄 Démarrage", "Initialisation du workflow LangGraph...", "done")
# Variables pour suivre l'état
final_state = None
# STREAMING: Utilisation de app.astream() pour obtenir les mises à jour après chaque nœud
try:
#async for event in app.astream(initial_state, {"callbacks": [langfuse_handler]}):
#app.invoke(intial_state, config={"callbacks": [langfuse_handler]})
async for event in app.astream(initial_state):
# event est un dictionnaire avec les nœuds comme clés
for node_name, node_state in event.items():
# Ignorer le nœud spécial __start__
if node_name == "__start__":
continue
# Afficher un message de progression pour le nœud actuel
node_display_names = {
"analyze_query": "🔍 Analyse de la requête",
"collect_information": "📊 Collecte d'informations",
"generate_response": "✏️ Génération de la réponse",
"validate_response": "✅ Validation anti-hallucination",
"refine_response": "⚙️ Refinement de la réponse",
"collect_similar_information": "🔗 Collecte d'informations similaires"
}
display_name = node_display_names.get(node_name, f"⚙️ {node_name}")
# Message de progression
await send_cot_step(
f"🔄 {display_name}",
f"Nœud exécuté avec succès",
"done"
)
# Afficher les détails spécifiques du nœud
await display_node_update(node_name, node_state)
# Sauvegarder l'état final
final_state = node_state
except Exception as e:
error_msg = f"Erreur lors du streaming: {str(e)}"
await send_cot_step("❌ Erreur", error_msg, "error")
raise
# Si le streaming n'a pas retourné d'état final, utiliser la méthode classique
if final_state is None:
final_state = initial_state
result = {
"query": query,
"query_analysis": final_state.get("query_analysis", {}),
"collected_information": final_state.get("collected_information", []),
"validation_results": final_state.get("validation_results", []),
"final_response": final_state.get("final_response", ""),
"iteration_count": final_state.get("iteration_count", 0),
"errors": final_state.get("errors", []),
"additional_information": final_state.get("additional_information", []),
"similar_info_response": final_state.get("similar_info_response", ""),
"web_search_results": final_state.get("web_search_results", []),
"sources_used": [
info["database"]
for info in final_state.get("collected_information", [])
],
"pinecone_index": PINECONE_INDEX_NAME
}
return result
# =============================================================================
# CALLBACKS CHAINLIT
# =============================================================================
@cl.set_chat_profiles
async def chat_profile(current_user: cl.User):
return [
cl.ChatProfile(
name="Avid Agent",
markdown_description="🎓 Avid Agent permet de converser avec un agent collaboratif entre 4 bases de données pour extraire les informations pertinentes afin de générer une réponse en réduisant les hallucations, par relecture et redéfinition des éléments.",
icon="/public/sparkles-gustaveia.png",
starters=[
cl.Starter(
label= "🔬 Laboratoires & Mobilité",
message= "Quels sont les laboratoires de l'université Gustave Eiffel travaillant sur la mobilité urbaine durable?",
#icon= "/public/icons/lab.svg"
),
cl.Starter(
label= "🎓 Formations Master",
message= "Je cherche des formations en master sur l'aménagement urbain et le développement durable",
#icon= "/public/icons/education.svg"
),
cl.Starter(
label= "🤝 Collaborations Recherche",
message= "Quels laboratoires ont des axes de recherche similaires en énergie et pourraient collaborer?",
#icon= "/public/icons/collaboration.svg"
),
cl.Starter(
label= "⚙️ Équipements Lab",
message= "Liste les équipements disponibles dans les laboratoires travaillant sur la qualité de l'air",
#icon= "/public/icons/equipment.svg"
),
cl.Starter(
label= "📚 Publications Récentes",
message= "Trouve des publications récentes sur la transition énergétique dans les villes",
#icon= "/public/icons/publications.svg"
),
cl.Starter(
label= "👥 Auteurs & Labs",
message= "Qui sont les auteurs qui publient sur la mobilité douce et dans quels laboratoires?",
#icon= "/public/icons/authors.svg"
),
cl.Starter(
label= "📖 Urbanisme Durable",
message= "Quelles publications traitent de l'urbanisme durable et quand ont-elles été publiées?",
#icon= "/public/icons/urban.svg"
),
cl.Starter(
label= "🏙️ Ville Intelligente",
message= "Compare les formations et les laboratoires sur le thème de la ville intelligente",
#icon= "/public/icons/smart-city.svg"
),
cl.Starter(
label= "🌍 Résilience Urbaine",
message= "Identifie les opportunités de partenariats entre laboratoires sur la résilience urbaine",
#icon= "/public/icons/resilience.svg"
),
cl.Starter(
label= "♻️ Économie Circulaire",
message= "Quelles sont les compétences enseignées dans les formations liées à l'économie circulaire?",
#icon= "/public/icons/circular.svg"
)
]
),cl.ChatProfile(
name="Avid Dataviz",
markdown_description="💡 Avid Dataviz permet d'avoir recours à des éléments statistiques et de corrélation entre les données laboratoires et les thématiques Ville Durable",
)
]
@cl.on_chat_start
async def start():
"""Initialisation de la session chat."""
user = cl.user_session.get("user")
chat_profile = cl.user_session.get("chat_profile")
if chat_profile == "Avid Dataviz":
await cl.Message(
content=f"Bienvenue {user.identifier}!\n\nL'environnement {chat_profile} vous restitue les données sous forme d'objets statistiques."
).send()
# Message de bienvenue avec style
# welcome_msg = f"""# 🎓 Agent Collaboratif - Université Gustave Eiffel
#Bienvenue ! Je suis votre assistant spécialisé en **Ville Durable**.
## 🔧 Configuration
#- **Index Pinecone:** `{PINECONE_INDEX_NAME}`
#- **Modèle:** `{OPENAI_MODEL_NAME}`
#- **Top K résultats:** `{SIMILARITY_TOP_K}`
#- **Max validations:** `{MAX_VALIDATION_LOOPS}`
## 💡 Fonctionnalités
#✅ Recherche multi-bases vectorielles
#✅ Validation anti-hallucination
#✅ Suggestions d'informations connexes
#✅ Traçage LangSmith actif
#**Choisissez un starter ou posez votre question !**
#"""
# await cl.Message(content=welcome_msg).send()
# Sauvegarder les métadonnées de session
# cl.user_session.set("session_started", True)
# cl.user_session.set("query_count", 0)
@cl.on_message
async def main(message: cl.Message):
"""Traitement du message utilisateur."""
query = message.content
thread_id = cl.context.session.thread_id
# Incrémenter le compteur
query_count = cl.user_session.get("query_count", 0) + 1
cl.user_session.set("query_count", query_count)
# Message de traitement
processing_msg = cl.Message(content="")
await processing_msg.send()
try:
# Traitement avec affichage du COT
result = await process_query_with_tracing(query, thread_id)
# Réponse finale en streaming
final_response = result["final_response"]
# Afficher un séparateur
await send_cot_step("📝 Réponse finale", "Affichage de la réponse complète en streaming...", "done")
# Créer un nouveau message pour la réponse finale
response_msg = cl.Message(content="")
await response_msg.send()
# Streamer la réponse complète
await stream_response(final_response, response_msg, chunk_size=50)
# Afficher les informations similaires collectées par le nœud 6
if result.get("similar_info_response"):
similar_msg = cl.Message(content="")
await similar_msg.send()
# Streamer la réponse similaire
await stream_response(result["similar_info_response"], similar_msg, chunk_size=50)
#await display_similar_info(result["similar_info_response"])
# Afficher les résultats de recherche web collectés par le nœud 7
web_msg = cl.Message(content="Résultats de recherche complémentaires sur le web : \n\n")
await web_msg.send()
for result_web in result["web_search_results"]:
web_search = "- " + result_web['markdown_link'] + " : " + result_web['summary'] + "\n\n"
await stream_response(web_search, web_msg, chunk_size=50)
#await display_web_search_results(result["web_search_results"])
# Métadonnées
metadata_parts = [
f"\n\n---\n### 📊 Métadonnées du traitement",
f"**Sources consultées:** {', '.join(result['sources_used']) if result['sources_used'] else 'Aucune'}",
f"**Itérations:** {result['iteration_count']}",
]
if result['validation_results']:
last_val = result['validation_results'][-1]
metadata_parts.append(f"**Confiance finale:** {last_val.get('confidence_score', 0)}%")
metadata_parts.append(f"**Requête n°:** {query_count}")
# Ajouter les métadonnées en streaming
metadata_text = "\n".join(metadata_parts)
await stream_response(metadata_text, response_msg, chunk_size=100)
# Supprimer le message de traitement initial vide
processing_msg.content = "✅ Traitement terminé"
await processing_msg.update()
# Sauvegarder dans l'historique de session
cl.user_session.set(f"query_{query_count}", {
"query": query,
"response": final_response,
"sources": result['sources_used']
})
except Exception as e:
error_msg = f"❌ **Erreur lors du traitement:**\n\n```\n{str(e)}\n```"
processing_msg.content = error_msg
await processing_msg.update()
# Log dans LangSmith si disponible
#if langsmith_client:
# langsmith_client.create_feedback(
# run_id=thread_id,
# key="error",
# score=0,
# comment=str(e)
# )
@cl.on_shared_thread_view
async def on_shared_thread_view(thread: ThreadDict, viewer: Optional[cl.User]) -> bool:
return True
@cl.on_chat_resume
async def on_chat_resume(thread: ThreadDict):
"""Reprise d'une conversation existante."""
thread_id = thread["id"]
resume_msg = f"""# 🔄 Conversation reprise
**Thread ID:** `{thread_id}`
Vous pouvez continuer votre conversation ou poser une nouvelle question.
"""
await cl.Message(content=resume_msg).send()
@cl.on_stop
async def on_stop():
"""Callback à l'arrêt de l'exécution."""
await cl.Message(content="⏹️ Traitement interrompu par l'utilisateur.").send()
@cl.on_chat_end
async def on_chat_end():
"""Callback à la fin de la session."""
query_count = cl.user_session.get("query_count", 0)
end_msg = f"""# 👋 Session terminée
Merci d'avoir utilisé l'agent collaboratif !
**Statistiques de session:**
- **Requêtes traitées:** {query_count}
- **Index Pinecone:** {PINECONE_INDEX_NAME}
"""
await cl.Message(content=end_msg).send()
# =============================================================================
# CONFIGURATION DU DATA LAYER (Supabase/PostgreSQL)
# =============================================================================
"""
Pour activer le Data Layer avec Supabase, créez un fichier .env:
CHAINLIT_AUTH_SECRET=your-secret-key
LITERAL_API_KEY=your-literal-api-key
LITERAL_API_URL=https://cloud.getliteral.ai
Ou configurez PostgreSQL directement:
DATABASE_URL=postgresql://user:password@host:port/dbname
Le Data Layer sera automatiquement activé si ces variables sont définies.
"""