Spaces:
Sleeping
Sleeping
| import json | |
| import logging | |
| import os | |
| from typing import List, Dict, Any, Optional | |
| from datetime import datetime | |
| import uuid | |
| logger = logging.getLogger(__name__) | |
| class MemoryManager: | |
| """ | |
| Gestionnaire de mémoire pour stocker l'historique des conversations. | |
| Utilise des fichiers JSON individuels par session_id. | |
| """ | |
| def __init__(self, storage_dir: str = "./storage/memory"): | |
| self.storage_dir = storage_dir | |
| # Créer le répertoire de stockage s'il n'existe pas | |
| os.makedirs(storage_dir, exist_ok=True) | |
| logger.info(f"MemoryManager initialisé avec répertoire: {storage_dir}") | |
| def get_session_file_path(self, session_id: str) -> str: | |
| """ | |
| Génère le chemin du fichier pour une session. | |
| Args: | |
| session_id: Identifiant de la session | |
| Returns: | |
| Chemin complet du fichier JSON | |
| """ | |
| # Nettoyer le session_id pour éviter les problèmes de noms de fichiers | |
| safe_session_id = "".join(c for c in session_id if c.isalnum() or c in ('-', '_')) | |
| return os.path.join(self.storage_dir, f"{safe_session_id}.json") | |
| def create_session(self, session_id: Optional[str] = None) -> str: | |
| """ | |
| Crée une nouvelle session de conversation. | |
| Args: | |
| session_id: ID optionnel, généré automatiquement si non fourni | |
| Returns: | |
| ID de la session créée | |
| """ | |
| if not session_id: | |
| session_id = str(uuid.uuid4()) | |
| session_data = { | |
| "session_id": session_id, | |
| "created_at": datetime.now().isoformat(), | |
| "updated_at": datetime.now().isoformat(), | |
| "message_count": 0, | |
| "messages": [] | |
| } | |
| try: | |
| file_path = self.get_session_file_path(session_id) | |
| with open(file_path, 'w', encoding='utf-8') as f: | |
| json.dump(session_data, f, ensure_ascii=False, indent=2) | |
| logger.info(f"Nouvelle session créée: {session_id}") | |
| return session_id | |
| except Exception as e: | |
| logger.error(f"Erreur lors de la création de la session {session_id}: {str(e)}") | |
| raise | |
| def get_session_history(self, session_id: str) -> List[Dict[str, str]]: | |
| """ | |
| Récupère l'historique complet d'une session. | |
| Args: | |
| session_id: Identifiant de la session | |
| Returns: | |
| Liste des messages [{question, answer, timestamp}, ...] | |
| """ | |
| try: | |
| file_path = self.get_session_file_path(session_id) | |
| if not os.path.exists(file_path): | |
| logger.info(f"Session {session_id} non trouvée, création d'une nouvelle session") | |
| self.create_session(session_id) | |
| return [] | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| session_data = json.load(f) | |
| messages = session_data.get("messages", []) | |
| logger.info(f"Historique chargé pour session {session_id}: {len(messages)} messages") | |
| return messages | |
| except json.JSONDecodeError as e: | |
| logger.error(f"Erreur JSON lors de la lecture de la session {session_id}: {str(e)}") | |
| return [] | |
| except Exception as e: | |
| logger.error(f"Erreur lors de la récupération de l'historique de la session {session_id}: {str(e)}") | |
| return [] | |
| def add_message(self, session_id: str, question: str, answer: str) -> bool: | |
| """ | |
| Ajoute un échange question/réponse à une session. | |
| Args: | |
| session_id: Identifiant de la session | |
| question: Question de l'utilisateur | |
| answer: Réponse de l'agent | |
| Returns: | |
| True si l'ajout a réussi | |
| """ | |
| try: | |
| file_path = self.get_session_file_path(session_id) | |
| # Créer la session si elle n'existe pas | |
| if not os.path.exists(file_path): | |
| self.create_session(session_id) | |
| # Lire les données existantes | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| session_data = json.load(f) | |
| # Ajouter le nouveau message | |
| new_message = { | |
| "question": question, | |
| "answer": answer, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| session_data["messages"].append(new_message) | |
| session_data["updated_at"] = datetime.now().isoformat() | |
| session_data["message_count"] = len(session_data["messages"]) | |
| # Sauvegarder les données mises à jour | |
| with open(file_path, 'w', encoding='utf-8') as f: | |
| json.dump(session_data, f, ensure_ascii=False, indent=2) | |
| logger.info(f"Message ajouté à la session {session_id} (total: {session_data['message_count']})") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Erreur lors de l'ajout du message à la session {session_id}: {str(e)}") | |
| return False | |
| def update_session_metadata(self, session_id: str, metadata: Dict[str, Any]) -> bool: | |
| """ | |
| Met à jour les métadonnées d'une session. | |
| Args: | |
| session_id: Identifiant de la session | |
| metadata: Métadonnées à ajouter/mettre à jour | |
| Returns: | |
| True si la mise à jour a réussi | |
| """ | |
| try: | |
| file_path = self.get_session_file_path(session_id) | |
| if not os.path.exists(file_path): | |
| logger.warning(f"Session {session_id} non trouvée pour mise à jour des métadonnées") | |
| return False | |
| # Lire les données existantes | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| session_data = json.load(f) | |
| # Mettre à jour les métadonnées | |
| session_data.update(metadata) | |
| session_data["updated_at"] = datetime.now().isoformat() | |
| # Sauvegarder | |
| with open(file_path, 'w', encoding='utf-8') as f: | |
| json.dump(session_data, f, ensure_ascii=False, indent=2) | |
| logger.info(f"Métadonnées mises à jour pour la session {session_id}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Erreur lors de la mise à jour des métadonnées de la session {session_id}: {str(e)}") | |
| return False | |
| def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Récupère les informations d'une session sans les messages. | |
| Args: | |
| session_id: Identifiant de la session | |
| Returns: | |
| Informations de la session ou None | |
| """ | |
| try: | |
| file_path = self.get_session_file_path(session_id) | |
| if not os.path.exists(file_path): | |
| return None | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| session_data = json.load(f) | |
| # Retourner les informations sans les messages | |
| info = { | |
| "session_id": session_data.get("session_id"), | |
| "created_at": session_data.get("created_at"), | |
| "updated_at": session_data.get("updated_at"), | |
| "message_count": session_data.get("message_count", 0) | |
| } | |
| return info | |
| except Exception as e: | |
| logger.error(f"Erreur lors de la récupération des infos de la session {session_id}: {str(e)}") | |
| return None | |
| def list_sessions(self) -> List[Dict[str, Any]]: | |
| """ | |
| Liste toutes les sessions existantes. | |
| Returns: | |
| Liste des informations des sessions | |
| """ | |
| try: | |
| sessions = [] | |
| if not os.path.exists(self.storage_dir): | |
| return sessions | |
| for filename in os.listdir(self.storage_dir): | |
| if filename.endswith('.json'): | |
| session_id = filename[:-5] # Enlever .json | |
| session_info = self.get_session_info(session_id) | |
| if session_info: | |
| sessions.append(session_info) | |
| # Trier par date de mise à jour décroissante | |
| sessions.sort(key=lambda x: x.get("updated_at", ""), reverse=True) | |
| logger.info(f"Liste des sessions: {len(sessions)} sessions trouvées") | |
| return sessions | |
| except Exception as e: | |
| logger.error(f"Erreur lors de la liste des sessions: {str(e)}") | |
| return [] | |
| def delete_session(self, session_id: str) -> bool: | |
| """ | |
| Supprime une session et son fichier associé. | |
| Args: | |
| session_id: Identifiant de la session à supprimer | |
| Returns: | |
| True si la suppression a réussi | |
| """ | |
| try: | |
| file_path = self.get_session_file_path(session_id) | |
| if os.path.exists(file_path): | |
| os.remove(file_path) | |
| logger.info(f"Session {session_id} supprimée avec succès") | |
| return True | |
| else: | |
| logger.warning(f"Fichier de session {session_id} non trouvé pour suppression") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Erreur lors de la suppression de la session {session_id}: {str(e)}") | |
| return False | |
| def clear_old_sessions(self, days_threshold: int = 30) -> int: | |
| """ | |
| Supprime les sessions plus anciennes que le seuil spécifié. | |
| Args: | |
| days_threshold: Nombre de jours avant suppression | |
| Returns: | |
| Nombre de sessions supprimées | |
| """ | |
| try: | |
| sessions = self.list_sessions() | |
| deleted_count = 0 | |
| cutoff_date = datetime.now().timestamp() - (days_threshold * 24 * 3600) | |
| for session in sessions: | |
| updated_at_str = session.get("updated_at", "") | |
| if updated_at_str: | |
| try: | |
| updated_at = datetime.fromisoformat(updated_at_str.replace('Z', '+00:00')).timestamp() | |
| if updated_at < cutoff_date: | |
| if self.delete_session(session["session_id"]): | |
| deleted_count += 1 | |
| except ValueError: | |
| continue | |
| logger.info(f"Nettoyage terminé: {deleted_count} anciennes sessions supprimées") | |
| return deleted_count | |
| except Exception as e: | |
| logger.error(f"Erreur lors du nettoyage des anciennes sessions: {str(e)}") | |
| return 0 | |
| def get_session_statistics(self) -> Dict[str, Any]: | |
| """ | |
| Retourne des statistiques sur les sessions. | |
| Returns: | |
| Statistiques des sessions | |
| """ | |
| try: | |
| sessions = self.list_sessions() | |
| if not sessions: | |
| return { | |
| "total_sessions": 0, | |
| "total_messages": 0, | |
| "average_messages_per_session": 0, | |
| "oldest_session": None, | |
| "newest_session": None | |
| } | |
| total_messages = sum(s.get("message_count", 0) for s in sessions) | |
| avg_messages = total_messages / len(sessions) if sessions else 0 | |
| # Trouver la session la plus ancienne et la plus récente | |
| sessions_by_date = sorted(sessions, key=lambda x: x.get("created_at", "")) | |
| return { | |
| "total_sessions": len(sessions), | |
| "total_messages": total_messages, | |
| "average_messages_per_session": round(avg_messages, 2), | |
| "oldest_session": sessions_by_date[0].get("session_id") if sessions_by_date else None, | |
| "newest_session": sessions_by_date[-1].get("session_id") if sessions_by_date else None | |
| } | |
| except Exception as e: | |
| logger.error(f"Erreur lors du calcul des statistiques: {str(e)}") | |
| return {} | |