# Hugging Face Hub page header (scrape residue) kept as a comment so the file
# stays valid Python: bluesky-explorer / app/analyzer.py — uploaded by
# jccolon, commit d5f49ca (verified).
# app/analyzer.py
from __future__ import annotations
import re
from typing import List, Tuple, Optional, Dict
import pandas as pd
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
import streamlit as st
# ---- RÁPIDO: VADER ----
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
# Librería para manejo de emojis (usa emoji.demojize)
import emoji
# Intentamos importar emosent-py (Emoji Sentiment Ranking). Si no está, usamos fallback.
try:
from emosent import get_emoji_sentiment_rank_multiple, get_emoji_sentiment_rank
_EMOSENT_AVAILABLE = True
except Exception:
_EMOSENT_AVAILABLE = False
# Intentamos importar Afinn y NRCLex (mejoras léxicas). Son opcionales: fallback si no están.
try:
from afinn import Afinn
_AFINN_AVAILABLE = True
except Exception:
Afinn = None # type: ignore
_AFINN_AVAILABLE = False
try:
from nrclex import NRCLex
_NRC_AVAILABLE = True
except Exception:
NRCLex = None # type: ignore
_NRC_AVAILABLE = False
# ======================================================================
# Config
# ======================================================================
MODEL_ID = "nlptown/bert-base-multilingual-uncased-sentiment"  # precise (BERT) mode
BATCH_SIZE_CPU = 32
BATCH_SIZE_GPU = 64
# Blend weights: tunable.
WEIGHT_EMOJI = 0.18  # emoji contribution
WEIGHT_LEXICON = 0.18  # lexicon contribution (Afinn + NRCLex)
# WEIGHT_VADER ideally = 1 - WEIGHT_EMOJI - WEIGHT_LEXICON
WEIGHT_VADER = max(0.0, 1.0 - WEIGHT_EMOJI - WEIGHT_LEXICON)
# Sarcasm detector config (you can swap the model for a more suitable one).
SARCASM_MODEL_ID = "mrm8488/distilbert-finetuned-sarcasm-classification"
# Threshold above which sarcasm is considered "likely" when adjusting compound.
SARC_MODEL_THRESH = 0.5
# Spanish labels for the model's star outputs. The defensive singular/plural
# variants also make this map usable as a round-trip via its inverted form.
LABEL_MAP_ES = {
    "1 star": "muy negativo",
    "2 stars": "negativo",
    "3 stars": "neutral",
    "4 stars": "positivo",
    "5 stars": "muy positivo",
    # defensive variants:
    "1 stars": "muy negativo",
    "2 star": "negativo",
    "3 star": "neutral",
    "4 star": "positivo",
    "5 star": "muy positivo",
}
# Label groupings used by the sarcasm adjustment.
NEG_SET = {"muy negativo", "negativo"}
POS_SET = {"positivo", "muy positivo"}
# ======================================================================
# Cargas cacheadas
# ======================================================================
@st.cache_resource(show_spinner=False)
def load_sentiment_components():
    """Load and cache the precise-mode (BERT) tokenizer, model and device."""
    tok = AutoTokenizer.from_pretrained(MODEL_ID)
    mdl = AutoModelForSequenceClassification.from_pretrained(MODEL_ID)
    mdl.eval()
    # Prefer GPU when available; otherwise run on CPU.
    dev = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    mdl.to(dev)
    return tok, mdl, dev
@st.cache_resource(show_spinner=False)
def load_sarcasm_detector(model_id: str = SARCASM_MODEL_ID):
    """
    Load and cache a HF text-classification pipeline for sarcasm detection.

    Tries a direct ``pipeline(...)`` first; on failure retries by loading the
    checkpoint with ``from_tf=True`` (TF-only weights). Returns the pipeline,
    or ``None`` when both attempts fail — callers fall back to the heuristic.
    """
    device = 0 if torch.cuda.is_available() else -1
    # 1) direct attempt via pipeline
    try:
        detector = pipeline("text-classification", model=model_id, device=device)
        return detector
    except Exception as e_pipeline:
        try:
            # 2) manual attempt: load tokenizer and model with from_tf=True
            tokenizer = AutoTokenizer.from_pretrained(model_id)
            model = AutoModelForSequenceClassification.from_pretrained(model_id, from_tf=True)
            model.eval()
            if torch.cuda.is_available():
                try:
                    model.to(torch.device("cuda"))
                except Exception:
                    pass  # stay on CPU if the GPU move fails
            # build a pipeline from the model+tokenizer objects
            detector = pipeline("text-classification", model=model, tokenizer=tokenizer, device=device)
            return detector
        except Exception as e_from_tf:
            # Show a compact warning in the UI (without a huge stack trace).
            try:
                st.warning(
                    f"No se pudo cargar el detector de sarcasmo ({model_id}): {e_pipeline}\n"
                    f"Intento from_tf también falló: {e_from_tf}",
                    icon="⚠️",
                )
            except Exception:
                pass  # st.warning can fail outside a Streamlit context
            return None
# ======================================================================
# Afinn / NRCLex helpers (cacheados, defensivos)
# ======================================================================
@st.cache_resource(show_spinner=False)
def _get_afinn():
    """Return a cached Afinn instance, or None when the package is unusable."""
    if not _AFINN_AVAILABLE:
        return None
    # Try the English lexicon first (switch to language="es" if your Afinn
    # build supports Spanish), then the library default, then give up.
    for kwargs in ({"language": "en"}, {}):
        try:
            return Afinn(**kwargs)
        except Exception:
            continue
    return None
@st.cache_resource(show_spinner=False)
def _get_nrc_available():
    """Cached sentinel: True when NRCLex imported (instances are per-text)."""
    return bool(_NRC_AVAILABLE)
def _lexicon_scores(text: str) -> float:
    """
    Blend Afinn and NRCLex into a single score in [-1, 1].

    Fast path: Afinn is a normalized sum of per-word values; NRCLex
    contributes a (pos - neg) / (pos + neg) balance.
    """
    if not text:
        return 0.0

    def _clamp(x: float) -> float:
        return max(-1.0, min(1.0, x))

    # Afinn component.
    afinn_part = 0.0
    analyzer = _get_afinn()
    if analyzer is not None:
        try:
            # 10.0 is a heuristic normalizer; tune for the typical text length.
            afinn_part = _clamp(analyzer.score(text) / 10.0)
        except Exception:
            afinn_part = 0.0

    # NRCLex component.
    nrc_part = 0.0
    if _get_nrc_available():
        try:
            counts = NRCLex(text).raw_emotion_scores or {}
            pos = float(counts.get("positive", 0))
            neg = float(counts.get("negative", 0))
            total = pos + neg
            if total > 0:
                nrc_part = (pos - neg) / total
        except Exception:
            nrc_part = 0.0

    # Weighted blend (Afinn dominates), clamped to [-1, 1].
    return _clamp(afinn_part * 0.7 + nrc_part * 0.3)
# ======================================================================
# Emoji handling: uso de emosent-py si está disponible + fallback heurístico
# ======================================================================
# Mapa corto y priorizado para emojis muy frecuentes. Es complemento, no reemplazo.
_EMOJI_SENTIMENT_MAP: Dict[str, float] = {
":thumbs_up:": 1.6,
":thumbs_down:": -1.6,
":heart:": 2.0,
":red_heart:": 2.0,
":fire:": 1.5,
":smile:": 1.4,
":grin:": 1.6,
":joy:": 1.8,
":sob:": -2.0,
":cry:": -1.8,
":rage:": -2.5,
":angry:": -2.2,
":pensive:": -1.0,
":unamused:": -1.0,
":neutral_face:": -0.2,
":face_with_rolling_eyes:": -0.8,
}
_POS_KEYWORDS = {"smile", "grin", "joy", "laugh", "heart", "love", "thumbsup", "ok_hand", "fire", "star", "clap"}
_NEG_KEYWORDS = {"sad", "cry", "sob", "angry", "rage", "thumbsdown", "frown", "broken_heart", "vomit", "nausea"}
def _emoji_name_sentiment(name: str) -> float:
"""Heurística local para nombres demojizados."""
if not name:
return 0.0
if name in _EMOJI_SENTIMENT_MAP:
return float(_EMOJI_SENTIMENT_MAP[name])
key = name.strip(":").lower()
for kw in _POS_KEYWORDS:
if kw in key:
return 1.0
for kw in _NEG_KEYWORDS:
if kw in key:
return -1.0
return 0.0
# Matches demojized alias tokens such as ':smile:' or ':thumbs_up:'.
_EMOJI_NAME_RE = re.compile(r":[a-z0-9_+\-]+:")

def _extract_emoji_sentiments(text: str) -> list[float]:
    """
    Extract one sentiment score per emoji in *text*, each in [-1, 1].

    Uses emosent-py (Emoji Sentiment Ranking) when available, rescaling its
    [0, 1] scores to [-1, 1]. When emosent is absent or yields nothing,
    falls back to emoji.demojize + the local name heuristic.
    """
    if not text:
        return []
    # 1) emosent path: optimized lookup, when the package is installed.
    if _EMOSENT_AVAILABLE:
        try:
            scores = []
            for item in get_emoji_sentiment_rank_multiple(text):
                rank = item.get("emoji_sentiment_rank") or {}
                s = rank.get("sentiment_score")
                if s is not None:
                    scores.append(float(s) * 2.0 - 1.0)  # [0,1] -> [-1,1]
            if scores:
                return scores
        except Exception:
            pass  # fall through to the heuristic fallback
    # 2) Fallback: demojize and scan for ':name:' tokens. A regex scan is
    #    used instead of str.split because demojize does not insert spaces,
    #    so emojis glued to words ('hola😄' -> 'hola:smile:') would otherwise
    #    be missed by a whitespace split.
    try:
        dem = emoji.demojize(text, language="en")
        scores = []
        for token in _EMOJI_NAME_RE.findall(dem):
            val = _emoji_name_sentiment(token)
            if val != 0.0:
                # Our map uses a wide ~[-3,3] scale; halve and clamp to [-1,1]
                # so it combines cleanly with the compound score.
                scores.append(max(-1.0, min(1.0, val / 2.0)))
        return scores
    except Exception:
        return []
def _text_demojize_to_names(text: str) -> str:
    """Replace emojis by their ':name:' aliases (e.g. 'hola 😄' -> 'hola :smile:')."""
    return emoji.demojize(text, language="en") if text else ""
# ======================================================================
# Crea el VADER analyzer y "mejora" su lexicon con algunos emojis del mapa corto
# ======================================================================
@st.cache_resource(show_spinner=False)
def _get_vader():
    """VADER analyzer for fast mode, with a partially extended lexicon."""
    analyzer = SentimentIntensityAnalyzer()
    # Inject the short emoji map — both ':name:' and bare 'name' forms — so
    # VADER can match demojized tokens directly.
    try:
        extra: Dict[str, float] = {}
        for alias, value in _EMOJI_SENTIMENT_MAP.items():
            extra[alias] = value
            extra[alias.strip(":")] = value
        if extra:
            analyzer.lexicon.update(extra)
    except Exception:
        pass  # best effort: a stock lexicon still works
    return analyzer
# ======================================================================
# Limpieza básica de texto
# ======================================================================
_url = re.compile(
    r"(https?://[^\s]+|www\.[^\s]+|[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}/[^\s]*|\b[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b)"
)
_mention = re.compile(r"@\w+")
_ws = re.compile(r"\s+")
# Single-character translation table for typographic quotes.
_SMART_QUOTES = str.maketrans({"“": '"', "”": '"', "‘": "'", "’": "'"})
def normalize_text(t: str) -> str:
    """
    Light cleanup before analysis: strips URLs/bare domains, masks mentions
    as '@user', straightens smart quotes, collapses whitespace and tones
    down exaggerated punctuation and letter repetition.
    """
    if not t:
        return ""
    t = _url.sub(" ", t)                    # remove URLs and bare domains
    t = _mention.sub("@user", t)            # anonymize @mentions
    t = t.translate(_SMART_QUOTES)          # normalize typographic quotes
    t = re.sub(r"[\r\n\t]+", " ", t)        # newlines/tabs -> spaces
    t = _ws.sub(" ", t)                     # collapse whitespace runs
    t = re.sub(r"([!?]){3,}", r"\1\1", t)   # '!!!!' -> '!!'
    t = re.sub(r"(.)\1{3,}", r"\1\1", t)    # 'holaaaa' -> 'holaa'
    return t.strip()
# ======================================================================
# Heurística de sarcasmo/ironía (rápida y transparente)
# ======================================================================
SARC_HASHTAGS = {
    "#sarcasmo", "#sarcasm", "#ironia", "#irony", "#sarc", "#irónica", "#irónico"
}
SARC_MARKERS = {
    "/s",  # Reddit/forum convention
    "sí claro", "claro que sí", "yeah right", "ajá",
    "gracias por nada", "qué podría salir mal", "buenísimo...", "genial...", "perfecto...",
}
SARC_EMOJIS = {"🙃", "😒", "🙄"}
def sarcasm_score(t: str) -> int:
    """Return 0..3 depending on how many sarcasm signals the text shows."""
    if not t:
        return 0
    lowered = t.lower()
    hits = 0
    # Explicit sarcasm hashtags are the strongest signal (worth 2).
    if any(tag in lowered for tag in SARC_HASHTAGS):
        hits += 2
    # Textual markers and sarcastic emojis add one point each.
    for signal in (
        any(marker in lowered for marker in SARC_MARKERS),
        any(em in t for em in SARC_EMOJIS),
    ):
        if signal:
            hits += 1
    # Quoted praise ("genial", "perfecto", ...) often reads as irony.
    has_quotes = '"' in t or "“" in t or "”" in t
    if has_quotes and any(word in lowered for word in ("genial", "perfecto", "maravilloso")):
        hits += 1
    return min(hits, 3)
def adjust_with_sarcasm(label_es: str, score: int) -> str:
    """Downgrade a Spanish sentiment label when sarcasm signals are present."""
    if score <= 0:
        return label_es
    if label_es in POS_SET:
        # Positive under sarcasm reads as neutral, or negative when strong.
        return "negativo" if score >= 2 else "neutral"
    if label_es == "neutral":
        # Two or more signals flip neutral to negative; one keeps it neutral.
        return "negativo" if score >= 2 else "neutral"
    if label_es in NEG_SET and score >= 3:
        return "muy negativo"
    return label_es
# ======================================================================
# Sarcasm detector batch (usa pipeline HF si está disponible)
# ======================================================================
def detect_sarcasm_batch(texts: List[str], detector_pipeline=None) -> List[float]:
    """
    Return a sarcasm probability in [0, 1] for each text.

    Uses the cached HF pipeline when detector_pipeline is not supplied.
    When no pipeline is available (or a chunk fails), falls back to the
    heuristic sarcasm_score normalized to [0, 1].
    """
    if detector_pipeline is None:
        detector_pipeline = load_sarcasm_detector()
    # Fallback to the heuristic when there is no pipeline at all.
    if detector_pipeline is None:
        return [min(1.0, sarcasm_score(t) / 3.0) for t in texts]
    probs: List[float] = []
    B = 64
    for i in range(0, len(texts), B):
        chunk = texts[i:i+B]
        try:
            preds = detector_pipeline(chunk)  # list of dicts for a batch
            for p in preds:
                score = float(p.get("score", 0.0))
                label = str(p.get("label", "")).lower()
                # BUGFIX: the pipeline returns the WINNING label's confidence,
                # not P(sarcasm). A confident "not sarcastic" prediction was
                # previously reported as high sarcasm probability. Invert the
                # score for non-sarcasm labels ('LABEL_0', 'not sarcastic', ...).
                # NOTE(review): label names vary per checkpoint — verify
                # against the id2label of SARCASM_MODEL_ID.
                if "not" in label or label.endswith("0"):
                    score = 1.0 - score
                probs.append(score)
        except Exception:
            # Partial fallback: heuristic for this chunk only.
            probs.extend(min(1.0, sarcasm_score(t) / 3.0) for t in chunk)
    return probs
# ======================================================================
# FAST MODE (VADER): mapping compound -> (p_neg,p_neu,p_pos) + etiqueta
# ======================================================================
def _compound_to_triplet_and_label(compound: float) -> tuple[tuple[float, float, float], str]:
    """
    Map a VADER compound score in [-1, 1] to a normalized (p_neg, p_neu, p_pos)
    triplet and the matching Spanish label from LABEL_MAP_ES.
    """
    pos = max(0.0, compound)
    neg = max(0.0, -compound)
    neu = 1.0 - (pos + neg)
    norm = max(neg + neu + pos, 1e-9)
    triplet = (neg / norm, neu / norm, pos / norm)
    # Star buckets: (-inf,-.6] 1★ · (-.6,-.2] 2★ · (-.2,.2) 3★ · [.2,.6) 4★ · [.6,inf) 5★
    if compound <= -0.6:
        stars = "1 star"
    elif compound <= -0.2:
        stars = "2 stars"
    elif compound < 0.2:
        stars = "3 stars"
    elif compound < 0.6:
        stars = "4 stars"
    else:
        stars = "5 stars"
    return triplet, LABEL_MAP_ES.get(stars, "neutral")
def _predict_fast(texts: List[str]) -> tuple[list[str], list[tuple[float, float, float]], list[int]]:
    """
    Fast analysis with VADER: returns (labels_raw in '1 star'..'5 stars' style,
    (p_neg, p_neu, p_pos) triplets, heuristic sarcasm scores 0..3).

    Very fast on CPU. Blends in emoji recognition (emosent-py when available),
    lexicon adjustment (Afinn/NRCLex) and a conservative sarcasm penalty based
    on the HF detector when it loads.
    """
    vader = _get_vader()
    labels_raw: list[str] = []
    probs_agg: list[tuple[float, float, float]] = []
    sarc_scores: list[int] = []
    # NOTE: LABEL_MAP_ES has duplicate values (defensive keys), so this
    # inversion keeps the LAST key per value — e.g. 'neutral' -> '3 star'.
    # Harmless, because the map converts those variants back downstream.
    inv_map = {v: k for k, v in LABEL_MAP_ES.items()}
    # Model-based sarcasm probabilities (batched) — heuristic fallback when
    # no detector can be loaded.
    sarcasm_detector = None
    try:
        sarcasm_detector = load_sarcasm_detector()
    except Exception:
        sarcasm_detector = None
    sarcasm_probs = detect_sarcasm_batch(texts, detector_pipeline=sarcasm_detector)
    for i, t in enumerate(texts):
        # 1) expand emojis into ':name:' tokens so VADER can see them
        text_for_vader = _text_demojize_to_names(t or "")
        # 2) VADER scores
        scores = vader.polarity_scores(text_for_vader)
        comp = float(scores.get("compound", 0.0))
        # 3) lexicon score (Afinn + NRCLex)
        lex_score = _lexicon_scores(t or "")
        # 4) weighted blend of VADER + lexicons
        comp_vl = comp * WEIGHT_VADER + lex_score * WEIGHT_LEXICON
        comp_vl = max(-1.0, min(1.0, comp_vl))
        # 5) emoji scores (emosent-py or heuristic fallback), mixed in
        emoji_scores = _extract_emoji_sentiments(t or "")
        if emoji_scores:
            avg_emoji = sum(emoji_scores) / len(emoji_scores)
            comp_vl = max(-1.0, min(1.0, comp_vl * (1.0 - WEIGHT_EMOJI) + avg_emoji * WEIGHT_EMOJI))
        comp = comp_vl
        # 6) sarcasm adjustment: combine heuristic and model (conservative max)
        heuristic_s = sarcasm_score(t) / 3.0  # scaled to 0..1
        model_s = sarcasm_probs[i] if i < len(sarcasm_probs) else 0.0
        combined_sarc = max(heuristic_s, model_s)
        if combined_sarc >= SARC_MODEL_THRESH:
            # conservative heuristic: penalize compound when sarcasm is likely
            if comp > 0.5:
                comp = comp * 0.25
            elif comp > 0.2:
                comp = comp * 0.5
            elif comp >= 0.0:
                comp = comp * 0.8
            else:
                # already negative -> emphasize slightly
                comp = max(-1.0, comp - 0.1)
        (p_neg, p_neu, p_pos), label_es = _compound_to_triplet_and_label(comp)
        raw = inv_map.get(label_es, "3 stars")
        labels_raw.append(raw)
        probs_agg.append((p_neg, p_neu, p_pos))
        # keep the original heuristic score (0..3) for traceability
        sarc_scores.append(sarcasm_score(t))
    return labels_raw, probs_agg, sarc_scores
# ======================================================================
# PRECISION MODE (BERT): inferencia PyTorch pura (sin numpy)
# ======================================================================
def _predict_batch(texts: List[str], max_length: int = 256) -> Tuple[List[str], List[Tuple[float, float, float]], List[int]]:
    """
    Precise-mode inference in pure PyTorch (no numpy).

    Returns:
        - labels_raw: raw model labels ("1 star" ... "5 stars")
        - probs_agg: aggregated (p_neg, p_neu, p_pos) per text
        - sarc_scores: heuristic sarcasm score per text (traceability)
    """
    tokenizer, model, device = load_sentiment_components()
    batch_size = BATCH_SIZE_GPU if torch.cuda.is_available() else BATCH_SIZE_CPU
    labels_raw: List[str] = []
    probs_agg: List[Tuple[float, float, float]] = []
    sarc_scores: List[int] = []
    with torch.inference_mode():
        for start in range(0, len(texts), batch_size):
            batch = texts[start:start + batch_size]
            encoded = tokenizer(
                batch,
                padding=True,
                truncation=True,
                max_length=max_length,
                return_tensors="pt",
            )
            encoded = {key: tensor.to(device) for key, tensor in encoded.items()}
            probs = F.softmax(model(**encoded).logits, dim=-1)  # [B, 5]
            # Aggregate star classes: neg = 1★+2★, neu = 3★, pos = 4★+5★.
            neg = (probs[:, 0] + probs[:, 1]).cpu()
            neu = probs[:, 2].cpu()
            pos = (probs[:, 3] + probs[:, 4]).cpu()
            winners = torch.argmax(probs, dim=-1).cpu().tolist()
            for row, idx in enumerate(winners):
                labels_raw.append(model.config.id2label[int(idx)])
                probs_agg.append((float(neg[row]), float(neu[row]), float(pos[row])))
            # Heuristic sarcasm per text of the batch (traceability only).
            sarc_scores.extend(sarcasm_score(t) for t in batch)
    return labels_raw, probs_agg, sarc_scores
# ======================================================================
# API principal para la app (UNIFICADA)
# ======================================================================
@st.cache_data(show_spinner=False)
def clean_and_analyze(
    df: pd.DataFrame,
    min_chars: int = 0,
    dedup_cols: List[str] | None = None,
    use_clean_text: bool = True,
    mode: str = "rapido",  # defaults follow main's naming: 'rapido' | 'preciso'
) -> pd.DataFrame:
    """
    Clean, deduplicate and enrich the DataFrame, adding sentiment columns.

    - min_chars=0 and dedup_cols=['uri'] → "maximum volume" mode.
    - use_clean_text=True → analyze the cleaned text (more stable for the model).
    - mode: 'rapido' (VADER) | 'preciso' (BERT) — English synonyms accepted too.

    NOTE(review): assumes *df* carries a 'texto' column — verify against callers.
    """
    if df is None or df.empty:
        return df
    # Normalize the mode string (Spanish/English/synonyms) to the internal flag.
    internal_mode = "fast"
    if isinstance(mode, str):
        m = mode.lower().strip()
        if m in {"precise", "preciso", "precisión", "precision", "bert"}:
            internal_mode = "precise"
        elif m in {"fast", "rapido", "rápido", "vader", "rápido_vader"}:
            internal_mode = "fast"
        else:
            # safety fallback: anything unexpected means 'fast'
            internal_mode = "fast"
    d = df.copy()
    # --- Deduplication ---
    if dedup_cols:
        d = d.drop_duplicates(subset=dedup_cols)
    # --- Basic text handling + filters ---
    d["texto"] = d["texto"].fillna("")
    if min_chars and min_chars > 0:
        d = d[d["texto"].str.len() >= min_chars]
    if d.empty:
        return d
    # Keep the original text and build a cleaned version for the model.
    d["texto_raw"] = d["texto"]
    d["texto_clean"] = d["texto_raw"].map(normalize_text) if use_clean_text else d["texto_raw"]
    # --- Quick enrichment ---
    d["n_palabras"] = d["texto_raw"].str.split().str.len()
    d["has_url"] = d["texto_raw"].str.contains(r"https?://", na=False)
    d["hashtags"] = d["texto_raw"].str.findall(r"#\w+")
    d["mentions"] = d["texto_raw"].str.findall(r"@\w+")
    # --- Inference (per mode) ---
    texts_for_model = d["texto_clean"].astype(str).tolist()
    if internal_mode == "precise":
        labels_raw, probs_agg, sarc_scores = _predict_batch(texts_for_model, max_length=256)
    else:
        # Fast path (emoji recognition, lexicons and sarcasm detector included).
        labels_raw, probs_agg, sarc_scores = _predict_fast(texts_for_model)
    d["sentiment"] = labels_raw
    d["p_neg"], d["p_neu"], d["p_pos"] = zip(*probs_agg)
    d["sarcasm_score"] = sarc_scores
    # --- Spanish labels + sarcasm adjustment ---
    d["sent_desc"] = d["sentiment"].map(LABEL_MAP_ES).fillna("neutral")
    d["sent_desc_adj"] = [adjust_with_sarcasm(lbl, sc) for lbl, sc in zip(d["sent_desc"], d["sarcasm_score"])]
    # For compatibility with the rest of the app, expose 'sent_desc' as final:
    d["sent_desc"] = d["sent_desc_adj"]
    d = d.drop(columns=["sent_desc_adj"], errors="ignore")
    # Suggested column order (original unlisted columns are kept as well).
    cols_order = [
        "uri", "autor", "fecha", "texto_raw", "texto_clean",
        "sentiment", "sent_desc", "p_neg", "p_neu", "p_pos", "sarcasm_score",
        "n_palabras", "has_url", "hashtags", "mentions",
    ]
    cols_final = [c for c in cols_order if c in d.columns] + [c for c in d.columns if c not in cols_order]
    d = d[cols_final]
    return d