import gc
import io
import os
import re
import time
from datetime import datetime

import streamlit as st
import fitz  # PyMuPDF
import torch
from docx import Document
from transformers import (
    AutoTokenizer,
    AutoModelForSeq2SeqLM,
    MarianMTModel,
    MarianTokenizer,
    pipeline,
)

st.set_page_config(page_title="Paper Analyzer", page_icon="🔬", layout="wide")
st.title("🔬 Paper Analyzer")
st.markdown("Sube un paper científico en PDF y obtén un análisis completo con IA.")

# Cap CUDA allocator block splitting to reduce fragmentation; this is read when
# the allocator first initializes, so it must run before any CUDA allocation.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:128"

device = 0 if torch.cuda.is_available() else -1  # transformers convention: 0 = first GPU, -1 = CPU
device_name = torch.cuda.get_device_name(0) if device == 0 else "CPU"

if device == 0:
    st.sidebar.success(f"🟢 GPU: {device_name}")
else:
    st.sidebar.warning("🟡 Usando CPU")

# UI label -> Helsinki-NLP MarianMT checkpoint (None = leave the text in English).
LANGUAGES = {
    "🇺🇸 English": None,
    "🇲🇽 Español": "Helsinki-NLP/opus-mt-en-es",
    "🇰🇷 한국어": "Helsinki-NLP/opus-mt-en-ko",
    "🇫🇷 Français": "Helsinki-NLP/opus-mt-en-fr",
    "🇩🇪 Deutsch": "Helsinki-NLP/opus-mt-en-de",
    "🇨🇳 中文": "Helsinki-NLP/opus-mt-en-zh",
    "🇯🇵 日本語": "Helsinki-NLP/opus-mt-en-jap",
}
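
# Example: selecting "🇲🇽 Español" resolves to "Helsinki-NLP/opus-mt-en-es";
# load_translator() downloads it once and st.cache_resource then reuses the
# same tokenizer/model pair for that language on every rerun.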

TOPIC_LABELS = [
    "Physics", "Computer Science", "Medicine", "Biology",
    "Chemistry", "Engineering", "Mathematics", "Economics",
    "Psychology", "Aerospace", "Materials Science", "Neuroscience",
]

@st.cache_resource(show_spinner=False)
def load_summarizer():
    """Load the LED model specialized in long papers."""
    model_name = "allenai/led-large-16384-arxiv"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
    if device == 0:
        model = model.to("cuda")
    return pipeline("summarization", model=model, tokenizer=tokenizer,
                    device=device, max_length=500, min_length=60,
                    truncation=True, no_repeat_ngram_size=3)
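
# Why LED: led-large-16384-arxiv is a Longformer Encoder-Decoder fine-tuned on
# arXiv papers; its sparse attention accepts up to 16,384 input tokens, versus
# ~1,024 for standard BART summarizers, so most papers fit with little truncation.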


@st.cache_resource(show_spinner=False)
def load_ner():
    """Load the NER model for authors/organizations."""
    return pipeline("ner", model="dslim/bert-base-NER",
                    aggregation_strategy="simple", device=device)


@st.cache_resource(show_spinner=False)
def load_classifier():
    """Lightweight zero-shot model for topic classification."""
    return pipeline("zero-shot-classification",
                    model="MoritzLaurer/DeBERTa-v3-base-mnli-fever-anli",
                    device=device)


@st.cache_resource(show_spinner=False)
def load_qa_model():
    """
    Extractive QA model based on RoBERTa fine-tuned on SQuAD2.
    - Free and open-source on Hugging Face.
    - Does not require a GPU (runs well on CPU).
    - Specialized in answering questions with direct textual evidence.
    Model: deepset/roberta-base-squad2
    """
    return pipeline(
        "question-answering",
        model="deepset/roberta-base-squad2",
        tokenizer="deepset/roberta-base-squad2",
        device=device,
    )
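
# Usage sketch: load_qa_model()(question="What dataset is used?", context=chunk)
# returns a dict like {"answer": ..., "score": ..., "start": ..., "end": ...},
# where start/end are character offsets into the given context.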


@st.cache_resource(show_spinner=False)
def load_translator(model_name):
    """Load a MarianMT translation model. Cached once per language."""
    tok = MarianTokenizer.from_pretrained(model_name)
    model = MarianMTModel.from_pretrained(model_name)
    if device == 0:
        model = model.to("cuda")
    return tok, model


def translate_single(text, model_name):
    """Translate a short text with MarianMT; on failure, return it unchanged."""
    if not text or model_name is None:
        return text
    try:
        tok, model = load_translator(model_name)
        inputs = tok(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
        if device == 0:
            inputs = {k: v.to("cuda") for k, v in inputs.items()}
        out = model.generate(**inputs)
        return tok.decode(out[0], skip_special_tokens=True)
    except Exception:
        return text


def extract_text_from_pdf(pdf_bytes):
    """Extract all text (plus metadata) from a PDF given its raw bytes."""
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    text = "".join(page.get_text() for page in doc)
    metadata = doc.metadata
    doc.close()
    return text.strip(), metadata


def robust_sentence_split(text):
    """
    Split a text into sentences while respecting common abbreviations.
    First split on sentence punctuation followed by whitespace, then
    re-join segments that end in a known abbreviation.
    """
    # Lookups are lowercased below, so the set is lowercase too (the original
    # mixed-case entries like "Mr" could never match a lowercased word).
    ABBREVIATIONS = {
        "mr", "mrs", "ms", "dr", "prof", "sr", "jr",
        "e.g", "i.e", "vs", "etc", "al", "vol", "fig",
        "eq", "sec", "ref", "no", "st", "dept", "approx",
        "jan", "feb", "mar", "apr", "jun", "jul",
        "aug", "sep", "oct", "nov", "dec",
    }

    raw_parts = re.split(r'(?<=[.!?])\s+', text)

    sentences = []
    i = 0
    while i < len(raw_parts):
        part = raw_parts[i].strip()

        if not part:
            i += 1
            continue

        # Keep merging while the fragment still ends in a known abbreviation
        # (merging only once would break runs of consecutive abbreviations).
        while part.endswith('.') and i + 1 < len(raw_parts):
            words = part.split()
            last_word = words[-1].lower() if words else ''
            if last_word.rstrip('.') not in ABBREVIATIONS:
                break
            i += 1
            part = part + ' ' + raw_parts[i].strip()

        sentences.append(part)
        i += 1

    return sentences if sentences else [text.strip()]
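
# Example: robust_sentence_split("See Fig. 3. It shows X.") first splits into
# ['See Fig.', '3.', 'It shows X.'], then re-joins the first two parts because
# 'fig' is a known abbreviation -> ['See Fig. 3.', 'It shows X.'].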


def translate_text(text, model_name):
    """Translate a long text with MarianMT, one sentence at a time."""
    tok, model = load_translator(model_name)
    sentences = robust_sentence_split(text)
    translated = []
    for sent in sentences:
        if not sent:
            continue
        inputs = tok(sent, return_tensors="pt", padding=True,
                     truncation=True, max_length=512)
        if device == 0:
            inputs = {k: v.to("cuda") for k, v in inputs.items()}
        out = model.generate(**inputs)
        translated.append(tok.decode(out[0], skip_special_tokens=True))
    return " ".join(translated)


def split_into_chunks(text: str, chunk_size: int = 400, overlap: int = 50) -> list:
    """
    Split the text into windows of `chunk_size` words with overlap.
    The overlap keeps answers that fall on the boundary between two
    chunks from being truncated.
    """
    words = text.split()
    chunks = []
    start = 0
    while start < len(words):
        end = min(start + chunk_size, len(words))
        chunks.append(" ".join(words[start:end]))
        if end == len(words):
            break
        start += chunk_size - overlap
    return chunks
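
# Example: 1,000 words with chunk_size=400 and overlap=50 produce windows
# starting at words 0, 350 and 700 (three chunks); an answer spanning words
# 380-410 is cut off at the end of the first window but sits whole in the second.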


def answer_question(question: str, full_text: str, qa_pipe) -> dict:
    """
    QA pipeline over a long document:
    1. Split the text into overlapping chunks.
    2. Run the model on every chunk and keep the highest-scoring one.
    3. Return the answer, its confidence score, and the context fragment.

    Why this strategy:
    - Extractive models have a token limit (~512).
    - Scanning every chunk guarantees no information is missed.
    - The model's internal score (a softmax probability) is a decent proxy
      for which chunk contains the real answer.
    """
    chunks = split_into_chunks(full_text, chunk_size=400, overlap=50)

    best_score = -1.0
    best_answer = "No encontré información suficiente para responder esta pregunta."
    best_chunk = chunks[0] if chunks else ""  # guard against an empty document

    for chunk in chunks:
        try:
            result = qa_pipe(question=question, context=chunk)
            if result["score"] > best_score:
                best_score = result["score"]
                best_answer = result["answer"]
                best_chunk = chunk
        except Exception:
            continue

    return {
        "answer": best_answer,
        "score": best_score,
        "context": best_chunk,
    }
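
# Caveat: each chunk's score comes from an independent softmax, so values are
# only roughly comparable across chunks. That is fine for ranking here; a
# retriever-plus-reader setup would be the more principled upgrade.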


def analyze_paper(text, metadata, selected_lang, translation_model):
    """Run all the analysis tools in order."""
    # Release any GPU memory left over from a previous run before loading models.
    torch.cuda.empty_cache()
    gc.collect()
    results = {}

    # 1) Summary with the long-document scientific model.
    st.info("📝 Generando resumen con modelo científico...")
    summarizer = load_summarizer()
    # LED accepts up to 16,384 tokens; 8,192 keeps memory use in check.
    max_input_tokens = 8192
    tokenizer = summarizer.tokenizer
    tokens = tokenizer.encode(text, add_special_tokens=False, truncation=False)
    if len(tokens) > max_input_tokens:
        truncated_tokens = tokens[:max_input_tokens]
        text_for_summary = tokenizer.decode(truncated_tokens, skip_special_tokens=True)
        st.warning(f"⚠️ Texto truncado a {max_input_tokens} tokens para el resumen.")
    else:
        text_for_summary = text

    try:
        summary_list = summarizer(
            text_for_summary,
            max_length=500, min_length=60,
            do_sample=False, num_beams=4,
            early_stopping=True, truncation=True,
        )
        summary_en = summary_list[0]['summary_text']
    except Exception as e:
        st.error(f"Error en el resumen: {e}")
        summary_en = "No se pudo generar el resumen."
    results['summary_en'] = summary_en

    # 2) Metadata and entities (PDF metadata first, NER as a fallback).
    st.info("🔍 Extrayendo metadatos y entidades...")
    pdf_authors = metadata.get("author", "")
    pdf_title = metadata.get("title", "")
    years = re.findall(r'\b((?:19|20)\d{2})\b', text[:2000])
    year = years[0] if years else "No detectado"

    try:
        ner = load_ner()
        entities = ner(text[:3000])
        authors_ner, orgs_ner = [], []
        for ent in entities:
            if ent['entity_group'] == 'PER':
                authors_ner.append(ent['word'])
            elif ent['entity_group'] == 'ORG':
                orgs_ner.append(ent['word'])
        # dict.fromkeys deduplicates while preserving first-seen order.
        authors_ner = list(dict.fromkeys(authors_ner))[:6]
        orgs_ner = list(dict.fromkeys(orgs_ner))[:4]
    except Exception as e:
        st.warning(f"NER no disponible: {e}")
        authors_ner, orgs_ner = [], []

    if pdf_authors:
        authors_list = [a.strip() for a in re.split(r'[,;]+', pdf_authors) if a.strip()]
    else:
        authors_list = []
    for a in authors_ner:
        if a not in authors_list:
            authors_list.append(a)

    results['authors'] = authors_list[:6] if authors_list else ["No detectados"]
    results['orgs'] = orgs_ner if orgs_ner else ["No detectadas"]
    results['year'] = year
    results['title'] = pdf_title if pdf_title else "Título no disponible"

    # 3) Zero-shot topic classification over the (shorter) English summary.
    st.info("🏷️ Clasificando el paper en áreas científicas...")
    classifier = load_classifier()
    try:
        classification = classifier(
            summary_en[:1500],
            candidate_labels=TOPIC_LABELS,
            multi_label=False,
        )
        topics = list(zip(classification['labels'][:3], classification['scores'][:3]))
    except Exception as e:
        st.error(f"Error en clasificación: {e}")
        topics = [("Desconocido", 0.0)]
    results['topics'] = topics

    # 4) Optional translation of the summary.
    if translation_model is not None:
        st.info(f"🌐 Traduciendo TLDR a {selected_lang}...")
        try:
            summary_translated = translate_text(summary_en, translation_model)
        except Exception as e:
            st.error(f"Error en traducción: {e}")
            summary_translated = summary_en
    else:
        summary_translated = None
    results['summary_translated'] = summary_translated

    return results
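
# Rough footprint, if you need to budget memory: led-large is ~0.5B parameters
# (~1.8 GB in fp32), DeBERTa-v3-base and bert-base-NER a few hundred MB each;
# st.cache_resource keeps them all resident once loaded.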


def generate_word_report(results, selected_lang, translation_model, qa_history=None):
    """Build a .docx document with the analysis results plus the Q&A history."""
    doc = Document()
    doc.add_heading("Paper Analysis Report", level=1)

    doc.add_heading("Metadatos", level=2)
    doc.add_paragraph(f"Año: {results['year']}")
    doc.add_paragraph(f"Autores: {', '.join(results['authors'])}")
    doc.add_paragraph(f"Instituciones detectadas: {', '.join(results['orgs'])}")
    doc.add_paragraph(f"Título del PDF: {results.get('title', 'N/A')}")

    doc.add_heading("Clasificación Temática", level=2)
    for label, score in results['topics']:
        doc.add_paragraph(f"{label}: {score:.1%}")

    if translation_model is None:
        doc.add_heading("Resumen", level=2)
        doc.add_paragraph(results['summary_en'])
    else:
        doc.add_heading(f"Resumen ({selected_lang})", level=2)
        doc.add_paragraph(results['summary_translated'])

    if qa_history:
        doc.add_heading("Preguntas y Respuestas sobre el Paper", level=2)
        for i, qa in enumerate(qa_history, 1):
            doc.add_paragraph(f"P{i}: {qa['question']}", style="List Number")
            p = doc.add_paragraph(f"Respuesta: {qa['answer']}")
            p.add_run(f" [confianza: {qa['score']:.1%}]").italic = True

    buffer = io.BytesIO()
    doc.save(buffer)
    buffer.seek(0)
    return buffer
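
# The report never touches disk: doc.save() writes into an io.BytesIO buffer
# that st.download_button (at the bottom of the page) streams to the user.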


uploaded_file = st.file_uploader("📄 Sube tu paper (PDF)", type=["pdf"])

st.sidebar.markdown("---")
st.sidebar.subheader("🌐 Idioma del resumen")
selected_lang = st.sidebar.radio("Selecciona idioma:", list(LANGUAGES.keys()))
translation_model_id = LANGUAGES[selected_lang]

# Session state survives Streamlit reruns, so the extracted text, the analysis
# results and the Q&A history persist between interactions.
if "paper_text" not in st.session_state:
    st.session_state.paper_text = None
if "analysis_done" not in st.session_state:
    st.session_state.analysis_done = False
if "results" not in st.session_state:
    st.session_state.results = None
if "qa_history" not in st.session_state:
    st.session_state.qa_history = []

if uploaded_file:
    pdf_bytes = uploaded_file.read()

    with st.spinner("📖 Leyendo documento..."):
        try:
            text, metadata = extract_text_from_pdf(pdf_bytes)
            st.session_state.paper_text = text
        except Exception as e:
            st.error(f"Error al leer el PDF: {e}")
            st.stop()

    st.subheader("📄 Vista previa del texto extraído")
    with st.expander("Ver primeros 3000 caracteres"):
        st.text(text[:3000] + ("..." if len(text) > 3000 else ""))
    st.info(f"Total caracteres: {len(text):,}")

    if st.button("🚀 Analizar Paper", use_container_width=True):
        st.session_state.qa_history = []
        st.session_state.analysis_done = False
        progress_bar = st.progress(0, text="Iniciando análisis...")
        start_time = time.time()

        try:
            progress_bar.progress(10, text="Cargando modelos (primera vez puede tardar)...")
            results = analyze_paper(text, metadata, selected_lang, translation_model_id)
            st.session_state.results = results
            st.session_state.analysis_done = True
            progress_bar.progress(100, text="¡Análisis completado!")
            elapsed = time.time() - start_time
            st.success(f"✅ Análisis finalizado en {elapsed:.1f} segundos.")
        except Exception as e:
            st.error(f"Error crítico durante el análisis: {e}")
            raise


if st.session_state.analysis_done and st.session_state.results:
    results = st.session_state.results

    st.markdown("---")
    st.header("📊 Resultados del Análisis")

    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("📅 Año", results['year'])
    with col2:
        st.metric("👥 Autores encontrados", len(results['authors']))
    with col3:
        st.metric("🏷️ Área principal", results['topics'][0][0])

    st.markdown("### 👥 Autores")
    st.write(", ".join(results['authors']))

    st.markdown("### 🏛️ Instituciones")
    st.write(", ".join(results['orgs']))

    st.markdown("### 🏷️ Clasificación Temática")
    for label, score in results['topics']:
        st.progress(float(score), text=f"{label}: {score:.1%}")

    if translation_model_id is None:
        st.markdown("### 📝 TLDR")
        st.success(results['summary_en'])
    else:
        st.markdown(f"### 📝 TLDR ({selected_lang})")
        st.success(results['summary_translated'])

    st.markdown("---")
    st.header("💬 Pregúntale al Paper")
    st.markdown(
        "Escribe cualquier pregunta sobre el contenido del paper. "
        "El modelo **RoBERTa-SQuAD2** buscará la respuesta directamente "
        "en el texto original."
    )

    SUGGESTED_QUESTIONS_EN = [
        "What is the main objective of this paper?",
        "What methods or techniques are used?",
        "What are the main results or findings?",
        "What datasets were used in the experiments?",
        "What are the limitations mentioned by the authors?",
        "What future work do the authors propose?",
    ]

    # Show the suggestions in the UI language, but always run the English
    # original against the model (deepset/roberta-base-squad2 is English-only).
    if translation_model_id is not None:
        display_questions = []
        for q in SUGGESTED_QUESTIONS_EN:
            try:
                display_questions.append(translate_single(q, translation_model_id))
            except Exception:
                display_questions.append(q)
    else:
        display_questions = SUGGESTED_QUESTIONS_EN

    st.markdown("**💡 Preguntas sugeridas** _(haz clic para usarlas)_:")
    cols = st.columns(3)
    for i, (q_display, q_original) in enumerate(zip(display_questions, SUGGESTED_QUESTIONS_EN)):
        if cols[i % 3].button(q_display, key=f"sugg_{i}", use_container_width=True):
            st.session_state["pending_question"] = q_original

    user_question = st.text_input(
        "✍️ O escribe tu propia pregunta en inglés:",
        placeholder="e.g. What neural architecture is proposed?",
        key="qa_input",
    )

    # A clicked suggestion takes priority over the free-text box.
    question_to_run = st.session_state.pop("pending_question", None) or (
        user_question.strip() if user_question else None
    )

    if question_to_run:
        with st.spinner("🔍 Buscando respuesta en el paper..."):
            try:
                qa_pipe = load_qa_model()
                qa_result = answer_question(
                    question_to_run,
                    st.session_state.paper_text,
                    qa_pipe,
                )

                if translation_model_id is not None:
                    translated_answer = translate_single(qa_result["answer"], translation_model_id)
                else:
                    translated_answer = qa_result["answer"]

                # Avoid duplicate history entries when Streamlit reruns with
                # the same question still sitting in the text input.
                if not st.session_state.qa_history or \
                   st.session_state.qa_history[-1]["question"] != question_to_run:
                    st.session_state.qa_history.append({
                        "question": question_to_run,
                        "answer": translated_answer,
                        "score": qa_result["score"],
                        "context": qa_result["context"],
                    })
            except Exception as e:
                st.error(f"Error en Q&A: {e}")

    if st.session_state.qa_history:
        st.markdown("### 📋 Respuestas")
        for qa in reversed(st.session_state.qa_history):
            # Traffic-light confidence based on the extractive model's score.
            if qa['score'] > 0.6:
                icon = "🟢"
                label = "Alta confianza"
            elif qa['score'] > 0.3:
                icon = "🟡"
                label = "Confianza media"
            else:
                icon = "🔴"
                label = "Baja confianza, revisa el contexto"

            with st.expander(
                f"{icon} **{qa['question']}** | {label} ({qa['score']:.1%})",
                expanded=True,
            ):
                st.markdown("**📌 Respuesta encontrada:**")
                st.info(qa['answer'])

                st.caption("📄 Fragmento del paper donde se encontró la respuesta:")

                # Highlight the answer inside the context fragment when present.
                context_display = qa['context'][:700]
                if qa['answer'] in context_display:
                    context_display = context_display.replace(
                        qa['answer'],
                        f"**:blue[{qa['answer']}]**",
                    )
                st.markdown(f"> {context_display}{'...' if len(qa['context']) > 700 else ''}")

        if st.button("🗑️ Limpiar historial de preguntas"):
            st.session_state.qa_history = []
            st.rerun()

    st.markdown("---")
    doc_buffer = generate_word_report(
        results, selected_lang, translation_model_id,
        qa_history=st.session_state.qa_history,
    )
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    st.download_button(
        label="⬇️ Descargar reporte completo (.docx)",
        data=doc_buffer,
        file_name=f"paper_analysis_{timestamp}.docx",
        mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        use_container_width=True,
    )