# -*- coding: utf-8 -*- # Db2 z/OS • RAG (NVIDIA NIM) # Sidebar + Diagnóstico + Extração robusta (com OCR opcional) + Chunkização por caracteres # Requisitos: # pip install openai gradio numpy pypdf pdfminer.six pymupdf # (OCR opcional) + dependências de SO: # pip install pytesseract pdf2image pillow # Linux: apt-get install -y tesseract-ocr poppler-utils import os import re import json from pathlib import Path from typing import List, Tuple, Dict, Optional import numpy as np import gradio as gr # ============================== # Config (Db2 + NVIDIA NIM) # ============================== BASE_DIR = Path(__file__).parent if "__file__" in globals() else Path.cwd() # Altere se necessário; o app também aceita PDFs no diretório atual (*.pdf) USER_PDF = Path("db2z_13_utilities.pdf") PDFS = [USER_PDF] if USER_PDF.exists() else sorted([p for p in BASE_DIR.glob("*.pdf") if p.is_file() and p.suffix.lower()==".pdf"]) NVCF_BASE = os.getenv("NVCF_BASE", "https://integrate.api.nvidia.com/v1") NVCF_API_KEY = os.getenv("NVCF_API_KEY", "") EMBED_MODEL = os.getenv("EMBED_MODEL", "nvidia/nv-embed-v1") # 4096-dim atualmente CHAT_MODEL = os.getenv("CHAT_MODEL", "meta/llama-3.1-8b-instruct") SAFE_IDX = f".db2_index_{EMBED_MODEL.replace('/','__')}" INDEX_DIR = BASE_DIR / SAFE_IDX INDEX_DIR.mkdir(exist_ok=True) VEC_FILE = INDEX_DIR / "vectors.npy" META_FILE = INDEX_DIR / "meta.json" TOP_K_RETRIEVE = 3 TARGET_CONTEXT_CHARS = 1500 # ============================== # NVIDIA (OpenAI-compatible) # ============================== try: from openai import OpenAI except ImportError: raise RuntimeError("Instale: pip install openai gradio numpy pypdf pdfminer.six pymupdf") def get_client(): if not NVCF_API_KEY or NVCF_API_KEY == "xxxxxxxxxxxxxxxxxxx": raise RuntimeError("NVCF_API_KEY não definido. export/set NVCF_API_KEY='SUA_CHAVE'") return OpenAI(base_url=NVCF_BASE, api_key=NVCF_API_KEY) # ============================== # PDF utils (robusto + OCR opcional) # ============================== try: import fitz # PyMuPDF except Exception: fitz = None try: from pdfminer.high_level import extract_text as pdfminer_extract_text except Exception: pdfminer_extract_text = None try: from pypdf import PdfReader except Exception: PdfReader = None NBSP = "\u00A0" def _normalize_text(t: str) -> str: if not isinstance(t, str): t = str(t or "") t = t.replace(NBSP, " ") t = re.sub(r"[\u0000-\u001F]", " ", t) # remove controles # mantém quebras simples e colapsa espaços longos t = re.sub(r"[ \t]{2,}", " ", t) t = re.sub(r"\n{3,}", "\n\n", t) return t.strip() def _extract_with_pymupdf(path: Path) -> List[Tuple[int, str]]: out: List[Tuple[int, str]] = [] with fitz.open(path) as doc: for i, page in enumerate(doc): text = page.get_text("text") or "" out.append((i + 1, _normalize_text(text))) return out def _extract_with_pdfminer(path: Path) -> List[Tuple[int, str]]: text_all = pdfminer_extract_text(str(path)) or "" pages = re.split(r"\f", text_all) out: List[Tuple[int, str]] = [] for i, tx in enumerate(pages): out.append((i + 1, _normalize_text(tx))) return out def _extract_with_pypdf(path: Path) -> List[Tuple[int, str]]: if PdfReader is None: return [] try: r = PdfReader(str(path), strict=False) except Exception as e: print(f"[WARN] pypdf: falha ao abrir {path.name}: {e}") return [] out: List[Tuple[int, str]] = [] for i in range(len(r.pages)): try: page = r.pages[i] t = page.extract_text() or "" except Exception as e: print(f"[WARN] pypdf: falha ao extrair pag {i+1}: {e}") t = "" out.append((i + 1, _normalize_text(t))) return out def _maybe_ocr_images(path: Path) -> List[Tuple[int, str]]: try: import pytesseract from pdf2image import convert_from_path except Exception: return [] out: List[Tuple[int, str]] = [] try: images = convert_from_path(str(path)) for i, img in enumerate(images): txt = pytesseract.image_to_string(img) or "" out.append((i + 1, _normalize_text(txt))) except Exception as e: print(f"[WARN] OCR: falha ao converter/ler {path.name}: {e}") return out def read_pdf_pages(path: Path) -> List[Tuple[int, str]]: if not path.exists(): return [] # 1) PyMuPDF if fitz is not None: try: pages = _extract_with_pymupdf(path) if any((tx or "").strip() for _, tx in pages): return pages except Exception as e: print(f"[WARN] PyMuPDF falhou: {e}") # 2) pdfminer if pdfminer_extract_text is not None: try: pages = _extract_with_pdfminer(path) if any((tx or "").strip() for _, tx in pages): return pages except Exception as e: print(f"[WARN] pdfminer falhou: {e}") # 3) pypdf try: pages = _extract_with_pypdf(path) if any((tx or "").strip() for _, tx in pages): return pages except Exception as e: print(f"[WARN] pypdf falhou: {e}") # 4) OCR quando nada foi extraído ocr_pages = _maybe_ocr_images(path) if not any((tx or "").strip() for _, tx in ocr_pages): print("[ERRO] Nenhum texto extraído, nem com OCR.") return ocr_pages # ============================== # Segmentação (detecção de seções para metadados) # ============================== DB2_HEADER_RE = re.compile( r"^(Part\s+\d+\.|Chapter\s+\d+\.)|" r"\b(BACKUP SYSTEM|CATMAINT|CHECK DATA|CHECK INDEX|CHECK LOB|COPY|COPYTOCOPY|DIAGNOSE|LISTDEF|LOAD|" r"MERGECOPY|MODIFY RECOVERY|MODIFY STATISTICS|OPTIONS|QUIESCE|REBUILD INDEX|RECOVER|REORG INDEX|REORG TABLESPACE|" r"REPAIR|REPORT|RESTORE SYSTEM|RUNSTATS|STOSPACE|TEMPLATE|UNLOAD)\b", re.IGNORECASE ) def split_db2_docs(pages: List[Tuple[int, str]], doc_label: str) -> List[Dict]: """Agrupa páginas por possíveis cabeçalhos (capítulos/utilities) para compor metadados de seção.""" blocks: List[Dict] = [] current = {"doc": doc_label, "section": "INTRO", "start_page": 1, "texts": []} for pg, tx in pages: head = (tx or "")[:300] if DB2_HEADER_RE.search(head): if current["texts"]: current["end_page"] = current["texts"][-1][0] blocks.append(current) m = re.search(r"(Chapter\s+\d+\.\s*[^\n]+|^[^\n]{1,200})", tx or "") title = (m.group(1).strip() if m else f"Section@{pg}") current = {"doc": doc_label, "section": title, "start_page": pg, "texts": []} current["texts"].append((pg, tx or "")) if current["texts"]: current["end_page"] = current["texts"][-1][0] blocks.append(current) return blocks # ============================== # Chunkização por caracteres (robusta) # ============================== def make_chunks_by_chars(blocks: List[Dict], max_chars: int = 1500, min_chars: int = 180) -> List[Dict]: """Concatena o texto das páginas de cada bloco e fatia por janelas de caracteres com overlap.""" out: List[Dict] = [] for b in blocks: pieces: List[str] = [] pages: List[int] = [] for pg, tx in b["texts"]: txn = _normalize_text(tx or "") if txn: pieces.append(txn) pages.append(pg) if not pieces: continue blob = "\n".join(pieces).strip() if not blob: continue start_page = min(pages) if pages else b.get("start_page", 0) end_page = max(pages) if pages else b.get("end_page", start_page) if len(blob) <= max_chars and len(blob) >= min_chars: out.append({ "doc": b["doc"], "section": b["section"], "start_page": start_page, "end_page": end_page, "text": blob }) continue overlap = 120 i, n = 0, len(blob) while i < n: j = min(i + max_chars, n) chunk_text = blob[i:j].strip() if len(chunk_text) >= min_chars: out.append({ "doc": b["doc"], "section": b["section"], "start_page": start_page, "end_page": end_page, "text": chunk_text }) new_i = j - overlap i = j if new_i <= i else new_i # filtro final out = [c for c in out if (c.get("text") or "").strip()] return out # ============================== # Embeddings # ============================== def embed_texts(texts: List[str], batch_size: int = 16) -> np.ndarray: client = get_client() clean = [(i, t) for i, t in enumerate(texts) if isinstance(t, str) and t.strip()] if not clean: return np.zeros((0, 0), dtype=np.float32) order, payload = zip(*clean) vecs: Dict[int, np.ndarray] = {} for i in range(0, len(payload), batch_size): batch = list(payload[i:i + batch_size]) resp = client.embeddings.create(model=EMBED_MODEL, input=batch) for k, item in enumerate(resp.data): vecs[int(order[i + k])] = np.array(item.embedding, dtype=np.float32) rows: List[np.ndarray] = [] for idx in range(len(texts)): if idx in vecs: rows.append(vecs[idx]) if not rows: return np.zeros((0, 0), dtype=np.float32) mat = np.vstack(rows).astype(np.float32) norms = np.linalg.norm(mat, axis=1, keepdims=True) norms[norms == 0] = 1.0 return mat / norms def embed_query(q: str) -> np.ndarray: client = get_client() resp = client.embeddings.create(model=EMBED_MODEL, input=[q]) v = np.array(resp.data[0].embedding, dtype=np.float32) n = np.linalg.norm(v) return (v / (n if n > 0 else 1.0)).astype(np.float32) # ============================== # Indexação # ============================== def build_index() -> Tuple[np.ndarray, List[Dict]]: all_blocks: List[Dict] = [] for p in PDFS: pages = read_pdf_pages(p) if not pages or not any((tx or "").strip() for _, tx in pages): print(f"[WARN] Sem texto legível em {p.name}; ignorando.") continue blks = split_db2_docs(pages, p.name) all_blocks.extend(blks) all_chunks = make_chunks_by_chars(all_blocks, max_chars=1500, min_chars=180) all_chunks = [c for c in all_chunks if (c.get("text") or "").strip()] if not all_chunks: with open(META_FILE, "w", encoding="utf-8") as f: json.dump({"chunks": [], "embed_model": EMBED_MODEL, "embed_dim": 0, "total_chars": 0}, f, ensure_ascii=False, indent=2) np.save(VEC_FILE, np.zeros((0, 0), dtype=np.float32)) raise RuntimeError("Nenhum chunk foi criado. Verifique extração/ OCR.") texts = [c["text"] for c in all_chunks] total_chars = sum(len(t) for t in texts) mat = embed_texts(texts) if texts else np.zeros((0, 0), dtype=np.float32) embed_dim = int(mat.shape[1]) if mat.size else 0 np.save(VEC_FILE, mat) with open(META_FILE, "w", encoding="utf-8") as f: json.dump( {"chunks": all_chunks, "embed_model": EMBED_MODEL, "embed_dim": embed_dim, "total_chars": total_chars}, f, ensure_ascii=False, indent=2 ) return mat, all_chunks def load_index() -> Tuple[np.ndarray, List[Dict]]: if VEC_FILE.exists() and META_FILE.exists(): mat = np.load(VEC_FILE) dd = json.loads(META_FILE.read_text(encoding="utf-8")) chunks = dd.get("chunks", []) return mat, chunks return build_index() def wipe_index() -> str: try: if INDEX_DIR.exists(): for p in INDEX_DIR.glob("*"): p.unlink() INDEX_DIR.rmdir() INDEX_DIR.mkdir(exist_ok=True) return "Índice limpo." except Exception as e: return f"Erro ao limpar índice: {e}" # ============================== # Recuperação + LLM # ============================== def _check_embed_dim(mat: np.ndarray) -> Optional[str]: try: dd = json.loads(META_FILE.read_text(encoding="utf-8")) idx_dim = int(dd.get("embed_dim", 0)) except Exception: idx_dim = 0 try: v = embed_query("dim_test") cur_dim = int(v.shape[0]) except Exception as e: return f"Falha ao checar dimensão do embedding: {e}" if idx_dim and cur_dim and idx_dim != cur_dim: return (f"Incompatibilidade de dimensão do embedding: índice={idx_dim}, modelo atual={cur_dim}. " f"Reindexe com o mesmo EMBED_MODEL. (Atual EMBED_MODEL: {EMBED_MODEL})") return None def retrieve_topk(query: str, doc_filter: Optional[str] = None, k: int = TOP_K_RETRIEVE) -> List[Dict]: mat, chunks = load_index() if mat.shape[0] == 0 or not chunks: return [] qv = embed_query(query) if mat.shape[1] != qv.shape[0]: raise RuntimeError( f"Dimensão incompatível mat={mat.shape} vs query={qv.shape}. " f"Provável troca de EMBED_MODEL após criar o índice. Clique 'Reindexar'." ) sims = (mat @ qv).astype(float) if doc_filter and doc_filter != "(Todos)": mask = np.array([1.0 if c["doc"] == doc_filter else 0.0 for c in chunks], dtype=float) sims *= mask idxs = np.argsort(-sims)[:k] out = [] for i in idxs: c = chunks[int(i)] out.append({ "doc": c["doc"], "section": c.get("section", ""), "start_page": c.get("start_page", "?"), "end_page": c.get("end_page", "?"), "text": c["text"], "score": float(sims[int(i)]), "idx": int(i) }) return out def expand_context(hits: List[Dict], all_chunks: List[Dict], target_chars: int = TARGET_CONTEXT_CHARS) -> Tuple[str, List[Tuple[str, str, str]]]: if not hits: return "", [] best = max(hits, key=lambda x: x["score"]) ctx = best["text"] srcs = {(best["doc"], best["section"], f"{best['start_page']}–{best['end_page']}")} doc, section, best_idx = best["doc"], best["section"], best["idx"] indices = [i for i, c in enumerate(all_chunks) if c["doc"] == doc and c.get("section", "") == section] if not indices: return ctx, sorted(list(srcs)) indices.sort() if best_idx not in indices: return ctx, sorted(list(srcs)) pos = indices.index(best_idx) left, right = pos - 1, pos + 1 while len(ctx) < target_chars and (left >= 0 or right < len(indices)): if right < len(indices) and len(ctx) < target_chars: rch = all_chunks[indices[right]] ctx += "\n\n" + rch["text"] srcs.add((doc, section, f"{rch.get('start_page', '?')}–{rch.get('end_page', '?')}")) right += 1 if left >= 0 and len(ctx) < target_chars: lch = all_chunks[indices[left]] ctx = lch["text"] + "\n\n" + ctx srcs.add((doc, section, f"{lch.get('start_page', '?')}–{lch.get('end_page', '?')}")) left -= 1 return ctx, sorted(list(srcs)) def answer_with_llm(question: str, context: str) -> str: client = get_client() system = ("Você é um assistente especialista em IBM Db2 para z/OS. " "Responda em português, com exemplos de comandos SQL/JCL completos e corretos. " "Use apenas o contexto fornecido; se algo não estiver nele, diga que não está disponível.") user = (f"Pergunta:\n{question}\n\n" f"Contexto do(s) manual(is):\n{context}\n\n" "Regras de resposta:\n" "- Explique o necessário e como fazer.\n" "- Inclua pelo menos um exemplo de comando Db2 utilitário, SQL ou JCL (auto-contido), se aplicável.\n" "- Liste observações/pré-requisitos, se houver.\n" "- Cite as fontes (Documento e páginas) ao final.") chat = client.chat.completions.create( model=CHAT_MODEL, messages=[{"role": "system", "content": system}, {"role": "user", "content": user}], temperature=0.2, ) return chat.choices[0].message.content.strip() def format_sources_md(sources: List[Tuple[str, str, str]]) -> str: if not sources: return "" lines = [ f"- **Documento:** {d} \n **Seção:** {s} \n **Páginas:** {p}" for (d, s, p) in sources ] return "\n".join(lines) # ============================== # Templates Db2 (exemplos) # ============================== DB2_TEMPLATES: Dict[str, str] = { "RUNSTATS_TABLESPACE": ( "//RUNSTAT JOB (ACCT),'RUNSTATS',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n" "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='RUNSTATS',UTPROC=''\n" "//SYSIN DD *\n" " RUNSTATS TABLESPACE(DBNAME.TSNAME) TABLE(ALL) INDEX(ALL)\n" "/*\n" ), "REORG_TABLESPACE": ( "//REORG JOB (ACCT),'REORG',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n" "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='REORGTS',UTPROC=''\n" "//SYSIN DD *\n" " REORG TABLESPACE(DBNAME.TSNAME) SHRLEVEL CHANGE\n" "/*\n" ), "EXPLAIN_SQL": ( "//EXPLAIN JOB (ACCT),'EXPLAIN',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n" "//STEP1 EXEC DSNTEP2,SYSTEM=DSN1\n" "//SYSIN DD *\n" " EXPLAIN PLAN FOR\n" " SELECT COL1, COL2 FROM DBNAME.TBNAME WHERE COL3 = 'X';\n" "/*\n" ), "DISPLAY_BUFFERPOOL": ( "//DISPBP JOB (ACCT),'DISPLAY BP',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n" "//STEP1 EXEC PGM=IKJEFT01\n" "//SYSTSPRT DD SYSOUT=*\n" "//SYSIN DD *\n" " DSN SYSTEM(DSN1)\n" " -DISPLAY BUFFERPOOL(BP0) DETAIL\n" " END\n" "/*\n" ), "DSNTEP2_SELECT": ( "//SELECT JOB (ACCT),'DSNTEP2',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n" "//STEP1 EXEC DSNTEP2,SYSTEM=DSN1\n" "//SYSIN DD *\n" " SELECT FIRSTNME, LASTNAME FROM DSN8810.EMP\n" " WHERE WORKDEPT = 'A00';\n" "/*\n" ), "COPY_TABLESPACE": ( "//COPYTS JOB (ACCT),'COPY',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n" "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='COPYTS',UTPROC=''\n" "//SYSIN DD *\n" " COPY TABLESPACE(DBNAME.TSNAME) FULL YES SHRLEVEL CHANGE\n" "/*\n" ), "LOAD_TABLE": ( "//LOADTBL JOB (ACCT),'LOAD',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n" "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='LOADTBL',UTPROC=''\n" "//SYSIN DD *\n" " LOAD DATA INDDN SYSREC INTO TABLE DBNAME.TBNAME\n" " REPLACE\n" "/*\n" ), "RECOVER_TABLESPACE": ( "//RECOVTS JOB (ACCT),'RECOVER',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n" "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='RECOVTS',UTPROC=''\n" "//SYSIN DD *\n" " RECOVER TABLESPACE(DBNAME.TSNAME)\n" "/*\n" ), "STATS_INDEX": ( "//STATSIX JOB (ACCT),'STATS INDEX',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n" "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='STATSIX',UTPROC=''\n" "//SYSIN DD *\n" " RUNSTATS INDEX(DBNAME.IXNAME) ALL\n" "/*\n" ), "MODIFY_RECOVERY": ( "//MODREC JOB (ACCT),'MODIFY RECOVERY',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n" "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='MODREC',UTPROC=''\n" "//SYSIN DD *\n" " MODIFY RECOVERY TABLESPACE(DBNAME.TSNAME) AGE(30)\n" "/*\n" ), "CHECK_DATA": ( "//CHKDATA JOB (ACCT),'CHECK DATA',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n" "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='CHKDATA',UTPROC=''\n" "//SYSIN DD *\n" " CHECK DATA TABLESPACE(DBNAME.TSNAME) SCOPE ALL\n" "/*\n" ), } def template_for_db2(cmd: str) -> str: return DB2_TEMPLATES.get(cmd, "//GENERIC ...\n") # ============================== # UI — layout com SIDEBAR + diagnóstico # ============================== CUSTOM_CSS = """ :root{ --ink:#0f172a; --muted:#475569; } .gradio-container { max-width: 1200px !important; margin: 0 auto !important; } .section-card { background: #fff; border: 1px solid #e2e8f0; border-radius: 16px; padding: 16px; box-shadow: 0 10px 30px rgba(2,6,23,.05); } .section-title { font-size: 1.05rem; font-weight: 800; color: var(--ink); display: flex; gap: .6rem; align-items: center; } .subtitle { color:var(--muted); font-size:.95rem; margin-top:.25rem; } .result-card { background:#fcfdff; border:1px solid #e2e8f0; border-radius:12px; padding:12px; } hr.sep { border:none; border-top:1px dashed #e2e8f0; margin:10px 0 14px; } .small { font-size:.9rem; color:var(--muted); } """ def build_app(): doc_label = PDFS[0].name if PDFS else "(Nenhum PDF)" all_doc_names = [p.name for p in PDFS] or ["(Nenhum PDF encontrado)"] default_doc = all_doc_names[0] if all_doc_names else "(Todos)" with gr.Blocks(title="Db2 z/OS • RAG (NVIDIA NIM)", css=CUSTOM_CSS, fill_height=True) as demo: # ===== Sidebar ===== with gr.Sidebar(): gr.Markdown( f"""
{doc_label}{doc_label}.