import os import pandas as pd import chromadb from google import genai from sentence_transformers import SentenceTransformer, CrossEncoder from typing import List, Dict from flask import Flask, request, jsonify from flask_cors import CORS from datetime import datetime import time # ====================================================================== # CONFIGURATION # ====================================================================== DATA_FILE_PATH = "data/QR.csv" CHROMA_DB_PATH = "/tmp/bdd_ChromaDB" COLLECTION_NAME = "qr_data_dual_embeddings" Q_COLUMN_NAME = "Question" R_COLUMN_NAME = "Reponse" SYSTEM_PROMPT_PATH = "data/system_prompt.txt" SRC_CROSS_ENCODER = "models/mmarco-mMiniLMv2-L12-H384-v1" SRC_PARAPHRASE = "models/paraphrase-mpnet-base-v2" N_RESULTS_RETRIEVAL = 10 N_RESULTS_RERANK = 3 # Clé pour la route RAG (récupérée de l'environnement ou par défaut) GEMINI_API_KEY_RAG = os.getenv("GEMINI_API_KEY", "AIzaSyDXXY7uSXryTxZ51jQFsSLcPnC_Ivt9V1g") # NOUVELLE CLÉ demandée, mise en dur pour la route directe GEMINI_API_KEY_DIRECT = "AIzaSyCpG2G3K0cZmTxWFO-c4OoOrW1fcTYQwgo" GEMINI_MODEL = "gemini-2.5-flash" MAX_CONVERSATION_HISTORY = 10 API_HOST = '0.0.0.0' API_PORT = 1212 # ====================================================================== # VARIABLES GLOBALES # ====================================================================== model_cross_encoder: CrossEncoder = None model_paraphrase: SentenceTransformer = None collection: chromadb.Collection = None system_prompt: str = None gemini_client_rag: genai.Client = None # Client pour la route RAG gemini_client_direct: genai.Client = None # Client pour la route directe (CELUI UTILISÉ PAR api_gemini_only) conversation_histories: Dict[str, List[Dict[str, str]]] = {} conversation_start_times: Dict[str, str] = {} # ====================================================================== # CHARGEMENT DES RESSOURCES (Inchangé, car les ressources RAG sont nécessaires) # ====================================================================== def load_models(): """Charge les modèles SentenceTransformer et CrossEncoder.""" print("⏳ Chargement des modèles...") try: cross_encoder = CrossEncoder( SRC_CROSS_ENCODER if os.path.exists(SRC_CROSS_ENCODER) else "cross-encoder/mmarco-mMiniLMv2-L12-H384-v1" ) paraphrase = SentenceTransformer( SRC_PARAPHRASE if os.path.exists(SRC_PARAPHRASE) else "sentence-transformers/paraphrase-mpnet-base-v2" ) print("✅ Modèles chargés avec succès.") return cross_encoder, paraphrase except Exception as e: print(f"❌ Erreur chargement modèles: {e}") raise def load_data(): """Charge le DataFrame depuis le CSV.""" try: if not os.path.exists(DATA_FILE_PATH): print(f"⚠️ Fichier {DATA_FILE_PATH} non trouvé. Utilisation d'exemple.") df = pd.DataFrame({ Q_COLUMN_NAME: ["Où est le soleil?", "Qui est l'IA?"], R_COLUMN_NAME: ["Le soleil est une étoile.", "L'IA est l'intelligence artificielle."] }) else: df = pd.read_csv(DATA_FILE_PATH) print(f"✅ {len(df)} lignes chargées depuis {DATA_FILE_PATH}.") return df except Exception as e: print(f"❌ Erreur chargement données: {e}") raise def load_system_prompt(): """Charge le system prompt.""" try: with open(SYSTEM_PROMPT_PATH, 'r', encoding='utf-8') as f: return f.read().strip() except FileNotFoundError: default = "Tu es un assistant utile et concis. Réponds à la requête de l'utilisateur." print(f"⚠️ System prompt non trouvé à {SYSTEM_PROMPT_PATH}. Utilisation du prompt par défaut.") return default def initialize_gemini_client(api_key, client_name): """Initialise un client Google Gemini.""" if api_key == "AIzaSyDXXY7uSXryTxZ51jQFsSLcPnC_Ivt9V1g": print(f"⚠️ AVIS pour {client_name}: Clé Gemini par défaut/placeholder détectée.") if api_key == "AIzaSyCpG2G3K0cZmTxWFO-c4OoOrW1fcTYQwgo": print(f"💡 AVIS pour {client_name}: Clé Gemini directe mise en dur détectée.") try: print(f"✅ Client Gemini '{client_name}' initialisé.") return genai.Client(api_key=api_key) except Exception as e: print(f"❌ Erreur lors de l'initialisation du client Gemini '{client_name}': {e}") raise # ====================================================================== # CHROMADB SETUP (Inchangé) # ====================================================================== def setup_chromadb_collection(client, df, model_paraphrase): """Configure et remplit la collection ChromaDB.""" total_docs = len(df) * 2 os.makedirs(CHROMA_DB_PATH, exist_ok=True) try: collection = client.get_or_create_collection(name=COLLECTION_NAME) except Exception as e: print(f"❌ Erreur lors de l'accès à la collection ChromaDB: {e}") raise if collection.count() == total_docs and total_docs > 0: print(f"✅ Collection déjà remplie ({collection.count()} docs) dans {CHROMA_DB_PATH}.") return collection if total_docs == 0: print("⚠️ DataFrame vide. Collection non remplie.") return collection print(f"⏳ Remplissage de ChromaDB ({len(df)} lignes) à l'emplacement: {CHROMA_DB_PATH}...") docs, metadatas, ids = [], [], [] for i, row in df.iterrows(): question = str(row[Q_COLUMN_NAME]) reponse = str(row[R_COLUMN_NAME]) meta = {Q_COLUMN_NAME: question, R_COLUMN_NAME: reponse, "source_row": i} docs.append(question) metadatas.append({**meta, "type": "question"}) ids.append(f"id_{i}_Q") docs.append(reponse) metadatas.append({**meta, "type": "reponse"}) ids.append(f"id_{i}_R") embeddings = model_paraphrase.encode(docs, show_progress_bar=False).tolist() try: client.delete_collection(name=COLLECTION_NAME) except: pass collection = client.get_or_create_collection(name=COLLECTION_NAME) collection.add(embeddings=embeddings, documents=docs, metadatas=metadatas, ids=ids) print(f"✅ Collection remplie: {collection.count()} documents.") return collection # ====================================================================== # RAG - RETRIEVAL & RERANKING (Inchangé, mais seulement utilisé par api_get_answer) # ====================================================================== def retrieve_and_rerank(query_text, collection, model_paraphrase, model_cross_encoder): """Récupère et rerank les résultats.""" # ... (logique inchangée) ... print(f"🔍 Récupération pour: '{query_text[:40]}...'") query_emb = model_paraphrase.encode([query_text]).tolist() results = collection.query( query_embeddings=query_emb, n_results=N_RESULTS_RETRIEVAL, include=['documents', 'metadatas', 'distances'] ) if not results['ids'][0]: print("⚠️ Aucun résultat trouvé.") return pd.DataFrame() candidates = [] cross_input = [] for i, doc in enumerate(results['documents'][0]): meta = results['metadatas'][0][i] candidates.append({ 'question': meta[Q_COLUMN_NAME], 'reponse': meta[R_COLUMN_NAME], 'doc_type': meta.get('type'), 'text_reranked': doc, 'initial_distance': results['distances'][0][i] }) cross_input.append([query_text, doc]) scores = model_cross_encoder.predict(cross_input) for i, score in enumerate(scores): candidates[i]['rerank_score'] = score df = pd.DataFrame(candidates).sort_values('rerank_score', ascending=False) df = df.drop_duplicates(subset=['question', 'reponse'], keep='first') return df.head(N_RESULTS_RERANK) def generate_rag_prompt(query_text, df_results, conversation_history): """Génère le prompt RAG final.""" context = [] if not df_results.empty: for _, row in df_results.iterrows(): context.append(f"Q: {row['question']}\nR: {row['reponse']}") context_str = "\n---\n".join(context) history_str = "" if conversation_history: history_str = "HISTORIQUE:\n" display_history = conversation_history[-(MAX_CONVERSATION_HISTORY * 2):] for msg in display_history: role = "USER" if msg["role"] == "user" else "ASSISTANT" history_str += f"{role}: {msg['content']}\n" history_str += "\n" # Le prompt customisé RAG (avec Mathieu, NOVA, etc.) return f"""{history_str}UTILISATEUR: {query_text} CONTEXTE (si utile): [{context_str}] INSTRUCTIONS: - Réponds brièvement et clairement, en un minimum de 2 paragraphe et un maximum de 3 paragraphe, entre chaque paragraphe tu écrira un # - Reste professionnel, met des emojis dans ta réponse tout en restant profesionnel - Réponds uniquement à la question posée - Ne mentionne NOVA que si l'utilisateur l'évoque clairement - Mentionne obligatoirement Mathieu dans ta réponse""" # ====================================================================== # GESTION HISTORIQUE (Inchangé) # ====================================================================== def get_conversation_history(session_id): """Récupère l'historique d'une session.""" return conversation_histories.get(session_id, []) def add_to_history(session_id, role, content): """Ajoute un message à l'historique.""" if session_id not in conversation_histories: conversation_histories[session_id] = [] conversation_histories[session_id].append({"role": role, "content": content}) if len(conversation_histories[session_id]) > MAX_CONVERSATION_HISTORY * 2: conversation_histories[session_id] = conversation_histories[session_id][-(MAX_CONVERSATION_HISTORY * 2):] def clear_history(session_id): """Efface l'historique d'une session.""" conversation_histories[session_id] = [] # ====================================================================== # INITIALISATION GLOBALE (Inchangé) # ====================================================================== def initialize_global_resources(): """Initialise tous les modèles et ressources.""" global model_cross_encoder, model_paraphrase, collection, system_prompt global gemini_client_rag, gemini_client_direct print("\n" + "="*50) print("⚙️ INITIALISATION RAG & Clients Gemini") print("="*50) try: model_cross_encoder, model_paraphrase = load_models() df = load_data() system_prompt = load_system_prompt() gemini_client_rag = initialize_gemini_client(GEMINI_API_KEY_RAG, "RAG (Env/Default)") # Client Direct gemini_client_direct = initialize_gemini_client(GEMINI_API_KEY_DIRECT, "Direct (Hardcoded)") except Exception: return False try: print(f"⏳ Initialisation de ChromaDB à l'emplacement: {CHROMA_DB_PATH}") chroma_client = chromadb.PersistentClient(path=CHROMA_DB_PATH) collection = setup_chromadb_collection(chroma_client, df, model_paraphrase) print("✅ INITIALISATION COMPLÈTE\n") return True except Exception as e: print(f"❌ Erreur lors de l'initialisation de ChromaDB ou du remplissage: {e}") return False # ====================================================================== # FLASK API # ====================================================================== app = Flask(__name__) CORS(app) @app.route('/status', methods=['GET']) def api_status(): """Route de ping pour vérifier l'état de l'API.""" return jsonify({"status": "everything is good"}), 200 @app.route('/api/get_answer', methods=['POST']) def api_get_answer(): """Endpoint pour obtenir une réponse avec RAG (Inchangé).""" if any(x is None for x in [model_cross_encoder, model_paraphrase, collection, system_prompt, gemini_client_rag]): return jsonify({"error": "Ressources RAG non chargées."}), 500 try: data = request.get_json() query_text = data.get('query_text') session_id = data.get('session_id', 'archive') if not query_text: return jsonify({"error": "Problème avec l'API, veuillez réessayer plus tard."}), 500 history = get_conversation_history(session_id) df_results = retrieve_and_rerank(query_text, collection, model_paraphrase, model_cross_encoder) rag_prompt = generate_rag_prompt(query_text, df_results, history) # Logique d'appel Gemini (intégrée de l'ancienne fonction call_gemini) MAX_RETRIES = 10 response = "Erreur: Toutes les tentatives ont échoué." client = gemini_client_rag for attempt in range(MAX_RETRIES): try: print(f" 📞 Tentative d'appel Gemini RAG #{attempt + 1}...") response_obj = client.models.generate_content( model=GEMINI_MODEL, contents=f"{system_prompt}\n\n{rag_prompt}" ) response = response_obj.text.replace("*", "") break # Succès except Exception as e: error_message = str(e) print(f" ❌ Erreur Gemini (Tentative {attempt + 1}/{MAX_RETRIES}): {error_message}") if attempt < MAX_RETRIES - 1: time.sleep(2) else: response = f"Erreur fatale après {MAX_RETRIES} tentatives: {error_message}" add_to_history(session_id, "user", query_text) add_to_history(session_id, "assistant", response) return jsonify({"generated_response": response}) except Exception as e: print(f"❌ Erreur générale de l'API RAG: {e}") return jsonify({"error": "Problème avec l'API RAG, veuillez réessayer plus tard."}), 500 # ====================================================================== # NOUVELLE ROUTE : TOTALEMENT AUTONOME # ====================================================================== @app.route('/api/gemini_only', methods=['POST']) def api_gemini_only(): """ ROUTE TOTALE : Endpoint pour les requêtes directes à Gemini sans RAG. Contient toute la logique d'appel, y compris les réessais. """ # Vérification du client client = gemini_client_direct if client is None: return jsonify({"error": "Client Gemini direct non initialisé. Vérifiez les logs."}), 500 try: data = request.get_json() query_text = data.get('query_text') # Utilisation du system_prompt par défaut si non spécifié custom_system_prompt = data.get('system_prompt', system_prompt) if not query_text: return jsonify({"error": "Paramètre 'query_text' manquant."}), 400 print(f"\n{'='*50}") print(f"⚡ Traitement Direct Intégral: '{query_text}'") print(f"{'='*50}") # LOGIQUE INTÉGRÉE 1 : Préparation du prompt (remplace get_answer_direct_process) final_prompt = f"UTILISATEUR: {query_text}" # LOGIQUE INTÉGRÉE 2 : Appel Gemini avec logique de réessai (remplace call_gemini) MAX_RETRIES = 10 response = "Erreur: Toutes les tentatives ont échoué." for attempt in range(MAX_RETRIES): try: print(f" 📞 Tentative d'appel Gemini Direct #{attempt + 1}...") response_obj = client.models.generate_content( model=GEMINI_MODEL, contents=f"{custom_system_prompt}\n\n{final_prompt}" ) response = response_obj.text.replace("*", "") break # Succès except Exception as e: error_message = str(e) print(f" ❌ Erreur Gemini (Tentative {attempt + 1}/{MAX_RETRIES}): {error_message}") if attempt < MAX_RETRIES - 1: # Attente de 2 secondes avant de réessayer print(" 😴 Attente de 2 secondes avant de réessayer...") time.sleep(2) else: response = f"Erreur fatale après {MAX_RETRIES} tentatives: {error_message}" # Pas d'ajout à l'historique return jsonify({"generated_response": response}) except Exception as e: print(f"❌ Erreur générale de l'API Direct: {e}") generic_message = "Problème avec l'API directe, veuillez réessayer plus tard." return jsonify({"error": generic_message}), 500 @app.route('/api/clear_history', methods=['POST']) def api_clear_history(): """Efface l'historique d'une session (Inchangé).""" try: data = request.get_json() session_id = data.get('session_id', 'archive') clear_history(session_id) return jsonify({"message": f"Historique effacé: {session_id}"}) except Exception as e: generic_message = "Problème avec l'API, veuillez réessayer plus tard." return jsonify({"error": generic_message}), 500 # ====================================================================== # MAIN (Inchangé) # ====================================================================== if __name__ == '__main__': print("start app.py") if initialize_global_resources(): try: import socket s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) local_ip = s.getsockname()[0] s.close() except Exception: local_ip = "127.0.0.1" print("\n" + "="*50) print("🌐 SERVEUR DÉMARRÉ") print(f"✅ API accessible à l'URL (via l'interface réseau locale): http://{local_ip}:{API_PORT}") print(f"✅ Route RAG (avec Historique): http://{local_ip}:{API_PORT}/api/get_answer") print(f"✅ Route DIRECTE TOTALE (Clé spéciale): http://{local_ip}:{API_PORT}/api/gemini_only") print(f"💡 N'oubliez pas de configurer 'app_port: 1212' et 'sdk: docker' dans votre README.md !") print("="*50 + "\n") app.run(host=API_HOST, port=API_PORT, debug=False) else: print("❌ Impossible de démarrer le serveur. Veuillez vérifier les logs pour les erreurs d'initialisation.")