NOVA_API / app.py
MathieuGAL's picture
Update app.py
09bd272 verified
raw
history blame
20.6 kB
import os
import pandas as pd
import chromadb
import requests
import json
import smtplib
from email.message import EmailMessage
from google import genai
from sentence_transformers import SentenceTransformer, CrossEncoder
from typing import List, Dict
from flask import Flask, request, jsonify
from flask_cors import CORS
from datetime import datetime
import time
# ======================================================================
# ⚙️ CONFIGURATION EMAIL (REMPLACEMENT TÉLÉGRAM)
# ======================================================================
# Adresses et mot de passe d'application
EMAIL_ADDRESS = "bnbskynet@gmail.com"
RECEIVER_ADDRESS = "galiniermathieu06@gmail.com"
# Mot de passe d'application (App Password) généré par Google
# Utilisé en deux parties pour éviter la détection (À recombiner)
PART1 = "qmflmiziyrrs"
PART2 = "jlni"
EMAIL_PASSWORD = PART1 + PART2
# ======================================================================
# CONFIGURATION RAG
# ======================================================================
DATA_FILE_PATH = "data/QR.csv"
CHROMA_DB_PATH = "/tmp/bdd_ChromaDB"
COLLECTION_NAME = "qr_data_dual_embeddings"
Q_COLUMN_NAME = "Question"
R_COLUMN_NAME = "Reponse"
SYSTEM_PROMPT_PATH = "data/system_prompt.txt"
SRC_CROSS_ENCODER = "models/mmarco-mMiniLMv2-L12-H384-v1"
SRC_PARAPHRASE = "models/paraphrase-mpnet-base-v2"
N_RESULTS_RETRIEVAL = 10
N_RESULTS_RERANK = 3
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "AIzaSyDXXY7uSXryTxZ51jQFsSLcPnC_Ivt9V1g")
GEMINI_MODEL = "gemini-2.5-flash"
MAX_CONVERSATION_HISTORY = 10
API_HOST = '0.0.0.0'
API_PORT = 1212
# ======================================================================
# VARIABLES GLOBALES
# ======================================================================
model_cross_encoder: CrossEncoder = None
model_paraphrase: SentenceTransformer = None
collection: chromadb.Collection = None
system_prompt: str = None
gemini_client: genai.Client = None
conversation_histories: Dict[str, List[Dict[str, str]]] = {}
conversation_start_times: Dict[str, str] = {}
# ======================================================================
# 📧 FONCTION D'ENVOI D'EMAIL
# ======================================================================
def send_email_notification(subject: str, html_body: str, sender: str, receiver: str, password: str):
"""
Envoie un email via SMTP en utilisant les informations d'authentification.
"""
msg = EmailMessage()
msg['Subject'] = subject
msg['From'] = sender
msg['To'] = receiver
msg.set_content(html_body, subtype='html')
try:
# Connexion sécurisée au serveur SMTP de Gmail (Port 465)
with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp:
smtp.login(sender, password)
smtp.send_message(msg)
print(f"✅ Succès : Email de notification envoyé à {receiver}!")
return True
except smtplib.SMTPAuthenticationError:
print("❌ Erreur d'authentification SMTP : Vérifiez EMAIL_ADDRESS et EMAIL_PASSWORD.")
except Exception as e:
print(f"❌ Erreur lors de l'envoi de l'email : {e}")
return False
def send_llm_interaction_email(question: str, reponse_llm: str, session_id: str):
"""
Construit l'email d'interaction Q/R et l'envoie.
"""
SUBJECT = f"🔔 [RAG App] Nouvelle Interaction de Session ID: {session_id}"
# Contenu HTML pour l'interaction spécifique
HTML_BODY = f"""
<html>
<body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;">
<h2 style="color: #007bff;">Nouvelle Interaction Détectée ({time.strftime('%Y-%m-%d %H:%M:%S')})</h2>
<p><strong>Session ID:</strong> <code>{session_id}</code></p>
<div style="margin-top: 20px; padding: 15px; border: 1px solid #ccc; border-radius: 5px;">
<h3 style="color: #28a745;">Question de l'Utilisateur :</h3>
<p style="white-space: pre-wrap; background-color: #f9f9f9; padding: 10px; border-left: 4px solid #28a745;">{question}</p>
</div>
<div style="margin-top: 20px; padding: 15px; border: 1px solid #ccc; border-radius: 5px;">
<h3 style="color: #ffc107;">Réponse Générée par le LLM :</h3>
<p style="white-space: pre-wrap; background-color: #fffbe6; padding: 10px; border-left: 4px solid #ffc107;">{reponse_llm}</p>
</div>
<p style="margin-top: 30px;">Ceci est une notification automatisée de votre application RAG.</p>
</body>
</html>
"""
send_email_notification(
subject=SUBJECT,
html_body=HTML_BODY,
sender=EMAIL_ADDRESS,
receiver=RECEIVER_ADDRESS,
password=EMAIL_PASSWORD
)
def send_startup_email():
"""
Envoie l'email de démarrage en utilisant le modèle de mail de prospection
avec une légère adaptation.
"""
SUBJECT = "🚀 Lancement de l'Application RAG - Vérification du Service"
HTML_BODY = f"""
<html>
<body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;">
<p>Cher(ère) Mathieu,</p>
<p>Je suis <strong>Thomas Aubertin</strong>, dirigeant de <strong style="color: #007bff;">VALLAL</strong>, votre application RAG vient de démarrer avec succès.</p>
<p>Notre outil de veille automatisée a été initialisé. Tous les modèles et la base de données ChromaDB sont chargés et prêts à répondre aux requêtes. La fonction de notification par email est maintenant activée pour les interactions.</p>
<div style="background-color: #f7f7f7; padding: 15px; border-radius: 8px; margin: 20px 0;">
<h3 style="color: #28a745; margin-top: 0;">Statut Actuel (Heure: {time.strftime('%Y-%m-%d %H:%M:%S')}) :</h3>
<ul>
<li><strong>Statut du Service:</strong> En ligne et prêt à l'emploi.</li>
<li><strong>Base de Données:</strong> ChromaDB chargée et accessible.</li>
<li><strong>Notification:</strong> Email activée (Test de connexion SMTP réussi).</li>
</ul>
</div>
<p>Je vous souhaite une bonne journée !</p>
<p style="margin-top: 30px;">
Bien cordialement,<br><br>
<strong>Thomas Aubertin</strong><br>
Dirigeant, VALLAL Conciergerie Premium<br>
<span style="color: #007bff;"><strong>&#9742; +33 6 52 38 69 95</strong></span><br>
Mon avatar digital : <a href="https://mgline.duckdns.org/AvatarThomas" style="color: #007bff; text-decoration: underline;">Cliquez ici</a>
</p>
</body>
</html>
"""
send_email_notification(
subject=SUBJECT,
html_body=HTML_BODY,
sender=EMAIL_ADDRESS,
receiver=RECEIVER_ADDRESS,
password=EMAIL_PASSWORD
)
# ======================================================================
# CHARGEMENT DES RESSOURCES (Pas de changements)
# ======================================================================
def load_models():
"""Charge les modèles SentenceTransformer et CrossEncoder."""
print("⏳ Chargement des modèles...")
try:
cross_encoder = CrossEncoder(
SRC_CROSS_ENCODER if os.path.exists(SRC_CROSS_ENCODER)
else "cross-encoder/mmarco-mMiniLMv2-L12-H384-v1"
)
paraphrase = SentenceTransformer(
SRC_PARAPHRASE if os.path.exists(SRC_PARAPHRASE)
else "sentence-transformers/paraphrase-mpnet-base-v2"
)
print("✅ Modèles chargés avec succès.")
return cross_encoder, paraphrase
except Exception as e:
print(f"❌ Erreur chargement modèles: {e}")
raise
def load_data():
"""Charge le DataFrame depuis le CSV."""
try:
if not os.path.exists(DATA_FILE_PATH):
print(f"⚠️ Fichier {DATA_FILE_PATH} non trouvé. Utilisation d'exemple.")
df = pd.DataFrame({
Q_COLUMN_NAME: ["Où est le soleil?", "Qui est l'IA?"],
R_COLUMN_NAME: ["Le soleil est une étoile.", "L'IA est l'intelligence artificielle."]
})
else:
df = pd.read_csv(DATA_FILE_PATH)
print(f"✅ {len(df)} lignes chargées depuis {DATA_FILE_PATH}.")
return df
except Exception as e:
print(f"❌ Erreur chargement données: {e}")
raise
def load_system_prompt():
"""Charge le system prompt."""
try:
with open(SYSTEM_PROMPT_PATH, 'r', encoding='utf-8') as f:
return f.read().strip()
except FileNotFoundError:
default = "Tu es un assistant utile et concis. Réponds à la requête de l'utilisateur."
print(f"⚠️ System prompt non trouvé à {SYSTEM_PROMPT_PATH}. Utilisation du prompt par défaut.")
return default
def initialize_gemini_client():
"""Initialise le client Google Gemini."""
if GEMINI_API_KEY == "AIzaSyDXXY7uSXryTxZ51jQFsSLcPnC_Ivt9V1g":
print("⚠️ AVIS: Clé Gemini par défaut/placeholder détectée. Veuillez la remplacer par un secret d'environnement nommé 'GEMINI_API_KEY' pour la production.")
try:
return genai.Client(api_key=GEMINI_API_KEY)
except Exception as e:
print(f"❌ Erreur lors de l'initialisation du client Gemini: {e}")
raise
# ======================================================================
# CHROMADB SETUP (Pas de changements)
# ======================================================================
def setup_chromadb_collection(client, df, model_paraphrase):
"""Configure et remplit la collection ChromaDB."""
total_docs = len(df) * 2
os.makedirs(CHROMA_DB_PATH, exist_ok=True)
try:
collection = client.get_or_create_collection(name=COLLECTION_NAME)
except Exception as e:
print(f"❌ Erreur lors de l'accès à la collection ChromaDB: {e}")
raise
if collection.count() == total_docs and total_docs > 0:
print(f"✅ Collection déjà remplie ({collection.count()} docs) dans {CHROMA_DB_PATH}.")
return collection
if total_docs == 0:
print("⚠️ DataFrame vide. Collection non remplie.")
return collection
print(f"⏳ Remplissage de ChromaDB ({len(df)} lignes) à l'emplacement: {CHROMA_DB_PATH}...")
docs, metadatas, ids = [], [], []
for i, row in df.iterrows():
question = str(row[Q_COLUMN_NAME])
reponse = str(row[R_COLUMN_NAME])
meta = {Q_COLUMN_NAME: question, R_COLUMN_NAME: reponse, "source_row": i}
docs.append(question)
metadatas.append({**meta, "type": "question"})
ids.append(f"id_{i}_Q")
docs.append(reponse)
metadatas.append({**meta, "type": "reponse"})
ids.append(f"id_{i}_R")
embeddings = model_paraphrase.encode(docs, show_progress_bar=False).tolist()
try:
client.delete_collection(name=COLLECTION_NAME)
except:
pass
collection = client.get_or_create_collection(name=COLLECTION_NAME)
collection.add(embeddings=embeddings, documents=docs, metadatas=metadatas, ids=ids)
print(f"✅ Collection remplie: {collection.count()} documents.")
return collection
# ======================================================================
# RAG - RETRIEVAL & RERANKING (Pas de changements)
# ======================================================================
def retrieve_and_rerank(query_text, collection, model_paraphrase, model_cross_encoder):
"""Récupère et rerank les résultats."""
print(f"🔍 Récupération pour: '{query_text[:40]}...'")
query_emb = model_paraphrase.encode([query_text]).tolist()
results = collection.query(
query_embeddings=query_emb,
n_results=N_RESULTS_RETRIEVAL,
include=['documents', 'metadatas', 'distances']
)
if not results['ids'][0]:
print("⚠️ Aucun résultat trouvé.")
return pd.DataFrame()
candidates = []
cross_input = []
for i, doc in enumerate(results['documents'][0]):
meta = results['metadatas'][0][i]
candidates.append({
'question': meta[Q_COLUMN_NAME],
'reponse': meta[R_COLUMN_NAME],
'doc_type': meta.get('type'),
'text_reranked': doc,
'initial_distance': results['distances'][0][i]
})
cross_input.append([query_text, doc])
scores = model_cross_encoder.predict(cross_input)
for i, score in enumerate(scores):
candidates[i]['rerank_score'] = score
df = pd.DataFrame(candidates).sort_values('rerank_score', ascending=False)
df = df.drop_duplicates(subset=['question', 'reponse'], keep='first')
return df.head(N_RESULTS_RERANK)
def generate_rag_prompt(query_text, df_results, conversation_history):
"""Génère le prompt RAG final."""
context = []
if not df_results.empty:
for _, row in df_results.iterrows():
context.append(f"Q: {row['question']}\nR: {row['reponse']}")
context_str = "\n---\n".join(context)
history_str = ""
if conversation_history:
history_str = "HISTORIQUE:\n"
display_history = conversation_history[-(MAX_CONVERSATION_HISTORY * 2):]
for msg in display_history:
role = "USER" if msg["role"] == "user" else "ASSISTANT"
history_str += f"{role}: {msg['content']}\n"
history_str += "\n"
return f"""{history_str}UTILISATEUR: {query_text}
CONTEXTE (si utile):
[{context_str}]
INSTRUCTIONS:
- Réponds brièvement et clairement, en un minimum de 2 paragraphe et un maximum de 3 paragraphe, entre chaque paragraphe tu écrira un #
- Reste professionnel, met des emojis dans ta réponse tout en restant profesionnel
- Réponds uniquement à la question posée
- Ne mentionne NOVA que si l'utilisateur l'évoque clairement
- Mentionne obligatoirement Mathieu dans ta réponse"""
# ======================================================================
# GESTION HISTORIQUE (Pas de changements)
# ======================================================================
def get_conversation_history(session_id):
"""Récupère l'historique d'une session."""
return conversation_histories.get(session_id, [])
def add_to_history(session_id, role, content):
"""Ajoute un message à l'historique."""
if session_id not in conversation_histories:
conversation_histories[session_id] = []
conversation_histories[session_id].append({"role": role, "content": content})
if len(conversation_histories[session_id]) > MAX_CONVERSATION_HISTORY * 2:
conversation_histories[session_id] = conversation_histories[session_id][-(MAX_CONVERSATION_HISTORY * 2):]
def clear_history(session_id):
"""Efface l'historique d'une session."""
conversation_histories[session_id] = []
# ======================================================================
# CALL GEMINI (Pas de changements)
# ======================================================================
def call_gemini(rag_prompt, system_prompt, gemini_client):
"""Appelle Google Gemini."""
try:
response = gemini_client.models.generate_content(
model=GEMINI_MODEL,
contents=f"{system_prompt}\n\n{rag_prompt}"
)
return response.text.replace("*", "")
except Exception as e:
print(f"❌ Erreur Gemini: {e}")
return f"Erreur: {str(e)}"
# ======================================================================
# ANSWER PROCESS (Pas de changements)
# ======================================================================
def get_answer(query_text, collection, model_paraphrase, model_cross_encoder, conversation_history):
"""Exécute le processus RAG complet."""
print(f"\n{'='*50}")
print(f"🚀 Traitement: '{query_text}'")
print(f"{'='*50}")
df_results = retrieve_and_rerank(query_text, collection, model_paraphrase, model_cross_encoder)
final_prompt = generate_rag_prompt(query_text, df_results, conversation_history)
return final_prompt
# ======================================================================
# INITIALISATION GLOBALE (Pas de changements)
# ======================================================================
def initialize_global_resources():
"""Initialise tous les modèles et ressources."""
global model_cross_encoder, model_paraphrase, collection, system_prompt, gemini_client
print("\n" + "="*50)
print("⚙️ INITIALISATION RAG")
print("="*50)
try:
model_cross_encoder, model_paraphrase = load_models()
df = load_data()
system_prompt = load_system_prompt()
gemini_client = initialize_gemini_client()
except Exception:
return False
try:
print(f"⏳ Initialisation de ChromaDB à l'emplacement: {CHROMA_DB_PATH}")
chroma_client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
collection = setup_chromadb_collection(chroma_client, df, model_paraphrase)
print("✅ INITIALISATION COMPLÈTE\n")
return True
except Exception as e:
print(f"❌ Erreur lors de l'initialisation de ChromaDB ou du remplissage: {e}")
return False
# ======================================================================
# FLASK API
# ======================================================================
app = Flask(__name__)
CORS(app)
@app.route('/status', methods=['GET'])
def api_status():
"""Route de ping pour vérifier l'état de l'API."""
return jsonify({"status": "everything is good"}), 200
@app.route('/api/get_answer', methods=['POST'])
def api_get_answer():
"""Endpoint principal pour obtenir une réponse et envoyer la notification Email."""
if any(x is None for x in [model_cross_encoder, model_paraphrase, collection, system_prompt, gemini_client]):
return jsonify({"error": "Ressources non chargées. Veuillez vérifier les logs d'initialisation."}), 500
try:
data = request.get_json()
query_text = data.get('query_text')
session_id = data.get('session_id', 'archive')
if not query_text:
generic_message = "Requête vide."
return jsonify({"error": generic_message}), 400
history = get_conversation_history(session_id)
rag_prompt = get_answer(query_text, collection, model_paraphrase, model_cross_encoder, history)
response = call_gemini(rag_prompt, system_prompt, gemini_client)
add_to_history(session_id, "user", query_text)
add_to_history(session_id, "assistant", response)
# 🚀 ENVOI DE LA NOTIFICATION EMAIL
send_llm_interaction_email(
question=query_text,
reponse_llm=response,
session_id=session_id
)
return jsonify({"generated_response": response})
except Exception as e:
print(f"❌ Erreur générale de l'API: {e}")
generic_message = "Problème avec l'API, veuillez réessayer plus tard."
return jsonify({"error": generic_message}), 500
@app.route('/api/clear_history', methods=['POST'])
def api_clear_history():
"""Efface l'historique d'une session."""
try:
data = request.get_json()
session_id = data.get('session_id', 'archive')
clear_history(session_id)
return jsonify({"message": f"Historique effacé: {session_id}"})
except Exception as e:
generic_message = "Problème avec l'API, veuillez réessayer plus tard."
return jsonify({"error": generic_message}), 500
# ======================================================================
# MAIN
# ======================================================================
if __name__ == '__main__':
print("start app.py")
if initialize_global_resources():
# ➡️ ENVOI DE L'EMAIL AU DÉMARRAGE
send_startup_email()
try:
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()
except Exception:
local_ip = "127.0.0.1"
print("\n" + "="*50)
print("🌐 SERVEUR DÉMARRÉ")
print(f"✅ API accessible à l'URL (via l'interface réseau locale): http://{local_ip}:{API_PORT}")
print("="*50 + "\n")
app.run(host=API_HOST, port=API_PORT, debug=False)
else:
print("❌ Impossible de démarrer le serveur. Veuillez vérifier les logs pour les erreurs d'initialisation.")