| import os |
| import re |
| import requests |
| import gradio as gr |
| import faiss |
| import numpy as np |
|
|
| from pypdf import PdfReader |
| from sentence_transformers import SentenceTransformer |
|
|
| |
| |
| |
|
|
| PDF_URL = "https://www.sanidad.gob.es/gabinetePrensa/notaPrensa/pdf/ComeSanoyMuevete12decisionesSaludables.pdf" |
| PDF_PATH = "documento.pdf" |
|
|
| EMBEDDING_MODEL = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2" |
|
|
| CHUNK_SIZE = 800 |
| CHUNK_OVERLAP = 100 |
| TOP_K = 6 |
|
|
| |
| |
| |
|
|
| def descargar_pdf(): |
| if not os.path.exists(PDF_PATH): |
| r = requests.get(PDF_URL, timeout=30) |
| r.raise_for_status() |
| with open(PDF_PATH, "wb") as f: |
| f.write(r.content) |
|
|
| def extraer_paginas(pdf_path): |
| reader = PdfReader(pdf_path) |
| paginas = [] |
| for i, page in enumerate(reader.pages): |
| texto = page.extract_text() |
| if texto: |
| paginas.append({"page": i + 1, "text": texto}) |
| return paginas |
|
|
| |
| |
| |
|
|
| def dividir_texto(texto, chunk_size=800, chunk_overlap=100): |
| chunks = [] |
| inicio = 0 |
| texto = texto.strip() |
|
|
| while inicio < len(texto): |
| fin = inicio + chunk_size |
| chunk = texto[inicio:fin] |
| chunks.append(chunk) |
| inicio += chunk_size - chunk_overlap |
|
|
| return chunks |
|
|
| def construir_chunks(paginas): |
| textos = [] |
| metas = [] |
|
|
| for pagina in paginas: |
| trozos = dividir_texto(pagina["text"], CHUNK_SIZE, CHUNK_OVERLAP) |
| for trozo in trozos: |
| textos.append(trozo) |
| metas.append({"page": pagina["page"]}) |
|
|
| return textos, metas |
|
|
| |
| |
| |
|
|
| def limpiar_texto(texto): |
| texto = texto.replace("\n", " ") |
| texto = re.sub(r"\s+", " ", texto) |
| texto = re.sub(r"\?+", "", texto) |
| texto = re.sub(r"\!+", "", texto) |
| return texto.strip() |
|
|
| def extraer_frases(texto, max_frases=3): |
| texto = limpiar_texto(texto) |
| frases = re.split(r"(?<=[\.\:\;])\s+", texto) |
|
|
| frases_validas = [] |
| for f in frases: |
| f = f.strip() |
| if len(f) > 40: |
| frases_validas.append(f) |
|
|
| return frases_validas[:max_frases] |
|
|
| |
| |
| |
|
|
| print("Descargando PDF...") |
| descargar_pdf() |
|
|
| print("Extrayendo texto del documento...") |
| paginas = extraer_paginas(PDF_PATH) |
| chunk_texts, chunk_meta = construir_chunks(paginas) |
|
|
| print("Cargando modelo de embeddings...") |
| embedder = SentenceTransformer(EMBEDDING_MODEL) |
|
|
| print("Generando embeddings...") |
| embeddings = embedder.encode(chunk_texts, convert_to_numpy=True, show_progress_bar=False) |
| embeddings = embeddings.astype("float32") |
|
|
| dimension = embeddings.shape[1] |
| index = faiss.IndexFlatL2(dimension) |
| index.add(embeddings) |
|
|
| |
| |
| |
|
|
| def recuperar_contexto(query, top_k=6): |
| query_emb = embedder.encode([query], convert_to_numpy=True).astype("float32") |
| distances, indices = index.search(query_emb, top_k) |
|
|
| resultados = [] |
| for idx, dist in zip(indices[0], distances[0]): |
| resultados.append({ |
| "text": chunk_texts[idx], |
| "page": chunk_meta[idx]["page"], |
| "score": float(dist) |
| }) |
|
|
| return resultados |
|
|
| |
| |
| |
|
|
| def construir_respuesta_desde_contexto(query, resultados): |
| if not resultados: |
| return "No tengo información suficiente para responder a esta pregunta." |
|
|
| mejor = resultados[0]["text"] |
| frases = extraer_frases(mejor, max_frases=3) |
|
|
| if not frases: |
| return "No tengo información suficiente para responder a esta pregunta." |
|
|
| respuesta = "Según el documento, " + " ".join(frases) |
|
|
| if len(respuesta) > 900: |
| respuesta = respuesta[:900] + "..." |
|
|
| return respuesta |
|
|
| def responder(query): |
| if not query.strip(): |
| return "Escribe una pregunta.", "", "" |
|
|
| resultados = recuperar_contexto(query, top_k=TOP_K) |
|
|
| respuesta = construir_respuesta_desde_contexto(query, resultados) |
|
|
| fuentes = "\n".join( |
| [f"Página {r['page']} | score={r['score']:.4f}" for r in resultados] |
| ) |
|
|
| contexto_mostrar = "\n\n".join( |
| [f"[Página {r['page']}]\n{limpiar_texto(r['text'])[:500]}..." for r in resultados] |
| ) |
|
|
| return respuesta, fuentes, contexto_mostrar |
|
|
| |
| |
| |
|
|
| examples = [ |
| ["¿Qué dice el documento sobre el desayuno?"], |
| ["¿Qué dice el documento sobre beber agua?"], |
| ["¿Qué recomendaciones da sobre frutas, verduras y fibra?"], |
| ["¿Qué indica el documento sobre la sal y las grasas?"], |
| ["¿Qué dice el documento sobre la actividad física?"] |
| ] |
|
|
| with gr.Blocks() as demo: |
| gr.Markdown("# Práctica 9 - Sistema RAG") |
| gr.Markdown( |
| "Haz una pregunta sobre el documento " |
| "**Come sano y muévete: 12 decisiones saludables**." |
| ) |
|
|
| pregunta = gr.Textbox( |
| label="Pregunta", |
| placeholder="Escribe aquí tu pregunta..." |
| ) |
|
|
| boton = gr.Button("Generar respuesta") |
|
|
| respuesta = gr.Textbox(label="Respuesta") |
| fuentes = gr.Textbox(label="Fuentes recuperadas") |
| contexto = gr.Textbox(label="Contexto recuperado", lines=14) |
|
|
| gr.Examples(examples=examples, inputs=pregunta) |
|
|
| boton.click( |
| fn=responder, |
| inputs=pregunta, |
| outputs=[respuesta, fuentes, contexto] |
| ) |
|
|
| demo.launch() |