# DB2_UTILITIES / app.py
# (The following Hugging Face page-header lines — "Geoeasy's picture",
#  "Upload app.py", "041adf8 verified" — were pasted into the file as bare
#  text; they are commented out here so the module remains valid Python.)
# -*- coding: utf-8 -*-
# Db2 z/OS • RAG (NVIDIA NIM)
# Sidebar + Diagnóstico + Extração robusta (com OCR opcional) + Chunkização por caracteres
# Requisitos:
# pip install openai gradio numpy pypdf pdfminer.six pymupdf
# (OCR opcional) + dependências de SO:
# pip install pytesseract pdf2image pillow
# Linux: apt-get install -y tesseract-ocr poppler-utils
import os
import re
import json
from pathlib import Path
from typing import List, Tuple, Dict, Optional
import numpy as np
import gradio as gr
# ==============================
# Config (Db2 + NVIDIA NIM)
# ==============================
BASE_DIR = Path(__file__).parent if "__file__" in globals() else Path.cwd()
# Altere se necessário; o app também aceita PDFs no diretório atual (*.pdf)
USER_PDF = Path("db2z_13_utilities.pdf")
PDFS = [USER_PDF] if USER_PDF.exists() else sorted([p for p in BASE_DIR.glob("*.pdf") if p.is_file() and p.suffix.lower()==".pdf"])
NVCF_BASE = os.getenv("NVCF_BASE", "https://integrate.api.nvidia.com/v1")
NVCF_API_KEY = os.getenv("NVCF_API_KEY", "")
EMBED_MODEL = os.getenv("EMBED_MODEL", "nvidia/nv-embed-v1") # 4096-dim atualmente
CHAT_MODEL = os.getenv("CHAT_MODEL", "meta/llama-3.1-8b-instruct")
SAFE_IDX = f".db2_index_{EMBED_MODEL.replace('/','__')}"
INDEX_DIR = BASE_DIR / SAFE_IDX
INDEX_DIR.mkdir(exist_ok=True)
VEC_FILE = INDEX_DIR / "vectors.npy"
META_FILE = INDEX_DIR / "meta.json"
TOP_K_RETRIEVE = 3
TARGET_CONTEXT_CHARS = 1500
# ==============================
# NVIDIA (OpenAI-compatible)
# ==============================
try:
from openai import OpenAI
except ImportError:
raise RuntimeError("Instale: pip install openai gradio numpy pypdf pdfminer.six pymupdf")
def get_client():
    """Create an OpenAI-compatible client bound to the NVIDIA NIM endpoint.

    Raises:
        RuntimeError: when NVCF_API_KEY is empty or still the placeholder value.
    """
    key_missing = (not NVCF_API_KEY) or NVCF_API_KEY == "xxxxxxxxxxxxxxxxxxx"
    if key_missing:
        raise RuntimeError("NVCF_API_KEY não definido. export/set NVCF_API_KEY='SUA_CHAVE'")
    return OpenAI(base_url=NVCF_BASE, api_key=NVCF_API_KEY)
# ==============================
# PDF utils (robusto + OCR opcional)
# ==============================
try:
import fitz # PyMuPDF
except Exception:
fitz = None
try:
from pdfminer.high_level import extract_text as pdfminer_extract_text
except Exception:
pdfminer_extract_text = None
try:
from pypdf import PdfReader
except Exception:
PdfReader = None
NBSP = "\u00A0"
def _normalize_text(t: str) -> str:
if not isinstance(t, str):
t = str(t or "")
t = t.replace(NBSP, " ")
t = re.sub(r"[\u0000-\u001F]", " ", t) # remove controles
# mantém quebras simples e colapsa espaços longos
t = re.sub(r"[ \t]{2,}", " ", t)
t = re.sub(r"\n{3,}", "\n\n", t)
return t.strip()
def _extract_with_pymupdf(path: Path) -> List[Tuple[int, str]]:
    """Extract (1-based page number, normalized text) pairs via PyMuPDF."""
    pages: List[Tuple[int, str]] = []
    with fitz.open(path) as doc:
        for page_no, page in enumerate(doc, start=1):
            raw = page.get_text("text") or ""
            pages.append((page_no, _normalize_text(raw)))
    return pages
def _extract_with_pdfminer(path: Path) -> List[Tuple[int, str]]:
    """Extract per-page text via pdfminer; pages are separated by form feeds."""
    full_text = pdfminer_extract_text(str(path)) or ""
    return [
        (page_no, _normalize_text(part))
        for page_no, part in enumerate(full_text.split("\f"), start=1)
    ]
def _extract_with_pypdf(path: Path) -> List[Tuple[int, str]]:
    """Extract per-page text via pypdf, tolerating per-page failures.

    Returns [] when pypdf is not installed or the file cannot be opened;
    pages that fail to extract contribute an empty string instead of aborting.
    """
    if PdfReader is None:
        return []
    try:
        reader = PdfReader(str(path), strict=False)
    except Exception as e:
        print(f"[WARN] pypdf: falha ao abrir {path.name}: {e}")
        return []
    result: List[Tuple[int, str]] = []
    for idx in range(len(reader.pages)):
        try:
            raw = reader.pages[idx].extract_text() or ""
        except Exception as e:
            print(f"[WARN] pypdf: falha ao extrair pag {idx+1}: {e}")
            raw = ""
        result.append((idx + 1, _normalize_text(raw)))
    return result
def _maybe_ocr_images(path: Path) -> List[Tuple[int, str]]:
    """OCR fallback: rasterize the PDF and run Tesseract on each page image.

    Returns [] when the optional OCR dependencies (pytesseract, pdf2image)
    are not installed; conversion/read errors are logged, not raised.
    """
    try:
        import pytesseract
        from pdf2image import convert_from_path
    except Exception:
        return []
    pages: List[Tuple[int, str]] = []
    try:
        for page_no, image in enumerate(convert_from_path(str(path)), start=1):
            text = pytesseract.image_to_string(image) or ""
            pages.append((page_no, _normalize_text(text)))
    except Exception as e:
        print(f"[WARN] OCR: falha ao converter/ler {path.name}: {e}")
    return pages
def read_pdf_pages(path: Path) -> List[Tuple[int, str]]:
    """Extract page texts from *path*, trying extractors from best to worst.

    Order: PyMuPDF -> pdfminer -> pypdf -> OCR. The first extractor that
    yields any non-blank page wins. OCR output is returned even when empty
    (after logging an error) so callers can detect total failure.
    """
    if not path.exists():
        return []

    def _has_text(pgs: List[Tuple[int, str]]) -> bool:
        # True when at least one page carries non-blank text.
        return any((tx or "").strip() for _, tx in pgs)

    # 1) PyMuPDF
    if fitz is not None:
        try:
            result = _extract_with_pymupdf(path)
            if _has_text(result):
                return result
        except Exception as e:
            print(f"[WARN] PyMuPDF falhou: {e}")
    # 2) pdfminer
    if pdfminer_extract_text is not None:
        try:
            result = _extract_with_pdfminer(path)
            if _has_text(result):
                return result
        except Exception as e:
            print(f"[WARN] pdfminer falhou: {e}")
    # 3) pypdf
    try:
        result = _extract_with_pypdf(path)
        if _has_text(result):
            return result
    except Exception as e:
        print(f"[WARN] pypdf falhou: {e}")
    # 4) OCR as last resort
    ocr_pages = _maybe_ocr_images(path)
    if not _has_text(ocr_pages):
        print("[ERRO] Nenhum texto extraído, nem com OCR.")
    return ocr_pages
# ==============================
# Segmentação (detecção de seções para metadados)
# ==============================
# Matches Db2 manual headings: "Part N." / "Chapter N." prefixes anchored at
# the start, or any well-known utility-control-statement name (COPY, LOAD,
# REORG, ...). Used by split_db2_docs to decide where a new section begins.
DB2_HEADER_RE = re.compile(
    r"^(Part\s+\d+\.|Chapter\s+\d+\.)|"
    r"\b(BACKUP SYSTEM|CATMAINT|CHECK DATA|CHECK INDEX|CHECK LOB|COPY|COPYTOCOPY|DIAGNOSE|LISTDEF|LOAD|"
    r"MERGECOPY|MODIFY RECOVERY|MODIFY STATISTICS|OPTIONS|QUIESCE|REBUILD INDEX|RECOVER|REORG INDEX|REORG TABLESPACE|"
    r"REPAIR|REPORT|RESTORE SYSTEM|RUNSTATS|STOSPACE|TEMPLATE|UNLOAD)\b",
    re.IGNORECASE
)
def split_db2_docs(pages: List[Tuple[int, str]], doc_label: str) -> List[Dict]:
    """Group pages into section blocks, splitting where a Db2 heading appears.

    A page whose first 300 characters match DB2_HEADER_RE starts a new block.
    The block title is the matched "Chapter N. ..." line (or the page's first
    line, truncated to 200 chars). Each block carries doc, section,
    start_page, end_page and the raw (page, text) pairs.
    """
    blocks: List[Dict] = []

    def _close(block: Dict) -> None:
        # Finalize end_page from the last collected page and store the block.
        if block["texts"]:
            block["end_page"] = block["texts"][-1][0]
            blocks.append(block)

    current = {"doc": doc_label, "section": "INTRO", "start_page": 1, "texts": []}
    for pg, tx in pages:
        text = tx or ""
        if DB2_HEADER_RE.search(text[:300]):
            _close(current)
            match = re.search(r"(Chapter\s+\d+\.\s*[^\n]+|^[^\n]{1,200})", text)
            title = match.group(1).strip() if match else f"Section@{pg}"
            current = {"doc": doc_label, "section": title, "start_page": pg, "texts": []}
        current["texts"].append((pg, text))
    _close(current)
    return blocks
# ==============================
# Chunkização por caracteres (robusta)
# ==============================
def make_chunks_by_chars(blocks: List[Dict], max_chars: int = 1500, min_chars: int = 180) -> List[Dict]:
    """Concatenate each block's page texts and slice into overlapping windows.

    A block whose full text already fits within [min_chars, max_chars] becomes
    a single chunk; longer blobs are cut into max_chars windows with a fixed
    120-char overlap. Windows shorter than min_chars are dropped.

    NOTE(review): every sub-chunk inherits the WHOLE block's start/end page
    range, so page attribution is block-granular, not chunk-granular.
    """
    out: List[Dict] = []
    for b in blocks:
        # Normalize each page's text; keep only pages that still have content.
        pieces: List[str] = []
        pages: List[int] = []
        for pg, tx in b["texts"]:
            txn = _normalize_text(tx or "")
            if txn:
                pieces.append(txn)
                pages.append(pg)
        if not pieces:
            continue
        blob = "\n".join(pieces).strip()
        if not blob:
            continue
        start_page = min(pages) if pages else b.get("start_page", 0)
        end_page = max(pages) if pages else b.get("end_page", start_page)
        # Small enough: emit the whole block as a single chunk.
        if len(blob) <= max_chars and len(blob) >= min_chars:
            out.append({
                "doc": b["doc"],
                "section": b["section"],
                "start_page": start_page,
                "end_page": end_page,
                "text": blob
            })
            continue
        # Otherwise slide a max_chars window over the blob with 120-char overlap.
        overlap = 120
        i, n = 0, len(blob)
        while i < n:
            j = min(i + max_chars, n)
            chunk_text = blob[i:j].strip()
            if len(chunk_text) >= min_chars:
                out.append({
                    "doc": b["doc"],
                    "section": b["section"],
                    "start_page": start_page,
                    "end_page": end_page,
                    "text": chunk_text
                })
            # Step back by the overlap, but guarantee forward progress so a
            # window shorter than the overlap cannot loop forever.
            new_i = j - overlap
            i = j if new_i <= i else new_i
    # filtro final: drop any chunk whose text is blank.
    out = [c for c in out if (c.get("text") or "").strip()]
    return out
# ==============================
# Embeddings
# ==============================
def embed_texts(texts: List[str], batch_size: int = 16) -> np.ndarray:
    """Embed *texts* in batches and return an L2-normalized float32 matrix.

    Blank or non-string entries are skipped entirely, so the matrix has one
    row per NON-BLANK input.

    NOTE(review): when entries are skipped, row k no longer corresponds to
    texts[k]. build_index pre-filters empty texts, which keeps rows aligned
    there — confirm before reusing this helper elsewhere.
    """
    client = get_client()
    # Keep (original_index, text) for every usable entry.
    clean = [(i, t) for i, t in enumerate(texts) if isinstance(t, str) and t.strip()]
    if not clean:
        return np.zeros((0, 0), dtype=np.float32)
    order, payload = zip(*clean)
    vecs: Dict[int, np.ndarray] = {}
    for i in range(0, len(payload), batch_size):
        batch = list(payload[i:i + batch_size])
        resp = client.embeddings.create(model=EMBED_MODEL, input=batch)
        # Embeddings come back in request order; map each to its original index.
        for k, item in enumerate(resp.data):
            vecs[int(order[i + k])] = np.array(item.embedding, dtype=np.float32)
    rows: List[np.ndarray] = []
    for idx in range(len(texts)):
        if idx in vecs:
            rows.append(vecs[idx])
    if not rows:
        return np.zeros((0, 0), dtype=np.float32)
    mat = np.vstack(rows).astype(np.float32)
    # L2-normalize each row; zero-norm rows divide by 1.0 to avoid NaN.
    norms = np.linalg.norm(mat, axis=1, keepdims=True)
    norms[norms == 0] = 1.0
    return mat / norms
def embed_query(q: str) -> np.ndarray:
    """Embed a single query string and return it L2-normalized (float32)."""
    resp = get_client().embeddings.create(model=EMBED_MODEL, input=[q])
    vec = np.array(resp.data[0].embedding, dtype=np.float32)
    norm = np.linalg.norm(vec)
    divisor = norm if norm > 0 else 1.0
    return (vec / divisor).astype(np.float32)
# ==============================
# Indexação
# ==============================
def build_index() -> Tuple[np.ndarray, List[Dict]]:
    """Extract, chunk and embed all configured PDFs; persist vectors + metadata.

    Writes VEC_FILE (float32 matrix, one row per chunk) and META_FILE (chunk
    dicts plus embed-model/dimension bookkeeping).

    Returns:
        (matrix, chunks).

    Raises:
        RuntimeError: when no chunk could be produced; an explicitly-empty
        index is persisted first so later loads fail fast.
    """
    all_blocks: List[Dict] = []
    for p in PDFS:
        pages = read_pdf_pages(p)
        if not pages or not any((tx or "").strip() for _, tx in pages):
            print(f"[WARN] Sem texto legível em {p.name}; ignorando.")
            continue
        blks = split_db2_docs(pages, p.name)
        all_blocks.extend(blks)
    all_chunks = make_chunks_by_chars(all_blocks, max_chars=1500, min_chars=180)
    all_chunks = [c for c in all_chunks if (c.get("text") or "").strip()]
    if not all_chunks:
        # Persist an empty index before raising so the on-disk state is consistent.
        with open(META_FILE, "w", encoding="utf-8") as f:
            json.dump({"chunks": [], "embed_model": EMBED_MODEL, "embed_dim": 0, "total_chars": 0}, f, ensure_ascii=False, indent=2)
        np.save(VEC_FILE, np.zeros((0, 0), dtype=np.float32))
        raise RuntimeError("Nenhum chunk foi criado. Verifique extração/ OCR.")
    texts = [c["text"] for c in all_chunks]
    total_chars = sum(len(t) for t in texts)
    # texts is never empty here (all_chunks was checked), so embed_texts runs.
    mat = embed_texts(texts) if texts else np.zeros((0, 0), dtype=np.float32)
    embed_dim = int(mat.shape[1]) if mat.size else 0
    np.save(VEC_FILE, mat)
    with open(META_FILE, "w", encoding="utf-8") as f:
        json.dump(
            {"chunks": all_chunks, "embed_model": EMBED_MODEL, "embed_dim": embed_dim, "total_chars": total_chars},
            f, ensure_ascii=False, indent=2
        )
    return mat, all_chunks
def load_index() -> Tuple[np.ndarray, List[Dict]]:
    """Load cached vectors/metadata from disk, rebuilding the index if absent."""
    if not (VEC_FILE.exists() and META_FILE.exists()):
        return build_index()
    vectors = np.load(VEC_FILE)
    meta = json.loads(META_FILE.read_text(encoding="utf-8"))
    return vectors, meta.get("chunks", [])
def wipe_index() -> str:
    """Delete every index file and recreate an empty index directory.

    Best-effort: returns a human-readable status message instead of raising.
    """
    try:
        if INDEX_DIR.exists():
            for entry in INDEX_DIR.glob("*"):
                entry.unlink()
            INDEX_DIR.rmdir()
        INDEX_DIR.mkdir(exist_ok=True)
        return "Índice limpo."
    except Exception as e:
        return f"Erro ao limpar índice: {e}"
# ==============================
# Recuperação + LLM
# ==============================
def _check_embed_dim(mat: np.ndarray) -> Optional[str]:
    """Compare the stored index embedding dimension with the live model's.

    Returns a user-facing warning string on mismatch or probe failure,
    or None when the dimensions agree (or cannot be compared).
    """
    try:
        meta = json.loads(META_FILE.read_text(encoding="utf-8"))
        idx_dim = int(meta.get("embed_dim", 0))
    except Exception:
        idx_dim = 0
    try:
        # Probe the live model with a tiny request to learn its output size.
        cur_dim = int(embed_query("dim_test").shape[0])
    except Exception as e:
        return f"Falha ao checar dimensão do embedding: {e}"
    if idx_dim and cur_dim and idx_dim != cur_dim:
        return (f"Incompatibilidade de dimensão do embedding: índice={idx_dim}, modelo atual={cur_dim}. "
                f"Reindexe com o mesmo EMBED_MODEL. (Atual EMBED_MODEL: {EMBED_MODEL})")
    return None
def retrieve_topk(query: str, doc_filter: Optional[str] = None, k: int = TOP_K_RETRIEVE) -> List[Dict]:
    """Return the top-k chunks most similar to *query* (dot product of
    normalized vectors = cosine), optionally restricted to one document.

    Bug fix: the document filter previously multiplied similarities by a
    0/1 mask, which (a) let filtered-out chunks (score forced to 0) outrank
    in-filter chunks with negative similarity, and (b) could return
    filtered-out chunks among the top-k. They are now excluded outright.

    Raises:
        RuntimeError: when the stored matrix dimension does not match the
        current embedding model's output dimension.
    """
    mat, chunks = load_index()
    if mat.shape[0] == 0 or not chunks:
        return []
    qv = embed_query(query)
    if mat.shape[1] != qv.shape[0]:
        raise RuntimeError(
            f"Dimensão incompatível mat={mat.shape} vs query={qv.shape}. "
            f"Provável troca de EMBED_MODEL após criar o índice. Clique 'Reindexar'."
        )
    sims = (mat @ qv).astype(float)
    if doc_filter and doc_filter != "(Todos)":
        allowed = np.array([c["doc"] == doc_filter for c in chunks], dtype=bool)
        if not allowed.any():
            return []
        # -inf (not 0) so excluded chunks can never rank above allowed ones.
        sims = np.where(allowed, sims, -np.inf)
    idxs = np.argsort(-sims)[:k]
    out = []
    for i in idxs:
        if not np.isfinite(sims[int(i)]):
            continue  # excluded by the document filter
        c = chunks[int(i)]
        out.append({
            "doc": c["doc"],
            "section": c.get("section", ""),
            "start_page": c.get("start_page", "?"),
            "end_page": c.get("end_page", "?"),
            "text": c["text"],
            "score": float(sims[int(i)]),
            "idx": int(i)
        })
    return out
def expand_context(hits: List[Dict], all_chunks: List[Dict], target_chars: int = 1500) -> Tuple[str, List[Tuple[str, str, str]]]:
    """Grow the best hit's text with neighboring chunks of the same section.

    Starting from the highest-scoring hit, alternately appends the next chunk
    to the right and prepends the one to the left (restricted to the same
    doc + section) until the context reaches *target_chars*.

    Returns:
        (context_text, sorted list of (doc, section, "start-end") sources).

    Bug fix: page ranges are now rendered as "start-end"; previously the two
    page numbers were concatenated with no separator (pages 1..2 -> "12").
    """
    if not hits:
        return "", []

    def _pages(chunk: Dict) -> str:
        # Human-readable "start-end" page range for the sources list.
        return f"{chunk.get('start_page', '?')}-{chunk.get('end_page', '?')}"

    best = max(hits, key=lambda h: h["score"])
    ctx = best["text"]
    srcs = {(best["doc"], best["section"], f"{best['start_page']}-{best['end_page']}")}
    doc, section, best_idx = best["doc"], best["section"], best["idx"]
    indices = sorted(
        i for i, c in enumerate(all_chunks)
        if c["doc"] == doc and c.get("section", "") == section
    )
    if best_idx not in indices:
        # Covers both "no sibling chunks" and "stale hit index".
        return ctx, sorted(srcs)
    pos = indices.index(best_idx)
    left, right = pos - 1, pos + 1
    while len(ctx) < target_chars and (left >= 0 or right < len(indices)):
        if right < len(indices) and len(ctx) < target_chars:
            rch = all_chunks[indices[right]]
            ctx += "\n\n" + rch["text"]
            srcs.add((doc, section, _pages(rch)))
            right += 1
        if left >= 0 and len(ctx) < target_chars:
            lch = all_chunks[indices[left]]
            ctx = lch["text"] + "\n\n" + ctx
            srcs.add((doc, section, _pages(lch)))
            left -= 1
    return ctx, sorted(srcs)
def answer_with_llm(question: str, context: str) -> str:
    """Ask the chat model to answer *question* grounded only in *context*.

    The prompt forces Portuguese answers with Db2 utility/SQL/JCL examples
    and a final source citation; low temperature keeps output deterministic.
    """
    system = ("Você é um assistente especialista em IBM Db2 para z/OS. "
              "Responda em português, com exemplos de comandos SQL/JCL completos e corretos. "
              "Use apenas o contexto fornecido; se algo não estiver nele, diga que não está disponível.")
    user = (f"Pergunta:\n{question}\n\n"
            f"Contexto do(s) manual(is):\n{context}\n\n"
            "Regras de resposta:\n"
            "- Explique o necessário e como fazer.\n"
            "- Inclua pelo menos um exemplo de comando Db2 utilitário, SQL ou JCL (auto-contido), se aplicável.\n"
            "- Liste observações/pré-requisitos, se houver.\n"
            "- Cite as fontes (Documento e páginas) ao final.")
    messages = [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]
    completion = get_client().chat.completions.create(
        model=CHAT_MODEL,
        messages=messages,
        temperature=0.2,
    )
    return completion.choices[0].message.content.strip()
def format_sources_md(sources: List[Tuple[str, str, str]]) -> str:
    """Render (doc, section, pages) triples as a Markdown bullet list."""
    if not sources:
        return ""
    bullet = "- **Documento:** {0} \n **Seção:** {1} \n **Páginas:** {2}"
    return "\n".join(bullet.format(d, s, p) for (d, s, p) in sources)
# ==============================
# Templates Db2 (exemplos)
# ==============================
# Ready-to-edit z/OS JCL examples keyed by utility/pattern name. Served
# verbatim by template_for_db2 to the "Templates" panel in the UI; placeholder
# names (DBNAME.TSNAME, DSN1, ACCT, ...) are meant to be edited by the user.
DB2_TEMPLATES: Dict[str, str] = {
    "RUNSTATS_TABLESPACE": (
        "//RUNSTAT JOB (ACCT),'RUNSTATS',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='RUNSTATS',UTPROC=''\n"
        "//SYSIN DD *\n"
        " RUNSTATS TABLESPACE(DBNAME.TSNAME) TABLE(ALL) INDEX(ALL)\n"
        "/*\n"
    ),
    "REORG_TABLESPACE": (
        "//REORG JOB (ACCT),'REORG',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='REORGTS',UTPROC=''\n"
        "//SYSIN DD *\n"
        " REORG TABLESPACE(DBNAME.TSNAME) SHRLEVEL CHANGE\n"
        "/*\n"
    ),
    "EXPLAIN_SQL": (
        "//EXPLAIN JOB (ACCT),'EXPLAIN',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNTEP2,SYSTEM=DSN1\n"
        "//SYSIN DD *\n"
        " EXPLAIN PLAN FOR\n"
        " SELECT COL1, COL2 FROM DBNAME.TBNAME WHERE COL3 = 'X';\n"
        "/*\n"
    ),
    "DISPLAY_BUFFERPOOL": (
        "//DISPBP JOB (ACCT),'DISPLAY BP',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC PGM=IKJEFT01\n"
        "//SYSTSPRT DD SYSOUT=*\n"
        "//SYSIN DD *\n"
        " DSN SYSTEM(DSN1)\n"
        " -DISPLAY BUFFERPOOL(BP0) DETAIL\n"
        " END\n"
        "/*\n"
    ),
    "DSNTEP2_SELECT": (
        "//SELECT JOB (ACCT),'DSNTEP2',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNTEP2,SYSTEM=DSN1\n"
        "//SYSIN DD *\n"
        " SELECT FIRSTNME, LASTNAME FROM DSN8810.EMP\n"
        " WHERE WORKDEPT = 'A00';\n"
        "/*\n"
    ),
    "COPY_TABLESPACE": (
        "//COPYTS JOB (ACCT),'COPY',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='COPYTS',UTPROC=''\n"
        "//SYSIN DD *\n"
        " COPY TABLESPACE(DBNAME.TSNAME) FULL YES SHRLEVEL CHANGE\n"
        "/*\n"
    ),
    "LOAD_TABLE": (
        "//LOADTBL JOB (ACCT),'LOAD',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='LOADTBL',UTPROC=''\n"
        "//SYSIN DD *\n"
        " LOAD DATA INDDN SYSREC INTO TABLE DBNAME.TBNAME\n"
        " REPLACE\n"
        "/*\n"
    ),
    "RECOVER_TABLESPACE": (
        "//RECOVTS JOB (ACCT),'RECOVER',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='RECOVTS',UTPROC=''\n"
        "//SYSIN DD *\n"
        " RECOVER TABLESPACE(DBNAME.TSNAME)\n"
        "/*\n"
    ),
    "STATS_INDEX": (
        "//STATSIX JOB (ACCT),'STATS INDEX',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='STATSIX',UTPROC=''\n"
        "//SYSIN DD *\n"
        " RUNSTATS INDEX(DBNAME.IXNAME) ALL\n"
        "/*\n"
    ),
    "MODIFY_RECOVERY": (
        "//MODREC JOB (ACCT),'MODIFY RECOVERY',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='MODREC',UTPROC=''\n"
        "//SYSIN DD *\n"
        " MODIFY RECOVERY TABLESPACE(DBNAME.TSNAME) AGE(30)\n"
        "/*\n"
    ),
    "CHECK_DATA": (
        "//CHKDATA JOB (ACCT),'CHECK DATA',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='CHKDATA',UTPROC=''\n"
        "//SYSIN DD *\n"
        " CHECK DATA TABLESPACE(DBNAME.TSNAME) SCOPE ALL\n"
        "/*\n"
    ),
}
def template_for_db2(cmd: str) -> str:
    """Look up a ready-to-edit JCL template by key, with a generic fallback."""
    try:
        return DB2_TEMPLATES[cmd]
    except KeyError:
        return "//GENERIC ...\n"
# ==============================
# UI — layout com SIDEBAR + diagnóstico
# ==============================
# Inline CSS injected into the Gradio app: centered 1200px layout, card and
# sidebar styling, dashed separators and muted helper text.
CUSTOM_CSS = """
:root{ --ink:#0f172a; --muted:#475569; }
.gradio-container { max-width: 1200px !important; margin: 0 auto !important; }
.section-card { background: #fff; border: 1px solid #e2e8f0; border-radius: 16px; padding: 16px; box-shadow: 0 10px 30px rgba(2,6,23,.05); }
.section-title { font-size: 1.05rem; font-weight: 800; color: var(--ink); display: flex; gap: .6rem; align-items: center; }
.subtitle { color:var(--muted); font-size:.95rem; margin-top:.25rem; }
.result-card { background:#fcfdff; border:1px solid #e2e8f0; border-radius:12px; padding:12px; }
hr.sep { border:none; border-top:1px dashed #e2e8f0; margin:10px 0 14px; }
.small { font-size:.9rem; color:var(--muted); }
"""
def build_app():
    """Assemble and return the Gradio Blocks UI.

    Layout: a sidebar (status, action buttons, config accordion, document
    filter dropdown), a main search area, a JCL-template generator accordion
    and a diagnostics accordion. All callbacks are defined inline and wired
    to the widgets before the app is returned.
    """
    doc_label = PDFS[0].name if PDFS else "(Nenhum PDF)"
    all_doc_names = [p.name for p in PDFS] or ["(Nenhum PDF encontrado)"]
    # all_doc_names is never empty (fallback list above), so the first branch
    # of this conditional always applies.
    default_doc = all_doc_names[0] if all_doc_names else "(Todos)"
    with gr.Blocks(title="Db2 z/OS • RAG (NVIDIA NIM)", css=CUSTOM_CSS, fill_height=True) as demo:
        # ===== Sidebar =====
        with gr.Sidebar():
            gr.Markdown(
                f"""
<div class="section-title">💼 Db2 RAG</div>
<div class="small">Contexto: <code>{doc_label}</code></div>
"""
            )
            status_box = gr.Markdown("Pronto ✅" if PDFS else "⚠️ Nenhum PDF encontrado.")
            gr.Markdown("<hr class='sep'/>")
            with gr.Group():
                gr.Markdown("**Ações**")
                test_btn = gr.Button("🧪 Testar conexão NVIDIA", variant="secondary")
                rebuild_btn = gr.Button("🔁 Reindexar (NIM)")
                diag_btn = gr.Button("🛠️ Diagnóstico do Índice")
            gr.Markdown("<hr class='sep'/>")
            with gr.Accordion("Configuração", open=False):
                gr.Markdown(f"- **Embeddings:** `{EMBED_MODEL}`\n- **LLM:** `{CHAT_MODEL}`\n- **Índice:** `{INDEX_DIR.name}`")
            # Document filter: "(Todos)" disables filtering in the callbacks.
            doc_choice = gr.Dropdown(
                choices=(["(Todos)"] + all_doc_names),
                value=default_doc if PDFS else "(Todos)",
                label="Documento"
            )
        # ===== Main content =====
        gr.Markdown(
            f"""
<div class="section-card" style="padding:18px; display:flex; gap:16px; align-items:center;">
<div style="font-size:26px;">🧭</div>
<div style="flex:1">
<div style="font-size:1.2rem; font-weight:800; color:#0f172a;">DB2 -Z/OS UTILITIES | RAG + NVIDIA NIM</div>
<div class="subtitle">Pergunte sobre utilidades (COPY, LOAD, REORG, RUNSTATS, RECOVER, etc.). As respostas vêm do manual: <code>{doc_label}</code>.</div>
</div>
</div>
"""
        )
        with gr.Row():
            q = gr.Textbox(
                label="Pergunta (Db2 Utilities)",
                placeholder="Ex.: Como usar COPY FULL com SHRLEVEL CHANGE? • Quando rodar RUNSTATS INDEX? • REORG TABLESPACE SHRLEVEL CHANGE • RECOVER PITR...",
                scale=8
            )
        with gr.Row():
            ask_btn = gr.Button("🔍 Buscar", variant="primary", scale=2)
            clear_btn = gr.Button("🧹 Limpar", scale=1)
        out = gr.Markdown(label="Resposta (Db2)")
        gr.Markdown("<hr class='sep'/>")
        with gr.Accordion("🧩 Templates Db2 executáveis", open=False):
            db2_choice = gr.Dropdown(
                choices=list(DB2_TEMPLATES.keys()),
                value="RUNSTATS_TABLESPACE",
                label="Comando / Padrão"
            )
            db2_btn = gr.Button("📄 Gerar exemplo")
            db2_out = gr.Textbox(label="Exemplo (copiar/ajustar)", lines=18, show_copy_button=True)
        with gr.Accordion("🧪 Log / Diagnóstico", open=False):
            diag_out = gr.Markdown()

        # ===== Callbacks =====
        def _test_conn():
            # Round-trips a one-item embedding request to validate endpoint + key.
            try:
                dim = len(get_client().embeddings.create(model=EMBED_MODEL, input=["ping"]).data[0].embedding)
                return f"Conexão ok ✅ — dimensão do embedding: **{dim}**"
            except Exception as e:
                return f"⚠️ Falha na conexão/credenciais NVIDIA: `{type(e).__name__}` — {e}"

        def _rebuild():
            # Wipes the on-disk index, then rebuilds it from the configured PDFs.
            try:
                msg = wipe_index()
                mat, chunks = build_index()
                return msg + f" Reindexação concluída ✅ PDFs: {len(PDFS)} • Chunks: {len(chunks)} • Vetores: {mat.shape}"
            except Exception as e:
                return f"⚠️ Erro ao reindexar: `{type(e).__name__}` — {e}"

        def _diagnose(dsel: str) -> str:
            # Reports index health: vector shape, chunk counts, dimension check,
            # first sections for the selected document, and a text preview.
            try:
                if not (VEC_FILE.exists() and META_FILE.exists()):
                    return "❌ Nenhum índice encontrado. Clique **Reindexar (NIM)**."
                mat = np.load(VEC_FILE)
                meta = json.loads(META_FILE.read_text(encoding="utf-8"))
                chunks = meta.get("chunks", [])
                embed_dim = meta.get("embed_dim", 0)
                total_chars = int(meta.get("total_chars", 0))
                dim_msg = _check_embed_dim(mat)
                # primeiras seções — only the first 12 chunks are inspected.
                first_secs = []
                for c in chunks[:12]:
                    if dsel == "(Todos)" or c["doc"] == dsel:
                        first_secs.append(f"- {c['doc']}{c.get('section','?')} • p.{c.get('start_page','?')}-{c.get('end_page','?')}")
                if not first_secs:
                    first_secs = ["(Filtro de documento não encontra seções no índice.)"]
                # prévia do primeiro chunk — first non-empty text, 400 chars.
                preview = ""
                for c in chunks:
                    t = (c.get("text") or "").strip()
                    if t:
                        preview = t[:400].replace("\n", " ")
                        break
                if not preview:
                    preview = "(Nenhum chunk contém texto — verifique extração/OCR.)"
                msg = [
                    f"**Índice**: Vetores `{mat.shape}` • embed_dim(meta): `{embed_dim}` • Modelo atual: `{EMBED_MODEL}`",
                    f"**Chunks**: **{len(chunks)}** • **Total de caracteres**: {total_chars}",
                    f"**Documento selecionado**: `{dsel}`",
                    f"**Primeiras seções**:\n" + "\n".join(first_secs),
                    f"\n**Prévia (400 chars)**:\n```\n{preview}\n```"
                ]
                if dim_msg:
                    msg.append(f"\n⚠️ {dim_msg}")
                return "\n".join(msg)
            except Exception as e:
                return f"⚠️ Diagnóstico falhou: `{type(e).__name__}` — {e}"

        def _search_answer(qstr: str, d: str) -> str:
            # Full RAG pipeline: validate input/index → check embedding dims →
            # retrieve top-k → expand context → ask the LLM → render sources.
            try:
                if not qstr or qstr.strip() == "":
                    return "_Informe uma pergunta._"
                if not (VEC_FILE.exists() and META_FILE.exists()):
                    return "_Nenhum conteúdo indexado. Use **Reindexar**._"
                mat = np.load(VEC_FILE)
                meta = json.loads(META_FILE.read_text(encoding="utf-8"))
                chunks = meta.get("chunks", [])
                if mat.size == 0 or not chunks:
                    return "_Índice vazio. Reindexe (pode ser necessário OCR)._"
                dim_msg = _check_embed_dim(mat)
                if dim_msg:
                    return f"⚠️ {dim_msg}"
                # retrieve
                hits = retrieve_topk(qstr, None if d == "(Todos)" else d, k=TOP_K_RETRIEVE)
                hits = [h for h in hits if (h.get("text") or "").strip()]
                if not hits:
                    return "_Nada encontrado para a consulta (verifique o filtro de documento ou reindexe)._"
                context, sources = expand_context(hits, chunks, TARGET_CONTEXT_CHARS)
                if not context.strip():
                    return "_Contexto insuficiente encontrado._"
                answer = answer_with_llm(qstr, context)
                src_md = format_sources_md(sources)
                return f"<div class='result-card'>{answer}</div>\n\n### Fontes\n{src_md}"
            except Exception as e:
                return f"⚠️ Erro ao buscar: `{type(e).__name__}` — {e}"

        def _clear(doc_default: str) -> Tuple[str, str]:
            # Resets the question box and restores the default document filter.
            return "", (doc_default if PDFS else "(Todos)")

        def ui_db2_template(cmd_choice: str) -> str:
            # Thin wrapper feeding the dropdown value to template_for_db2.
            return template_for_db2(cmd_choice)

        # Wire each widget to its callback.
        test_btn.click(_test_conn, outputs=[status_box])
        rebuild_btn.click(_rebuild, outputs=[status_box])
        diag_btn.click(_diagnose, inputs=[doc_choice], outputs=[diag_out])
        ask_btn.click(_search_answer, inputs=[q, doc_choice], outputs=[out])
        clear_btn.click(_clear, inputs=[gr.State(default_doc)], outputs=[q, doc_choice])
        db2_btn.click(ui_db2_template, inputs=[db2_choice], outputs=[db2_out])
    return demo
# ==============================
# Main (robusto: bind público, respeita $PORT, SSR off, queue opcional)
# ==============================
if __name__ == "__main__":
    # Warm the index at startup; failure is non-fatal because the user can
    # trigger a rebuild from the UI ("Reindexar").
    try:
        _ = load_index()
    except Exception as e:
        print(f"[AVISO] Índice não carregado: {e}")
    app = build_app()
    # Bind on all interfaces (container hosting, e.g. HF Spaces) on port 7860.
    app.launch(server_name="0.0.0.0", server_port=7860)