Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| from PyPDF2 import PdfReader | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| import os | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain.vectorstores import FAISS | |
| from langchain_groq import ChatGroq | |
| from langchain.chains.question_answering import load_qa_chain | |
| from langchain.prompts import PromptTemplate | |
| from dotenv import load_dotenv | |
| import re | |
| load_dotenv() | |
| os.getenv("GROQ_API_KEY") | |
| css_style = """ | |
| <style> | |
| .step-number { font-size: 24px; font-weight: bold; } | |
| .response-box { padding: 20px; background-color: #f8f9fa; border-radius: 10px; border-left: 5px solid #252850; margin: 20px 0; box-shadow: 0 2px 4px rgba(0,0,0,0.1); } | |
| .metadata-box { padding: 20px; background-color: #f0f2f6; border-radius: 10px; margin-bottom: 20px; } | |
| .custom-input { font-size: 16px; padding: 10px; border-radius: 5px; border: 1px solid #ccc; } | |
| .suggestion-container { border: 1px solid #e0e0e0; border-radius: 8px; padding: 15px; margin: 10px 0; background: #f8f9fa; } | |
| .suggestion-btn { width: 100%; margin: 3px 0; padding: 8px; border-radius: 5px; border: 1px solid #252850; background: white; cursor: pointer; transition: all 0.2s; } | |
| .suggestion-btn:hover { background: #252850; color: white; } | |
| </style> | |
| """ | |
| def eliminar_proceso_pensamiento(texto): | |
| texto_limpio = re.sub(r'<.*?>', '', texto, flags=re.DOTALL) | |
| lineas = [line.strip() for line in texto_limpio.split('\n') if line.strip()] | |
| return lineas[-1] if lineas else "Respuesta no disponible" | |
| def get_pdf_text(pdf_docs): | |
| text = "" | |
| for pdf in pdf_docs: | |
| pdf_reader = PdfReader(pdf) | |
| for page in pdf_reader.pages: | |
| text += page.extract_text() | |
| return text | |
| def get_text_chunks(text): | |
| text_splitter = RecursiveCharacterTextSplitter(chunk_size=5000, chunk_overlap=500) | |
| return text_splitter.split_text(text) | |
| def get_vector_store(text_chunks): | |
| embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | |
| return FAISS.from_texts(text_chunks, embedding=embeddings) | |
| def get_conversational_chain(): | |
| prompt_template = """ | |
| Responde en español exclusivamente con la información solicitada usando el contexto, además sé lo más extenso y detallado posible | |
| siempre que se pueda desarollar, como explicando el contenido de referencias nombradas. | |
| Formato: Respuesta directa sin prefijos. Si no hay información, di "No disponible". | |
| Contexto: | |
| {context} | |
| Pregunta: | |
| {question} | |
| Respuesta: | |
| """ | |
| model = ChatGroq( | |
| temperature=0.2, | |
| model_name="deepseek-r1-distill-llama-70b", | |
| groq_api_key=os.getenv("GROQ_API_KEY") | |
| ) | |
| return load_qa_chain(model, chain_type="stuff", prompt=PromptTemplate(template=prompt_template, input_variables=["context", "question"])) | |
| def extract_metadata(vector_store): | |
| metadata_questions = { | |
| "title": "¿Cuál es el título principal del documento? Formato: Respuesta simple con algunas letras en mayúscula si hiciera falta", | |
| "entity": "¿A qué organización pertenece este documento?. Formato: Respuesta directa con el nombre de la entidad.", | |
| "date": "¿A qué fecha corresponde el documento? Si existen indicios indica la fecha, sino di 'No disponible'" | |
| } | |
| metadata = {} | |
| chain = get_conversational_chain() | |
| for key, question in metadata_questions.items(): | |
| docs = vector_store.similarity_search(question, k=2) | |
| response = chain({"input_documents": docs, "question": question}, return_only_outputs=True) | |
| clean_response = eliminar_proceso_pensamiento(response['output_text']) | |
| metadata[key] = clean_response if clean_response else "No disponible" | |
| return metadata | |
| def mostrar_respuesta(texto): | |
| st.markdown(f'<div class="response-box">{texto}</div>', unsafe_allow_html=True) | |
| def generar_sugerencias(): | |
| if 'vector_store' not in st.session_state: | |
| return | |
| try: | |
| docs = st.session_state.vector_store.similarity_search("", k=3) | |
| context = "\n".join([doc.page_content for doc in docs]) | |
| prompt_template = """ | |
| Genera exactamente 3 preguntas en español basadas en el contexto. | |
| Las preguntas deben ser en español, simples y sencillas de máximo 10 palabras. | |
| Formato de respuesta: | |
| 1. [Pregunta completa en español] | |
| 2. [Pregunta completa en español] | |
| 3. [Pregunta completa en español] | |
| Contexto: | |
| {context} | |
| """ | |
| model = ChatGroq( | |
| temperature=0.4, | |
| model_name="deepseek-r1-distill-llama-70b", | |
| groq_api_key=os.getenv("GROQ_API_KEY") | |
| ) | |
| response = model.invoke(prompt_template.format(context=context)) | |
| preguntas = [] | |
| for line in response.content.split("\n"): | |
| line = line.strip() | |
| if line and line[0].isdigit(): | |
| pregunta = line.split('. ', 1)[1] if '. ' in line else line[2:] | |
| if pregunta: | |
| preguntas.append(pregunta) | |
| return preguntas[:3] | |
| except Exception as e: | |
| st.error(f"Error generando sugerencias: {str(e)}") | |
| return | |
| def procesar_consulta(user_question): | |
| if 'vector_store' not in st.session_state: | |
| st.error("Por favor carga un documento primero") | |
| return | |
| chain = get_conversational_chain() | |
| docs = st.session_state.vector_store.similarity_search(user_question) | |
| with st.spinner("Analizando documento..."): | |
| response = chain({"input_documents": docs, "question": user_question}, return_only_outputs=True) | |
| respuesta_final = eliminar_proceso_pensamiento(response['output_text']) | |
| mostrar_respuesta(respuesta_final) | |
| def main(): | |
| st.set_page_config(page_title="PDF Consultor 🔍", page_icon="🔍", layout="wide") | |
| st.markdown(css_style, unsafe_allow_html=True) | |
| st.markdown('<h1>PDF Consultor 🔍</h1>', unsafe_allow_html=True) | |
| estados = { | |
| 'documento_cargado': False, | |
| 'sugerencias': [], | |
| 'pregunta_actual': "", | |
| 'respuestas': [] | |
| } | |
| for key, value in estados.items(): | |
| if key not in st.session_state: | |
| st.session_state[key] = value | |
| with st.sidebar: | |
| st.markdown('<p class="step-number">1 Subir archivos</p>', unsafe_allow_html=True) | |
| pdf_docs = st.file_uploader("Subir PDF(s)", accept_multiple_files=True, type=["pdf"], label_visibility="collapsed") | |
| if pdf_docs and not st.session_state.documento_cargado: | |
| with st.spinner("Analizando documento..."): | |
| try: | |
| raw_text = get_pdf_text(pdf_docs) | |
| text_chunks = get_text_chunks(raw_text) | |
| vector_store = get_vector_store(text_chunks) | |
| st.session_state.metadata = extract_metadata(vector_store) | |
| st.session_state.vector_store = vector_store | |
| st.session_state.documento_cargado = True | |
| st.session_state.sugerencias = generar_sugerencias() | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Error procesando documento: {str(e)}") | |
| if 'metadata' in st.session_state: | |
| st.markdown("---") | |
| cols = st.columns(3) | |
| campos_metadata = [ | |
| ("📄 Título", "title"), | |
| ("🏛️ Entidad", "entity"), | |
| ("📅 Fecha", "date") | |
| ] | |
| for col, (icono, key) in zip(cols, campos_metadata): | |
| with col: | |
| st.markdown(f""" | |
| <div class="metadata-box"> | |
| <div style="font-size:16px; margin-bottom:10px;">{icono}</div> | |
| {st.session_state.metadata[key]} | |
| </div> | |
| """, unsafe_allow_html=True) | |
| if st.session_state.sugerencias: | |
| st.markdown("---") | |
| with st.container(): | |
| st.markdown(""" | |
| <div class="suggestion-container"> | |
| <div style="font-size:14px; color:#666; margin-bottom:8px;">💡 ¿Necesitas ideas?</div> | |
| """, unsafe_allow_html=True) | |
| cols_sugerencias = st.columns(3) | |
| for i, (col, pregunta) in enumerate(zip(cols_sugerencias, st.session_state.sugerencias)): | |
| with col: | |
| if st.button(pregunta, key=f"sug_{i}", help="Haz clic para usar esta pregunta", use_container_width=True): | |
| st.session_state.pregunta_actual = pregunta | |
| st.markdown("</div>", unsafe_allow_html=True) | |
| if st.session_state.documento_cargado: | |
| with st.form(key="consulta_form"): | |
| col1, col2 = st.columns([5, 1]) | |
| with col1: | |
| pregunta_usuario = st.text_input("Escribe tu pregunta:", value=st.session_state.get('pregunta_actual', ''), placeholder="Ej: ¿De qué trata este documento?", label_visibility="collapsed") | |
| with col2: | |
| st.markdown("<br>", unsafe_allow_html=True) | |
| enviar = st.form_submit_button("Enviar ▶") | |
| if enviar or st.session_state.pregunta_actual: | |
| pregunta_final = pregunta_usuario or st.session_state.pregunta_actual | |
| procesar_consulta(pregunta_final) | |
| if 'pregunta_actual' in st.session_state: | |
| del st.session_state.pregunta_actual | |
| elif not st.session_state.documento_cargado: | |
| st.info("Por favor, sube un documento PDF para comenzar.") | |
| if __name__ == "__main__": | |
| main() |