# MistralChat.py (version LangGraph hybride RAG + SQL) import atexit import os import threading import time import requests import streamlit as st import logfire # --------------------------------------------------------------------------- # Keep-alive côté serveur : thread daemon qui se ping lui-même toutes les 25 min # Fonctionne même si aucun navigateur n'a la page ouverte. # L'URL cible est lue depuis la variable d'environnement STREAMLIT_URL # (par défaut http://localhost:7860). # # Protection inter-processus via un fichier PID (/tmp/.streamlit_keepalive.pid) : # Streamlit peut spawner plusieurs workers ; sans ce verrou, chaque processus # démarrerait son propre thread keep-alive en parallèle. # --------------------------------------------------------------------------- _KEEPALIVE_WARMUP_S = 60 # délai avant le 1er ping (laisse Streamlit démarrer) _KEEPALIVE_MIN_INTERVAL_S = 1500 # intervalle minimum entre deux pings (25 min) _KEEPALIVE_PIDFILE = "/tmp/.streamlit_keepalive.pid" def _keepalive_loop(url: str, interval: int) -> None: """Boucle infinie de ping keep-alive.""" effective_interval = max(interval, _KEEPALIVE_MIN_INTERVAL_S) time.sleep(_KEEPALIVE_WARMUP_S) while True: try: resp = requests.get(url, timeout=10) logfire.info(f"[keep-alive] ping → {url} status={resp.status_code}") except Exception as exc: logfire.warning(f"[keep-alive] ping échoué : {exc}") time.sleep(effective_interval) def _start_keepalive() -> None: """Démarre le thread de keep-alive une seule fois, tous processus confondus. Utilise un fichier PID pour empêcher les workers Streamlit secondaires de démarrer leur propre thread (le verrou threading ne suffit pas car chaque processus a son propre espace mémoire). """ pid = os.getpid() # Vérifier si un autre processus a déjà démarré le keep-alive if os.path.exists(_KEEPALIVE_PIDFILE): try: with open(_KEEPALIVE_PIDFILE) as f: existing_pid = int(f.read().strip()) # Tester si ce processus est encore vivant (signal 0 = vérification) os.kill(existing_pid, 0) logfire.info(f"[keep-alive] déjà actif dans le processus {existing_pid} — ignoré.") return # Un thread tourne déjà dans un autre worker except (ProcessLookupError, ValueError): pass # Le processus propriétaire est mort → on prend le relais # Écrire notre PID et nettoyer à la sortie try: with open(_KEEPALIVE_PIDFILE, "w") as f: f.write(str(pid)) atexit.register(lambda: os.unlink(_KEEPALIVE_PIDFILE) if os.path.exists(_KEEPALIVE_PIDFILE) else None) except OSError: pass # /tmp non accessible — on démarre quand même sans protection fichier url = os.getenv("STREAMLIT_URL", "http://localhost:7860") interval = int(os.getenv("KEEPALIVE_INTERVAL_S", str(25 * 60))) # défaut : 25 min effective = max(interval, _KEEPALIVE_MIN_INTERVAL_S) t = threading.Thread(target=_keepalive_loop, args=(url, interval), daemon=True, name="streamlit-keepalive") t.start() logfire.info( f"[keep-alive] thread démarré (pid={pid}) — warmup {_KEEPALIVE_WARMUP_S}s " f"puis ping toutes les {effective}s sur {url}" ) _start_keepalive() try: from utils.config import ( MISTRAL_API_KEY, MODEL_NAME, APP_TITLE, NAME, LOGFIRE_TOKEN, PG_HOST, PG_PORT, PG_DB, PG_ADMIN, POSTGRES_PASSWORD, ) from utils.vector_store import VectorStoreManager from utils.langgraph_app import build_graph, AppState # Graphe partagé RAG + SQL from load_excel_to_db import main as load_excel_main except ImportError as e: st.error(f"Erreur d'importation: {e}. Vérifiez la structure de vos dossiers et les fichiers dans 'utils'.") st.stop() # --- Configuration de Logfire --- if LOGFIRE_TOKEN: logfire.configure(token=LOGFIRE_TOKEN, send_to_logfire=True) else: logfire.configure(send_to_logfire=False) # --- Initialisation de la base SQL (si public.teams est vide) --- @st.cache_resource def init_sql_db(): """Vérifie si public.teams est vide ; si oui, lance le pipeline ETL Excel → PostgreSQL. Retourne une liste de tuples (niveau, message) pour affichage dans la sidebar.""" import psycopg2 messages = [] try: conn = psycopg2.connect( host=PG_HOST, port=PG_PORT, dbname=PG_DB, user=PG_ADMIN, password=POSTGRES_PASSWORD, ) with conn.cursor() as cur: cur.execute("SELECT COUNT(*) FROM public.teams;") count = cur.fetchone()[0] conn.close() if count == 0: logfire.info("public.teams vide — démarrage du chargement Excel → PostgreSQL.") messages.append(("info", "⏳ Base SQL vide — chargement des données en cours…")) load_excel_main() messages.append(("success", "✅ Données SQL chargées avec succès.")) else: logfire.info(f"public.teams contient {count} ligne(s) — chargement ignoré.") messages.append(("success", f"✅ Base SQL opérationnelle ({count} équipe(s) chargée(s)).")) except Exception as e: logfire.error("Erreur lors de la vérification/initialisation de la base SQL", erreur=str(e)) messages.append(("warning", f"⚠️ Impossible de vérifier la base SQL : {e}")) return messages # --- Chargement du Vector Store (mis en cache) --- @st.cache_resource def get_vector_store_manager(): """Retourne (manager_ou_None, liste_de_messages).""" logfire.info("Tentative de chargement du VectorStoreManager...") messages = [] try: manager = VectorStoreManager() if manager.index is None or not manager.document_chunks: messages.append(("error", "❌ L'index vectoriel ou les chunks n'ont pas pu être chargés.")) messages.append(("warning", "Assurez-vous d'avoir exécuté 'python indexer.py' après avoir placé vos fichiers dans le dossier 'inputs'.")) logfire.error("Index Faiss ou chunks non trouvés/chargés par VectorStoreManager.") return None, messages logfire.info(f"VectorStoreManager chargé avec succès ({manager.index.ntotal} vecteurs).") messages.append(("success", f"✅ Index vectoriel chargé ({manager.index.ntotal} vecteurs).")) return manager, messages except FileNotFoundError: messages.append(("error", "❌ Fichiers d'index ou de chunks non trouvés.")) messages.append(("warning", "Veuillez exécuter 'python indexer.py' pour créer la base de connaissances.")) logfire.error("FileNotFoundError lors de l'init de VectorStoreManager.") return None, messages except Exception as e: messages.append(("error", f"❌ Erreur inattendue lors du chargement du VectorStoreManager: {e}")) logfire.exception("Erreur chargement VectorStoreManager") return None, messages sql_messages = init_sql_db() vector_store_manager, vs_messages = get_vector_store_manager() # --- Compilation du graphe LangGraph hybride (RAG + SQL) --- @st.cache_resource def get_langgraph(_vector_store_manager): """Construit et cache le graphe LangGraph (évite la recompilation à chaque rerun).""" logfire.info("Compilation du graphe LangGraph...") graph, llm = build_graph(_vector_store_manager) logfire.info("Graphe LangGraph compilé et mis en cache.") return graph graph = get_langgraph(vector_store_manager) # ============================== # CSS Global — Design professionnel & responsive # ============================== st.markdown(""" """, unsafe_allow_html=True) # ============================== # Interface Utilisateur Streamlit # ============================== # --- Sidebar : état des bases de données --- with st.sidebar: st.markdown("""
🏀 NBA Analyst AI
Powered by Mistral · RAG · SQL
""", unsafe_allow_html=True) st.divider() with st.expander("🗄️ État des bases de données", expanded=False): st.markdown("**Base SQL (PostgreSQL)**") for level, msg in sql_messages: if level == "success": st.success(msg) elif level == "info": st.info(msg) elif level == "warning": st.warning(msg) else: st.error(msg) st.markdown("**Base vectorielle (FAISS)**") for level, msg in vs_messages: if level == "success": st.success(msg) elif level == "info": st.info(msg) elif level == "warning": st.warning(msg) else: st.error(msg) st.divider() # Bouton effacer l'historique if st.button("🗑️ Effacer la conversation", use_container_width=True): st.session_state.messages = [ {"role": "assistant", "content": ( f"Bonjour ! Je suis votre analyste IA pour la {NAME}. " "Posez-moi vos questions sur les équipes, les joueurs ou les statistiques.\n\n" )} ] st.rerun() st.divider() st.markdown("""
Comment utiliser ?
Posez une question sur les équipes, joueurs ou statistiques NBA.

🔎 RAG — recherche dans les documents
🔢 SQL — interroge la base de données
🤖 Le routeur choisit automatiquement.
""", unsafe_allow_html=True) # --- Hero header --- import base64, pathlib def _img_to_b64(path: str) -> str | None: try: data = pathlib.Path(path).read_bytes() return base64.b64encode(data).decode() except Exception: return None logo_b64 = _img_to_b64("Docs/Logo_SportSee.png") logo_html = ( f'SportSee' if logo_b64 else '
🏀
' ) st.markdown(f"""
{logo_html}

{APP_TITLE}

Votre analyste IA · Équipes · Joueurs · Statistiques

⚡ Live AI
""", unsafe_allow_html=True) # --- Chips de questions suggérées --- SUGGESTED = [ "🏆 Quelle équipe a le plus de points ?", "👤 Top 3 rebondeurs offensifs des Lakers ?", "📊 Équipe avec le code MIA ?", "👤 Joueurs des 76ers ?", "📅 Moyenne d'âge d'Atlanta ?", ] if "chip_prompt" not in st.session_state: st.session_state.chip_prompt = None # Boutons chips cliquables cols = st.columns(len(SUGGESTED)) for i, (col, label) in enumerate(zip(cols, SUGGESTED)): with col: if st.button(label, key=f"chip_{i}", use_container_width=True, help=label, type="secondary"): st.session_state.chip_prompt = label # --- Initialisation de l'historique --- if "messages" not in st.session_state: st.session_state.messages = [ {"role": "assistant", "content": ( f"Bonjour ! Je suis votre analyste IA pour la {NAME}. " "Posez-moi vos questions sur les équipes, les joueurs ou les statistiques.\n\n" "NOTE : Je ne mémorise que nos 3 derniers échanges..." )}, ] # Affichage de l'historique for message in st.session_state.messages: with st.chat_message(message["role"]): # Utiliser la version affichée (avec badge route) si disponible st.markdown(message.get("display", message["content"])) # --- Gestion du prompt (saisie libre ou chip) --- user_input = st.chat_input(f"Posez votre question sur la {NAME}...") # Si un chip a été cliqué, on l'utilise comme prompt if st.session_state.chip_prompt: user_input = st.session_state.chip_prompt st.session_state.chip_prompt = None if user_input: prompt = user_input # 1. Afficher le message utilisateur st.session_state.messages.append({"role": "user", "content": prompt}) with st.chat_message("user"): st.markdown(prompt) # 2. Invoquer le graphe LangGraph (routage automatique RAG ou SQL) with st.chat_message("assistant"): message_placeholder = st.empty() message_placeholder.markdown("""
  Analyse en cours…
""", unsafe_allow_html=True) try: logfire.info(f"[Graph] Invocation pour : '{prompt}'") # Exclure le message de bienvenue (index 0) et la question en cours (dernier) chat_history = st.session_state.messages[1:-1] result = graph.invoke({ "user_question": prompt, "chat_history": chat_history, }) response_content = result.get("final_answer", "Je n'ai pas pu générer de réponse.") response_route = result.get("route", "unknown") # Préfixe visuel de la route route_badge = {"rag": "🔎 `[RAG]`", "sql": "🔢 `[SQL]`"}.get(response_route, "🤖 `[?]`") response_display = f"{route_badge} {response_content}" logfire.info(f"[Graph] Réponse finale obtenue (route={response_route}).") except Exception as e: response_content = "Je suis désolé, une erreur technique m'empêche de répondre. Veuillez réessayer." response_display = response_content response_route = "unknown" st.error(f"Erreur lors de l'appel au graphe : {e}") logfire.exception("Erreur lors de graph.invoke") message_placeholder.markdown(response_display) # 3. Ajouter la réponse à l'historique (avec route pour filtrage contextuel) st.session_state.messages.append({ "role": "assistant", "content": response_content, # contenu brut sans badge pour le contexte LLM "route": response_route, "display": response_display, # version affichée avec badge })