import re import json from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Tuple import gradio as gr from pypdf import PdfReader from chunk_kb import build_chunks from simple_search import search_chunks from retriever import retrieve_top_chunks from risk_extractor import ( GENERATION_MODES, generate_supervisory_questions, generate_domain_mode_targeted_questions, ) from answer_generator import answer_question_dynamic, render_excerpts_markdown from contract_analysis import DEFAULT_RISK_KEYWORDS, run_contract_analysis from domains import CANONICAL_DOMAINS, DOMAIN_KEYWORDS, normalize_domain from question_bank import ( add_questions, edit_item_question, flatten_bank, list_modes, load_bank, save_bank, ) from question_quality import score_question_quality from persist_index import pull_index print(pull_index()) OUT_DIR = Path("outputs") OUT_DIR.mkdir(parents=True, exist_ok=True) QA_BANK_PATH = OUT_DIR / "qa_bank.jsonl" FAQ_CURATED_PATH = OUT_DIR / "faq_curated.md" def _clean_text(s: str) -> str: s = re.sub(r"(\w)-\n(\w)", r"\1\2", s) s = re.sub(r"(? str: return re.sub(r"\s+", " ", (s or "").strip().lower()) def _extract_json_object(text: str): if not text or not text.strip(): return None, "Generated text is empty." s = text.strip() first = s.find("{") last = s.rfind("}") if first == -1 or last == -1 or last <= first: return None, "Could not find a JSON object in the generated text." s = s[first : last + 1] try: return json.loads(s), None except Exception as e: return None, f"Could not parse JSON: {e}" def _extract_question_objects(data: Dict) -> List[Dict]: extracted: List[Dict] = [] if isinstance(data.get("domains"), list): for domain_block in data.get("domains", []): domain = normalize_domain(domain_block.get("domain", "")) for q in domain_block.get("questions", []): extracted.append( { "domain": domain, "mode": q.get("mode") or domain_block.get("mode") or "Unspecified", "question": q.get("question"), "risk_level": q.get("risk_level"), "why_it_matters": q.get("why_it_matters"), "likely_failure_points": q.get("likely_failure_points", []), "supporting_chunk_ids": q.get("supporting_chunk_ids", []), } ) elif isinstance(data.get("questions"), list): domain = normalize_domain(data.get("domain", "")) for q in data.get("questions", []): extracted.append( { "domain": normalize_domain(q.get("domain", domain)), "mode": q.get("mode") or data.get("mode") or "Unspecified", "question": q.get("question"), "risk_level": q.get("risk_level"), "why_it_matters": q.get("why_it_matters"), "likely_failure_points": q.get("likely_failure_points", []), "supporting_chunk_ids": q.get("supporting_chunk_ids", []), } ) return extracted def _build_state_from_bank(bank: Dict): # Step 6 should show a clean, deduped question picker without status prefixes. all_items = flatten_bank(bank, status_filter="All", hide_duplicates=True) by_norm = {} for item in all_items: norm = item.get("normalized_question") or _normalize_question(item.get("question", "")) if not norm: continue prev = by_norm.get(norm) if not prev: by_norm[norm] = item continue prev_key = (-int(prev.get("quality_score", 0)), prev.get("domain", ""), prev.get("id", "")) cur_key = ( -int(item.get("quality_score", 0)), item.get("domain", ""), item.get("id", ""), ) if cur_key < prev_key: by_norm[norm] = item flat = [] for item in by_norm.values(): q = (item.get("question") or "").strip() if not q: continue clean = dict(item) clean["label"] = f"{item.get('domain')} — {q}" flat.append(clean) flat.sort(key=lambda x: (x.get("domain", ""), x.get("question", ""))) choices = [x["label"] for x in flat] modes = list_modes(bank) return {"bank": bank, "flat": flat, "modes": modes}, choices def _load_state_from_bank(): bank = load_bank(out_dir=str(OUT_DIR)) return _build_state_from_bank(bank) def _refresh_state( domain_filter: str = "All domains", mode_filter: str = "All modes", min_score: int = 0, hide_duplicates: bool = False, ): bank = load_bank(out_dir=str(OUT_DIR)) flat = flatten_bank( bank, domain_filter=domain_filter, mode_filter=mode_filter, min_score=int(min_score or 0), status_filter="All", hide_duplicates=hide_duplicates, ) all_flat = flatten_bank(bank) modes = list_modes(bank) choices = [x["label"] for x in flat] state = { "bank": bank, "flat": flat, "all_flat": all_flat, "modes": modes, } return state, choices def _append_questions_to_bank(question_objects: List[Dict]): bank = load_bank(out_dir=str(OUT_DIR)) bank, added_count, skipped = add_questions(bank, question_objects) save_bank(bank, out_dir=str(OUT_DIR)) state, choices = _build_state_from_bank(bank) return state, choices, added_count, skipped def generate_questions_and_persist(): raw = generate_supervisory_questions(sample_chunks=250, model="gpt-4.1-mini") data, err = _extract_json_object(raw) if err: return raw, f"Generated output, but did not persist question bank: {err}" question_objects = _extract_question_objects(data) _, _, added_count, skipped = _append_questions_to_bank(question_objects) return raw, f"Appended to bank: added {added_count}, skipped duplicates/invalid {skipped}." def parse_questions_json(json_text: str): """ Parse the LLM output JSON and return: - state_obj: dict - dropdown update (choices + selected value) - multiselect update - status text """ if not json_text or not json_text.strip(): return None, gr.update(choices=[], value=None), gr.update(choices=[], value=[]), "No JSON to parse." data, err = _extract_json_object(json_text) if err: return ( {"error": err, "raw": json_text}, gr.update(choices=[], value=None), gr.update(choices=[], value=[]), f"Could not load questions: {err}", ) question_objects = _extract_question_objects(data) state, choices, added_count, skipped = _append_questions_to_bank(question_objects) if not choices: return state, gr.update(choices=[], value=None), gr.update(choices=[], value=[]), "Parsed JSON but found no questions." return ( state, gr.update(choices=choices, value=choices[0]), gr.update(choices=choices, value=[]), f"Loaded bank with {len(choices)} total questions (added {added_count}, skipped {skipped}).", ) def load_questions_from_bank(): state, choices = _load_state_from_bank() if not choices: return state, gr.update(choices=[], value=None), gr.update(choices=[], value=[]), "Question bank loaded, but it has no questions." return ( state, gr.update(choices=choices, value=choices[0]), gr.update(choices=choices, value=[]), f"Loaded {len(choices)} questions from outputs/questions_bank.json", ) def _get_selected_item(state_obj, label: str): if not state_obj or not isinstance(state_obj, dict): return None for item in state_obj.get("flat", []): if item.get("label") == label: return item return None def show_selected_question(state_obj, label: str): item = _get_selected_item(state_obj, label) if not item: return "Select a question.", "" fp = item.get("likely_failure_points", []) or [] fp_text = "- " + "\n- ".join(fp[:10]) if fp else "- (none provided)" details = ( f"**Domain:** {item.get('domain')}\n\n" f"**Risk level:** {item.get('risk_level')}\n\n" f"**Why it matters:** {item.get('why_it_matters')}\n\n" f"**Likely failure points:**\n{fp_text}" ) chunk_ids = ", ".join((item.get("supporting_chunk_ids") or [])[:12]) return details, chunk_ids def generate_answer_from_dropdown(state_obj, label: str): item = _get_selected_item(state_obj, label) if not item: return "Select a question first." # Dynamic retrieval at answer-time return answer_question_dynamic( question=item["question"], top_k=10, model="gpt-4.1-mini", ) def preview_retrieved_excerpts(state_obj, label: str): item = _get_selected_item(state_obj, label) if not item: return "Select a question to preview retrieved excerpts." return render_excerpts_markdown( question=item["question"], top_k=8, max_chars=1400, ) def generate_more_questions_for_domain_ui(domain: str, n_new: int): target_domain = normalize_domain(domain) if target_domain == "Other / Needs Review": return ( "Select a canonical domain (not Other / Needs Review).", "", None, gr.update(choices=[], value=None), gr.update(choices=[], value=[]), "No questions loaded.", ) keywords = DOMAIN_KEYWORDS.get(target_domain, []) retrieval_query = " ".join([target_domain] + keywords[:4]).strip() chunks = retrieve_top_chunks(retrieval_query, top_k=30) if not chunks: return ( "No relevant chunks retrieved. Build KB first, then try again.", "", None, gr.update(choices=[], value=None), gr.update(choices=[], value=[]), "No questions loaded.", ) raw = generate_domain_mode_targeted_questions( domain=target_domain, mode="Supervisor decision points", n_new=int(n_new), chunks=chunks, model="gpt-4.1-mini", ) data, err = _extract_json_object(raw) if err: return ( f"Could not parse generated JSON: {err}", raw, None, gr.update(choices=[], value=None), gr.update(choices=[], value=[]), "No questions loaded.", ) question_objects = _extract_question_objects(data) state, choices, added_count, skipped = _append_questions_to_bank(question_objects) if not choices: return ( f"Added {added_count} questions; skipped {skipped} duplicates/invalid.", json.dumps(data, indent=2, ensure_ascii=False), state, gr.update(choices=[], value=None), gr.update(choices=[], value=[]), "Question bank loaded, but it has no questions.", ) return ( f"Added {added_count} questions; skipped {skipped} duplicates/invalid.", json.dumps(data, indent=2, ensure_ascii=False), state, gr.update(choices=choices, value=choices[0]), gr.update(choices=choices, value=[]), f"Loaded {len(choices)} questions from outputs/questions_bank.json", ) def _extract_chunk_citations(chunks: List[Dict], chunk_ids: List[str]) -> List[Dict]: by_id = {str(c.get("chunk_id")): c for c in (chunks or []) if c.get("chunk_id")} citations = [] for cid in chunk_ids or []: c = by_id.get(str(cid)) if not c: continue citations.append( { "chunk_id": c.get("chunk_id"), "article": c.get("article"), "section": c.get("section"), "page_start": c.get("page_start"), "page_end": c.get("page_end"), "text_excerpt": (c.get("text") or "")[:500], } ) return citations def _count_domain_progress(bank: Dict, domain: str, min_quality: int) -> int: total = 0 for item in flatten_bank( bank, domain_filter=domain, mode_filter="All modes", min_score=min_quality, status_filter="All", hide_duplicates=True, ): total += 1 return total def _target_map_from_values(target_values: Tuple) -> Dict[str, int]: targets = {} for i, domain in enumerate(CANONICAL_DOMAINS): val = target_values[i] if i < len(target_values) else 0 try: n = int(val or 0) except Exception: n = 0 targets[domain] = max(0, n) return targets def generate_with_targets_ui( selected_modes: List[str], quality_threshold: int, dedupe_enabled: bool, max_rounds: int, *target_values, ): modes = selected_modes or list(GENERATION_MODES.keys()) quality_threshold = int(quality_threshold or 3) max_rounds = max(1, min(8, int(max_rounds or 3))) targets = _target_map_from_values(target_values) summary_lines = [] total_attempted = 0 for domain in CANONICAL_DOMAINS: target = targets.get(domain, 0) if target <= 0: continue round_no = 0 mode_idx = 0 while round_no < max_rounds: bank = load_bank(out_dir=str(OUT_DIR)) have = _count_domain_progress(bank, domain=domain, min_quality=quality_threshold) need = target - have if need <= 0: break round_no += 1 mode = modes[mode_idx % len(modes)] mode_idx += 1 batch = max(2, min(12, need * 2)) keywords = DOMAIN_KEYWORDS.get(domain, []) retrieval_query = " ".join([domain] + keywords[:4] + [mode]).strip() chunks = retrieve_top_chunks(retrieval_query, top_k=36) if not chunks: summary_lines.append(f"{domain}: no chunks retrieved on round {round_no}.") continue raw = generate_domain_mode_targeted_questions( domain=domain, mode=mode, n_new=batch, chunks=chunks, model="gpt-4.1-mini", ) data, err = _extract_json_object(raw) if err: summary_lines.append(f"{domain}: generation parse error on round {round_no}: {err}") continue generated = _extract_question_objects(data) prepared = [] for q in generated: question_text = (q.get("question") or "").strip() if not question_text: continue source_chunk_ids = q.get("supporting_chunk_ids", []) if isinstance(q.get("supporting_chunk_ids"), list) else [] citations = _extract_chunk_citations(chunks, source_chunk_ids) quality = score_question_quality( question=question_text, domain=domain, mode=q.get("mode") or mode, chunk_citations=citations, model="gpt-4.1-mini", ) score = int(quality.get("quality_score", 0)) prepared.append( { "question": question_text, "domain": domain, "mode": q.get("mode") or mode, "quality_score": score, "quality_rationale": quality.get("quality_rationale", ""), "risk_level": q.get("risk_level"), "why_it_matters": q.get("why_it_matters"), "likely_failure_points": q.get("likely_failure_points", []), "source_chunk_ids": source_chunk_ids, "source_citations": citations, } ) total_attempted += len(prepared) bank = load_bank(out_dir=str(OUT_DIR)) bank, added, skipped = add_questions( bank, prepared, dedupe_enabled=bool(dedupe_enabled), dedupe_threshold=0.88, ) save_bank(bank, out_dir=str(OUT_DIR)) have_after = _count_domain_progress(bank, domain=domain, min_quality=quality_threshold) summary_lines.append( f"{domain} round {round_no}/{max_rounds}: attempted {len(prepared)}, added {added}, skipped {skipped}, progress {have_after}/{target}." ) state, choices = _refresh_state() status_text = "\n".join(summary_lines) if summary_lines else "No targets requested." final_status = ( f"Generation done. Attempted {total_attempted} questions. Bank now has {len(state.get('all_flat', []))} total items.\n\n{status_text}" ) return ( final_status, state, gr.update(choices=choices, value=choices[0] if choices else None), gr.update(choices=choices, value=[]), f"Loaded {len(choices)} questions from outputs/questions_bank.json", ) def refresh_curation_ui( domain_filter: str, mode_filter: str, min_score: int, hide_duplicates: bool, query: str, selected_labels: List[str], ): state, choices = _refresh_state( domain_filter=domain_filter, mode_filter=mode_filter, min_score=min_score, hide_duplicates=hide_duplicates, ) q_norm = _normalize_question(query or "") if q_norm: filtered = [] for label in choices: item = next((x for x in state.get("flat", []) if x.get("label") == label), None) if not item: continue if q_norm in _normalize_question(item.get("question", "")): filtered.append(label) choices = filtered selected_set = set(selected_labels or []) kept = [x for x in choices if x in selected_set] mode_choices = ["All modes"] + state.get("modes", []) if mode_filter not in mode_choices: mode_filter = "All modes" status = f"Showing {len(choices)} curated items." return ( state, gr.update(choices=choices, value=kept), status, gr.update(choices=mode_choices, value=mode_filter), gr.update(choices=choices, value=choices[0] if choices else None), ) def curation_edit_question_ui( state_obj: Dict, label: str, edited_question: str, domain_filter: str, mode_filter: str, min_score: int, hide_duplicates: bool, query: str, ): item_id = None for item in (state_obj or {}).get("flat", []): if item.get("label") == label: item_id = item.get("id") break bank = load_bank(out_dir=str(OUT_DIR)) ok = edit_item_question(bank, item_id=item_id, new_question=edited_question) if ok: save_bank(bank, out_dir=str(OUT_DIR)) state, choices_update, status_msg, mode_update, edit_update = refresh_curation_ui( domain_filter, mode_filter, min_score, hide_duplicates, query, [], ) if not ok: return state, choices_update, "Edit failed: select one item and provide non-empty text.", mode_update, edit_update return state, choices_update, f"Edited question text. {status_msg}", mode_update, edit_update def export_selected_from_curation(state_obj, labels): selected = _get_selected_items(state_obj, labels) if not selected: return "Select one or more curation questions first.", None by_norm, _ = _ensure_answers_for_items(selected) lines = [ "# Curated Supervisory FAQ", "", f"_Generated from checked questions. Source of truth: `{QA_BANK_PATH}`._", "", ] exported = 0 for item in selected: norm = item.get("normalized_question") or _normalize_question(item.get("question", "")) rec = by_norm.get(norm) if not rec: continue exported += 1 lines.extend( [ f"## {item.get('domain', 'Unknown Domain')} — {item.get('question', '').strip()}", "", rec.get("answer_markdown", "").strip(), "", "---", "", ] ) FAQ_CURATED_PATH.write_text("\n".join(lines).strip() + "\n", encoding="utf-8") return f"Exported {exported} checked Q&A entries to {FAQ_CURATED_PATH}", str(FAQ_CURATED_PATH) def _read_qa_bank_records(): if not QA_BANK_PATH.exists(): return [] records = [] for line in QA_BANK_PATH.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: records.append(json.loads(line)) except Exception: continue return records def _append_qa_records(records): if not records: return with QA_BANK_PATH.open("a", encoding="utf-8") as f: for rec in records: f.write(json.dumps(rec, ensure_ascii=False) + "\n") def _get_selected_items(state_obj, labels): if not labels: return [] items = [] for label in labels: item = _get_selected_item(state_obj, label) if item: items.append(item) return items def _ensure_answers_for_items(items): existing = _read_qa_bank_records() by_norm = {} for rec in existing: norm = rec.get("normalized_question") or _normalize_question(rec.get("question", "")) if norm and norm not in by_norm: by_norm[norm] = rec new_records = [] for item in items: q = item.get("question", "") norm = item.get("normalized_question") or _normalize_question(q) if not norm or norm in by_norm: continue answer = answer_question_dynamic( question=q, top_k=10, model="gpt-4.1-mini", ) rec = { "saved_at_utc": datetime.now(timezone.utc).isoformat(), "domain": item.get("domain"), "label": item.get("label"), "question": q, "normalized_question": norm, "answer_markdown": answer, } by_norm[norm] = rec new_records.append(rec) _append_qa_records(new_records) return by_norm, len(new_records) def answer_selected_questions(state_obj, labels): items = _get_selected_items(state_obj, labels) if not items: return "Select one or more questions first." # Dedupe within current selection by exact normalized question. deduped = [] seen = set() for item in items: norm = item.get("normalized_question") or _normalize_question(item.get("question", "")) if not norm or norm in seen: continue seen.add(norm) deduped.append(item) by_norm, added = _ensure_answers_for_items(deduped) answered_count = 0 for item in deduped: norm = item.get("normalized_question") or _normalize_question(item.get("question", "")) if norm in by_norm: answered_count += 1 return ( f"Answered {answered_count} selected questions (new: {added}, existing reused: {answered_count - added}). " f"Saved/updated bank at {QA_BANK_PATH}." ) def export_selected_to_markdown(state_obj, labels): items = _get_selected_items(state_obj, labels) if not items: return "Select one or more questions first.", None # Dedupe in selected order by normalized question. selected = [] seen = set() for item in items: norm = item.get("normalized_question") or _normalize_question(item.get("question", "")) if not norm or norm in seen: continue seen.add(norm) selected.append(item) by_norm, _ = _ensure_answers_for_items(selected) lines = [ "# Curated Supervisory FAQ", "", f"_Generated from selected questions. Source of truth: `{QA_BANK_PATH}`._", "", ] exported = 0 for item in selected: norm = item.get("normalized_question") or _normalize_question(item.get("question", "")) rec = by_norm.get(norm) if not rec: continue exported += 1 lines.extend( [ f"## {item.get('domain', 'Unknown Domain')} — {item.get('question', '').strip()}", "", rec.get("answer_markdown", "").strip(), "", "---", "", ] ) FAQ_CURATED_PATH.write_text("\n".join(lines).strip() + "\n", encoding="utf-8") return f"Exported {exported} Q&A entries to {FAQ_CURATED_PATH}", str(FAQ_CURATED_PATH) def _filter_step8_labels(state_obj, domain_filter: str, query: str) -> List[str]: if not state_obj or not state_obj.get("flat"): return [] domain_filter = (domain_filter or "All domains").strip() q = _normalize_question(query or "") out = [] for item in state_obj.get("flat", []): domain = item.get("domain", "") question = item.get("question", "") label = item.get("label", "") if domain_filter != "All domains" and domain != domain_filter: continue if q and q not in _normalize_question(question): continue out.append(label) return out def update_step8_filtered_choices(state_obj, domain_filter: str, query: str, selected_labels): filtered = _filter_step8_labels(state_obj, domain_filter, query) selected_set = set(selected_labels or []) kept_selected = [label for label in filtered if label in selected_set] status = ( f"Showing {len(filtered)} matching questions." if filtered else "No questions match the current filters." ) return gr.update(choices=filtered, value=kept_selected), status def select_all_filtered_questions(state_obj, domain_filter: str, query: str): filtered = _filter_step8_labels(state_obj, domain_filter, query) status = ( f"Selected {len(filtered)} filtered questions." if filtered else "No filtered questions to select." ) return gr.update(value=filtered), status def clear_selected_questions(): return gr.update(value=[]), "Cleared selected questions." INITIAL_QA_STATE, INITIAL_Q_CHOICES = _load_state_from_bank() INITIAL_CURATION_STATE, _ = _refresh_state( domain_filter="All domains", mode_filter="All modes", min_score=0, hide_duplicates=True, ) INITIAL_CURATION_CHOICES = [] with gr.Blocks( css=""" #step8-select-list { max-height: 360px; overflow-y: auto; border: 1px solid #d0d7de; border-radius: 8px; padding: 8px; } """ ) as demo: gr.Markdown("# UPS National Agreement — Supervisory Decision-Risk Extractor") # ------------------------- # Step 2: PDF -> text # ------------------------- gr.Markdown("## Step 2 — Upload PDF and Extract Text") pdf = gr.File(label="Upload contract PDF", file_types=[".pdf"]) extract_btn = gr.Button("Extract text") preview = gr.Textbox(label="Preview (first 6,000 chars)", lines=18) download_txt = gr.File(label="Download extracted text (txt)") stats = gr.Textbox(label="Stats", interactive=False) extract_btn.click( extract_pdf_text, inputs=[pdf], outputs=[preview, download_txt, stats], ) # ------------------------- # Step 3: Chunking # ------------------------- gr.Markdown("## Step 3 — Build Chunked Knowledge Base") build_btn = gr.Button("Build KB chunks (Step 3)") kb_status = gr.Textbox(label="KB build status", interactive=False) build_btn.click(build_kb, inputs=[], outputs=[kb_status]) # ------------------------- # Step 4: Quick search (no embeddings yet) # ------------------------- gr.Markdown("## Step 4 — Search the Contract (Quick Test)") search_q = gr.Textbox( label="Search query (try: post-accident testing, random testing, overtime, seniority)" ) search_btn = gr.Button("Search top matches") search_out = gr.Markdown() search_btn.click( lambda q: search_chunks(q, top_k=5), inputs=[search_q], outputs=[search_out], ) # ------------------------- # Step 5: Generate Supervisory Risk Questions (LLM) # ------------------------- gr.Markdown("## Step 5 — Generate Supervisory Decision-Risk Questions (LLM)") gen_btn = gr.Button("Generate engineered supervisory questions") gen_out = gr.Textbox(label="Generated domains + questions (JSON)", lines=20) step5_status = gr.Textbox(label="Step 5 status", interactive=False) gen_btn.click( fn=generate_questions_and_persist, inputs=[], outputs=[gen_out, step5_status], ) # ------------------------- # Step 6: Select question + preview retrieval + generate answer # ------------------------- gr.Markdown("## Step 6 — Pick a Question, Preview Retrieved Excerpts, Then Generate Answer") qa_state = gr.State(value=INITIAL_QA_STATE) load_btn = gr.Button("Load generated questions into dropdown") load_bank_btn = gr.Button("Load questions from bank") q_dropdown = gr.Dropdown( label="Select a question", choices=INITIAL_Q_CHOICES, value=INITIAL_Q_CHOICES[0] if INITIAL_Q_CHOICES else None, ) qa_load_status = gr.Textbox( label="Question load status", value=( f"Loaded {len(INITIAL_Q_CHOICES)} questions from outputs/questions_bank.json on startup" if INITIAL_Q_CHOICES else "Question bank currently empty." ), interactive=False, ) q_details = gr.Markdown() q_chunk_ids = gr.Textbox(label="Generator supporting chunk_ids (FYI)", interactive=False) preview_btn = gr.Button("Preview retrieved contract excerpts (what will ground the answer)") retrieved_excerpts = gr.Markdown() answer_btn = gr.Button("Generate answer (dynamic retrieval + citations)") answer_out = gr.Markdown() # ------------------------- # Step 7: Generate more by canonical domain # ------------------------- gr.Markdown("## Step 7 — Generate More Questions by Canonical Domain") domain_choices = [d for d in CANONICAL_DOMAINS if d != "Other / Needs Review"] domain_select = gr.Dropdown( label="Canonical domain", choices=domain_choices, value=domain_choices[0] if domain_choices else None, ) n_new = gr.Slider(label="How many new questions", minimum=5, maximum=50, step=1, value=10) gen_more_btn = gr.Button("Generate more questions for this domain") step7_more_status = gr.Textbox(label="Step 7 status", interactive=False) step7_more_json = gr.Textbox(label="Step 7 generated JSON preview", lines=14) # ------------------------- # Step 8: Curate + batch answer + export # ------------------------- gr.Markdown("## Step 8 — Curate Questions, Batch Answer, and Export") with gr.Row(): step8_domain_filter = gr.Dropdown( label="Filter by domain", choices=["All domains"] + list(CANONICAL_DOMAINS), value="All domains", ) step8_query_filter = gr.Textbox( label="Filter by question text", placeholder="Type to narrow the list...", ) with gr.Row(): step8_select_filtered_btn = gr.Button("Select all filtered") step8_clear_selected_btn = gr.Button("Clear selected") q_multiselect = gr.CheckboxGroup( label="Select questions to curate (domain + question)", choices=INITIAL_Q_CHOICES, value=[], elem_id="step8-select-list", ) answer_selected_btn = gr.Button("Answer selected") export_selected_btn = gr.Button("Export selected to Markdown") step8_status = gr.Textbox(label="Step 8 status", interactive=False) curated_md_file = gr.File(label="Curated FAQ Markdown") load_btn.click( fn=parse_questions_json, inputs=[gen_out], outputs=[qa_state, q_dropdown, q_multiselect, qa_load_status], ) load_bank_btn.click( fn=load_questions_from_bank, inputs=[], outputs=[qa_state, q_dropdown, q_multiselect, qa_load_status], ) gen_more_btn.click( fn=generate_more_questions_for_domain_ui, inputs=[domain_select, n_new], outputs=[step7_more_status, step7_more_json, qa_state, q_dropdown, q_multiselect, qa_load_status], ) q_dropdown.change( fn=show_selected_question, inputs=[qa_state, q_dropdown], outputs=[q_details, q_chunk_ids], ) preview_btn.click( fn=preview_retrieved_excerpts, inputs=[qa_state, q_dropdown], outputs=[retrieved_excerpts], ) answer_btn.click( fn=generate_answer_from_dropdown, inputs=[qa_state, q_dropdown], outputs=[answer_out], ) answer_selected_btn.click( fn=answer_selected_questions, inputs=[qa_state, q_multiselect], outputs=[step8_status], ) export_selected_btn.click( fn=export_selected_to_markdown, inputs=[qa_state, q_multiselect], outputs=[step8_status, curated_md_file], ) step8_domain_filter.change( fn=update_step8_filtered_choices, inputs=[qa_state, step8_domain_filter, step8_query_filter, q_multiselect], outputs=[q_multiselect, step8_status], ) step8_query_filter.change( fn=update_step8_filtered_choices, inputs=[qa_state, step8_domain_filter, step8_query_filter, q_multiselect], outputs=[q_multiselect, step8_status], ) step8_select_filtered_btn.click( fn=select_all_filtered_questions, inputs=[qa_state, step8_domain_filter, step8_query_filter], outputs=[q_multiselect, step8_status], ) step8_clear_selected_btn.click( fn=clear_selected_questions, inputs=[], outputs=[q_multiselect, step8_status], ) # ------------------------- # Advanced Generation + Curation # ------------------------- gr.Markdown("## Advanced Generation + Curation") with gr.Tabs(): with gr.Tab("Generation"): gen_modes = gr.CheckboxGroup( label="Generation modes", choices=list(GENERATION_MODES.keys()), value=list(GENERATION_MODES.keys()), ) with gr.Row(): gen_quality_threshold = gr.Slider( label="Quality threshold (0-5)", minimum=0, maximum=5, step=1, value=3, ) gen_dedupe_toggle = gr.Checkbox( label="Near-dedupe filter", value=True, ) gen_max_rounds = gr.Slider( label="Max retry rounds per domain", minimum=1, maximum=8, step=1, value=3, ) gr.Markdown("Per-domain targets (canonical domains)") gen_target_inputs = [] for d in CANONICAL_DOMAINS: gen_target_inputs.append( gr.Number(label=d, value=0, precision=0, minimum=0) ) gen_targets_btn = gr.Button("Generate to meet domain targets") gen_targets_status = gr.Textbox(label="Generation status", lines=10, interactive=False) with gr.Tab("Curation"): curation_state = gr.State(value=INITIAL_CURATION_STATE) with gr.Row(): cur_domain_filter = gr.Dropdown( label="Domain", choices=["All domains"] + list(CANONICAL_DOMAINS), value="All domains", ) cur_mode_filter = gr.Dropdown( label="Mode", choices=["All modes"] + list(INITIAL_CURATION_STATE.get("modes", [])), value="All modes", ) with gr.Row(): cur_min_score = gr.Slider(label="Min quality score", minimum=0, maximum=5, step=1, value=0) cur_hide_duplicates = gr.Checkbox(label="Hide duplicates", value=True) cur_query = gr.Textbox(label="Text contains", placeholder="Filter by question text") cur_refresh_btn = gr.Button("Refresh curation list") cur_select = gr.CheckboxGroup( label="Curate question items", choices=INITIAL_CURATION_CHOICES, value=[], elem_id="step8-select-list", ) cur_edit_label = gr.Dropdown(label="Edit one item", choices=INITIAL_CURATION_CHOICES, value=None) cur_edit_text = gr.Textbox(label="Edited question text", lines=2) cur_edit_btn = gr.Button("Apply edit") cur_export_selected_btn = gr.Button("Export checked questions") cur_export_file = gr.File(label="Curated checked FAQ Markdown") cur_status = gr.Textbox(label="Curation status", interactive=False) # ------------------------- # Contract Analysis # ------------------------- gr.Markdown("## Contract Analysis") analysis_keywords = gr.Textbox( label="Risk keywords (comma-separated)", value=", ".join(DEFAULT_RISK_KEYWORDS), ) analysis_top_sections = gr.Number( label="Top sections to show", value=15, precision=0, minimum=1, ) analysis_btn = gr.Button("Run contract analysis") analysis_status = gr.Textbox(label="Analysis status", interactive=False) analysis_md = gr.Markdown() analysis_md_file = gr.File(label="domain_analysis.md") analysis_csv_file = gr.File(label="article_risk_report.csv") gen_targets_btn.click( fn=generate_with_targets_ui, inputs=[gen_modes, gen_quality_threshold, gen_dedupe_toggle, gen_max_rounds] + gen_target_inputs, outputs=[gen_targets_status, qa_state, q_dropdown, q_multiselect, qa_load_status], ) for filter_component in [ cur_domain_filter, cur_min_score, cur_hide_duplicates, cur_query, ]: filter_component.change( fn=refresh_curation_ui, inputs=[ cur_domain_filter, cur_mode_filter, cur_min_score, cur_hide_duplicates, cur_query, cur_select, ], outputs=[curation_state, cur_select, cur_status, cur_mode_filter, cur_edit_label], ) cur_mode_filter.change( fn=refresh_curation_ui, inputs=[ cur_domain_filter, cur_mode_filter, cur_min_score, cur_hide_duplicates, cur_query, cur_select, ], outputs=[curation_state, cur_select, cur_status, cur_mode_filter, cur_edit_label], ) cur_refresh_btn.click( fn=refresh_curation_ui, inputs=[ cur_domain_filter, cur_mode_filter, cur_min_score, cur_hide_duplicates, cur_query, cur_select, ], outputs=[curation_state, cur_select, cur_status, cur_mode_filter, cur_edit_label], ) cur_edit_btn.click( fn=curation_edit_question_ui, inputs=[ curation_state, cur_edit_label, cur_edit_text, cur_domain_filter, cur_mode_filter, cur_min_score, cur_hide_duplicates, cur_query, ], outputs=[curation_state, cur_select, cur_status, cur_mode_filter, cur_edit_label], ) cur_export_selected_btn.click( fn=export_selected_from_curation, inputs=[curation_state, cur_select], outputs=[cur_status, cur_export_file], ) analysis_btn.click( fn=run_contract_analysis_ui, inputs=[analysis_keywords, analysis_top_sections], outputs=[analysis_status, analysis_md, analysis_md_file, analysis_csv_file], ) demo.launch()