Spaces:
Sleeping
Sleeping
| import re | |
| import json | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Dict, List, Tuple | |
| import gradio as gr | |
| from pypdf import PdfReader | |
| from chunk_kb import build_chunks | |
| from simple_search import search_chunks | |
| from retriever import retrieve_top_chunks | |
| from risk_extractor import ( | |
| GENERATION_MODES, | |
| generate_supervisory_questions, | |
| generate_domain_mode_targeted_questions, | |
| ) | |
| from answer_generator import answer_question_dynamic, render_excerpts_markdown | |
| from contract_analysis import DEFAULT_RISK_KEYWORDS, run_contract_analysis | |
| from domains import CANONICAL_DOMAINS, DOMAIN_KEYWORDS, normalize_domain | |
| from question_bank import ( | |
| add_questions, | |
| edit_item_question, | |
| flatten_bank, | |
| list_modes, | |
| load_bank, | |
| save_bank, | |
| ) | |
| from question_quality import score_question_quality | |
| from persist_index import pull_index | |
# Run pull_index() at import time and echo whatever it returns to stdout.
# NOTE(review): presumably this restores a persisted index — confirm in persist_index.
print(pull_index())

# Directory that receives every artifact this module writes; created eagerly.
OUT_DIR = Path("outputs")
OUT_DIR.mkdir(exist_ok=True, parents=True)

# JSONL store of generated answers (one record per line).
QA_BANK_PATH = OUT_DIR / "qa_bank.jsonl"
# Markdown file produced by the export helpers.
FAQ_CURATED_PATH = OUT_DIR / "faq_curated.md"
| def _clean_text(s: str) -> str: | |
| s = re.sub(r"(\w)-\n(\w)", r"\1\2", s) | |
| s = re.sub(r"(?<!\n)\n(?!\n)", " ", s) | |
| s = re.sub(r"[ \t]+", " ", s) | |
| s = re.sub(r"\n{3,}", "\n\n", s) | |
| return s.strip() | |
def extract_pdf_text(pdf_file):
    """Extract, clean, and save the text of an uploaded PDF.

    Returns a 3-tuple ``(preview, txt_path, stats)``:
      * preview  - the first 6,000 characters of the cleaned text,
      * txt_path - path (as ``str``) of ``contract_extracted.txt`` under OUT_DIR,
      * stats    - a one-line page/character summary.
    When no file is supplied the tuple is ``("Upload a PDF first.", None, "")``.
    """
    if pdf_file is None:
        return "Upload a PDF first.", None, ""

    pdf = PdfReader(pdf_file)

    # Each page is prefixed with a visible page marker before cleaning.
    page_blocks = []
    for page_no, page in enumerate(pdf.pages, start=1):
        page_text = page.extract_text() or ""
        page_blocks.append(f"\n\n--- PAGE {page_no} ---\n\n{page_text}")

    cleaned = _clean_text("".join(page_blocks))

    destination = OUT_DIR / "contract_extracted.txt"
    destination.write_text(cleaned, encoding="utf-8")

    summary = f"Pages: {len(pdf.pages)} | Characters: {len(cleaned):,}"
    return cleaned[:6000], str(destination), summary
def build_kb():
    """Build KB chunks from the previously extracted contract text.

    Returns whatever ``build_chunks`` returns for the extracted text file, or
    a hint message when that file does not exist yet.
    """
    source = OUT_DIR / "contract_extracted.txt"
    if source.exists():
        return build_chunks(str(source))
    return "Extract the PDF first (Step 2)."
def run_contract_analysis_ui(keywords_csv: str, top_sections: int):
    """Run the contract analysis and shape its result for the UI.

    ``keywords_csv`` is a comma-separated keyword list; blank entries are
    dropped and DEFAULT_RISK_KEYWORDS is used when nothing remains.
    ``top_sections`` falls back to 15 when falsy and is clamped to at least 1.

    Returns ``(status, markdown, markdown_path, csv_path)``. If the analysis
    raises, the first two slots both carry the error message and the two
    path slots are ``None``.
    """
    stripped = [part.strip() for part in (keywords_csv or "").split(",")]
    keywords = [word for word in stripped if word] or DEFAULT_RISK_KEYWORDS
    section_limit = max(1, int(top_sections or 15))

    try:
        result = run_contract_analysis(
            chunks_path=Path("kb/chunks.jsonl"),
            out_dir=OUT_DIR,
            keywords=keywords,
            top_sections=section_limit,
        )
    except Exception as exc:
        # UI boundary: report the failure instead of raising.
        message = f"Contract analysis failed: {exc}"
        return message, message, None, None

    md_path = result["markdown_path"]
    csv_path = result["csv_path"]
    status = (
        f"Analysis complete. Wrote {md_path} and {csv_path} "
        f"using {len(keywords)} risk keywords."
    )
    return status, result["markdown"], md_path, csv_path
| # ------------------------- | |
| # Step 6 helpers | |
| # ------------------------- | |
| def _normalize_question(s: str) -> str: | |
| return re.sub(r"\s+", " ", (s or "").strip().lower()) | |
| def _extract_json_object(text: str): | |
| if not text or not text.strip(): | |
| return None, "Generated text is empty." | |
| s = text.strip() | |
| first = s.find("{") | |
| last = s.rfind("}") | |
| if first == -1 or last == -1 or last <= first: | |
| return None, "Could not find a JSON object in the generated text." | |
| s = s[first : last + 1] | |
| try: | |
| return json.loads(s), None | |
| except Exception as e: | |
| return None, f"Could not parse JSON: {e}" | |
| def _extract_question_objects(data: Dict) -> List[Dict]: | |
| extracted: List[Dict] = [] | |
| if isinstance(data.get("domains"), list): | |
| for domain_block in data.get("domains", []): | |
| domain = normalize_domain(domain_block.get("domain", "")) | |
| for q in domain_block.get("questions", []): | |
| extracted.append( | |
| { | |
| "domain": domain, | |
| "mode": q.get("mode") or domain_block.get("mode") or "Unspecified", | |
| "question": q.get("question"), | |
| "risk_level": q.get("risk_level"), | |
| "why_it_matters": q.get("why_it_matters"), | |
| "likely_failure_points": q.get("likely_failure_points", []), | |
| "supporting_chunk_ids": q.get("supporting_chunk_ids", []), | |
| } | |
| ) | |
| elif isinstance(data.get("questions"), list): | |
| domain = normalize_domain(data.get("domain", "")) | |
| for q in data.get("questions", []): | |
| extracted.append( | |
| { | |
| "domain": normalize_domain(q.get("domain", domain)), | |
| "mode": q.get("mode") or data.get("mode") or "Unspecified", | |
| "question": q.get("question"), | |
| "risk_level": q.get("risk_level"), | |
| "why_it_matters": q.get("why_it_matters"), | |
| "likely_failure_points": q.get("likely_failure_points", []), | |
| "supporting_chunk_ids": q.get("supporting_chunk_ids", []), | |
| } | |
| ) | |
| return extracted | |
def _build_state_from_bank(bank: Dict):
    """Build the Step 6 picker state and its dropdown choices from ``bank``.

    Items come from ``flatten_bank(bank, status_filter="All",
    hide_duplicates=True)`` and are deduplicated again here by normalized
    question text. When several items share a normalized question, the one
    with the highest ``quality_score`` wins, ties broken by ``domain`` then
    ``id``. Items with blank question text are dropped.

    Returns ``(state, choices)`` where ``state`` is
    ``{"bank", "flat", "modes"}`` and ``choices`` is the list of
    ``"<domain> — <question>"`` labels, sorted by domain then question.

    A stored ``None`` for ``quality_score``, ``domain`` or ``id`` is treated
    like a missing value: the ``dict.get`` default only applies to absent
    keys, so an explicit ``None`` would otherwise raise in ``int(...)`` or
    in the tuple comparisons.
    """

    def _rank(entry: Dict):
        # Smaller tuple wins: higher score first, then domain, then id.
        domain = entry.get("domain")
        ident = entry.get("id")
        return (
            -int(entry.get("quality_score") or 0),
            "" if domain is None else domain,
            "" if ident is None else ident,
        )

    # Step 6 should show a clean, deduped question picker without status prefixes.
    best_by_norm = {}
    for item in flatten_bank(bank, status_filter="All", hide_duplicates=True):
        norm = item.get("normalized_question") or _normalize_question(item.get("question", ""))
        if not norm:
            continue
        current = best_by_norm.get(norm)
        if current is None or _rank(item) < _rank(current):
            best_by_norm[norm] = item

    flat = []
    for item in best_by_norm.values():
        question = (item.get("question") or "").strip()
        if not question:
            continue
        entry = dict(item)  # copy so the bank's own item is not mutated
        entry["label"] = f"{item.get('domain')} — {question}"
        flat.append(entry)

    flat.sort(key=lambda x: (x.get("domain") or "", x.get("question", "")))
    choices = [x["label"] for x in flat]
    modes = list_modes(bank)
    return {"bank": bank, "flat": flat, "modes": modes}, choices
def _load_state_from_bank():
    """Load the bank from OUT_DIR and return ``_build_state_from_bank`` for it."""
    return _build_state_from_bank(load_bank(out_dir=str(OUT_DIR)))
def _refresh_state(
    domain_filter: str = "All domains",
    mode_filter: str = "All modes",
    min_score: int = 0,
    hide_duplicates: bool = False,
):
    """Reload the bank and build a filtered state.

    Returns ``(state, choices)``. ``state["flat"]`` holds the items matching
    the filters, ``state["all_flat"]`` the result of ``flatten_bank(bank)``
    with its defaults, ``state["modes"]`` the result of ``list_modes``, and
    ``choices`` the labels of the filtered items. A falsy ``min_score`` is
    treated as 0.
    """
    bank = load_bank(out_dir=str(OUT_DIR))
    filters = {
        "domain_filter": domain_filter,
        "mode_filter": mode_filter,
        "min_score": int(min_score or 0),
        "status_filter": "All",
        "hide_duplicates": hide_duplicates,
    }
    visible = flatten_bank(bank, **filters)
    everything = flatten_bank(bank)
    state = {
        "bank": bank,
        "flat": visible,
        "all_flat": everything,
        "modes": list_modes(bank),
    }
    return state, [entry["label"] for entry in visible]
def _append_questions_to_bank(question_objects: List[Dict]):
    """Add ``question_objects`` to the bank on disk and rebuild the picker state.

    Returns ``(state, choices, added_count, skipped)``; the two counts are
    passed through from ``add_questions``.
    """
    out_dir = str(OUT_DIR)
    updated, added_count, skipped = add_questions(load_bank(out_dir=out_dir), question_objects)
    save_bank(updated, out_dir=out_dir)
    state, choices = _build_state_from_bank(updated)
    return state, choices, added_count, skipped
def generate_questions_and_persist():
    """Generate supervisory questions and append them to the question bank.

    Returns ``(raw_output, status)``. If the raw output does not yield a JSON
    object, nothing is persisted and the status says why.
    """
    raw = generate_supervisory_questions(sample_chunks=250, model="gpt-4.1-mini")
    data, err = _extract_json_object(raw)
    if err:
        return raw, f"Generated output, but did not persist question bank: {err}"

    _state, _choices, added_count, skipped = _append_questions_to_bank(
        _extract_question_objects(data)
    )
    status = f"Appended to bank: added {added_count}, skipped duplicates/invalid {skipped}."
    return raw, status
def parse_questions_json(json_text: str):
    """
    Parse the LLM output JSON, append its questions to the bank, and return:
    - state_obj: dict (``None`` when there is no text; an error dict when
      the text cannot be parsed)
    - dropdown update (choices + selected value)
    - multiselect update
    - status text
    """

    def _cleared():
        # Updates that empty both the dropdown and the multiselect.
        return gr.update(choices=[], value=None), gr.update(choices=[], value=[])

    if not json_text or not json_text.strip():
        return (None, *_cleared(), "No JSON to parse.")

    data, err = _extract_json_object(json_text)
    if err:
        failure_state = {"error": err, "raw": json_text}
        return (failure_state, *_cleared(), f"Could not load questions: {err}")

    state, choices, added_count, skipped = _append_questions_to_bank(
        _extract_question_objects(data)
    )
    if not choices:
        return (state, *_cleared(), "Parsed JSON but found no questions.")

    status = f"Loaded bank with {len(choices)} total questions (added {added_count}, skipped {skipped})."
    return (
        state,
        gr.update(choices=choices, value=choices[0]),
        gr.update(choices=choices, value=[]),
        status,
    )
def load_questions_from_bank():
    """Reload the question bank and return UI updates for the pickers.

    Returns ``(state, dropdown_update, multiselect_update, status)``. The
    dropdown preselects the first choice when there is one.
    """
    state, choices = _load_state_from_bank()
    if choices:
        first_choice = choices[0]
        status = f"Loaded {len(choices)} questions from outputs/questions_bank.json"
        return (
            state,
            gr.update(choices=choices, value=first_choice),
            gr.update(choices=choices, value=[]),
            status,
        )
    return (
        state,
        gr.update(choices=[], value=None),
        gr.update(choices=[], value=[]),
        "Question bank loaded, but it has no questions.",
    )
| def _get_selected_item(state_obj, label: str): | |
| if not state_obj or not isinstance(state_obj, dict): | |
| return None | |
| for item in state_obj.get("flat", []): | |
| if item.get("label") == label: | |
| return item | |
| return None | |
def show_selected_question(state_obj, label: str):
    """Render the detail panel for the selected question.

    Returns ``(details_markdown, chunk_ids_text)``:
      * details_markdown - domain, risk level, rationale, and up to 10
        likely failure points as a bullet list,
      * chunk_ids_text   - up to 12 supporting chunk ids joined by ``", "``.
    With no matching item the result is ``("Select a question.", "")``.

    These fields originate from model output, so they are not guaranteed to
    be lists of strings. Each element is passed through ``str`` before
    joining (``str.join`` raises ``TypeError`` on e.g. integer chunk ids),
    and a bare scalar is treated as a one-element list instead of being
    sliced character by character.
    """

    def _as_text_list(value, limit: int):
        # Normalise a model-supplied field to at most `limit` strings.
        if not value:
            return []
        if not isinstance(value, (list, tuple)):
            value = [value]
        return [str(entry) for entry in value[:limit]]

    item = _get_selected_item(state_obj, label)
    if not item:
        return "Select a question.", ""

    failure_points = _as_text_list(item.get("likely_failure_points"), 10)
    if failure_points:
        fp_text = "- " + "\n- ".join(failure_points)
    else:
        fp_text = "- (none provided)"

    details = (
        f"**Domain:** {item.get('domain')}\n\n"
        f"**Risk level:** {item.get('risk_level')}\n\n"
        f"**Why it matters:** {item.get('why_it_matters')}\n\n"
        f"**Likely failure points:**\n{fp_text}"
    )
    chunk_ids = ", ".join(_as_text_list(item.get("supporting_chunk_ids"), 12))
    return details, chunk_ids
def generate_answer_from_dropdown(state_obj, label: str):
    """Answer the question currently selected in the dropdown.

    Returns the result of ``answer_question_dynamic`` for the selected
    item's question, or a prompt string when nothing is selected.
    """
    selected = _get_selected_item(state_obj, label)
    if selected:
        # Dynamic retrieval at answer-time
        return answer_question_dynamic(
            question=selected["question"],
            top_k=10,
            model="gpt-4.1-mini",
        )
    return "Select a question first."
def preview_retrieved_excerpts(state_obj, label: str):
    """Show the excerpts retrieved for the selected question.

    Returns the result of ``render_excerpts_markdown`` (top 8, 1,400
    characters max) or a prompt string when nothing is selected.
    """
    selected = _get_selected_item(state_obj, label)
    if selected:
        return render_excerpts_markdown(
            question=selected["question"],
            top_k=8,
            max_chars=1400,
        )
    return "Select a question to preview retrieved excerpts."
def generate_more_questions_for_domain_ui(domain: str, n_new: int):
    """Generate ``n_new`` additional questions for one canonical domain.

    Returns a 6-tuple:
    ``(step7_status, json_preview, state, dropdown_update,
    multiselect_update, load_status)``.

    On every early exit (non-canonical domain, no chunks retrieved,
    unparseable output) the state is ``None``, both pickers are cleared,
    and the load status is ``"No questions loaded."``.
    """

    def _nothing_loaded(status_text: str, json_preview: str = ""):
        # Shared shape of the early-exit responses.
        return (
            status_text,
            json_preview,
            None,
            gr.update(choices=[], value=None),
            gr.update(choices=[], value=[]),
            "No questions loaded.",
        )

    target_domain = normalize_domain(domain)
    if target_domain == "Other / Needs Review":
        return _nothing_loaded("Select a canonical domain (not Other / Needs Review).")

    # Retrieval query: the domain name plus its first four keywords.
    query_terms = [target_domain] + DOMAIN_KEYWORDS.get(target_domain, [])[:4]
    chunks = retrieve_top_chunks(" ".join(query_terms).strip(), top_k=30)
    if not chunks:
        return _nothing_loaded("No relevant chunks retrieved. Build KB first, then try again.")

    raw = generate_domain_mode_targeted_questions(
        domain=target_domain,
        mode="Supervisor decision points",
        n_new=int(n_new),
        chunks=chunks,
        model="gpt-4.1-mini",
    )
    data, err = _extract_json_object(raw)
    if err:
        return _nothing_loaded(f"Could not parse generated JSON: {err}", raw)

    state, choices, added_count, skipped = _append_questions_to_bank(
        _extract_question_objects(data)
    )
    step_status = f"Added {added_count} questions; skipped {skipped} duplicates/invalid."
    json_preview = json.dumps(data, indent=2, ensure_ascii=False)

    if choices:
        dropdown = gr.update(choices=choices, value=choices[0])
        multiselect = gr.update(choices=choices, value=[])
        load_status = f"Loaded {len(choices)} questions from outputs/questions_bank.json"
    else:
        dropdown = gr.update(choices=[], value=None)
        multiselect = gr.update(choices=[], value=[])
        load_status = "Question bank loaded, but it has no questions."
    return step_status, json_preview, state, dropdown, multiselect, load_status
| def _extract_chunk_citations(chunks: List[Dict], chunk_ids: List[str]) -> List[Dict]: | |
| by_id = {str(c.get("chunk_id")): c for c in (chunks or []) if c.get("chunk_id")} | |
| citations = [] | |
| for cid in chunk_ids or []: | |
| c = by_id.get(str(cid)) | |
| if not c: | |
| continue | |
| citations.append( | |
| { | |
| "chunk_id": c.get("chunk_id"), | |
| "article": c.get("article"), | |
| "section": c.get("section"), | |
| "page_start": c.get("page_start"), | |
| "page_end": c.get("page_end"), | |
| "text_excerpt": (c.get("text") or "")[:500], | |
| } | |
| ) | |
| return citations | |
def _count_domain_progress(bank: Dict, domain: str, min_quality: int) -> int:
    """Count bank items in ``domain`` scoring at least ``min_quality``.

    Counts what ``flatten_bank`` yields for all modes and statuses with
    duplicates hidden.
    """
    matching = flatten_bank(
        bank,
        domain_filter=domain,
        mode_filter="All modes",
        min_score=min_quality,
        status_filter="All",
        hide_duplicates=True,
    )
    return sum(1 for _ in matching)
def _target_map_from_values(target_values: Tuple) -> Dict[str, int]:
    """Pair positional target counts with CANONICAL_DOMAINS.

    The i-th value belongs to the i-th canonical domain; domains with no
    corresponding value get 0. Each value is coerced with ``int`` (falsy or
    unconvertible values become 0) and clamped to be non-negative.
    """

    def _as_count(raw) -> int:
        try:
            parsed = int(raw or 0)
        except Exception:
            parsed = 0
        return max(0, parsed)

    supplied = len(target_values)
    return {
        domain: _as_count(target_values[pos] if pos < supplied else 0)
        for pos, domain in enumerate(CANONICAL_DOMAINS)
    }
def generate_with_targets_ui(
    selected_modes: List[str],
    quality_threshold: int,
    dedupe_enabled: bool,
    max_rounds: int,
    *target_values,
):
    """Generate questions per domain until each domain's target is met.

    ``target_values`` are per-domain target counts in CANONICAL_DOMAINS
    order. For every domain with a positive target, up to ``max_rounds``
    rounds (clamped to 1..8, default 3) are run. Each round reloads the
    bank, stops if the domain already has enough items at or above
    ``quality_threshold`` (default 3), and otherwise retrieves chunks,
    generates a batch, scores each question, and adds the batch to the bank.
    Modes are cycled round by round; all GENERATION_MODES are used when
    ``selected_modes`` is empty.

    Returns ``(final_status, state, dropdown_update, multiselect_update,
    load_status)``.
    """
    modes = selected_modes or list(GENERATION_MODES.keys())
    quality_threshold = int(quality_threshold or 3)
    max_rounds = max(1, min(8, int(max_rounds or 3)))
    targets = _target_map_from_values(target_values)
    out_dir = str(OUT_DIR)

    summary_lines = []
    total_attempted = 0

    for domain in CANONICAL_DOMAINS:
        target = targets.get(domain, 0)
        if target <= 0:
            continue

        for round_no in range(1, max_rounds + 1):
            # Re-read the bank each round so progress reflects what is on disk.
            have = _count_domain_progress(
                load_bank(out_dir=out_dir), domain=domain, min_quality=quality_threshold
            )
            need = target - have
            if need <= 0:
                break

            # Rotate through the modes, one per round.
            mode = modes[(round_no - 1) % len(modes)]
            # Over-generate (2x the shortfall), bounded to 2..12 per round.
            batch = max(2, min(12, need * 2))

            query_terms = [domain] + DOMAIN_KEYWORDS.get(domain, [])[:4] + [mode]
            chunks = retrieve_top_chunks(" ".join(query_terms).strip(), top_k=36)
            if not chunks:
                summary_lines.append(f"{domain}: no chunks retrieved on round {round_no}.")
                continue

            raw = generate_domain_mode_targeted_questions(
                domain=domain,
                mode=mode,
                n_new=batch,
                chunks=chunks,
                model="gpt-4.1-mini",
            )
            data, err = _extract_json_object(raw)
            if err:
                summary_lines.append(f"{domain}: generation parse error on round {round_no}: {err}")
                continue

            prepared = []
            for q in _extract_question_objects(data):
                question_text = (q.get("question") or "").strip()
                if not question_text:
                    continue

                raw_ids = q.get("supporting_chunk_ids")
                source_chunk_ids = raw_ids if isinstance(raw_ids, list) else []
                citations = _extract_chunk_citations(chunks, source_chunk_ids)
                quality = score_question_quality(
                    question=question_text,
                    domain=domain,
                    mode=q.get("mode") or mode,
                    chunk_citations=citations,
                    model="gpt-4.1-mini",
                )
                score = int(quality.get("quality_score", 0))
                prepared.append(
                    {
                        "question": question_text,
                        "domain": domain,
                        "mode": q.get("mode") or mode,
                        "quality_score": score,
                        "quality_rationale": quality.get("quality_rationale", ""),
                        "risk_level": q.get("risk_level"),
                        "why_it_matters": q.get("why_it_matters"),
                        "likely_failure_points": q.get("likely_failure_points", []),
                        "source_chunk_ids": source_chunk_ids,
                        "source_citations": citations,
                    }
                )

            total_attempted += len(prepared)
            bank, added, skipped = add_questions(
                load_bank(out_dir=out_dir),
                prepared,
                dedupe_enabled=bool(dedupe_enabled),
                dedupe_threshold=0.88,
            )
            save_bank(bank, out_dir=out_dir)
            have_after = _count_domain_progress(bank, domain=domain, min_quality=quality_threshold)
            summary_lines.append(
                f"{domain} round {round_no}/{max_rounds}: attempted {len(prepared)}, added {added}, skipped {skipped}, progress {have_after}/{target}."
            )

    state, choices = _refresh_state()
    if summary_lines:
        status_text = "\n".join(summary_lines)
    else:
        status_text = "No targets requested."
    final_status = (
        f"Generation done. Attempted {total_attempted} questions. Bank now has {len(state.get('all_flat', []))} total items.\n\n{status_text}"
    )
    first_choice = choices[0] if choices else None
    return (
        final_status,
        state,
        gr.update(choices=choices, value=first_choice),
        gr.update(choices=choices, value=[]),
        f"Loaded {len(choices)} questions from outputs/questions_bank.json",
    )
def refresh_curation_ui(
    domain_filter: str,
    mode_filter: str,
    min_score: int,
    hide_duplicates: bool,
    query: str,
    selected_labels: List[str],
):
    """Rebuild the curation list for the current filters and text query.

    The bank is reloaded through ``_refresh_state``; when ``query`` is
    non-blank, only items whose normalized question contains the normalized
    query are kept. Previously selected labels that are still visible stay
    selected.

    Returns ``(state, checklist_update, status, mode_filter_update,
    single_select_update)``. ``mode_filter`` is reset to ``"All modes"`` if
    it is no longer among the available modes.
    """
    state, choices = _refresh_state(
        domain_filter=domain_filter,
        mode_filter=mode_filter,
        min_score=min_score,
        hide_duplicates=hide_duplicates,
    )

    q_norm = _normalize_question(query or "")
    if q_norm:
        # Index items by label once instead of rescanning the item list for
        # every label. setdefault keeps the first item for a repeated label,
        # which is the item a linear first-match scan would have found.
        first_by_label = {}
        for entry in state.get("flat", []):
            first_by_label.setdefault(entry.get("label"), entry)

        matching = []
        for label in choices:
            entry = first_by_label.get(label)
            if entry and q_norm in _normalize_question(entry.get("question", "")):
                matching.append(label)
        choices = matching

    selected_set = set(selected_labels or [])
    kept = [label for label in choices if label in selected_set]

    mode_choices = ["All modes"] + state.get("modes", [])
    if mode_filter not in mode_choices:
        mode_filter = "All modes"

    status = f"Showing {len(choices)} curated items."
    return (
        state,
        gr.update(choices=choices, value=kept),
        status,
        gr.update(choices=mode_choices, value=mode_filter),
        gr.update(choices=choices, value=choices[0] if choices else None),
    )
def curation_edit_question_ui(
    state_obj: Dict,
    label: str,
    edited_question: str,
    domain_filter: str,
    mode_filter: str,
    min_score: int,
    hide_duplicates: bool,
    query: str,
):
    """Replace the text of the item labelled ``label`` and refresh the list.

    The item id is looked up in ``state_obj["flat"]`` and passed to
    ``edit_item_question``; the bank is saved only when that call reports
    success. The curation view is then rebuilt with an empty selection.

    Returns the same 5-tuple as ``refresh_curation_ui``, with the status
    slot replaced by an edit-specific message.
    """
    flat_items = (state_obj or {}).get("flat", [])
    item_id = next(
        (entry.get("id") for entry in flat_items if entry.get("label") == label),
        None,
    )

    out_dir = str(OUT_DIR)
    bank = load_bank(out_dir=out_dir)
    ok = edit_item_question(bank, item_id=item_id, new_question=edited_question)
    if ok:
        save_bank(bank, out_dir=out_dir)

    state, choices_update, status_msg, mode_update, edit_update = refresh_curation_ui(
        domain_filter,
        mode_filter,
        min_score,
        hide_duplicates,
        query,
        [],
    )

    if ok:
        message = f"Edited question text. {status_msg}"
    else:
        message = "Edit failed: select one item and provide non-empty text."
    return state, choices_update, message, mode_update, edit_update
def export_selected_from_curation(state_obj, labels):
    """Export the checked curation questions, with answers, to Markdown.

    Answers missing from the QA bank are generated first via
    ``_ensure_answers_for_items``. The document is written to
    FAQ_CURATED_PATH. Returns ``(status, path)``; the path is ``None`` when
    nothing is selected.
    """
    selected = _get_selected_items(state_obj, labels)
    if not selected:
        return "Select one or more curation questions first.", None

    by_norm, _ = _ensure_answers_for_items(selected)

    lines = [
        "# Curated Supervisory FAQ",
        "",
        f"_Generated from checked questions. Source of truth: `{QA_BANK_PATH}`._",
        "",
    ]
    exported = 0
    for item in selected:
        key = item.get("normalized_question") or _normalize_question(item.get("question", ""))
        record = by_norm.get(key)
        if record:
            exported += 1
            heading = f"## {item.get('domain', 'Unknown Domain')} — {item.get('question', '').strip()}"
            lines += [heading, "", record.get("answer_markdown", "").strip(), "", "---", ""]

    document = "\n".join(lines).strip() + "\n"
    FAQ_CURATED_PATH.write_text(document, encoding="utf-8")
    return f"Exported {exported} checked Q&A entries to {FAQ_CURATED_PATH}", str(FAQ_CURATED_PATH)
def _read_qa_bank_records():
    """Read every JSON-object record from the QA bank JSONL file.

    Returns ``[]`` when the file does not exist. Blank lines and lines that
    fail to parse are skipped (deliberate best-effort: one damaged line must
    not hide the rest of the bank). Lines that parse to something other
    than a JSON object are skipped too, because callers use ``rec.get``.

    The file is iterated line by line rather than split with
    ``str.splitlines()``. Records are written with ``ensure_ascii=False``,
    which leaves U+2028, U+2029 and U+0085 unescaped inside string values,
    and ``splitlines()`` treats those characters as line boundaries. An
    answer containing one would be cut in half and silently dropped. Text
    file iteration only breaks on ``\\n`` / ``\\r`` / ``\\r\\n``, which JSON
    always escapes inside strings.
    """
    if not QA_BANK_PATH.exists():
        return []

    records = []
    with QA_BANK_PATH.open("r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            try:
                rec = json.loads(line)
            except (ValueError, RecursionError):
                # json.JSONDecodeError is a ValueError subclass.
                continue
            if isinstance(rec, dict):
                records.append(rec)
    return records
| def _append_qa_records(records): | |
| if not records: | |
| return | |
| with QA_BANK_PATH.open("a", encoding="utf-8") as f: | |
| for rec in records: | |
| f.write(json.dumps(rec, ensure_ascii=False) + "\n") | |
| def _get_selected_items(state_obj, labels): | |
| if not labels: | |
| return [] | |
| items = [] | |
| for label in labels: | |
| item = _get_selected_item(state_obj, label) | |
| if item: | |
| items.append(item) | |
| return items | |
def _ensure_answers_for_items(items):
    """Make sure every item has an answer record, generating missing ones.

    Existing QA bank records are indexed by normalized question (the first
    record for a given key wins). For each item not yet in the index an
    answer is generated with ``answer_question_dynamic`` and a new record is
    created. The new records are appended to the QA bank file after the
    loop finishes.

    Returns ``(by_norm, new_count)``: the combined index and the number of
    records generated in this call.
    """
    by_norm = {}
    for rec in _read_qa_bank_records():
        key = rec.get("normalized_question") or _normalize_question(rec.get("question", ""))
        if key:
            by_norm.setdefault(key, rec)

    fresh = []
    for item in items:
        question = item.get("question", "")
        key = item.get("normalized_question") or _normalize_question(question)
        if not key or key in by_norm:
            continue

        answer = answer_question_dynamic(
            question=question,
            top_k=10,
            model="gpt-4.1-mini",
        )
        # The timestamp is taken after the answer has been generated.
        record = {
            "saved_at_utc": datetime.now(timezone.utc).isoformat(),
            "domain": item.get("domain"),
            "label": item.get("label"),
            "question": question,
            "normalized_question": key,
            "answer_markdown": answer,
        }
        by_norm[key] = record
        fresh.append(record)

    _append_qa_records(fresh)
    return by_norm, len(fresh)
def answer_selected_questions(state_obj, labels):
    """Generate (or reuse) answers for the selected questions.

    Returns a status string reporting how many selected questions now have
    an answer, split into newly generated and reused.
    """
    items = _get_selected_items(state_obj, labels)
    if not items:
        return "Select one or more questions first."

    # Dedupe within current selection by exact normalized question.
    unique = {}
    for item in items:
        key = item.get("normalized_question") or _normalize_question(item.get("question", ""))
        if key:
            unique.setdefault(key, item)  # first occurrence wins, order kept

    by_norm, added = _ensure_answers_for_items(list(unique.values()))
    answered_count = sum(1 for key in unique if key in by_norm)

    return (
        f"Answered {answered_count} selected questions (new: {added}, existing reused: {answered_count - added}). "
        f"Saved/updated bank at {QA_BANK_PATH}."
    )
def export_selected_to_markdown(state_obj, labels):
    """Export the selected questions, with answers, to the curated FAQ file.

    The selection is deduplicated by normalized question (first occurrence
    kept), missing answers are generated via ``_ensure_answers_for_items``,
    and the document is written to FAQ_CURATED_PATH. Returns
    ``(status, path)``; the path is ``None`` when nothing is selected.
    """
    items = _get_selected_items(state_obj, labels)
    if not items:
        return "Select one or more questions first.", None

    # Dedupe in selected order by normalized question.
    unique = {}
    for item in items:
        key = item.get("normalized_question") or _normalize_question(item.get("question", ""))
        if key:
            unique.setdefault(key, item)
    selected = list(unique.values())

    by_norm, _ = _ensure_answers_for_items(selected)

    lines = [
        "# Curated Supervisory FAQ",
        "",
        f"_Generated from selected questions. Source of truth: `{QA_BANK_PATH}`._",
        "",
    ]
    exported = 0
    for key, item in unique.items():
        record = by_norm.get(key)
        if record:
            exported += 1
            heading = f"## {item.get('domain', 'Unknown Domain')} — {item.get('question', '').strip()}"
            lines += [heading, "", record.get("answer_markdown", "").strip(), "", "---", ""]

    document = "\n".join(lines).strip() + "\n"
    FAQ_CURATED_PATH.write_text(document, encoding="utf-8")
    return f"Exported {exported} Q&A entries to {FAQ_CURATED_PATH}", str(FAQ_CURATED_PATH)
| def _filter_step8_labels(state_obj, domain_filter: str, query: str) -> List[str]: | |
| if not state_obj or not state_obj.get("flat"): | |
| return [] | |
| domain_filter = (domain_filter or "All domains").strip() | |
| q = _normalize_question(query or "") | |
| out = [] | |
| for item in state_obj.get("flat", []): | |
| domain = item.get("domain", "") | |
| question = item.get("question", "") | |
| label = item.get("label", "") | |
| if domain_filter != "All domains" and domain != domain_filter: | |
| continue | |
| if q and q not in _normalize_question(question): | |
| continue | |
| out.append(label) | |
| return out | |
def update_step8_filtered_choices(state_obj, domain_filter: str, query: str, selected_labels):
    """Refresh the Step 8 checklist for the current filters.

    Returns ``(checklist_update, status)``. Labels that were selected and
    are still visible remain selected.
    """
    visible = _filter_step8_labels(state_obj, domain_filter, query)
    previously_selected = set(selected_labels or [])
    still_selected = [label for label in visible if label in previously_selected]

    if visible:
        status = f"Showing {len(visible)} matching questions."
    else:
        status = "No questions match the current filters."
    return gr.update(choices=visible, value=still_selected), status
def select_all_filtered_questions(state_obj, domain_filter: str, query: str):
    """Select every question that matches the current Step 8 filters.

    Returns ``(checklist_update, status)``.
    """
    visible = _filter_step8_labels(state_obj, domain_filter, query)
    if visible:
        status = f"Selected {len(visible)} filtered questions."
    else:
        status = "No filtered questions to select."
    return gr.update(value=visible), status
def clear_selected_questions():
    """Return an update that empties the selection, plus a status message."""
    cleared = gr.update(value=[])
    status = "Cleared selected questions."
    return cleared, status
# Computed once at import time: the picker state and choices the UI starts with.
INITIAL_QA_STATE, INITIAL_Q_CHOICES = _load_state_from_bank()

# Initial curation state: no domain/mode/score filtering, duplicates hidden.
# Arguments are (domain_filter, mode_filter, min_score, hide_duplicates).
INITIAL_CURATION_STATE, _ = _refresh_state("All domains", "All modes", 0, True)

# The curation choice list starts out empty.
INITIAL_CURATION_CHOICES = []
| with gr.Blocks( | |
| css=""" | |
| #step8-select-list { | |
| max-height: 360px; | |
| overflow-y: auto; | |
| border: 1px solid #d0d7de; | |
| border-radius: 8px; | |
| padding: 8px; | |
| } | |
| """ | |
| ) as demo: | |
| gr.Markdown("# UPS National Agreement — Supervisory Decision-Risk Extractor") | |
| # ------------------------- | |
| # Step 2: PDF -> text | |
| # ------------------------- | |
| gr.Markdown("## Step 2 — Upload PDF and Extract Text") | |
| pdf = gr.File(label="Upload contract PDF", file_types=[".pdf"]) | |
| extract_btn = gr.Button("Extract text") | |
| preview = gr.Textbox(label="Preview (first 6,000 chars)", lines=18) | |
| download_txt = gr.File(label="Download extracted text (txt)") | |
| stats = gr.Textbox(label="Stats", interactive=False) | |
| extract_btn.click( | |
| extract_pdf_text, | |
| inputs=[pdf], | |
| outputs=[preview, download_txt, stats], | |
| ) | |
| # ------------------------- | |
| # Step 3: Chunking | |
| # ------------------------- | |
| gr.Markdown("## Step 3 — Build Chunked Knowledge Base") | |
| build_btn = gr.Button("Build KB chunks (Step 3)") | |
| kb_status = gr.Textbox(label="KB build status", interactive=False) | |
| build_btn.click(build_kb, inputs=[], outputs=[kb_status]) | |
| # ------------------------- | |
| # Step 4: Quick search (no embeddings yet) | |
| # ------------------------- | |
| gr.Markdown("## Step 4 — Search the Contract (Quick Test)") | |
| search_q = gr.Textbox( | |
| label="Search query (try: post-accident testing, random testing, overtime, seniority)" | |
| ) | |
| search_btn = gr.Button("Search top matches") | |
| search_out = gr.Markdown() | |
| search_btn.click( | |
| lambda q: search_chunks(q, top_k=5), | |
| inputs=[search_q], | |
| outputs=[search_out], | |
| ) | |
| # ------------------------- | |
| # Step 5: Generate Supervisory Risk Questions (LLM) | |
| # ------------------------- | |
| gr.Markdown("## Step 5 — Generate Supervisory Decision-Risk Questions (LLM)") | |
| gen_btn = gr.Button("Generate engineered supervisory questions") | |
| gen_out = gr.Textbox(label="Generated domains + questions (JSON)", lines=20) | |
| step5_status = gr.Textbox(label="Step 5 status", interactive=False) | |
| gen_btn.click( | |
| fn=generate_questions_and_persist, | |
| inputs=[], | |
| outputs=[gen_out, step5_status], | |
| ) | |
| # ------------------------- | |
| # Step 6: Select question + preview retrieval + generate answer | |
| # ------------------------- | |
| gr.Markdown("## Step 6 — Pick a Question, Preview Retrieved Excerpts, Then Generate Answer") | |
| qa_state = gr.State(value=INITIAL_QA_STATE) | |
| load_btn = gr.Button("Load generated questions into dropdown") | |
| load_bank_btn = gr.Button("Load questions from bank") | |
| q_dropdown = gr.Dropdown( | |
| label="Select a question", | |
| choices=INITIAL_Q_CHOICES, | |
| value=INITIAL_Q_CHOICES[0] if INITIAL_Q_CHOICES else None, | |
| ) | |
| qa_load_status = gr.Textbox( | |
| label="Question load status", | |
| value=( | |
| f"Loaded {len(INITIAL_Q_CHOICES)} questions from outputs/questions_bank.json on startup" | |
| if INITIAL_Q_CHOICES | |
| else "Question bank currently empty." | |
| ), | |
| interactive=False, | |
| ) | |
| q_details = gr.Markdown() | |
| q_chunk_ids = gr.Textbox(label="Generator supporting chunk_ids (FYI)", interactive=False) | |
| preview_btn = gr.Button("Preview retrieved contract excerpts (what will ground the answer)") | |
| retrieved_excerpts = gr.Markdown() | |
| answer_btn = gr.Button("Generate answer (dynamic retrieval + citations)") | |
| answer_out = gr.Markdown() | |
| # ------------------------- | |
| # Step 7: Generate more by canonical domain | |
| # ------------------------- | |
| gr.Markdown("## Step 7 — Generate More Questions by Canonical Domain") | |
| domain_choices = [d for d in CANONICAL_DOMAINS if d != "Other / Needs Review"] | |
| domain_select = gr.Dropdown( | |
| label="Canonical domain", | |
| choices=domain_choices, | |
| value=domain_choices[0] if domain_choices else None, | |
| ) | |
| n_new = gr.Slider(label="How many new questions", minimum=5, maximum=50, step=1, value=10) | |
| gen_more_btn = gr.Button("Generate more questions for this domain") | |
| step7_more_status = gr.Textbox(label="Step 7 status", interactive=False) | |
| step7_more_json = gr.Textbox(label="Step 7 generated JSON preview", lines=14) | |
# -------------------------
# Step 8: Curate + batch answer + export
# -------------------------
# NOTE(review): the original indentation was lost in this copy of the file, so
# the extent of each gr.Row() below is reconstructed (two filters in the first
# row, two selection buttons in the second) — confirm against the real layout.
gr.Markdown("## Step 8 — Curate Questions, Batch Answer, and Export")

step8_domain_options = ["All domains", *CANONICAL_DOMAINS]
with gr.Row():
    step8_domain_filter = gr.Dropdown(
        choices=step8_domain_options,
        value="All domains",
        label="Filter by domain",
    )
    step8_query_filter = gr.Textbox(
        placeholder="Type to narrow the list...",
        label="Filter by question text",
    )

with gr.Row():
    step8_select_filtered_btn = gr.Button("Select all filtered")
    step8_clear_selected_btn = gr.Button("Clear selected")

# Checkbox list of questions; starts with nothing selected.
q_multiselect = gr.CheckboxGroup(
    choices=INITIAL_Q_CHOICES,
    value=[],
    label="Select questions to curate (domain + question)",
    elem_id="step8-select-list",
)

answer_selected_btn = gr.Button("Answer selected")
export_selected_btn = gr.Button("Export selected to Markdown")
step8_status = gr.Textbox(label="Step 8 status", interactive=False)
curated_md_file = gr.File(label="Curated FAQ Markdown")
# Event wiring for Steps 6-8. Registration order matches the original.

# Every question-loading action refreshes the same four components.
question_refresh_outputs = [qa_state, q_dropdown, q_multiselect, qa_load_status]

load_btn.click(
    fn=parse_questions_json,
    inputs=[gen_out],
    outputs=question_refresh_outputs,
)
load_bank_btn.click(
    fn=load_questions_from_bank,
    inputs=[],
    outputs=question_refresh_outputs,
)
gen_more_btn.click(
    fn=generate_more_questions_for_domain_ui,
    inputs=[domain_select, n_new],
    # Step 7's own status/preview come first, then the shared refresh targets.
    outputs=[step7_more_status, step7_more_json, *question_refresh_outputs],
)

# Single-question actions all read the shared state plus the dropdown selection.
single_question_inputs = [qa_state, q_dropdown]
q_dropdown.change(
    fn=show_selected_question,
    inputs=single_question_inputs,
    outputs=[q_details, q_chunk_ids],
)
preview_btn.click(
    fn=preview_retrieved_excerpts,
    inputs=single_question_inputs,
    outputs=[retrieved_excerpts],
)
answer_btn.click(
    fn=generate_answer_from_dropdown,
    inputs=single_question_inputs,
    outputs=[answer_out],
)

# Batch actions read the shared state plus the checked questions.
batch_inputs = [qa_state, q_multiselect]
answer_selected_btn.click(
    fn=answer_selected_questions,
    inputs=batch_inputs,
    outputs=[step8_status],
)
export_selected_btn.click(
    fn=export_selected_to_markdown,
    inputs=batch_inputs,
    outputs=[step8_status, curated_md_file],
)

# All Step 8 list operations write the checkbox list and the Step 8 status box.
step8_list_outputs = [q_multiselect, step8_status]

# Changing either filter re-runs the same filtering handler with the same arguments.
for step8_filter in (step8_domain_filter, step8_query_filter):
    step8_filter.change(
        fn=update_step8_filtered_choices,
        inputs=[qa_state, step8_domain_filter, step8_query_filter, q_multiselect],
        outputs=step8_list_outputs,
    )

step8_select_filtered_btn.click(
    fn=select_all_filtered_questions,
    inputs=[qa_state, step8_domain_filter, step8_query_filter],
    outputs=step8_list_outputs,
)
step8_clear_selected_btn.click(
    fn=clear_selected_questions,
    inputs=[],
    outputs=step8_list_outputs,
)
# -------------------------
# Advanced Generation + Curation
# -------------------------
# NOTE(review): the original indentation was lost in this copy of the file, so
# the Tab/Row nesting below is reconstructed from the statement order — confirm
# which controls actually share a gr.Row() before relying on this layout.
gr.Markdown("## Advanced Generation + Curation")
with gr.Tabs():
    with gr.Tab("Generation"):
        # Every generation mode is offered and pre-checked.
        generation_mode_names = list(GENERATION_MODES.keys())
        gen_modes = gr.CheckboxGroup(
            choices=generation_mode_names,
            value=list(generation_mode_names),
            label="Generation modes",
        )
        with gr.Row():
            gen_quality_threshold = gr.Slider(
                minimum=0,
                maximum=5,
                step=1,
                value=3,
                label="Quality threshold (0-5)",
            )
            gen_dedupe_toggle = gr.Checkbox(
                value=True,
                label="Near-dedupe filter",
            )
            gen_max_rounds = gr.Slider(
                minimum=1,
                maximum=8,
                step=1,
                value=3,
                label="Max retry rounds per domain",
            )
        gr.Markdown("Per-domain targets (canonical domains)")
        # One integer target field per canonical domain, in CANONICAL_DOMAINS order.
        gen_target_inputs = [
            gr.Number(label=domain_label, value=0, precision=0, minimum=0)
            for domain_label in CANONICAL_DOMAINS
        ]
        gen_targets_btn = gr.Button("Generate to meet domain targets")
        gen_targets_status = gr.Textbox(
            label="Generation status",
            lines=10,
            interactive=False,
        )

    with gr.Tab("Curation"):
        curation_state = gr.State(value=INITIAL_CURATION_STATE)

        curation_domain_options = ["All domains", *CANONICAL_DOMAINS]
        # Mode options come from the startup curation state; none if the key is absent.
        curation_mode_options = ["All modes", *INITIAL_CURATION_STATE.get("modes", [])]
        with gr.Row():
            cur_domain_filter = gr.Dropdown(
                choices=curation_domain_options,
                value="All domains",
                label="Domain",
            )
            cur_mode_filter = gr.Dropdown(
                choices=curation_mode_options,
                value="All modes",
                label="Mode",
            )
        with gr.Row():
            cur_min_score = gr.Slider(
                label="Min quality score",
                minimum=0,
                maximum=5,
                step=1,
                value=0,
            )
            cur_hide_duplicates = gr.Checkbox(label="Hide duplicates", value=True)
            cur_query = gr.Textbox(
                label="Text contains",
                placeholder="Filter by question text",
            )
        cur_refresh_btn = gr.Button("Refresh curation list")

        # NOTE(review): this elem_id is also assigned to q_multiselect in Step 8;
        # confirm the duplicate DOM id is intentional (e.g. shared CSS styling).
        cur_select = gr.CheckboxGroup(
            choices=INITIAL_CURATION_CHOICES,
            value=[],
            label="Curate question items",
            elem_id="step8-select-list",
        )

        # Single-item editing: choose an item, type replacement text, apply.
        cur_edit_label = gr.Dropdown(
            choices=INITIAL_CURATION_CHOICES,
            value=None,
            label="Edit one item",
        )
        cur_edit_text = gr.Textbox(label="Edited question text", lines=2)
        cur_edit_btn = gr.Button("Apply edit")

        cur_export_selected_btn = gr.Button("Export checked questions")
        cur_export_file = gr.File(label="Curated checked FAQ Markdown")
        cur_status = gr.Textbox(label="Curation status", interactive=False)
# -------------------------
# Contract Analysis
# -------------------------
gr.Markdown("## Contract Analysis")

# The keyword box is pre-filled with the default risk keywords, comma-separated.
default_keywords_text = ", ".join(DEFAULT_RISK_KEYWORDS)
analysis_keywords = gr.Textbox(
    value=default_keywords_text,
    label="Risk keywords (comma-separated)",
)
analysis_top_sections = gr.Number(
    value=15,
    minimum=1,
    precision=0,
    label="Top sections to show",
)
analysis_btn = gr.Button("Run contract analysis")

# Outputs: a status line, a rendered Markdown area, and two file components.
analysis_status = gr.Textbox(label="Analysis status", interactive=False)
analysis_md = gr.Markdown()
analysis_md_file = gr.File(label="domain_analysis.md")
analysis_csv_file = gr.File(label="article_risk_report.csv")
# Event wiring for the advanced generation, curation, and contract-analysis
# sections, followed by the app launch. Registration order matches the original.

# Target-driven generation takes the four generation settings followed by one
# number per canonical domain, and refreshes the shared question components.
gen_targets_btn.click(
    fn=generate_with_targets_ui,
    inputs=[
        gen_modes,
        gen_quality_threshold,
        gen_dedupe_toggle,
        gen_max_rounds,
        *gen_target_inputs,
    ],
    outputs=[gen_targets_status, qa_state, q_dropdown, q_multiselect, qa_load_status],
)

# The five curation filter controls, in the order the handlers receive them.
curation_filter_controls = [
    cur_domain_filter,
    cur_mode_filter,
    cur_min_score,
    cur_hide_duplicates,
    cur_query,
]
# Both the refresh and the edit handlers write this same set of components.
curation_refresh_outputs = [
    curation_state,
    cur_select,
    cur_status,
    cur_mode_filter,
    cur_edit_label,
]

# Any filter change, or the explicit refresh button, re-runs the same refresh
# handler with the same arguments.
# NOTE(review): cur_mode_filter is both a trigger and an output of
# refresh_curation_ui — confirm the handler's update cannot re-fire itself.
curation_refresh_triggers = [
    cur_domain_filter.change,
    cur_min_score.change,
    cur_hide_duplicates.change,
    cur_query.change,
    cur_mode_filter.change,
    cur_refresh_btn.click,
]
for register_refresh in curation_refresh_triggers:
    register_refresh(
        fn=refresh_curation_ui,
        inputs=[*curation_filter_controls, cur_select],
        outputs=curation_refresh_outputs,
    )

# Applying an edit passes the state, the chosen item and new text, then the
# current filter values.
cur_edit_btn.click(
    fn=curation_edit_question_ui,
    inputs=[curation_state, cur_edit_label, cur_edit_text, *curation_filter_controls],
    outputs=curation_refresh_outputs,
)
cur_export_selected_btn.click(
    fn=export_selected_from_curation,
    inputs=[curation_state, cur_select],
    outputs=[cur_status, cur_export_file],
)

analysis_btn.click(
    fn=run_contract_analysis_ui,
    inputs=[analysis_keywords, analysis_top_sections],
    outputs=[analysis_status, analysis_md, analysis_md_file, analysis_csv_file],
)

# Start the Gradio app.
demo.launch()