|
|
import os |
|
|
import re |
|
|
from pathlib import Path |
|
|
from typing import List, Tuple |
|
|
|
|
|
import numpy as np |
|
|
import faiss |
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
# pypdf is an optional dependency: when it cannot be imported, PdfReader is
# left as None and the PDF-reading code falls back to decoding raw bytes.
try:
    from pypdf import PdfReader
except Exception:
    PdfReader = None
|
|
|
|
|
|
|
|
from sentence_transformers import SentenceTransformer |
|
|
from openai import OpenAI, OpenAIError |
|
|
|
|
|
""" |
|
|
DFSORT RAG – Assistente em Português (Gradio) |
|
|
--------------------------------------------- |
|
|
• Interface totalmente em português. |
|
|
• Botões "Enviar" e "Limpar" no chat. |
|
|
• Página enquadrada (layout responsivo) para tudo ficar visível. |
|
|
• RAG simples: FAISS + MiniLM sobre o PDF fornecido (somente ele como fonte). |
|
|
""" |
|
|
|
|
|
|
|
|
# --- Application configuration --------------------------------------------
APP_TITLE = "DFSORT RAG (PDF)"   # page / browser-tab title
PDF_PATH = "ice2ca11.pdf"        # the single source document for the RAG
INDEX_FILE = "r_docs.index"      # persisted FAISS index
CHUNKS_FILE = "r_chunks.npy"     # persisted chunk texts (numpy object array)

# NVIDIA-hosted chat model, reached through the OpenAI-compatible endpoint.
CHAT_MODEL = "meta/llama3-8b-instruct"
NV_API_KEY = os.environ.get("NV_API_KEY")
if not NV_API_KEY:
    # Fail fast at import time: the app is unusable without credentials.
    raise RuntimeError("🔒 NV_API_KEY não definido. Configure em Settings → Variables & Secrets.")

client = OpenAI(base_url="https://integrate.api.nvidia.com/v1", api_key=NV_API_KEY)

# Sentence-embedding model used both for indexing and for query encoding.
EMB_MODEL_NAME = "all-MiniLM-L6-v2"
embedding_model = SentenceTransformer(EMB_MODEL_NAME)

# Lazily-initialised globals; populated by ensure_index_loaded() on first use.
faiss_index = None
pdf_chunks = None
|
def _pdf_to_text_chunks(pdf_path: str, max_chunk_chars: int = 1200) -> List[str]:
    """Extract text from the PDF and aggregate it into chunks for the RAG.

    Pages are normalised (collapsed spaces, squeezed blank lines) and split
    into paragraph-like blocks, which are then greedily packed into chunks
    of roughly ``max_chunk_chars`` characters.  Chunks of 50 characters or
    fewer are discarded as noise.

    Raises:
        FileNotFoundError: if *pdf_path* does not exist.
    """
    source = Path(pdf_path)
    if not source.exists():
        raise FileNotFoundError(f"PDF não encontrado: {pdf_path}")

    # --- 1. page-level text extraction ------------------------------------
    pages: List[str] = []
    if PdfReader is not None:
        for page in PdfReader(str(source)).pages:
            try:
                pages.append(page.extract_text() or "")
            except Exception:
                pages.append("")
    else:
        # Fallback without pypdf: best-effort decode of the raw bytes,
        # splitting on form feeds / blank lines as pseudo-pages.
        text = source.read_bytes().decode(errors="ignore")
        pages = re.split(r"\f|\n\s*\n", text)

    # --- 2. paragraph-like blocks -----------------------------------------
    paragraphs: List[str] = []
    for page_text in pages:
        if not page_text:
            continue
        normalised = re.sub(r"[ \t]+", " ", page_text)
        normalised = re.sub(r"\n{2,}", "\n\n", normalised).strip()
        for piece in re.split(r"\n\n+|\n• |\n- ", normalised):
            if piece and piece.strip():
                paragraphs.append(piece.strip())

    # --- 3. greedy packing into ~max_chunk_chars chunks -------------------
    chunks: List[str] = []
    pending: List[str] = []
    pending_len = 0
    for para in paragraphs:
        if pending_len + len(para) + 1 > max_chunk_chars:
            if pending:
                chunks.append("\n".join(pending))
            pending = [para]
            pending_len = len(para)
        else:
            pending.append(para)
            pending_len += len(para) + 1  # +1 for the joining newline
    if pending:
        chunks.append("\n".join(pending))

    return [c.strip() for c in chunks if len(c.strip()) > 50]
|
|
|
|
|
|
|
|
def build_or_load_index(pdf_path: str, index_path: str, chunks_path: str) -> Tuple[faiss.IndexFlatIP, np.ndarray]:
    """Create or load the FAISS index and its chunk array for *pdf_path*.

    When both artifacts already exist on disk they are loaded as-is;
    otherwise the PDF is chunked, embedded (L2-normalised, so inner product
    equals cosine similarity) and the artifacts are persisted for next time.

    Returns:
        ``(index, chunks)``: FAISS inner-product index and the object array
        of chunk texts, aligned by position.
    """
    if Path(index_path).exists() and Path(chunks_path).exists():
        index = faiss.read_index(index_path)
        chunks = np.load(chunks_path, allow_pickle=True)
        return index, chunks

    chunks_list = _pdf_to_text_chunks(pdf_path)
    emb = embedding_model.encode(chunks_list, convert_to_numpy=True, normalize_embeddings=True)
    index = faiss.IndexFlatIP(emb.shape[1])
    index.add(emb)

    # Build the object array once (the original converted it twice) and
    # persist both artifacts so subsequent launches skip the slow embedding.
    chunks_arr = np.array(chunks_list, dtype=object)
    faiss.write_index(index, index_path)
    np.save(chunks_path, chunks_arr)
    return index, chunks_arr
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def retrieve_context(query: str, index: faiss.IndexFlatIP, chunks: np.ndarray, k: int = 6) -> str:
    """Return the top-*k* chunks most similar to *query*, joined by '---'.

    The query is embedded with the same normalised encoder used at index
    time, so the inner-product search behaves as cosine similarity.
    Out-of-range ids are skipped (FAISS pads results with -1 when the index
    holds fewer than *k* vectors).
    """
    q = embedding_model.encode([query], convert_to_numpy=True, normalize_embeddings=True)
    # Scores are not needed here, only the chunk ids.
    _, idxs = index.search(q, k)
    hits = [str(chunks[i]) for i in idxs[0] if 0 <= i < len(chunks)]
    return "\n---\n".join(hits)
|
|
|
|
|
|
|
|
def nv_complete(messages, temperature: float, top_p: float, max_tokens: int) -> str:
    """Run one non-streaming chat completion against the NVIDIA endpoint.

    Args:
        messages: OpenAI-style message dicts (system/user/...).
        temperature, top_p, max_tokens: sampling parameters forwarded as-is.

    Returns:
        The assistant reply stripped of surrounding whitespace.  Returns ""
        when the API yields no content — ``message.content`` is Optional in
        the OpenAI schema, and calling ``.strip()`` on None would raise
        AttributeError.
    """
    resp = client.chat.completions.create(
        model=CHAT_MODEL,
        messages=messages,
        temperature=temperature,
        top_p=top_p,
        max_tokens=max_tokens,
        stream=False,
    )
    content = resp.choices[0].message.content
    return (content or "").strip()
|
|
|
|
|
|
|
|
def make_system_prompt(ctx: str) -> str:
    """Build the Portuguese system prompt that pins answers to *ctx*.

    The prompt instructs the model to answer only from the retrieved PDF
    excerpts and to admit ignorance when the context lacks the answer.
    """
    sections = [
        "Você é um assistente especializado em DFSORT (IBM z/OS).",
        "Responda **apenas** com base no contexto recuperado do PDF.",
        "Se a informação não estiver no contexto, diga que não sabe.",
        "",
        f"=== Contexto (trechos do PDF) ===\n{ctx}",
        "",
        "Quando der exemplos, forneça JCL/SYSIN curtos e claros.",
    ]
    return "\n".join(sections)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ensure_index_loaded():
    """Lazily build/load the FAISS index and chunks on first use."""
    global faiss_index, pdf_chunks
    if faiss_index is not None and pdf_chunks is not None:
        return  # already initialised
    faiss_index, pdf_chunks = build_or_load_index(PDF_PATH, INDEX_FILE, CHUNKS_FILE)
|
|
|
|
|
|
|
|
def on_send(user_msg, history, temperature, top_p, max_tokens, k):
    """Handle one chat turn: retrieve context, ask the model, grow history.

    Returns ``(updated_history, "")`` — the empty string clears the textbox.
    A blank message returns the history unchanged.
    """
    ensure_index_loaded()
    chat = history or []
    question = (user_msg or "").strip()
    if not question:
        return chat, ""

    context = retrieve_context(question, faiss_index, pdf_chunks, k=int(k))
    prompt = [
        {"role": "system", "content": make_system_prompt(context)},
        {"role": "user", "content": question},
    ]

    try:
        reply = nv_complete(prompt, float(temperature), float(top_p), int(max_tokens))
    except OpenAIError as e:
        # Surface API failures in the chat instead of crashing the UI.
        reply = f"⚠️ Erro da API: {e.__class__.__name__}: {e}"

    return chat + [
        {"role": "user", "content": question},
        {"role": "assistant", "content": reply},
    ], ""
|
|
|
|
|
|
|
|
def on_clear():
    """Reset the conversation: empty history and an empty input textbox."""
    return list(), ""
|
|
|
|
|
|
|
|
def rebuild_index_action():
    """Rebuild the index from the PDF, discarding any cached artifacts.

    ``build_or_load_index`` prefers the on-disk artifacts when they exist,
    so a true rebuild must delete them first — otherwise this button would
    merely reload the stale index while still reporting success.
    """
    global faiss_index, pdf_chunks
    for cached in (INDEX_FILE, CHUNKS_FILE):
        Path(cached).unlink(missing_ok=True)
    faiss_index, pdf_chunks = build_or_load_index(PDF_PATH, INDEX_FILE, CHUNKS_FILE)
    return "✅ Índice reconstruído com sucesso a partir do PDF."
|
|
|
|
|
|
|
|
|
|
|
# CSS applied to the Gradio Blocks app: centres the page at 1200px and gives
# the chat area (#chatbox) a fixed, scrollable viewport.
custom_css = r"""
:root { --primary:#2156d9; --bg:#f8fafc; --ink:#0f172a; }
body { background: var(--bg); color: var(--ink); }
.container { max-width: 1200px; margin: 0 auto; }
#chatbox { height: 70vh; overflow-y: auto; border:1px solid #cbd5e1; border-radius:8px; padding:0.5rem; }
"""
|
|
|
|
|
# ---------------------------------------------------------------------------
# Gradio UI.
#
# Bug fix: the original wired a gr.State([]) as the history input of on_send
# but never listed it in any `outputs`, so the state stayed [] forever and
# every turn started from an empty conversation.  The chatbot component
# (type="messages") already holds the message-dict list, so it is fed back
# as the history input instead — multi-turn context now survives.
# ---------------------------------------------------------------------------
with gr.Blocks(title=APP_TITLE, css=custom_css, theme=gr.themes.Base()) as demo:
    with gr.Column(elem_classes="container"):
        gr.Markdown(f"## {APP_TITLE}")
        gr.Markdown(
            "Assistente **RAG** sobre **DFSORT**, usando **apenas** o PDF fornecido. "
            "Se algo não estiver no PDF, eu aviso que não sei."
        )

        with gr.Row():
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(type="messages", elem_id="chatbox", height=560)

                user_box = gr.Textbox(placeholder="Pergunte algo sobre DFSORT… ex.: Como uso INCLUDE COND?", lines=2)
                with gr.Row():
                    btn_send = gr.Button("Enviar", variant="primary")
                    btn_clear = gr.Button("Limpar")

                # Sampling / retrieval controls, forwarded verbatim to on_send.
                with gr.Row():
                    temperature = gr.Slider(0, 1, 0.4, step=0.05, label="Temperature")
                    top_p = gr.Slider(0, 1, 0.95, step=0.01, label="Top-p")
                with gr.Row():
                    max_tokens = gr.Slider(128, 4096, 768, step=64, label="Max Tokens")
                    k_chunks = gr.Slider(2, 12, 6, step=1, label="Trechos (k)")

                btn_send.click(
                    on_send,
                    inputs=[user_box, chatbot, temperature, top_p, max_tokens, k_chunks],
                    outputs=[chatbot, user_box],
                )
                user_box.submit(
                    on_send,
                    inputs=[user_box, chatbot, temperature, top_p, max_tokens, k_chunks],
                    outputs=[chatbot, user_box],
                )
                btn_clear.click(on_clear, outputs=[chatbot, user_box])

            with gr.Column(scale=2):
                gr.Markdown("### Controlo do índice")
                # Fixed label typo from the original (stray ')' and missing space).
                gr.Markdown(f"PDF atual (DFSORT Application Programming Guide): `{PDF_PATH}`")
                btn_rebuild = gr.Button("Reconstruir índice a partir do PDF")
                msg = gr.Markdown()
                # The handler is already a callable — no lambda wrapper needed.
                btn_rebuild.click(rebuild_index_action, [], [msg])

        gr.Markdown("---")
        gr.Markdown("### Dicas de consulta")
        gr.Markdown(
            "- Ex.: `Ordenar por 10 bytes a partir da posição 1 (CH, A).`\n"
            "- Ex.: `Como faço para eliminar duplicados com SUM FIELDS=NONE?`\n"
            "- Ex.: `JOINKEYS: explique o uso de REFORMAT.`\n"
            "- Ex.: `Exemplo de OUTFIL com cabeçalho e REMOVECC.`"
        )


if __name__ == "__main__":
    # Pre-build the index before serving so the first question is not slow.
    if not Path(INDEX_FILE).exists() or not Path(CHUNKS_FILE).exists():
        print("[i] Construindo índice a partir do PDF…")
        faiss_index, pdf_chunks = build_or_load_index(PDF_PATH, INDEX_FILE, CHUNKS_FILE)
        print("[i] Índice criado.")
    demo.launch(server_name="0.0.0.0", server_port=7860)