# logging_manager.py
import json
from datetime import datetime, timezone
from typing import Callable, Dict, List, Optional, Union

import gradio as gr  # Only needed for the LikeData type
import pandas as pd
from huggingface_hub import HfApi, file_exists, hf_hub_download, list_repo_files


class LoggingManager:
    """Store and query chat interaction logs in a Hugging Face dataset repo.

    Logs are kept as one CSV file per month (``logs_YYYY-MM.csv``). Every
    write downloads the month's CSV, edits it with pandas, writes it to the
    current working directory and uploads it again. Public methods report
    failures by printing a message and returning ``False`` / an empty value
    instead of raising.

    NOTE: the download-edit-upload cycle is not protected by any lock, so
    concurrent writers can overwrite each other's changes.
    """

    # Columns written for every new interaction row, in CSV order.
    _BASE_COLUMNS = ["timestamp", "user_message", "response_text", "flag", "user"]

    def __init__(self, repo_name: str = "pharma-IA", project_id: str = "logs-engineering", hf_token: Optional[str] = None):
        """Set up the basic configuration of the log manager.

        Args:
            repo_name: Name of the repository (namespace) on Hugging Face.
            project_id: Project id inside the repository.
            hf_token: Authentication token for the Hugging Face Hub.
        """
        self.repo_name = repo_name
        self.project_id = project_id
        self.hf_token = hf_token
        self.repo_id = f"{repo_name}/{project_id}"
        self.api = HfApi()

    # --------------------------
    # Basic utilities
    # --------------------------

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current time in UTC.

        The audit trail labels timestamps as "UTC-0", so they are taken in
        UTC rather than in the host's local time zone.
        """
        return datetime.now(timezone.utc)

    def _get_current_filename(self) -> str:
        """Return the log file name for the current (UTC) month."""
        return f"logs_{self._utc_now().strftime('%Y-%m')}.csv"

    @staticmethod
    def _normalize_text(text: str) -> str:
        """Normalize text for comparisons (trim and lowercase)."""
        return text.strip().lower()

    @staticmethod
    def _is_missing(value: object) -> bool:
        """Return True for ``None`` and float NaN.

        pandas reads empty CSV cells back as NaN, so an empty string that was
        logged comes back as a float, not as ``""``.
        """
        return value is None or (isinstance(value, float) and value != value)

    @classmethod
    def _cell_text(cls, value: object) -> str:
        """Return a CSV cell as a string, mapping missing values to ``""``."""
        return "" if cls._is_missing(value) else str(value)

    @classmethod
    def _extract_message_text(cls, message: object) -> str:
        """Return the plain text of a user message.

        Accepts a dict with a ``text`` field, a JSON string of such a dict, a
        Python-repr style string of such a dict (single quotes), or plain
        text. Anything that cannot be interpreted as a dict with a string
        ``text`` field is returned as-is (stringified).
        """
        if cls._is_missing(message):
            return ""
        if isinstance(message, dict):
            text = message.get("text")
            return text if isinstance(text, str) else ""

        raw = str(message)
        # Try real JSON first; the quote replacement is only a fallback for
        # repr-style dicts and would corrupt valid JSON containing apostrophes.
        for candidate in (raw, raw.replace("'", '"')):
            try:
                parsed = json.loads(candidate)
            except ValueError:  # json.JSONDecodeError is a ValueError
                continue
            if isinstance(parsed, dict) and isinstance(parsed.get("text"), str):
                return parsed["text"]
        return raw

    def _list_remote_files(self) -> List[str]:
        """List the files of the dataset repo; network/API errors propagate."""
        return list_repo_files(repo_id=self.repo_id, repo_type="dataset", token=self.hf_token)

    def _file_exists(self, filename: str) -> bool:
        """Check whether a file exists in the repository.

        Returns False (after printing the error) when the listing fails.
        """
        try:
            return filename in self._list_remote_files()
        except Exception as e:
            print(f"Error checking file existence: {e}")
            return False

    def _download_logs(self, filename: str) -> pd.DataFrame:
        """Download a log CSV from the repo and load it as a DataFrame."""
        local_path = hf_hub_download(
            repo_id=self.repo_id,
            filename=filename,
            repo_type="dataset",
            token=self.hf_token,
        )
        return pd.read_csv(local_path)

    def _upload_logs(self, df: pd.DataFrame, filename: str) -> None:
        """Write *df* to ``filename`` locally and upload it to the repo."""
        df.to_csv(filename, index=False)
        self.api.upload_file(
            path_or_fileobj=filename,
            path_in_repo=filename,
            repo_id=self.repo_id,
            token=self.hf_token,
            repo_type="dataset",
        )

    @staticmethod
    def _find_last_row(df: pd.DataFrame, column: str, predicate: Callable[[object], bool]):
        """Return the index label of the last row whose cell satisfies *predicate*.

        Returns None when no row matches. Callers must compare with
        ``is None`` because label ``0`` is a valid result.
        """
        for label, cell in df[column].iloc[::-1].items():
            if predicate(cell):
                return label
        return None

    @staticmethod
    def _assign_cells(df: pd.DataFrame, row, values: Dict[str, object]) -> None:
        """Set several cells of one row, creating columns that do not exist.

        Existing columns are cast to ``object`` first: a column that is empty
        in the CSV loads as float64 (all NaN), and writing e.g. a string into
        it is deprecated by pandas (incompatible dtype).
        """
        for column, value in values.items():
            if column in df.columns:
                df[column] = df[column].astype(object)
            df.loc[row, column] = value

    # --------------------------
    # Main functions
    # --------------------------

    def save_interaction(self, user_message: Union[str, dict], response_text: str, user: str) -> bool:
        """Append one user/system interaction to the current month's log.

        Args:
            user_message: User message (plain string, or dict with text/files;
                dicts are stored as JSON).
            response_text: Response generated by the system.
            user: Identifier of the user.

        Returns:
            bool: True if the interaction was saved, False on any error.
        """
        try:
            filename = self._get_current_filename()

            new_row = pd.DataFrame(
                [{
                    "timestamp": self._utc_now().strftime('%Y-%m-%d %H:%M:%S'),
                    "user_message": json.dumps(user_message) if isinstance(user_message, dict) else user_message,
                    "response_text": response_text,
                    "flag": "",
                    "user": user,
                }],
                columns=self._BASE_COLUMNS,
            )

            # Deliberately not using _file_exists here: it maps a failed
            # listing to "file does not exist", which would make us upload a
            # fresh one-row CSV over the month's existing log. A listing
            # error must abort the save instead.
            if filename in self._list_remote_files():
                df = pd.concat([self._download_logs(filename), new_row], ignore_index=True)
            else:
                df = new_row

            self._upload_logs(df, filename)
            return True

        except Exception as e:
            print(f"Error saving interaction: {e}")
            return False

    def record_feedback(self, feedback_data: gr.LikeData) -> bool:
        """Record user feedback (like/dislike) on the matching logged response.

        The feedback text is matched as a case-insensitive substring of the
        logged responses of the current month; the most recent match gets its
        ``flag`` column set to ``str(feedback_data.liked)``.

        Args:
            feedback_data: Feedback data coming from Gradio.

        Returns:
            bool: True if the feedback was recorded.
        """
        if not feedback_data:
            print("No feedback data provided")
            return False

        try:
            raw_value = feedback_data.value
            text_value = raw_value if isinstance(raw_value, str) else raw_value.get('value', '')
            needle = self._normalize_text(text_value)
            if not needle:
                # An empty needle is a substring of every response and would
                # flag the latest interaction regardless of its content.
                print("No matching interaction found for feedback")
                return False

            filename = self._get_current_filename()
            if not self._file_exists(filename):
                print(f"Log file {filename} doesn't exist")
                return False

            df = self._download_logs(filename)

            # Empty responses come back from the CSV as NaN; _cell_text maps
            # them to "" so one empty cell cannot break the whole lookup.
            row = self._find_last_row(
                df,
                'response_text',
                lambda cell: needle in self._normalize_text(self._cell_text(cell)),
            )
            if row is None:
                print("No matching interaction found for feedback")
                return False

            self._assign_cells(df, row, {'flag': str(feedback_data.liked)})
            self._upload_logs(df, filename)
            return True

        except Exception as e:
            print(f"Error recording feedback: {e}")
            return False

    def save_evaluation_metrics(
        self,
        query: str,
        faithfulness_score: float,
        answer_relevancy_score: float,
        context_relevancy_score: float
    ) -> bool:
        """Store evaluation metrics on the most recent interaction matching *query*.

        Args:
            query: Original query text (plain text, or a JSON / repr-style
                string of a dict with a ``text`` field; a dict is accepted too).
            faithfulness_score: Groundedness/faithfulness score.
            answer_relevancy_score: Answer relevancy score.
            context_relevancy_score: Context relevancy score.

        Returns:
            bool: True if the metrics were saved.
        """
        try:
            filename = self._get_current_filename()

            if not self._file_exists(filename):
                print(f"Log file {filename} doesn't exist")
                return False

            df = self._download_logs(filename)

            # Query and logged messages go through the same extraction so both
            # formats (plain text / serialized dict) compare equal.
            norm_query = self._normalize_text(self._extract_message_text(query))

            def extract_query_text(cell):
                return self._normalize_text(self._extract_message_text(cell))

            row = self._find_last_row(
                df, 'user_message', lambda cell: extract_query_text(cell) == norm_query
            )
            if row is None:
                print("No matching query found in logs")
                print(f"Buscando: '{norm_query}'")
                print("Consultas existentes:", df['user_message'].apply(extract_query_text).unique())
                return False

            self._assign_cells(df, row, {
                'groundedness': faithfulness_score,
                'answer_relevancy': answer_relevancy_score,
                'context_relevancy': context_relevancy_score,
            })
            self._upload_logs(df, filename)
            return True

        except Exception as e:
            print(f"Error saving evaluation metrics: {e}")
            return False

    def save_node_references(
        self,
        query: dict,
        source_nodes: list,
        kg_nodes: list
    ) -> bool:
        """Store the ids of the nodes used to answer *query*.

        The query text is matched as a case-insensitive substring of the text
        of the logged user messages of the current month; the most recent
        match receives the comma-separated node ids.

        Args:
            query: Original query (dict with at least a ``text`` field).
            source_nodes: Document nodes used; each item must expose
                ``.node.id_``.
            kg_nodes: Knowledge-graph nodes used; same shape as above.

        Returns:
            bool: True if the references were saved.
        """
        try:
            filename = self._get_current_filename()

            if not self._file_exists(filename):
                print(f"Log file {filename} doesn't exist")
                return False

            df = self._download_logs(filename)

            needle = self._normalize_text(self._extract_message_text(query))
            if not needle:
                # An empty needle would match every row (see record_feedback).
                print("No matching query found for node references")
                return False

            # Match on the decoded message text, not on the raw JSON string:
            # json.dumps stores non-ASCII characters as \uXXXX escapes, so the
            # raw cell never contains e.g. accented query text verbatim.
            row = self._find_last_row(
                df,
                'user_message',
                lambda cell: needle in self._normalize_text(self._extract_message_text(cell)),
            )
            if row is None:
                print("No matching query found for node references")
                return False

            self._assign_cells(df, row, {
                'response_node_ids': ", ".join(n.node.id_ for n in (source_nodes or [])),
                'kg_node_ids': ", ".join(n.node.id_ for n in (kg_nodes or [])),
            })
            self._upload_logs(df, filename)
            return True

        except Exception as e:
            print(f"Error saving node references: {e}")
            return False

    # --------------------------
    # Query / audit functions
    # --------------------------

    def get_available_log_months(self) -> List[str]:
        """Return the sorted months (``X`` of every ``logs_X.csv``) that have logs."""
        prefix, suffix = "logs_", ".csv"
        try:
            files = self._list_remote_files()
            # Slice instead of split('_') so that f"logs_{month}.csv" always
            # round-trips to an existing file name.
            return sorted(
                name[len(prefix):-len(suffix)]
                for name in files
                if name.startswith(prefix)
                and name.endswith(suffix)
                and len(name) > len(prefix) + len(suffix)
            )
        except Exception as e:
            print(f"Error getting available months: {e}")
            return []

    def get_audit_trail(self, month: str) -> pd.DataFrame:
        """Return the logs of one month, formatted for auditing.

        Args:
            month: Month in ``YYYY-MM`` format.

        Returns:
            pd.DataFrame: Logs sorted newest first with display column names,
            or an empty DataFrame when the file is missing or loading fails.
        """
        try:
            filename = f"logs_{month}.csv"

            if not self._file_exists(filename):
                print(f"No logs found for {month}")
                return pd.DataFrame()

            df = self._download_logs(filename)

            # errors="coerce": a single malformed timestamp becomes a missing
            # value instead of discarding the whole audit trail.
            parsed = pd.to_datetime(df["timestamp"], errors="coerce")
            df["timestamp"] = parsed.dt.strftime('%Y-%m-%d %H:%M:%S UTC-0')
            df = df.sort_values("timestamp", ascending=False)

            # Rename columns for display.
            return df.rename(columns={
                "timestamp": "Timestamp",
                "user_message": "User Message",
                "response_text": "Response",
                "flag": "Feedback",
                "user": "User",
                "groundedness": "Groundedness",
                "answer_relevancy": "Answer Relevancy",
                "context_relevancy": "Context Relevancy",
                "response_node_ids": "Document Nodes",
                "kg_node_ids": "KG Nodes"
            })

        except Exception as e:
            print(f"Error loading audit trail: {e}")
            return pd.DataFrame()

    def get_user_history(self, user: str, limit: int = 5) -> str:
        """Return a user's most recent interactions formatted as Markdown.

        Args:
            user: Identifier of the user.
            limit: Maximum number of interactions to return.

        Returns:
            str: Formatted history, or a warning message when there is no
            history or an error occurred.
        """
        try:
            history = self._get_raw_history(user, limit)
            if not history:
                return "⚠️ No history found for this user"

            markdown = [
                f"## Chat History for {user} (last {len(history)} interactions)",
                "*Ordered from oldest to newest*\n"
            ]

            for i, interaction in enumerate(history, 1):
                question = self._format_question(interaction['user_message'])
                raw_response = interaction['response_text']
                # Empty responses are read back from the CSV as NaN, which is
                # truthy and has no .strip().
                if self._is_missing(raw_response) or raw_response == "":
                    response = "(No response)"
                else:
                    response = str(raw_response).strip()

                markdown.extend([
                    f"\n### Interaction {i}",
                    f"**📅 {interaction['timestamp']}**",
                    "",
                    "**❓ Question:**",
                    f"> {question}",
                    "",
                    "**💡 Response:**",
                    f"> {response}",
                    "",
                    "---"
                ])

            # Drop the trailing separator before the closing line.
            return "\n".join(markdown[:-1]) + "\n\n*End of history*"

        except Exception as e:
            print(f"Error generating user history: {e}")
            return "⚠️ Error retrieving history"

    # --------------------------
    # Private helpers
    # --------------------------

    def _format_question(self, question_data: Union[str, dict]) -> str:
        """Format a logged question, which may be plain text, a JSON string or a dict."""
        if self._is_missing(question_data) or not question_data:
            return "(No text)"

        if isinstance(question_data, str):
            try:
                parsed = json.loads(question_data)
            except json.JSONDecodeError:
                parsed = None
            if isinstance(parsed, dict):
                question_data = parsed

        if isinstance(question_data, dict):
            question_text = question_data.get('text', '(No text)')
            if files := question_data.get('files', []):
                # File entries may be dicts (with an optional 'name') or bare
                # values such as paths.
                names = [f.get('name', 'file') if isinstance(f, dict) else str(f) for f in files]
                attachments = "\n📎 Attachments: " + ", ".join(f"`{name}`" for name in names)
                return f"{question_text}{attachments}"
            return question_text

        return str(question_data)

    def _get_raw_history(self, user: str, limit: int) -> List[Dict]:
        """Return up to *limit* most recent interactions of *user*, oldest first.

        Each record is a dict with ``user_message``, ``response_text`` and
        ``timestamp``. Returns an empty list when nothing is found or on error.
        """
        try:
            required = {'user', 'user_message', 'response_text', 'timestamp'}
            frames = []

            for month in self.get_available_log_months():
                filename = f"logs_{month}.csv"
                # The month list comes from the repo listing, so no extra
                # existence check (one more listing per month) is needed; a
                # file that vanished meanwhile just fails to download.
                try:
                    df = self._download_logs(filename)
                except Exception as e:
                    print(f"Error processing {filename}: {e}")
                    continue
                if required.issubset(df.columns):
                    frames.append(df)

            if not frames:
                return []

            combined = pd.concat(frames, ignore_index=True)
            # Compare as strings: pandas parses all-numeric user ids as ints.
            same_user = combined['user'].notna() & (combined['user'].astype(str) == str(user))
            user_data = combined[same_user].copy()

            if user_data.empty:
                return []

            # Sort chronologically when timestamps parse; otherwise fall back
            # to file order.
            try:
                user_data['timestamp'] = pd.to_datetime(user_data['timestamp'])
                user_data = user_data.sort_values('timestamp', ascending=True).tail(limit)
            except (ValueError, TypeError, OverflowError):
                user_data = user_data.tail(limit)

            return user_data[['user_message', 'response_text', 'timestamp']].to_dict('records')

        except Exception as e:
            print(f"Error getting raw history: {e}")
            return []