# NOTE: removed "Spaces: Sleeping" banner text here — Hugging Face page-scrape
# residue, not part of the source.
# app.py
# Gradio RAG chat app: upload a document (PDF/DOCX/TXT), index it into a
# FAISS vectorstore, and answer questions with Google Gemini via LangChain.
import os

import gradio as gr

# LangChain + Google Gemini
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import PyPDFLoader, TextLoader, Docx2txtLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate

# -----------------------
# Utilities
# -----------------------
def get_api_key():
    """Return the Gemini API key from the environment, or None if unset.

    Read at call time rather than import time, so secrets injected into the
    environment after the module was imported are still picked up.
    """
    key = os.getenv("GOOGLE_API_KEY")
    if not key:
        key = os.getenv("GEMINI_API_KEY")
    return key
| def _get_path(file_obj): | |
| if file_obj is None: | |
| return None | |
| if hasattr(file_obj, "name"): | |
| return file_obj.name | |
| if isinstance(file_obj, str): | |
| return file_obj | |
| if isinstance(file_obj, dict) and "name" in file_obj: | |
| return file_obj["name"] | |
| return None | |
| def _load_docs(path): | |
| ext = (path or "").lower().split(".")[-1] | |
| if ext == "pdf": | |
| return PyPDFLoader(path).load() | |
| if ext == "docx": | |
| return Docx2txtLoader(path).load() | |
| if ext == "txt": | |
| return TextLoader(path, encoding="utf-8").load() | |
| raise ValueError(f"Formato no soportado: .{ext} (usa PDF, DOCX o TXT)") | |
def _split(docs):
    """Chunk loaded documents for embedding (800 chars, 120-char overlap)."""
    return RecursiveCharacterTextSplitter(
        chunk_size=800,
        chunk_overlap=120,
        # Prefer paragraph, then line, then sentence boundaries before
        # falling back to word and character splits.
        separators=["\n\n", "\n", ". ", " ", ""],
    ).split_documents(docs)
def build_chain(retriever, system_message, temperature, api_key):
    """Assemble a RetrievalQA chain over *retriever* backed by Gemini.

    Raises RuntimeError when no retriever exists yet (nothing indexed).
    Returns the chain with source documents included in its results.
    """
    if retriever is None:
        raise RuntimeError("Primero sube e indexa un documento.")
    llm = ChatGoogleGenerativeAI(
        model="gemini-2.0-flash",
        temperature=float(temperature),
        google_api_key=api_key,
    )
    # Prepend the user-configurable system message, then constrain answers
    # to the retrieved context only.
    template = (
        system_message
        + "\n\n"
        + "Responde SOLO usando el CONTEXTO recuperado. "
        "Si no está en el contexto, di explícitamente que no aparece.\n\n"
        "CONTEXTO:\n{context}\n\n"
        "PREGUNTA:\n{question}\n\n"
        "RESPUESTA:"
    )
    prompt = PromptTemplate(template=template, input_variables=["context", "question"])
    return RetrievalQA.from_chain_type(
        llm=llm,
        retriever=retriever,
        chain_type="stuff",
        return_source_documents=True,
        chain_type_kwargs={"prompt": prompt},
    )
# -----------------------
# Callbacks (no type annotations, to avoid Gradio schema issues)
# -----------------------
def on_upload(file, system_message, temperature, emb_state, retr_state, qa_state, last_sys_state, last_temp_state):
    """Load, split and index an uploaded document into a FAISS vectorstore.

    Parameters mirror the Gradio inputs: ``file`` is the uploaded file and
    the ``*_state`` values are the session's current states.

    Returns a 6-tuple for the Gradio outputs:
    (status_message, embeddings, retriever, qa_chain, last_system_msg,
    last_temperature). On success the last three are None so the QA chain is
    rebuilt lazily against the new index on the next question.
    """
    api_key = get_api_key()
    if not api_key:
        # Fix: preserve the existing states (the original returned None for
        # every state here, wiping an already-built index on a key hiccup).
        return (
            "❌ Falta configurar GOOGLE_API_KEY o GEMINI_API_KEY en Settings → Secrets.",
            emb_state, retr_state, qa_state, last_sys_state, last_temp_state,
        )
    path = _get_path(file)
    if not path:
        return "No se ha subido ningún archivo.", emb_state, retr_state, qa_state, last_sys_state, last_temp_state
    try:
        docs = _load_docs(path)
        splits = _split(docs)
        if not splits:
            # Guard: FAISS.from_documents fails on an empty list; report a
            # clear message and keep any previous index usable.
            return "❌ El documento no contiene texto extraíble.", emb_state, retr_state, qa_state, last_sys_state, last_temp_state
        # Create the embeddings client once per session and reuse it.
        emb = emb_state
        if emb is None:
            # NOTE(review): some langchain-google-genai versions expect the
            # "models/" prefix (i.e. "models/text-embedding-004") — confirm
            # against the installed version.
            emb = GoogleGenerativeAIEmbeddings(model="text-embedding-004", google_api_key=api_key)
        # Fresh vectorstore per upload, with a top-4 retriever over it.
        vs = FAISS.from_documents(splits, emb)
        retriever = vs.as_retriever(search_kwargs={"k": 4})
        first = splits[0].page_content
        preview = first[:500].replace("\n", " ") + ("..." if len(first) > 500 else "")
        msg = f"✅ Documento indexado. Chunks: {len(splits)}\n\nPreview:\n{preview}"
        # Reset QA chain and its cache keys; rebuilt on the first question.
        return msg, emb, retriever, None, None, None
    except Exception as e:
        # Keep prior states on failure so an earlier index stays usable.
        return f"❌ Error al procesar el archivo: {e}", emb_state, retr_state, qa_state, last_sys_state, last_temp_state
def on_user_send(message, history):
    """Append the user's message to the chat history and clear the textbox.

    Returns ("", updated_history); the empty string resets the input field.
    A blank message leaves the history untouched.
    """
    if not message:
        return "", history
    # Build a fresh list so the incoming history object is never mutated.
    updated = list(history or [])
    updated.append({"role": "user", "content": message})
    return "", updated
def on_bot_reply(history, system_message, max_tokens, temperature, emb_state, retr_state, qa_state, last_sys_state, last_temp_state):
    """Answer the last user message in *history* with the RAG chain.

    Returns (history, qa_state, last_sys_state, last_temp_state).
    ``max_tokens`` is accepted for UI symmetry but not used by the chain;
    ``emb_state`` is unused here (the retriever already embeds queries).
    """
    api_key = get_api_key()
    if not api_key:
        return (history or []) + [{"role": "assistant", "content": "❌ Falta GOOGLE_API_KEY o GEMINI_API_KEY."}], qa_state, last_sys_state, last_temp_state
    # Nothing to answer unless the most recent message is from the user.
    if not history:
        return history, qa_state, last_sys_state, last_temp_state
    last = history[-1]
    if last.get("role") != "user":
        return history, qa_state, last_sys_state, last_temp_state
    retriever = retr_state
    if retriever is None:
        return history + [{"role": "assistant", "content": "Primero sube e indexa un documento."}], qa_state, last_sys_state, last_temp_state
    try:
        # Fix: chain construction now sits inside the try — an LLM/config
        # failure (bad key, network) previously escaped this callback
        # instead of surfacing as a chat message.
        temp = float(temperature)
        # (Re)build the chain lazily when missing, or when the system
        # message / temperature changed since the last build.
        if qa_state is None or last_sys_state != system_message or last_temp_state != temp:
            qa_state = build_chain(retriever, system_message, temp, api_key)
        res = qa_state.invoke({"query": last["content"]})
        answer = res.get("result", "") or "No se obtuvo respuesta."
        sources = res.get("source_documents", [])
        if sources:
            # Append a numbered source list with a short preview per chunk.
            lines = []
            for i, d in enumerate(sources, 1):
                src = d.metadata.get("source", "doc")
                page = d.metadata.get("page", "?")
                preview = (d.page_content or "")[:140].replace("\n", " ")
                lines.append(f"{i}. {src} (pág. {page}) → {preview} ...")
            answer += "\n\n📚 Fuentes:\n" + "\n".join(lines)
        history = history + [{"role": "assistant", "content": answer}]
        # Remember what the chain was built with, to skip rebuilds.
        return history, qa_state, system_message, temp
    except Exception as e:
        return history + [{"role": "assistant", "content": f"❌ Error: {e}"}], qa_state, last_sys_state, last_temp_state
# -----------------------
# UI
# -----------------------
# Top-level Gradio app: left column uploads/indexes a document and holds the
# model settings; right column is the chat. Per-session state lives in
# gr.State values threaded through the callbacks.
with gr.Blocks(title="Chat RAG con LangChain + Gemini") as demo:
    gr.Markdown("# 🤖 Chat RAG con LangChain + Gemini (Google)\nSube un documento (PDF, DOCX, TXT) y hazle preguntas.")
    # Atomic per-session states (individual values rather than one dict).
    emb_state = gr.State(None)        # GoogleGenerativeAIEmbeddings client
    retr_state = gr.State(None)       # FAISS retriever
    qa_state = gr.State(None)         # RetrievalQA chain
    last_sys_state = gr.State(None)   # str: system message the chain was built with
    last_temp_state = gr.State(None)  # float: temperature the chain was built with
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 📄 Subir e indexar documento")
            file_upload = gr.File(label="Selecciona un archivo", file_types=[".pdf", ".docx", ".txt"])
            file_status = gr.Textbox(label="Estado del archivo", interactive=False, max_lines=10)
            gr.Markdown("### ⚙️ Configuración")
            system_message = gr.Textbox(
                value=(
                    "Eres un asistente que responde preguntas basándose EXCLUSIVAMENTE en el documento proporcionado. "
                    "Si la información no aparece en el contexto recuperado, indícalo explícitamente."
                ),
                label="Mensaje del sistema",
                max_lines=4
            )
            # Informational only — on_bot_reply does not forward it to the LLM.
            max_tokens = gr.Slider(50, 2048, value=512, step=50, label="Máximo de tokens (informativo)")
            temperature = gr.Slider(0.0, 1.0, value=0.2, step=0.1, label="Temperatura")
        with gr.Column(scale=2):
            gr.Markdown("### 💬 Chat")
            chatbot = gr.Chatbot(label="Conversación", height=520, type="messages")
            msg = gr.Textbox(label="Tu pregunta", placeholder="Escribe tu pregunta sobre el documento...")
            with gr.Row():
                submit_btn = gr.Button("Enviar", variant="primary")
                clear_btn = gr.Button("Limpiar chat")
    # Events: uploading (re)indexes the document and resets the QA chain.
    file_upload.change(
        fn=on_upload,
        inputs=[file_upload, system_message, temperature, emb_state, retr_state, qa_state, last_sys_state, last_temp_state],
        outputs=[file_status, emb_state, retr_state, qa_state, last_sys_state, last_temp_state],
    )
    # Enter key and the button share the same two-step flow: echo the user
    # message immediately (unqueued), then generate the assistant reply.
    msg.submit(
        fn=on_user_send,
        inputs=[msg, chatbot],
        outputs=[msg, chatbot],
        queue=False
    ).then(
        fn=on_bot_reply,
        inputs=[chatbot, system_message, max_tokens, temperature, emb_state, retr_state, qa_state, last_sys_state, last_temp_state],
        outputs=[chatbot, qa_state, last_sys_state, last_temp_state],
    )
    submit_btn.click(
        fn=on_user_send,
        inputs=[msg, chatbot],
        outputs=[msg, chatbot],
        queue=False
    ).then(
        fn=on_bot_reply,
        inputs=[chatbot, system_message, max_tokens, temperature, emb_state, retr_state, qa_state, last_sys_state, last_temp_state],
        outputs=[chatbot, qa_state, last_sys_state, last_temp_state],
    )
    # Clearing only empties the chat display; the index and states remain.
    clear_btn.click(lambda: [], None, chatbot, queue=False)
# Launch with the API explorer disabled.
demo.queue().launch(show_api=False)