import re
import time
from collections import defaultdict
from typing import Dict, List

import gradio as gr

###############################################################################
# GLOBAL STATE (HF SAFE)
###############################################################################

# Results of the most recent search. This is module-level state, so it is
# shared by everything running in the same Python process.
RESULT_CACHE: List[Dict] = []
# Position in RESULT_CACHE of the document currently shown in the preview.
CURRENT_INDEX = 0

###############################################################################
# AGENCY POLICY / ROBOTS / LIVE SAFETY
###############################################################################

# Per-agency flags. run_search() drops every result whose agency has
# "robots" set to False (or is missing from this table).
# NOTE(review): the "live" flag is not read by any code visible in this file.
AGENCY_POLICY = {
    "CIA": {"robots": True, "live": True},
    "FBI": {"robots": True, "live": True},
    "ODNI": {"robots": True, "live": True},
    "USAF": {"robots": True, "live": True},
    "NSA": {"robots": False, "live": False},
    "NRO": {"robots": False, "live": False},
    "SAP": {"robots": False, "live": False},
}

###############################################################################
# KILL SWITCH (AUTO + MANUAL SAFE)
###############################################################################


class KillSwitch:
    """Registry of agencies that have been switched off, with the reason."""

    def __init__(self) -> None:
        # Maps agency name -> reason it was disabled.
        self.disabled: Dict[str, str] = {}

    def disable(self, agency: str, reason: str) -> None:
        """Mark *agency* as disabled, remembering *reason*.

        Disabling an already-disabled agency overwrites the stored reason.
        """
        self.disabled[agency] = reason

    def enabled(self, agency: str) -> bool:
        """Return True unless *agency* has been disabled."""
        return agency not in self.disabled

    def reason(self, agency: str) -> str:
        """Return the stored reason for *agency*, or "" if it is enabled."""
        return self.disabled.get(agency, "")


KILL = KillSwitch()

###############################################################################
# FOIA EXEMPTION CLASSIFIER
###############################################################################

# Exemption codes recognised by classify_exemptions(), with a display label.
B_CODES = {
    "b(1)": "National Security",
    "b(3)": "Statutory",
    "b(5)": "Deliberative Process",
    "b(7)": "Law Enforcement",
}


def classify_exemptions(text: str) -> Dict:
    """Find the exemption codes from B_CODES that are mentioned in *text*.

    Matching is a case-insensitive substring test.

    Returns:
        A dict with two keys: "codes", the matching codes in B_CODES order,
        and "confidence", 0.3 per matching code, capped at 1.0 and rounded
        to two decimals.
    """
    # Lowercase once instead of once per code.
    lowered = text.lower()
    found = [code for code in B_CODES if code in lowered]
    confidence = round(min(1.0, 0.3 * len(found)), 2)
    return {"codes": found, "confidence": confidence}


###############################################################################
# REDACTION SCORING
###############################################################################


def redaction_score(text: str) -> float:
    """Score how strongly *text* hints at redacted content.

    Each of the markers "redact", "withheld" and "b(" that occurs in the
    text (case-insensitively) adds 0.25; the result is capped at 1.0 and
    rounded to two decimals.
    """
    # Lowercase once instead of once per marker.
    lowered = text.lower()
    hits = sum(marker in lowered for marker in ("redact", "withheld", "b("))
    return round(min(1.0, hits * 0.25), 2)


###############################################################################
# SEMANTIC CLUSTERING (HF SAFE HEURISTIC)
###############################################################################


def semantic_clusters(results: List[Dict]) -> Dict[str, List[Dict]]:
    """Group results into display clusters by keywords in their title.

    Each result must have a "title" key. A title containing "uap" or
    "aerial" goes to the UAP cluster, otherwise one containing
    "intelligence" goes to the intelligence cluster, and everything else
    goes to the general cluster. Only non-empty clusters are present in the
    returned mapping; input order is preserved inside each cluster, and the
    same dict objects are reused rather than copied.
    """
    clusters = defaultdict(list)
    for r in results:
        title = r["title"].lower()
        if "uap" in title or "aerial" in title:
            clusters["🛸 UAP / Aerial Phenomena"].append(r)
        elif "intelligence" in title:
            clusters["🧠 Intelligence Activities"].append(r)
        else:
            clusters["📄 General Records"].append(r)
    return clusters


###############################################################################
# MOCK LIVE SEARCH (REPLACE WITH REAL ADAPTERS SAFELY)
###############################################################################


def run_search(query: str) -> List[Dict]:
    """Return the mock result set, filtered by agency policy and kill switch.

    *query* is not used by this mock: the same three records are always
    produced. A record is kept only if its agency has "robots" enabled in
    AGENCY_POLICY and has not been disabled through KILL.
    """
    # Fixed delay before the mock results are returned.
    time.sleep(0.3)

    raw = [
        {
            "title": "UAP Task Force Preliminary Assessment",
            "snippet": "Some material withheld under b(1) and b(3).",
            "url": "https://www.dni.gov/files/ODNI/documents/assessments/Prelimary-Assessments-UAP-20210625.pdf",
            "agency": "ODNI",
            "source": "ODNI FOIA",
        },
        {
            "title": "CIA Memorandum on Aerial Phenomena",
            "snippet": "This document contains redactions under b(3).",
            "url": "https://www.cia.gov/readingroom/docs/DOC_000001.pdf",
            "agency": "CIA",
            "source": "CIA FOIA",
        },
        {
            "title": "Project Blue Book Summary",
            "snippet": "Historical investigation records.",
            "url": "https://www.archives.gov/research/military/air-force/ufos",
            "agency": "USAF",
            "source": "National Archives",
        },
    ]

    # Agencies missing from AGENCY_POLICY are treated as not allowed.
    return [
        r
        for r in raw
        if AGENCY_POLICY.get(r["agency"], {}).get("robots", False)
        and KILL.enabled(r["agency"])
    ]
###############################################################################
# TEXT UTILITIES
###############################################################################

# Markdown hard line break: two trailing spaces followed by a newline.
_MD_BREAK = "  \n"


def highlight(text: str, query: str) -> str:
    """Wrap every case-insensitive occurrence of *query* in <mark> tags.

    The query is matched literally (regex metacharacters are escaped) and
    the original casing of each match is kept. An empty or whitespace-only
    query returns *text* unchanged.
    """
    needle = query.strip() if query else ""
    if not needle:
        return text
    return re.sub(
        re.escape(needle),
        lambda m: f"<mark>{m.group(0)}</mark>",
        text,
        flags=re.IGNORECASE,
    )


###############################################################################
# RENDERERS
###############################################################################


def render_results(results: List[Dict], query: str) -> str:
    """Render *results* as Markdown, grouped into clusters.

    Each entry shows the highlighted title, agency, redaction score,
    exemption codes, URL and a "Select #n" hint, where n is the entry's
    position in *results*.

    Returns a placeholder message when *results* is empty.
    """
    if not results:
        return "_No results found._"

    # Look positions up by object identity: this is O(1) per item and stays
    # correct even when two results compare equal.
    positions = {id(r): i for i, r in enumerate(results)}

    blocks = []
    for name, items in semantic_clusters(results).items():
        section = [f"## {name}"]
        for r in items:
            global_index = positions.get(id(r))
            if global_index is None:
                # Clustering handed back a copy; fall back to equality.
                global_index = results.index(r)
            ex = classify_exemptions(r["snippet"])
            codes = ", ".join(ex["codes"]) or "None"
            section.append(
                _MD_BREAK.join(
                    [
                        f"**{highlight(r['title'], query)}**",
                        f"🏛️ {r['agency']} · 📊 Redaction {redaction_score(r['snippet'])}",
                        f"⚖️ Exemptions: `{codes}` (conf {ex['confidence']})",
                        f"🔗 {r['url']}",
                        f"➡️ **Select #{global_index}**",
                    ]
                )
            )
        blocks.append("\n\n".join(section))
    return "\n\n---\n\n".join(blocks)


def render_preview(index: int) -> str:
    """Render the Markdown preview for RESULT_CACHE[index].

    Returns a placeholder when RESULT_CACHE is empty. An out-of-range
    *index* is clamped to the nearest valid position instead of raising
    IndexError (or silently wrapping around for negative values).
    """
    if not RESULT_CACHE:
        return "_No document selected._"

    index = max(0, min(int(index), len(RESULT_CACHE) - 1))
    r = RESULT_CACHE[index]
    ex = classify_exemptions(r["snippet"])
    codes = ", ".join(ex["codes"]) or "None"

    details = _MD_BREAK.join(
        [
            f"🏛️ {r['agency']} · {r['source']}",
            f"🛡️ Redaction Risk: **{redaction_score(r['snippet'])}**",
            f"⚖️ FOIA Exemptions: `{codes}`",
            f"🔎 Confidence: **{ex['confidence']}**",
        ]
    )
    # Plain Markdown link so the preview always offers a way to reach the
    # document itself.
    link = f"🔗 [Open source document]({r['url']})"

    return "\n\n".join(
        [
            "### 📄 Document Preview",
            f"**{r['title']}**",
            details,
            link,
        ]
    )


def agency_coverage(results: List[Dict]) -> str:
    """Return a Markdown table of document counts per agency.

    Rows are sorted by count, highest first; agencies with equal counts keep
    the order in which they first appear in *results*. An empty input gives
    a table with only the header rows.
    """
    counts = defaultdict(int)
    for r in results:
        counts[r["agency"]] += 1

    rows = ["| Agency | Docs |", "|---|---|"]
    for agency, total in sorted(counts.items(), key=lambda item: -item[1]):
        rows.append(f"| {agency} | {total} |")
    return "\n".join(rows)


###############################################################################
# EVENT HANDLERS
###############################################################################


def do_search(query: str):
    """Run a search and reset the selection to the first result.

    Replaces RESULT_CACHE with the new results and sets CURRENT_INDEX to 0.

    Returns:
        A 4-tuple: results Markdown, preview Markdown for index 0, agency
        coverage Markdown, and the selected index (always 0).
    """
    global RESULT_CACHE, CURRENT_INDEX
    RESULT_CACHE = run_search(query)
    CURRENT_INDEX = 0
    return (
        render_results(RESULT_CACHE, query),
        render_preview(0),
        agency_coverage(RESULT_CACHE),
        0,
    )


def select_index(idx: int):
    """Select the document at *idx* and return its preview Markdown.

    *idx* is clamped into the valid range of RESULT_CACHE. A value of None
    (sent by the number field when it is cleared) keeps the current
    selection instead of raising TypeError.
    """
    global CURRENT_INDEX
    if idx is None:
        idx = CURRENT_INDEX
    # With an empty cache the upper bound is -1, so this clamps to 0.
    idx = int(max(0, min(idx, len(RESULT_CACHE) - 1)))
    CURRENT_INDEX = idx
    return render_preview(idx)


def next_doc():
    """Advance the selection by one, stopping at the last result.

    Returns the selected index and its preview Markdown.
    """
    global CURRENT_INDEX
    if CURRENT_INDEX < len(RESULT_CACHE) - 1:
        CURRENT_INDEX += 1
    return CURRENT_INDEX, render_preview(CURRENT_INDEX)


def prev_doc():
    """Move the selection back by one, stopping at the first result.

    Returns the selected index and its preview Markdown.
    """
    global CURRENT_INDEX
    if CURRENT_INDEX > 0:
        CURRENT_INDEX -= 1
    return CURRENT_INDEX, render_preview(CURRENT_INDEX)


###############################################################################
# UI
###############################################################################

with gr.Blocks(theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🏛️ Federated FOIA Intelligence Search")

    query = gr.Textbox(label="Search public FOIA reading rooms")
    search_btn = gr.Button("🔍 Search")

    with gr.Row():
        # Left column: clustered result list plus collapsible coverage table.
        with gr.Column(scale=5):
            results_md = gr.Markdown()
            with gr.Accordion("🗺️ Agency Coverage", open=False):
                coverage_md = gr.Markdown()

        # Right column: document preview and navigation controls.
        with gr.Column(scale=7):
            preview_md = gr.Markdown()
            with gr.Row():
                prev_btn = gr.Button("⬅️ Prev")
                next_btn = gr.Button("➡️ Next")
            index_box = gr.Number(label="Selected index", precision=0)

    search_btn.click(
        do_search,
        inputs=query,
        outputs=[results_md, preview_md, coverage_md, index_box],
    )

    index_box.change(
        select_index,
        inputs=index_box,
        outputs=preview_md,
    )

    next_btn.click(
        next_doc,
        outputs=[index_box, preview_md],
    )

    prev_btn.click(
        prev_doc,
        outputs=[index_box, preview_md],
    )

if __name__ == "__main__":
    app.launch()