# PharmaWise_5.0_DataIntegrity / logging_manager.py
# geronimo-pericoli's picture
# Create logging_manager.py
# 27e76cc verified
# logging_manager.py
import pandas as pd
import json
from datetime import datetime
from typing import Optional, Union, List, Dict
from huggingface_hub import HfApi, file_exists, hf_hub_download, list_repo_files
import gradio as gr # Solo para el tipo LikeData
class LoggingManager:
    """Store chat interactions and their metadata as monthly CSV logs on the Hugging Face Hub.

    One ``logs_YYYY-MM.csv`` file per month is kept in the dataset repository
    ``<repo_name>/<project_id>``. Every write downloads the current file,
    edits it with pandas and uploads the whole file again.

    NOTE(review): no locking is visible around that download-edit-upload
    cycle, so concurrent writers can presumably overwrite each other's
    rows -- confirm against the deployment.
    """

    # Columns a freshly created monthly log starts with.
    _BASE_COLUMNS = ["timestamp", "user_message", "response_text", "flag", "user"]

    def __init__(self, repo_name: str = "pharma-IA", project_id: str = "logs-engineering", hf_token: Optional[str] = None):
        """
        Initialise the log manager with its basic configuration.

        Args:
            repo_name: Name of the namespace/organisation on Hugging Face
            project_id: Project id inside that namespace
            hf_token: Authentication token for the Hugging Face Hub
        """
        self.repo_name = repo_name
        self.project_id = project_id
        self.hf_token = hf_token
        self.repo_id = f"{repo_name}/{project_id}"
        self.api = HfApi()

    # --------------------------
    # Basic utilities
    # --------------------------
    def _get_current_filename(self) -> str:
        """Build the log file name for the current month."""
        return f"logs_{datetime.now().strftime('%Y-%m')}.csv"

    @staticmethod
    def _is_missing(value) -> bool:
        """Return True for None, pd.NA and float NaN (what empty CSV cells load as)."""
        if value is None or value is pd.NA:
            return True
        # NaN is the only float that differs from itself.
        return isinstance(value, float) and value != value

    @staticmethod
    def _normalize_text(text: str) -> str:
        """Normalise text for comparisons (trimmed, lower-case).

        Missing values yield "" instead of raising, so a single empty cell in
        a log column cannot break a whole-column ``apply``.
        """
        if isinstance(text, str):
            return text.strip().lower()
        if LoggingManager._is_missing(text):
            return ""
        return str(text).strip().lower()

    @classmethod
    def _extract_message_text(cls, message) -> str:
        """Return the normalised question text of a user message.

        Accepts a dict with a 'text' field, a JSON string of such a dict (the
        format ``save_interaction`` writes), a legacy single-quoted dict
        string, or plain text.
        """
        if cls._is_missing(message):
            return ""
        if isinstance(message, dict):
            return cls._normalize_text(message.get('text', ''))
        raw = str(message)
        # Parse the string untouched first: rewriting quotes up front would
        # corrupt valid JSON whose text contains an apostrophe. The rewritten
        # form is only a fallback for legacy single-quoted payloads.
        for candidate in (raw, raw.replace("'", '"')):
            try:
                parsed = json.loads(candidate)
            except ValueError:
                continue
            if isinstance(parsed, dict) and 'text' in parsed:
                return cls._normalize_text(parsed['text'])
        return cls._normalize_text(raw)

    @staticmethod
    def _attachment_name(entry) -> str:
        """Return a display name for one attachment entry (dict or path string)."""
        if isinstance(entry, dict):
            return str(entry.get('name', 'file'))
        if isinstance(entry, str) and entry:
            # Keep only the last path component, whatever the separator.
            return entry.replace("\\", "/").rstrip("/").rsplit("/", 1)[-1] or 'file'
        return 'file'

    @staticmethod
    def _set_text_cell(df: pd.DataFrame, row, column: str, value: str) -> None:
        """Write a string into ``df.at[row, column]``.

        A column whose cells were all empty loads from CSV as float64; it is
        cast to object first so the string assignment is not an
        incompatible-dtype write. A missing column is created.
        """
        if column in df.columns:
            df[column] = df[column].astype(object)
        else:
            df[column] = pd.Series(index=df.index, dtype=object)
        df.at[row, column] = value

    def _file_exists(self, filename: str) -> bool:
        """Check whether a file exists in the repository."""
        try:
            files = list_repo_files(repo_id=self.repo_id, repo_type="dataset", token=self.hf_token)
            return filename in files
        except Exception as e:
            print(f"Error checking file existence: {e}")
            return False

    def _download_log(self, filename: str) -> pd.DataFrame:
        """Download one log file from the repository and load it as a DataFrame."""
        local_path = hf_hub_download(
            repo_id=self.repo_id,
            filename=filename,
            repo_type="dataset",
            token=self.hf_token
        )
        return pd.read_csv(local_path)

    def _upload_log(self, df: pd.DataFrame, filename: str) -> None:
        """Write the DataFrame to a local CSV named ``filename`` and upload it."""
        df.to_csv(filename, index=False)
        self.api.upload_file(
            path_or_fileobj=filename,
            path_in_repo=filename,
            repo_id=self.repo_id,
            token=self.hf_token,
            repo_type="dataset"
        )

    # --------------------------
    # Main operations
    # --------------------------
    def save_interaction(self, user_message: Union[str, dict], response_text: str, user: str) -> bool:
        """
        Append one user/system interaction to the current month's log.

        Args:
            user_message: User message (string, or dict with text/files)
            response_text: Response generated by the system
            user: User identifier

        Returns:
            bool: True if the interaction was saved, False on error
        """
        try:
            filename = self._get_current_filename()
            existing = self._download_log(filename) if self._file_exists(filename) else None

            new_entry = {
                "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                "user_message": json.dumps(user_message) if isinstance(user_message, dict) else user_message,
                "response_text": response_text,
                "flag": "",
                "user": user
            }
            new_row = pd.DataFrame([new_entry], columns=self._BASE_COLUMNS)

            # Concatenating with an empty frame is deprecated in pandas, so
            # the new row is used directly when there is nothing to append to.
            if existing is not None and not existing.empty:
                df = pd.concat([existing, new_row], ignore_index=True)
            else:
                df = new_row

            self._upload_log(df, filename)
            return True
        except Exception as e:
            print(f"Error saving interaction: {e}")
            return False

    def record_feedback(self, feedback_data: "gr.LikeData") -> bool:
        """
        Record user feedback (like/dislike) on the matching logged response.

        The most recent row whose response contains the feedback text gets
        its 'flag' column set to ``str(feedback_data.liked)``.

        Args:
            feedback_data: Feedback payload coming from Gradio

        Returns:
            bool: True if the feedback was recorded
        """
        if not feedback_data:
            print("No feedback data provided")
            return False
        try:
            raw_value = feedback_data.value
            if isinstance(raw_value, str):
                text_value = raw_value
            elif isinstance(raw_value, dict):
                text_value = raw_value.get('value', '')
            else:
                text_value = ''

            normalized_value = self._normalize_text(text_value)
            if not normalized_value:
                # An empty needle is contained in every response and would
                # flag an arbitrary row.
                print("No matching interaction found for feedback")
                return False

            filename = self._get_current_filename()
            if not self._file_exists(filename):
                print(f"Log file {filename} doesn't exist")
                return False

            df = self._download_log(filename)

            # Find the matching interaction
            normalized_responses = df['response_text'].apply(self._normalize_text).astype(str)
            matching_indices = df.index[
                normalized_responses.str.contains(normalized_value, na=False, regex=False)
            ].tolist()
            if not matching_indices:
                print("No matching interaction found for feedback")
                return False

            self._set_text_cell(df, matching_indices[-1], 'flag', str(feedback_data.liked))
            self._upload_log(df, filename)
            return True
        except Exception as e:
            print(f"Error recording feedback: {e}")
            return False

    def save_evaluation_metrics(
        self,
        query: str,
        faithfulness_score: float,
        answer_relevancy_score: float,
        context_relevancy_score: float
    ) -> bool:
        """
        Store evaluation metrics on the most recent interaction matching a query.

        Args:
            query: Original query text (JSON string or plain text)
            faithfulness_score: Groundedness/faithfulness score
            answer_relevancy_score: Answer relevancy score
            context_relevancy_score: Context relevancy score

        Returns:
            bool: True if the metrics were saved
        """
        try:
            filename = self._get_current_filename()
            if not self._file_exists(filename):
                print(f"Log file {filename} doesn't exist")
                return False

            df = self._download_log(filename)

            # Compare question texts only, whichever format each side uses.
            norm_query = self._extract_message_text(query)
            stored_queries = df['user_message'].apply(self._extract_message_text)
            matches = df.index[stored_queries == norm_query].tolist()

            if not matches:
                print("No matching query found in logs")
                print(f"Buscando: '{norm_query}'")
                print("Consultas existentes:", stored_queries.unique())
                return False

            last_match = matches[-1]
            df.at[last_match, 'groundedness'] = faithfulness_score
            df.at[last_match, 'answer_relevancy'] = answer_relevancy_score
            df.at[last_match, 'context_relevancy'] = context_relevancy_score

            self._upload_log(df, filename)
            return True
        except Exception as e:
            print(f"Error saving evaluation metrics: {e}")
            return False

    def save_node_references(
        self,
        query: dict,
        source_nodes: list,
        kg_nodes: list
    ) -> bool:
        """
        Store the ids of the nodes used to build a response.

        Args:
            query: Original query (dict with at least a 'text' field)
            source_nodes: Document nodes used; each item must expose ``.node.id_``
            kg_nodes: Knowledge-graph nodes used; each item must expose ``.node.id_``

        Returns:
            bool: True if the references were saved
        """
        try:
            filename = self._get_current_filename()
            if not self._file_exists(filename):
                print(f"Log file {filename} doesn't exist")
                return False

            df = self._download_log(filename)

            norm_query = self._extract_message_text(query)
            if not norm_query:
                # An empty query would substring-match every row.
                print("No matching query found for node references")
                return False

            # Match against the decoded question text: the raw JSON cell has
            # non-ASCII characters and quotes escaped, so it cannot be
            # searched for the plain query text reliably.
            stored_queries = df['user_message'].apply(self._extract_message_text).astype(str)
            matches = df.index[
                stored_queries.str.contains(norm_query, na=False, regex=False)
            ].tolist()
            if not matches:
                print("No matching query found for node references")
                return False

            last_match = matches[-1]
            self._set_text_cell(
                df, last_match, 'response_node_ids',
                ", ".join([n.node.id_ for n in (source_nodes or [])])
            )
            self._set_text_cell(
                df, last_match, 'kg_node_ids',
                ", ".join([n.node.id_ for n in (kg_nodes or [])])
            )

            self._upload_log(df, filename)
            return True
        except Exception as e:
            print(f"Error saving node references: {e}")
            return False

    # --------------------------
    # Query / audit functions
    # --------------------------
    def get_available_log_months(self) -> List[str]:
        """Return the sorted, de-duplicated months (YYYY-MM) that have a log file."""
        try:
            files = list_repo_files(repo_id=self.repo_id, repo_type="dataset", token=self.hf_token)
            months = {
                name.split('_')[1].replace('.csv', '')
                for name in files
                if name.startswith('logs_') and name.endswith('.csv')
            }
            return sorted(months)
        except Exception as e:
            print(f"Error getting available months: {e}")
            return []

    def get_audit_trail(self, month: str) -> pd.DataFrame:
        """
        Load one month's log for auditing, newest first, with display column names.

        Args:
            month: Month in YYYY-MM format

        Returns:
            pd.DataFrame: The formatted log, or an empty DataFrame on error
        """
        try:
            filename = f"logs_{month}.csv"
            if not self._file_exists(filename):
                print(f"No logs found for {month}")
                return pd.DataFrame()

            df = self._download_log(filename)

            # NOTE(review): timestamps are written with naive datetime.now(),
            # so the 'UTC-0' label is only accurate when the server clock is
            # on UTC -- confirm against the deployment.
            df["timestamp"] = pd.to_datetime(df["timestamp"]).dt.strftime('%Y-%m-%d %H:%M:%S UTC-0')
            df = df.sort_values("timestamp", ascending=False)

            # Rename columns for display
            return df.rename(columns={
                "timestamp": "Timestamp",
                "user_message": "User Message",
                "response_text": "Response",
                "flag": "Feedback",
                "user": "User",
                "groundedness": "Groundedness",
                "answer_relevancy": "Answer Relevancy",
                "context_relevancy": "Context Relevancy",
                "response_node_ids": "Document Nodes",
                "kg_node_ids": "KG Nodes"
            })
        except Exception as e:
            print(f"Error loading audit trail: {e}")
            return pd.DataFrame()

    def get_user_history(self, user: str, limit: int = 5) -> str:
        """
        Return a user's most recent interactions formatted as Markdown.

        Args:
            user: User identifier
            limit: Maximum number of interactions to return

        Returns:
            str: Formatted history, or a warning message
        """
        try:
            history = self._get_raw_history(user, limit)
            if not history:
                return "⚠️ No history found for this user"

            markdown = [
                f"## Chat History for {user} (last {len(history)} interactions)",
                "*Ordered from oldest to newest*\n"
            ]
            for i, interaction in enumerate(history, 1):
                question = self._format_question(interaction['user_message'])

                raw_response = interaction['response_text']
                # Empty CSV cells load as NaN, which is truthy and has no
                # .strip(); treat them like a missing response.
                if self._is_missing(raw_response) or not raw_response:
                    response = "(No response)"
                else:
                    response = str(raw_response).strip()

                markdown.extend([
                    f"\n### Interaction {i}",
                    f"**📅 {interaction['timestamp']}**",
                    "",
                    "**❓ Question:**",
                    f"> {question}",
                    "",
                    "**💡 Response:**",
                    f"> {response}",
                    "",
                    "---"
                ])
            # Drop the trailing separator before the closing line.
            return "\n".join(markdown[:-1]) + "\n\n*End of history*"
        except Exception as e:
            print(f"Error generating user history: {e}")
            return "⚠️ Error retrieving history"

    # --------------------------
    # Private helpers
    # --------------------------
    def _format_question(self, question_data: Union[str, dict]) -> str:
        """Format a question that may be plain text, a dict, or a JSON string of a dict."""
        if self._is_missing(question_data) or not question_data:
            return "(No text)"

        if isinstance(question_data, str):
            try:
                data = json.loads(question_data)
            except ValueError:
                data = None
            if isinstance(data, dict):
                question_data = data

        if isinstance(question_data, dict):
            question_text = question_data.get('text', '(No text)')
            if files := (question_data.get('files') or []):
                attachments = "\n📎 Attachments: " + ", ".join(
                    [f"`{self._attachment_name(f)}`" for f in files]
                )
                return f"{question_text}{attachments}"
            return question_text
        return str(question_data)

    def _get_raw_history(self, user: str, limit: int) -> List[Dict]:
        """Collect a user's last ``limit`` interactions across all monthly logs."""
        try:
            required = ['user', 'user_message', 'response_text', 'timestamp']
            all_data = []
            # The months come from the repository listing itself, so no extra
            # existence check (a full listing per month) is needed; a failed
            # download is handled per file below.
            for month in self.get_available_log_months():
                filename = f"logs_{month}.csv"
                try:
                    df = self._download_log(filename)
                except Exception as e:
                    print(f"Error processing {filename}: {e}")
                    continue
                if all(col in df.columns for col in required):
                    all_data.append(df)

            if not all_data:
                return []

            # Combine and filter
            combined = pd.concat(all_data, ignore_index=True)
            user_data = combined[combined['user'] == user].copy()
            if user_data.empty:
                return []

            # Sort and limit; fall back to file order if timestamps cannot be parsed
            try:
                user_data['timestamp'] = pd.to_datetime(user_data['timestamp'])
                user_data = user_data.sort_values('timestamp', ascending=True).tail(limit)
            except (ValueError, TypeError, OverflowError):
                user_data = user_data.tail(limit)

            return user_data[['user_message', 'response_text', 'timestamp']].to_dict('records')
        except Exception as e:
            print(f"Error getting raw history: {e}")
            return []