# BuildScout v1.0 — NYC DOB permits browser (Gradio UI over the Socrata API).
import os
import tempfile
import time
from typing import List, Tuple, Optional

import gradio as gr
import pandas as pd
import requests
# -------------------------
# Config
# -------------------------
# NYC Open Data (Socrata) dataset — you can swap later as needed.
# Using the "w9ak-ipjd" permits dataset you tested most recently.
SOCRATA_URL = "https://data.cityofnewyork.us/resource/w9ak-ipjd.json"
# Read your app token from environment (recommended) or leave empty.
# A token lifts Socrata's anonymous request throttling.
SOCRATA_APP_TOKEN = os.getenv("SOCRATA_APP_TOKEN", "").strip()
DEFAULT_API_LIMIT = 3000  # how many rows to fetch from the API initially
DEFAULT_PAGE_SIZE = 300   # how many rows to show per "page" in the table
DEFAULT_ORDER = "filing_date DESC"  # server-side $order clause (if the column exists)
# Columns that *may* hold dates in this dataset; each is parsed with
# pd.to_datetime(errors="coerce") only when present in the response.
DATE_COLUMNS_GUESS = [
    "filing_date",
    "latest_action_date",
    "pre__filing_date",
    "approved",
    "signoff_date",
]
# Friendly default "important" columns to show (used only if present).
DEFAULT_VISIBLE_COLUMNS = [
    "job__", "doc__", "borough", "house__", "street_name",
    "filing_date", "job_description"
]
| # ------------------------- | |
| # Data access | |
| # ------------------------- | |
def fetch_permits(limit: int = DEFAULT_API_LIMIT,
                  order: str = DEFAULT_ORDER) -> Tuple[pd.DataFrame, float]:
    """
    Fetch up to `limit` rows from the Socrata API, optionally ordering.

    Parameters
    ----------
    limit : int
        Maximum rows to request (sent as Socrata ``$limit``).
    order : str
        Server-side ``$order`` clause, e.g. ``"filing_date DESC"``; an
        empty/falsy value skips it.

    Returns
    -------
    Tuple[pd.DataFrame, float]
        The parsed rows (empty frame if the API returned nothing) and the
        elapsed request+parse time in seconds.

    Raises
    ------
    requests.HTTPError
        If the API responds with a 4xx/5xx status.
    """
    headers = {}
    if SOCRATA_APP_TOKEN:
        headers["X-App-Token"] = SOCRATA_APP_TOKEN
    # Coerce to int: Gradio Number inputs may deliver floats, and Socrata
    # rejects "$limit=3000.0".
    params = {"$limit": int(limit)}
    if order:
        params["$order"] = order
    # perf_counter is monotonic, so the elapsed measurement cannot jump if
    # the system clock is adjusted mid-request (time.time() can).
    t0 = time.perf_counter()
    r = requests.get(SOCRATA_URL, headers=headers, params=params, timeout=60)
    r.raise_for_status()
    data = r.json()
    elapsed = time.perf_counter() - t0
    if not data:
        return pd.DataFrame(), elapsed
    df = pd.DataFrame(data)
    # Parse date-like columns when present; errors="coerce" leaves invalid
    # values as NaT instead of raising.
    for col in DATE_COLUMNS_GUESS:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")
    # Secondary local sort, in case the server-side $order was ignored.
    if "filing_date" in df.columns:
        df = df.sort_values("filing_date", ascending=False,
                            na_position="last").reset_index(drop=True)
    return df, elapsed
| # ------------------------- | |
| # Helpers | |
| # ------------------------- | |
def pick_existing_columns(all_cols: List[str],
                          desired: List[str]) -> List[str]:
    """Keep the entries of `desired` that also occur in `all_cols`, preserving order."""
    available = frozenset(all_cols)
    kept: List[str] = []
    for name in desired:
        if name in available:
            kept.append(name)
    return kept
def slice_up_to_page(df: pd.DataFrame, page_index: int, page_size: int) -> pd.DataFrame:
    """Return a copy of the cumulative slice covering pages 0..page_index."""
    row_limit = page_size * (page_index + 1)
    return df.head(row_limit).copy()
def contains_any_column(df: pd.DataFrame, term: str) -> pd.Series:
    """
    Case-insensitive *literal* substring search for `term` across all
    object (string) columns of `df`.

    Returns a boolean Series aligned with ``df.index``; an empty term
    matches every row. The term is matched literally (``regex=False``),
    so user input containing regex metacharacters such as "(" or "."
    cannot raise ``re.error`` or over-match.
    """
    if term == "":
        return pd.Series([True] * len(df), index=df.index)
    text_cols = [c for c in df.columns if df[c].dtype == object]
    if not text_cols:
        # Rare fallback: no string columns — stringify each row and do a
        # literal, case-folded search on the rendered row text.
        lowered = term.lower()
        return df.astype(str).apply(
            lambda row: lowered in row.to_string().lower(), axis=1
        )
    mask = pd.Series(False, index=df.index)
    for c in text_cols:
        # regex=False treats the term as a literal; NaN cells count as no-match.
        mask = mask | df[c].astype(str).str.contains(
            term, case=False, na=False, regex=False)
    return mask
| # ------------------------- | |
| # Gradio app logic | |
| # ------------------------- | |
def init_load(max_rows: int,
              page_size: int) -> tuple:
    """
    Reload data from the API and reset all state & UI.

    Returns a 7-tuple matching the wired outputs:
    (table update, full-df state, filtered-df state, page index,
     stats markdown, filter-field dropdown update, visible-columns update).

    gr.update(...) is used only for *component* outputs; the two gr.State
    outputs always receive plain DataFrames, since update dicts are not
    unwrapped when stored into a State.
    """
    try:
        df, seconds = fetch_permits(limit=max_rows, order=DEFAULT_ORDER)
    except Exception as e:
        empty = pd.DataFrame()
        # Plain DataFrames for the State outputs; plain string for Markdown
        # (consistent with the success paths below).
        return (gr.update(value=empty),
                empty,
                empty,
                0,
                f"_Error while loading data:_ `{e}`",
                gr.update(choices=[], value=None),
                gr.update(choices=[], value=[]))
    if df.empty:
        return (gr.update(value=pd.DataFrame()),
                df, df, 0,
                f"_Loaded 0 records in {seconds:.1f} seconds._",
                gr.update(choices=[], value=None),
                gr.update(choices=[], value=[]))
    # Default visible columns: the friendly set when present, else first ~10.
    visible = pick_existing_columns(df.columns.tolist(), DEFAULT_VISIBLE_COLUMNS)
    if not visible:
        visible = df.columns.tolist()[:10]
    # Dropdown choices: every column is filterable.
    filterable_choices = sorted(df.columns.tolist())
    # First cumulative page for the visible table. int() guards against
    # Gradio Number delivering a float, which iloc/head would reject.
    view = slice_up_to_page(df[visible], page_index=0, page_size=int(page_size))
    stats = f"Loaded **{len(df):,}** records in **{seconds:.1f}** seconds."
    return (gr.update(value=view),
            df, df, 0,
            stats,
            gr.update(choices=filterable_choices,
                      value="borough" if "borough" in filterable_choices else filterable_choices[0]),
            gr.update(choices=visible, value=visible))
def apply_filter(search_term: str,
                 field: Optional[str],
                 df_full: pd.DataFrame,
                 page_size: int,
                 visible_cols: List[str]) -> tuple:
    """
    Filter `df_full` by `search_term` — in the selected `field`, or across
    all text columns when no field is chosen. Resets paging to page 0.

    Returns (table update, filtered df, page index 0, stats markdown).
    """
    if df_full is None or df_full.empty:
        return (gr.update(value=pd.DataFrame()), df_full, 0, "_No data loaded yet._")
    t0 = time.perf_counter()  # monotonic: immune to wall-clock adjustments
    search_term = (search_term or "").strip()
    if field and field in df_full.columns and search_term != "":
        # Single-field filter (stringified for robustness). regex=False makes
        # the term literal, so input like "(" cannot raise re.error.
        mask = df_full[field].astype(str).str.contains(
            search_term, case=False, na=False, regex=False)
    else:
        # All-text-columns filter (also literal — see contains_any_column).
        mask = contains_any_column(df_full, search_term)
    df_filtered = df_full.loc[mask].copy()
    # Respect 'visible_cols' if provided (and drop any that no longer exist).
    use_cols = [c for c in visible_cols if c in df_filtered.columns] if visible_cols else df_filtered.columns.tolist()
    # Sort by filing_date desc (if the column exists).
    if "filing_date" in df_filtered.columns:
        df_filtered = df_filtered.sort_values("filing_date", ascending=False,
                                              na_position="last")
    view = slice_up_to_page(df_filtered[use_cols], page_index=0,
                            page_size=int(page_size))
    seconds = time.perf_counter() - t0
    stats = f"Filtered to **{len(df_filtered):,}** records in **{seconds:.1f}** seconds."
    return gr.update(value=view), df_filtered, 0, stats
def load_more(df_filtered: pd.DataFrame,
              page_index: int,
              page_size: int,
              visible_cols: List[str]) -> tuple:
    """Advance one page; return the cumulative slice and the new page index."""
    if df_filtered is None or df_filtered.empty:
        return gr.update(value=pd.DataFrame()), page_index
    next_page = page_index + 1
    if visible_cols:
        shown = [c for c in visible_cols if c in df_filtered.columns]
    else:
        shown = df_filtered.columns.tolist()
    view = slice_up_to_page(df_filtered[shown], page_index=next_page, page_size=page_size)
    return gr.update(value=view), next_page
def reset_filters(df_full: pd.DataFrame,
                  page_size: int,
                  visible_cols: List[str]) -> tuple:
    """Show the first page of the unfiltered dataset again (page reset to 0)."""
    if df_full is None or df_full.empty:
        return (gr.update(value=pd.DataFrame()), df_full, 0, "_No data loaded yet._")
    if visible_cols:
        shown = [c for c in visible_cols if c in df_full.columns]
    else:
        shown = df_full.columns.tolist()
    first_page = slice_up_to_page(df_full[shown], page_index=0, page_size=page_size)
    return (gr.update(value=first_page),
            df_full,
            0,
            f"Showing **{len(df_full):,}** records.")
def export_csv(df_view: pd.DataFrame) -> str:
    """
    Write the current (visible) view to a CSV file and return its path.

    An empty file is still produced for a missing/empty view so the
    download widget always has something to serve.
    """
    # Write into the system temp dir instead of a hard-coded /mnt/data,
    # which does not exist on most hosts (HF Spaces, local dev, etc.).
    if df_view is None or df_view.empty:
        path = os.path.join(tempfile.gettempdir(), "buildscout_empty.csv")
        pd.DataFrame().to_csv(path, index=False)
        return path
    path = os.path.join(tempfile.gettempdir(), "buildscout_view.csv")
    df_view.to_csv(path, index=False)
    return path
| # ------------------------- | |
| # UI | |
| # ------------------------- | |
with gr.Blocks(theme=gr.themes.Soft(), title="BuildScout v1.0") as demo:
    gr.Markdown("# BuildScout v1.0 \nNYC DOB Permits (Gradio Edition)")
    # Top control row: reload/reset/export actions plus fetch & paging sizes.
    with gr.Row():
        reload_btn = gr.Button("Reload DOB data", variant="secondary")
        reset_btn = gr.Button("Reset filters", variant="secondary")
        export_btn = gr.Button("Export current view (CSV)", variant="secondary")
        max_rows = gr.Number(label="API Max Rows", value=DEFAULT_API_LIMIT, precision=0)
        page_size = gr.Number(label="Rows per page", value=DEFAULT_PAGE_SIZE, precision=0)
    # Search row: free-text term plus the column it should apply to.
    with gr.Row():
        search_term = gr.Textbox(label="Search term", placeholder="Type to search…")
        field_dropdown = gr.Dropdown(label="Filter field", choices=[], value=None)
    visible_cols = gr.CheckboxGroup(label="Visible columns (for display)", choices=[], value=[])
    stats_md = gr.Markdown("_Load something!_")
    df_out = gr.Dataframe(headers=[], row_count=10, col_count=(0, "dynamic"),
                          interactive=False, wrap=False, height=600)
    load_more_btn = gr.Button("Load more rows")
    # STATE
    df_full_state = gr.State(pd.DataFrame())      # full dataset loaded from API
    df_filtered_state = gr.State(pd.DataFrame())  # filtered subset (for display)
    page_index_state = gr.State(0)                # current page (0-based)
    df_view_state = gr.State(pd.DataFrame())      # last rendered slice (for export)
    # --- WIRES ---
    # Reload data: refetch from the API, resetting paging and column choices.
    reload_btn.click(
        fn=init_load,
        inputs=[max_rows, page_size],
        outputs=[
            df_out,             # visible table
            df_full_state,      # full df
            df_filtered_state,  # filtered df (starts = full)
            page_index_state,
            stats_md,
            field_dropdown,
            visible_cols
        ]
    )
    # Apply filter.
    # Note: also stores the rendered slice in df_view_state for CSV export.
    def _apply_and_store(search, field, df_full, page_size_val, visible):
        view, df_filt, page0, stats = apply_filter(search, field, df_full, int(page_size_val), visible)
        # apply_filter returns a gr.update dict; pull the DataFrame back out
        # of it so the export state holds a plain frame.
        if isinstance(view, dict) and "value" in view:
            df_view = view["value"]
        else:
            df_view = pd.DataFrame()
        return view, df_filt, page0, stats, df_view
    apply_btn = gr.Button("Apply filter", variant="primary")
    apply_btn.click(
        fn=_apply_and_store,
        inputs=[search_term, field_dropdown, df_full_state, page_size, visible_cols],
        outputs=[df_out, df_filtered_state, page_index_state, stats_md, df_view_state]
    )
    # Reset filters: clear search box & dropdown, show page 0 of the full df.
    def _reset_and_store(df_full, page_size_val, visible):
        view, df_filt, page0, stats = reset_filters(df_full, int(page_size_val), visible)
        if isinstance(view, dict) and "value" in view:
            df_view = view["value"]
        else:
            df_view = pd.DataFrame()
        return view, df_filt, page0, stats, df_view, gr.update(value=""), gr.update(value=None)
    reset_btn.click(
        fn=_reset_and_store,
        inputs=[df_full_state, page_size, visible_cols],
        outputs=[df_out, df_filtered_state, page_index_state, stats_md, df_view_state, search_term, field_dropdown]
    )
    # Load more: advance the cumulative page and refresh the export snapshot.
    def _more_and_store(df_filt, page_idx, page_size_val, visible):
        view, new_page = load_more(df_filt, int(page_idx), int(page_size_val), visible)
        if isinstance(view, dict) and "value" in view:
            df_view = view["value"]
        else:
            df_view = pd.DataFrame()
        return view, new_page, df_view
    load_more_btn.click(
        fn=_more_and_store,
        inputs=[df_filtered_state, page_index_state, page_size, visible_cols],
        outputs=[df_out, page_index_state, df_view_state]
    )
    # Export: write the last rendered slice to CSV and offer it for download.
    csv_file = gr.File(label="Download CSV", interactive=False)
    export_btn.click(
        fn=export_csv,
        inputs=[df_view_state],
        outputs=[csv_file]
    )
    gr.Markdown("— **BuildScout v1.0** —")
if __name__ == "__main__":
    # Bind to 0.0.0.0 so the app is reachable via a forwarded port
    # (e.g. WSL -> VSCode/browser); 7860 is the conventional Gradio port.
    demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)