| | |
| | import pandas as pd |
| | import json |
| | from datetime import datetime |
| | from typing import Optional, Union, List, Dict |
| | from huggingface_hub import HfApi, file_exists, hf_hub_download, list_repo_files |
| | import gradio as gr |
| |
|
class LoggingManager:
    """Read and write chat interaction logs stored as monthly CSV files.

    Each month has one file named ``logs_YYYY-MM.csv`` inside a Hugging Face
    dataset repository (``<repo_name>/<project_id>``). Every mutation follows
    the same cycle: download the month's CSV, modify it in memory with pandas,
    and upload the whole file back.

    All public methods are best-effort: failures are printed and reported
    through the return value (``False``, an empty list/DataFrame, or a warning
    string) instead of being raised.
    """

    # Columns of a freshly created monthly log file.
    _BASE_COLUMNS = ["timestamp", "user_message", "response_text", "flag", "user"]

    # Internal column name -> label used in the audit view.
    _AUDIT_COLUMN_LABELS = {
        "timestamp": "Timestamp",
        "user_message": "User Message",
        "response_text": "Response",
        "flag": "Feedback",
        "user": "User",
        "groundedness": "Groundedness",
        "answer_relevancy": "Answer Relevancy",
        "context_relevancy": "Context Relevancy",
        "response_node_ids": "Document Nodes",
        "kg_node_ids": "KG Nodes",
    }

    def __init__(self, repo_name: str = "pharma-IA", project_id: str = "logs-engineering", hf_token: Optional[str] = None):
        """Set up the log manager.

        Args:
            repo_name: Owner/organisation part of the Hugging Face repo id.
            project_id: Repository name; combined with ``repo_name`` into
                ``repo_id`` as ``"<repo_name>/<project_id>"``.
            hf_token: Hugging Face Hub token passed to every Hub call.
        """
        self.repo_name = repo_name
        self.project_id = project_id
        self.hf_token = hf_token
        self.repo_id = f"{repo_name}/{project_id}"
        self.api = HfApi()

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------

    def _get_current_filename(self) -> str:
        """Return the log file name for the current month (``logs_YYYY-MM.csv``)."""
        return f"logs_{datetime.now().strftime('%Y-%m')}.csv"

    @staticmethod
    def _is_missing(value) -> bool:
        """Return True for ``None`` and float NaN (how pandas reads empty CSV cells)."""
        return value is None or (isinstance(value, float) and value != value)

    @staticmethod
    def _normalize_text(text: str) -> str:
        """Strip and lower-case ``text`` for comparisons.

        Missing values (``None``/NaN) normalize to ``""`` and other non-string
        values are stringified first, so a single empty CSV cell cannot make a
        whole column comparison fail.
        """
        if isinstance(text, str):
            return text.strip().lower()
        if text is None or (isinstance(text, float) and text != text):
            return ""
        return str(text).strip().lower()

    @staticmethod
    def _month_from_filename(name: str) -> Optional[str]:
        """Return the ``YYYY-MM`` part of ``logs_<month>.csv``, or None for other files."""
        prefix, suffix = "logs_", ".csv"
        if (
            name.startswith(prefix)
            and name.endswith(suffix)
            and len(name) > len(prefix) + len(suffix)
        ):
            return name[len(prefix):-len(suffix)]
        return None

    @staticmethod
    def _attachment_name(entry) -> str:
        """Return a display name for one entry of a message's ``files`` list.

        Dict entries use their ``name`` key (default ``"file"``). Any other
        entry is treated as a path and reduced to its last component.
        """
        if isinstance(entry, dict):
            return entry.get('name', 'file')
        return str(entry).replace("\\", "/").rsplit("/", 1)[-1] or "file"

    @staticmethod
    def _set_cell(df: pd.DataFrame, row, column: str, value) -> None:
        """Write ``value`` into ``df`` at (``row``, ``column``), creating the column if needed.

        A column that is entirely empty in the CSV is read back as float64
        (all NaN); assigning a string into it is deprecated in pandas 2.x, so
        the column is converted to ``object`` first.
        """
        if column not in df.columns:
            df[column] = pd.Series(index=df.index, dtype=object)
        elif isinstance(value, str) and df[column].dtype != object:
            df[column] = df[column].astype(object)
        df.at[row, column] = value

    def _message_text(self, raw) -> str:
        """Return the normalized question text of a stored ``user_message`` value.

        Messages are stored either as plain text or as a JSON object with a
        ``text`` key. The raw string is parsed as JSON first; only if that
        fails is the legacy single-quote -> double-quote rewrite tried, so
        valid JSON containing apostrophes is no longer corrupted. Anything
        that is not an object with a string ``text`` falls back to the
        normalized raw value.
        """
        if isinstance(raw, dict):
            text = raw.get("text")
            return self._normalize_text(text if isinstance(text, str) else str(raw))
        if not isinstance(raw, str):
            return self._normalize_text(raw)

        for candidate in (raw, raw.replace("'", '"')):
            try:
                parsed = json.loads(candidate)
            except ValueError:  # json.JSONDecodeError is a ValueError
                continue
            if isinstance(parsed, dict) and isinstance(parsed.get("text"), str):
                return self._normalize_text(parsed["text"])
        return self._normalize_text(raw)

    @classmethod
    def _format_audit_frame(cls, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of a raw log frame formatted for the audit view.

        Timestamps are rendered as ``YYYY-MM-DD HH:MM:SS UTC-0``, rows are
        sorted newest first and columns are renamed to their display labels.
        Timestamps that cannot be parsed keep their raw text instead of
        aborting the whole view.
        """
        # NOTE(review): timestamps are written with naive datetime.now(); the
        # "UTC-0" label is only accurate if the writer's clock is UTC — confirm.
        df = df.copy()
        raw = df["timestamp"]
        parsed = pd.to_datetime(raw, errors="coerce")
        rendered = parsed.dt.strftime('%Y-%m-%d %H:%M:%S UTC-0')
        df["timestamp"] = rendered.where(parsed.notna(), raw)
        df = df.sort_values("timestamp", ascending=False)
        return df.rename(columns=cls._AUDIT_COLUMN_LABELS)

    # ------------------------------------------------------------------
    # Hub I/O helpers
    # ------------------------------------------------------------------

    def _file_exists(self, filename: str) -> bool:
        """Return True if ``filename`` is listed in the dataset repo.

        Any error while listing the repo is printed and reported as False.
        """
        try:
            files = list_repo_files(repo_id=self.repo_id, repo_type="dataset", token=self.hf_token)
            return filename in files
        except Exception as e:
            print(f"Error checking file existence: {e}")
            return False

    def _load_log(self, filename: str) -> pd.DataFrame:
        """Download ``filename`` from the dataset repo and parse it as CSV."""
        local_path = hf_hub_download(
            repo_id=self.repo_id,
            filename=filename,
            repo_type="dataset",
            token=self.hf_token,
        )
        return pd.read_csv(local_path)

    def _upload_log(self, df: pd.DataFrame, filename: str) -> None:
        """Serialize ``df`` to CSV and upload it to the repo as ``filename``.

        The CSV is uploaded from memory so no copy of the log is left behind
        in the process working directory.
        """
        payload = df.to_csv(index=False).encode("utf-8")
        self.api.upload_file(
            path_or_fileobj=payload,
            path_in_repo=filename,
            repo_id=self.repo_id,
            token=self.hf_token,
            repo_type="dataset",
        )

    # ------------------------------------------------------------------
    # Writers
    # ------------------------------------------------------------------

    def save_interaction(self, user_message: Union[str, dict], response_text: str, user: str) -> bool:
        """Append one user/system interaction to the current month's log.

        Args:
            user_message: User message; a dict is stored as a JSON string,
                anything else is stored as given.
            response_text: Response produced by the system.
            user: User identifier.

        Returns:
            True if the log was uploaded, False if any step failed.
        """
        try:
            filename = self._get_current_filename()

            if self._file_exists(filename):
                df = self._load_log(filename)
            else:
                df = pd.DataFrame(columns=self._BASE_COLUMNS)

            new_entry = {
                "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                "user_message": json.dumps(user_message) if isinstance(user_message, dict) else user_message,
                "response_text": response_text,
                "flag": "",
                "user": user,
            }

            df = pd.concat([df, pd.DataFrame([new_entry])], ignore_index=True)
            self._upload_log(df, filename)
            return True

        except Exception as e:
            print(f"Error saving interaction: {e}")
            return False

    def record_feedback(self, feedback_data: gr.LikeData) -> bool:
        """Record like/dislike feedback on the most recent matching response.

        The feedback text is matched (case-insensitive substring) against the
        ``response_text`` column of the current month's log; the last matching
        row gets ``str(feedback_data.liked)`` in its ``flag`` column.

        Args:
            feedback_data: Feedback event data from Gradio; ``value`` is read
                either as a string or as a mapping with a ``'value'`` key.

        Returns:
            True if a row was flagged and the log uploaded, False otherwise.
        """
        if not feedback_data:
            print("No feedback data provided")
            return False

        try:
            raw_value = feedback_data.value
            text_value = raw_value if isinstance(raw_value, str) else raw_value.get('value', '')
            filename = self._get_current_filename()

            if not self._file_exists(filename):
                print(f"Log file {filename} doesn't exist")
                return False

            normalized_value = self._normalize_text(text_value)
            if not normalized_value:
                # An empty needle is a substring of every response and would
                # flag an unrelated row.
                print("No matching interaction found for feedback")
                return False

            df = self._load_log(filename)
            matching_indices = [
                idx
                for idx, response in df['response_text'].items()
                if normalized_value in self._normalize_text(response)
            ]

            if not matching_indices:
                print("No matching interaction found for feedback")
                return False

            self._set_cell(df, matching_indices[-1], 'flag', str(feedback_data.liked))
            self._upload_log(df, filename)
            return True

        except Exception as e:
            print(f"Error recording feedback: {e}")
            return False

    def save_evaluation_metrics(
        self,
        query: str,
        faithfulness_score: float,
        answer_relevancy_score: float,
        context_relevancy_score: float
    ) -> bool:
        """Attach evaluation scores to the most recent interaction matching ``query``.

        Args:
            query: Original query, either plain text or a JSON object string
                with a ``text`` key.
            faithfulness_score: Stored in the ``groundedness`` column.
            answer_relevancy_score: Stored in the ``answer_relevancy`` column.
            context_relevancy_score: Stored in the ``context_relevancy`` column.

        Returns:
            True if a matching row was updated and uploaded, False otherwise.
        """
        try:
            filename = self._get_current_filename()
            if not self._file_exists(filename):
                print(f"Log file {filename} doesn't exist")
                return False

            df = self._load_log(filename)

            # Query and stored messages go through the same extraction so that
            # plain-text and JSON-encoded forms compare equal.
            norm_query = self._message_text(query)
            stored_texts = [self._message_text(cell) for cell in df['user_message']]
            matches = [idx for idx, text in zip(df.index, stored_texts) if text == norm_query]

            if not matches:
                print("No matching query found in logs")
                print(f"Buscando: '{norm_query}'")
                print("Consultas existentes:", pd.unique(pd.Series(stored_texts, dtype=object)))
                return False

            last_match = matches[-1]
            self._set_cell(df, last_match, 'groundedness', faithfulness_score)
            self._set_cell(df, last_match, 'answer_relevancy', answer_relevancy_score)
            self._set_cell(df, last_match, 'context_relevancy', context_relevancy_score)

            self._upload_log(df, filename)
            return True

        except Exception as e:
            print(f"Error saving evaluation metrics: {e}")
            return False

    def save_node_references(
        self,
        query: dict,
        source_nodes: list,
        kg_nodes: list
    ) -> bool:
        """Attach the ids of the nodes used for a response to its log row.

        The query text is matched (case-insensitive substring) against the
        text of each stored user message; the last matching row is updated.

        Args:
            query: Original query; its ``'text'`` entry is used for matching.
            source_nodes: Document nodes; each item must expose ``.node.id_``.
            kg_nodes: Knowledge-graph nodes; each item must expose ``.node.id_``.

        Returns:
            True if a matching row was updated and uploaded, False otherwise.
        """
        try:
            filename = self._get_current_filename()
            if not self._file_exists(filename):
                print(f"Log file {filename} doesn't exist")
                return False

            norm_query = self._normalize_text(query.get('text', ''))
            if not norm_query:
                # An empty needle would match every row.
                print("No matching query found for node references")
                return False

            df = self._load_log(filename)

            # Match on the parsed message text rather than the raw cell: JSON
            # encoding escapes non-ASCII characters, which would otherwise
            # prevent accented queries from ever matching.
            matches = [
                idx
                for idx, cell in df['user_message'].items()
                if norm_query in self._message_text(cell)
            ]

            if not matches:
                print("No matching query found for node references")
                return False

            last_match = matches[-1]
            self._set_cell(df, last_match, 'response_node_ids', ", ".join(n.node.id_ for n in source_nodes))
            self._set_cell(df, last_match, 'kg_node_ids', ", ".join(n.node.id_ for n in kg_nodes))

            self._upload_log(df, filename)
            return True

        except Exception as e:
            print(f"Error saving node references: {e}")
            return False

    # ------------------------------------------------------------------
    # Readers
    # ------------------------------------------------------------------

    def get_available_log_months(self) -> List[str]:
        """Return the sorted ``YYYY-MM`` keys of all ``logs_*.csv`` files in the repo.

        Returns an empty list if the repo cannot be listed.
        """
        try:
            files = list_repo_files(repo_id=self.repo_id, repo_type="dataset", token=self.hf_token)
            months = (self._month_from_filename(name) for name in files)
            return sorted(month for month in months if month is not None)
        except Exception as e:
            print(f"Error getting available months: {e}")
            return []

    def get_audit_trail(self, month: str) -> pd.DataFrame:
        """Return one month's log formatted for auditing.

        Args:
            month: Month key in ``YYYY-MM`` form.

        Returns:
            The month's log, newest first, with display column labels; an
            empty DataFrame if the file does not exist or cannot be loaded.
        """
        try:
            filename = f"logs_{month}.csv"
            if not self._file_exists(filename):
                print(f"No logs found for {month}")
                return pd.DataFrame()

            return self._format_audit_frame(self._load_log(filename))

        except Exception as e:
            print(f"Error loading audit trail: {e}")
            return pd.DataFrame()

    def get_user_history(self, user: str, limit: int = 5) -> str:
        """Return a user's most recent interactions rendered as Markdown.

        Args:
            user: User identifier.
            limit: Maximum number of interactions to include.

        Returns:
            The Markdown history, or a warning message if the user has no
            history or an error occurred.
        """
        try:
            history = self._get_raw_history(user, limit)
            if not history:
                return "⚠️ No history found for this user"
            return self._render_history(user, history)

        except Exception as e:
            print(f"Error generating user history: {e}")
            return "⚠️ Error retrieving history"

    def _render_history(self, user: str, history: List[Dict]) -> str:
        """Render interaction records (oldest first) as a Markdown document."""
        markdown = [
            f"## Chat History for {user} (last {len(history)} interactions)",
            "*Ordered from oldest to newest*\n"
        ]

        for i, interaction in enumerate(history, 1):
            question = self._format_question(interaction['user_message'])
            response = self._format_response(interaction['response_text'])

            markdown.extend([
                f"\n### Interaction {i}",
                f"**📅 {interaction['timestamp']}**",
                "",
                "**❓ Question:**",
                f"> {question}",
                "",
                "**💡 Response:**",
                f"> {response}",
                "",
                "---"
            ])

        # Drop the trailing separator before the footer.
        return "\n".join(markdown[:-1]) + "\n\n*End of history*"

    @staticmethod
    def _format_response(value) -> str:
        """Return display text for a stored response; empty/missing -> ``(No response)``."""
        if isinstance(value, str):
            return value.strip() if value else "(No response)"
        if value is None or (isinstance(value, float) and value != value):
            return "(No response)"
        return str(value).strip()

    def _format_question(self, question_data: Union[str, dict]) -> str:
        """Return display text for a stored question.

        Accepts plain text, a dict, or a JSON object string. For dict-like
        messages the ``text`` entry is shown, followed by an attachments line
        when the ``files`` list is non-empty. Missing or empty values render
        as ``(No text)``.
        """
        if self._is_missing(question_data) or not question_data:
            return "(No text)"

        if isinstance(question_data, str):
            try:
                data = json.loads(question_data)
                if isinstance(data, dict):
                    question_data = data
            except json.JSONDecodeError:
                # Not JSON: treat as plain text below.
                pass

        if isinstance(question_data, dict):
            question_text = question_data.get('text', '(No text)')
            if files := question_data.get('files', []):
                names = ", ".join(f"`{self._attachment_name(f)}`" for f in files)
                return f"{question_text}\n📎 Attachments: {names}"
            return question_text
        return str(question_data)

    def _get_raw_history(self, user: str, limit: int) -> List[Dict]:
        """Return up to ``limit`` of the user's latest interactions, oldest first.

        Every available monthly file is loaded; files that fail to load or
        lack the required columns are skipped. Each record has the keys
        ``user_message``, ``response_text`` and ``timestamp``.
        """
        required = {'user', 'user_message', 'response_text', 'timestamp'}
        try:
            all_data = []
            # The month list already comes from a repo listing, so the files
            # are known to exist; no per-file existence check is needed.
            for month in self.get_available_log_months():
                filename = f"logs_{month}.csv"
                try:
                    df = self._load_log(filename)
                except Exception as e:
                    print(f"Error processing {filename}: {e}")
                    continue
                if required.issubset(df.columns):
                    all_data.append(df)

            if not all_data:
                return []

            combined = pd.concat(all_data, ignore_index=True)
            user_data = combined[combined['user'] == user].copy()

            if user_data.empty:
                return []

            try:
                user_data['timestamp'] = pd.to_datetime(user_data['timestamp'])
                user_data = user_data.sort_values('timestamp', ascending=True).tail(limit)
            except (ValueError, TypeError):
                # Unparseable timestamps: fall back to file order.
                user_data = user_data.tail(limit)

            return user_data[['user_message', 'response_text', 'timestamp']].to_dict('records')

        except Exception as e:
            print(f"Error getting raw history: {e}")
            return []