# NOTE: the "Spaces: / Sleeping" lines below the title were Hugging Face Spaces
# page residue from a copy/paste — kept only as this comment so the file parses.
| # -*- coding: utf-8 -*- | |
| # Db2 z/OS • RAG (NVIDIA NIM) | |
| # Sidebar + Diagnóstico + Extração robusta (com OCR opcional) + Chunkização por caracteres | |
| # Requisitos: | |
| # pip install openai gradio numpy pypdf pdfminer.six pymupdf | |
| # (OCR opcional) + dependências de SO: | |
| # pip install pytesseract pdf2image pillow | |
| # Linux: apt-get install -y tesseract-ocr poppler-utils | |
| import os | |
| import re | |
| import json | |
| from pathlib import Path | |
| from typing import List, Tuple, Dict, Optional | |
| import numpy as np | |
| import gradio as gr | |
# ==============================
# Config (Db2 + NVIDIA NIM)
# ==============================
# Base directory: the script's folder when run as a file, else the cwd
# (covers notebook/REPL execution where __file__ is undefined).
BASE_DIR = Path(__file__).parent if "__file__" in globals() else Path.cwd()
# Change if needed; the app also accepts any *.pdf found in the base directory.
USER_PDF = Path("db2z_13_utilities.pdf")
PDFS = [USER_PDF] if USER_PDF.exists() else sorted([p for p in BASE_DIR.glob("*.pdf") if p.is_file() and p.suffix.lower()==".pdf"])
# NVIDIA endpoint and credentials (OpenAI-compatible); overridable via env vars.
NVCF_BASE = os.getenv("NVCF_BASE", "https://integrate.api.nvidia.com/v1")
NVCF_API_KEY = os.getenv("NVCF_API_KEY", "")
EMBED_MODEL = os.getenv("EMBED_MODEL", "nvidia/nv-embed-v1")  # currently 4096-dim
CHAT_MODEL = os.getenv("CHAT_MODEL", "meta/llama-3.1-8b-instruct")
# The index directory name embeds the embedding model so vectors from models
# with different dimensions never get mixed in the same directory.
SAFE_IDX = f".db2_index_{EMBED_MODEL.replace('/','__')}"
INDEX_DIR = BASE_DIR / SAFE_IDX
INDEX_DIR.mkdir(exist_ok=True)
# On-disk index artifacts: the embedding matrix and the chunk metadata.
VEC_FILE = INDEX_DIR / "vectors.npy"
META_FILE = INDEX_DIR / "meta.json"
# Number of chunks retrieved per query and target size of the LLM context.
TOP_K_RETRIEVE = 3
TARGET_CONTEXT_CHARS = 1500
| # ============================== | |
| # NVIDIA (OpenAI-compatible) | |
| # ============================== | |
| try: | |
| from openai import OpenAI | |
| except ImportError: | |
| raise RuntimeError("Instale: pip install openai gradio numpy pypdf pdfminer.six pymupdf") | |
def get_client():
    """Create an OpenAI-compatible client pointed at the NVIDIA NIM endpoint.

    Raises:
        RuntimeError: when the API key is missing or still the placeholder.
    """
    placeholder = "xxxxxxxxxxxxxxxxxxx"
    if not NVCF_API_KEY or NVCF_API_KEY == placeholder:
        raise RuntimeError("NVCF_API_KEY não definido. export/set NVCF_API_KEY='SUA_CHAVE'")
    return OpenAI(base_url=NVCF_BASE, api_key=NVCF_API_KEY)
| # ============================== | |
| # PDF utils (robusto + OCR opcional) | |
| # ============================== | |
| try: | |
| import fitz # PyMuPDF | |
| except Exception: | |
| fitz = None | |
| try: | |
| from pdfminer.high_level import extract_text as pdfminer_extract_text | |
| except Exception: | |
| pdfminer_extract_text = None | |
| try: | |
| from pypdf import PdfReader | |
| except Exception: | |
| PdfReader = None | |
| NBSP = "\u00A0" | |
| def _normalize_text(t: str) -> str: | |
| if not isinstance(t, str): | |
| t = str(t or "") | |
| t = t.replace(NBSP, " ") | |
| t = re.sub(r"[\u0000-\u001F]", " ", t) # remove controles | |
| # mantém quebras simples e colapsa espaços longos | |
| t = re.sub(r"[ \t]{2,}", " ", t) | |
| t = re.sub(r"\n{3,}", "\n\n", t) | |
| return t.strip() | |
def _extract_with_pymupdf(path: Path) -> List[Tuple[int, str]]:
    """Extract per-page text via PyMuPDF (fitz); the caller guards availability.

    Returns a list of (1-based page number, normalized text) tuples.
    """
    pages: List[Tuple[int, str]] = []
    with fitz.open(path) as document:
        for index, page in enumerate(document):
            raw = page.get_text("text") or ""
            pages.append((index + 1, _normalize_text(raw)))
    return pages
def _extract_with_pdfminer(path: Path) -> List[Tuple[int, str]]:
    """Extract text via pdfminer; pages are delimited by form-feed (\\f)."""
    full_text = pdfminer_extract_text(str(path)) or ""
    return [
        (page_no, _normalize_text(page_text))
        for page_no, page_text in enumerate(full_text.split("\f"), start=1)
    ]
def _extract_with_pypdf(path: Path) -> List[Tuple[int, str]]:
    """Extract per-page text via pypdf; per-page failures yield empty text.

    Returns [] when pypdf is not installed or the file cannot be opened.
    """
    if PdfReader is None:
        return []
    try:
        reader = PdfReader(str(path), strict=False)
    except Exception as e:
        print(f"[WARN] pypdf: falha ao abrir {path.name}: {e}")
        return []
    pages: List[Tuple[int, str]] = []
    for idx in range(len(reader.pages)):
        try:
            # page access and extraction both inside the try: either may raise
            text = reader.pages[idx].extract_text() or ""
        except Exception as e:
            print(f"[WARN] pypdf: falha ao extrair pag {idx + 1}: {e}")
            text = ""
        pages.append((idx + 1, _normalize_text(text)))
    return pages
def _maybe_ocr_images(path: Path) -> List[Tuple[int, str]]:
    """Optional OCR fallback for image-only PDFs.

    Returns [] when pytesseract/pdf2image are not installed; a mid-run failure
    returns whatever pages were OCRed so far.
    """
    try:
        import pytesseract
        from pdf2image import convert_from_path
    except Exception:
        return []
    pages: List[Tuple[int, str]] = []
    try:
        for page_no, image in enumerate(convert_from_path(str(path)), start=1):
            recognized = pytesseract.image_to_string(image) or ""
            pages.append((page_no, _normalize_text(recognized)))
    except Exception as e:
        print(f"[WARN] OCR: falha ao converter/ler {path.name}: {e}")
    return pages
def read_pdf_pages(path: Path) -> List[Tuple[int, str]]:
    """Read a PDF's text, trying backends in order and falling back to OCR.

    Order: PyMuPDF → pdfminer → pypdf → OCR. A backend "wins" only when at
    least one page yields non-blank text.
    """
    if not path.exists():
        return []

    def _has_text(pages: List[Tuple[int, str]]) -> bool:
        return any((tx or "").strip() for _, tx in pages)

    # (availability flag, extractor, label used in the warning message)
    attempts = [
        (fitz is not None, _extract_with_pymupdf, "PyMuPDF"),
        (pdfminer_extract_text is not None, _extract_with_pdfminer, "pdfminer"),
        (True, _extract_with_pypdf, "pypdf"),
    ]
    for available, extractor, label in attempts:
        if not available:
            continue
        try:
            pages = extractor(path)
            if _has_text(pages):
                return pages
        except Exception as e:
            print(f"[WARN] {label} falhou: {e}")
    # last resort: OCR when nothing textual was extracted
    ocr_pages = _maybe_ocr_images(path)
    if not _has_text(ocr_pages):
        print("[ERRO] Nenhum texto extraído, nem com OCR.")
    return ocr_pages
| # ============================== | |
| # Segmentação (detecção de seções para metadados) | |
| # ============================== | |
# Matches chapter/part headings and Db2 utility names that mark section starts.
DB2_HEADER_RE = re.compile(
    r"^(Part\s+\d+\.|Chapter\s+\d+\.)|"
    r"\b(BACKUP SYSTEM|CATMAINT|CHECK DATA|CHECK INDEX|CHECK LOB|COPY|COPYTOCOPY|DIAGNOSE|LISTDEF|LOAD|"
    r"MERGECOPY|MODIFY RECOVERY|MODIFY STATISTICS|OPTIONS|QUIESCE|REBUILD INDEX|RECOVER|REORG INDEX|REORG TABLESPACE|"
    r"REPAIR|REPORT|RESTORE SYSTEM|RUNSTATS|STOSPACE|TEMPLATE|UNLOAD)\b",
    re.IGNORECASE
)
def split_db2_docs(pages: List[Tuple[int, str]], doc_label: str) -> List[Dict]:
    """Group pages into section blocks keyed by detected chapter/utility headers.

    Each block carries doc label, section title, start/end pages and the raw
    (page_no, text) pairs; pages before the first header land in "INTRO".
    """
    result: List[Dict] = []

    def _close(block: Dict) -> None:
        # Finalize the active block: record its last page and keep it,
        # discarding blocks that accumulated no pages at all.
        if block["texts"]:
            block["end_page"] = block["texts"][-1][0]
            result.append(block)

    active = {"doc": doc_label, "section": "INTRO", "start_page": 1, "texts": []}
    for page_no, page_text in pages:
        text = page_text or ""
        # Only the first 300 chars of a page are scanned for a header.
        if DB2_HEADER_RE.search(text[:300]):
            _close(active)
            match = re.search(r"(Chapter\s+\d+\.\s*[^\n]+|^[^\n]{1,200})", text)
            section_title = match.group(1).strip() if match else f"Section@{page_no}"
            active = {"doc": doc_label, "section": section_title, "start_page": page_no, "texts": []}
        active["texts"].append((page_no, text))
    _close(active)
    return result
| # ============================== | |
| # Chunkização por caracteres (robusta) | |
| # ============================== | |
def make_chunks_by_chars(blocks: List[Dict], max_chars: int = 1500, min_chars: int = 180) -> List[Dict]:
    """Join each block's page texts and slice into overlapping character windows.

    Blocks whose text fits within [min_chars, max_chars] become one chunk;
    longer text is windowed with a 120-char overlap. Windows shorter than
    min_chars (including blocks entirely below it) are dropped.
    """
    chunks: List[Dict] = []
    for block in blocks:
        texts: List[str] = []
        page_numbers: List[int] = []
        for page_no, raw in block["texts"]:
            normalized = _normalize_text(raw or "")
            if normalized:
                texts.append(normalized)
                page_numbers.append(page_no)
        if not texts:
            continue
        blob = "\n".join(texts).strip()
        if not blob:
            continue
        first_page = min(page_numbers) if page_numbers else block.get("start_page", 0)
        last_page = max(page_numbers) if page_numbers else block.get("end_page", first_page)

        def _emit(text: str) -> None:
            # All chunks of a block share the block's page range metadata.
            chunks.append({
                "doc": block["doc"],
                "section": block["section"],
                "start_page": first_page,
                "end_page": last_page,
                "text": text
            })

        if min_chars <= len(blob) <= max_chars:
            _emit(blob)
            continue
        overlap = 120
        start, size = 0, len(blob)
        while start < size:
            stop = min(start + max_chars, size)
            window = blob[start:stop].strip()
            if len(window) >= min_chars:
                _emit(window)
            stepped_back = stop - overlap
            # Guarantee forward progress even when the overlap would rewind.
            start = stop if stepped_back <= start else stepped_back
    # final filter: drop any chunk whose text is blank
    return [c for c in chunks if (c.get("text") or "").strip()]
| # ============================== | |
| # Embeddings | |
| # ============================== | |
def embed_texts(texts: List[str], batch_size: int = 16) -> np.ndarray:
    """Embed the non-empty strings of `texts` via the NIM embeddings API.

    Blank/non-string entries are skipped; surviving rows keep the original
    relative order. Rows are L2-normalized so dot products are cosine sims.
    Returns a (0, 0) matrix when nothing embeddable remains.
    """
    client = get_client()
    keep = [(pos, txt) for pos, txt in enumerate(texts) if isinstance(txt, str) and txt.strip()]
    if not keep:
        return np.zeros((0, 0), dtype=np.float32)
    positions, payload = zip(*keep)
    by_position: Dict[int, np.ndarray] = {}
    for offset in range(0, len(payload), batch_size):
        batch = list(payload[offset:offset + batch_size])
        resp = client.embeddings.create(model=EMBED_MODEL, input=batch)
        for k, item in enumerate(resp.data):
            by_position[int(positions[offset + k])] = np.array(item.embedding, dtype=np.float32)
    rows = [by_position[pos] for pos in range(len(texts)) if pos in by_position]
    if not rows:
        return np.zeros((0, 0), dtype=np.float32)
    matrix = np.vstack(rows).astype(np.float32)
    norms = np.linalg.norm(matrix, axis=1, keepdims=True)
    norms[norms == 0] = 1.0  # avoid division by zero for all-zero rows
    return matrix / norms
def embed_query(q: str) -> np.ndarray:
    """Embed a single query string; returns the L2-normalized float32 vector."""
    resp = get_client().embeddings.create(model=EMBED_MODEL, input=[q])
    vector = np.array(resp.data[0].embedding, dtype=np.float32)
    norm = np.linalg.norm(vector)
    return (vector / (norm if norm > 0 else 1.0)).astype(np.float32)
| # ============================== | |
| # Indexação | |
| # ============================== | |
def build_index() -> Tuple[np.ndarray, List[Dict]]:
    """Extract, chunk and embed all configured PDFs, persisting the index to disk.

    Returns:
        (matrix, chunks): L2-normalized embedding matrix and chunk metadata.
    Raises:
        RuntimeError: when no chunk could be produced (extraction/OCR failed).
    """
    all_blocks: List[Dict] = []
    for p in PDFS:
        pages = read_pdf_pages(p)
        # Skip PDFs from which no backend (nor OCR) recovered any text.
        if not pages or not any((tx or "").strip() for _, tx in pages):
            print(f"[WARN] Sem texto legível em {p.name}; ignorando.")
            continue
        blks = split_db2_docs(pages, p.name)
        all_blocks.extend(blks)
    all_chunks = make_chunks_by_chars(all_blocks, max_chars=1500, min_chars=180)
    all_chunks = [c for c in all_chunks if (c.get("text") or "").strip()]
    if not all_chunks:
        # Persist an explicitly-empty index BEFORE raising, so later loads see
        # a consistent empty state rather than stale files from a prior run.
        with open(META_FILE, "w", encoding="utf-8") as f:
            json.dump({"chunks": [], "embed_model": EMBED_MODEL, "embed_dim": 0, "total_chars": 0}, f, ensure_ascii=False, indent=2)
        np.save(VEC_FILE, np.zeros((0, 0), dtype=np.float32))
        raise RuntimeError("Nenhum chunk foi criado. Verifique extração/ OCR.")
    texts = [c["text"] for c in all_chunks]
    total_chars = sum(len(t) for t in texts)
    mat = embed_texts(texts) if texts else np.zeros((0, 0), dtype=np.float32)
    embed_dim = int(mat.shape[1]) if mat.size else 0
    np.save(VEC_FILE, mat)
    # Metadata records the embedding model/dimension so later loads can detect
    # a model switch (see _check_embed_dim).
    with open(META_FILE, "w", encoding="utf-8") as f:
        json.dump(
            {"chunks": all_chunks, "embed_model": EMBED_MODEL, "embed_dim": embed_dim, "total_chars": total_chars},
            f, ensure_ascii=False, indent=2
        )
    return mat, all_chunks
def load_index() -> Tuple[np.ndarray, List[Dict]]:
    """Load the persisted index from disk, building it on first use."""
    if not (VEC_FILE.exists() and META_FILE.exists()):
        return build_index()
    matrix = np.load(VEC_FILE)
    meta = json.loads(META_FILE.read_text(encoding="utf-8"))
    return matrix, meta.get("chunks", [])
def wipe_index() -> str:
    """Delete the index directory's files and recreate it; returns a status string."""
    try:
        if INDEX_DIR.exists():
            for entry in INDEX_DIR.glob("*"):
                entry.unlink()
            INDEX_DIR.rmdir()
        INDEX_DIR.mkdir(exist_ok=True)
        return "Índice limpo."
    except Exception as e:
        return f"Erro ao limpar índice: {e}"
| # ============================== | |
| # Recuperação + LLM | |
| # ============================== | |
def _check_embed_dim(mat: np.ndarray) -> Optional[str]:
    """Compare the indexed embedding dimension with the live model's.

    Returns None when compatible (or unknown), otherwise a user-facing
    diagnostic message.
    """
    try:
        meta = json.loads(META_FILE.read_text(encoding="utf-8"))
        stored_dim = int(meta.get("embed_dim", 0))
    except Exception:
        stored_dim = 0  # metadata unreadable → treat dimension as unknown
    try:
        # Probe the live model with a throwaway query to learn its dimension.
        live_dim = int(embed_query("dim_test").shape[0])
    except Exception as e:
        return f"Falha ao checar dimensão do embedding: {e}"
    if stored_dim and live_dim and stored_dim != live_dim:
        return (f"Incompatibilidade de dimensão do embedding: índice={stored_dim}, modelo atual={live_dim}. "
                f"Reindexe com o mesmo EMBED_MODEL. (Atual EMBED_MODEL: {EMBED_MODEL})")
    return None
def retrieve_topk(query: str, doc_filter: Optional[str] = None, k: int = TOP_K_RETRIEVE) -> List[Dict]:
    """Return the top-k chunks most similar to `query` (cosine on unit vectors).

    Args:
        query: user question.
        doc_filter: restrict results to this document name; None or "(Todos)"
            disables the filter.
        k: number of chunks to return.
    Raises:
        RuntimeError: when the index dimension no longer matches the model's.

    Bug fix: the filter previously multiplied scores by a 0/1 mask, leaving
    excluded chunks at score 0.0 — argsort could still select them (e.g. when
    matches were scarce or had negative similarity). Excluded chunks are now
    scored -inf and dropped from the output entirely.
    """
    mat, chunks = load_index()
    if mat.shape[0] == 0 or not chunks:
        return []
    qv = embed_query(query)
    if mat.shape[1] != qv.shape[0]:
        raise RuntimeError(
            f"Dimensão incompatível mat={mat.shape} vs query={qv.shape}. "
            f"Provável troca de EMBED_MODEL após criar o índice. Clique 'Reindexar'."
        )
    sims = (mat @ qv).astype(float)
    if doc_filter and doc_filter != "(Todos)":
        allowed = np.array([c["doc"] == doc_filter for c in chunks], dtype=bool)
        sims = np.where(allowed, sims, -np.inf)
    idxs = np.argsort(-sims)[:k]
    out = []
    for i in idxs:
        if not np.isfinite(sims[int(i)]):
            continue  # chunk excluded by the document filter
        c = chunks[int(i)]
        out.append({
            "doc": c["doc"],
            "section": c.get("section", ""),
            "start_page": c.get("start_page", "?"),
            "end_page": c.get("end_page", "?"),
            "text": c["text"],
            "score": float(sims[int(i)]),
            "idx": int(i)
        })
    return out
def expand_context(hits: List[Dict], all_chunks: List[Dict], target_chars: int = TARGET_CONTEXT_CHARS) -> Tuple[str, List[Tuple[str, str, str]]]:
    """Grow the best hit's text with neighboring same-section chunks.

    Alternates appending the next chunk to the right and prepending the next
    to the left until roughly `target_chars`. Returns (context, sorted list of
    (doc, section, "start–end") source tuples).
    """
    if not hits:
        return "", []
    best = max(hits, key=lambda h: h["score"])
    context = best["text"]
    sources = {(best["doc"], best["section"], f"{best['start_page']}–{best['end_page']}")}
    doc, section = best["doc"], best["section"]
    # Indices of every chunk belonging to the same document + section.
    same_section = sorted(
        i for i, c in enumerate(all_chunks)
        if c["doc"] == doc and c.get("section", "") == section
    )
    if best["idx"] not in same_section:
        # Hit not locatable among the chunks (stale index?) — return as-is.
        return context, sorted(sources)
    pos = same_section.index(best["idx"])
    left, right = pos - 1, pos + 1
    while len(context) < target_chars and (left >= 0 or right < len(same_section)):
        if right < len(same_section) and len(context) < target_chars:
            neighbor = all_chunks[same_section[right]]
            context += "\n\n" + neighbor["text"]
            sources.add((doc, section, f"{neighbor.get('start_page', '?')}–{neighbor.get('end_page', '?')}"))
            right += 1
        if left >= 0 and len(context) < target_chars:
            neighbor = all_chunks[same_section[left]]
            context = neighbor["text"] + "\n\n" + context
            sources.add((doc, section, f"{neighbor.get('start_page', '?')}–{neighbor.get('end_page', '?')}"))
            left -= 1
    return context, sorted(sources)
def answer_with_llm(question: str, context: str) -> str:
    """Ask the chat model to answer `question` grounded strictly on `context`."""
    client = get_client()
    system = ("Você é um assistente especialista em IBM Db2 para z/OS. "
              "Responda em português, com exemplos de comandos SQL/JCL completos e corretos. "
              "Use apenas o contexto fornecido; se algo não estiver nele, diga que não está disponível.")
    user = (f"Pergunta:\n{question}\n\n"
            f"Contexto do(s) manual(is):\n{context}\n\n"
            "Regras de resposta:\n"
            "- Explique o necessário e como fazer.\n"
            "- Inclua pelo menos um exemplo de comando Db2 utilitário, SQL ou JCL (auto-contido), se aplicável.\n"
            "- Liste observações/pré-requisitos, se houver.\n"
            "- Cite as fontes (Documento e páginas) ao final.")
    completion = client.chat.completions.create(
        model=CHAT_MODEL,
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ],
        temperature=0.2,  # low temperature: factual, context-bound answers
    )
    return completion.choices[0].message.content.strip()
def format_sources_md(sources: List[Tuple[str, str, str]]) -> str:
    """Render (doc, section, pages) source tuples as Markdown bullet items."""
    if not sources:
        return ""
    rendered = []
    for doc, section, pages in sources:
        rendered.append(f"- **Documento:** {doc} \n **Seção:** {section} \n **Páginas:** {pages}")
    return "\n".join(rendered)
| # ============================== | |
| # Templates Db2 (exemplos) | |
| # ============================== | |
# Ready-to-adapt JCL examples for common Db2 z/OS utilities and tools.
# Placeholders (ACCT, DSN1, DBNAME, TSNAME, TBNAME, IXNAME, ...) must be
# replaced by the user before submission.
DB2_TEMPLATES: Dict[str, str] = {
    "RUNSTATS_TABLESPACE": (
        "//RUNSTAT JOB (ACCT),'RUNSTATS',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='RUNSTATS',UTPROC=''\n"
        "//SYSIN DD *\n"
        " RUNSTATS TABLESPACE(DBNAME.TSNAME) TABLE(ALL) INDEX(ALL)\n"
        "/*\n"
    ),
    "REORG_TABLESPACE": (
        "//REORG JOB (ACCT),'REORG',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='REORGTS',UTPROC=''\n"
        "//SYSIN DD *\n"
        " REORG TABLESPACE(DBNAME.TSNAME) SHRLEVEL CHANGE\n"
        "/*\n"
    ),
    "EXPLAIN_SQL": (
        "//EXPLAIN JOB (ACCT),'EXPLAIN',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNTEP2,SYSTEM=DSN1\n"
        "//SYSIN DD *\n"
        " EXPLAIN PLAN FOR\n"
        " SELECT COL1, COL2 FROM DBNAME.TBNAME WHERE COL3 = 'X';\n"
        "/*\n"
    ),
    "DISPLAY_BUFFERPOOL": (
        "//DISPBP JOB (ACCT),'DISPLAY BP',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC PGM=IKJEFT01\n"
        "//SYSTSPRT DD SYSOUT=*\n"
        "//SYSIN DD *\n"
        " DSN SYSTEM(DSN1)\n"
        " -DISPLAY BUFFERPOOL(BP0) DETAIL\n"
        " END\n"
        "/*\n"
    ),
    "DSNTEP2_SELECT": (
        "//SELECT JOB (ACCT),'DSNTEP2',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNTEP2,SYSTEM=DSN1\n"
        "//SYSIN DD *\n"
        " SELECT FIRSTNME, LASTNAME FROM DSN8810.EMP\n"
        " WHERE WORKDEPT = 'A00';\n"
        "/*\n"
    ),
    "COPY_TABLESPACE": (
        "//COPYTS JOB (ACCT),'COPY',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='COPYTS',UTPROC=''\n"
        "//SYSIN DD *\n"
        " COPY TABLESPACE(DBNAME.TSNAME) FULL YES SHRLEVEL CHANGE\n"
        "/*\n"
    ),
    "LOAD_TABLE": (
        "//LOADTBL JOB (ACCT),'LOAD',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='LOADTBL',UTPROC=''\n"
        "//SYSIN DD *\n"
        " LOAD DATA INDDN SYSREC INTO TABLE DBNAME.TBNAME\n"
        " REPLACE\n"
        "/*\n"
    ),
    "RECOVER_TABLESPACE": (
        "//RECOVTS JOB (ACCT),'RECOVER',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='RECOVTS',UTPROC=''\n"
        "//SYSIN DD *\n"
        " RECOVER TABLESPACE(DBNAME.TSNAME)\n"
        "/*\n"
    ),
    "STATS_INDEX": (
        "//STATSIX JOB (ACCT),'STATS INDEX',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='STATSIX',UTPROC=''\n"
        "//SYSIN DD *\n"
        " RUNSTATS INDEX(DBNAME.IXNAME) ALL\n"
        "/*\n"
    ),
    "MODIFY_RECOVERY": (
        "//MODREC JOB (ACCT),'MODIFY RECOVERY',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='MODREC',UTPROC=''\n"
        "//SYSIN DD *\n"
        " MODIFY RECOVERY TABLESPACE(DBNAME.TSNAME) AGE(30)\n"
        "/*\n"
    ),
    "CHECK_DATA": (
        "//CHKDATA JOB (ACCT),'CHECK DATA',CLASS=A,MSGCLASS=X,NOTIFY=&SYSUID\n"
        "//STEP1 EXEC DSNUPROC,SYSTEM=DSN1,UID='CHKDATA',UTPROC=''\n"
        "//SYSIN DD *\n"
        " CHECK DATA TABLESPACE(DBNAME.TSNAME) SCOPE ALL\n"
        "/*\n"
    ),
}
def template_for_db2(cmd: str) -> str:
    """Return the JCL template registered for `cmd`, or a generic stub."""
    fallback = "//GENERIC ...\n"
    return DB2_TEMPLATES.get(cmd, fallback)
| # ============================== | |
| # UI — layout com SIDEBAR + diagnóstico | |
| # ============================== | |
# Custom CSS injected into the Gradio app: card surfaces, sidebar separators,
# muted helper text and the overall max width of the layout.
CUSTOM_CSS = """
:root{ --ink:#0f172a; --muted:#475569; }
.gradio-container { max-width: 1200px !important; margin: 0 auto !important; }
.section-card { background: #fff; border: 1px solid #e2e8f0; border-radius: 16px; padding: 16px; box-shadow: 0 10px 30px rgba(2,6,23,.05); }
.section-title { font-size: 1.05rem; font-weight: 800; color: var(--ink); display: flex; gap: .6rem; align-items: center; }
.subtitle { color:var(--muted); font-size:.95rem; margin-top:.25rem; }
.result-card { background:#fcfdff; border:1px solid #e2e8f0; border-radius:12px; padding:12px; }
hr.sep { border:none; border-top:1px dashed #e2e8f0; margin:10px 0 14px; }
.small { font-size:.9rem; color:var(--muted); }
"""
def build_app():
    """Assemble the Gradio Blocks UI (sidebar + main panel) and wire callbacks.

    Returns the gr.Blocks app, ready for .launch().
    """
    # Labels derived from the PDFs discovered at import time.
    doc_label = PDFS[0].name if PDFS else "(Nenhum PDF)"
    all_doc_names = [p.name for p in PDFS] or ["(Nenhum PDF encontrado)"]
    default_doc = all_doc_names[0] if all_doc_names else "(Todos)"
    with gr.Blocks(title="Db2 z/OS • RAG (NVIDIA NIM)", css=CUSTOM_CSS, fill_height=True) as demo:
        # ===== Sidebar =====
        with gr.Sidebar():
            gr.Markdown(
                f"""
                <div class="section-title">💼 Db2 RAG</div>
                <div class="small">Contexto: <code>{doc_label}</code></div>
                """
            )
            status_box = gr.Markdown("Pronto ✅" if PDFS else "⚠️ Nenhum PDF encontrado.")
            gr.Markdown("<hr class='sep'/>")
            with gr.Group():
                gr.Markdown("**Ações**")
                test_btn = gr.Button("🧪 Testar conexão NVIDIA", variant="secondary")
                rebuild_btn = gr.Button("🔁 Reindexar (NIM)")
                diag_btn = gr.Button("🛠️ Diagnóstico do Índice")
            gr.Markdown("<hr class='sep'/>")
            with gr.Accordion("Configuração", open=False):
                gr.Markdown(f"- **Embeddings:** `{EMBED_MODEL}`\n- **LLM:** `{CHAT_MODEL}`\n- **Índice:** `{INDEX_DIR.name}`")
            doc_choice = gr.Dropdown(
                choices=(["(Todos)"] + all_doc_names),
                value=default_doc if PDFS else "(Todos)",
                label="Documento"
            )
        # ===== Main content =====
        gr.Markdown(
            f"""
            <div class="section-card" style="padding:18px; display:flex; gap:16px; align-items:center;">
              <div style="font-size:26px;">🧭</div>
              <div style="flex:1">
                <div style="font-size:1.2rem; font-weight:800; color:#0f172a;">DB2 -Z/OS UTILITIES | RAG + NVIDIA NIM</div>
                <div class="subtitle">Pergunte sobre utilidades (COPY, LOAD, REORG, RUNSTATS, RECOVER, etc.). As respostas vêm do manual: <code>{doc_label}</code>.</div>
              </div>
            </div>
            """
        )
        with gr.Row():
            q = gr.Textbox(
                label="Pergunta (Db2 Utilities)",
                placeholder="Ex.: Como usar COPY FULL com SHRLEVEL CHANGE? • Quando rodar RUNSTATS INDEX? • REORG TABLESPACE SHRLEVEL CHANGE • RECOVER PITR...",
                scale=8
            )
        with gr.Row():
            ask_btn = gr.Button("🔍 Buscar", variant="primary", scale=2)
            clear_btn = gr.Button("🧹 Limpar", scale=1)
        out = gr.Markdown(label="Resposta (Db2)")
        gr.Markdown("<hr class='sep'/>")
        with gr.Accordion("🧩 Templates Db2 executáveis", open=False):
            db2_choice = gr.Dropdown(
                choices=list(DB2_TEMPLATES.keys()),
                value="RUNSTATS_TABLESPACE",
                label="Comando / Padrão"
            )
            db2_btn = gr.Button("📄 Gerar exemplo")
            db2_out = gr.Textbox(label="Exemplo (copiar/ajustar)", lines=18, show_copy_button=True)
        with gr.Accordion("🧪 Log / Diagnóstico", open=False):
            diag_out = gr.Markdown()
        # ===== Callbacks =====
        def _test_conn():
            # Round-trip a tiny embedding request to validate credentials/endpoint.
            try:
                dim = len(get_client().embeddings.create(model=EMBED_MODEL, input=["ping"]).data[0].embedding)
                return f"Conexão ok ✅ — dimensão do embedding: **{dim}**"
            except Exception as e:
                return f"⚠️ Falha na conexão/credenciais NVIDIA: `{type(e).__name__}` — {e}"
        def _rebuild():
            # Wipe the on-disk index and rebuild it from scratch.
            try:
                msg = wipe_index()
                mat, chunks = build_index()
                return msg + f" Reindexação concluída ✅ PDFs: {len(PDFS)} • Chunks: {len(chunks)} • Vetores: {mat.shape}"
            except Exception as e:
                return f"⚠️ Erro ao reindexar: `{type(e).__name__}` — {e}"
        def _diagnose(dsel: str) -> str:
            # Summarize index health: vector shape, dims, chunk counts, preview.
            try:
                if not (VEC_FILE.exists() and META_FILE.exists()):
                    return "❌ Nenhum índice encontrado. Clique **Reindexar (NIM)**."
                mat = np.load(VEC_FILE)
                meta = json.loads(META_FILE.read_text(encoding="utf-8"))
                chunks = meta.get("chunks", [])
                embed_dim = meta.get("embed_dim", 0)
                total_chars = int(meta.get("total_chars", 0))
                dim_msg = _check_embed_dim(mat)
                # first sections (up to 12), honoring the document filter
                first_secs = []
                for c in chunks[:12]:
                    if dsel == "(Todos)" or c["doc"] == dsel:
                        first_secs.append(f"- {c['doc']} • {c.get('section','?')} • p.{c.get('start_page','?')}-{c.get('end_page','?')}")
                if not first_secs:
                    first_secs = ["(Filtro de documento não encontra seções no índice.)"]
                # preview of the first non-empty chunk
                preview = ""
                for c in chunks:
                    t = (c.get("text") or "").strip()
                    if t:
                        preview = t[:400].replace("\n", " ")
                        break
                if not preview:
                    preview = "(Nenhum chunk contém texto — verifique extração/OCR.)"
                msg = [
                    f"**Índice**: Vetores `{mat.shape}` • embed_dim(meta): `{embed_dim}` • Modelo atual: `{EMBED_MODEL}`",
                    f"**Chunks**: **{len(chunks)}** • **Total de caracteres**: {total_chars}",
                    f"**Documento selecionado**: `{dsel}`",
                    f"**Primeiras seções**:\n" + "\n".join(first_secs),
                    f"\n**Prévia (400 chars)**:\n```\n{preview}\n```"
                ]
                if dim_msg:
                    msg.append(f"\n⚠️ {dim_msg}")
                return "\n".join(msg)
            except Exception as e:
                return f"⚠️ Diagnóstico falhou: `{type(e).__name__}` — {e}"
        def _search_answer(qstr: str, d: str) -> str:
            # Full RAG pipeline: validate index → retrieve → expand → LLM answer.
            try:
                if not qstr or qstr.strip() == "":
                    return "_Informe uma pergunta._"
                if not (VEC_FILE.exists() and META_FILE.exists()):
                    return "_Nenhum conteúdo indexado. Use **Reindexar**._"
                mat = np.load(VEC_FILE)
                meta = json.loads(META_FILE.read_text(encoding="utf-8"))
                chunks = meta.get("chunks", [])
                if mat.size == 0 or not chunks:
                    return "_Índice vazio. Reindexe (pode ser necessário OCR)._"
                dim_msg = _check_embed_dim(mat)
                if dim_msg:
                    return f"⚠️ {dim_msg}"
                # retrieve
                hits = retrieve_topk(qstr, None if d == "(Todos)" else d, k=TOP_K_RETRIEVE)
                hits = [h for h in hits if (h.get("text") or "").strip()]
                if not hits:
                    return "_Nada encontrado para a consulta (verifique o filtro de documento ou reindexe)._"
                context, sources = expand_context(hits, chunks, TARGET_CONTEXT_CHARS)
                if not context.strip():
                    return "_Contexto insuficiente encontrado._"
                answer = answer_with_llm(qstr, context)
                src_md = format_sources_md(sources)
                return f"<div class='result-card'>{answer}</div>\n\n### Fontes\n{src_md}"
            except Exception as e:
                return f"⚠️ Erro ao buscar: `{type(e).__name__}` — {e}"
        def _clear(doc_default: str) -> Tuple[str, str]:
            # Reset the question box and restore the default document selection.
            return "", (doc_default if PDFS else "(Todos)")
        def ui_db2_template(cmd_choice: str) -> str:
            # Look up the JCL template for the selected command.
            return template_for_db2(cmd_choice)
        test_btn.click(_test_conn, outputs=[status_box])
        rebuild_btn.click(_rebuild, outputs=[status_box])
        diag_btn.click(_diagnose, inputs=[doc_choice], outputs=[diag_out])
        ask_btn.click(_search_answer, inputs=[q, doc_choice], outputs=[out])
        clear_btn.click(_clear, inputs=[gr.State(default_doc)], outputs=[q, doc_choice])
        db2_btn.click(ui_db2_template, inputs=[db2_choice], outputs=[db2_out])
    return demo
# ==============================
# Main — warms the index, then serves the UI on all interfaces, port 7860
# ==============================
# NOTE(review): an earlier banner mentioned "$PORT, SSR off, queue opcional",
# but the code hardcodes port 7860 and sets none of those options — confirm
# whether $PORT support is still wanted.
if __name__ == "__main__":
    # Index warm-up is best-effort: a failure is logged, not fatal, because the
    # user can still reindex from the UI.
    try:
        _ = load_index()
    except Exception as e:
        print(f"[AVISO] Índice não carregado: {e}")
    app = build_app()
    app.launch(server_name="0.0.0.0", server_port=7860)