# (page-capture residue from the hosting "Spaces" UI — kept as a comment so
# the module parses as valid Python)
# Spaces: Sleeping / Sleeping
# app/analyzer.py
from __future__ import annotations

import re
from functools import lru_cache
from typing import Dict, List, Optional, Tuple

import pandas as pd
import streamlit as st
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline

# ---- FAST MODE: VADER ----
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Emoji handling library (uses emoji.demojize)
import emoji
# Try to import emosent-py (Emoji Sentiment Ranking). If it is missing we fall
# back to the heuristic emoji map defined further down.
try:
    from emosent import get_emoji_sentiment_rank_multiple, get_emoji_sentiment_rank
    _EMOSENT_AVAILABLE = True
except Exception:
    _EMOSENT_AVAILABLE = False
# Try to import Afinn and NRCLex (lexicon enhancements). Both are optional:
# when absent, the corresponding score component silently stays at 0.
try:
    from afinn import Afinn
    _AFINN_AVAILABLE = True
except Exception:
    Afinn = None  # type: ignore
    _AFINN_AVAILABLE = False
try:
    from nrclex import NRCLex
    _NRC_AVAILABLE = True
except Exception:
    NRCLex = None  # type: ignore
    _NRC_AVAILABLE = False
# ======================================================================
# Config
# ======================================================================
MODEL_ID = "nlptown/bert-base-multilingual-uncased-sentiment"  # precise mode
BATCH_SIZE_CPU = 32
BATCH_SIZE_GPU = 64
# Blend weights for the fast (VADER) mode — tunable.
WEIGHT_EMOJI = 0.18  # emoji contribution
WEIGHT_LEXICON = 0.18  # lexicon contribution (Afinn + NRCLex)
# WEIGHT_VADER should ideally equal 1 - WEIGHT_EMOJI - WEIGHT_LEXICON
WEIGHT_VADER = max(0.0, 1.0 - WEIGHT_EMOJI - WEIGHT_LEXICON)
# Sarcasm detector config (swap the model id for a more suitable one if needed)
SARCASM_MODEL_ID = "mrm8488/distilbert-finetuned-sarcasm-classification"
# Threshold above which sarcasm is considered "likely" when adjusting compound
SARC_MODEL_THRESH = 0.5
# Raw model labels ('1 star'..'5 stars') -> Spanish description.
LABEL_MAP_ES = {
    "1 star": "muy negativo",
    "2 stars": "negativo",
    "3 stars": "neutral",
    "4 stars": "positivo",
    "5 stars": "muy positivo",
    # defensive singular/plural variants:
    "1 stars": "muy negativo",
    "2 star": "negativo",
    "3 star": "neutral",
    "4 star": "positivo",
    "5 star": "muy positivo",
}
NEG_SET = {"muy negativo", "negativo"}
POS_SET = {"positivo", "muy positivo"}
# ======================================================================
# Cached loads
# ======================================================================
@lru_cache(maxsize=1)
def load_sentiment_components():
    """Load tokenizer, model and device for the precise (BERT) mode.

    Returns:
        (tokenizer, model, device) — model is in eval mode, moved to CUDA
        when available.

    Cached with lru_cache: the section is titled "cached loads" but nothing
    actually cached before, so every analysis run re-read the checkpoint.
    """
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
    model = AutoModelForSequenceClassification.from_pretrained(MODEL_ID)
    model.eval()
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    model.to(device)
    return tokenizer, model, device
@lru_cache(maxsize=2)
def load_sarcasm_detector(model_id: str = SARCASM_MODEL_ID):
    """Build a HF text-classification pipeline for sarcasm detection.

    Returns the pipeline, or None when the model cannot be loaded (a short
    warning is shown in the UI instead of a full stack trace).

    Cached per model_id — this also caches a failed load (None), which is
    intentional: without it every analysis call retried the download.
    """
    device = 0 if torch.cuda.is_available() else -1
    # 1) direct attempt via pipeline
    try:
        return pipeline("text-classification", model=model_id, device=device)
    except Exception as e_pipeline:
        try:
            # 2) manual attempt: load tokenizer and model with from_tf=True
            tokenizer = AutoTokenizer.from_pretrained(model_id)
            model = AutoModelForSequenceClassification.from_pretrained(model_id, from_tf=True)
            model.eval()
            if torch.cuda.is_available():
                try:
                    model.to(torch.device("cuda"))
                except Exception:
                    pass
            # build the pipeline from the model+tokenizer objects
            return pipeline("text-classification", model=model, tokenizer=tokenizer, device=device)
        except Exception as e_from_tf:
            # Show a condensed warning in the UI (no huge stack trace)
            try:
                st.warning(
                    f"No se pudo cargar el detector de sarcasmo ({model_id}): {e_pipeline}\n"
                    f"Intento from_tf también falló: {e_from_tf}",
                    icon="⚠️",
                )
            except Exception:
                pass
            return None
# ======================================================================
# Afinn / NRCLex helpers (cached, defensive)
# ======================================================================
@lru_cache(maxsize=1)
def _get_afinn():
    """Return a shared Afinn instance, or None when unavailable.

    Cached: the original rebuilt an Afinn object (re-reading its wordlist)
    for every single analyzed text row via _lexicon_scores.
    """
    if not _AFINN_AVAILABLE:
        return None
    try:
        # Try English; switch to language="es" if your Afinn build supports it
        return Afinn(language="en")
    except Exception:
        try:
            return Afinn()
        except Exception:
            return None
def _get_nrc_available() -> bool:
    # Sentinel only: NRCLex objects must be instantiated per text, so the only
    # thing worth sharing is whether the package imported successfully.
    return _NRC_AVAILABLE
def _lexicon_scores(text: str) -> float:
    """Blend Afinn and NRCLex into a single score in [-1, 1].

    Afinn contributes a heuristically normalized word-value sum; NRCLex
    contributes the positive/negative balance of its raw emotion counts.
    Either component silently falls back to 0.0 when unavailable or failing.
    """
    if not text:
        return 0.0

    # --- Afinn component, squashed into [-1, 1] ---
    afinn_component = 0.0
    analyzer = _get_afinn()
    if analyzer is not None:
        try:
            # AFNORM: heuristic divisor; tune to your average text length
            AFNORM = 10.0
            afinn_component = max(-1.0, min(1.0, analyzer.score(text) / AFNORM))
        except Exception:
            afinn_component = 0.0

    # --- NRCLex component: (pos - neg) / (pos + neg), 0 when no hits ---
    nrc_component = 0.0
    if _get_nrc_available():
        try:
            counts = NRCLex(text).raw_emotion_scores or {}
            positives = counts.get("positive", 0) + 0.0
            negatives = counts.get("negative", 0) + 0.0
            total = positives + negatives
            if total > 0:
                nrc_component = (positives - negatives) / total
        except Exception:
            nrc_component = 0.0

    # Weighted blend (Afinn 0.7, NRCLex 0.3), clamped to [-1, 1].
    blended = afinn_component * 0.7 + nrc_component * 0.3
    return max(-1.0, min(1.0, blended))
| # ====================================================================== | |
| # Emoji handling: uso de emosent-py si está disponible + fallback heurístico | |
| # ====================================================================== | |
| # Mapa corto y priorizado para emojis muy frecuentes. Es complemento, no reemplazo. | |
| _EMOJI_SENTIMENT_MAP: Dict[str, float] = { | |
| ":thumbs_up:": 1.6, | |
| ":thumbs_down:": -1.6, | |
| ":heart:": 2.0, | |
| ":red_heart:": 2.0, | |
| ":fire:": 1.5, | |
| ":smile:": 1.4, | |
| ":grin:": 1.6, | |
| ":joy:": 1.8, | |
| ":sob:": -2.0, | |
| ":cry:": -1.8, | |
| ":rage:": -2.5, | |
| ":angry:": -2.2, | |
| ":pensive:": -1.0, | |
| ":unamused:": -1.0, | |
| ":neutral_face:": -0.2, | |
| ":face_with_rolling_eyes:": -0.8, | |
| } | |
| _POS_KEYWORDS = {"smile", "grin", "joy", "laugh", "heart", "love", "thumbsup", "ok_hand", "fire", "star", "clap"} | |
| _NEG_KEYWORDS = {"sad", "cry", "sob", "angry", "rage", "thumbsdown", "frown", "broken_heart", "vomit", "nausea"} | |
| def _emoji_name_sentiment(name: str) -> float: | |
| """Heurística local para nombres demojizados.""" | |
| if not name: | |
| return 0.0 | |
| if name in _EMOJI_SENTIMENT_MAP: | |
| return float(_EMOJI_SENTIMENT_MAP[name]) | |
| key = name.strip(":").lower() | |
| for kw in _POS_KEYWORDS: | |
| if kw in key: | |
| return 1.0 | |
| for kw in _NEG_KEYWORDS: | |
| if kw in key: | |
| return -1.0 | |
| return 0.0 | |
# Matches demojized tokens such as ':smile:' or ':thumbs_up:'.
_EMOJI_NAME_RE = re.compile(r":[a-z0-9_+\-]+:", re.IGNORECASE)


def _extract_emoji_sentiments(text: str) -> list[float]:
    """
    Extract per-emoji sentiment scores in [-1, 1] from *text*.

    Prefers emosent-py (Emoji Sentiment Ranking) when installed; otherwise
    demojizes the text and scores ':name:' tokens with _emoji_name_sentiment.
    Returns an empty list when no scorable emoji is found.
    """
    if not text:
        return []
    # 1) emosent path
    if _EMOSENT_AVAILABLE:
        try:
            parsed = get_emoji_sentiment_rank_multiple(text)
            scores = []
            for item in parsed:
                rank = item.get("emoji_sentiment_rank") or {}
                s = rank.get("sentiment_score")
                if s is None:
                    continue
                # NOTE(review): this rescale assumes sentiment_score is in
                # [0, 1]; the Emoji Sentiment Ranking appears to publish
                # scores already in [-1, 1] — verify and drop the rescale
                # if confirmed.
                scores.append(float(s) * 2.0 - 1.0)
            if scores:
                return scores
        except Exception:
            pass  # fall through to the heuristic fallback
    # 2) Fallback: demojize and score every ':name:' token. A regex scan is
    #    used instead of the old str.split() so emojis glued to words
    #    ('hola😄' -> 'hola:smile:') are still detected (bug fix: the
    #    split-based scan missed any token not surrounded by whitespace).
    try:
        dem = emoji.demojize(text, language="en")
        scores = []
        for token in _EMOJI_NAME_RE.findall(dem):
            val = _emoji_name_sentiment(token)
            if val != 0.0:
                # Our short map uses roughly [-3, 3]; halve and clamp so the
                # result combines cleanly with compound scores in [-1, 1].
                scores.append(max(-1.0, min(1.0, val / 2.0)))
        return scores
    except Exception:
        return []
def _text_demojize_to_names(text: str) -> str:
    """Replace emojis with ':name:' tokens (e.g. 'hola 😄' -> 'hola :smile:')."""
    return emoji.demojize(text, language="en") if text else ""
# ======================================================================
# Build the VADER analyzer and extend its lexicon with the short emoji map
# ======================================================================
@lru_cache(maxsize=1)
def _get_vader():
    """VADER analyzer for the fast mode, with a partially extended lexicon.

    The demojized emoji names (with and without colons) are injected into
    VADER's lexicon so emoji tokens score directly.

    Cached: the original re-parsed VADER's lexicon file on every call.
    """
    analyzer = SentimentIntensityAnalyzer()
    # Inject the short-map names to improve matching
    try:
        lex_add: Dict[str, float] = {}
        for name, val in _EMOJI_SENTIMENT_MAP.items():
            lex_add[name] = val
            lex_add[name.strip(":")] = val
        if lex_add:
            analyzer.lexicon.update(lex_add)
    except Exception:
        pass
    return analyzer
# ======================================================================
# Basic text cleanup
# ======================================================================
_url = re.compile(
    r"(https?://[^\s]+|www\.[^\s]+|[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}/[^\s]*|\b[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b)"
)
_mention = re.compile(r"@\w+")
_ws = re.compile(r"\s+")

# Typographic quote -> ASCII quote replacements.
_QUOTE_FIXES = (("“", '"'), ("”", '"'), ("‘", "'"), ("’", "'"))


def normalize_text(t: str) -> str:
    """Normalize raw text for the sentiment models.

    Strips URLs/bare domains, masks @mentions as '@user', converts
    typographic quotes to ASCII, collapses whitespace, and tames exaggerated
    punctuation ('!!!!' -> '!!') and letter stretching ('soooo' -> 'soo').
    """
    if not t:
        return ""
    cleaned = _url.sub(" ", t)                 # drop URLs and domains
    cleaned = _mention.sub("@user", cleaned)   # anonymize mentions
    for fancy, plain in _QUOTE_FIXES:
        cleaned = cleaned.replace(fancy, plain)
    cleaned = re.sub(r"[\r\n\t]+", " ", cleaned)   # newlines/tabs -> space
    cleaned = _ws.sub(" ", cleaned)                # squeeze runs of spaces
    cleaned = re.sub(r"([!?]){3,}", r"\1\1", cleaned)  # cap punctuation runs
    cleaned = re.sub(r"(.)\1{3,}", r"\1\1", cleaned)   # cap letter stretching
    return cleaned.strip()
# ======================================================================
# Sarcasm/irony heuristic (fast and transparent)
# ======================================================================
# Explicit sarcasm hashtags (strongest signal, +2).
SARC_HASHTAGS = {
    "#sarcasmo", "#sarcasm", "#ironia", "#irony", "#sarc", "#irónica", "#irónico"
}
# Phrase markers commonly used sarcastically (+1).
SARC_MARKERS = {
    "/s",  # Reddit/forum convention
    "sí claro", "claro que sí", "yeah right", "ajá",
    "gracias por nada", "qué podría salir mal", "buenísimo...", "genial...", "perfecto...",
}
# Emojis that frequently accompany irony (+1).
SARC_EMOJIS = {"🙃", "😒", "🙄"}


def sarcasm_score(t: str) -> int:
    """Count sarcasm signals in *t* and return a score capped at 3."""
    if not t:
        return 0
    lowered = t.lower()
    hits = 0
    if any(tag in lowered for tag in SARC_HASHTAGS):
        hits += 2
    if any(marker in lowered for marker in SARC_MARKERS):
        hits += 1
    if any(emoji_char in t for emoji_char in SARC_EMOJIS):
        hits += 1
    # Quoted praise ("genial") often reads as irony.
    has_quote = '"' in t or "“" in t or "”" in t
    if has_quote and any(word in lowered for word in ("genial", "perfecto", "maravilloso")):
        hits += 1
    return min(hits, 3)
def adjust_with_sarcasm(label_es: str, score: int) -> str:
    """Shift a Spanish sentiment label toward negative when sarcasm is signaled.

    score is the 0..3 heuristic from sarcasm_score; 0 leaves the label as-is.
    """
    if score <= 0:
        return label_es
    if label_es in POS_SET:
        # Apparent praise under sarcasm reads as criticism.
        return "negativo" if score >= 2 else "neutral"
    if label_es == "neutral":
        return "negativo" if score >= 2 else "neutral"
    if label_es in NEG_SET and score >= 3:
        return "muy negativo"
    return label_es
# ======================================================================
# Sarcasm detector batch (uses the HF pipeline when available)
# ======================================================================
def detect_sarcasm_batch(texts: List[str], detector_pipeline=None) -> List[float]:
    """
    Return a sarcasm probability in [0, 1] for each text.

    Uses the cached HF pipeline unless one is passed in. When no pipeline is
    available (or a chunk fails), falls back to the normalized heuristic.

    Bug fix: the pipeline's 'score' is the confidence of the PREDICTED label,
    not P(sarcasm). The original returned it unchanged, so a confident
    "not sarcastic" prediction (e.g. score 0.9) was read as 90% sarcasm.
    We now inspect the label and use 1 - score for the non-sarcastic class.
    """
    if detector_pipeline is None:
        detector_pipeline = load_sarcasm_detector()
    # Heuristic fallback when no pipeline is available
    if detector_pipeline is None:
        return [min(1.0, s / 3.0) for s in (sarcasm_score(t) for t in texts)]
    probs: List[float] = []
    B = 64
    for i in range(0, len(texts), B):
        chunk = texts[i:i + B]
        try:
            preds = detector_pipeline(chunk)  # list of dicts for a batch
            for p in preds:
                score = float(p.get("score", 0.0))
                label = str(p.get("label", "")).lower()
                # NOTE(review): label naming varies by model; this covers the
                # usual 'LABEL_0/LABEL_1' and '(not) sarcastic' conventions —
                # verify against the configured SARCASM_MODEL_ID.
                is_sarcastic = (
                    "label_1" in label
                    or label in {"1", "true", "yes"}
                    or (("sarcas" in label or "iron" in label)
                        and not label.startswith(("not", "no_", "non")))
                )
                probs.append(score if is_sarcastic else 1.0 - score)
        except Exception:
            # partial fallback: heuristic for this chunk only
            probs.extend([min(1.0, s / 3.0) for s in (sarcasm_score(t) for t in chunk)])
    return probs
# ======================================================================
# FAST MODE (VADER): compound -> (p_neg, p_neu, p_pos) + label
# ======================================================================
def _compound_to_triplet_and_label(compound: float) -> tuple[tuple[float, float, float], str]:
    """
    Map VADER's compound score in [-1, 1] to a (p_neg, p_neu, p_pos) triplet
    and a Spanish label via LABEL_MAP_ES.
    """
    pos = max(0.0, compound)
    neg = max(0.0, -compound)
    neu = 1.0 - (pos + neg)
    total = max(neg + neu + pos, 1e-9)  # defensive renormalization
    triplet = (neg / total, neu / total, pos / total)
    # Bucket the compound onto the model's 1..5 star scale.
    if compound <= -0.6:
        stars = "1 star"
    elif compound <= -0.2:
        stars = "2 stars"
    elif compound < 0.2:
        stars = "3 stars"
    elif compound < 0.6:
        stars = "4 stars"
    else:
        stars = "5 stars"
    return triplet, LABEL_MAP_ES.get(stars, "neutral")
def _predict_fast(texts: List[str]) -> tuple[list[str], list[tuple[float, float, float]], list[int]]:
    """
    Fast analysis with VADER (CPU-friendly).

    Returns (labels_raw like '1 star'..'5 stars', probability triplets,
    heuristic sarcasm scores). Blends VADER with emoji scores (emosent-py
    when available), lexicon scores (Afinn/NRCLex), and applies a
    conservative sarcasm adjustment using the HF detector when available.
    """
    vader = _get_vader()
    labels_raw: list[str] = []
    probs_agg: list[tuple[float, float, float]] = []
    sarc_scores: list[int] = []
    # Invert LABEL_MAP_ES keeping the FIRST (canonical) raw key per label.
    # Bug fix: the original {v: k} comprehension kept the LAST duplicate, so
    # fast mode emitted the defensive variants ('1 stars', '2 star', ...)
    # and disagreed with the canonical labels produced by the BERT mode.
    inv_map: Dict[str, str] = {}
    for raw_label, spanish in LABEL_MAP_ES.items():
        inv_map.setdefault(spanish, raw_label)
    # Batch sarcasm probabilities (model if available, heuristic fallback)
    try:
        sarcasm_detector = load_sarcasm_detector()
    except Exception:
        sarcasm_detector = None
    sarcasm_probs = detect_sarcasm_batch(texts, detector_pipeline=sarcasm_detector)
    for i, t in enumerate(texts):
        # 1) expand emojis to ':name:' tokens so VADER can see them
        text_for_vader = _text_demojize_to_names(t or "")
        # 2) VADER compound score
        scores = vader.polarity_scores(text_for_vader)
        comp = float(scores.get("compound", 0.0))
        # 3) lexicon score (Afinn + NRCLex)
        lex_score = _lexicon_scores(t or "")
        # 4) weighted VADER + lexicon blend
        comp_vl = comp * WEIGHT_VADER + lex_score * WEIGHT_LEXICON
        comp_vl = max(-1.0, min(1.0, comp_vl))
        # 5) fold in emoji scores (emosent-py or heuristic fallback)
        emoji_scores = _extract_emoji_sentiments(t or "")
        if emoji_scores:
            avg_emoji = sum(emoji_scores) / len(emoji_scores)
            comp_vl = max(-1.0, min(1.0, comp_vl * (1.0 - WEIGHT_EMOJI) + avg_emoji * WEIGHT_EMOJI))
        comp = comp_vl
        # 6) sarcasm adjustment: take the max of heuristic and model signal
        heuristic_s = sarcasm_score(t) / 3.0  # 0..1 scale
        model_s = sarcasm_probs[i] if i < len(sarcasm_probs) else 0.0
        combined_sarc = max(heuristic_s, model_s)
        if combined_sarc >= SARC_MODEL_THRESH:
            # Conservative: dampen a positive compound, nudge a negative lower.
            if comp > 0.5:
                comp = comp * 0.25
            elif comp > 0.2:
                comp = comp * 0.5
            elif comp >= 0.0:
                comp = comp * 0.8
            else:
                comp = max(-1.0, comp - 0.1)
        (p_neg, p_neu, p_pos), label_es = _compound_to_triplet_and_label(comp)
        labels_raw.append(inv_map.get(label_es, "3 stars"))
        probs_agg.append((p_neg, p_neu, p_pos))
        # keep the raw heuristic (0..3) for traceability
        sarc_scores.append(sarcasm_score(t))
    return labels_raw, probs_agg, sarc_scores
# ======================================================================
# PRECISION MODE (BERT): pure PyTorch inference (no numpy)
# ======================================================================
def _predict_batch(texts: List[str], max_length: int = 256) -> Tuple[List[str], List[Tuple[float, float, float]], List[int]]:
    """
    Run the BERT sentiment model over *texts* in batches.

    Returns:
      - labels_raw: the model's original labels ("1 star"...)
      - probs_agg: aggregated (p_neg, p_neu, p_pos) per text
      - sarc_scores: heuristic sarcasm score per text (for traceability)
    """
    tokenizer, model, device = load_sentiment_components()
    batch_size = BATCH_SIZE_GPU if torch.cuda.is_available() else BATCH_SIZE_CPU
    labels_raw: List[str] = []
    probs_agg: List[Tuple[float, float, float]] = []
    sarc_scores: List[int] = []
    with torch.inference_mode():
        for start in range(0, len(texts), batch_size):
            batch = texts[start:start + batch_size]
            encoded = tokenizer(
                batch,
                padding=True,
                truncation=True,
                max_length=max_length,
                return_tensors="pt",
            )
            encoded = {key: tensor.to(device) for key, tensor in encoded.items()}
            logits = model(**encoded).logits   # [B, 5]
            probs = F.softmax(logits, dim=-1)  # [B, 5]
            # Aggregate the 5-star axis: neg = 1+2 stars, neu = 3, pos = 4+5.
            neg = probs[:, 0] + probs[:, 1]
            neu = probs[:, 2]
            pos = probs[:, 3] + probs[:, 4]
            winners = torch.argmax(probs, dim=-1).cpu().tolist()
            labels_raw.extend(model.config.id2label[int(idx)] for idx in winners)
            probs_agg.extend(
                (float(a), float(b), float(c))
                for a, b, c in zip(neg.cpu(), neu.cpu(), pos.cpu())
            )
            # Heuristic sarcasm per text in this batch (traceability only).
            sarc_scores.extend(sarcasm_score(t) for t in batch)
    return labels_raw, probs_agg, sarc_scores
# ======================================================================
# Main API for the app (UNIFIED)
# ======================================================================
def clean_and_analyze(
    df: pd.DataFrame,
    min_chars: int = 0,
    dedup_cols: List[str] | None = None,
    use_clean_text: bool = True,
    mode: str = "rapido",  # defaults follow the main app's names: 'rapido' | 'preciso'
) -> pd.DataFrame:
    """
    Clean, deduplicate and enrich the DataFrame, adding sentiment columns.

    - min_chars=0 and dedup_cols=['uri'] -> "maximum volume" mode.
    - use_clean_text=True -> run the model on cleaned text (more stable).
    - mode: 'rapido' (VADER) | 'preciso' (BERT) — English synonyms accepted.

    Expects a 'texto' column (raises KeyError otherwise). Adds texto_raw,
    texto_clean, sentiment, sent_desc, p_neg/p_neu/p_pos, sarcasm_score,
    n_palabras, has_url, hashtags, mentions.
    """
    if df is None or df.empty:
        return df
    # Normalize the mode string (Spanish/English synonyms) to 'fast'/'precise'
    internal_mode = "fast"
    if isinstance(mode, str):
        m = mode.lower().strip()
        if m in {"precise", "preciso", "precisión", "precision", "bert"}:
            internal_mode = "precise"
        elif m in {"fast", "rapido", "rápido", "vader", "rápido_vader"}:
            internal_mode = "fast"
        else:
            # Safety fallback: anything unexpected is treated as 'fast'
            internal_mode = "fast"
    d = df.copy()
    # --- Deduplication ---
    if dedup_cols:
        d = d.drop_duplicates(subset=dedup_cols)
    # --- Basic text handling + filters ---
    d["texto"] = d["texto"].fillna("")
    if min_chars and min_chars > 0:
        d = d[d["texto"].str.len() >= min_chars]
    if d.empty:
        return d
    # Keep the original text and build a cleaned version for the model
    d["texto_raw"] = d["texto"]
    d["texto_clean"] = d["texto_raw"].map(normalize_text) if use_clean_text else d["texto_raw"]
    # --- Quick enrichment (computed on the raw text) ---
    d["n_palabras"] = d["texto_raw"].str.split().str.len()
    d["has_url"] = d["texto_raw"].str.contains(r"https?://", na=False)
    d["hashtags"] = d["texto_raw"].str.findall(r"#\w+")
    d["mentions"] = d["texto_raw"].str.findall(r"@\w+")
    # --- Inference (by mode) ---
    texts_for_model = d["texto_clean"].astype(str).tolist()
    if internal_mode == "precise":
        labels_raw, probs_agg, sarc_scores = _predict_batch(texts_for_model, max_length=256)
    else:
        # Fast mode by default (emoji recognition, lexicons, sarcasm detector)
        labels_raw, probs_agg, sarc_scores = _predict_fast(texts_for_model)
    d["sentiment"] = labels_raw
    d["p_neg"], d["p_neu"], d["p_pos"] = zip(*probs_agg)
    d["sarcasm_score"] = sarc_scores
    # --- Spanish labels + sarcasm adjustment ---
    d["sent_desc"] = d["sentiment"].map(LABEL_MAP_ES).fillna("neutral")
    d["sent_desc_adj"] = [adjust_with_sarcasm(lbl, sc) for lbl, sc in zip(d["sent_desc"], d["sarcasm_score"])]
    # For compatibility with the rest of the app, expose 'sent_desc' as final:
    d["sent_desc"] = d["sent_desc_adj"]
    d = d.drop(columns=["sent_desc_adj"], errors="ignore")
    # Suggested column order (columns not listed here are kept at the end)
    cols_order = [
        "uri", "autor", "fecha", "texto_raw", "texto_clean",
        "sentiment", "sent_desc", "p_neg", "p_neu", "p_pos", "sarcasm_score",
        "n_palabras", "has_url", "hashtags", "mentions",
    ]
    cols_final = [c for c in cols_order if c in d.columns] + [c for c in d.columns if c not in cols_order]
    d = d[cols_final]
    return d