# Page-scrape residue captured with this file (not part of the program):
# "Spaces: Sleeping Sleeping"
import html
import re
import time
from collections import defaultdict
from typing import List, Dict

import gradio as gr
###############################################################################
# GLOBAL STATE (HF SAFE)
###############################################################################
# Records returned by the most recent search; do_search() rebinds this list.
RESULT_CACHE: List[Dict] = list()
# Position in RESULT_CACHE of the record currently selected for preview.
CURRENT_INDEX: int = 0
###############################################################################
# AGENCY POLICY / ROBOTS / LIVE SAFETY
###############################################################################
# Per-agency flags keyed by agency code. Each entry holds two booleans,
# "robots" and "live"; run_search() only keeps records whose agency has
# "robots" set to True.
AGENCY_POLICY = dict(
    CIA=dict(robots=True, live=True),
    FBI=dict(robots=True, live=True),
    ODNI=dict(robots=True, live=True),
    USAF=dict(robots=True, live=True),
    NSA=dict(robots=False, live=False),
    NRO=dict(robots=False, live=False),
    SAP=dict(robots=False, live=False),
)
###############################################################################
# KILL SWITCH (AUTO + MANUAL SAFE)
###############################################################################
class KillSwitch:
    """Registry of agencies that have been switched off, with a reason each."""

    def __init__(self):
        # Maps agency code -> reason string supplied when it was disabled.
        self.disabled = {}

    def disable(self, agency: str, reason: str):
        """Mark *agency* as disabled, storing (or overwriting) *reason*."""
        self.disabled.update({agency: reason})

    def enabled(self, agency: str) -> bool:
        """Return True unless *agency* has been disabled."""
        switched_off = agency in self.disabled
        return not switched_off

    def reason(self, agency: str) -> str:
        """Return the stored reason for *agency*, or "" if it is not disabled."""
        if agency in self.disabled:
            return self.disabled[agency]
        return ""


# Shared instance; run_search() consults it before returning a record.
KILL = KillSwitch()
###############################################################################
# FOIA EXEMPTION CLASSIFIER
###############################################################################
# Exemption codes the classifier looks for, mapped to a short label.
B_CODES = {
    "b(1)": "National Security",
    "b(3)": "Statutory",
    "b(5)": "Deliberative Process",
    "b(7)": "Law Enforcement",
}


def classify_exemptions(text: str) -> Dict:
    """Find the B_CODES keys mentioned in *text*.

    Matching is a case-insensitive substring test against each key.

    Args:
        text: Free text to scan. ``None`` or an empty string is treated as
            containing no codes.

    Returns:
        A dict with ``"codes"`` (matched keys, in B_CODES order) and
        ``"confidence"`` (0.3 per matched code, capped at 1.0, rounded to
        two decimals).
    """
    # Lower-case once up front rather than once per code; also tolerates a
    # missing (None) snippet instead of raising AttributeError.
    haystack = (text or "").lower()
    found = [code for code in B_CODES if code in haystack]
    confidence = round(min(1.0, 0.3 * len(found)), 2)
    return {"codes": found, "confidence": confidence}
###############################################################################
# REDACTION SCORING
###############################################################################
def redaction_score(text: str) -> float:
    """Score how strongly *text* signals redaction, from 0.0 upward in 0.25 steps.

    Each of the markers "redact", "withheld" and "b(" that occurs in *text*
    (case-insensitively, as a substring) adds 0.25; the result is capped at
    1.0 and rounded to two decimals.

    Args:
        text: Free text to scan. ``None`` or an empty string scores 0.0.

    Returns:
        The score as a float.
    """
    # Lower-case once instead of once per marker; a missing (None) snippet
    # is treated as empty rather than raising AttributeError.
    haystack = (text or "").lower()
    hits = sum(marker in haystack for marker in ("redact", "withheld", "b("))
    return round(min(1.0, hits * 0.25), 2)
###############################################################################
# SEMANTIC CLUSTERING (HF SAFE HEURISTIC)
###############################################################################
def semantic_clusters(results: List[Dict]) -> Dict[str, List[Dict]]:
    """Group *results* under a heading chosen from keywords in each title.

    A title containing "uap" or "aerial" (case-insensitive) goes to the UAP
    group; otherwise one containing "intelligence" goes to the intelligence
    group; everything else goes to the general group.

    Returns:
        A defaultdict mapping heading -> list of records, with headings in
        first-seen order and records in input order.
    """
    grouped = defaultdict(list)
    for record in results:
        title = record["title"].lower()
        if any(word in title for word in ("uap", "aerial")):
            heading = "πΈ UAP / Aerial Phenomena"
        elif "intelligence" in title:
            heading = "π§ Intelligence Activities"
        else:
            heading = "π General Records"
        grouped[heading].append(record)
    return grouped
###############################################################################
# MOCK LIVE SEARCH (REPLACE WITH REAL ADAPTERS SAFELY)
###############################################################################
def run_search(query: str) -> List[Dict]:
    """Return the canned sample records that pass the policy and kill-switch checks.

    *query* is accepted but not used by this mock; the same three records are
    considered on every call, after a 0.3 s pause.

    Returns:
        The records whose agency has a truthy "robots" flag in AGENCY_POLICY
        and is still enabled in KILL, in their original order.
    """
    time.sleep(0.3)
    candidates = [
        {
            "title": "UAP Task Force Preliminary Assessment",
            "snippet": "Some material withheld under b(1) and b(3).",
            "url": "https://www.dni.gov/files/ODNI/documents/assessments/Prelimary-Assessments-UAP-20210625.pdf",
            "agency": "ODNI",
            "source": "ODNI FOIA",
        },
        {
            "title": "CIA Memorandum on Aerial Phenomena",
            "snippet": "This document contains redactions under b(3).",
            "url": "https://www.cia.gov/readingroom/docs/DOC_000001.pdf",
            "agency": "CIA",
            "source": "CIA FOIA",
        },
        {
            "title": "Project Blue Book Summary",
            "snippet": "Historical investigation records.",
            "url": "https://www.archives.gov/research/military/air-force/ufos",
            "agency": "USAF",
            "source": "National Archives",
        },
    ]

    def _permitted(record: Dict) -> bool:
        # Unknown agencies have no policy entry and are therefore rejected.
        agency = record["agency"]
        robots_ok = AGENCY_POLICY.get(agency, {}).get("robots", False)
        return bool(robots_ok) and KILL.enabled(agency)

    return [record for record in candidates if _permitted(record)]
###############################################################################
# TEXT UTILITIES
###############################################################################
def highlight(text: str, query: str) -> str:
    """Wrap each case-insensitive occurrence of *query* in *text* with <mark> tags.

    *query* is matched literally (regex metacharacters are escaped). An empty
    or falsy *query* returns *text* unchanged.
    """
    if query:
        pattern = re.compile(re.escape(query), re.IGNORECASE)
        # \g<0> re-inserts the matched text, keeping its original casing.
        return pattern.sub(r"<mark>\g<0></mark>", text)
    return text
###############################################################################
# RENDERERS
###############################################################################
def render_results(results: List[Dict], query: str) -> str:
    """Render *results* as Markdown, grouped by semantic_clusters().

    Each group becomes a "## heading" section; each record becomes a card with
    its highlighted title, agency, redaction score, exemption codes, URL and
    the index to enter in order to select it.

    Args:
        results: Records to render. Each one is expected to be present in
            RESULT_CACHE, because the "Select #" number is its position there.
        query: Search text to highlight inside titles.

    Returns:
        The Markdown text, with sections separated by horizontal rules.

    Raises:
        ValueError: If a record is not found in RESULT_CACHE.
    """
    clusters = semantic_clusters(results)

    # Look positions up by object identity, built once. This avoids an O(n)
    # list.index() scan per record and gives two equal-but-distinct records
    # their own positions instead of both reporting the first match.
    cache_positions = {}
    for position, cached in enumerate(RESULT_CACHE):
        cache_positions.setdefault(id(cached), position)

    blocks = []
    for name, items in clusters.items():
        section = [f"## {name}"]
        for r in items:
            global_index = cache_positions.get(id(r))
            if global_index is None:
                # Not the same object as any cached record: fall back to an
                # equality search, which raises ValueError when absent.
                global_index = RESULT_CACHE.index(r)
            ex = classify_exemptions(r["snippet"])
            codes = ", ".join(ex["codes"]) or "None"
            section.append(
                f"\n**{highlight(r['title'], query)}**\n"
                f"ποΈ {r['agency']} Β· π Redaction {redaction_score(r['snippet'])}\n"
                f"βοΈ Exemptions: `{codes}` (conf {ex['confidence']})\n"
                f"π {r['url']}\n"
                f"β‘οΈ **Select #{global_index}**\n"
            )
        blocks.append("\n\n".join(section))
    return "\n\n---\n\n".join(blocks)
def render_preview(index: int) -> str:
    """Render the preview panel for the record at *index* in RESULT_CACHE.

    Args:
        index: Position in RESULT_CACHE of the record to show.

    Returns:
        Markdown with the record's title, agency, source, redaction score,
        exemption codes and confidence, followed by an <iframe> pointing at
        its URL. If the cache is empty, or *index* is ``None`` or outside the
        cache, the placeholder "_No document selected._" is returned instead.
    """
    # An explicit range check replaces the bare subscript so that a stale or
    # negative index yields the placeholder rather than an IndexError or a
    # silent wrap-around to the end of the list.
    if index is None or not 0 <= index < len(RESULT_CACHE):
        return "_No document selected._"
    r = RESULT_CACHE[index]
    ex = classify_exemptions(r["snippet"])
    codes = ", ".join(ex["codes"]) or "None"
    # The URL lands inside a double-quoted HTML attribute; escape it so quotes,
    # angle brackets or ampersands in it cannot break out of src="...".
    safe_url = html.escape(r["url"], quote=True)
    iframe = (
        f'<iframe src="{safe_url}" width="100%" height="520px" '
        f'style="border:1px solid #444;border-radius:8px;"></iframe>'
    )
    return (
        "\n### π Document Preview\n"
        f"**{r['title']}**\n"
        f"ποΈ {r['agency']} Β· {r['source']}\n"
        f"π‘οΈ Redaction Risk: **{redaction_score(r['snippet'])}**\n"
        f"βοΈ FOIA Exemptions: `{codes}`\n"
        f"π Confidence: **{ex['confidence']}**\n"
        f"{iframe}\n"
    )
def agency_coverage(results: List[Dict]) -> str:
    """Build a Markdown table of how many records each agency contributed.

    Rows are ordered by descending count; agencies with equal counts keep the
    order in which they first appear in *results*.
    """
    tally = {}
    for record in results:
        agency = record["agency"]
        tally[agency] = tally.get(agency, 0) + 1
    # sorted() is stable even with reverse=True, so ties stay in first-seen order.
    ranked = sorted(tally.items(), key=lambda pair: pair[1], reverse=True)
    header = ["| Agency | Docs |", "|---|---|"]
    body = [f"| {agency} | {count} |" for agency, count in ranked]
    return "\n".join(header + body)
###############################################################################
# EVENT HANDLERS
###############################################################################
def do_search(query: str):
    """Run a search, reset the selection to the first record, and render all panels.

    Rebinds the module-level RESULT_CACHE to the new results and sets
    CURRENT_INDEX to 0.

    Returns:
        A 4-tuple: results Markdown, preview Markdown for index 0, agency
        coverage table, and the selected index (always 0).
    """
    global RESULT_CACHE, CURRENT_INDEX
    RESULT_CACHE, CURRENT_INDEX = run_search(query), 0
    results_view = render_results(RESULT_CACHE, query)
    preview_view = render_preview(0)
    coverage_view = agency_coverage(RESULT_CACHE)
    return results_view, preview_view, coverage_view, 0
def select_index(idx: int):
    """Select the record at *idx* (clamped into range) and render its preview.

    Args:
        idx: Requested position in RESULT_CACHE. Values below 0 or past the
            end are clamped. ``None`` (which a cleared number input can send)
            keeps the current selection instead of raising TypeError.

    Returns:
        The preview Markdown for the selected record.
    """
    global CURRENT_INDEX
    requested = CURRENT_INDEX if idx is None else idx
    last_position = len(RESULT_CACHE) - 1
    # With an empty cache last_position is -1, so the outer max() pins the
    # result at 0.
    CURRENT_INDEX = int(max(0, min(requested, last_position)))
    return render_preview(CURRENT_INDEX)
def next_doc():
    """Advance the selection by one, stopping at the last cached record.

    Returns:
        A 2-tuple of the (possibly unchanged) CURRENT_INDEX and the preview
        Markdown for it.
    """
    global CURRENT_INDEX
    last_position = len(RESULT_CACHE) - 1
    if CURRENT_INDEX < last_position:
        CURRENT_INDEX = CURRENT_INDEX + 1
    preview = render_preview(CURRENT_INDEX)
    return CURRENT_INDEX, preview
def prev_doc():
    """Move the selection back by one, stopping at index 0.

    Returns:
        A 2-tuple of the (possibly unchanged) CURRENT_INDEX and the preview
        Markdown for it.
    """
    global CURRENT_INDEX
    CURRENT_INDEX = CURRENT_INDEX - 1 if CURRENT_INDEX > 0 else CURRENT_INDEX
    preview = render_preview(CURRENT_INDEX)
    return CURRENT_INDEX, preview
###############################################################################
# UI
###############################################################################
# NOTE(review): the source lost its indentation, so the nesting of the layout
# containers below is reconstructed from context — confirm against the
# deployed app.
with gr.Blocks(theme=gr.themes.Soft()) as app:
    gr.Markdown("# ποΈ Federated FOIA Intelligence Search")
    query = gr.Textbox(label="Search public FOIA reading rooms")
    search_btn = gr.Button("π Search")

    with gr.Row():
        # Left column: result list with a collapsible coverage table.
        with gr.Column(scale=5):
            results_md = gr.Markdown()
            with gr.Accordion("πΊοΈ Agency Coverage", open=False):
                coverage_md = gr.Markdown()
        # Right column: document preview and navigation controls.
        with gr.Column(scale=7):
            preview_md = gr.Markdown()
            with gr.Row():
                prev_btn = gr.Button("β¬ οΈ Prev")
                next_btn = gr.Button("β‘οΈ Next")
            index_box = gr.Number(label="Selected index", precision=0)

    # A search refreshes every panel and resets the selected index.
    search_btn.click(do_search, inputs=query, outputs=[results_md, preview_md, coverage_md, index_box])
    # Editing the index re-renders only the preview.
    index_box.change(select_index, inputs=index_box, outputs=preview_md)
    # Prev/Next update both the index box and the preview.
    next_btn.click(next_doc, outputs=[index_box, preview_md])
    prev_btn.click(prev_doc, outputs=[index_box, preview_md])

if __name__ == "__main__":
    app.launch()