Spaces:
Sleeping
Sleeping
| # app/utils.py | |
| from __future__ import annotations | |
| from typing import List, Any, Tuple | |
| import ast | |
| import json | |
| import pandas as pd | |
| import streamlit as st | |
# Expected columns (the ones the app exports). Adjust if your export adds/removes columns.
EXPECTED_COLS = [
    "uri", "autor", "fecha", "texto_raw", "texto_clean",
    "sentiment", "sent_desc", "p_neg", "p_neu", "p_pos", "sarcasm_score",
    "n_palabras", "has_url", "hashtags", "mentions",
]

# Columns whose cells must be lists (string representations get converted).
LIST_COLUMNS = ["hashtags", "mentions"]
def validate_export_df(df: pd.DataFrame) -> Tuple[bool, List[str]]:
    """Check that *df* carries at least the minimum expected columns.

    Returns a ``(is_valid, missing_columns)`` pair; a ``None`` frame is
    treated as missing everything.
    """
    if df is None:
        return False, EXPECTED_COLS.copy()
    present = set(df.columns)
    missing = [col for col in EXPECTED_COLS if col not in present]
    return not missing, missing
| def _to_list_safe(val: Any) -> list: | |
| """ | |
| Convierte de forma segura valores variados a lista: | |
| - Si ya es lista -> la devuelve | |
| - Si es string representando lista -> ast.literal_eval | |
| - Si es string vacío / NaN / None -> [] | |
| - Si es otro tipo (ej str normal) -> [] | |
| """ | |
| if val is None: | |
| return [] | |
| if isinstance(val, list): | |
| return val | |
| if isinstance(val, (tuple, set)): | |
| return list(val) | |
| try: | |
| if pd.isna(val): | |
| return [] | |
| except Exception: | |
| pass | |
| if isinstance(val, str): | |
| s = val.strip() | |
| if s == "": | |
| return [] | |
| if s.startswith("[") and s.endswith("]"): | |
| try: | |
| parsed = ast.literal_eval(s) | |
| if isinstance(parsed, list): | |
| return parsed | |
| return [parsed] | |
| except Exception: | |
| inner = s[1:-1].strip() | |
| if not inner: | |
| return [] | |
| parts = [p.strip().strip("'\"") for p in inner.split(",") if p.strip()] | |
| return parts | |
| if "," in s: | |
| parts = [p.strip() for p in s.split(",") if p.strip()] | |
| return parts | |
| return [] | |
| return [] | |
def _ensure_list_columns_in_df(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    """Guarantee that every column in *cols* exists in *df* and holds lists.

    Missing columns are created filled with empty lists; existing ones are
    coerced element-wise via ``_to_list_safe``.  NOTE: existing columns are
    rewritten on *df* itself — callers needing isolation should pass a copy.
    """
    # A missing frame is replaced by an empty one with the expected schema.
    if df is None:
        df = pd.DataFrame(columns=EXPECTED_COLS)
    for col in cols:
        if col in df.columns:
            df[col] = df[col].apply(_to_list_safe)
        else:
            df[col] = [[] for _ in range(len(df))]
    return df
def load_csv_to_df(uploaded) -> pd.DataFrame:
    """Load an uploaded CSV and return a normalized DataFrame.

    Parameters
    ----------
    uploaded : str or file-like
        A filesystem path, or the seekable file object Streamlit provides.

    Returns
    -------
    pd.DataFrame
        Frame guaranteed to contain all ``EXPECTED_COLS`` (missing ones are
        filled with ``None`` / empty lists), with 'fecha' parsed to datetime
        (invalid values become NaT) and 'hashtags'/'mentions' coerced to
        lists.  Expected columns come first, extra columns are preserved.

    Raises
    ------
    Exception
        Re-raises the original ``pd.read_csv`` error when every read
        attempt fails (e.g. a malformed CSV).
    """
    df = _read_csv_with_fallback(uploaded)

    # Derive texto_clean from texto_raw when the export lacks it.
    if "texto_raw" in df.columns and "texto_clean" not in df.columns:
        df["texto_clean"] = df["texto_raw"]

    # Lenient date parsing: unparseable cells become NaT instead of raising.
    if "fecha" in df.columns:
        try:
            df["fecha"] = pd.to_datetime(df["fecha"], errors="coerce")
        except Exception:
            pass

    # Fill in any missing expected columns with neutral values.
    for c in EXPECTED_COLS:
        if c not in df.columns:
            df[c] = [[] for _ in range(len(df))] if c in LIST_COLUMNS else None

    df = _ensure_list_columns_in_df(df, LIST_COLUMNS)

    # Expected columns first, any extras appended in their original order.
    cols_final = [c for c in EXPECTED_COLS if c in df.columns]
    cols_final += [c for c in df.columns if c not in EXPECTED_COLS]
    return df[cols_final]


def _read_csv_with_fallback(uploaded) -> pd.DataFrame:
    """Read a CSV from a path or seekable file-like object, retrying encodings.

    Tries pandas' default (utf-8) first, then 'latin1' — which can decode any
    byte sequence, so it only fails on non-encoding problems.  (The previous
    retry list also included 'utf-8' and 'iso-8859-1', but those duplicate the
    default attempt and the 'latin1' alias respectively, so they never helped.)
    Re-raises the original error when all attempts fail.
    """

    def _read(**kwargs) -> pd.DataFrame:
        if isinstance(uploaded, str):
            return pd.read_csv(uploaded, **kwargs)
        # Rewind: a previous failed attempt may have consumed the stream.
        uploaded.seek(0)
        return pd.read_csv(uploaded, **kwargs)

    try:
        return _read()
    except Exception:
        for enc in ("latin1",):
            try:
                return _read(encoding=enc)
            except Exception:
                pass
        raise  # all attempts failed: propagate the original error
def merge_exported_with_new(existing: pd.DataFrame, new: pd.DataFrame, dedup_on: str = "uri") -> pd.DataFrame:
    """Concatenate *existing* + *new* and drop duplicates on *dedup_on*.

    When a 'fecha' column is present, rows are sorted newest-first so the
    deduplication keeps the most recent entry.  List columns
    (hashtags/mentions) are normalized to lists and finally serialized to
    JSON strings for Streamlit / pyarrow compatibility.

    Parameters
    ----------
    existing, new : pd.DataFrame or None
        Frames to merge; ``None`` is treated as an empty frame.
    dedup_on : str
        Column used as the duplicate key (default ``"uri"``).

    Returns
    -------
    pd.DataFrame
        Merged frame with expected columns first and extras preserved.
    """
    # Normalize inputs; copies keep the callers' frames untouched.
    ex = existing.copy() if existing is not None else pd.DataFrame()
    nw = new.copy() if new is not None else pd.DataFrame()

    # Make both frames carry the full expected schema.
    for df_obj in (ex, nw):
        for c in EXPECTED_COLS:
            if c not in df_obj.columns:
                if c in LIST_COLUMNS:
                    df_obj[c] = [[] for _ in range(len(df_obj))]
                else:
                    df_obj[c] = None

    ex = _ensure_list_columns_in_df(ex, LIST_COLUMNS)
    nw = _ensure_list_columns_in_df(nw, LIST_COLUMNS)

    combined = pd.concat([ex, nw], ignore_index=True, sort=False)

    # Newest first so drop_duplicates(keep="first") retains the latest row.
    if "fecha" in combined.columns:
        try:
            combined = combined.sort_values(by="fecha", ascending=False, na_position="last")
        except Exception:
            pass

    if dedup_on in combined.columns:
        combined = combined.drop_duplicates(subset=[dedup_on], keep="first")
    else:
        # BUG FIX: deduplicating across every column used to raise
        # "TypeError: unhashable type: 'list'" because the list-valued
        # columns hold lists here; restrict the subset to hashable columns.
        hashable_cols = [c for c in combined.columns if c not in LIST_COLUMNS]
        if hashable_cols:
            combined = combined.drop_duplicates(subset=hashable_cols, keep="first")

    combined = combined.reset_index(drop=True)
    combined = _ensure_list_columns_in_df(combined, LIST_COLUMNS)

    # Lists -> JSON strings so pyarrow/Streamlit can serialize the frame.
    for col in LIST_COLUMNS:
        if col in combined.columns:
            combined[col] = combined[col].apply(
                lambda x: json.dumps(x) if isinstance(x, (list, tuple, set)) else x
            )

    # Consistent column order: expected columns first, extras afterwards.
    cols_final = [c for c in EXPECTED_COLS if c in combined.columns]
    cols_final += [c for c in combined.columns if c not in EXPECTED_COLS]
    return combined[cols_final]
def preview_df(df: pd.DataFrame, n: int = 5):
    """Streamlit helper: show the frame's dimensions and its first *n* rows."""
    is_empty = df is None or df.empty
    if is_empty:
        st.info("DataFrame vacío.")
    else:
        st.write(f"Dimensiones: {df.shape}")
        st.dataframe(df.head(n), use_container_width=True)