Spaces:
Sleeping
Sleeping
| import asyncio | |
| import gradio as gr | |
| from typing import List, Dict | |
| from ingest.cia_reading_room import CIAAdapter | |
| from ingest.fbi_vault import FBIAdapter | |
| from ingest.cia_extended import CIAExtendedAdapter | |
| from ingest.nsa_extended import NSAExtendedAdapter | |
| from ingest.nro_extended import NROExtendedAdapter | |
| from ingest.dod_special_extended import ( | |
| AATIPExtendedAdapter, | |
| SAPExtendedAdapter, | |
| TENCAPExtendedAdapter, | |
| SpecialActivitiesExtendedAdapter | |
| ) | |
| import saved_searches | |
# Adapters that federated_search_async queries on every search.
BASE_ADAPTERS = [adapter_cls() for adapter_cls in (CIAAdapter, FBIAdapter)]

# Adapters that federated_search_async adds only when its `extended` flag is
# set. Each one is instantiated once, at import time, in the order listed.
EXTENDED_ADAPTERS = [
    adapter_cls()
    for adapter_cls in (
        CIAExtendedAdapter,
        NSAExtendedAdapter,
        NROExtendedAdapter,
        AATIPExtendedAdapter,
        SAPExtendedAdapter,
        TENCAPExtendedAdapter,
        SpecialActivitiesExtendedAdapter,
    )
]
async def federated_search_async(query: str, extended: bool) -> list:
    """Query every selected adapter concurrently and merge their results.

    Args:
        query: Search term passed unchanged to each adapter's ``search``.
        extended: When true, ``EXTENDED_ADAPTERS`` are queried in addition
            to ``BASE_ADAPTERS``.

    Returns:
        A single flat list holding the items of every adapter that returned
        a list, in adapter order. Adapters that fail contribute nothing.
    """
    import logging

    log = logging.getLogger(__name__)
    adapters = BASE_ADAPTERS + (EXTENDED_ADAPTERS if extended else [])

    # return_exceptions=True keeps one failing source from cancelling the
    # whole federated search; failures come back as exception objects.
    outcomes = await asyncio.gather(
        *[adapter.search(query) for adapter in adapters],
        return_exceptions=True,
    )

    flat = []
    for adapter, outcome in zip(adapters, outcomes):
        if isinstance(outcome, list):
            flat.extend(outcome)
        elif isinstance(outcome, BaseException):
            # Still best-effort, but no longer silent: record which source
            # failed and why so broken adapters can be diagnosed.
            log.warning(
                "%s search failed: %r", type(adapter).__name__, outcome
            )
    return flat
def badge(r):
    """Return the status badge text for one search result.

    Args:
        r: Result mapping. Reads the optional keys ``extended``, ``live``
            and ``health`` (itself a mapping with optional ``ok`` and
            ``latency_ms``).

    Returns:
        For extended results, ``"🟣 EXT · <status> <latency>ms"`` where the
        status is 🟢 when ``health["ok"]`` is truthy and 🔴 otherwise, and
        the latency falls back to ``?`` when unknown. For other results,
        ``"🟢 LIVE"`` when ``live`` is truthy, else ``"⚪ STUB"``.
    """
    if r.get("extended"):
        # `or {}` also covers a health key that is present but None.
        health = r.get("health") or {}
        latency = f"{health.get('latency_ms', '?')}ms"
        status = "🟢" if health.get("ok") else "🔴"
        return f"🟣 EXT · {status} {latency}"
    return "🟢 LIVE" if r.get("live") else "⚪ STUB"
def render(r):
    """Format one search result as a Markdown fragment.

    Args:
        r: Result mapping; must provide ``title``, ``source``, ``snippet``
            and ``url`` (a missing key raises ``KeyError``). It is also
            passed to ``badge`` for the status text.

    Returns:
        Markdown with a level-3 heading, a bold "badge · source" line, the
        snippet and the link, wrapped in a leading and trailing newline.
    """
    heading = f"### {r['title']}"
    meta = f"**{badge(r)} · {r['source']}**"
    snippet = f"{r['snippet']}"
    link = f"🔗 {r['url']}"
    # Blank lines between the parts: Markdown folds consecutive lines into a
    # single paragraph, which would run meta, snippet and link together.
    return "\n" + "\n\n".join((heading, meta, snippet, link)) + "\n"
def run_search(query, hide_stubs, extended):
    """Run a federated search and return the results as Markdown.

    Args:
        query: Search term entered by the user.
        hide_stubs: When true, keep only results whose ``live`` flag is
            truthy.
        extended: When true, the extended adapters are queried as well.

    Returns:
        The rendered results joined by horizontal rules, or
        ``"No results found."`` when nothing remains after filtering.

    The query is recorded through ``saved_searches.save`` on every call,
    whether or not it produced results.
    """
    # asyncio.run creates and tears down its own event loop, so this works
    # from any thread. asyncio.get_event_loop() raises RuntimeError in a
    # thread that has no current loop, which is where synchronous UI
    # callbacks are typically executed.
    results = asyncio.run(federated_search_async(query, extended))
    saved_searches.save(query, extended)

    if hide_stubs:
        results = [r for r in results if r.get("live")]
    if not results:
        return "No results found."
    return "\n\n---\n\n".join(render(r) for r in results)
def show_saved():
    """Return the saved searches as a Markdown bullet list.

    Reads the entries from ``saved_searches.list_saved()``; each entry must
    provide ``query`` and ``extended``. Returns a placeholder message when
    there are no entries.
    """
    entries = saved_searches.list_saved()
    if entries:
        bullets = (
            f"- **{entry['query']}** (extended={entry['extended']})"
            for entry in entries
        )
        return "\n".join(bullets)
    return "No saved searches yet."
# Build the UI: a search row (term plus two toggles), one Markdown pane for
# results and one for saved searches, and the two buttons that fill them.
with gr.Blocks(css="""
.gradio-container { max-width: 1100px }
""") as demo:
    gr.Markdown("# 🔎 FOIA Federated Search")
    gr.Markdown(
        "Transparent federated search across public FOIA reading rooms.\n\n"
        "**Badges**: ⚪ Stub · 🟢 Live · 🟣 Extended · 🔴 Unhealthy"
    )

    with gr.Row():
        query = gr.Textbox(label="Search term", scale=3)
        hide_stubs = gr.Checkbox(label="Hide stub sources", value=False)
        extended = gr.Checkbox(
            label="Enable Extended Features",
            value=False,
            info="Advanced public sources. Disabled by default."
        )

    output = gr.Markdown()
    saved = gr.Markdown()

    with gr.Row():
        # Search renders into `output`; Saved Searches renders into `saved`.
        gr.Button("Search").click(
            run_search,
            inputs=[query, hide_stubs, extended],
            outputs=output
        )
        gr.Button("Saved Searches").click(show_saved, outputs=saved)

# Start the app as soon as the module is executed.
demo.launch()