import json import logging import os from typing import List, Dict, Any, Optional from datetime import datetime import uuid logger = logging.getLogger(__name__) class MemoryManager: """ Gestionnaire de mémoire pour stocker l'historique des conversations. Utilise des fichiers JSON individuels par session_id. """ def __init__(self, storage_dir: str = "./storage/memory"): self.storage_dir = storage_dir # Créer le répertoire de stockage s'il n'existe pas os.makedirs(storage_dir, exist_ok=True) logger.info(f"MemoryManager initialisé avec répertoire: {storage_dir}") def get_session_file_path(self, session_id: str) -> str: """ Génère le chemin du fichier pour une session. Args: session_id: Identifiant de la session Returns: Chemin complet du fichier JSON """ # Nettoyer le session_id pour éviter les problèmes de noms de fichiers safe_session_id = "".join(c for c in session_id if c.isalnum() or c in ('-', '_')) return os.path.join(self.storage_dir, f"{safe_session_id}.json") def create_session(self, session_id: Optional[str] = None) -> str: """ Crée une nouvelle session de conversation. Args: session_id: ID optionnel, généré automatiquement si non fourni Returns: ID de la session créée """ if not session_id: session_id = str(uuid.uuid4()) session_data = { "session_id": session_id, "created_at": datetime.now().isoformat(), "updated_at": datetime.now().isoformat(), "message_count": 0, "messages": [] } try: file_path = self.get_session_file_path(session_id) with open(file_path, 'w', encoding='utf-8') as f: json.dump(session_data, f, ensure_ascii=False, indent=2) logger.info(f"Nouvelle session créée: {session_id}") return session_id except Exception as e: logger.error(f"Erreur lors de la création de la session {session_id}: {str(e)}") raise def get_session_history(self, session_id: str) -> List[Dict[str, str]]: """ Récupère l'historique complet d'une session. Args: session_id: Identifiant de la session Returns: Liste des messages [{question, answer, timestamp}, ...] """ try: file_path = self.get_session_file_path(session_id) if not os.path.exists(file_path): logger.info(f"Session {session_id} non trouvée, création d'une nouvelle session") self.create_session(session_id) return [] with open(file_path, 'r', encoding='utf-8') as f: session_data = json.load(f) messages = session_data.get("messages", []) logger.info(f"Historique chargé pour session {session_id}: {len(messages)} messages") return messages except json.JSONDecodeError as e: logger.error(f"Erreur JSON lors de la lecture de la session {session_id}: {str(e)}") return [] except Exception as e: logger.error(f"Erreur lors de la récupération de l'historique de la session {session_id}: {str(e)}") return [] def add_message(self, session_id: str, question: str, answer: str) -> bool: """ Ajoute un échange question/réponse à une session. Args: session_id: Identifiant de la session question: Question de l'utilisateur answer: Réponse de l'agent Returns: True si l'ajout a réussi """ try: file_path = self.get_session_file_path(session_id) # Créer la session si elle n'existe pas if not os.path.exists(file_path): self.create_session(session_id) # Lire les données existantes with open(file_path, 'r', encoding='utf-8') as f: session_data = json.load(f) # Ajouter le nouveau message new_message = { "question": question, "answer": answer, "timestamp": datetime.now().isoformat() } session_data["messages"].append(new_message) session_data["updated_at"] = datetime.now().isoformat() session_data["message_count"] = len(session_data["messages"]) # Sauvegarder les données mises à jour with open(file_path, 'w', encoding='utf-8') as f: json.dump(session_data, f, ensure_ascii=False, indent=2) logger.info(f"Message ajouté à la session {session_id} (total: {session_data['message_count']})") return True except Exception as e: logger.error(f"Erreur lors de l'ajout du message à la session {session_id}: {str(e)}") return False def update_session_metadata(self, session_id: str, metadata: Dict[str, Any]) -> bool: """ Met à jour les métadonnées d'une session. Args: session_id: Identifiant de la session metadata: Métadonnées à ajouter/mettre à jour Returns: True si la mise à jour a réussi """ try: file_path = self.get_session_file_path(session_id) if not os.path.exists(file_path): logger.warning(f"Session {session_id} non trouvée pour mise à jour des métadonnées") return False # Lire les données existantes with open(file_path, 'r', encoding='utf-8') as f: session_data = json.load(f) # Mettre à jour les métadonnées session_data.update(metadata) session_data["updated_at"] = datetime.now().isoformat() # Sauvegarder with open(file_path, 'w', encoding='utf-8') as f: json.dump(session_data, f, ensure_ascii=False, indent=2) logger.info(f"Métadonnées mises à jour pour la session {session_id}") return True except Exception as e: logger.error(f"Erreur lors de la mise à jour des métadonnées de la session {session_id}: {str(e)}") return False def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]: """ Récupère les informations d'une session sans les messages. Args: session_id: Identifiant de la session Returns: Informations de la session ou None """ try: file_path = self.get_session_file_path(session_id) if not os.path.exists(file_path): return None with open(file_path, 'r', encoding='utf-8') as f: session_data = json.load(f) # Retourner les informations sans les messages info = { "session_id": session_data.get("session_id"), "created_at": session_data.get("created_at"), "updated_at": session_data.get("updated_at"), "message_count": session_data.get("message_count", 0) } return info except Exception as e: logger.error(f"Erreur lors de la récupération des infos de la session {session_id}: {str(e)}") return None def list_sessions(self) -> List[Dict[str, Any]]: """ Liste toutes les sessions existantes. Returns: Liste des informations des sessions """ try: sessions = [] if not os.path.exists(self.storage_dir): return sessions for filename in os.listdir(self.storage_dir): if filename.endswith('.json'): session_id = filename[:-5] # Enlever .json session_info = self.get_session_info(session_id) if session_info: sessions.append(session_info) # Trier par date de mise à jour décroissante sessions.sort(key=lambda x: x.get("updated_at", ""), reverse=True) logger.info(f"Liste des sessions: {len(sessions)} sessions trouvées") return sessions except Exception as e: logger.error(f"Erreur lors de la liste des sessions: {str(e)}") return [] def delete_session(self, session_id: str) -> bool: """ Supprime une session et son fichier associé. Args: session_id: Identifiant de la session à supprimer Returns: True si la suppression a réussi """ try: file_path = self.get_session_file_path(session_id) if os.path.exists(file_path): os.remove(file_path) logger.info(f"Session {session_id} supprimée avec succès") return True else: logger.warning(f"Fichier de session {session_id} non trouvé pour suppression") return False except Exception as e: logger.error(f"Erreur lors de la suppression de la session {session_id}: {str(e)}") return False def clear_old_sessions(self, days_threshold: int = 30) -> int: """ Supprime les sessions plus anciennes que le seuil spécifié. Args: days_threshold: Nombre de jours avant suppression Returns: Nombre de sessions supprimées """ try: sessions = self.list_sessions() deleted_count = 0 cutoff_date = datetime.now().timestamp() - (days_threshold * 24 * 3600) for session in sessions: updated_at_str = session.get("updated_at", "") if updated_at_str: try: updated_at = datetime.fromisoformat(updated_at_str.replace('Z', '+00:00')).timestamp() if updated_at < cutoff_date: if self.delete_session(session["session_id"]): deleted_count += 1 except ValueError: continue logger.info(f"Nettoyage terminé: {deleted_count} anciennes sessions supprimées") return deleted_count except Exception as e: logger.error(f"Erreur lors du nettoyage des anciennes sessions: {str(e)}") return 0 def get_session_statistics(self) -> Dict[str, Any]: """ Retourne des statistiques sur les sessions. Returns: Statistiques des sessions """ try: sessions = self.list_sessions() if not sessions: return { "total_sessions": 0, "total_messages": 0, "average_messages_per_session": 0, "oldest_session": None, "newest_session": None } total_messages = sum(s.get("message_count", 0) for s in sessions) avg_messages = total_messages / len(sessions) if sessions else 0 # Trouver la session la plus ancienne et la plus récente sessions_by_date = sorted(sessions, key=lambda x: x.get("created_at", "")) return { "total_sessions": len(sessions), "total_messages": total_messages, "average_messages_per_session": round(avg_messages, 2), "oldest_session": sessions_by_date[0].get("session_id") if sessions_by_date else None, "newest_session": sessions_by_date[-1].get("session_id") if sessions_by_date else None } except Exception as e: logger.error(f"Erreur lors du calcul des statistiques: {str(e)}") return {}