# HW_assistant_AI_app / hardware_qa_assistant_helpers.py
# (Hugging Face page artifacts removed from the top of this file:
#  "ankarb's picture", "Upload 2 files", "65268e7 verified" — they were
#  scrape residue, not Python, and broke the module at import time.)
# hardware_qa_assistant_helpers.py
from __future__ import annotations
import json
import os, io, re, glob, tempfile, urllib.parse, time
from typing import List, Dict, Any, Tuple, Optional
# ---- Vector DB (persistent, local) ----
import chromadb
from chromadb.utils import embedding_functions
from chromadb.utils import embedding_functions
# Directory for the persistent Chroma store; override with the CHROMA_DIR env var.
CHROMA_DIR = os.getenv("CHROMA_DIR", ".chroma_hwqa_workspace")
os.makedirs(CHROMA_DIR, exist_ok=True)
# Module-wide persistent client; every collection helper below shares it.
chroma = chromadb.PersistentClient(path=CHROMA_DIR)
# --- Demo toggle (keep HF, just choose smaller model when DEMO_MODE is on) ---
_DEMO = os.getenv("DEMO_MODE", "0").lower() in ("1", "true", "yes")
_EMB_DEFAULT = "sentence-transformers/all-mpnet-base-v2"
_EMB_DEMO = "sentence-transformers/all-MiniLM-L6-v2" # small, fast, great for demos
# An explicit EMB_MODEL env var always wins; otherwise pick by demo mode.
EMB_MODEL = os.getenv("EMB_MODEL") or (_EMB_DEMO if _DEMO else _EMB_DEFAULT)
# Lazily created by get_embed_fn(); kept module-global so the model loads once.
_embed_fn = None
def get_embed_fn():
    """Lazily build and cache the SentenceTransformer embedding function.

    Deferring construction keeps startup memory low (so the server port
    binds fast); the instance is then reused for the process lifetime.
    """
    global _embed_fn
    if _embed_fn is not None:
        return _embed_fn
    # Keep torch/tokenizers lean inside small containers.
    os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
    os.environ.setdefault("OMP_NUM_THREADS", "1")
    os.environ.setdefault("MKL_NUM_THREADS", "1")
    _embed_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
        model_name=EMB_MODEL
    )
    print(f"[emb] Using {EMB_MODEL} (demo={_DEMO})")
    return _embed_fn
# ---- OCR + parsing stack (isolated here) ----
import fitz # PyMuPDF
import docx # python-docx
from pdf2image import convert_from_bytes
from PIL import Image
import cv2, numpy as np
# -------------------------
# Small workspace utilities
# -------------------------
def _slug(s: str) -> str:
    """Lower-case *s* and collapse runs of non-alphanumerics into dashes.

    Empty/None input (or input with no alphanumerics) yields "default".
    """
    cleaned = re.sub(r"[^a-z0-9]+", "-", (s or "").strip().lower())
    return cleaned.strip("-") or "default"
def user_prefix(email: str) -> str:
    """Collection-name prefix that namespaces all workspaces of one user."""
    slug = _slug(email)
    return f"u_{slug}__ws_"
def collection_name(email: Optional[str], workspace: str) -> str:
    """Build the Chroma collection name; anonymous users get a plain ws_ prefix."""
    ws = _slug(workspace)
    if email:
        return user_prefix(email) + ws
    return f"ws_{ws}"
def list_projects_for_user(email: str) -> List[str]:
    """Return the sorted, de-duplicated workspace names owned by *email*."""
    pref = user_prefix(email)
    try:
        collections = chroma.list_collections()
    except Exception:
        # Best effort: an unreachable DB simply yields no projects.
        return []
    names = {
        c.name[len(pref):]
        for c in collections
        if c.name.startswith(pref)
    }
    return sorted(names)
def delete_project(email: str, name: str) -> bool:
    """Drop the user's workspace collection; True on success, False otherwise."""
    target = f"{user_prefix(email)}{_slug(name)}"
    try:
        chroma.delete_collection(target)
    except Exception:
        return False
    return True
# -------------------------
# Chroma helpers
# -------------------------
def get_collection(workspace: str, user: Optional[str] = None):
    """Return the (user-scoped) collection for *workspace*, creating it if needed.

    Uses ``get_or_create_collection`` instead of the previous
    try-get / except-create dance: that pattern raced under concurrent
    requests and its broad ``except Exception`` also masked real DB errors
    (any failure, not just "missing collection", triggered a create).
    The embedding function is bound in both the get and create cases so
    queries against an existing collection embed consistently.
    """
    name = collection_name(user, workspace)
    return chroma.get_or_create_collection(name=name, embedding_function=get_embed_fn())
def add_docs(col, docs: List[Tuple[str, str, Dict[str, Any]]]):
    """Bulk-insert (id, text, metadata) triples into *col*; no-op when empty."""
    if not docs:
        return
    ids, texts, metas = (list(column) for column in zip(*docs))
    col.add(ids=ids, documents=texts, metadatas=metas)
# def retrieve(col, query: str, k: int = 5) -> List[str]:
# if not query.strip():
# return []
# res = col.query(query_texts=[query], n_results=k)
# return res.get("documents", [[]])[0]
def retrieve(col, query: str, k: int = 5) -> List[str]:
    """Thin wrapper over retrieve_info that keeps only the documents."""
    documents, _metas, _dists, _ids = retrieve_info(col, query, k)
    return documents
def retrieve_info(col, query: str, k: int = 5):
    """Return (docs, metadatas, distances, ids) for debugging/inspection.

    A blank query short-circuits to four empty lists without hitting the DB.
    """
    if not query.strip():
        return [], [], [], []
    res = col.query(
        query_texts=[query],
        n_results=k,
        include=["documents", "metadatas", "distances"],
    )
    def _first(key: str):
        # Chroma returns one inner list per query text; we sent exactly one.
        return res.get(key, [[]])[0]
    return _first("documents"), _first("metadatas"), _first("distances"), _first("ids")
# ---------- Retrieval quality utilities (NEW) ----------
from typing import Iterable
import math
def rrf_fuse(rank_lists: List[List[Tuple[str, Dict[str,Any], float, str]]], k: int = 60, K: int = 60):
    """Reciprocal-rank fusion of several ranked candidate lists.

    rank_lists: one list per sub-query of (doc, meta, dist, id) tuples,
    already sorted by DB rank. Returns up to *k* unique tuples
    (doc, meta, fused_score, id) sorted descending by fused score.
    *K* is the RRF rank constant (larger -> flatter contribution).
    """
    fused_scores: Dict[str, float] = {}
    first_seen: Dict[str, tuple] = {}
    for ranked in rank_lists:
        for rank, (doc, meta, dist, _id) in enumerate(ranked, start=1):
            # Fall back to a doc-prefix/filename key when the DB gave no id.
            key = _id or (doc[:60] + "|" + (meta or {}).get("filename",""))
            fused_scores[key] = fused_scores.get(key, 0.0) + 1.0 / (K + rank)
            if key not in first_seen:
                similarity = (1.0 - float(dist)) if dist is not None else None
                first_seen[key] = (doc, meta, similarity, _id)
    merged = [
        (first_seen[key][0], first_seen[key][1], score, first_seen[key][3])
        for key, score in fused_scores.items()
    ]
    merged.sort(key=lambda item: item[2], reverse=True)
    return merged[:k]
def _embed_texts(texts: List[str]) -> List[List[float]]:
    """Embed *texts* with the exact same model Chroma indexes with."""
    embed = get_embed_fn()
    return embed(texts)
from functools import lru_cache
@lru_cache(maxsize=1)
def _load_cross_encoder(model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
    """Load (once per process) the cross-encoder used for re-ranking."""
    # Imported lazily so sentence-transformers is only required if re-ranking runs.
    from sentence_transformers import CrossEncoder
    return CrossEncoder(model_name)
def rerank_cross_encoder(query: str, cand_docs: List[Tuple[str,Dict[str,Any],float,str]],
                         model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2") -> List[Tuple[str,Dict[str,Any],float,str]]:
    """Re-score candidates against *query* with a cross-encoder.

    Returns the same (doc, meta, score, id) tuples with the score replaced by
    the cross-encoder similarity, sorted descending. If the cross-encoder is
    unavailable (or fails), falls back to cosine similarity computed with the
    same embedding model that backs the vector DB.
    """
    try:
        encoder = _load_cross_encoder(model_name)
        ce_scores = encoder.predict([(query, cand[0]) for cand in cand_docs]).tolist()
        rescored = [
            (cand[0], cand[1], float(score), cand[3])
            for cand, score in zip(cand_docs, ce_scores)
        ]
    except Exception:
        # Fallback: cosine similarity with the DB's own embedding model.
        vectors = _embed_texts([query] + [cand[0] for cand in cand_docs])
        query_vec, doc_vecs = vectors[0], vectors[1:]
        def _cosine(a, b):
            import numpy as _np
            a = _np.array(a); b = _np.array(b)
            norm = (float((a*a).sum())**0.5) * (float((b*b).sum())**0.5)
            return float((a*b).sum()) / norm if norm else 0.0
        rescored = [
            (cand[0], cand[1], _cosine(query_vec, vec), cand[3])
            for cand, vec in zip(cand_docs, doc_vecs)
        ]
    rescored.sort(key=lambda item: item[2], reverse=True)
    return rescored
def mmr_select(query: str, cand_docs: List[Tuple[str,Dict[str,Any],float,str]],
               keep: int = 8, diversity_lambda: float = 0.7) -> List[Tuple[str,Dict[str,Any],float,str]]:
    """Maximal Marginal Relevance pick of a diverse top-*keep* subset.

    Assumes *cand_docs* is already sorted best-first (e.g. post-rerank):
    the first pick is the head of the list; each further pick maximizes
    lambda*sim(query, d) - (1-lambda)*max_selected sim(d, d_sel).
    """
    import numpy as _np
    vectors = _embed_texts([query] + [cand[0] for cand in cand_docs])
    query_vec, doc_vecs = vectors[0], vectors[1:]
    def _cosine(a, b):
        a = _np.array(a); b = _np.array(b)
        norm = (float((a*a).sum())**0.5) * (float((b*b).sum())**0.5)
        return float((a*b).sum()) / norm if norm else 0.0
    chosen: List[int] = []
    remaining = list(range(len(cand_docs)))
    while remaining and len(chosen) < keep:
        if not chosen:
            # Seed with the pre-ranked best candidate.
            chosen.append(remaining.pop(0))
            continue
        best_idx, best_score = None, -1e9
        for idx in remaining:
            relevance = _cosine(query_vec, doc_vecs[idx])
            redundancy = max(_cosine(doc_vecs[idx], doc_vecs[sel]) for sel in chosen)
            mmr = diversity_lambda * relevance - (1.0 - diversity_lambda) * redundancy
            if mmr > best_score:
                best_idx, best_score = idx, mmr
        remaining.remove(best_idx)
        chosen.append(best_idx)
    return [cand_docs[idx] for idx in chosen]
def _normalize_where(where: Dict[str, Any]) -> Dict[str, Any]:
    """
    Normalize a simple metadata filter for newer Chroma, which expects a
    single top-level operator ($and / $or / $not).

    {"filename": "x", "doc_type": "pdf", "page": 2} ->
        {"$and": [{"filename": {"$eq": "x"}}, {"doc_type": {"$eq": "pdf"}},
                  {"page": {"$eq": 2}}]}

    Fixes: a single remaining field is now returned as {"field": {"$eq": v}}
    directly, because Chroma rejects $and/$or with fewer than two operands.
    None values are dropped; operator-structured input is returned as-is.
    """
    if not where:
        return {}
    # Already operator-structured ($and/$or/$not at the top level): keep it.
    if any(str(k).startswith("$") for k in where.keys()):
        return where
    clauses = []
    for k, v in where.items():
        if v is None:
            continue  # None cannot be matched; drop the field entirely
        if isinstance(v, dict) and any(str(op).startswith("$") for op in v.keys()):
            clauses.append({k: v})  # field already carries its own operator
        else:
            clauses.append({k: {"$eq": v}})
    if not clauses:
        return {}
    if len(clauses) == 1:
        # $and requires >= 2 operands in Chroma; a lone clause is valid bare.
        return clauses[0]
    return {"$and": clauses}
def get_by_where(col, where: Dict[str, Any]) -> Tuple[List[str], List[Dict[str,Any]]]:
    """Fetch (documents, metadatas) matching *where*, normalized for Chroma."""
    res = col.get(where=_normalize_where(where), include=["documents","metadatas"])
    documents = res.get("documents") or []
    metadatas = res.get("metadatas") or []
    return documents, metadatas
def promote_parents(col, picked: List[Tuple[str,Dict[str,Any],float,str]], max_siblings_per_parent: int = 1) -> List[Tuple[str,Dict[str,Any],float,str]]:
    """
    For each picked child chunk, fetch siblings from the same parent (PDF page or DOCX section).

    Keeps every picked tuple in order, then appends up to
    *max_siblings_per_parent* sibling chunks per child; siblings inherit
    the child's score. Returns the expanded [(doc, meta, score, id)] list.
    """
    out: List[Tuple[str,Dict[str,Any],float,str]] = []
    # Synthetic sibling ids already appended (dedup across all parents).
    seen_ids = set()
    for (doc, meta, score, _id) in picked:
        out.append((doc, meta, score, _id))
        parent_where = None
        m = meta or {}
        fn = m.get("filename") or m.get("source") or m.get("name")
        if not fn:
            continue  # cannot identify the parent document without a filename
        # Parent identity: PDF -> same file+page; DOCX -> same file+section path.
        if m.get("doc_type") == "pdf" and m.get("page") is not None:
            parent_where = {"filename": fn, "doc_type": "pdf", "page": m.get("page")}
        elif m.get("doc_type") == "docx" and m.get("section_path"):
            parent_where = {"filename": fn, "doc_type": "docx", "section_path": m.get("section_path")}
        if not parent_where:
            continue
        sdocs, smetas = get_by_where(col, parent_where)
        added = 0
        for sd, sm in zip(sdocs, smetas):
            # Synthetic id: doc prefix + filename + chunk index (siblings
            # fetched via get() don't carry their DB ids here).
            sid = (sd[:60] + "|" + (sm or {}).get("filename","") + "|" + str((sm or {}).get("chunk")))
            if sid in seen_ids:
                continue
            if added >= max_siblings_per_parent:
                break
            # avoid re-adding the very same child (best effort without global ids)
            if sd.strip() == (doc or "").strip():
                continue
            out.append((sd, sm, score, sid))  # sibling reuses the child's score
            seen_ids.add(sid)
            added += 1
    return out
# -------------------------
# OCR backends + parsers
# -------------------------
# Lazily-built PaddleOCR engine, or an error string when init failed.
_PADDLE_OCR = None
def _get_paddle_ocr():
    """Return the cached PaddleOCR engine, creating it on first use.

    On initialization failure the error message *string* is cached instead,
    so callers detect failure via isinstance(..., str) and can fall back.
    """
    global _PADDLE_OCR
    if _PADDLE_OCR is None:
        try:
            from paddleocr import PaddleOCR
            _PADDLE_OCR = PaddleOCR(use_angle_cls=True, lang="en")
        except Exception as e:
            _PADDLE_OCR = f"[paddle init error] {e}"
    return _PADDLE_OCR
def ocr_image_paddle(img: Image.Image) -> str:
    """OCR *img* with PaddleOCR; returns the init-error string on failure."""
    engine = _get_paddle_ocr()
    if isinstance(engine, str):
        return engine  # init failed earlier; propagate the error marker
    bgr = np.array(img.convert("RGB"))[:, :, ::-1]  # Paddle expects BGR order
    collected: List[str] = []
    for page in (engine.ocr(bgr) or []):
        for item in (page or []):
            try:
                # Each item is (box, (text, confidence)); skip malformed ones.
                _box, (txt, _conf) = item
                if txt:
                    collected.append(txt)
            except Exception:
                pass
    return "\n".join(collected).strip()
def ocr_image_tesseract(img: Image.Image) -> str:
    """OCR *img* with Tesseract after Otsu binarization; error text on failure."""
    try:
        import pytesseract
        grayscale = cv2.cvtColor(np.array(img.convert("RGB")), cv2.COLOR_RGB2GRAY)
        binary = cv2.threshold(grayscale, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
        return pytesseract.image_to_string(Image.fromarray(binary)).strip()
    except Exception as e:
        return f"[tesseract error] {e}"
def ocr_image(img: Image.Image, backend: str) -> str:
    """Dispatch OCR to the requested backend.

    "tesseract" goes straight to Tesseract; "paddle", "auto", and anything
    else try PaddleOCR first and fall back to Tesseract when Paddle is
    unavailable or returns nothing (the original's two Paddle paths were
    identical, so they are merged here).
    """
    choice = (backend or "auto").lower()
    if choice == "tesseract":
        return ocr_image_tesseract(img)
    text = ocr_image_paddle(img)
    if text and not text.startswith("[paddle init error]"):
        return text
    return ocr_image_tesseract(img)
def parse_pdf(file_bytes: bytes, *, ocr_backend: str) -> str:
    """Extract text from a PDF, OCRing it only when no text layer exists."""
    with fitz.open(stream=file_bytes, filetype="pdf") as doc:
        extracted = [page.get_text("text") for page in doc]
    text = "\n".join(extracted).strip()
    if text:
        return text
    # Image-only PDF: rasterize every page and OCR each one.
    rendered = convert_from_bytes(file_bytes)
    return "\n".join(ocr_image(page, backend=ocr_backend) for page in rendered)
def parse_docx(file_bytes: bytes) -> str:
    """Extract paragraph text from DOCX bytes.

    python-docx wants a real path, so the bytes are staged in a temp file.
    The unlink now sits in a ``finally`` — previously the temp file leaked
    whenever ``docx.Document()`` raised (e.g. on a corrupt upload).
    """
    with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as tmp:
        tmp.write(file_bytes)
        path = tmp.name
    try:
        d = docx.Document(path)
    finally:
        os.unlink(path)
    return "\n".join(p.text for p in d.paragraphs)
def parse_image(file_bytes: bytes, *, ocr_backend: str) -> str:
    """Decode the image bytes and OCR them with the chosen backend."""
    decoded = Image.open(io.BytesIO(file_bytes)).convert("RGB")
    return ocr_image(decoded, backend=ocr_backend)
# -------------------------
# Hybrid chunking helpers (PDF pages + DOCX sections) ⬇️ ADD THIS
# -------------------------
def chunk_text(s: str, size: int = 1200, overlap: int = 180) -> List[str]:
    """Slide a window of *size* chars over *s* with *overlap*, returning chunks.

    Fixes an infinite loop in the original: when overlap >= size the step
    ``i = max(0, j - overlap)`` never advanced past the previous start. The
    window now always moves forward; with a degenerate overlap the chunks
    are simply emitted back-to-back without overlap.
    """
    s = (s or "").strip()
    chunks: List[str] = []
    if not s:
        return chunks
    n = len(s)
    i = 0
    while i < n:
        j = min(n, i + size)
        chunks.append(s[i:j])
        if j >= n:
            break
        nxt = j - overlap
        # Guarantee forward progress even when overlap >= size.
        i = nxt if nxt > i else j
    return chunks
def parse_pdf_pages(file_bytes: bytes, *, ocr_backend: str) -> List[str]:
    """Return text per page (OCRing only pages that have no extractable text)."""
    texts: List[str] = []
    try:
        with fitz.open(stream=file_bytes, filetype="pdf") as doc:
            page_count = len(doc)
            for idx in range(page_count):
                t = (doc[idx].get_text("text") or "").strip()
                if t:
                    texts.append(t)
                else:
                    # OCR just this page to avoid rendering whole PDF into RAM
                    try:
                        imgs = convert_from_bytes(
                            file_bytes, first_page=idx + 1, last_page=idx + 1
                        )
                        t = ocr_image(imgs[0], backend=ocr_backend)
                    except Exception as e:
                        # Keep going: embed the error as this page's "text".
                        t = f"[ocr error p{idx+1}] {e}"
                    texts.append(t or "")
    except Exception as e:
        # fallback: keep previous parse_pdf behavior (may OCR all pages)
        # NOTE(review): the "\n\f\n" split assumes parse_pdf separates pages
        # with form feeds, but parse_pdf joins with plain "\n" — this usually
        # yields a single element; confirm intended page splitting.
        texts = (parse_pdf(file_bytes, ocr_backend=ocr_backend) or "").split("\n\f\n")
    return texts
def parse_docx_sections(file_bytes: bytes) -> List[Tuple[str, str]]:
    """Return (section_path, text) pairs using Heading 1/2/3 as boundaries.
    If no headings exist, return a single ('Body', full_text).
    """
    # python-docx needs a path, so stage the bytes in a temp file.
    # NOTE(review): if Document() raises, the temp file is never unlinked.
    with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as tmp:
        tmp.write(file_bytes)
        path = tmp.name
    d = docx.Document(path)
    os.unlink(path)
    def _lvl(name: Optional[str]) -> Optional[int]:
        # Map a style name like "Heading 2" to its integer level, else None.
        if not name:
            return None
        m = re.match(r"heading\s*(\d+)", name.strip().lower())
        return int(m.group(1)) if m else None
    sections: List[Tuple[str, str]] = []
    stack: List[str] = [] # e.g., ["Intro", "Clocking"]
    buf: List[str] = []   # paragraph texts accumulated for the current section
    def _flush():
        # Emit buffered paragraphs as one section under the current heading path.
        if buf:
            sec = " > ".join(stack) if stack else "Body"
            sections.append((sec, "\n".join(buf).strip()))
            buf.clear()
    for p in d.paragraphs:
        txt = (p.text or "").strip()
        lvl = _lvl(getattr(getattr(p, "style", None), "name", None))
        if lvl and 1 <= lvl <= 3:
            # A heading closes the previous section and reshapes the path.
            _flush()
            # adjust heading path depth
            if lvl == 1:
                stack = [txt] if txt else []
            elif lvl == 2:
                stack = (stack[:1] + [txt]) if txt else stack[:1]
            else: # lvl == 3
                stack = (stack[:2] + [txt]) if txt else stack[:2]
        else:
            if txt:
                buf.append(txt)
    _flush()
    if not sections:
        # Heading-free document: collapse everything into one 'Body' section.
        whole = "\n".join(p.text for p in d.paragraphs).strip()
        return [("Body", whole)]
    return sections
# -------------------------
# Manage-PDF helpers
# -------------------------
def list_workspace_files_by_filename(ws: str, email: Optional[str]) -> List[str]:
    """Unique base filenames indexed in the user's workspace, sorted case-insensitively."""
    if not ws or not email:
        return []
    col = get_collection(ws, email)
    res = col.get(include=["metadatas"])
    found = set()
    for meta in (res.get("metadatas") or []):
        meta = meta or {}
        name = meta.get("source") or meta.get("filename") or meta.get("name")
        if name:
            found.add(os.path.basename(str(name)))
    return sorted(found, key=str.lower)
def delete_files_by_filename(ws: str, email: Optional[str], filenames: List[str]) -> str:
    """Delete every chunk whose metadata filename matches one of *filenames*.

    Returns a human-readable status string for the UI.
    """
    if not ws:
        return "Select a project first."
    if not email:
        return "You must be logged in."
    if not filenames:
        return "Nothing selected."
    targets = {os.path.basename(str(n)) for n in filenames}
    col = get_collection(ws, email)
    res = col.get(include=["metadatas"])
    metadatas = res.get("metadatas") or []
    ids = res.get("ids") or []
    doomed: List[str] = []
    for chunk_id, meta in zip(ids, metadatas):
        meta = meta or {}
        name = meta.get("source") or meta.get("filename") or meta.get("name")
        if name and os.path.basename(str(name)) in targets:
            doomed.append(chunk_id)
    if not doomed:
        return "No matching chunks found for the selected filename(s)."
    col.delete(ids=doomed)
    return f"Deleted {len(doomed)} chunk(s) from project '{ws}' across {len(targets)} file name(s)."
# -------------------------
# Reports helpers
# -------------------------
def list_summary_reports() -> List[str]:
    """Find hwqa_summary_*.md reports in the CWD and temp dir, newest first."""
    found: List[str] = []
    for root in (os.getcwd(), tempfile.gettempdir()):
        pattern = os.path.join(root, "hwqa_summary_*.md")
        found.extend(os.path.abspath(match) for match in glob.glob(pattern))
    return sorted(found, key=os.path.getmtime, reverse=True)
def read_report_text(path: str) -> str:
    """Return the report's text, or a Markdown explanation of why it can't be read."""
    if not path:
        return "_No report selected._"
    if not os.path.isfile(path):
        return f"_Report not found:_ `{path}`"
    try:
        # errors="replace" so a mis-encoded report still renders rather than raises
        with open(path, "r", encoding="utf-8", errors="replace") as fh:
            return fh.read()
    except Exception as e:
        return f"_Could not read report (`{path}`):_ {e}"
# -------------------------
# Trace helpers
# -------------------------
def trace_push(state: Dict[str, Any], node: str, t0: float, notes: str = "") -> None:
    """Append a {node, ms, notes} entry to state["trace"], timing from *t0*."""
    try:
        elapsed_ms = int((time.perf_counter() - t0) * 1000)
    except Exception:
        elapsed_ms = 0  # a bad t0 must never break request handling
    entries = state.get("trace") or []
    entries.append({"node": node, "ms": elapsed_ms, "notes": notes})
    state["trace"] = entries
def _trace_to_markdown(state: Dict[str, Any]) -> str:
    """Render the per-request trace in *state* as a Markdown debug report.

    Sections, in order: header, per-node timing list, LLM inspector,
    retrieval inspector, executed-node summary + narrative, collapsed raw
    JSON, and (only when the web node reported sources>0) citations.
    """
    q = (state.get("user_input") or "").strip()
    full = state.get("trace") or []
    # keep only entries from the last 'session' marker onward
    last_idx = -1
    for i, e in enumerate(full):
        if (e or {}).get("node") == "session":
            last_idx = i
    trace = full[last_idx:] if last_idx != -1 else full
    head = [
        "### Last request",
        f"**Q:** {q}" if q else "**Q:** (no question text)",
        f"Workspace: {state.get('workspace','')} OCR: {state.get('ocr_backend','')} Web search: {state.get('do_web_search')} Schematic mode: {state.get('treat_images_as_schematics')}",
        "",
    ]
    # Numbered per-node timing lines for the last turn.
    body = []
    for i, e in enumerate(trace or [], 1):
        body.append(f"{i}. **{e.get('node','?')}** — {e.get('notes','')} ({e.get('ms',0)} ms)")
    cites = []
    # Show citations only if the web node actually ran and returned sources>0
    # (prevents lingering/irrelevant links when web search is disabled)
    import re as _re
    # work on the sliced "trace" we already computed above
    web_sources = 0
    for e in trace or []:
        if (e or {}).get("node") == "web":
            notes = (e or {}).get("notes", "")
            # The web node encodes its result count as "sources=N" in notes.
            m = _re.search(r"sources\s*=\s*(\d+)", notes)
            if m:
                try:
                    web_sources = int(m.group(1))
                except Exception:
                    web_sources = 0
    if web_sources > 0:
        cites = [f"- {u}" for u in (state.get("citations") or [])]
        foot = ["", "**Citations (web):**", *cites]
    else:
        foot = []
    # ---- LLM Inspector (NEW) ----
    events = state.get("llm_events") or []
    inspector: List[str] = []
    if events:
        # Markdown table of per-call model/latency/token stats.
        inspector.extend(["", "### LLM Inspector", "",
                          "| node | model | ms | in | out | total |",
                          "|---|---|---:|---:|---:|---:|"])
        for e in events:
            inspector.append(
                f"| {e.get('node','')} | {e.get('model','')} | {e.get('latency_ms',0)} | "
                f"{e.get('prompt_tokens',0)} | {e.get('completion_tokens',0)} | {e.get('total_tokens',0)} |"
            )
        inspector.append("")
        inspector.append("<details><summary>Show input/output previews</summary>")
        for e in events:
            inspector.append(
                f"<br/><b>{e.get('node','')}</b> — <i>{e.get('model','')}</i><br/>"
                f"<b>Input:</b> {e.get('prompt_preview','')}<br/>"
                f"<b>Output:</b> {e.get('output_preview','')}"
            )
        inspector.append("</details>")
    # ---- Retrieval Inspector (NEW) ----
    r_events = state.get("retrieval_events") or []
    retr_tbl: List[str] = []
    if r_events:
        retr_tbl.extend(["", "### Retrieval Inspector", "",
                         "| rank | score | file | type | page/section | preview |",
                         "|---:|---:|---|---|---|---|"])
        for e in r_events:
            # Location column: PDF page number or DOCX section path.
            loc = ""
            if e.get("page") is not None:
                loc = f"p{e.get('page')}"
            elif e.get("section"):
                loc = e.get("section") or ""
            # escape pipes in preview
            prev = (e.get("preview","") or "").replace("|", "\\|")
            score = "" if e.get("score") is None else f"{e.get('score'):.3f}"
            retr_tbl.append(f"| {e.get('rank','')} | {score} | {e.get('filename','')} | "
                            f"{e.get('doc_type','')} | {loc} | {prev} |")
    # ---- Executed Nodes Summary + Execution Narrative (NEW) ----
    # NOTE: this section intentionally uses the FULL trace, not the last-turn slice.
    trace_list = state.get("trace") or []
    ran = [e for e in trace_list if e.get("node")]
    ran_names = [e.get("node") for e in ran]
    # helpers to pick info from state
    def _llm_line(node_name: str):
        # One-line model/token/latency summary for the first LLM event of a node.
        for e in (state.get("llm_events") or []):
            if e.get("node") == node_name:
                m = e.get("model") or ""
                pi = e.get("prompt_tokens") or 0
                co = e.get("completion_tokens") or 0
                ms = e.get("latency_ms") or 0
                return f"model={m} in={pi} out={co} {int(ms)}ms"
        return ""
    def _retrieve_line():
        # NOTE(review): defined but never referenced below — possibly vestigial.
        evs = state.get("retrieval_events") or []
        if not evs:
            return "hits=0"
        top = ", ".join([f"{(e.get('filename') or '')} {('p'+str(e['page'])) if e.get('page') else (e.get('section') or '')}".strip()
                         for e in evs[:3]])
        return f"hits={len(evs)} ({top})"
    summary_lines: List[str] = []
    for idx, e in enumerate(ran, 1):
        n = e.get("node", "?")
        notes = e.get("notes", "")
        if n == "router":
            route = state.get("route") or "?"
            files = len(state.get("parsed_docs") or [])
            schem = bool(state.get("has_schematic"))
            summary_lines.append(f"{idx}. **router** — route={route} (files={files}, schematic={schem})")
        elif n == "ingest":
            summary_lines.append(f"{idx}. **ingest** — {notes or 'done'}")
        elif n == "retrieve":
            # Legacy path (older runs) — keep this
            k = len(state.get("retrieved_chunks") or [])
            summary_lines.append(f"{idx}. **retrieve** — returned {k} chunks")
        elif n in ("multiquery","retrieve_pool","rrf_fuse","rerank","mmr","parent_promote"):
            if n == "multiquery":
                summary_lines.append(f"{idx}. **multiquery** — generated variants")
            elif n == "retrieve_pool":
                summary_lines.append(f"{idx}. **retrieve_pool** — pooled top-k per variant")
            elif n == "rrf_fuse":
                summary_lines.append(f"{idx}. **rrf_fuse** — fused & deduped")
            elif n == "rerank":
                summary_lines.append(f"{idx}. **rerank** — cross-encoder re-ordered")
            elif n == "mmr":
                summary_lines.append(f"{idx}. **mmr** — selected diverse top-N")
            elif n == "parent_promote":
                # New pipeline — final point where retrieved_chunks is set
                k = len(state.get("retrieved_chunks") or [])
                summary_lines.append(f"{idx}. **parent_promote** — finalized {k} chunks for context")
        elif n == "web":
            ws = state.get("web_snippets") or []
            summary_lines.append(f"{idx}. **web** — sources={len(ws)}")
        elif n in ("answer", "schematic_answer"):
            summary_lines.append(f"{idx}. **{n}** — {_llm_line(n)}")
        elif n == "inventory_docs":
            summary_lines.append(f"{idx}. **inventory_docs** — {notes or 'listed PDFs/DOCX'}")
        elif n == "inventory_schematics":
            summary_lines.append(f"{idx}. **inventory_schematics** — {notes or 'listed schematics'}")
        elif n == "verify":
            summary_lines.append(f"{idx}. **verify** — {notes or 'ok'}")
        else:
            summary_lines.append(f"{idx}. **{n}** — {notes}")
    # Build a one-paragraph narrative
    narrative_bits: List[str] = []
    if "router" in ran_names:
        narrative_bits.append(f"The router selected the **{state.get('route') or '?'}** path.")
    if "ingest" in ran_names:
        narrative_bits.append("Ingest parsed and indexed the new files.")
    if "retrieve" in ran_names:
        evs = state.get("retrieval_events") or []
        narrative_bits.append(f"Retrieval returned **{len(evs)}** chunk(s).")
    if "web" in ran_names:
        ws = state.get("web_snippets") or []
        if ws:
            narrative_bits.append(f"Web search added **{len(ws)}** source(s).")
        else:
            narrative_bits.append("No web sources were needed.")
    if "answer" in ran_names or "schematic_answer" in ran_names:
        # use whichever LLM event we have
        llms = state.get("llm_events") or []
        if llms:
            m = llms[-1].get("model", "")
            pi = llms[-1].get("prompt_tokens") or 0
            co = llms[-1].get("completion_tokens") or 0
            narrative_bits.append(f"The model **{m}** generated the answer ({pi} prompt → {co} completion tokens).")
    if "verify" in ran_names:
        narrative_bits.append("Verification passed.")
    summary_block: List[str] = []
    if summary_lines:
        summary_block.extend(["", "### What happened in this run", ""])
        summary_block.extend([*summary_lines, ""])
    if narrative_bits:
        summary_block.extend(["**Execution narrative:** " + " ".join(narrative_bits), ""])
    # ---- Raw JSON collapsed (NEW) ----
    raw = {"trace": trace_list}
    raw_json = json.dumps(raw, indent=2, ensure_ascii=False)
    raw_block = [
        "",
        "<details><summary><b>Show raw trace JSON</b></summary>",
        "",
        "```json",
        raw_json,
        "```",
        "",
        "</details>",
    ]
    return "\n".join(head + body + inspector + retr_tbl + summary_block + raw_block + foot)
def _dag_image_from_trace(trace: list, llm_events: Optional[list] = None):
    """Draw the pipeline DAG as a PIL image, highlighting the executed path.

    Nodes (and consecutive node pairs) appearing in *trace* are colored;
    nodes with an LLM event get a model/token subtitle in their label.
    """
    # Imported lazily so networkx/matplotlib are only required for rendering.
    import networkx as nx
    import matplotlib.pyplot as plt
    from PIL import Image as PILImage
    import io as _io
    G = nx.DiGraph()
    # Static inventory of every node the pipeline graph can contain.
    nodes = [
        "session", "router", "ingest",
        "multiquery","retrieve_pool","rrf_fuse","rerank","mmr","parent_promote",
        "web", "answer", "verify", "clarify", "report", "END",
        "parts_expand", "schematic_answer",
        "inventory_docs", "inventory_schematics"
    ]
    G.add_nodes_from(nodes)
    G.add_edges_from([
        ("session","router"), ("session","report"),
        ("router","ingest"), ("router","multiquery"),
        ("ingest","multiquery"),
        ("multiquery","retrieve_pool"),
        ("retrieve_pool","rrf_fuse"),
        ("rrf_fuse","rerank"),
        ("rerank","mmr"),
        ("mmr","parent_promote"),
        ("parent_promote","web"),
        ("parent_promote","answer"),
        ("web","answer"),
        ("answer","verify"),
        ("parent_promote","parts_expand"),
        ("parts_expand","schematic_answer"),
        ("schematic_answer","verify"),
        ("verify","clarify"),
        ("verify","END"),
        ("parent_promote","inventory_docs"),
        ("parent_promote","inventory_schematics"),
        ("inventory_docs","verify"),
        ("inventory_schematics","verify"),
    ])
    # Fixed manual layout: x grows along the pipeline, y separates branches.
    pos = {
        "session": (0.0, 0.0),
        "router": (1.2, 0.0),
        "ingest": (2.4, 0.6),
        "multiquery": (3.6, 0.6),
        "retrieve_pool": (4.8, 0.6),
        "rrf_fuse": (6.0, 0.6),
        "rerank": (7.2, 0.6),
        "mmr": (8.4, 0.6),
        "parent_promote": (9.6, 0.6),
        "web": (10.8, 1.0),
        "answer": (12.0, 0.6),
        "verify": (13.2, 0.6),
        "clarify":(14.4, 1.0),
        "report": (2.4, -1.0),
        "END": (14.4, 0.2),
        "parts_expand": (10.8, 0.0),
        "schematic_answer": (12.0, 0.2),
        "inventory_docs": (10.8, 0.4),
        "inventory_schematics":(10.8, -0.4),
    }
    # Map latest LLM event per node
    ev_map = {}
    for e in (llm_events or []):
        ev_map[e.get("node")] = e
    # Highlight executed path
    path_nodes = [e.get("node") for e in (trace or []) if e.get("node") in G.nodes]
    path_edges = [(a, b) for a, b in zip(path_nodes, path_nodes[1:]) if G.has_edge(a, b)]
    node_colors = ["#90caf9" if n in path_nodes else "#e6e6e6" for n in G.nodes]
    node_sizes = [1800 if n in path_nodes else 1200 for n in G.nodes]
    edge_colors = ["#1976d2" if e in path_edges else "#bdbdbd" for e in G.edges]
    widths = [3.0 if e in path_edges else 1.5 for e in G.edges]
    # Labels with subtitle if LLM used
    labels = {}
    for n in nodes:
        if n in ev_map:
            e = ev_map[n]
            # NOTE(review): prompt/completion counts are concatenated with no
            # separator ("{in}{out} tok") — looks like a lost delimiter; confirm.
            labels[n] = f"{n}\n{e.get('model','')} · {e.get('prompt_tokens',0)}{e.get('completion_tokens',0)} tok"
        else:
            labels[n] = n
    fig = plt.figure(figsize=(11, 3.6))
    nx.draw(G, pos, with_labels=False, arrows=True,
            node_color=node_colors, node_size=node_sizes,
            edge_color=edge_colors, width=widths)
    nx.draw_networkx_labels(G, pos, labels=labels, font_size=9)
    plt.axis("off")
    # Render to an in-memory PNG and hand back a PIL image.
    buf = _io.BytesIO()
    plt.tight_layout(); plt.savefig(buf, format="png", dpi=150); plt.close(fig)
    buf.seek(0)
    return PILImage.open(buf)
def render_trace_artifacts(state: Dict[str, Any]):
    """Build the (markdown, raw-JSON string, DAG image) trio for the trace UI.

    The DAG is drawn from the last-turn slice of the trace (everything from
    the final 'session' marker onward) so earlier turns don't clutter it.
    The image is best-effort: any plotting failure yields None while the
    text artifacts still render.
    """
    md = _trace_to_markdown(state)
    full = state.get("trace") or []
    # json is imported at module level; the original's __import__("json") was redundant.
    json_str = json.dumps(full, indent=2)
    # last-turn slice for the DAG too
    last_idx = -1
    for i, e in enumerate(full):
        if (e or {}).get("node") == "session":
            last_idx = i
    last_trace = full[last_idx:] if last_idx != -1 else full
    try:
        img = _dag_image_from_trace(last_trace, state.get("llm_events") or [])
    except Exception:
        img = None
    return md, json_str, img