quick / utils /memory_mgr.py
FredyHoundayi's picture
MVP2: quickAI — AI Copilot for E-Commerce
b8da9d1
import json
import logging
import os
from typing import List, Dict, Any, Optional
from datetime import datetime
import uuid
logger = logging.getLogger(__name__)
class MemoryManager:
"""
Gestionnaire de mémoire pour stocker l'historique des conversations.
Utilise des fichiers JSON individuels par session_id.
"""
def __init__(self, storage_dir: str = "./storage/memory"):
self.storage_dir = storage_dir
# Créer le répertoire de stockage s'il n'existe pas
os.makedirs(storage_dir, exist_ok=True)
logger.info(f"MemoryManager initialisé avec répertoire: {storage_dir}")
def get_session_file_path(self, session_id: str) -> str:
"""
Génère le chemin du fichier pour une session.
Args:
session_id: Identifiant de la session
Returns:
Chemin complet du fichier JSON
"""
# Nettoyer le session_id pour éviter les problèmes de noms de fichiers
safe_session_id = "".join(c for c in session_id if c.isalnum() or c in ('-', '_'))
return os.path.join(self.storage_dir, f"{safe_session_id}.json")
def create_session(self, session_id: Optional[str] = None) -> str:
"""
Crée une nouvelle session de conversation.
Args:
session_id: ID optionnel, généré automatiquement si non fourni
Returns:
ID de la session créée
"""
if not session_id:
session_id = str(uuid.uuid4())
session_data = {
"session_id": session_id,
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"message_count": 0,
"messages": []
}
try:
file_path = self.get_session_file_path(session_id)
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(session_data, f, ensure_ascii=False, indent=2)
logger.info(f"Nouvelle session créée: {session_id}")
return session_id
except Exception as e:
logger.error(f"Erreur lors de la création de la session {session_id}: {str(e)}")
raise
def get_session_history(self, session_id: str) -> List[Dict[str, str]]:
"""
Récupère l'historique complet d'une session.
Args:
session_id: Identifiant de la session
Returns:
Liste des messages [{question, answer, timestamp}, ...]
"""
try:
file_path = self.get_session_file_path(session_id)
if not os.path.exists(file_path):
logger.info(f"Session {session_id} non trouvée, création d'une nouvelle session")
self.create_session(session_id)
return []
with open(file_path, 'r', encoding='utf-8') as f:
session_data = json.load(f)
messages = session_data.get("messages", [])
logger.info(f"Historique chargé pour session {session_id}: {len(messages)} messages")
return messages
except json.JSONDecodeError as e:
logger.error(f"Erreur JSON lors de la lecture de la session {session_id}: {str(e)}")
return []
except Exception as e:
logger.error(f"Erreur lors de la récupération de l'historique de la session {session_id}: {str(e)}")
return []
def add_message(self, session_id: str, question: str, answer: str) -> bool:
"""
Ajoute un échange question/réponse à une session.
Args:
session_id: Identifiant de la session
question: Question de l'utilisateur
answer: Réponse de l'agent
Returns:
True si l'ajout a réussi
"""
try:
file_path = self.get_session_file_path(session_id)
# Créer la session si elle n'existe pas
if not os.path.exists(file_path):
self.create_session(session_id)
# Lire les données existantes
with open(file_path, 'r', encoding='utf-8') as f:
session_data = json.load(f)
# Ajouter le nouveau message
new_message = {
"question": question,
"answer": answer,
"timestamp": datetime.now().isoformat()
}
session_data["messages"].append(new_message)
session_data["updated_at"] = datetime.now().isoformat()
session_data["message_count"] = len(session_data["messages"])
# Sauvegarder les données mises à jour
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(session_data, f, ensure_ascii=False, indent=2)
logger.info(f"Message ajouté à la session {session_id} (total: {session_data['message_count']})")
return True
except Exception as e:
logger.error(f"Erreur lors de l'ajout du message à la session {session_id}: {str(e)}")
return False
def update_session_metadata(self, session_id: str, metadata: Dict[str, Any]) -> bool:
"""
Met à jour les métadonnées d'une session.
Args:
session_id: Identifiant de la session
metadata: Métadonnées à ajouter/mettre à jour
Returns:
True si la mise à jour a réussi
"""
try:
file_path = self.get_session_file_path(session_id)
if not os.path.exists(file_path):
logger.warning(f"Session {session_id} non trouvée pour mise à jour des métadonnées")
return False
# Lire les données existantes
with open(file_path, 'r', encoding='utf-8') as f:
session_data = json.load(f)
# Mettre à jour les métadonnées
session_data.update(metadata)
session_data["updated_at"] = datetime.now().isoformat()
# Sauvegarder
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(session_data, f, ensure_ascii=False, indent=2)
logger.info(f"Métadonnées mises à jour pour la session {session_id}")
return True
except Exception as e:
logger.error(f"Erreur lors de la mise à jour des métadonnées de la session {session_id}: {str(e)}")
return False
def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]:
"""
Récupère les informations d'une session sans les messages.
Args:
session_id: Identifiant de la session
Returns:
Informations de la session ou None
"""
try:
file_path = self.get_session_file_path(session_id)
if not os.path.exists(file_path):
return None
with open(file_path, 'r', encoding='utf-8') as f:
session_data = json.load(f)
# Retourner les informations sans les messages
info = {
"session_id": session_data.get("session_id"),
"created_at": session_data.get("created_at"),
"updated_at": session_data.get("updated_at"),
"message_count": session_data.get("message_count", 0)
}
return info
except Exception as e:
logger.error(f"Erreur lors de la récupération des infos de la session {session_id}: {str(e)}")
return None
def list_sessions(self) -> List[Dict[str, Any]]:
"""
Liste toutes les sessions existantes.
Returns:
Liste des informations des sessions
"""
try:
sessions = []
if not os.path.exists(self.storage_dir):
return sessions
for filename in os.listdir(self.storage_dir):
if filename.endswith('.json'):
session_id = filename[:-5] # Enlever .json
session_info = self.get_session_info(session_id)
if session_info:
sessions.append(session_info)
# Trier par date de mise à jour décroissante
sessions.sort(key=lambda x: x.get("updated_at", ""), reverse=True)
logger.info(f"Liste des sessions: {len(sessions)} sessions trouvées")
return sessions
except Exception as e:
logger.error(f"Erreur lors de la liste des sessions: {str(e)}")
return []
def delete_session(self, session_id: str) -> bool:
"""
Supprime une session et son fichier associé.
Args:
session_id: Identifiant de la session à supprimer
Returns:
True si la suppression a réussi
"""
try:
file_path = self.get_session_file_path(session_id)
if os.path.exists(file_path):
os.remove(file_path)
logger.info(f"Session {session_id} supprimée avec succès")
return True
else:
logger.warning(f"Fichier de session {session_id} non trouvé pour suppression")
return False
except Exception as e:
logger.error(f"Erreur lors de la suppression de la session {session_id}: {str(e)}")
return False
def clear_old_sessions(self, days_threshold: int = 30) -> int:
"""
Supprime les sessions plus anciennes que le seuil spécifié.
Args:
days_threshold: Nombre de jours avant suppression
Returns:
Nombre de sessions supprimées
"""
try:
sessions = self.list_sessions()
deleted_count = 0
cutoff_date = datetime.now().timestamp() - (days_threshold * 24 * 3600)
for session in sessions:
updated_at_str = session.get("updated_at", "")
if updated_at_str:
try:
updated_at = datetime.fromisoformat(updated_at_str.replace('Z', '+00:00')).timestamp()
if updated_at < cutoff_date:
if self.delete_session(session["session_id"]):
deleted_count += 1
except ValueError:
continue
logger.info(f"Nettoyage terminé: {deleted_count} anciennes sessions supprimées")
return deleted_count
except Exception as e:
logger.error(f"Erreur lors du nettoyage des anciennes sessions: {str(e)}")
return 0
def get_session_statistics(self) -> Dict[str, Any]:
"""
Retourne des statistiques sur les sessions.
Returns:
Statistiques des sessions
"""
try:
sessions = self.list_sessions()
if not sessions:
return {
"total_sessions": 0,
"total_messages": 0,
"average_messages_per_session": 0,
"oldest_session": None,
"newest_session": None
}
total_messages = sum(s.get("message_count", 0) for s in sessions)
avg_messages = total_messages / len(sessions) if sessions else 0
# Trouver la session la plus ancienne et la plus récente
sessions_by_date = sorted(sessions, key=lambda x: x.get("created_at", ""))
return {
"total_sessions": len(sessions),
"total_messages": total_messages,
"average_messages_per_session": round(avg_messages, 2),
"oldest_session": sessions_by_date[0].get("session_id") if sessions_by_date else None,
"newest_session": sessions_by_date[-1].get("session_id") if sessions_by_date else None
}
except Exception as e:
logger.error(f"Erreur lors du calcul des statistiques: {str(e)}")
return {}