import streamlit as st import fitz # PyMuPDF import torch import io import re import time from datetime import datetime from docx import Document from transformers import ( AutoTokenizer, AutoModelForSeq2SeqLM, MarianMTModel, MarianTokenizer, pipeline, ) import gc import os # ══════════════════════════════════════════════════════════════════════════════ # CONFIGURACIÓN INICIAL # ══════════════════════════════════════════════════════════════════════════════ st.set_page_config(page_title="Paper Analyzer", page_icon="🔬", layout="wide") st.title("🔬 Paper Analyzer") st.markdown("Sube un paper científico en PDF y obtén un análisis completo con IA.") os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:128" device = 0 if torch.cuda.is_available() else -1 device_name = torch.cuda.get_device_name(0) if device == 0 else "CPU" if device == 0: st.sidebar.success(f"🟢 GPU: {device_name}") else: st.sidebar.warning("🟡 Usando CPU") # ─── Idiomas disponibles ───────────────────────────────────────────────────── LANGUAGES = { "🇺🇸 English": None, "🇲🇽 Español": "Helsinki-NLP/opus-mt-en-es", "🇰🇷 한국어": "Helsinki-NLP/opus-mt-en-ko", "🇫🇷 Francais": "Helsinki-NLP/opus-mt-en-fr", "🇩🇪 Deutsch ": "Helsinki-NLP/opus-mt-en-de", "🇨🇳 中文 ": "Helsinki-NLP/opus-mt-en-zh", "🇯🇵 日本語": "Helsinki-NLP/opus-mt-en-jap", } # ─── Áreas temáticas ───────────────────────────────────────────────────────── TOPIC_LABELS = [ "Physics", "Computer Science", "Medicine", "Biology", "Chemistry", "Engineering", "Mathematics", "Economics", "Psychology", "Aerospace", "Materials Science", "Neuroscience" ] # ══════════════════════════════════════════════════════════════════════════════ # CACHÉ DE MODELOS (se cargan una sola vez por sesión) # ══════════════════════════════════════════════════════════════════════════════ @st.cache_resource(show_spinner=False) def load_summarizer(): """Carga el modelo LED especializado en papers largos.""" model_name = "allenai/led-large-16384-arxiv" tokenizer = AutoTokenizer.from_pretrained(model_name) model = AutoModelForSeq2SeqLM.from_pretrained(model_name) if device == 0: model = model.to("cuda") return pipeline("summarization", model=model, tokenizer=tokenizer, device=device, max_length=500, min_length=60, truncation=True, no_repeat_ngram_size=3) @st.cache_resource(show_spinner=False) def load_ner(): """Carga el modelo NER para autores/organizaciones.""" return pipeline("ner", model="dslim/bert-base-NER", aggregation_strategy="simple", device=device) @st.cache_resource(show_spinner=False) def load_classifier(): """Modelo ligero zero-shot para clasificación temática.""" return pipeline("zero-shot-classification", model="MoritzLaurer/DeBERTa-v3-base-mnli-fever-anli", device=device) @st.cache_resource(show_spinner=False) def load_qa_model(): """ Modelo extractivo de QA basado en RoBERTa fine-tuned en SQuAD2. - Gratuito y open-source en Hugging Face. - No requiere GPU (funciona bien en CPU). - Especializado en responder preguntas con evidencia textual directa. Modelo: deepset/roberta-base-squad2 """ return pipeline( "question-answering", model="deepset/roberta-base-squad2", tokenizer="deepset/roberta-base-squad2", device=device, ) @st.cache_resource(show_spinner=False) def load_translator(model_name): """Carga un modelo de traducción MarianMT. Se cachea por idioma.""" tok = MarianTokenizer.from_pretrained(model_name) model = MarianMTModel.from_pretrained(model_name) if device == 0: model = model.to("cuda") return tok, model # ══════════════════════════════════════════════════════════════════════════════ # FUNCIONES AUXILIARES – TEXTO # ══════════════════════════════════════════════════════════════════════════════ def translate_single(text, model_name): """Traduce un texto corto usando el modelo MarianMT.""" if not text or model_name is None: return text try: tok, model = load_translator(model_name) inputs = tok(text, return_tensors="pt", padding=True, truncation=True, max_length=512) if device == 0: inputs = {k: v.to("cuda") for k, v in inputs.items()} out = model.generate(**inputs) return tok.decode(out[0], skip_special_tokens=True) except Exception: return text def extract_text_from_pdf(pdf_bytes): """Extrae todo el texto de un PDF a partir de sus bytes.""" doc = fitz.open(stream=pdf_bytes, filetype="pdf") text = "".join(page.get_text() for page in doc) metadata = doc.metadata doc.close() return text.strip(), metadata def robust_sentence_split(text): """ Divide un texto en oraciones respetando abreviaturas comunes. Primero parte por signos de puntuación seguidos de espacio y luego recombina los segmentos que terminan en una abreviatura conocida. """ # Conjunto de abreviaturas que suelen llevar punto pero no finalizan oración ABBREVIATIONS = { "Mr", "Mrs", "Ms", "Dr", "Prof", "Sr", "Jr", "e.g", "i.e", "vs", "etc", "al", "Vol", "Fig", "Eq", "Sec", "Ref", "No", "St", "dept", "approx", "vs", "Jan", "Feb", "Mar", "Apr", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" } # Partición inicial por . ! ? seguidos de espacios (o final de cadena) raw_parts = re.split(r'(?<=[.!?])\s+', text) sentences = [] i = 0 while i < len(raw_parts): part = raw_parts[i].strip() # Si la parte está vacía, la ignoramos if not part: i += 1 continue # Si la parte termina en punto y la palabra anterior es una abreviatura, # la juntamos con la siguiente parte (si existe) if part.endswith('.') and i + 1 < len(raw_parts): # Extraemos la última "palabra" con el punto last_word_with_dot = part.split()[-1].lower() if part.split() else '' # Si coincide con alguna abreviatura (sin punto, porque ya lo quitamos) if last_word_with_dot.rstrip('.') in ABBREVIATIONS: # Unimos con el siguiente fragmento part = part + ' ' + raw_parts[i+1].strip() i += 2 sentences.append(part.strip()) continue sentences.append(part) i += 1 # Si no se generó ninguna frase, devolvemos el texto original como una sola return sentences if sentences else [text.strip()] def translate_text(text, model_name): """Traduce un texto usando MarianMT con lotes por oraciones.""" tok, model = load_translator(model_name) sentences = robust_sentence_split(text) translated = [] for sent in sentences: if not sent: continue inputs = tok(sent, return_tensors="pt", padding=True, truncation=True, max_length=512) if device == 0: inputs = {k: v.to("cuda") for k, v in inputs.items()} out = model.generate(**inputs) translated.append(tok.decode(out[0], skip_special_tokens=True)) return " ".join(translated) # ══════════════════════════════════════════════════════════════════════════════ # FUNCIONES AUXILIARES – Q&A # ══════════════════════════════════════════════════════════════════════════════ def split_into_chunks(text: str, chunk_size: int = 400, overlap: int = 50) -> list: """ Divide el texto en ventanas de `chunk_size` palabras con solapamiento. El solapamiento evita que respuestas que caen en la frontera de dos chunks queden truncadas. """ words = text.split() chunks = [] start = 0 while start < len(words): end = min(start + chunk_size, len(words)) chunks.append(" ".join(words[start:end])) if end == len(words): break start += chunk_size - overlap return chunks def answer_question(question: str, full_text: str, qa_pipe) -> dict: """ Pipeline de QA sobre documento largo: 1. Divide el texto en chunks con solapamiento. 2. Corre el modelo en cada chunk y se queda con el de mayor score. 3. Devuelve la respuesta, score de confianza y el fragmento de contexto. Por qué esta estrategia: - Los modelos extractivos tienen límite de tokens (~512). - Recorrer todos los chunks garantiza no perder información. - El score interno del modelo (probabilidad softmax) es buen proxy de qué chunk contiene la respuesta real. """ chunks = split_into_chunks(full_text, chunk_size=400, overlap=50) best_score = -1.0 best_answer = "No encontré información suficiente para responder esta pregunta." best_chunk = chunks[0] for chunk in chunks: try: result = qa_pipe(question=question, context=chunk) if result["score"] > best_score: best_score = result["score"] best_answer = result["answer"] best_chunk = chunk except Exception: continue return { "answer": best_answer, "score": best_score, "context": best_chunk, } # ══════════════════════════════════════════════════════════════════════════════ # PIPELINE PRINCIPAL DE ANÁLISIS # ══════════════════════════════════════════════════════════════════════════════ def analyze_paper(text, metadata, selected_lang, translation_model): torch.cuda.empty_cache() gc.collect() """Ejecuta todas las herramientas de análisis en orden.""" results = {} # ── 1. Resumen ──────────────────────────────────────────────────────────── st.info("🔄 Generando resumen con modelo científico...") summarizer = load_summarizer() max_input_tokens = 8192 tokenizer = summarizer.tokenizer tokens = tokenizer.encode(text, add_special_tokens=False, truncation=False) if len(tokens) > max_input_tokens: truncated_tokens = tokens[:max_input_tokens] text_for_summary = tokenizer.decode(truncated_tokens, skip_special_tokens=True) st.warning(f"⚠️ Texto truncado a {max_input_tokens} tokens para el resumen.") else: text_for_summary = text try: summary_list = summarizer( text_for_summary, max_length=500, min_length=60, do_sample=False, num_beams=4, early_stopping=True, truncation=True, ) summary_en = summary_list[0]['summary_text'] except Exception as e: st.error(f"Error en el resumen: {e}") summary_en = "No se pudo generar el resumen." results['summary_en'] = summary_en # ── 2. Métricas ─────────────────────────────────────────────────────────── st.info("🔄 Extrayendo metadatos y entidades...") pdf_authors = metadata.get("author", "") pdf_title = metadata.get("title", "") years = re.findall(r'\b((?:19|20)\d{2})\b', text[:2000]) year = years[0] if years else "No detectado" try: ner = load_ner() entities = ner(text[:3000]) authors_ner, orgs_ner = [], [] for ent in entities: if ent['entity_group'] == 'PER': authors_ner.append(ent['word']) elif ent['entity_group'] == 'ORG': orgs_ner.append(ent['word']) authors_ner = list(dict.fromkeys(authors_ner))[:6] orgs_ner = list(dict.fromkeys(orgs_ner))[:4] except Exception as e: st.warning(f"NER no disponible: {e}") authors_ner, orgs_ner = [], [] if pdf_authors: authors_list = [a.strip() for a in re.split(r'[,;]+', pdf_authors) if a.strip()] else: authors_list = [] for a in authors_ner: if a not in authors_list: authors_list.append(a) results['authors'] = authors_list[:6] if authors_list else ["No detectados"] results['orgs'] = orgs_ner if orgs_ner else ["No detectadas"] results['year'] = year results['title'] = pdf_title if pdf_title else "Título no disponible" # ── 3. Clasificación temática ───────────────────────────────────────────── st.info("🔄 Clasificando el paper en áreas científicas...") classifier = load_classifier() try: classification = classifier( summary_en[:1500], candidate_labels=TOPIC_LABELS, multi_label=False, ) topics = list(zip(classification['labels'][:3], classification['scores'][:3])) except Exception as e: st.error(f"Error en clasificación: {e}") topics = [("Desconocido", 0.0)] results['topics'] = topics # ── 4. Traducción ───────────────────────────────────────────────────────── if translation_model is not None: st.info(f"🔄 Traduciendo TLDR a {selected_lang}...") try: summary_translated = translate_text(summary_en, translation_model) except Exception as e: st.error(f"Error en traducción: {e}") summary_translated = summary_en else: summary_translated = None results['summary_translated'] = summary_translated return results # ══════════════════════════════════════════════════════════════════════════════ # GENERACIÓN DEL INFORME EN WORD # ══════════════════════════════════════════════════════════════════════════════ def generate_word_report(results, selected_lang, translation_model, qa_history=None): """Crea un documento .docx con los resultados del análisis + Q&A.""" doc = Document() doc.add_heading("Paper Analysis Report", level=1) doc.add_heading("Metadatos", level=2) doc.add_paragraph(f"Año: {results['year']}") doc.add_paragraph(f"Autores: {', '.join(results['authors'])}") doc.add_paragraph(f"Instituciones detectadas: {', '.join(results['orgs'])}") doc.add_paragraph(f"Título del PDF: {results.get('title', 'N/A')}") doc.add_heading("Clasificación Temática", level=2) for label, score in results['topics']: doc.add_paragraph(f"{label}: {score:.1%}") if translation_model is None: doc.add_heading("Resumen", level=2) doc.add_paragraph(results['summary_en']) else: doc.add_heading(f"Resumen ({selected_lang})", level=2) doc.add_paragraph(results['summary_translated']) # Historial de preguntas y respuestas if qa_history: doc.add_heading("Preguntas y Respuestas sobre el Paper", level=2) for i, qa in enumerate(qa_history, 1): doc.add_paragraph(f"P{i}: {qa['question']}", style="List Number") p = doc.add_paragraph(f"Respuesta: {qa['answer']}") p.add_run(f" [confianza: {qa['score']:.1%}]").italic = True buffer = io.BytesIO() doc.save(buffer) buffer.seek(0) return buffer # ══════════════════════════════════════════════════════════════════════════════ # INTERFAZ DE USUARIO # ══════════════════════════════════════════════════════════════════════════════ uploaded_file = st.file_uploader("📂 Sube tu paper (PDF)", type=["pdf"]) st.sidebar.markdown("---") st.sidebar.subheader("🌐 Idioma del resumen") selected_lang = st.sidebar.radio("Selecciona idioma:", list(LANGUAGES.keys())) translation_model_id = LANGUAGES[selected_lang] # ── Estado persistente entre reruns ────────────────────────────────────────── if "paper_text" not in st.session_state: st.session_state.paper_text = None if "analysis_done" not in st.session_state: st.session_state.analysis_done = False if "results" not in st.session_state: st.session_state.results = None if "qa_history" not in st.session_state: st.session_state.qa_history = [] if uploaded_file: pdf_bytes = uploaded_file.read() with st.spinner("📄 Leyendo documento..."): try: text, metadata = extract_text_from_pdf(pdf_bytes) st.session_state.paper_text = text except Exception as e: st.error(f"Error al leer el PDF: {e}") st.stop() st.subheader("📃 Vista previa del texto extraído") with st.expander("Ver primeros 3000 caracteres"): st.text(text[:3000] + ("..." if len(text) > 3000 else "")) st.info(f"Total caracteres: {len(text):,}") # ── Botón de análisis ───────────────────────────────────────────────────── if st.button("🚀 Analizar Paper", use_container_width=True): st.session_state.qa_history = [] # reset Q&A al reanalizar st.session_state.analysis_done = False progress_bar = st.progress(0, text="Iniciando análisis...") start_time = time.time() try: progress_bar.progress(10, text="Cargando modelos (primera vez puede tardar)...") results = analyze_paper(text, metadata, selected_lang, translation_model_id) st.session_state.results = results st.session_state.analysis_done = True progress_bar.progress(100, text="¡Análisis completado!") elapsed = time.time() - start_time st.success(f"✅ Análisis finalizado en {elapsed:.1f} segundos.") except Exception as e: st.error(f"Error crítico durante el análisis: {e}") raise e # ── Mostrar resultados ──────────────────────────────────────────────────── if st.session_state.analysis_done and st.session_state.results: results = st.session_state.results st.markdown("---") st.header("📊 Resultados del Análisis") col1, col2, col3 = st.columns(3) with col1: st.metric("📅 Año", results['year']) with col2: st.metric("👥 Autores encontrados", len(results['authors'])) with col3: st.metric("🏷️ Área principal", results['topics'][0][0]) st.markdown("### 👥 Autores") st.write(", ".join(results['authors'])) st.markdown("### 🏛️ Instituciones") st.write(", ".join(results['orgs'])) st.markdown("### 🏷️ Clasificación Temática") for label, score in results['topics']: st.progress(float(score), text=f"{label}: {score:.1%}") if translation_model_id is None: st.markdown("### 📝 TLDR") st.success(results['summary_en']) else: st.markdown(f"### 📝 TLDR ({selected_lang})") st.success(results['summary_translated']) # ══════════════════════════════════════════════════════════════════════ # SECCIÓN Q&A # ══════════════════════════════════════════════════════════════════════ st.markdown("---") st.header("💬 Pregúntale al Paper") st.markdown( "Escribe cualquier pregunta sobre el contenido del paper. " "El modelo **RoBERTa-SQuAD2** buscará la respuesta directamente " "en el texto original." ) # Preguntas sugeridas como botones rápidos # Preguntas sugeridas (originales en inglés) SUGGESTED_QUESTIONS_EN = [ "What is the main objective of this paper?", "What methods or techniques are used?", "What are the main results or findings?", "What datasets were used in the experiments?", "What are the limitations mentioned by the authors?", "What future work do the authors propose?", ] # Si hay un idioma de traducción seleccionado, traducir las preguntas para mostrarlas if translation_model_id is not None: display_questions = [] for q in SUGGESTED_QUESTIONS_EN: try: display_questions.append(translate_single(q, translation_model_id)) except Exception: display_questions.append(q) # fallback al inglés else: display_questions = SUGGESTED_QUESTIONS_EN st.markdown("**💡 Preguntas sugeridas** _(haz clic para usarlas)_:") cols = st.columns(3) for i, (q_display, q_original) in enumerate(zip(display_questions, SUGGESTED_QUESTIONS_EN)): if cols[i % 3].button(q_display, key=f"sugg_{i}", use_container_width=True): # Guardamos siempre la versión en inglés para que el modelo QA funcione st.session_state["pending_question"] = q_original # Input de pregunta libre user_question = st.text_input( "✏️ O escribe tu propia pregunta en inglés:", placeholder="e.g. What neural architecture is proposed?", key="qa_input", ) # Determinar qué pregunta ejecutar question_to_run = st.session_state.pop("pending_question", None) or ( user_question.strip() if user_question else None ) if question_to_run: with st.spinner("🔍 Buscando respuesta en el paper..."): try: qa_pipe = load_qa_model() qa_result = answer_question( question_to_run, st.session_state.paper_text, qa_pipe, ) # Traducir la respuesta al idioma seleccionado (si no es inglés) if translation_model_id is not None: translated_answer = translate_single(qa_result["answer"], translation_model_id) else: translated_answer = qa_result["answer"] # Agregar al historial (sin duplicados consecutivos) if not st.session_state.qa_history or \ st.session_state.qa_history[-1]["question"] != question_to_run: st.session_state.qa_history.append({ "question": question_to_run, # la pregunta se guarda en inglés (o como vino) "answer": translated_answer, # respuesta traducida "score": qa_result["score"], "context": qa_result["context"], # el contexto sigue en inglés para referencia }) except Exception as e: st.error(f"Error en Q&A: {e}") # Mostrar historial de Q&A if st.session_state.qa_history: st.markdown("### 📋 Respuestas") for qa in reversed(st.session_state.qa_history): # Icono de confianza if qa['score'] > 0.6: icon = "🟢" label = "Alta confianza" elif qa['score'] > 0.3: icon = "🟡" label = "Confianza media" else: icon = "🔴" label = "Baja confianza — revisa el contexto" with st.expander( f"{icon} **{qa['question']}** — {label} ({qa['score']:.1%})", expanded=True, ): st.markdown(f"**📌 Respuesta encontrada:**") st.info(qa['answer']) st.caption("📄 Fragmento del paper donde se encontró la respuesta:") # Resaltar la respuesta dentro del contexto context_display = qa['context'][:700] if qa['answer'] in context_display: context_display = context_display.replace( qa['answer'], f"**:blue[{qa['answer']}]**", ) st.markdown(f"> {context_display}{'...' if len(qa['context']) > 700 else ''}") if st.button("🗑️ Limpiar historial de preguntas"): st.session_state.qa_history = [] st.rerun() # ── Botón de descarga ───────────────────────────────────────────────── st.markdown("---") doc_buffer = generate_word_report( results, selected_lang, translation_model_id, qa_history=st.session_state.qa_history, ) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") st.download_button( label="⬇️ Descargar reporte completo (.docx)", data=doc_buffer, file_name=f"paper_analysis_{timestamp}.docx", mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document", use_container_width=True, )