| """ |
| RAG completo em Gradio usando: |
| - Crawler para o Pandas (links internos) |
| - Chunking + FAISS (vector store local) |
| - Embeddings e LLM via NVIDIA NIM (API- compatível com OpenAI) |
| |
| Como usar: |
| 1) Instale dependências: |
| pip install gradio requests beautifulsoup4 langchain langchain-community faiss-cpu sentence-transformers langchain-nvidia-ai-endpoints |
| |
| 2) Defina a sua chave da NVIDIA (NIM): |
| export NVIDIA_API_KEY="SEU_TOKEN" |
| # ou em Windows PowerShell: $env:NVIDIA_API_KEY="SEU_TOKEN" |
| |
| 3) Rode o app: |
| python app.py |
| |
| Notas: |
| - O índice FAISS é salvo em ./indices/pandas_userg e reutilizado nas próximas execuções. |
| - O crawler respeita robots.txt e limita a taxa de requisições (SLEEP_SECONDS). |
| - Você pode limitar o número de páginas durante testes definindo MAX_PAGES. |
| |
| Trocar modelos: |
| - LLM: mude `LLM_MODEL` (ex.: "meta/llama-3.1-8b-instruct", "mistralai/mixtral-8x7b-instruct-v0.1", etc.) |
| - Embeddings: mude `EMBED_MODEL` (ex.: "nvidia/nv-embed-v1") |
| """ |
from __future__ import annotations

import os
import re
import time
import queue
import logging
import base64
from io import StringIO
from typing import List, Dict, Set, Tuple

import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse, urldefrag
import urllib.robotparser as robotparser

import gradio as gr

from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_text_splitters import RecursiveCharacterTextSplitter

from langchain_nvidia_ai_endpoints import NVIDIAEmbeddings, ChatNVIDIA
|
|
class InMemoryLogHandler(logging.Handler):
    def __init__(self):
        super().__init__()
        self.buffer = StringIO()

    def emit(self, record):
        msg = self.format(record)
        self.buffer.write(msg + "\n")

    def get_value(self):
        return self.buffer.getvalue()

    def clear(self):
        self.buffer.seek(0)
        self.buffer.truncate(0)
|
|
logger = logging.getLogger("rag_pandas")
logger.setLevel(logging.INFO)
_stream_handler = logging.StreamHandler()
_stream_handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
logger.addHandler(_stream_handler)

mem_handler = InMemoryLogHandler()
mem_handler.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s", datefmt="%H:%M:%S"))
logger.addHandler(mem_handler)
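# The in-memory handler keeps a copy of everything this module logs. A minimal
# sketch of reading it back (e.g., to surface progress in the UI; it is not
# wired into the interface below by default):
#   progress_text = mem_handler.get_value()
#   mem_handler.clear()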
|
|
BASE_URL = "https://pandas.pydata.org/docs/user_guide/index.html"
SAVE_DIR = os.path.join("indices")

USER_AGENT = "RAG-Indexer/1.0 (+https://example.com/contact)"

CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200
REQUEST_TIMEOUT = 25
SLEEP_SECONDS = 0.6
MAX_PAGES = None
ALLOWED_NETLOC = urlparse(BASE_URL).netloc
# Restrict the crawl to the user guide; drop the trailing "index.html" from the
# prefix so sibling pages such as .../user_guide/10min.html pass the startswith() check.
ALLOWED_PREFIX = BASE_URL.rsplit("/", 1)[0] + "/"
|
|
EMBED_MODEL = "nvidia/nv-embed-v1"
LLM_MODEL = "meta/llama-3.1-8b-instruct"

LOGO_PATH = r"logo.svg"
|
|
def _clean_text_from_html(html: str) -> str:
    soup = BeautifulSoup(html, "html.parser")
    for tag in soup(["script", "style", "noscript", "header", "footer", "nav", "aside"]):
        tag.decompose()
    main = soup.find("div", {"role": "main"}) or soup
    text = main.get_text("\n", strip=True)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text
|
|
def _canonicalize(href: str, base: str) -> str:
    abs_url = urljoin(base, href)
    abs_url, _ = urldefrag(abs_url)
    if abs_url.endswith("index.html"):
        abs_url = abs_url[: -len("index.html")]
    return abs_url
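# Illustrative behavior against the pandas user guide base URL (values assumed):
#   _canonicalize("10min.html", BASE_URL)
#       -> "https://pandas.pydata.org/docs/user_guide/10min.html"
#   _canonicalize("indexing.html#boolean-indexing", BASE_URL)
#       -> "https://pandas.pydata.org/docs/user_guide/indexing.html"
#   _canonicalize("./index.html", BASE_URL)
#       -> "https://pandas.pydata.org/docs/user_guide/"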
|
|
def _same_site_internal(url: str) -> bool:
    u = urlparse(url)
    return (u.netloc == ALLOWED_NETLOC) and url.startswith(ALLOWED_PREFIX)
|
|
def _is_allowed_by_robots(url: str, rp: robotparser.RobotFileParser) -> bool:
    try:
        return rp.can_fetch(USER_AGENT, url)
    except Exception:
        return True
|
|
def _fetch(url: str) -> Tuple[int, str]:
    resp = requests.get(url, headers={"User-Agent": USER_AGENT}, timeout=REQUEST_TIMEOUT)
    return resp.status_code, resp.text
|
|
def _svg_data_uri(path: str) -> str | None:
    try:
        with open(path, "rb") as f:
            b64 = base64.b64encode(f.read()).decode("ascii")
        return f"data:image/svg+xml;base64,{b64}"
    except Exception as e:
        logger.warning(f"Logo not found or invalid: {path} ({e})")
        return None
|
|
def crawl_training_manual(start_url: str, max_pages: int | None = None) -> List[Dict]:
    robots_url = urljoin(start_url, "/robots.txt")
    rp = robotparser.RobotFileParser()
    try:
        rp.set_url(robots_url)
        rp.read()
    except Exception:
        pass

    visited: Set[str] = set()
    out: List[Dict] = []
    q: queue.Queue[str] = queue.Queue()
    q.put(start_url)

    while not q.empty():
        url = q.get()
        if url in visited:
            continue
        visited.add(url)

        if not _same_site_internal(url):
            continue
        if not _is_allowed_by_robots(url, rp):
            continue

        try:
            status, html = _fetch(url)
        except Exception:
            continue
        if status != 200:
            continue

        soup = BeautifulSoup(html, "html.parser")
        title = soup.title.get_text(strip=True) if soup.title else url
        text = _clean_text_from_html(html)
        if text:
            out.append({"url": url, "title": title, "text": text})

        for a in soup.find_all("a", href=True):
            href = a["href"].strip()
            if href.startswith(("mailto:", "javascript:", "tel:")):
                continue
            abs_url = _canonicalize(href, url)
            if _same_site_internal(abs_url) and abs_url not in visited:
                q.put(abs_url)

        time.sleep(SLEEP_SECONDS)
        if max_pages and len(out) >= max_pages:
            break

    return out
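# Minimal usage sketch (assumes network access; a small cap keeps test runs short):
#   pages = crawl_training_manual(BASE_URL, max_pages=5)
#   # -> list of {"url": ..., "title": ..., "text": ...} dicts, ready for _make_documents()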
|
|
def _make_documents(pages: List[Dict]) -> List[Document]:
    splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
    docs: List[Document] = []
    for p in pages:
        meta_base = {"source": p["url"], "title": p.get("title", "")}
        chunks = splitter.split_text(p["text"])
        for i, ch in enumerate(chunks):
            meta = dict(meta_base)
            meta["chunk"] = i
            docs.append(Document(page_content=ch, metadata=meta))
    return docs
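# Each chunk keeps its origin in the metadata, e.g. (illustrative values only):
#   Document(page_content="...",
#            metadata={"source": "https://pandas.pydata.org/docs/user_guide/indexing.html",
#                      "title": "Indexing and selecting data", "chunk": 3})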
|
|
def build_or_load_index(force_rebuild: bool = False) -> Tuple[FAISS, NVIDIAEmbeddings]:
    os.makedirs(SAVE_DIR, exist_ok=True)
    embeddings = NVIDIAEmbeddings(model=EMBED_MODEL, api_key=os.getenv("NVIDIA_API_KEY"))

    index_path = os.path.join(SAVE_DIR, "index.faiss")
    store_path = os.path.join(SAVE_DIR, "index.pkl")

    if (not force_rebuild) and os.path.exists(index_path) and os.path.exists(store_path):
        db = FAISS.load_local(SAVE_DIR, embeddings, allow_dangerous_deserialization=True)
        return db, embeddings

    pages = crawl_training_manual(BASE_URL, max_pages=MAX_PAGES)
    docs = _make_documents(pages)
    db = FAISS.from_documents(docs, embeddings)
    db.save_local(SAVE_DIR)
    return db, embeddings
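# Usage sketch (assumes NVIDIA_API_KEY is exported):
#   db, emb = build_or_load_index()                    # reuse ./indices if present
#   db, emb = build_or_load_index(force_rebuild=True)  # re-crawl and re-embed from scratch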
|
|
def make_llm() -> ChatNVIDIA:
    api_key = os.getenv("NVIDIA_API_KEY")
    if not api_key:
        raise RuntimeError("Set NVIDIA_API_KEY in the environment.")
    return ChatNVIDIA(model=LLM_MODEL, api_key=api_key)
|
|
def format_answer(question: str, context_docs: List[Document], llm_text: str) -> str:
    seen = set()
    refs = []
    for d in context_docs:
        src = d.metadata.get("source", "")
        title = d.metadata.get("title", "") or src
        key = (title, src)
        if key not in seen:
            seen.add(key)
            refs.append(f"- {title}\n {src}")
        if len(refs) >= 5:
            break
    refs_block = "\n".join(refs) if refs else "- (no sources found)"
    return f"{llm_text}\n\n---\n**Question:** {question}\n\n**Sources:**\n{refs_block}"
|
|
def rag_answer(db: FAISS, llm: ChatNVIDIA, question: str, k: int = 4, max_context_tokens: int = 2800) -> str:
    retriever = db.as_retriever(search_kwargs={"k": k})
    docs = retriever.invoke(question)

    # Note: the budget below is applied to characters, not actual tokens.
    ctx_parts, total = [], 0
    for d in docs:
        txt = d.page_content.strip()
        if total + len(txt) > max_context_tokens:
            txt = txt[: max(0, max_context_tokens - total)]
        ctx_parts.append(txt)
        total += len(txt)
        if total >= max_context_tokens:
            break
    context = "\n\n".join(ctx_parts)

    system_msg = (
        "You are an expert on the pandas package. Answer directly and cite practical steps and commands when useful.\n"
        "If the answer is not clear from the context, be honest about the uncertainty."
    )
    user_prompt = (
        f"Use ONLY the following context to answer. If information is missing, say what is missing.\n\n"
        f"### Context\n{context}\n\n"
        f"### Question\n{question}"
    )

    messages = [SystemMessage(content=system_msg), HumanMessage(content=user_prompt)]
    llm_text = llm.invoke(messages).content
    return format_answer(question, docs, llm_text)
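# End-to-end sketch outside the Gradio UI (assumes the index exists or can be
# built, and that NVIDIA_API_KEY is set):
#   db, _ = build_or_load_index()
#   llm = make_llm()
#   print(rag_answer(db, llm, "How do I read a CSV file into a DataFrame?", k=4))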
|
|
db_global: FAISS | None = None
llm_global: ChatNVIDIA | None = None
|
|
def _init_once(force_rebuild: bool = False):
    global db_global, llm_global
    if db_global is None or force_rebuild:
        db_global, _ = build_or_load_index(force_rebuild=force_rebuild)
    if llm_global is None:
        llm_global = make_llm()
|
|
def ui_query(question: str, k: int, force_rebuild: bool):
    try:
        _init_once(force_rebuild)
        return rag_answer(db_global, llm_global, question, k=k)
    except Exception as e:
        return f"Error: {e}"
|
|
def build_ui():
    custom_css = """
    .gradio-container { padding: 0 !important; }                /* remove global padding */
    #logo_bar { margin: 0 !important; padding: 0 !important; }  /* logo bar with no extra spacing */
    #logo_bar img { display: block; margin: 0 !important; }     /* image with no margins */
    #title_md { margin-top: 0 !important; }                     /* keep the title flush with the top */
    """
|
|
    with gr.Blocks(title="RAG PANDAS", css=custom_css) as demo:
        _logo_uri = _svg_data_uri(LOGO_PATH)
        if _logo_uri:
            gr.HTML(
                f'<div id="logo_bar" style="width:100%;display:block;">'
                f'  <img src="{_logo_uri}" alt="logo" style="height:200px;"/>'
                f'</div>'
            )

        gr.Markdown("""
# PANDAS Manual
This app crawls the pandas user guide, indexes it locally with FAISS, and answers questions using an NVIDIA NIM LLM.
""", elem_id="title_md")
|
|
        with gr.Row():
            question = gr.Textbox(label="Question", placeholder="e.g. How do I create a DataFrame?")
        with gr.Row():
            k = gr.Slider(1, 10, value=4, step=1, label="k (number of retrieved chunks)")
            rebuild = gr.Checkbox(False, label="Rebuild index from scratch (force the crawler)")
        btn = gr.Button("Ask")
        output = gr.Markdown()
        btn.click(fn=ui_query, inputs=[question, k, rebuild], outputs=output)
|
|
        gr.Markdown("""
**Tips**
- The first run can take a while (crawling + indexing). Later runs reuse the saved index.
- Check *Rebuild index from scratch* if you want to refresh or rebuild the index.
""")

    return demo
|
|
| if __name__ == "__main__": |
| demo = build_ui() |
| demo.launch() |
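    # Optional launch tweaks (standard Gradio parameters, shown as a sketch):
    #   demo.launch(server_name="0.0.0.0", server_port=7860)  # listen on all interfaces
    #   demo.launch(share=True)                               # temporary public share link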
|
|