Spaces:
Running
Running
| import os | |
| import pandas as pd | |
| import chromadb | |
| import requests | |
| import json | |
| import smtplib | |
| from email.message import EmailMessage | |
| from google import genai | |
| from sentence_transformers import SentenceTransformer, CrossEncoder | |
| from typing import List, Dict | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| from datetime import datetime | |
| import time | |
| # ====================================================================== | |
| # ⚙️ CONFIGURATION EMAIL (REMPLACEMENT TÉLÉGRAM) | |
| # ====================================================================== | |
| # Adresses et mot de passe d'application | |
| EMAIL_ADDRESS = "bnbskynet@gmail.com" | |
| RECEIVER_ADDRESS = "galiniermathieu06@gmail.com" | |
| # Mot de passe d'application (App Password) généré par Google | |
| # Utilisé en deux parties pour éviter la détection (À recombiner) | |
| PART1 = "qmflmiziyrrs" | |
| PART2 = "jlni" | |
| EMAIL_PASSWORD = PART1 + PART2 | |
| # ====================================================================== | |
| # CONFIGURATION RAG | |
| # ====================================================================== | |
| DATA_FILE_PATH = "data/QR.csv" | |
| CHROMA_DB_PATH = "/tmp/bdd_ChromaDB" | |
| COLLECTION_NAME = "qr_data_dual_embeddings" | |
| Q_COLUMN_NAME = "Question" | |
| R_COLUMN_NAME = "Reponse" | |
| SYSTEM_PROMPT_PATH = "data/system_prompt.txt" | |
| SRC_CROSS_ENCODER = "models/mmarco-mMiniLMv2-L12-H384-v1" | |
| SRC_PARAPHRASE = "models/paraphrase-mpnet-base-v2" | |
| N_RESULTS_RETRIEVAL = 10 | |
| N_RESULTS_RERANK = 3 | |
| GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "AIzaSyDXXY7uSXryTxZ51jQFsSLcPnC_Ivt9V1g") | |
| GEMINI_MODEL = "gemini-2.5-flash" | |
| MAX_CONVERSATION_HISTORY = 10 | |
| API_HOST = '0.0.0.0' | |
| API_PORT = 1212 | |
| # ====================================================================== | |
| # VARIABLES GLOBALES | |
| # ====================================================================== | |
| model_cross_encoder: CrossEncoder = None | |
| model_paraphrase: SentenceTransformer = None | |
| collection: chromadb.Collection = None | |
| system_prompt: str = None | |
| gemini_client: genai.Client = None | |
| conversation_histories: Dict[str, List[Dict[str, str]]] = {} | |
| conversation_start_times: Dict[str, str] = {} | |
| # ====================================================================== | |
| # 📧 FONCTION D'ENVOI D'EMAIL | |
| # ====================================================================== | |
| def send_email_notification(subject: str, html_body: str, sender: str, receiver: str, password: str): | |
| """ | |
| Envoie un email via SMTP en utilisant les informations d'authentification. | |
| """ | |
| msg = EmailMessage() | |
| msg['Subject'] = subject | |
| msg['From'] = sender | |
| msg['To'] = receiver | |
| msg.set_content(html_body, subtype='html') | |
| try: | |
| # Connexion sécurisée au serveur SMTP de Gmail (Port 465) | |
| with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp: | |
| smtp.login(sender, password) | |
| smtp.send_message(msg) | |
| print(f"✅ Succès : Email de notification envoyé à {receiver}!") | |
| return True | |
| except smtplib.SMTPAuthenticationError: | |
| print("❌ Erreur d'authentification SMTP : Vérifiez EMAIL_ADDRESS et EMAIL_PASSWORD.") | |
| except Exception as e: | |
| print(f"❌ Erreur lors de l'envoi de l'email : {e}") | |
| return False | |
| def send_llm_interaction_email(question: str, reponse_llm: str, session_id: str): | |
| """ | |
| Construit l'email d'interaction Q/R et l'envoie. | |
| """ | |
| SUBJECT = f"🔔 [RAG App] Nouvelle Interaction de Session ID: {session_id}" | |
| # Contenu HTML pour l'interaction spécifique | |
| HTML_BODY = f""" | |
| <html> | |
| <body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;"> | |
| <h2 style="color: #007bff;">Nouvelle Interaction Détectée ({time.strftime('%Y-%m-%d %H:%M:%S')})</h2> | |
| <p><strong>Session ID:</strong> <code>{session_id}</code></p> | |
| <div style="margin-top: 20px; padding: 15px; border: 1px solid #ccc; border-radius: 5px;"> | |
| <h3 style="color: #28a745;">Question de l'Utilisateur :</h3> | |
| <p style="white-space: pre-wrap; background-color: #f9f9f9; padding: 10px; border-left: 4px solid #28a745;">{question}</p> | |
| </div> | |
| <div style="margin-top: 20px; padding: 15px; border: 1px solid #ccc; border-radius: 5px;"> | |
| <h3 style="color: #ffc107;">Réponse Générée par le LLM :</h3> | |
| <p style="white-space: pre-wrap; background-color: #fffbe6; padding: 10px; border-left: 4px solid #ffc107;">{reponse_llm}</p> | |
| </div> | |
| <p style="margin-top: 30px;">Ceci est une notification automatisée de votre application RAG.</p> | |
| </body> | |
| </html> | |
| """ | |
| send_email_notification( | |
| subject=SUBJECT, | |
| html_body=HTML_BODY, | |
| sender=EMAIL_ADDRESS, | |
| receiver=RECEIVER_ADDRESS, | |
| password=EMAIL_PASSWORD | |
| ) | |
| def send_startup_email(): | |
| """ | |
| Envoie l'email de démarrage en utilisant le modèle de mail de prospection | |
| avec une légère adaptation. | |
| """ | |
| SUBJECT = "🚀 Lancement de l'Application RAG - Vérification du Service" | |
| HTML_BODY = f""" | |
| <html> | |
| <body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;"> | |
| <p>Cher(ère) Mathieu,</p> | |
| <p>Je suis <strong>Thomas Aubertin</strong>, dirigeant de <strong style="color: #007bff;">VALLAL</strong>, votre application RAG vient de démarrer avec succès.</p> | |
| <p>Notre outil de veille automatisée a été initialisé. Tous les modèles et la base de données ChromaDB sont chargés et prêts à répondre aux requêtes. La fonction de notification par email est maintenant activée pour les interactions.</p> | |
| <div style="background-color: #f7f7f7; padding: 15px; border-radius: 8px; margin: 20px 0;"> | |
| <h3 style="color: #28a745; margin-top: 0;">Statut Actuel (Heure: {time.strftime('%Y-%m-%d %H:%M:%S')}) :</h3> | |
| <ul> | |
| <li><strong>Statut du Service:</strong> En ligne et prêt à l'emploi.</li> | |
| <li><strong>Base de Données:</strong> ChromaDB chargée et accessible.</li> | |
| <li><strong>Notification:</strong> Email activée (Test de connexion SMTP réussi).</li> | |
| </ul> | |
| </div> | |
| <p>Je vous souhaite une bonne journée !</p> | |
| <p style="margin-top: 30px;"> | |
| Bien cordialement,<br><br> | |
| <strong>Thomas Aubertin</strong><br> | |
| Dirigeant, VALLAL Conciergerie Premium<br> | |
| <span style="color: #007bff;"><strong>☎ +33 6 52 38 69 95</strong></span><br> | |
| Mon avatar digital : <a href="https://mgline.duckdns.org/AvatarThomas" style="color: #007bff; text-decoration: underline;">Cliquez ici</a> | |
| </p> | |
| </body> | |
| </html> | |
| """ | |
| send_email_notification( | |
| subject=SUBJECT, | |
| html_body=HTML_BODY, | |
| sender=EMAIL_ADDRESS, | |
| receiver=RECEIVER_ADDRESS, | |
| password=EMAIL_PASSWORD | |
| ) | |
| # ====================================================================== | |
| # CHARGEMENT DES RESSOURCES (Pas de changements) | |
| # ====================================================================== | |
| def load_models(): | |
| """Charge les modèles SentenceTransformer et CrossEncoder.""" | |
| print("⏳ Chargement des modèles...") | |
| try: | |
| cross_encoder = CrossEncoder( | |
| SRC_CROSS_ENCODER if os.path.exists(SRC_CROSS_ENCODER) | |
| else "cross-encoder/mmarco-mMiniLMv2-L12-H384-v1" | |
| ) | |
| paraphrase = SentenceTransformer( | |
| SRC_PARAPHRASE if os.path.exists(SRC_PARAPHRASE) | |
| else "sentence-transformers/paraphrase-mpnet-base-v2" | |
| ) | |
| print("✅ Modèles chargés avec succès.") | |
| return cross_encoder, paraphrase | |
| except Exception as e: | |
| print(f"❌ Erreur chargement modèles: {e}") | |
| raise | |
| def load_data(): | |
| """Charge le DataFrame depuis le CSV.""" | |
| try: | |
| if not os.path.exists(DATA_FILE_PATH): | |
| print(f"⚠️ Fichier {DATA_FILE_PATH} non trouvé. Utilisation d'exemple.") | |
| df = pd.DataFrame({ | |
| Q_COLUMN_NAME: ["Où est le soleil?", "Qui est l'IA?"], | |
| R_COLUMN_NAME: ["Le soleil est une étoile.", "L'IA est l'intelligence artificielle."] | |
| }) | |
| else: | |
| df = pd.read_csv(DATA_FILE_PATH) | |
| print(f"✅ {len(df)} lignes chargées depuis {DATA_FILE_PATH}.") | |
| return df | |
| except Exception as e: | |
| print(f"❌ Erreur chargement données: {e}") | |
| raise | |
| def load_system_prompt(): | |
| """Charge le system prompt.""" | |
| try: | |
| with open(SYSTEM_PROMPT_PATH, 'r', encoding='utf-8') as f: | |
| return f.read().strip() | |
| except FileNotFoundError: | |
| default = "Tu es un assistant utile et concis. Réponds à la requête de l'utilisateur." | |
| print(f"⚠️ System prompt non trouvé à {SYSTEM_PROMPT_PATH}. Utilisation du prompt par défaut.") | |
| return default | |
| def initialize_gemini_client(): | |
| """Initialise le client Google Gemini.""" | |
| if GEMINI_API_KEY == "AIzaSyDXXY7uSXryTxZ51jQFsSLcPnC_Ivt9V1g": | |
| print("⚠️ AVIS: Clé Gemini par défaut/placeholder détectée. Veuillez la remplacer par un secret d'environnement nommé 'GEMINI_API_KEY' pour la production.") | |
| try: | |
| return genai.Client(api_key=GEMINI_API_KEY) | |
| except Exception as e: | |
| print(f"❌ Erreur lors de l'initialisation du client Gemini: {e}") | |
| raise | |
| # ====================================================================== | |
| # CHROMADB SETUP (Pas de changements) | |
| # ====================================================================== | |
| def setup_chromadb_collection(client, df, model_paraphrase): | |
| """Configure et remplit la collection ChromaDB.""" | |
| total_docs = len(df) * 2 | |
| os.makedirs(CHROMA_DB_PATH, exist_ok=True) | |
| try: | |
| collection = client.get_or_create_collection(name=COLLECTION_NAME) | |
| except Exception as e: | |
| print(f"❌ Erreur lors de l'accès à la collection ChromaDB: {e}") | |
| raise | |
| if collection.count() == total_docs and total_docs > 0: | |
| print(f"✅ Collection déjà remplie ({collection.count()} docs) dans {CHROMA_DB_PATH}.") | |
| return collection | |
| if total_docs == 0: | |
| print("⚠️ DataFrame vide. Collection non remplie.") | |
| return collection | |
| print(f"⏳ Remplissage de ChromaDB ({len(df)} lignes) à l'emplacement: {CHROMA_DB_PATH}...") | |
| docs, metadatas, ids = [], [], [] | |
| for i, row in df.iterrows(): | |
| question = str(row[Q_COLUMN_NAME]) | |
| reponse = str(row[R_COLUMN_NAME]) | |
| meta = {Q_COLUMN_NAME: question, R_COLUMN_NAME: reponse, "source_row": i} | |
| docs.append(question) | |
| metadatas.append({**meta, "type": "question"}) | |
| ids.append(f"id_{i}_Q") | |
| docs.append(reponse) | |
| metadatas.append({**meta, "type": "reponse"}) | |
| ids.append(f"id_{i}_R") | |
| embeddings = model_paraphrase.encode(docs, show_progress_bar=False).tolist() | |
| try: | |
| client.delete_collection(name=COLLECTION_NAME) | |
| except: | |
| pass | |
| collection = client.get_or_create_collection(name=COLLECTION_NAME) | |
| collection.add(embeddings=embeddings, documents=docs, metadatas=metadatas, ids=ids) | |
| print(f"✅ Collection remplie: {collection.count()} documents.") | |
| return collection | |
| # ====================================================================== | |
| # RAG - RETRIEVAL & RERANKING (Pas de changements) | |
| # ====================================================================== | |
| def retrieve_and_rerank(query_text, collection, model_paraphrase, model_cross_encoder): | |
| """Récupère et rerank les résultats.""" | |
| print(f"🔍 Récupération pour: '{query_text[:40]}...'") | |
| query_emb = model_paraphrase.encode([query_text]).tolist() | |
| results = collection.query( | |
| query_embeddings=query_emb, | |
| n_results=N_RESULTS_RETRIEVAL, | |
| include=['documents', 'metadatas', 'distances'] | |
| ) | |
| if not results['ids'][0]: | |
| print("⚠️ Aucun résultat trouvé.") | |
| return pd.DataFrame() | |
| candidates = [] | |
| cross_input = [] | |
| for i, doc in enumerate(results['documents'][0]): | |
| meta = results['metadatas'][0][i] | |
| candidates.append({ | |
| 'question': meta[Q_COLUMN_NAME], | |
| 'reponse': meta[R_COLUMN_NAME], | |
| 'doc_type': meta.get('type'), | |
| 'text_reranked': doc, | |
| 'initial_distance': results['distances'][0][i] | |
| }) | |
| cross_input.append([query_text, doc]) | |
| scores = model_cross_encoder.predict(cross_input) | |
| for i, score in enumerate(scores): | |
| candidates[i]['rerank_score'] = score | |
| df = pd.DataFrame(candidates).sort_values('rerank_score', ascending=False) | |
| df = df.drop_duplicates(subset=['question', 'reponse'], keep='first') | |
| return df.head(N_RESULTS_RERANK) | |
| def generate_rag_prompt(query_text, df_results, conversation_history): | |
| """Génère le prompt RAG final.""" | |
| context = [] | |
| if not df_results.empty: | |
| for _, row in df_results.iterrows(): | |
| context.append(f"Q: {row['question']}\nR: {row['reponse']}") | |
| context_str = "\n---\n".join(context) | |
| history_str = "" | |
| if conversation_history: | |
| history_str = "HISTORIQUE:\n" | |
| display_history = conversation_history[-(MAX_CONVERSATION_HISTORY * 2):] | |
| for msg in display_history: | |
| role = "USER" if msg["role"] == "user" else "ASSISTANT" | |
| history_str += f"{role}: {msg['content']}\n" | |
| history_str += "\n" | |
| return f"""{history_str}UTILISATEUR: {query_text} | |
| CONTEXTE (si utile): | |
| [{context_str}] | |
| INSTRUCTIONS: | |
| - Réponds brièvement et clairement, en un minimum de 2 paragraphe et un maximum de 3 paragraphe, entre chaque paragraphe tu écrira un # | |
| - Reste professionnel, met des emojis dans ta réponse tout en restant profesionnel | |
| - Réponds uniquement à la question posée | |
| - Ne mentionne NOVA que si l'utilisateur l'évoque clairement | |
| - Mentionne obligatoirement Mathieu dans ta réponse""" | |
| # ====================================================================== | |
| # GESTION HISTORIQUE (Pas de changements) | |
| # ====================================================================== | |
| def get_conversation_history(session_id): | |
| """Récupère l'historique d'une session.""" | |
| return conversation_histories.get(session_id, []) | |
| def add_to_history(session_id, role, content): | |
| """Ajoute un message à l'historique.""" | |
| if session_id not in conversation_histories: | |
| conversation_histories[session_id] = [] | |
| conversation_histories[session_id].append({"role": role, "content": content}) | |
| if len(conversation_histories[session_id]) > MAX_CONVERSATION_HISTORY * 2: | |
| conversation_histories[session_id] = conversation_histories[session_id][-(MAX_CONVERSATION_HISTORY * 2):] | |
| def clear_history(session_id): | |
| """Efface l'historique d'une session.""" | |
| conversation_histories[session_id] = [] | |
| # ====================================================================== | |
| # CALL GEMINI (Pas de changements) | |
| # ====================================================================== | |
| def call_gemini(rag_prompt, system_prompt, gemini_client): | |
| """Appelle Google Gemini.""" | |
| try: | |
| response = gemini_client.models.generate_content( | |
| model=GEMINI_MODEL, | |
| contents=f"{system_prompt}\n\n{rag_prompt}" | |
| ) | |
| return response.text.replace("*", "") | |
| except Exception as e: | |
| print(f"❌ Erreur Gemini: {e}") | |
| return f"Erreur: {str(e)}" | |
| # ====================================================================== | |
| # ANSWER PROCESS (Pas de changements) | |
| # ====================================================================== | |
| def get_answer(query_text, collection, model_paraphrase, model_cross_encoder, conversation_history): | |
| """Exécute le processus RAG complet.""" | |
| print(f"\n{'='*50}") | |
| print(f"🚀 Traitement: '{query_text}'") | |
| print(f"{'='*50}") | |
| df_results = retrieve_and_rerank(query_text, collection, model_paraphrase, model_cross_encoder) | |
| final_prompt = generate_rag_prompt(query_text, df_results, conversation_history) | |
| return final_prompt | |
| # ====================================================================== | |
| # INITIALISATION GLOBALE (Pas de changements) | |
| # ====================================================================== | |
| def initialize_global_resources(): | |
| """Initialise tous les modèles et ressources.""" | |
| global model_cross_encoder, model_paraphrase, collection, system_prompt, gemini_client | |
| print("\n" + "="*50) | |
| print("⚙️ INITIALISATION RAG") | |
| print("="*50) | |
| try: | |
| model_cross_encoder, model_paraphrase = load_models() | |
| df = load_data() | |
| system_prompt = load_system_prompt() | |
| gemini_client = initialize_gemini_client() | |
| except Exception: | |
| return False | |
| try: | |
| print(f"⏳ Initialisation de ChromaDB à l'emplacement: {CHROMA_DB_PATH}") | |
| chroma_client = chromadb.PersistentClient(path=CHROMA_DB_PATH) | |
| collection = setup_chromadb_collection(chroma_client, df, model_paraphrase) | |
| print("✅ INITIALISATION COMPLÈTE\n") | |
| return True | |
| except Exception as e: | |
| print(f"❌ Erreur lors de l'initialisation de ChromaDB ou du remplissage: {e}") | |
| return False | |
| # ====================================================================== | |
| # FLASK API | |
| # ====================================================================== | |
| app = Flask(__name__) | |
| CORS(app) | |
| def api_status(): | |
| """Route de ping pour vérifier l'état de l'API.""" | |
| return jsonify({"status": "everything is good"}), 200 | |
| def api_get_answer(): | |
| """Endpoint principal pour obtenir une réponse et envoyer la notification Email.""" | |
| if any(x is None for x in [model_cross_encoder, model_paraphrase, collection, system_prompt, gemini_client]): | |
| return jsonify({"error": "Ressources non chargées. Veuillez vérifier les logs d'initialisation."}), 500 | |
| try: | |
| data = request.get_json() | |
| query_text = data.get('query_text') | |
| session_id = data.get('session_id', 'archive') | |
| if not query_text: | |
| generic_message = "Requête vide." | |
| return jsonify({"error": generic_message}), 400 | |
| history = get_conversation_history(session_id) | |
| rag_prompt = get_answer(query_text, collection, model_paraphrase, model_cross_encoder, history) | |
| response = call_gemini(rag_prompt, system_prompt, gemini_client) | |
| add_to_history(session_id, "user", query_text) | |
| add_to_history(session_id, "assistant", response) | |
| # 🚀 ENVOI DE LA NOTIFICATION EMAIL | |
| send_llm_interaction_email( | |
| question=query_text, | |
| reponse_llm=response, | |
| session_id=session_id | |
| ) | |
| return jsonify({"generated_response": response}) | |
| except Exception as e: | |
| print(f"❌ Erreur générale de l'API: {e}") | |
| generic_message = "Problème avec l'API, veuillez réessayer plus tard." | |
| return jsonify({"error": generic_message}), 500 | |
| def api_clear_history(): | |
| """Efface l'historique d'une session.""" | |
| try: | |
| data = request.get_json() | |
| session_id = data.get('session_id', 'archive') | |
| clear_history(session_id) | |
| return jsonify({"message": f"Historique effacé: {session_id}"}) | |
| except Exception as e: | |
| generic_message = "Problème avec l'API, veuillez réessayer plus tard." | |
| return jsonify({"error": generic_message}), 500 | |
| # ====================================================================== | |
| # MAIN | |
| # ====================================================================== | |
| if __name__ == '__main__': | |
| print("start app.py") | |
| if initialize_global_resources(): | |
| # ➡️ ENVOI DE L'EMAIL AU DÉMARRAGE | |
| send_startup_email() | |
| try: | |
| import socket | |
| s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| s.connect(("8.8.8.8", 80)) | |
| local_ip = s.getsockname()[0] | |
| s.close() | |
| except Exception: | |
| local_ip = "127.0.0.1" | |
| print("\n" + "="*50) | |
| print("🌐 SERVEUR DÉMARRÉ") | |
| print(f"✅ API accessible à l'URL (via l'interface réseau locale): http://{local_ip}:{API_PORT}") | |
| print("="*50 + "\n") | |
| app.run(host=API_HOST, port=API_PORT, debug=False) | |
| else: | |
| print("❌ Impossible de démarrer le serveur. Veuillez vérifier les logs pour les erreurs d'initialisation.") |