oefa / utils.py
alexis07's picture
Update utils.py
476c707 verified
"""
utils.py β€” Funciones utilitarias reutilizables para el buscador de jurisprudencia.
Contiene toda la lΓ³gica de negocio desacoplada de la interfaz:
β€’ Carga de datos (descarga HF, procesamiento de DataFrames, conexiΓ³n ChromaDB)
β€’ BΓΊsqueda semΓ‘ntica genΓ©rica
β€’ Renderizado de resultados en HTML (con tΓ­tulo y campos extra configurables)
"""
from __future__ import annotations
import os
from typing import Any
import chromadb
import pandas as pd
from huggingface_hub import snapshot_download
# ─────────────────────────────────────────────────────────────────────────────
# 1. CARGA DE DATOS
# ─────────────────────────────────────────────────────────────────────────────
def load_database_assets(db_config: dict) -> dict:
"""
Descarga el dataset desde Hugging Face, procesa los DataFrames de textos
y metadata, y conecta con las colecciones de ChromaDB.
ParΓ‘metros
----------
db_config : dict
Diccionario de configuraciΓ³n de una base de datos individual
(una entrada del diccionario DATABASES de config.py).
Retorna
-------
dict con las claves:
- "metadata" : pd.DataFrame con la metadata indexada por ID compuesto.
- "collections" : lista de objetos chromadb.Collection.
- "field_map" : dict con el mapeo de campos internos β†’ columnas reales.
"""
print(f" Descargando dataset: {db_config['repo_id']}...")
dataset_path = snapshot_download(
repo_id=db_config["repo_id"],
repo_type=db_config.get("repo_type", "dataset"),
)
# --- Cargar textos y metadata ---
print(f" Cargando archivos: {db_config['pickle_file']}, {db_config['csv_file']}...")
textos = pd.read_pickle(os.path.join(dataset_path, db_config["pickle_file"]))
metadata = pd.read_csv(os.path.join(dataset_path, db_config["csv_file"]))
# Eliminar columnas innecesarias (si las hay)
drop_cols = db_config.get("drop_columns", [])
existing_drop_cols = [c for c in drop_cols if c in metadata.columns]
if existing_drop_cols:
metadata = metadata.drop(columns=existing_drop_cols)
# Merge de textos con metadata
merge_key = db_config.get("merge_key", "ID")
metadata = pd.merge(textos, metadata, on=merge_key, how="inner")
# Crear ID compuesto: ID_Page
id_col = db_config.get("id_column", "ID")
page_col = db_config.get("page_column", "Page")
metadata[id_col] = metadata[id_col].astype(str) + "_" + textos[page_col].astype(str)
# Indexar para bΓΊsqueda rΓ‘pida
metadata.set_index(id_col, inplace=True)
# --- Conectar a ChromaDB ---
chroma_dir = os.path.join(dataset_path, db_config["chroma_subdir"])
print(f" Conectando a ChromaDB en: {chroma_dir}...")
client = chromadb.PersistentClient(path=chroma_dir)
collection_names = db_config.get("collection_names", [])
collections = [client.get_collection(name) for name in collection_names]
print(f" βœ“ Base de datos '{db_config.get('display_name', db_config['repo_id'])}' lista.")
return {
"metadata": metadata,
"collections": collections,
"field_map": db_config.get("field_map", {}),
}
# ─────────────────────────────────────────────────────────────────────────────
# 2. BÚSQUEDA SEMÁNTICA
# ─────────────────────────────────────────────────────────────────────────────
def _safe_get(row: Any, column: str | None, default: str = "N/A") -> Any:
"""
Obtiene de forma segura el valor de una columna en una fila de DataFrame,
manejando tanto Series como filas individuales.
Retorna *default* si column es None o no existe.
"""
if column is None:
return default
try:
if hasattr(row, "get"):
return row.get(column, default)
return row[column]
except (KeyError, TypeError):
return default
def perform_search(
query_embedding: list[float],
collections: list,
metadata_df: pd.DataFrame,
field_map: dict,
n_results: int = 15,
) -> list[dict]:
"""
Ejecuta la bΓΊsqueda semΓ‘ntica en las colecciones de ChromaDB y enriquece
los resultados con la metadata del DataFrame.
ParΓ‘metros
----------
query_embedding : list[float]
Vector de embedding de la consulta del usuario.
collections : list
Lista de colecciones de ChromaDB donde buscar.
metadata_df : pd.DataFrame
DataFrame de metadata indexado por ID compuesto.
field_map : dict
Mapeo de campos internos estandarizados β†’ nombres reales de columnas.
Claves esperadas:
"enlace" β†’ columna con el link
"text" β†’ columna con el texto principal
"date" β†’ columna con la fecha
"page" β†’ columna con la pΓ‘gina
"titulo" β†’ columna con el tΓ­tulo (str o None)
"extra_fields" β†’ lista de tuplas (etiqueta, columna)
n_results : int
NΓΊmero mΓ‘ximo de resultados a retornar.
Retorna
-------
list[dict]
Lista de diccionarios con los resultados ordenados por similitud
descendente. Cada diccionario contiene las claves estandarizadas:
id, similitud, enlace, text, date, page, titulo, extras.
"""
# Nombres reales de las columnas segΓΊn el field_map
col_enlace = field_map.get("enlace", "Enlace")
col_text = field_map.get("text", "Text")
col_date = field_map.get("date", "Date")
col_page = field_map.get("page", "Page")
col_titulo = field_map.get("titulo") # puede ser None
extra_fields = field_map.get("extra_fields", []) # lista de (etiqueta, columna)
all_results: list[dict] = []
for collection in collections:
results = collection.query(
query_embeddings=[query_embedding],
n_results=n_results,
include=["distances"],
)
cosine_similarities = [1 - dist for dist in results["distances"][0]]
for i, chroma_id in enumerate(results["ids"][0]):
try:
if chroma_id in metadata_df.index:
row = metadata_df.loc[chroma_id]
# Campos base
res: dict[str, Any] = {
"id": chroma_id,
"similitud": cosine_similarities[i],
"enlace": _safe_get(row, col_enlace, "#"),
"text": _safe_get(row, col_text, "N/A"),
"date": _safe_get(row, col_date, "N/A"),
"page": _safe_get(row, col_page, "N/A"),
}
# TΓ­tulo (puede no existir)
res["titulo"] = _safe_get(row, col_titulo, None)
# Campos extra configurables: lista de (etiqueta, valor)
extras: list[tuple[str, str]] = []
for label, col_name in extra_fields:
value = str(_safe_get(row, col_name, "N/A"))
extras.append((label, value))
res["extras"] = extras
all_results.append(res)
else:
print(f" Warning: ID {chroma_id} no encontrado en metadata.")
except Exception as e:
print(f" Error al recuperar metadata para ID {chroma_id}: {e}")
# Ordenar por similitud descendente y limitar resultados
all_results = sorted(
all_results, key=lambda x: x["similitud"], reverse=True
)[:n_results]
return all_results
# ─────────────────────────────────────────────────────────────────────────────
# 3. RENDERIZADO HTML
# ─────────────────────────────────────────────────────────────────────────────
def render_results_html(search_results: list[dict]) -> str:
"""
Genera el HTML de las tarjetas (cards) de resultados a partir de una lista
de diccionarios estandarizados.
Estructura de cada card:
β”Œβ”€ card-header ──────────────────────────────────────────────┐
β”‚ PΓ‘gina Β· Fecha Β· Relevancia β”‚
β”œβ”€ card-title (solo si hay tΓ­tulo) ───────────────────────────
β”‚ πŸ“Œ TΓ­tulo de la decisiΓ³n β”‚
β”œβ”€ card-extras (solo si hay extra_fields) ───────────────────
β”‚ Etiqueta1: valor1 | Etiqueta2: valor2 | ... β”‚
β”œβ”€ card-body ─────────────────────────────────────────────────
β”‚ Texto principal del fragmento β”‚
β”œβ”€ card-footer ───────────────────────────────────────────────
β”‚ πŸ”— Ver Documento Completo β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
ParΓ‘metros
----------
search_results : list[dict]
Lista de resultados proveniente de `perform_search`.
Retorna
-------
str
Cadena HTML con las tarjetas de resultados, lista para inyectar
en un componente gr.HTML de Gradio.
"""
if not search_results:
return (
'<div style="text-align: center; color: #64748b; padding: 40px;">'
"No se encontraron resultados relevantes.</div>"
)
html = '<div class="results-container">'
for item in search_results:
similitud_pct = f"{item['similitud'] * 100:.1f}%"
enlace = item.get("enlace", "#")
fecha = item.get("date", "N/A")
texto = str(item.get("text", "N/A")).replace("\n", " ")
pagina = item.get("page", "N/A")
titulo = item.get("titulo")
extras = item.get("extras", [])
# ── SecciΓ³n: tΓ­tulo (condicional) ──
titulo_html = ""
if titulo:
titulo_safe = str(titulo).replace("\n", " ")
titulo_html = f"""
<div class="card-title">
<span class="titulo-text">πŸ“Œ {titulo_safe}</span>
</div>"""
# ── SecciΓ³n: campos extra concatenados (condicional) ──
extras_html = ""
if extras:
items_html = " | ".join(
f"<strong>{label}:</strong> {value}" for label, value in extras
)
extras_html = f"""
<div class="card-extras">
{items_html}
</div>"""
# ── Card completa ──
html += f"""
<div class="legal-card">
<div class="card-header">
<span class="res-number">βš–οΈ PΓ‘gina: {pagina}</span>
<span class="res-date">πŸ“… {fecha}</span>
<span class="res-score">🎯 Relevancia: {similitud_pct}</span>
</div>{titulo_html}{extras_html}
<div class="card-body">
<p class="res-summary">{texto}</p>
</div>
<div class="card-footer">
<a href="{enlace}" target="_blank" class="view-link">πŸ”— Ver Documento Completo</a>
</div>
</div>
"""
html += "</div>"
return html
# ─────────────────────────────────────────────────────────────────────────────
# 4. CSS (lectura desde archivo estΓ‘tico)
# ─────────────────────────────────────────────────────────────────────────────
def load_css(css_path: str | None = None) -> str:
"""
Lee y retorna el contenido del archivo CSS.
Busca el archivo en el siguiente orden de prioridad:
1. La ruta explΓ­cita proporcionada en `css_path`.
2. `static/custom.css` (relativo al directorio de este mΓ³dulo).
3. `custom.css` en la raΓ­z del proyecto (mismo directorio que este mΓ³dulo).
ParΓ‘metros
----------
css_path : str | None
Ruta absoluta o relativa al archivo CSS. Si es None, se busca
automΓ‘ticamente en las ubicaciones por defecto.
Retorna
-------
str
Contenido del archivo CSS como cadena de texto.
Raises
------
FileNotFoundError
Si no se encuentra el archivo CSS en ninguna ubicaciΓ³n.
"""
base_dir = os.path.dirname(os.path.abspath(__file__))
# Lista de rutas candidatas en orden de prioridad
if css_path is not None:
candidates = [css_path]
else:
candidates = [
os.path.join(base_dir, "static", "custom.css"),
os.path.join(base_dir, "custom.css"),
]
for path in candidates:
if os.path.isfile(path):
with open(path, "r", encoding="utf-8") as f:
return f.read()
searched = ", ".join(candidates)
raise FileNotFoundError(
f"No se encontrΓ³ custom.css en ninguna de estas ubicaciones: {searched}"
)