"""
RAG completo em Gradio usando:
- Crawler para o Pandas (links internos)
- Chunking + FAISS (vector store local)
- Embeddings e LLM via NVIDIA NIM (API- compatível com OpenAI)
Como usar:
1) Instale dependências:
pip install gradio requests beautifulsoup4 langchain langchain-community faiss-cpu sentence-transformers langchain-nvidia-ai-endpoints
2) Defina a sua chave da NVIDIA (NIM):
export NVIDIA_API_KEY="SEU_TOKEN"
# ou em Windows PowerShell: $env:NVIDIA_API_KEY="SEU_TOKEN"
3) Rode o app:
python app.py
Notas:
- O índice FAISS é salvo em ./indices/pandas_userg e reutilizado nas próximas execuções.
- O crawler respeita robots.txt e limita a taxa de requisições (SLEEP_SECONDS).
- Você pode limitar o número de páginas durante testes definindo MAX_PAGES.
Trocar modelos:
- LLM: mude `LLM_MODEL` (ex.: "meta/llama-3.1-8b-instruct", "mistralai/mixtral-8x7b-instruct-v0.1", etc.)
- Embeddings: mude `EMBED_MODEL` (ex.: "nvidia/nv-embed-v1")
"""
from __future__ import annotations
import os
import re
import time
import queue
import logging
import base64
from io import StringIO
from typing import List, Dict, Set, Tuple
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse, urldefrag
import urllib.robotparser as robotparser
import gradio as gr
# LangChain & vector search
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_core.messages import SystemMessage, HumanMessage
# NVIDIA NIM endpoints (LangChain integration)
from langchain_nvidia_ai_endpoints import NVIDIAEmbeddings, ChatNVIDIA
# ----------------------------
# Logging / observability
# ----------------------------
class InMemoryLogHandler(logging.Handler):
def __init__(self):
super().__init__()
self.buffer = StringIO()
def emit(self, record):
msg = self.format(record)
self.buffer.write(msg + "\n")
def get_value(self):
return self.buffer.getvalue()
def clear(self):
self.buffer.seek(0)
self.buffer.truncate(0)
logger = logging.getLogger("rag_pandas")
logger.setLevel(logging.INFO)
_stream_handler = logging.StreamHandler()
_stream_handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
logger.addHandler(_stream_handler)
mem_handler = InMemoryLogHandler()
mem_handler.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s", datefmt="%H:%M:%S"))
logger.addHandler(mem_handler)
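# mem_handler keeps log lines in memory; call mem_handler.get_value() to read
# them back (e.g., for display). Note it is not wired into the UI below.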
# ----------------------------
# Configuration
# ----------------------------
BASE_URL = "https://pandas.pydata.org/docs/user_guide/index.html"
SAVE_DIR = os.path.join("indices")
USER_AGENT = "RAG-Indexer/1.0 (+https://example.com/contact)"
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200
REQUEST_TIMEOUT = 25
SLEEP_SECONDS = 0.6
MAX_PAGES = None
ALLOWED_NETLOC = urlparse(BASE_URL).netloc
# Crawl only pages under the user guide directory. Using BASE_URL itself as the
# prefix would block every page except the index, because _canonicalize strips
# "index.html" from discovered links.
ALLOWED_PREFIX = BASE_URL.rsplit("/", 1)[0] + "/"
# Modelos NVIDIA NIM
EMBED_MODEL = "nvidia/nv-embed-v1"
LLM_MODEL = "meta/llama-3.1-8b-instruct"
# Logo (SVG): left-aligned, no surrounding whitespace
LOGO_PATH = r"logo.svg"
# ----------------------------
# Utilities
# ----------------------------
def _clean_text_from_html(html: str) -> str:
soup = BeautifulSoup(html, "html.parser")
for tag in soup(["script", "style", "noscript", "header", "footer", "nav", "aside"]):
tag.decompose()
    # Sphinx-based docs (like pandas') mark the main content with role="main".
    main = soup.find("div", {"role": "main"}) or soup
text = main.get_text("\n", strip=True)
text = re.sub(r"\n{3,}", "\n\n", text)
return text
def _canonicalize(href: str, base: str) -> str:
abs_url = urljoin(base, href)
abs_url, _ = urldefrag(abs_url)
if abs_url.endswith("index.html"):
abs_url = abs_url[:-10]
return abs_url
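# For example, _canonicalize("10min.html#basics", BASE_URL) returns
# "https://pandas.pydata.org/docs/user_guide/10min.html": the fragment is
# dropped, and a trailing "index.html" is stripped so that ".../index.html"
# and ".../" deduplicate to a single URL.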
def _same_site_internal(url: str) -> bool:
u = urlparse(url)
return (u.netloc == ALLOWED_NETLOC) and url.startswith(ALLOWED_PREFIX)
def _is_allowed_by_robots(url: str, rp: robotparser.RobotFileParser) -> bool:
try:
return rp.can_fetch(USER_AGENT, url)
except Exception:
return True
def _fetch(url: str) -> Tuple[int, str]:
resp = requests.get(url, headers={"User-Agent": USER_AGENT}, timeout=REQUEST_TIMEOUT)
return resp.status_code, resp.text
def _svg_data_uri(path: str) -> str | None:
try:
with open(path, "rb") as f:
b64 = base64.b64encode(f.read()).decode("ascii")
return f"data:image/svg+xml;base64,{b64}"
except Exception as e:
        logger.warning(f"Logo not found or invalid: {path} ({e})")
return None
# ----------------------------
# Crawler
# ----------------------------
def crawl_training_manual(start_url: str, max_pages: int | None = None) -> List[Dict]:
robots_url = urljoin(start_url, "/robots.txt")
rp = robotparser.RobotFileParser()
try:
rp.set_url(robots_url)
rp.read()
except Exception:
pass
visited: Set[str] = set()
out: List[Dict] = []
q: queue.Queue[str] = queue.Queue()
q.put(start_url)
while not q.empty():
url = q.get()
if url in visited:
continue
visited.add(url)
if not _same_site_internal(url):
continue
if not _is_allowed_by_robots(url, rp):
continue
try:
status, html = _fetch(url)
except Exception:
continue
if status != 200:
continue
soup = BeautifulSoup(html, "html.parser")
title = soup.title.get_text(strip=True) if soup.title else url
text = _clean_text_from_html(html)
if text:
out.append({"url": url, "title": title, "text": text})
for a in soup.find_all("a", href=True):
href = a["href"].strip()
if href.startswith(("mailto:", "javascript:", "tel:")):
continue
abs_url = _canonicalize(href, url)
if _same_site_internal(abs_url) and abs_url not in visited:
q.put(abs_url)
time.sleep(SLEEP_SECONDS)
if max_pages and len(out) >= max_pages:
break
return out
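# Quick smoke test (hypothetical): set MAX_PAGES = 5 in the configuration above,
# then run
#     pages = crawl_training_manual(BASE_URL, max_pages=5)
# to check crawling and text extraction before committing to a full index build.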
# ----------------------------
# Indexing
# ----------------------------
def _make_documents(pages: List[Dict]) -> List[Document]:
splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
docs: List[Document] = []
for p in pages:
meta_base = {"source": p["url"], "title": p.get("title", "")}
chunks = splitter.split_text(p["text"])
for i, ch in enumerate(chunks):
meta = dict(meta_base)
meta["chunk"] = i
docs.append(Document(page_content=ch, metadata=meta))
return docs
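# With CHUNK_SIZE=1000 and CHUNK_OVERLAP=200, consecutive chunks advance by
# roughly 800 characters, so a ~2,300-character page splits into about three
# chunks covering roughly 0-1000, 800-1800, and 1600-2300.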
def build_or_load_index(force_rebuild: bool = False) -> Tuple[FAISS, NVIDIAEmbeddings]:
os.makedirs(SAVE_DIR, exist_ok=True)
embeddings = NVIDIAEmbeddings(model=EMBED_MODEL, api_key=os.getenv("NVIDIA_API_KEY"))
index_path = os.path.join(SAVE_DIR, "index.faiss")
store_path = os.path.join(SAVE_DIR, "index.pkl")
    if (not force_rebuild) and os.path.exists(index_path) and os.path.exists(store_path):
        # Safe only because we are deserializing an index this app wrote itself;
        # never enable this flag for files from untrusted sources.
        db = FAISS.load_local(SAVE_DIR, embeddings, allow_dangerous_deserialization=True)
return db, embeddings
pages = crawl_training_manual(BASE_URL, max_pages=MAX_PAGES)
docs = _make_documents(pages)
db = FAISS.from_documents(docs, embeddings)
db.save_local(SAVE_DIR)
return db, embeddings
# ----------------------------
# LLM & RAG
# ----------------------------
def make_llm() -> ChatNVIDIA:
api_key = os.getenv("NVIDIA_API_KEY")
if not api_key:
        raise RuntimeError("Set NVIDIA_API_KEY in the environment.")
return ChatNVIDIA(model=LLM_MODEL, api_key=api_key)
def format_answer(question: str, context_docs: List[Document], llm_text: str) -> str:
seen = set()
refs = []
for d in context_docs:
src = d.metadata.get("source", "")
title = d.metadata.get("title", "") or src
key = (title, src)
if key not in seen:
seen.add(key)
refs.append(f"- {title}\n {src}")
if len(refs) >= 5:
break
    refs_block = "\n".join(refs) if refs else "- (no sources found)"
    return f"{llm_text}\n\n---\n**Question:** {question}\n\n**Sources:**\n{refs_block}"
def rag_answer(db: FAISS, llm: ChatNVIDIA, question: str, k: int = 4, max_context_tokens: int = 2800) -> str:
retriever = db.as_retriever(search_kwargs={"k": k})
    docs = retriever.invoke(question)  # get_relevant_documents() is deprecated in recent LangChain
    # The budget is counted in characters as a cheap proxy for tokens.
    ctx_parts, total = [], 0
for d in docs:
txt = d.page_content.strip()
if total + len(txt) > max_context_tokens:
txt = txt[: max(0, max_context_tokens - total)]
ctx_parts.append(txt)
total += len(txt)
if total >= max_context_tokens:
break
context = "\n\n".join(ctx_parts)
    system_msg = (
        "You are an expert on the pandas package. Answer directly, and cite practical steps and commands where useful.\n"
        "If the answer is not clear from the context, be honest about the uncertainty."
    )
    user_prompt = (
        f"Use ONLY the following context to answer. If information is missing, say what is missing.\n\n"
        f"### Context\n{context}\n\n"
        f"### Question\n{question}"
    )
messages = [SystemMessage(content=system_msg), HumanMessage(content=user_prompt)]
llm_text = llm.invoke(messages).content
return format_answer(question, docs, llm_text)
# ----------------------------
# Gradio UI
# ----------------------------
db_global: FAISS | None = None
llm_global: ChatNVIDIA | None = None
def _init_once(force_rebuild: bool = False):
global db_global, llm_global
if db_global is None or force_rebuild:
db_global, _ = build_or_load_index(force_rebuild=force_rebuild)
if llm_global is None:
llm_global = make_llm()
def ui_query(question: str, k: int, force_rebuild: bool):
try:
_init_once(force_rebuild)
return rag_answer(db_global, llm_global, question, k=k)
except Exception as e:
return f"Erro: {e}"
def build_ui():
custom_css = """
    .gradio-container { padding: 0 !important; } /* remove global padding */
    #logo_bar { margin: 0 !important; padding: 0 !important; } /* logo bar without extra spacing */
    #logo_bar img { display: block; margin: 0 !important; } /* image without margins */
    #title_md { margin-top: 0 !important; } /* title flush with the top */
"""
with gr.Blocks(title="RAG PANDAS", css=custom_css) as demo:
        # Logo on the left, without extra spacing
_logo_uri = _svg_data_uri(LOGO_PATH)
if _logo_uri:
gr.HTML(
f'<div id="logo_bar" style="width:100%;display:block;">'
f' <img src="{_logo_uri}" alt="logo" style="height:200px;"/>'
f'</div>'
)
gr.Markdown("""
# Manual do PANDAS
Este app realiza *crawl* do manual, indexa localmente (FAISS).
""", elem_id="title_md")
with gr.Row():
            question = gr.Textbox(label="Question", placeholder="e.g., How do I create a DataFrame?")
with gr.Row():
            k = gr.Slider(1, 10, value=4, step=1, label="k (number of chunks)")
            rebuild = gr.Checkbox(False, label="Rebuild index from scratch (force crawl)")
        btn = gr.Button("Query")
output = gr.Markdown()
btn.click(fn=ui_query, inputs=[question, k, rebuild], outputs=output)
gr.Markdown("""
**Dicas**
- A primeira execução pode demorar (crawler + indexação). Nas próximas, o índice é reaproveitado.
- Marque *Reindexar do zero* se quiser atualizar ou refazer o índice.
""")
return demo
if __name__ == "__main__":
demo = build_ui()
demo.launch()