# ProyMLFin / app.py
import streamlit as st
import fitz # PyMuPDF
import torch
import io
import re
import time
from datetime import datetime
from docx import Document
from transformers import (
    AutoTokenizer,
    AutoModelForSeq2SeqLM,
    MarianMTModel,
    MarianTokenizer,
    pipeline,
)
import gc
import os
# ══════════════════════════════════════════════════════════════════════════════
# INITIAL CONFIGURATION
# ══════════════════════════════════════════════════════════════════════════════
st.set_page_config(page_title="Paper Analyzer", page_icon="🔬", layout="wide")
st.title("🔬 Paper Analyzer")
st.markdown("Upload a scientific paper as a PDF and get a full AI-powered analysis.")
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:128"
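# Note: "max_split_size_mb" caps the size of splittable blocks in PyTorch's CUDA
# caching allocator to reduce memory fragmentation. PYTORCH_CUDA_ALLOC_CONF is read
# lazily, so setting it here (before any model is loaded) should still take effect.
# Hugging Face pipelines expect device=0 for the first GPU and device=-1 for CPU.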
device = 0 if torch.cuda.is_available() else -1
device_name = torch.cuda.get_device_name(0) if device == 0 else "CPU"
if device == 0:
    st.sidebar.success(f"🟢 GPU: {device_name}")
else:
    st.sidebar.warning("🟡 Running on CPU")
# ─── Available languages ─────────────────────────────────────────────────────
LANGUAGES = {
    "🇺🇸 English": None,
    "🇲🇽 Español": "Helsinki-NLP/opus-mt-en-es",
    "🇰🇷 한국어": "Helsinki-NLP/opus-mt-en-ko",
    "🇫🇷 Français": "Helsinki-NLP/opus-mt-en-fr",
    "🇩🇪 Deutsch": "Helsinki-NLP/opus-mt-en-de",
    "🇨🇳 中文": "Helsinki-NLP/opus-mt-en-zh",
    "🇯🇵 日本語": "Helsinki-NLP/opus-mt-en-jap",
}
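# Values are Helsinki-NLP MarianMT checkpoint IDs consumed by load_translator();
# None means the summary is left in English (the translation step is skipped).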
# ─── Topic areas ─────────────────────────────────────────────────────────────
TOPIC_LABELS = [
"Physics", "Computer Science", "Medicine", "Biology",
"Chemistry", "Engineering", "Mathematics", "Economics",
"Psychology", "Aerospace", "Materials Science", "Neuroscience"
]
# ══════════════════════════════════════════════════════════════════════════════
# MODEL CACHE (loaded once per session)
# ══════════════════════════════════════════════════════════════════════════════
@st.cache_resource(show_spinner=False)
def load_summarizer():
    """Load the LED model specialized in long papers."""
    model_name = "allenai/led-large-16384-arxiv"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
    if device == 0:
        model = model.to("cuda")
    return pipeline("summarization", model=model, tokenizer=tokenizer,
                    device=device, max_length=500, min_length=60,
                    truncation=True, no_repeat_ngram_size=3)
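# Usage sketch: the summarization pipeline returns one dict per input, so a
# call looks like load_summarizer()(some_text)[0]["summary_text"].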
@st.cache_resource(show_spinner=False)
def load_ner():
    """Load the NER model for authors/organizations."""
    return pipeline("ner", model="dslim/bert-base-NER",
                    aggregation_strategy="simple", device=device)
@st.cache_resource(show_spinner=False)
def load_classifier():
    """Lightweight zero-shot model for topic classification."""
    return pipeline("zero-shot-classification",
                    model="MoritzLaurer/DeBERTa-v3-base-mnli-fever-anli",
                    device=device)
@st.cache_resource(show_spinner=False)
def load_qa_model():
    """
    Extractive QA model based on RoBERTa fine-tuned on SQuAD2.
    - Free and open-source on Hugging Face.
    - Does not require a GPU (works well on CPU).
    - Specialized in answering questions backed by direct textual evidence.
    Model: deepset/roberta-base-squad2
    """
    return pipeline(
        "question-answering",
        model="deepset/roberta-base-squad2",
        tokenizer="deepset/roberta-base-squad2",
        device=device,
    )
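# Usage sketch: the QA pipeline returns a dict with "answer", "score", "start"
# and "end", e.g. load_qa_model()(question=q, context=ctx)["answer"].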
@st.cache_resource(show_spinner=False)
def load_translator(model_name):
    """Load a MarianMT translation model. Cached per language."""
    tok = MarianTokenizer.from_pretrained(model_name)
    model = MarianMTModel.from_pretrained(model_name)
    if device == 0:
        model = model.to("cuda")
    return tok, model
# ══════════════════════════════════════════════════════════════════════════════
# HELPER FUNCTIONS – TEXT
# ══════════════════════════════════════════════════════════════════════════════
def translate_single(text, model_name):
    """Translate a short text with the MarianMT model."""
    if not text or model_name is None:
        return text
    try:
        tok, model = load_translator(model_name)
        inputs = tok(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
        if device == 0:
            inputs = {k: v.to("cuda") for k, v in inputs.items()}
        out = model.generate(**inputs)
        return tok.decode(out[0], skip_special_tokens=True)
    except Exception:
        return text
def extract_text_from_pdf(pdf_bytes):
    """Extract all the text from a PDF given its raw bytes."""
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    text = "".join(page.get_text() for page in doc)
    metadata = doc.metadata
    doc.close()
    return text.strip(), metadata
def robust_sentence_split(text):
    """
    Split a text into sentences while respecting common abbreviations.
    The text is first split on sentence punctuation followed by whitespace,
    and segments ending in a known abbreviation are then recombined.
    """
    # Abbreviations that usually carry a period but do not end a sentence.
    # Stored lowercase, without the trailing period, to match how the
    # candidate word is normalized below.
    ABBREVIATIONS = {
        "mr", "mrs", "ms", "dr", "prof", "sr", "jr",
        "e.g", "i.e", "vs", "etc", "al", "vol", "fig",
        "eq", "sec", "ref", "no", "st", "dept", "approx",
        "jan", "feb", "mar", "apr", "jun", "jul",
        "aug", "sep", "oct", "nov", "dec",
    }
    # Initial split on . ! ? followed by whitespace (or end of string)
    raw_parts = re.split(r'(?<=[.!?])\s+', text)
    sentences = []
    i = 0
    while i < len(raw_parts):
        part = raw_parts[i].strip()
        # Skip empty parts
        if not part:
            i += 1
            continue
        # If the part ends in a period and its last word is an abbreviation,
        # merge it with the next part (if there is one)
        if part.endswith('.') and i + 1 < len(raw_parts):
            # Extract the last "word", period included
            last_word_with_dot = part.split()[-1].lower() if part.split() else ''
            # Compare without the trailing period, as stored in the set
            if last_word_with_dot.rstrip('.') in ABBREVIATIONS:
                # Join with the next fragment
                part = part + ' ' + raw_parts[i + 1].strip()
                i += 2
                sentences.append(part.strip())
                continue
        sentences.append(part)
        i += 1
    # If no sentence was produced, return the original text as a single one
    return sentences if sentences else [text.strip()]
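# Illustrative example of the logic above:
#   robust_sentence_split("See Fig. 3 for details. Results follow.")
#   -> ["See Fig. 3 for details.", "Results follow."]
# The regex split first yields ["See Fig.", "3 for details.", "Results follow."];
# "Fig." matches an abbreviation, so the first two fragments are rejoined.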
def translate_text(text, model_name):
    """Translate a text with MarianMT, one sentence at a time."""
    tok, model = load_translator(model_name)
    sentences = robust_sentence_split(text)
    translated = []
    for sent in sentences:
        if not sent:
            continue
        inputs = tok(sent, return_tensors="pt", padding=True,
                     truncation=True, max_length=512)
        if device == 0:
            inputs = {k: v.to("cuda") for k, v in inputs.items()}
        out = model.generate(**inputs)
        translated.append(tok.decode(out[0], skip_special_tokens=True))
    return " ".join(translated)
# ══════════════════════════════════════════════════════════════════════════════
# HELPER FUNCTIONS – Q&A
# ══════════════════════════════════════════════════════════════════════════════
def split_into_chunks(text: str, chunk_size: int = 400, overlap: int = 50) -> list:
    """
    Split the text into windows of `chunk_size` words with overlap.
    The overlap prevents answers that fall on the boundary between two
    chunks from being truncated.
    """
    words = text.split()
    chunks = []
    start = 0
    while start < len(words):
        end = min(start + chunk_size, len(words))
        chunks.append(" ".join(words[start:end]))
        if end == len(words):
            break
        start += chunk_size - overlap
    return chunks
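# Worked example: a 1,000-word text with chunk_size=400 and overlap=50 yields
# the word windows [0, 400), [350, 750) and [700, 1000); each new chunk starts
# chunk_size - overlap = 350 words after the previous one.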
def answer_question(question: str, full_text: str, qa_pipe) -> dict:
    """
    QA pipeline over a long document:
    1. Split the text into overlapping chunks.
    2. Run the model on every chunk and keep the highest-scoring result.
    3. Return the answer, its confidence score and the context fragment.
    Why this strategy:
    - Extractive models have a token limit (~512).
    - Scanning every chunk guarantees no information is missed.
    - The model's internal score (a softmax probability) is a good proxy
      for which chunk contains the real answer.
    """
    chunks = split_into_chunks(full_text, chunk_size=400, overlap=50)
    best_score = -1.0
    best_answer = "I could not find enough information to answer this question."
    best_chunk = chunks[0] if chunks else ""  # guard against empty input
    for chunk in chunks:
        try:
            result = qa_pipe(question=question, context=chunk)
            if result["score"] > best_score:
                best_score = result["score"]
                best_answer = result["answer"]
                best_chunk = chunk
        except Exception:
            continue
    return {
        "answer": best_answer,
        "score": best_score,
        "context": best_chunk,
    }
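# Standalone usage sketch (outside Streamlit), with a toy context; the exact
# span returned depends on the model:
#   pipe = load_qa_model()
#   answer_question("What is measured?", "We measure end-to-end latency.", pipe)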
# ══════════════════════════════════════════════════════════════════════════════
# MAIN ANALYSIS PIPELINE
# ══════════════════════════════════════════════════════════════════════════════
def analyze_paper(text, metadata, selected_lang, translation_model):
    """Run all the analysis tools in order."""
    torch.cuda.empty_cache()
    gc.collect()
    results = {}
    # ── 1. Summary ────────────────────────────────────────────────────────────
    st.info("🔄 Generating the summary with the scientific model...")
    summarizer = load_summarizer()
    max_input_tokens = 8192
    tokenizer = summarizer.tokenizer
    tokens = tokenizer.encode(text, add_special_tokens=False, truncation=False)
    if len(tokens) > max_input_tokens:
        truncated_tokens = tokens[:max_input_tokens]
        text_for_summary = tokenizer.decode(truncated_tokens, skip_special_tokens=True)
        st.warning(f"⚠️ Text truncated to {max_input_tokens} tokens for the summary.")
    else:
        text_for_summary = text
    try:
        summary_list = summarizer(
            text_for_summary,
            max_length=500, min_length=60,
            do_sample=False, num_beams=4,
            early_stopping=True, truncation=True,
        )
        summary_en = summary_list[0]['summary_text']
    except Exception as e:
        st.error(f"Summarization error: {e}")
        summary_en = "The summary could not be generated."
    results['summary_en'] = summary_en
    # ── 2. Metadata and entities ──────────────────────────────────────────────
    st.info("🔄 Extracting metadata and entities...")
    pdf_authors = metadata.get("author", "")
    pdf_title = metadata.get("title", "")
    years = re.findall(r'\b((?:19|20)\d{2})\b', text[:2000])
    year = years[0] if years else "Not detected"
    try:
        ner = load_ner()
        entities = ner(text[:3000])
        authors_ner, orgs_ner = [], []
        for ent in entities:
            if ent['entity_group'] == 'PER':
                authors_ner.append(ent['word'])
            elif ent['entity_group'] == 'ORG':
                orgs_ner.append(ent['word'])
        # Deduplicate while preserving order
        authors_ner = list(dict.fromkeys(authors_ner))[:6]
        orgs_ner = list(dict.fromkeys(orgs_ner))[:4]
    except Exception as e:
        st.warning(f"NER unavailable: {e}")
        authors_ner, orgs_ner = [], []
    if pdf_authors:
        authors_list = [a.strip() for a in re.split(r'[,;]+', pdf_authors) if a.strip()]
    else:
        authors_list = []
    for a in authors_ner:
        if a not in authors_list:
            authors_list.append(a)
    results['authors'] = authors_list[:6] if authors_list else ["Not detected"]
    results['orgs'] = orgs_ner if orgs_ner else ["Not detected"]
    results['year'] = year
    results['title'] = pdf_title if pdf_title else "Title not available"
    # ── 3. Topic classification ───────────────────────────────────────────────
    st.info("🔄 Classifying the paper into scientific areas...")
    classifier = load_classifier()
    try:
        classification = classifier(
            summary_en[:1500],
            candidate_labels=TOPIC_LABELS,
            multi_label=False,
        )
        topics = list(zip(classification['labels'][:3], classification['scores'][:3]))
    except Exception as e:
        st.error(f"Classification error: {e}")
        topics = [("Unknown", 0.0)]
    results['topics'] = topics
    # ── 4. Translation ────────────────────────────────────────────────────────
    if translation_model is not None:
        st.info(f"🔄 Translating the TLDR to {selected_lang}...")
        try:
            summary_translated = translate_text(summary_en, translation_model)
        except Exception as e:
            st.error(f"Translation error: {e}")
            summary_translated = summary_en
    else:
        summary_translated = None
    results['summary_translated'] = summary_translated
    return results
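# The returned dict carries: 'summary_en', 'summary_translated' (None when no
# translation language is selected), 'authors', 'orgs', 'year', 'title' and
# 'topics' (a list of (label, score) pairs).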
# ══════════════════════════════════════════════════════════════════════════════
# WORD REPORT GENERATION
# ══════════════════════════════════════════════════════════════════════════════
def generate_word_report(results, selected_lang, translation_model, qa_history=None):
    """Create a .docx document with the analysis results + Q&A."""
    doc = Document()
    doc.add_heading("Paper Analysis Report", level=1)
    doc.add_heading("Metadata", level=2)
    doc.add_paragraph(f"Year: {results['year']}")
    doc.add_paragraph(f"Authors: {', '.join(results['authors'])}")
    doc.add_paragraph(f"Detected institutions: {', '.join(results['orgs'])}")
    doc.add_paragraph(f"PDF title: {results.get('title', 'N/A')}")
    doc.add_heading("Topic Classification", level=2)
    for label, score in results['topics']:
        doc.add_paragraph(f"{label}: {score:.1%}")
    if translation_model is None:
        doc.add_heading("Summary", level=2)
        doc.add_paragraph(results['summary_en'])
    else:
        doc.add_heading(f"Summary ({selected_lang})", level=2)
        doc.add_paragraph(results['summary_translated'])
    # Question & answer history
    if qa_history:
        doc.add_heading("Questions and Answers about the Paper", level=2)
        for i, qa in enumerate(qa_history, 1):
            doc.add_paragraph(f"Q{i}: {qa['question']}", style="List Number")
            p = doc.add_paragraph(f"Answer: {qa['answer']}")
            p.add_run(f" [confidence: {qa['score']:.1%}]").italic = True
    buffer = io.BytesIO()
    doc.save(buffer)
    buffer.seek(0)
    return buffer
# ══════════════════════════════════════════════════════════════════════════════
# USER INTERFACE
# ══════════════════════════════════════════════════════════════════════════════
uploaded_file = st.file_uploader("📂 Upload your paper (PDF)", type=["pdf"])
st.sidebar.markdown("---")
st.sidebar.subheader("🌐 Summary language")
selected_lang = st.sidebar.radio("Select a language:", list(LANGUAGES.keys()))
translation_model_id = LANGUAGES[selected_lang]
# ── Persistent state across reruns ───────────────────────────────────────────
if "paper_text" not in st.session_state: st.session_state.paper_text = None
if "analysis_done" not in st.session_state: st.session_state.analysis_done = False
if "results" not in st.session_state: st.session_state.results = None
if "qa_history" not in st.session_state: st.session_state.qa_history = []
if uploaded_file:
    pdf_bytes = uploaded_file.read()
    with st.spinner("📄 Reading document..."):
        try:
            text, metadata = extract_text_from_pdf(pdf_bytes)
            st.session_state.paper_text = text
        except Exception as e:
            st.error(f"Error reading the PDF: {e}")
            st.stop()
    st.subheader("📃 Preview of the extracted text")
    with st.expander("View the first 3000 characters"):
        st.text(text[:3000] + ("..." if len(text) > 3000 else ""))
    st.info(f"Total characters: {len(text):,}")
    # ── Analysis button ───────────────────────────────────────────────────────
    if st.button("🚀 Analyze Paper", use_container_width=True):
        st.session_state.qa_history = []  # reset Q&A when re-analyzing
        st.session_state.analysis_done = False
        progress_bar = st.progress(0, text="Starting analysis...")
        start_time = time.time()
        try:
            progress_bar.progress(10, text="Loading models (the first run may take a while)...")
            results = analyze_paper(text, metadata, selected_lang, translation_model_id)
            st.session_state.results = results
            st.session_state.analysis_done = True
            progress_bar.progress(100, text="Analysis complete!")
            elapsed = time.time() - start_time
            st.success(f"✅ Analysis finished in {elapsed:.1f} seconds.")
        except Exception as e:
            st.error(f"Critical error during analysis: {e}")
            raise e
    # ── Show results ──────────────────────────────────────────────────────────
    if st.session_state.analysis_done and st.session_state.results:
        results = st.session_state.results
        st.markdown("---")
        st.header("📊 Analysis Results")
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("📅 Year", results['year'])
        with col2:
            st.metric("👥 Authors found", len(results['authors']))
        with col3:
            st.metric("🏷️ Main area", results['topics'][0][0])
        st.markdown("### 👥 Authors")
        st.write(", ".join(results['authors']))
        st.markdown("### 🏛️ Institutions")
        st.write(", ".join(results['orgs']))
        st.markdown("### 🏷️ Topic Classification")
        for label, score in results['topics']:
            st.progress(float(score), text=f"{label}: {score:.1%}")
        if translation_model_id is None:
            st.markdown("### 📝 TLDR")
            st.success(results['summary_en'])
        else:
            st.markdown(f"### 📝 TLDR ({selected_lang})")
            st.success(results['summary_translated'])
        # ══════════════════════════════════════════════════════════════════════
        # Q&A SECTION
        # ══════════════════════════════════════════════════════════════════════
        st.markdown("---")
        st.header("💬 Ask the Paper")
        st.markdown(
            "Type any question about the paper's content. "
            "The **RoBERTa-SQuAD2** model will look for the answer directly "
            "in the original text."
        )
        # Suggested questions (kept in English) shown as quick-access buttons
        SUGGESTED_QUESTIONS_EN = [
            "What is the main objective of this paper?",
            "What methods or techniques are used?",
            "What are the main results or findings?",
            "What datasets were used in the experiments?",
            "What are the limitations mentioned by the authors?",
            "What future work do the authors propose?",
        ]
        # If a translation language is selected, translate the questions for display
        if translation_model_id is not None:
            display_questions = []
            for q in SUGGESTED_QUESTIONS_EN:
                try:
                    display_questions.append(translate_single(q, translation_model_id))
                except Exception:
                    display_questions.append(q)  # fall back to English
        else:
            display_questions = SUGGESTED_QUESTIONS_EN
        st.markdown("**💡 Suggested questions** _(click to use them)_:")
        cols = st.columns(3)
        for i, (q_display, q_original) in enumerate(zip(display_questions, SUGGESTED_QUESTIONS_EN)):
            if cols[i % 3].button(q_display, key=f"sugg_{i}", use_container_width=True):
                # Always store the English version so the QA model works
                st.session_state["pending_question"] = q_original
        # Free-form question input
        user_question = st.text_input(
            "✏️ Or type your own question in English:",
            placeholder="e.g. What neural architecture is proposed?",
            key="qa_input",
        )
        # Decide which question to run
        question_to_run = st.session_state.pop("pending_question", None) or (
            user_question.strip() if user_question else None
        )
        if question_to_run:
            with st.spinner("🔍 Searching the paper for an answer..."):
                try:
                    qa_pipe = load_qa_model()
                    qa_result = answer_question(
                        question_to_run,
                        st.session_state.paper_text,
                        qa_pipe,
                    )
                    # Translate the answer into the selected language (if not English)
                    if translation_model_id is not None:
                        translated_answer = translate_single(qa_result["answer"], translation_model_id)
                    else:
                        translated_answer = qa_result["answer"]
                    # Append to the history (skipping consecutive duplicates)
                    if not st.session_state.qa_history or \
                       st.session_state.qa_history[-1]["question"] != question_to_run:
                        st.session_state.qa_history.append({
                            "question": question_to_run,   # stored in English (or as typed)
                            "answer": translated_answer,   # translated answer
                            "score": qa_result["score"],
                            "context": qa_result["context"],  # context kept in English for reference
                        })
                except Exception as e:
                    st.error(f"Q&A error: {e}")
        # Show the Q&A history
        if st.session_state.qa_history:
            st.markdown("### 📋 Answers")
            for qa in reversed(st.session_state.qa_history):
                # Confidence icon
                if qa['score'] > 0.6:
                    icon = "🟢"
                    label = "High confidence"
                elif qa['score'] > 0.3:
                    icon = "🟡"
                    label = "Medium confidence"
                else:
                    icon = "🔴"
                    label = "Low confidence – check the context"
                with st.expander(
                    f"{icon} **{qa['question']}** – {label} ({qa['score']:.1%})",
                    expanded=True,
                ):
                    st.markdown("**📌 Answer found:**")
                    st.info(qa['answer'])
                    st.caption("📄 Fragment of the paper where the answer was found:")
                    # Highlight the answer inside the context
                    context_display = qa['context'][:700]
                    if qa['answer'] in context_display:
                        context_display = context_display.replace(
                            qa['answer'],
                            f"**:blue[{qa['answer']}]**",
                        )
                    st.markdown(f"> {context_display}{'...' if len(qa['context']) > 700 else ''}")
            if st.button("🗑️ Clear question history"):
                st.session_state.qa_history = []
                st.rerun()
        # ── Download button ───────────────────────────────────────────────────
        st.markdown("---")
        doc_buffer = generate_word_report(
            results, selected_lang, translation_model_id,
            qa_history=st.session_state.qa_history,
        )
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        st.download_button(
            label="⬇️ Download the full report (.docx)",
            data=doc_buffer,
            file_name=f"paper_analysis_{timestamp}.docx",
            mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            use_container_width=True,
        )