"""Read-only Gradio viewer for DataHub segments synced to a Hugging Face repo.

The app downloads ``manifest.json`` and ``segment.json`` files (plus any
referenced images) from the repo named by ``DATA_REPO_ID`` and shows them in
a small browsing UI that can refresh itself on a timer.
"""

import json
import logging
import math
import os
from pathlib import PurePosixPath
from typing import Any, Dict, List, Optional, Tuple

import gradio as gr
from huggingface_hub import hf_hub_download

logger = logging.getLogger(__name__)


def _float_env(name: str, default: float) -> float:
    """Read a float from environment variable *name*.

    Returns *default* when the variable is unset, empty, not parseable as a
    float, or not finite, so a bad setting cannot crash the module at import.
    """
    raw = (os.environ.get(name) or "").strip()
    if not raw:
        return default
    try:
        value = float(raw)
    except ValueError:
        logger.warning("Ignoring invalid %s=%r; using %s", name, raw, default)
        return default
    if not math.isfinite(value):
        logger.warning("Ignoring non-finite %s=%r; using %s", name, raw, default)
        return default
    return value


# --- Configuration (all read from environment variables) -------------------
DATA_REPO_ID = os.environ.get("DATA_REPO_ID", "").strip()
DATA_REPO_TYPE = os.environ.get("DATA_REPO_TYPE", "dataset").strip() or "dataset"
DATA_PREFIX = os.environ.get("DATA_PREFIX", "live").strip().strip("/")
DATA_REVISION = os.environ.get("DATA_REVISION", "main").strip() or "main"
DATA_TOKEN = (
    os.environ.get("DATA_REPO_TOKEN")
    or os.environ.get("HF_TOKEN")
    or os.environ.get("HUGGINGFACE_HUB_TOKEN")
    or None
)
# A value <= 0 disables the auto-refresh timer (see build_app).
AUTO_REFRESH_SECONDS = _float_env("AUTO_REFRESH_SECONDS", 5.0)

CURRENT_LABEL = "CURRENT / latest synced segment"

CSS = """
#topbar {
    border-radius: 14px;
    padding: 12px 16px;
    background: linear-gradient(90deg, rgba(20,32,70,.95), rgba(20,70,65,.85));
    color: white;
}
.big-status { font-size: 15px; }
.entry-text textarea { font-size: 18px !important; line-height: 1.35 !important; }
.small-note { opacity: .75; font-size: 12px; }
"""


def repo_path(*parts: str) -> str:
    """Join *parts* under ``DATA_PREFIX`` into a POSIX-style repo path.

    Each part is stripped of surrounding whitespace and slashes; empty parts
    are dropped. With no prefix and no parts the result is ``"."``.
    """
    clean_parts = []
    if DATA_PREFIX:
        clean_parts.append(DATA_PREFIX)
    for part in parts:
        part = str(part or "").strip().strip("/")
        if part:
            clean_parts.append(part)
    return str(PurePosixPath(*clean_parts))


def require_config() -> None:
    """Raise ``RuntimeError`` when ``DATA_REPO_ID`` is not configured."""
    if not DATA_REPO_ID:
        raise RuntimeError(
            "Missing DATA_REPO_ID. In the Space settings, set DATA_REPO_ID to the "
            "dataset repo your AWS app is syncing to, for example: krhogan2/datahub-live"
        )


def download_file(path_in_repo: str) -> str:
    """Download *path_in_repo* from the configured repo.

    Returns the value of ``hf_hub_download`` (a local file path).

    Raises:
        RuntimeError: if ``DATA_REPO_ID`` is not set.
        Exception: whatever ``hf_hub_download`` raises (missing file, auth,
            network errors).
    """
    require_config()
    return hf_hub_download(
        repo_id=DATA_REPO_ID,
        repo_type=DATA_REPO_TYPE,
        filename=path_in_repo,
        revision=DATA_REVISION,
        token=DATA_TOKEN,
    )


def download_json(path_in_repo: str) -> Dict[str, Any]:
    """Download *path_in_repo* and parse it as a JSON object.

    Raises:
        ValueError: if the file parses but its top-level value is not an object.
    """
    local_path = download_file(path_in_repo)
    with open(local_path, "r", encoding="utf-8") as fh:
        data = json.load(fh)
    if not isinstance(data, dict):
        raise ValueError(f"{path_in_repo} did not contain a JSON object")
    return data


def load_manifest() -> Dict[str, Any]:
    """Return the repo's ``manifest.json``, or an empty default manifest.

    This is deliberately best-effort: any failure (missing config, missing
    file, network error, bad JSON) yields the default so the UI can still
    show the CURRENT segment.
    """
    try:
        return download_json(repo_path("manifest.json"))
    except Exception as exc:
        # Logged at debug level because this can run on every timer tick.
        logger.debug("Manifest unavailable, using defaults: %s", exc)
        return {
            "updatedUtc": "",
            "latestSegment": "",
            "currentSegmentJson": "current/segment.json",
            "folders": [],
        }


def folder_choices_from_manifest(manifest: Dict[str, Any]) -> List[str]:
    """Build the folder dropdown choices: ``CURRENT_LABEL`` first, then the
    manifest's ``folders`` with blanks and duplicates removed (order kept)."""
    folders = manifest.get("folders", [])
    if not isinstance(folders, list):
        folders = []
    choices = [CURRENT_LABEL]
    for folder in folders:
        folder = str(folder or "").strip()
        if folder and folder not in choices:
            choices.append(folder)
    return choices


def _load_segment_file(segment_path: str) -> Dict[str, Any]:
    """Download one ``segment.json`` and tag it with its repo location.

    The ``_hf_segment_path`` / ``_hf_segment_base`` keys are internal; they
    are used to resolve image paths and are removed again by
    ``strip_internal_keys`` before the raw JSON is shown.
    """
    segment = download_json(segment_path)
    segment["_hf_segment_path"] = segment_path
    segment["_hf_segment_base"] = str(PurePosixPath(segment_path).parent)
    return segment


def load_segment(
    folder_choice: Optional[str],
    manifest: Optional[Dict[str, Any]] = None,
) -> Tuple[Dict[str, Any], str, str]:
    """Load the segment for *folder_choice*.

    Args:
        folder_choice: a folder name from the manifest, or ``CURRENT_LABEL`` /
            empty for the latest synced segment.
        manifest: an already-loaded manifest. When omitted it is downloaded,
            but only if it is actually needed, so callers that already hold
            one avoid a second download.

    Returns:
        ``(segment, resolved_folder, warning)``; ``warning`` is currently
        always the empty string.

    Raises:
        RuntimeError: if ``DATA_REPO_ID`` is not set.
        FileNotFoundError: if the folder has no archived snapshot and is not
            the manifest's latest segment.
    """
    # Fail early on missing config so it is not masked by the best-effort
    # archive lookup below and misreported as a missing snapshot.
    require_config()
    current_path = repo_path("current", "segment.json")

    if not folder_choice or folder_choice == CURRENT_LABEL:
        return _load_segment_file(current_path), CURRENT_LABEL, ""

    folder_choice = str(folder_choice).strip()

    # This supports a future enhanced sync format:
    #   <prefix>/segments/<folder>/segment.json
    archive_path = repo_path("segments", folder_choice, "segment.json")
    try:
        return _load_segment_file(archive_path), folder_choice, ""
    except Exception as exc:
        # A miss is expected while only <prefix>/current is being synced.
        logger.debug("No archived segment at %s: %s", archive_path, exc)

    # With the AWS patch we made, only <prefix>/current is pushed.
    # So if the selected folder is the latest folder, use current.
    if manifest is None:
        manifest = load_manifest()
    latest = str(manifest.get("latestSegment") or "").strip()
    if latest and folder_choice == latest:
        return _load_segment_file(current_path), folder_choice, ""

    current_dir = repo_path("current")
    media_dir = repo_path("segments", folder_choice, "media")
    raise FileNotFoundError(
        f"'{folder_choice}' is listed in the manifest, but its full segment snapshot "
        "is not synced yet. The current AWS patch uploads the latest segment to "
        f"{current_dir}/. To browse old folders, add a later AWS sync pass that writes "
        f"{archive_path} and {media_dir}/."
    )


def entries_from_segment(segment: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the dict items of ``segment["entries"]``.

    Returns an empty list when ``entries`` is missing or not a list.
    """
    entries = segment.get("entries", [])
    if not isinstance(entries, list):
        return []
    return [entry for entry in entries if isinstance(entry, dict)]


def entry_label(entry: Dict[str, Any]) -> str:
    """Build the dropdown label for *entry*.

    Format: ``Row <row> | <name> | <text> | <entryId[:8]>``. Empty pieces are
    omitted, and text longer than 70 characters is cut to 67 plus ``...``.
    """
    row = entry.get("row", "")
    name = str(entry.get("name") or "").strip()
    entry_id = str(entry.get("entryId") or "").strip()
    text = str(entry.get("onScreenText") or "").strip().replace("\n", " ")
    if len(text) > 70:
        text = text[:67] + "..."
    pieces = [f"Row {row}"]
    if name:
        pieces.append(name)
    if text:
        pieces.append(text)
    if entry_id:
        pieces.append(entry_id[:8])
    return " | ".join(pieces)


def table_rows(segment: Dict[str, Any]) -> List[List[Any]]:
    """Return one table row per entry, in the column order used by the
    entries Dataframe in ``build_app``."""
    rows = []
    for entry in entries_from_segment(segment):
        images = entry.get("images", [])
        if not isinstance(images, list):
            images = []
        rows.append(
            [
                entry.get("row", ""),
                entry.get("type", ""),
                entry.get("name", ""),
                entry.get("chart", ""),
                entry.get("textStatus", ""),
                len(images),
                entry.get("onScreenText", ""),
            ]
        )
    return rows


def image_paths_for_entry(segment: Dict[str, Any], entry: Dict[str, Any]) -> List[str]:
    """Download the images of *entry* and return their local file paths.

    Uses ``entry["imagePaths"]`` when it is a non-empty list; otherwise falls
    back to ``media/<name>`` for each name in ``entry["images"]``. Paths are
    resolved relative to the segment's ``_hf_segment_base``. Images that fail
    to download are skipped so one bad file does not empty the gallery.
    """
    base = str(segment.get("_hf_segment_base") or "").strip()
    image_paths = entry.get("imagePaths", [])
    if not isinstance(image_paths, list) or not image_paths:
        images = entry.get("images", [])
        if isinstance(images, list):
            image_paths = [f"media/{name}" for name in images]
        else:
            image_paths = []

    local_images = []
    for image_path in image_paths:
        image_path = str(image_path or "").strip().strip("/")
        if not image_path:
            continue
        if base:
            full_repo_path = str(PurePosixPath(base) / image_path)
        else:
            full_repo_path = repo_path(image_path)
        try:
            local_images.append(download_file(full_repo_path))
        except Exception as exc:
            logger.debug("Skipping image %s: %s", full_repo_path, exc)
    return local_images


def selected_entry_outputs(
    segment: Dict[str, Any],
    selected_label: Optional[str] = None,
) -> Tuple[Any, str, List[str]]:
    """Return ``(row dropdown update, on-screen text, image paths)``.

    The entry whose label equals *selected_label* is shown when there is one;
    otherwise the first entry is shown. With no entries, the dropdown is
    cleared and the text and image list are empty.
    """
    entries = entries_from_segment(segment)
    if not entries:
        return gr.update(choices=[], value=None), "", []
    labels = [entry_label(entry) for entry in entries]
    index = labels.index(selected_label) if selected_label in labels else 0
    chosen = entries[index]
    return (
        gr.update(choices=labels, value=labels[index]),
        str(chosen.get("onScreenText") or ""),
        image_paths_for_entry(segment, chosen),
    )


def first_entry_outputs(segment: Dict[str, Any]) -> Tuple[Any, str, List[str]]:
    """Return the dropdown update, text, and images for the first entry."""
    return selected_entry_outputs(segment, None)


def status_markdown(
    segment: Optional[Dict[str, Any]],
    folder_choice: str,
    warning: str = "",
) -> str:
    """Render the status panel Markdown for *segment*.

    A falsy *segment* renders the "Not connected yet" hint instead.
    """
    if not segment:
        return (
            "### Not connected yet\n"
            "Set `DATA_REPO_ID` in the Space settings to the dataset repo receiving the AWS sync."
        )
    folder = segment.get("folder") or folder_choice or "Unknown"
    updated = segment.get("updatedUtc") or "Unknown"
    count = segment.get("entryCount", len(entries_from_segment(segment)))
    repo = DATA_REPO_ID or "DATA_REPO_ID not set"
    path = segment.get("_hf_segment_path", "")
    warning_text = f"\n\n⚠️ {warning}" if warning else ""
    # Two trailing spaces before "\n" make a Markdown hard line break, so
    # each field is shown on its own line.
    return (
        f"### {folder}\n"
        f"**Updated:** {updated}  \n"
        f"**Entries:** {count}  \n"
        f"**Source repo:** `{repo}`  \n"
        f"**Source file:** `{path}`"
        f"{warning_text}"
    )


def error_outputs(message: str) -> Tuple[Any, ...]:
    """Return the eight UI outputs for an error state.

    The order matches the outputs wired up in ``build_app``: status, folder
    dropdown, segment state, table, row dropdown, text, gallery, raw JSON.
    """
    return (
        f"### Error\n{message}",
        gr.update(choices=[CURRENT_LABEL], value=CURRENT_LABEL),
        {},
        [],
        gr.update(choices=[], value=None),
        "",
        [],
        json.dumps({"error": message}, indent=2),
    )


def strip_internal_keys(segment: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *segment* without the internal ``_hf_*`` keys."""
    return {
        key: value
        for key, value in segment.items()
        if not str(key).startswith("_hf_")
    }


def refresh_ui(folder_choice: Optional[str], selected_label: Optional[str] = None):
    """Reload the manifest and segment and return all eight UI outputs.

    Args:
        folder_choice: the folder dropdown value; anything not offered by the
            manifest falls back to ``CURRENT_LABEL``.
        selected_label: the row dropdown value. When that label still exists
            after the reload it stays selected, so periodic refreshes do not
            move the viewer back to the first row.

    Any exception is turned into the error outputs rather than propagated.
    """
    try:
        manifest = load_manifest()
        choices = folder_choices_from_manifest(manifest)
        if not folder_choice or folder_choice not in choices:
            folder_choice = CURRENT_LABEL
        # Pass the manifest along so load_segment does not download it again.
        segment, resolved_folder, warning = load_segment(folder_choice, manifest)
        row_dropdown, entry_text, entry_images = selected_entry_outputs(
            segment, selected_label
        )
        return (
            status_markdown(segment, resolved_folder, warning),
            gr.update(choices=choices, value=folder_choice),
            segment,
            table_rows(segment),
            row_dropdown,
            entry_text,
            entry_images,
            json.dumps(strip_internal_keys(segment), indent=2),
        )
    except Exception as e:
        return error_outputs(str(e))


def load_folder_ui(folder_choice: Optional[str]):
    """Handle a folder change: reload and show the new folder's first entry."""
    return refresh_ui(folder_choice)


def show_selected_entry(segment: Dict[str, Any], selected_label: Optional[str]):
    """Return ``(text, image paths)`` for the entry labelled *selected_label*.

    Returns ``("", [])`` when nothing is selected or no entry matches.
    """
    if not segment or not selected_label:
        return "", []
    for entry in entries_from_segment(segment):
        if entry_label(entry) == selected_label:
            return (
                str(entry.get("onScreenText") or ""),
                image_paths_for_entry(segment, entry),
            )
    return "", []


def build_app() -> gr.Blocks:
    """Build the Gradio Blocks UI and wire up its events."""
    with gr.Blocks(title="DataHub Live Viewer", css=CSS) as demo:
        segment_state = gr.State({})

        # The id and class names below are the ones styled in CSS.
        gr.HTML(
            """
<div id="topbar">
  <h2>📺 DataHub Live Viewer</h2>
  <div class="big-status">Read-only public viewer for on-screen text and Pictures tab uploads.</div>
</div>
"""
        )

        with gr.Row():
            with gr.Column(scale=1):
                folder_dropdown = gr.Dropdown(
                    choices=[CURRENT_LABEL],
                    value=CURRENT_LABEL,
                    label="DataHub segment folder",
                    interactive=True,
                )
                refresh_btn = gr.Button("Refresh now", variant="primary")
                status = gr.Markdown()
                # The placeholder path is in backticks so that "<folder>" is
                # rendered literally instead of being treated as an HTML tag.
                archive_hint = repo_path("segments", "<folder>")
                gr.Markdown(
                    "Tip: with the current AWS sync patch, the latest segment appears "
                    f"under **{CURRENT_LABEL}**. Older folders will become clickable "
                    f"once the AWS side also syncs `{archive_hint}/...`.",
                    elem_classes=["small-note"],
                )
            with gr.Column(scale=2):
                entries_table = gr.Dataframe(
                    headers=[
                        "Row",
                        "Type",
                        "Name",
                        "Chart",
                        "Text Status",
                        "Images",
                        "On-Screen Text",
                    ],
                    datatype=["number", "str", "str", "str", "str", "number", "str"],
                    interactive=False,
                    wrap=True,
                    label="All on-screen text",
                )

        with gr.Row():
            with gr.Column(scale=1):
                row_dropdown = gr.Dropdown(
                    choices=[],
                    label="Pick row / Pictures tab item",
                    interactive=True,
                )
                selected_text = gr.Textbox(
                    label="Full on-screen text for selected row",
                    lines=12,
                    elem_classes=["entry-text"],
                )
            with gr.Column(scale=1):
                gallery = gr.Gallery(
                    label="Uploaded pictures for selected row",
                    columns=2,
                    height=420,
                    show_download_button=True,
                    object_fit="contain",
                )

        with gr.Accordion("Raw current segment.json", open=False):
            raw_json = gr.Code(language="json", label="Raw JSON")

        # Shared by every full-refresh event; the order must match the tuples
        # returned by refresh_ui and error_outputs.
        refresh_outputs = [
            status,
            folder_dropdown,
            segment_state,
            entries_table,
            row_dropdown,
            selected_text,
            gallery,
            raw_json,
        ]
        # Refreshes of the same folder also pass the selected row so that it
        # is preserved across the reload.
        keep_selection_inputs = [folder_dropdown, row_dropdown]

        demo.load(fn=refresh_ui, inputs=keep_selection_inputs, outputs=refresh_outputs)
        refresh_btn.click(
            fn=refresh_ui, inputs=keep_selection_inputs, outputs=refresh_outputs
        )
        # Switching folders intentionally resets the view to the first entry.
        folder_dropdown.change(
            fn=load_folder_ui, inputs=[folder_dropdown], outputs=refresh_outputs
        )
        row_dropdown.change(
            fn=show_selected_entry,
            inputs=[segment_state, row_dropdown],
            outputs=[selected_text, gallery],
        )

        if AUTO_REFRESH_SECONDS > 0:
            timer = gr.Timer(value=AUTO_REFRESH_SECONDS)
            timer.tick(
                fn=refresh_ui, inputs=keep_selection_inputs, outputs=refresh_outputs
            )

    return demo


if __name__ == "__main__":
    build_app().launch()