# Snapshot metadata: last commit e46e66c by Justin Tippins,
# "Wire startup index pull and add huggingface_hub dependency".
import re
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Tuple
import gradio as gr
from pypdf import PdfReader
from chunk_kb import build_chunks
from simple_search import search_chunks
from retriever import retrieve_top_chunks
from risk_extractor import (
GENERATION_MODES,
generate_supervisory_questions,
generate_domain_mode_targeted_questions,
)
from answer_generator import answer_question_dynamic, render_excerpts_markdown
from contract_analysis import DEFAULT_RISK_KEYWORDS, run_contract_analysis
from domains import CANONICAL_DOMAINS, DOMAIN_KEYWORDS, normalize_domain
from question_bank import (
add_questions,
edit_item_question,
flatten_bank,
list_modes,
load_bank,
save_bank,
)
from question_quality import score_question_quality
from persist_index import pull_index
# Runs at import time: call pull_index() and echo whatever it returns to stdout.
# NOTE(review): presumably this fetches a persisted index before the UI starts; confirm in persist_index.
print(pull_index())
# Directory this module writes its artifacts into; created on import if it does not exist.
OUT_DIR = Path("outputs")
OUT_DIR.mkdir(parents=True, exist_ok=True)
# JSONL store of answered questions (read by _read_qa_bank_records, appended by _append_qa_records).
QA_BANK_PATH = OUT_DIR / "qa_bank.jsonl"
# Markdown file overwritten by the two FAQ export functions.
FAQ_CURATED_PATH = OUT_DIR / "faq_curated.md"
def _clean_text(s: str) -> str:
s = re.sub(r"(\w)-\n(\w)", r"\1\2", s)
s = re.sub(r"(?<!\n)\n(?!\n)", " ", s)
s = re.sub(r"[ \t]+", " ", s)
s = re.sub(r"\n{3,}", "\n\n", s)
return s.strip()
def extract_pdf_text(pdf_file):
    """Extract and clean the text of an uploaded PDF.

    Writes the cleaned text to ``OUT_DIR / "contract_extracted.txt"``.

    Returns:
        A ``(preview, txt_path, stats)`` tuple: the first 6,000 characters of
        the cleaned text, the path of the written file, and a page/character
        summary. When no file is given, returns a prompt message, ``None``
        and an empty string.
    """
    if pdf_file is None:
        return "Upload a PDF first.", None, ""

    reader = PdfReader(pdf_file)
    # Each page is prefixed with a visible marker; pages with no extractable
    # text contribute an empty body.
    page_sections = [
        "\n\n--- PAGE {} ---\n\n{}".format(number, page.extract_text() or "")
        for number, page in enumerate(reader.pages, start=1)
    ]
    cleaned = _clean_text("".join(page_sections))

    destination = OUT_DIR / "contract_extracted.txt"
    destination.write_text(cleaned, encoding="utf-8")

    summary = f"Pages: {len(reader.pages)} | Characters: {len(cleaned):,}"
    return cleaned[:6000], str(destination), summary
def build_kb():
    """Build KB chunks from the extracted contract text.

    Returns whatever ``build_chunks`` returns, or a prompt message when the
    extracted text file does not exist yet.
    """
    source_txt = OUT_DIR / "contract_extracted.txt"
    if source_txt.exists():
        return build_chunks(str(source_txt))
    return "Extract the PDF first (Step 2)."
def run_contract_analysis_ui(keywords_csv: str, top_sections: int):
    """Run the contract analysis for the UI.

    Args:
        keywords_csv: Comma-separated risk keywords; blank falls back to
            ``DEFAULT_RISK_KEYWORDS``.
        top_sections: Number of sections to show; empty/zero becomes 15 and
            the value is floored at 1.

    Returns:
        ``(status, markdown, markdown_path, csv_path)``. On failure the error
        message is returned for both text outputs and the paths are ``None``.
    """
    stripped = (piece.strip() for piece in (keywords_csv or "").split(","))
    keywords = [piece for piece in stripped if piece] or DEFAULT_RISK_KEYWORDS
    top_n = max(1, int(top_sections or 15))

    try:
        result = run_contract_analysis(
            chunks_path=Path("kb/chunks.jsonl"),
            out_dir=OUT_DIR,
            keywords=keywords,
            top_sections=top_n,
        )
    except Exception as e:
        # UI boundary: report the failure in the widgets.
        err = f"Contract analysis failed: {e}"
        return err, err, None, None

    status = (
        f"Analysis complete. Wrote {result['markdown_path']} and {result['csv_path']} "
        f"using {len(keywords)} risk keywords."
    )
    return status, result["markdown"], result["markdown_path"], result["csv_path"]
# -------------------------
# Step 6 helpers
# -------------------------
def _normalize_question(s: str) -> str:
return re.sub(r"\s+", " ", (s or "").strip().lower())
def _extract_json_object(text: str):
if not text or not text.strip():
return None, "Generated text is empty."
s = text.strip()
first = s.find("{")
last = s.rfind("}")
if first == -1 or last == -1 or last <= first:
return None, "Could not find a JSON object in the generated text."
s = s[first : last + 1]
try:
return json.loads(s), None
except Exception as e:
return None, f"Could not parse JSON: {e}"
def _extract_question_objects(data: Dict) -> List[Dict]:
extracted: List[Dict] = []
if isinstance(data.get("domains"), list):
for domain_block in data.get("domains", []):
domain = normalize_domain(domain_block.get("domain", ""))
for q in domain_block.get("questions", []):
extracted.append(
{
"domain": domain,
"mode": q.get("mode") or domain_block.get("mode") or "Unspecified",
"question": q.get("question"),
"risk_level": q.get("risk_level"),
"why_it_matters": q.get("why_it_matters"),
"likely_failure_points": q.get("likely_failure_points", []),
"supporting_chunk_ids": q.get("supporting_chunk_ids", []),
}
)
elif isinstance(data.get("questions"), list):
domain = normalize_domain(data.get("domain", ""))
for q in data.get("questions", []):
extracted.append(
{
"domain": normalize_domain(q.get("domain", domain)),
"mode": q.get("mode") or data.get("mode") or "Unspecified",
"question": q.get("question"),
"risk_level": q.get("risk_level"),
"why_it_matters": q.get("why_it_matters"),
"likely_failure_points": q.get("likely_failure_points", []),
"supporting_chunk_ids": q.get("supporting_chunk_ids", []),
}
)
return extracted
def _build_state_from_bank(bank: Dict):
    """Build the Step 6/8 picker state from a question bank.

    Items are deduplicated by normalized question text. When several items
    share a question, the one with the highest ``quality_score`` wins, with
    ties broken by domain and then id. Each surviving item gets a
    ``"<domain> — <question>"`` label.

    Returns:
        ``(state, choices)`` where ``state`` is
        ``{"bank": ..., "flat": [...], "modes": [...]}`` and ``choices`` is the
        list of labels in ``flat`` order (sorted by domain, then question).
    """
    # Step 6 should show a clean, deduped question picker without status prefixes.
    all_items = flatten_bank(bank, status_filter="All", hide_duplicates=True)

    def _rank(entry: Dict):
        # Lower tuple sorts first; a missing or null score counts as 0.
        return (
            -int(entry.get("quality_score") or 0),
            entry.get("domain", ""),
            entry.get("id", ""),
        )

    by_norm = {}
    for item in all_items:
        norm = item.get("normalized_question") or _normalize_question(item.get("question", ""))
        if not norm:
            continue
        prev = by_norm.get(norm)
        if not prev or _rank(item) < _rank(prev):
            by_norm[norm] = item

    flat = []
    for item in by_norm.values():
        q = (item.get("question") or "").strip()
        if not q:
            continue
        clean = dict(item)
        # Separator keeps the domain and the question readable in the pickers.
        clean["label"] = f"{item.get('domain')} — {q}"
        flat.append(clean)

    flat.sort(key=lambda x: (x.get("domain", ""), x.get("question", "")))
    choices = [x["label"] for x in flat]
    modes = list_modes(bank)
    return {"bank": bank, "flat": flat, "modes": modes}, choices
def _load_state_from_bank():
    """Load the bank from ``OUT_DIR`` and return its ``(state, choices)`` picker state."""
    return _build_state_from_bank(load_bank(out_dir=str(OUT_DIR)))
def _refresh_state(
    domain_filter: str = "All domains",
    mode_filter: str = "All modes",
    min_score: int = 0,
    hide_duplicates: bool = False,
):
    """Reload the bank and return a filtered view of it.

    Returns:
        ``(state, choices)``. ``state`` has the keys ``bank``, ``flat`` (the
        filtered items), ``all_flat`` (``flatten_bank`` with default
        arguments) and ``modes``; ``choices`` holds the labels of ``flat``.
    """
    bank = load_bank(out_dir=str(OUT_DIR))
    filtered = flatten_bank(
        bank,
        domain_filter=domain_filter,
        mode_filter=mode_filter,
        min_score=int(min_score or 0),
        status_filter="All",
        hide_duplicates=hide_duplicates,
    )
    state = {
        "bank": bank,
        "flat": filtered,
        "all_flat": flatten_bank(bank),
        "modes": list_modes(bank),
    }
    return state, [entry["label"] for entry in filtered]
def _append_questions_to_bank(question_objects: List[Dict]):
    """Add questions to the on-disk bank and rebuild the picker state.

    Returns:
        ``(state, choices, added_count, skipped)``; the last two are the
        values reported by ``add_questions``.
    """
    out_dir = str(OUT_DIR)
    updated_bank, added_count, skipped = add_questions(load_bank(out_dir=out_dir), question_objects)
    save_bank(updated_bank, out_dir=out_dir)
    state, choices = _build_state_from_bank(updated_bank)
    return state, choices, added_count, skipped
def generate_questions_and_persist():
    """Generate supervisory questions and append them to the question bank.

    Returns:
        ``(raw_output, status)``. The raw generated text is always returned;
        the bank is left untouched when the text cannot be parsed.
    """
    raw = generate_supervisory_questions(sample_chunks=250, model="gpt-4.1-mini")
    data, err = _extract_json_object(raw)
    if err:
        return raw, f"Generated output, but did not persist question bank: {err}"

    outcome = _append_questions_to_bank(_extract_question_objects(data))
    added_count, skipped = outcome[2], outcome[3]
    return raw, f"Appended to bank: added {added_count}, skipped duplicates/invalid {skipped}."
def parse_questions_json(json_text: str):
    """Parse generated JSON, append its questions to the bank, and refresh the pickers.

    Returns a 4-tuple:
    - state object (``None`` for blank input, an ``{"error", "raw"}`` dict on
      a parse failure, otherwise the rebuilt picker state)
    - dropdown update (choices + selected value)
    - multiselect update
    - status text
    """

    def _cleared():
        # Updates that empty both the dropdown and the multiselect.
        return gr.update(choices=[], value=None), gr.update(choices=[], value=[])

    if not json_text or not json_text.strip():
        return (None, *_cleared(), "No JSON to parse.")

    data, err = _extract_json_object(json_text)
    if err:
        return (
            {"error": err, "raw": json_text},
            *_cleared(),
            f"Could not load questions: {err}",
        )

    state, choices, added_count, skipped = _append_questions_to_bank(
        _extract_question_objects(data)
    )
    if not choices:
        return (state, *_cleared(), "Parsed JSON but found no questions.")

    return (
        state,
        gr.update(choices=choices, value=choices[0]),
        gr.update(choices=choices, value=[]),
        f"Loaded bank with {len(choices)} total questions (added {added_count}, skipped {skipped}).",
    )
def load_questions_from_bank():
    """Reload the bank from disk and return ``(state, dropdown_update, multiselect_update, status)``."""
    state, choices = _load_state_from_bank()
    if choices:
        dropdown = gr.update(choices=choices, value=choices[0])
        multiselect = gr.update(choices=choices, value=[])
        status = f"Loaded {len(choices)} questions from outputs/questions_bank.json"
    else:
        dropdown = gr.update(choices=[], value=None)
        multiselect = gr.update(choices=[], value=[])
        status = "Question bank loaded, but it has no questions."
    return state, dropdown, multiselect, status
def _get_selected_item(state_obj, label: str):
if not state_obj or not isinstance(state_obj, dict):
return None
for item in state_obj.get("flat", []):
if item.get("label") == label:
return item
return None
def show_selected_question(state_obj, label: str):
    """Render the details panel for the selected question.

    Returns:
        ``(details_markdown, chunk_ids_csv)``. At most 10 failure points and
        12 chunk ids are shown. When nothing is selected, returns a prompt and
        an empty string.
    """
    item = _get_selected_item(state_obj, label)
    if not item:
        return "Select a question.", ""

    def _as_list(value):
        # Stored/generated data may hold a bare string or a non-list here.
        if not value:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    failure_points = _as_list(item.get("likely_failure_points"))
    if failure_points:
        # str() so non-string entries cannot break the join.
        fp_text = "- " + "\n- ".join(str(point) for point in failure_points[:10])
    else:
        fp_text = "- (none provided)"

    details = (
        f"**Domain:** {item.get('domain')}\n\n"
        f"**Risk level:** {item.get('risk_level')}\n\n"
        f"**Why it matters:** {item.get('why_it_matters')}\n\n"
        f"**Likely failure points:**\n{fp_text}"
    )

    # Items prepared by generate_with_targets_ui store their ids under
    # "source_chunk_ids", so fall back to that key.
    chunk_id_values = _as_list(item.get("supporting_chunk_ids") or item.get("source_chunk_ids"))
    chunk_ids = ", ".join(str(cid) for cid in chunk_id_values[:12])
    return details, chunk_ids
def generate_answer_from_dropdown(state_obj, label: str):
    """Answer the selected question, or return a prompt when nothing is selected."""
    selected = _get_selected_item(state_obj, label)
    if selected:
        # Dynamic retrieval at answer-time
        return answer_question_dynamic(
            question=selected["question"],
            top_k=10,
            model="gpt-4.1-mini",
        )
    return "Select a question first."
def preview_retrieved_excerpts(state_obj, label: str):
    """Render the excerpts retrieved for the selected question as Markdown.

    Returns a prompt string when nothing is selected.
    """
    selected = _get_selected_item(state_obj, label)
    if selected:
        return render_excerpts_markdown(
            question=selected["question"],
            top_k=8,
            max_chars=1400,
        )
    return "Select a question to preview retrieved excerpts."
def generate_more_questions_for_domain_ui(domain: str, n_new: int):
    """Generate ``n_new`` extra questions for one canonical domain and add them to the bank.

    Returns a 6-tuple for the UI: ``(step7_status, json_preview, state,
    dropdown_update, multiselect_update, load_status)``. The three failure
    paths (non-canonical domain, no chunks retrieved, unparsable output)
    return ``None`` for the state and clear both pickers.
    """

    def _failed(status: str, preview: str = ""):
        # Shared shape of the three failure returns.
        return (
            status,
            preview,
            None,
            gr.update(choices=[], value=None),
            gr.update(choices=[], value=[]),
            "No questions loaded.",
        )

    target_domain = normalize_domain(domain)
    if target_domain == "Other / Needs Review":
        return _failed("Select a canonical domain (not Other / Needs Review).")

    # Retrieval query: the domain name plus its first four keywords.
    keywords = DOMAIN_KEYWORDS.get(target_domain, [])
    retrieval_query = " ".join([target_domain] + keywords[:4]).strip()
    chunks = retrieve_top_chunks(retrieval_query, top_k=30)
    if not chunks:
        return _failed("No relevant chunks retrieved. Build KB first, then try again.")

    raw = generate_domain_mode_targeted_questions(
        domain=target_domain,
        mode="Supervisor decision points",
        n_new=int(n_new),
        chunks=chunks,
        model="gpt-4.1-mini",
    )
    data, err = _extract_json_object(raw)
    if err:
        return _failed(f"Could not parse generated JSON: {err}", raw)

    state, choices, added_count, skipped = _append_questions_to_bank(
        _extract_question_objects(data)
    )
    summary = f"Added {added_count} questions; skipped {skipped} duplicates/invalid."
    pretty = json.dumps(data, indent=2, ensure_ascii=False)

    if choices:
        return (
            summary,
            pretty,
            state,
            gr.update(choices=choices, value=choices[0]),
            gr.update(choices=choices, value=[]),
            f"Loaded {len(choices)} questions from outputs/questions_bank.json",
        )
    return (
        summary,
        pretty,
        state,
        gr.update(choices=[], value=None),
        gr.update(choices=[], value=[]),
        "Question bank loaded, but it has no questions.",
    )
def _extract_chunk_citations(chunks: List[Dict], chunk_ids: List[str]) -> List[Dict]:
by_id = {str(c.get("chunk_id")): c for c in (chunks or []) if c.get("chunk_id")}
citations = []
for cid in chunk_ids or []:
c = by_id.get(str(cid))
if not c:
continue
citations.append(
{
"chunk_id": c.get("chunk_id"),
"article": c.get("article"),
"section": c.get("section"),
"page_start": c.get("page_start"),
"page_end": c.get("page_end"),
"text_excerpt": (c.get("text") or "")[:500],
}
)
return citations
def _count_domain_progress(bank: Dict, domain: str, min_quality: int) -> int:
    """Count the de-duplicated bank items in ``domain`` that pass the ``min_quality`` filter."""
    qualifying = flatten_bank(
        bank,
        domain_filter=domain,
        mode_filter="All modes",
        min_score=min_quality,
        status_filter="All",
        hide_duplicates=True,
    )
    return sum(1 for _ in qualifying)
def _target_map_from_values(target_values: Tuple) -> Dict[str, int]:
    """Pair positional target values with ``CANONICAL_DOMAINS``.

    Missing, empty, unparsable or negative values become 0.
    """

    def _as_count(raw) -> int:
        try:
            return max(0, int(raw or 0))
        except Exception:
            return 0

    supplied = len(target_values)
    return {
        domain: _as_count(target_values[position] if position < supplied else 0)
        for position, domain in enumerate(CANONICAL_DOMAINS)
    }
def _score_generated_questions(generated: List[Dict], chunks: List[Dict], domain: str, mode: str) -> List[Dict]:
    """Score each non-empty generated question and shape it for ``add_questions``."""
    prepared = []
    for q in generated:
        question_text = (q.get("question") or "").strip()
        if not question_text:
            continue
        raw_ids = q.get("supporting_chunk_ids")
        source_chunk_ids = raw_ids if isinstance(raw_ids, list) else []
        citations = _extract_chunk_citations(chunks, source_chunk_ids)
        question_mode = q.get("mode") or mode
        quality = score_question_quality(
            question=question_text,
            domain=domain,
            mode=question_mode,
            chunk_citations=citations,
            model="gpt-4.1-mini",
        )
        prepared.append(
            {
                "question": question_text,
                "domain": domain,
                "mode": question_mode,
                # A missing or null score counts as 0 instead of raising.
                "quality_score": int(quality.get("quality_score") or 0),
                "quality_rationale": quality.get("quality_rationale", ""),
                "risk_level": q.get("risk_level"),
                "why_it_matters": q.get("why_it_matters"),
                "likely_failure_points": q.get("likely_failure_points", []),
                "source_chunk_ids": source_chunk_ids,
                "source_citations": citations,
            }
        )
    return prepared


def generate_with_targets_ui(
    selected_modes: List[str],
    quality_threshold: int,
    dedupe_enabled: bool,
    max_rounds: int,
    *target_values,
):
    """Generate questions until each domain meets its target or rounds run out.

    For every canonical domain with a positive target, runs up to
    ``max_rounds`` rounds (clamped to 1..8). Each round re-counts the bank
    items at or above ``quality_threshold``, picks the next generation mode
    in rotation, retrieves chunks, generates and scores a batch, and adds it
    to the bank.

    Args:
        selected_modes: Modes to rotate through; empty means all of
            ``GENERATION_MODES``.
        quality_threshold: Minimum score counted towards a target. ``0`` is
            honoured as "count everything"; only ``None`` falls back to 3.
        dedupe_enabled: Passed to ``add_questions`` as its near-dedupe switch.
        max_rounds: Retry rounds per domain; empty/zero falls back to 3.
        *target_values: One target per entry of ``CANONICAL_DOMAINS``, in order.

    Returns:
        ``(final_status, state, dropdown_update, multiselect_update,
        load_status)``.
    """
    modes = selected_modes or list(GENERATION_MODES.keys())
    # ``quality_threshold or 3`` would silently turn a slider value of 0 into 3.
    quality_threshold = 3 if quality_threshold is None else int(quality_threshold)
    max_rounds = max(1, min(8, int(max_rounds or 3)))
    targets = _target_map_from_values(target_values)

    summary_lines = []
    total_attempted = 0
    for domain in CANONICAL_DOMAINS:
        target = targets.get(domain, 0)
        if target <= 0:
            continue

        for round_no in range(1, max_rounds + 1):
            bank = load_bank(out_dir=str(OUT_DIR))
            have = _count_domain_progress(bank, domain=domain, min_quality=quality_threshold)
            need = target - have
            if need <= 0:
                break

            # Rotate through the selected modes, one per round.
            mode = modes[(round_no - 1) % len(modes)]
            # Over-generate (2x the gap), bounded to 2..12 per round.
            batch = max(2, min(12, need * 2))
            keywords = DOMAIN_KEYWORDS.get(domain, [])
            retrieval_query = " ".join([domain] + keywords[:4] + [mode]).strip()
            chunks = retrieve_top_chunks(retrieval_query, top_k=36)
            if not chunks:
                summary_lines.append(f"{domain}: no chunks retrieved on round {round_no}.")
                continue

            raw = generate_domain_mode_targeted_questions(
                domain=domain,
                mode=mode,
                n_new=batch,
                chunks=chunks,
                model="gpt-4.1-mini",
            )
            data, err = _extract_json_object(raw)
            if err:
                summary_lines.append(f"{domain}: generation parse error on round {round_no}: {err}")
                continue

            prepared = _score_generated_questions(
                _extract_question_objects(data), chunks, domain, mode
            )
            total_attempted += len(prepared)

            # Reload right before writing so the save builds on the latest file.
            bank = load_bank(out_dir=str(OUT_DIR))
            bank, added, skipped = add_questions(
                bank,
                prepared,
                dedupe_enabled=bool(dedupe_enabled),
                dedupe_threshold=0.88,
            )
            save_bank(bank, out_dir=str(OUT_DIR))
            have_after = _count_domain_progress(bank, domain=domain, min_quality=quality_threshold)
            summary_lines.append(
                f"{domain} round {round_no}/{max_rounds}: attempted {len(prepared)}, added {added}, skipped {skipped}, progress {have_after}/{target}."
            )

    state, choices = _refresh_state()
    status_text = "\n".join(summary_lines) if summary_lines else "No targets requested."
    final_status = (
        f"Generation done. Attempted {total_attempted} questions. Bank now has {len(state.get('all_flat', []))} total items.\n\n{status_text}"
    )
    return (
        final_status,
        state,
        gr.update(choices=choices, value=choices[0] if choices else None),
        gr.update(choices=choices, value=[]),
        f"Loaded {len(choices)} questions from outputs/questions_bank.json",
    )
def refresh_curation_ui(
    domain_filter: str,
    mode_filter: str,
    min_score: int,
    hide_duplicates: bool,
    query: str,
    selected_labels: List[str],
):
    """Recompute the curation list for the current filters.

    The bank is reloaded and filtered by domain, mode, score and duplicates,
    then narrowed to questions containing ``query`` (compared on normalized
    text). Checked labels that are still visible stay checked.

    Returns:
        ``(state, checkbox_update, status, mode_dropdown_update,
        edit_dropdown_update)``.
    """
    state, choices = _refresh_state(
        domain_filter=domain_filter,
        mode_filter=mode_filter,
        min_score=min_score,
        hide_duplicates=hide_duplicates,
    )

    q_norm = _normalize_question(query or "")
    if q_norm:
        # Index items by label once instead of rescanning per label.
        # setdefault keeps the first item for a repeated label.
        first_by_label = {}
        for entry in state.get("flat", []):
            first_by_label.setdefault(entry.get("label"), entry)
        choices = [
            label
            for label in choices
            if first_by_label.get(label)
            and q_norm in _normalize_question(first_by_label[label].get("question", ""))
        ]

    selected_set = set(selected_labels or [])
    kept = [label for label in choices if label in selected_set]

    # The available modes can change between refreshes; reset a stale selection.
    mode_choices = ["All modes"] + state.get("modes", [])
    if mode_filter not in mode_choices:
        mode_filter = "All modes"

    status = f"Showing {len(choices)} curated items."
    return (
        state,
        gr.update(choices=choices, value=kept),
        status,
        gr.update(choices=mode_choices, value=mode_filter),
        gr.update(choices=choices, value=choices[0] if choices else None),
    )
def curation_edit_question_ui(
    state_obj: Dict,
    label: str,
    edited_question: str,
    domain_filter: str,
    mode_filter: str,
    min_score: int,
    hide_duplicates: bool,
    query: str,
):
    """Apply an edited question text to one bank item, then refresh the curation list.

    Returns the same 5-tuple as ``refresh_curation_ui``, with the status
    replaced by an edit success/failure message. The bank is saved only when
    ``edit_item_question`` reports success.
    """
    flat_items = (state_obj or {}).get("flat", [])
    item_id = next(
        (entry.get("id") for entry in flat_items if entry.get("label") == label),
        None,
    )

    bank = load_bank(out_dir=str(OUT_DIR))
    ok = edit_item_question(bank, item_id=item_id, new_question=edited_question)
    if ok:
        save_bank(bank, out_dir=str(OUT_DIR))

    # Refresh with nothing checked.
    refreshed = refresh_curation_ui(
        domain_filter,
        mode_filter,
        min_score,
        hide_duplicates,
        query,
        [],
    )
    state, choices_update, status_msg, mode_update, edit_update = refreshed

    if ok:
        message = f"Edited question text. {status_msg}"
    else:
        message = "Edit failed: select one item and provide non-empty text."
    return state, choices_update, message, mode_update, edit_update
def export_selected_from_curation(state_obj, labels):
    """Export the checked curation questions, with answers, to the curated FAQ file.

    Missing answers are generated first via ``_ensure_answers_for_items``.
    The Markdown is written to ``FAQ_CURATED_PATH``, replacing its contents.

    Returns:
        ``(status, file_path)``, or a prompt and ``None`` when nothing is
        selected.
    """
    selected = _get_selected_items(state_obj, labels)
    if not selected:
        return "Select one or more curation questions first.", None

    by_norm, _ = _ensure_answers_for_items(selected)
    lines = [
        "# Curated Supervisory FAQ",
        "",
        f"_Generated from checked questions. Source of truth: `{QA_BANK_PATH}`._",
        "",
    ]
    exported = 0
    for item in selected:
        question = (item.get("question") or "").strip()
        norm = item.get("normalized_question") or _normalize_question(question)
        rec = by_norm.get(norm)
        if not rec:
            continue
        exported += 1
        lines.extend(
            [
                # Separator keeps the domain and the question apart in the heading.
                f"## {item.get('domain', 'Unknown Domain')} — {question}",
                "",
                # A stored null answer is written as empty text.
                (rec.get("answer_markdown") or "").strip(),
                "",
                "---",
                "",
            ]
        )

    FAQ_CURATED_PATH.write_text("\n".join(lines).strip() + "\n", encoding="utf-8")
    return f"Exported {exported} checked Q&A entries to {FAQ_CURATED_PATH}", str(FAQ_CURATED_PATH)
def _read_qa_bank_records():
    """Read the Q&A bank JSONL file into a list of dict records.

    Returns ``[]`` when the file does not exist. Blank lines, lines that are
    not valid JSON, and JSON values that are not objects are skipped
    (best-effort read).
    """
    if not QA_BANK_PATH.exists():
        return []

    records = []
    # Iterate the file rather than str.splitlines(): splitlines also breaks on
    # U+2028/U+2029/NEL, which json.dumps(ensure_ascii=False) leaves unescaped
    # inside strings, so a record containing one would be cut in two and lost.
    with QA_BANK_PATH.open(encoding="utf-8") as handle:
        for line in handle:
            if not line.strip():
                continue
            try:
                rec = json.loads(line)
            except ValueError:
                # json.JSONDecodeError is a ValueError; skip the damaged line.
                continue
            # Callers use rec.get(...), so keep only JSON objects.
            if isinstance(rec, dict):
                records.append(rec)
    return records
def _append_qa_records(records):
if not records:
return
with QA_BANK_PATH.open("a", encoding="utf-8") as f:
for rec in records:
f.write(json.dumps(rec, ensure_ascii=False) + "\n")
def _get_selected_items(state_obj, labels):
if not labels:
return []
items = []
for label in labels:
item = _get_selected_item(state_obj, label)
if item:
items.append(item)
return items
def _ensure_answers_for_items(items):
    """Make sure every item has a saved answer, generating the missing ones.

    Existing records are indexed by normalized question (first record wins).
    Items without a record are answered with ``answer_question_dynamic`` and
    the new records are appended to the Q&A bank file.

    Returns:
        ``(by_norm, new_count)``: the index including new records, and how
        many records were created.
    """
    by_norm = {}
    for rec in _read_qa_bank_records():
        norm = rec.get("normalized_question") or _normalize_question(rec.get("question", ""))
        if norm:
            by_norm.setdefault(norm, rec)

    fresh = []
    for item in items:
        question = item.get("question", "")
        norm = item.get("normalized_question") or _normalize_question(question)
        if norm and norm not in by_norm:
            answer = answer_question_dynamic(
                question=question,
                top_k=10,
                model="gpt-4.1-mini",
            )
            rec = {
                "saved_at_utc": datetime.now(timezone.utc).isoformat(),
                "domain": item.get("domain"),
                "label": item.get("label"),
                "question": question,
                "normalized_question": norm,
                "answer_markdown": answer,
            }
            # Register at once so a repeated question in ``items`` is answered only once.
            by_norm[norm] = rec
            fresh.append(rec)

    _append_qa_records(fresh)
    return by_norm, len(fresh)
def answer_selected_questions(state_obj, labels):
    """Answer every selected question that has no saved answer yet and return a status line."""
    items = _get_selected_items(state_obj, labels)
    if not items:
        return "Select one or more questions first."

    # Dedupe within current selection by exact normalized question (first one wins).
    unique_by_norm = {}
    for item in items:
        norm = item.get("normalized_question") or _normalize_question(item.get("question", ""))
        if norm and norm not in unique_by_norm:
            unique_by_norm[norm] = item

    by_norm, added = _ensure_answers_for_items(list(unique_by_norm.values()))
    answered_count = sum(1 for norm in unique_by_norm if norm in by_norm)
    return (
        f"Answered {answered_count} selected questions (new: {added}, existing reused: {answered_count - added}). "
        f"Saved/updated bank at {QA_BANK_PATH}."
    )
def export_selected_to_markdown(state_obj, labels):
    """Export the selected Step 8 questions, with answers, to the curated FAQ file.

    The selection is deduplicated by normalized question (selection order is
    kept), missing answers are generated, and the Markdown is written to
    ``FAQ_CURATED_PATH``, replacing its contents.

    Returns:
        ``(status, file_path)``, or a prompt and ``None`` when nothing is
        selected.
    """
    items = _get_selected_items(state_obj, labels)
    if not items:
        return "Select one or more questions first.", None

    # Dedupe in selected order by normalized question.
    selected = []
    seen = set()
    for item in items:
        norm = item.get("normalized_question") or _normalize_question(item.get("question", ""))
        if not norm or norm in seen:
            continue
        seen.add(norm)
        selected.append((norm, item))

    by_norm, _ = _ensure_answers_for_items([item for _norm, item in selected])
    lines = [
        "# Curated Supervisory FAQ",
        "",
        f"_Generated from selected questions. Source of truth: `{QA_BANK_PATH}`._",
        "",
    ]
    exported = 0
    for norm, item in selected:
        rec = by_norm.get(norm)
        if not rec:
            continue
        exported += 1
        question = (item.get("question") or "").strip()
        lines.extend(
            [
                # Separator keeps the domain and the question apart in the heading.
                f"## {item.get('domain', 'Unknown Domain')} — {question}",
                "",
                # A stored null answer is written as empty text.
                (rec.get("answer_markdown") or "").strip(),
                "",
                "---",
                "",
            ]
        )

    FAQ_CURATED_PATH.write_text("\n".join(lines).strip() + "\n", encoding="utf-8")
    return f"Exported {exported} Q&A entries to {FAQ_CURATED_PATH}", str(FAQ_CURATED_PATH)
def _filter_step8_labels(state_obj, domain_filter: str, query: str) -> List[str]:
if not state_obj or not state_obj.get("flat"):
return []
domain_filter = (domain_filter or "All domains").strip()
q = _normalize_question(query or "")
out = []
for item in state_obj.get("flat", []):
domain = item.get("domain", "")
question = item.get("question", "")
label = item.get("label", "")
if domain_filter != "All domains" and domain != domain_filter:
continue
if q and q not in _normalize_question(question):
continue
out.append(label)
return out
def update_step8_filtered_choices(state_obj, domain_filter: str, query: str, selected_labels):
    """Re-filter the Step 8 checkbox list, keeping checked labels that are still visible.

    Returns ``(checkbox_update, status)``.
    """
    matching = _filter_step8_labels(state_obj, domain_filter, query)
    previously_checked = set(selected_labels or [])
    still_checked = [label for label in matching if label in previously_checked]

    if matching:
        status = f"Showing {len(matching)} matching questions."
    else:
        status = "No questions match the current filters."
    return gr.update(choices=matching, value=still_checked), status
def select_all_filtered_questions(state_obj, domain_filter: str, query: str):
    """Check every question that matches the current Step 8 filters.

    Returns ``(checkbox_update, status)``.
    """
    matching = _filter_step8_labels(state_obj, domain_filter, query)
    if matching:
        status = f"Selected {len(matching)} filtered questions."
    else:
        status = "No filtered questions to select."
    return gr.update(value=matching), status
def clear_selected_questions():
    """Uncheck every Step 8 question; returns ``(checkbox_update, status)``."""
    cleared = gr.update(value=[])
    return cleared, "Cleared selected questions."
# Load the question bank once at import time; these values seed the Step 6
# state/dropdown and the Step 8 checkbox list when the UI is built.
INITIAL_QA_STATE, INITIAL_Q_CHOICES = _load_state_from_bank()
# Initial state for the Curation tab (no filters, duplicates hidden). The label
# list returned alongside it is discarded.
INITIAL_CURATION_STATE, _ = _refresh_state(
    domain_filter="All domains",
    mode_filter="All modes",
    min_score=0,
    hide_duplicates=True,
)
# The curation pickers start with no choices; the refresh handlers fill them.
INITIAL_CURATION_CHOICES = []
# UI layout and event wiring. Both scrollable checkbox lists share one CSS rule
# but have distinct element ids, because DOM ids must be unique per page.
with gr.Blocks(
    css="""
#step8-select-list,
#curation-select-list {
  max-height: 360px;
  overflow-y: auto;
  border: 1px solid #d0d7de;
  border-radius: 8px;
  padding: 8px;
}
"""
) as demo:
    gr.Markdown("# UPS National Agreement — Supervisory Decision-Risk Extractor")

    # -------------------------
    # Step 2: PDF -> text
    # -------------------------
    gr.Markdown("## Step 2 — Upload PDF and Extract Text")
    pdf = gr.File(label="Upload contract PDF", file_types=[".pdf"])
    extract_btn = gr.Button("Extract text")
    preview = gr.Textbox(label="Preview (first 6,000 chars)", lines=18)
    download_txt = gr.File(label="Download extracted text (txt)")
    stats = gr.Textbox(label="Stats", interactive=False)
    extract_btn.click(
        extract_pdf_text,
        inputs=[pdf],
        outputs=[preview, download_txt, stats],
    )

    # -------------------------
    # Step 3: Chunking
    # -------------------------
    gr.Markdown("## Step 3 — Build Chunked Knowledge Base")
    build_btn = gr.Button("Build KB chunks (Step 3)")
    kb_status = gr.Textbox(label="KB build status", interactive=False)
    build_btn.click(build_kb, inputs=[], outputs=[kb_status])

    # -------------------------
    # Step 4: Quick search (no embeddings yet)
    # -------------------------
    gr.Markdown("## Step 4 — Search the Contract (Quick Test)")
    search_q = gr.Textbox(
        label="Search query (try: post-accident testing, random testing, overtime, seniority)"
    )
    search_btn = gr.Button("Search top matches")
    search_out = gr.Markdown()
    search_btn.click(
        lambda q: search_chunks(q, top_k=5),
        inputs=[search_q],
        outputs=[search_out],
    )

    # -------------------------
    # Step 5: Generate Supervisory Risk Questions (LLM)
    # -------------------------
    gr.Markdown("## Step 5 — Generate Supervisory Decision-Risk Questions (LLM)")
    gen_btn = gr.Button("Generate engineered supervisory questions")
    gen_out = gr.Textbox(label="Generated domains + questions (JSON)", lines=20)
    step5_status = gr.Textbox(label="Step 5 status", interactive=False)
    gen_btn.click(
        fn=generate_questions_and_persist,
        inputs=[],
        outputs=[gen_out, step5_status],
    )

    # -------------------------
    # Step 6: Select question + preview retrieval + generate answer
    # -------------------------
    gr.Markdown("## Step 6 — Pick a Question, Preview Retrieved Excerpts, Then Generate Answer")
    qa_state = gr.State(value=INITIAL_QA_STATE)
    load_btn = gr.Button("Load generated questions into dropdown")
    load_bank_btn = gr.Button("Load questions from bank")
    q_dropdown = gr.Dropdown(
        label="Select a question",
        choices=INITIAL_Q_CHOICES,
        value=INITIAL_Q_CHOICES[0] if INITIAL_Q_CHOICES else None,
    )
    qa_load_status = gr.Textbox(
        label="Question load status",
        value=(
            f"Loaded {len(INITIAL_Q_CHOICES)} questions from outputs/questions_bank.json on startup"
            if INITIAL_Q_CHOICES
            else "Question bank currently empty."
        ),
        interactive=False,
    )
    q_details = gr.Markdown()
    q_chunk_ids = gr.Textbox(label="Generator supporting chunk_ids (FYI)", interactive=False)
    preview_btn = gr.Button("Preview retrieved contract excerpts (what will ground the answer)")
    retrieved_excerpts = gr.Markdown()
    answer_btn = gr.Button("Generate answer (dynamic retrieval + citations)")
    answer_out = gr.Markdown()

    # -------------------------
    # Step 7: Generate more by canonical domain
    # -------------------------
    gr.Markdown("## Step 7 — Generate More Questions by Canonical Domain")
    domain_choices = [d for d in CANONICAL_DOMAINS if d != "Other / Needs Review"]
    domain_select = gr.Dropdown(
        label="Canonical domain",
        choices=domain_choices,
        value=domain_choices[0] if domain_choices else None,
    )
    n_new = gr.Slider(label="How many new questions", minimum=5, maximum=50, step=1, value=10)
    gen_more_btn = gr.Button("Generate more questions for this domain")
    step7_more_status = gr.Textbox(label="Step 7 status", interactive=False)
    step7_more_json = gr.Textbox(label="Step 7 generated JSON preview", lines=14)

    # -------------------------
    # Step 8: Curate + batch answer + export
    # -------------------------
    gr.Markdown("## Step 8 — Curate Questions, Batch Answer, and Export")
    with gr.Row():
        step8_domain_filter = gr.Dropdown(
            label="Filter by domain",
            choices=["All domains"] + list(CANONICAL_DOMAINS),
            value="All domains",
        )
        step8_query_filter = gr.Textbox(
            label="Filter by question text",
            placeholder="Type to narrow the list...",
        )
    with gr.Row():
        step8_select_filtered_btn = gr.Button("Select all filtered")
        step8_clear_selected_btn = gr.Button("Clear selected")
    q_multiselect = gr.CheckboxGroup(
        label="Select questions to curate (domain + question)",
        choices=INITIAL_Q_CHOICES,
        value=[],
        elem_id="step8-select-list",
    )
    answer_selected_btn = gr.Button("Answer selected")
    export_selected_btn = gr.Button("Export selected to Markdown")
    step8_status = gr.Textbox(label="Step 8 status", interactive=False)
    curated_md_file = gr.File(label="Curated FAQ Markdown")

    # Step 6-8 wiring. These handlers share qa_state and the two pickers.
    question_picker_outputs = [qa_state, q_dropdown, q_multiselect, qa_load_status]
    load_btn.click(
        fn=parse_questions_json,
        inputs=[gen_out],
        outputs=question_picker_outputs,
    )
    load_bank_btn.click(
        fn=load_questions_from_bank,
        inputs=[],
        outputs=question_picker_outputs,
    )
    gen_more_btn.click(
        fn=generate_more_questions_for_domain_ui,
        inputs=[domain_select, n_new],
        outputs=[step7_more_status, step7_more_json] + question_picker_outputs,
    )
    q_dropdown.change(
        fn=show_selected_question,
        inputs=[qa_state, q_dropdown],
        outputs=[q_details, q_chunk_ids],
    )
    preview_btn.click(
        fn=preview_retrieved_excerpts,
        inputs=[qa_state, q_dropdown],
        outputs=[retrieved_excerpts],
    )
    answer_btn.click(
        fn=generate_answer_from_dropdown,
        inputs=[qa_state, q_dropdown],
        outputs=[answer_out],
    )
    answer_selected_btn.click(
        fn=answer_selected_questions,
        inputs=[qa_state, q_multiselect],
        outputs=[step8_status],
    )
    export_selected_btn.click(
        fn=export_selected_to_markdown,
        inputs=[qa_state, q_multiselect],
        outputs=[step8_status, curated_md_file],
    )
    # Either Step 8 filter re-filters the checkbox list the same way.
    for step8_filter in (step8_domain_filter, step8_query_filter):
        step8_filter.change(
            fn=update_step8_filtered_choices,
            inputs=[qa_state, step8_domain_filter, step8_query_filter, q_multiselect],
            outputs=[q_multiselect, step8_status],
        )
    step8_select_filtered_btn.click(
        fn=select_all_filtered_questions,
        inputs=[qa_state, step8_domain_filter, step8_query_filter],
        outputs=[q_multiselect, step8_status],
    )
    step8_clear_selected_btn.click(
        fn=clear_selected_questions,
        inputs=[],
        outputs=[q_multiselect, step8_status],
    )

    # -------------------------
    # Advanced Generation + Curation
    # -------------------------
    gr.Markdown("## Advanced Generation + Curation")
    with gr.Tabs():
        with gr.Tab("Generation"):
            gen_modes = gr.CheckboxGroup(
                label="Generation modes",
                choices=list(GENERATION_MODES.keys()),
                value=list(GENERATION_MODES.keys()),
            )
            with gr.Row():
                gen_quality_threshold = gr.Slider(
                    label="Quality threshold (0-5)",
                    minimum=0,
                    maximum=5,
                    step=1,
                    value=3,
                )
                gen_dedupe_toggle = gr.Checkbox(
                    label="Near-dedupe filter",
                    value=True,
                )
                gen_max_rounds = gr.Slider(
                    label="Max retry rounds per domain",
                    minimum=1,
                    maximum=8,
                    step=1,
                    value=3,
                )
            gr.Markdown("Per-domain targets (canonical domains)")
            # One numeric target per canonical domain, in CANONICAL_DOMAINS
            # order; generate_with_targets_ui receives them positionally.
            gen_target_inputs = [
                gr.Number(label=d, value=0, precision=0, minimum=0)
                for d in CANONICAL_DOMAINS
            ]
            gen_targets_btn = gr.Button("Generate to meet domain targets")
            gen_targets_status = gr.Textbox(label="Generation status", lines=10, interactive=False)

        with gr.Tab("Curation"):
            curation_state = gr.State(value=INITIAL_CURATION_STATE)
            with gr.Row():
                cur_domain_filter = gr.Dropdown(
                    label="Domain",
                    choices=["All domains"] + list(CANONICAL_DOMAINS),
                    value="All domains",
                )
                cur_mode_filter = gr.Dropdown(
                    label="Mode",
                    choices=["All modes"] + list(INITIAL_CURATION_STATE.get("modes", [])),
                    value="All modes",
                )
            with gr.Row():
                cur_min_score = gr.Slider(label="Min quality score", minimum=0, maximum=5, step=1, value=0)
                cur_hide_duplicates = gr.Checkbox(label="Hide duplicates", value=True)
                cur_query = gr.Textbox(label="Text contains", placeholder="Filter by question text")
            cur_refresh_btn = gr.Button("Refresh curation list")
            cur_select = gr.CheckboxGroup(
                label="Curate question items",
                choices=INITIAL_CURATION_CHOICES,
                value=[],
                # Own id (styled by the shared CSS rule); it previously reused
                # "step8-select-list", duplicating a DOM id.
                elem_id="curation-select-list",
            )
            cur_edit_label = gr.Dropdown(label="Edit one item", choices=INITIAL_CURATION_CHOICES, value=None)
            cur_edit_text = gr.Textbox(label="Edited question text", lines=2)
            cur_edit_btn = gr.Button("Apply edit")
            cur_export_selected_btn = gr.Button("Export checked questions")
            cur_export_file = gr.File(label="Curated checked FAQ Markdown")
            cur_status = gr.Textbox(label="Curation status", interactive=False)

    # -------------------------
    # Contract Analysis
    # -------------------------
    gr.Markdown("## Contract Analysis")
    analysis_keywords = gr.Textbox(
        label="Risk keywords (comma-separated)",
        value=", ".join(DEFAULT_RISK_KEYWORDS),
    )
    analysis_top_sections = gr.Number(
        label="Top sections to show",
        value=15,
        precision=0,
        minimum=1,
    )
    analysis_btn = gr.Button("Run contract analysis")
    analysis_status = gr.Textbox(label="Analysis status", interactive=False)
    analysis_md = gr.Markdown()
    analysis_md_file = gr.File(label="domain_analysis.md")
    analysis_csv_file = gr.File(label="article_risk_report.csv")

    gen_targets_btn.click(
        fn=generate_with_targets_ui,
        inputs=[gen_modes, gen_quality_threshold, gen_dedupe_toggle, gen_max_rounds] + gen_target_inputs,
        outputs=[gen_targets_status, qa_state, q_dropdown, q_multiselect, qa_load_status],
    )

    # Curation wiring: every filter change and the refresh button call the
    # same handler with the same inputs and outputs.
    curation_filter_inputs = [
        cur_domain_filter,
        cur_mode_filter,
        cur_min_score,
        cur_hide_duplicates,
        cur_query,
    ]
    curation_outputs = [curation_state, cur_select, cur_status, cur_mode_filter, cur_edit_label]
    for filter_component in [
        cur_domain_filter,
        cur_min_score,
        cur_hide_duplicates,
        cur_query,
        cur_mode_filter,
    ]:
        filter_component.change(
            fn=refresh_curation_ui,
            inputs=curation_filter_inputs + [cur_select],
            outputs=curation_outputs,
        )
    cur_refresh_btn.click(
        fn=refresh_curation_ui,
        inputs=curation_filter_inputs + [cur_select],
        outputs=curation_outputs,
    )
    cur_edit_btn.click(
        fn=curation_edit_question_ui,
        inputs=[curation_state, cur_edit_label, cur_edit_text] + curation_filter_inputs,
        outputs=curation_outputs,
    )
    cur_export_selected_btn.click(
        fn=export_selected_from_curation,
        inputs=[curation_state, cur_select],
        outputs=[cur_status, cur_export_file],
    )
    analysis_btn.click(
        fn=run_contract_analysis_ui,
        inputs=[analysis_keywords, analysis_top_sections],
        outputs=[analysis_status, analysis_md, analysis_md_file, analysis_csv_file],
    )

# Launch only when run as a script, so importing this module (tests, reload
# tooling) builds ``demo`` without starting a server.
if __name__ == "__main__":
    demo.launch()