Spaces:
Runtime error
Runtime error
| import json | |
| import os | |
| from pathlib import PurePosixPath | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import gradio as gr | |
| from huggingface_hub import hf_hub_download | |
# ---------------------------------------------------------------------------
# Runtime configuration, read once from the environment at import time.
# ---------------------------------------------------------------------------

# Hub repo that holds the synced data; an empty value means "not configured".
DATA_REPO_ID = os.environ.get("DATA_REPO_ID", "").strip()
# Repo type passed to hf_hub_download; blank values fall back to "dataset".
DATA_REPO_TYPE = os.environ.get("DATA_REPO_TYPE", "dataset").strip() or "dataset"
# Folder inside the repo that every path is rooted at (no surrounding slashes).
DATA_PREFIX = os.environ.get("DATA_PREFIX", "live").strip().strip("/")
# Revision to read from; blank values fall back to "main".
DATA_REVISION = os.environ.get("DATA_REVISION", "main").strip() or "main"
# First non-empty token wins; None means no token is passed to the Hub client.
DATA_TOKEN = (
    os.environ.get("DATA_REPO_TOKEN")
    or os.environ.get("HF_TOKEN")
    or os.environ.get("HUGGINGFACE_HUB_TOKEN")
    or None
)

# Seconds between automatic refreshes; a value <= 0 disables the timer.
# An unset, blank or unparsable value falls back to the default instead of
# raising ValueError during import (which would stop the app from starting).
_DEFAULT_AUTO_REFRESH_SECONDS = 5.0
try:
    AUTO_REFRESH_SECONDS = float(
        os.environ.get("AUTO_REFRESH_SECONDS", "").strip()
        or _DEFAULT_AUTO_REFRESH_SECONDS
    )
except ValueError:
    AUTO_REFRESH_SECONDS = _DEFAULT_AUTO_REFRESH_SECONDS

# Dropdown label that stands for "<prefix>/current/segment.json".
CURRENT_LABEL = "CURRENT / latest synced segment"

# Custom stylesheet handed to gr.Blocks(css=...).
CSS = """
#topbar {
border-radius: 14px;
padding: 12px 16px;
background: linear-gradient(90deg, rgba(20,32,70,.95), rgba(20,70,65,.85));
color: white;
}
.big-status {
font-size: 15px;
}
.entry-text textarea {
font-size: 18px !important;
line-height: 1.35 !important;
}
.small-note {
opacity: .75;
font-size: 12px;
}
"""
def repo_path(*parts: str) -> str:
    """Join ``DATA_PREFIX`` and *parts* into a POSIX-style path inside the repo.

    Every part is stringified and trimmed of whitespace and surrounding
    slashes; parts that end up empty (``None`` included) are skipped.
    """
    trimmed = (str(part or "").strip().strip("/") for part in parts)
    pieces = [DATA_PREFIX] if DATA_PREFIX else []
    pieces.extend(piece for piece in trimmed if piece)
    return str(PurePosixPath(*pieces))
def require_config() -> None:
    """Raise ``RuntimeError`` when ``DATA_REPO_ID`` is empty."""
    if DATA_REPO_ID:
        return
    message = (
        "Missing DATA_REPO_ID. In the Space settings, set DATA_REPO_ID to the "
        "dataset repo your AWS app is syncing to, for example: krhogan2/datahub-live"
    )
    raise RuntimeError(message)
def download_file(path_in_repo: str) -> str:
    """Fetch *path_in_repo* from the configured repo via ``hf_hub_download``.

    Raises ``RuntimeError`` (from ``require_config``) when no repo is
    configured; errors raised by ``hf_hub_download`` propagate unchanged.
    """
    require_config()
    request = {
        "repo_id": DATA_REPO_ID,
        "repo_type": DATA_REPO_TYPE,
        "filename": path_in_repo,
        "revision": DATA_REVISION,
        "token": DATA_TOKEN,
    }
    return hf_hub_download(**request)
def download_json(path_in_repo: str) -> Dict[str, Any]:
    """Download *path_in_repo* and parse it as a UTF-8 JSON object.

    Raises ``ValueError`` when the top-level JSON value is not an object.
    """
    with open(download_file(path_in_repo), "r", encoding="utf-8") as handle:
        payload = json.load(handle)
    if isinstance(payload, dict):
        return payload
    raise ValueError(f"{path_in_repo} did not contain a JSON object")
def load_manifest() -> Dict[str, Any]:
    """Return the parsed ``manifest.json`` or an empty placeholder manifest.

    Any ``Exception`` raised while downloading or parsing the manifest is
    caught and the placeholder is returned instead, so this never raises
    for ordinary failures.
    """
    placeholder = {
        "updatedUtc": "",
        "latestSegment": "",
        "currentSegmentJson": "current/segment.json",
        "folders": [],
    }
    try:
        manifest = download_json(repo_path("manifest.json"))
    except Exception:
        return placeholder
    return manifest
def folder_choices_from_manifest(manifest: Dict[str, Any]) -> List[str]:
    """Build the folder dropdown choices from the manifest's ``folders`` list.

    ``CURRENT_LABEL`` always comes first, followed by each distinct non-blank
    folder name in manifest order. A non-list ``folders`` value is ignored.
    """
    listed = manifest.get("folders", [])
    if not isinstance(listed, list):
        listed = []
    names = (str(item or "").strip() for item in listed)
    # dict.fromkeys keeps first-seen order while dropping duplicates.
    unique = dict.fromkeys([CURRENT_LABEL, *(name for name in names if name)])
    return list(unique)
def _load_segment_file(segment_path: str) -> Dict[str, Any]:
    """Download one segment JSON and tag it with where it came from."""
    segment = download_json(segment_path)
    # Internal bookkeeping keys; strip_internal_keys() removes them for display.
    segment["_hf_segment_path"] = segment_path
    segment["_hf_segment_base"] = str(PurePosixPath(segment_path).parent)
    return segment


def load_segment(folder_choice: Optional[str]) -> Tuple[Dict[str, Any], str, str]:
    """Load the segment JSON for the chosen folder.

    Resolution order:

    1. An empty choice or ``CURRENT_LABEL`` loads ``current/segment.json``.
    2. Otherwise ``segments/<folder>/segment.json`` is tried.
    3. If that fails and the folder is the manifest's ``latestSegment``,
       ``current/segment.json`` is used instead.

    Returns:
        ``(segment, resolved_label, warning)``; ``warning`` is always ``""``
        in the current implementation.

    Raises:
        FileNotFoundError: the folder has no archived snapshot and is not the
            latest segment. The archive failure is attached as ``__cause__``.
        Exception: anything raised while loading ``current/segment.json``.
    """
    # Normalise first so a whitespace-only choice means "current" rather than
    # producing a lookup for "segments//segment.json".
    choice = str(folder_choice or "").strip()
    current_path = repo_path("current", "segment.json")
    if not choice or choice == CURRENT_LABEL:
        return _load_segment_file(current_path), CURRENT_LABEL, ""

    # This supports a future enhanced sync format:
    # live/segments/<folder name>/segment.json
    archive_path = repo_path("segments", choice, "segment.json")
    try:
        return _load_segment_file(archive_path), choice, ""
    except Exception as exc:
        # Best effort: fall through to the "latest == current" fallback, but
        # keep the error so the final failure can report its cause.
        archive_error = exc

    # With the AWS patch we made, only live/current is pushed.
    # So if the selected folder is the latest folder, use current.
    # The manifest is only needed here, so it is fetched lazily instead of
    # on every call.
    latest = str(load_manifest().get("latestSegment") or "").strip()
    if latest and choice == latest:
        return _load_segment_file(current_path), choice, ""

    raise FileNotFoundError(
        f"'{choice}' is listed in the manifest, but its full segment snapshot "
        "is not synced yet. The current AWS patch uploads the latest segment to "
        "live/current/. To browse old folders, add a later AWS sync pass that writes "
        "live/segments/<folder>/segment.json and live/segments/<folder>/media/."
    ) from archive_error
def entries_from_segment(segment: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the dict items of ``segment["entries"]``.

    A missing or non-list ``entries`` value yields an empty list, and
    non-dict items inside the list are dropped.
    """
    candidates = segment.get("entries", [])
    if isinstance(candidates, list):
        return [item for item in candidates if isinstance(item, dict)]
    return []
def entry_label(entry: Dict[str, Any]) -> str:
    """Build the one-line dropdown label for an entry.

    Format: ``Row <row> | <name> | <text preview> | <first 8 chars of id>``,
    with empty pieces omitted. The preview has newlines replaced by spaces
    and is cut to 67 characters plus ``...`` when longer than 70.
    """

    def text_field(key: str) -> str:
        return str(entry.get(key) or "").strip()

    preview = text_field("onScreenText").replace("\n", " ")
    if len(preview) > 70:
        preview = f"{preview[:67]}..."

    optional_pieces = (text_field("name"), preview, text_field("entryId")[:8])
    head = f"Row {entry.get('row', '')}"
    return " | ".join([head, *(piece for piece in optional_pieces if piece)])
def table_rows(segment: Dict[str, Any]) -> List[List[Any]]:
    """Turn the segment's entries into rows for the overview table.

    Column order: row, type, name, chart, textStatus, image count,
    onScreenText. Missing fields become ``""``; a non-list ``images`` value
    counts as zero images.
    """
    leading_keys = ("row", "type", "name", "chart", "textStatus")
    table = []
    for entry in entries_from_segment(segment):
        attached = entry.get("images", [])
        image_count = len(attached) if isinstance(attached, list) else 0
        table.append(
            [
                *(entry.get(key, "") for key in leading_keys),
                image_count,
                entry.get("onScreenText", ""),
            ]
        )
    return table
def image_paths_for_entry(segment: Dict[str, Any], entry: Dict[str, Any]) -> List[str]:
    """Download the entry's images and return the paths that were fetched.

    Uses ``entry["imagePaths"]`` when it is a non-empty list; otherwise falls
    back to ``media/<name>`` for each name in ``entry["images"]``. Paths are
    resolved against the segment's ``_hf_segment_base`` when present, else
    through ``repo_path``. Images that fail to download are skipped.
    """
    base = str(segment.get("_hf_segment_base") or "").strip()

    declared = entry.get("imagePaths", [])
    if isinstance(declared, list) and declared:
        candidates = declared
    else:
        names = entry.get("images", [])
        candidates = [f"media/{name}" for name in names] if isinstance(names, list) else []

    base_dir = PurePosixPath(base) if base else None
    downloaded = []
    for candidate in candidates:
        relative = str(candidate or "").strip().strip("/")
        if not relative:
            continue
        target = repo_path(relative) if base_dir is None else str(base_dir / relative)
        try:
            local_file = download_file(target)
        except Exception:
            # Best effort: a missing or unreadable image is simply left out.
            continue
        downloaded.append(local_file)
    return downloaded
def first_entry_outputs(segment: Dict[str, Any]) -> Tuple[Any, str, List[str]]:
    """Return the row-picker update, text and images for the first entry.

    With no entries the picker is emptied and the text and image list are
    blank; otherwise the picker lists every entry label and selects the first.
    """
    entries = entries_from_segment(segment)
    if entries:
        labels = list(map(entry_label, entries))
        head = entries[0]
        picker = gr.update(choices=labels, value=labels[0])
        head_text = str(head.get("onScreenText") or "")
        return picker, head_text, image_paths_for_entry(segment, head)
    return gr.update(choices=[], value=None), "", []
def status_markdown(
    segment: Optional[Dict[str, Any]],
    folder_choice: str,
    warning: str = "",
) -> str:
    """Render the status panel as Markdown.

    Args:
        segment: Loaded segment dict; a falsy value renders the
            "Not connected yet" hint instead.
        folder_choice: Heading fallback when the segment has no ``folder``.
        warning: Optional text appended as a warning paragraph.

    Returns:
        A Markdown string with a heading followed by the updated time, entry
        count, source repo and source file, one per line.
    """
    if not segment:
        return (
            "### Not connected yet\n"
            "Set `DATA_REPO_ID` in the Space settings to the dataset repo receiving the AWS sync."
        )

    folder = segment.get("folder") or folder_choice or "Unknown"
    updated = segment.get("updatedUtc") or "Unknown"
    # Prefer the count recorded in the segment; count the entries otherwise.
    if "entryCount" in segment:
        count = segment["entryCount"]
    else:
        count = len(entries_from_segment(segment))
    repo = DATA_REPO_ID or "DATA_REPO_ID not set"
    path = segment.get("_hf_segment_path", "")
    warning_text = f"\n\n⚠️ {warning}" if warning else ""

    fields = [
        f"**Updated:** {updated}",
        f"**Entries:** {count}",
        f"**Source repo:** `{repo}`",
        f"**Source file:** `{path}`",
    ]
    # Two trailing spaces before the newline are a Markdown hard line break;
    # with a single space the fields are rendered as one run-on line.
    body = "  \n".join(fields)
    return f"### {folder}\n{body}{warning_text}"
def error_outputs(message: str):
    """Build the 8-item output tuple that shows *message* and clears the UI.

    The items follow the same order as the tuple returned by ``refresh_ui``:
    status, folder dropdown, segment state, table rows, row dropdown,
    selected text, gallery images, raw JSON.
    """
    status_text = f"### Error\n{message}"
    folder_reset = gr.update(choices=[CURRENT_LABEL], value=CURRENT_LABEL)
    row_reset = gr.update(choices=[], value=None)
    raw_json = json.dumps({"error": message}, indent=2)
    return status_text, folder_reset, {}, [], row_reset, "", [], raw_json
def strip_internal_keys(segment: Dict[str, Any]) -> Dict[str, Any]:
    """Return a shallow copy of *segment* without keys starting with ``_hf_``."""
    return {
        key: value
        for key, value in segment.items()
        if not str(key).startswith("_hf_")
    }
def refresh_ui(folder_choice: Optional[str]):
    """Reload the manifest and the chosen segment and return all UI outputs.

    Returns an 8-item tuple: status Markdown, folder dropdown update, segment
    state, table rows, row dropdown update, first entry text, first entry
    images, raw JSON. A choice that is empty or not among the manifest's
    folders falls back to ``CURRENT_LABEL``. Any exception is turned into the
    ``error_outputs`` tuple instead of propagating.
    """
    try:
        choices = folder_choices_from_manifest(load_manifest())
        is_known = bool(folder_choice) and folder_choice in choices
        selected = folder_choice if is_known else CURRENT_LABEL

        segment, resolved_folder, warning = load_segment(selected)
        entry_picker, entry_text, entry_images = first_entry_outputs(segment)

        status_text = status_markdown(segment, resolved_folder, warning)
        folder_picker = gr.update(choices=choices, value=selected)
        rows = table_rows(segment)
        raw = json.dumps(strip_internal_keys(segment), indent=2)
        return (
            status_text,
            folder_picker,
            segment,
            rows,
            entry_picker,
            entry_text,
            entry_images,
            raw,
        )
    except Exception as error:
        return error_outputs(str(error))
def load_folder_ui(folder_choice: Optional[str]):
    """Handle a folder-dropdown change by running the standard refresh."""
    outputs = refresh_ui(folder_choice)
    return outputs
def show_selected_entry(segment: Dict[str, Any], selected_label: Optional[str]):
    """Return ``(text, image paths)`` for the entry whose label was picked.

    The first entry whose ``entry_label`` equals *selected_label* wins. An
    empty segment, an empty label or no match yields ``("", [])``.
    """
    nothing = ("", [])
    if not segment or not selected_label:
        return nothing

    chosen = next(
        (
            entry
            for entry in entries_from_segment(segment)
            if entry_label(entry) == selected_label
        ),
        None,
    )
    if chosen is None:
        return nothing
    return str(chosen.get("onScreenText") or ""), image_paths_for_entry(segment, chosen)
def build_app() -> gr.Blocks:
    """Assemble the Gradio Blocks UI and wire up its events.

    Layout: a header banner; a row with the folder picker, refresh button and
    status panel next to the entries table; a row with the row picker and
    full text next to the image gallery; and a collapsed raw-JSON accordion.

    Events: page load, the refresh button, a folder change and (when
    ``AUTO_REFRESH_SECONDS > 0``) a timer all run the full refresh; a row
    change only updates the selected text and gallery.

    Returns:
        The configured ``gr.Blocks`` app, not yet launched.
    """
    with gr.Blocks(title="DataHub Live Viewer", css=CSS) as demo:
        segment_state = gr.State({})

        gr.HTML(
            "\n"
            '<div id="topbar">\n'
            '<h1 style="margin:0;">📺 DataHub Live Viewer</h1>\n'
            '<div class="big-status">\n'
            "Read-only public viewer for on-screen text and Pictures tab uploads.\n"
            "</div>\n"
            "</div>\n"
        )

        with gr.Row():
            with gr.Column(scale=1):
                folder_dropdown = gr.Dropdown(
                    choices=[CURRENT_LABEL],
                    value=CURRENT_LABEL,
                    label="DataHub segment folder",
                    interactive=True,
                )
                refresh_btn = gr.Button("Refresh now", variant="primary")
                status = gr.Markdown()
                # "<folder>" is written as &lt;folder&gt;: left raw, it is
                # parsed as an unknown HTML tag and disappears from the
                # rendered tip.
                gr.Markdown(
                    "\n"
                    '<div class="small-note">\n'
                    "Tip: with the current AWS sync patch, the latest segment appears under\n"
                    "<code>CURRENT / latest synced segment</code>. Older folders will become clickable\n"
                    "once the AWS side also syncs <code>live/segments/&lt;folder&gt;/...</code>.\n"
                    "</div>\n"
                )
            with gr.Column(scale=2):
                entries_table = gr.Dataframe(
                    headers=[
                        "Row",
                        "Type",
                        "Name",
                        "Chart",
                        "Text Status",
                        "Images",
                        "On-Screen Text",
                    ],
                    datatype=["number", "str", "str", "str", "str", "number", "str"],
                    interactive=False,
                    wrap=True,
                    label="All on-screen text",
                )

        with gr.Row():
            with gr.Column(scale=1):
                row_dropdown = gr.Dropdown(
                    choices=[],
                    label="Pick row / Pictures tab item",
                    interactive=True,
                )
                selected_text = gr.Textbox(
                    label="Full on-screen text for selected row",
                    lines=12,
                    elem_classes=["entry-text"],
                )
            with gr.Column(scale=1):
                # NOTE(review): `show_download_button=True` is no longer passed.
                # Presumably the Gallery default already shows the download
                # button, and newer Gradio releases appear to reject this
                # keyword - confirm against the Gradio version the Space pins.
                gallery = gr.Gallery(
                    label="Uploaded pictures for selected row",
                    columns=2,
                    height=420,
                    object_fit="contain",
                )

        with gr.Accordion("Raw current segment.json", open=False):
            raw_json = gr.Code(language="json", label="Raw JSON")

        # Every full refresh writes the same eight components, in the order
        # of the tuple returned by refresh_ui / error_outputs.
        refresh_outputs = [
            status,
            folder_dropdown,
            segment_state,
            entries_table,
            row_dropdown,
            selected_text,
            gallery,
            raw_json,
        ]

        demo.load(fn=refresh_ui, inputs=[folder_dropdown], outputs=refresh_outputs)
        refresh_btn.click(fn=refresh_ui, inputs=[folder_dropdown], outputs=refresh_outputs)
        folder_dropdown.change(
            fn=load_folder_ui,
            inputs=[folder_dropdown],
            outputs=refresh_outputs,
        )
        row_dropdown.change(
            fn=show_selected_entry,
            inputs=[segment_state, row_dropdown],
            outputs=[selected_text, gallery],
        )

        if AUTO_REFRESH_SECONDS > 0:
            # NOTE(review): refresh_ui always selects the first row label, so
            # each tick resets whatever row the user had picked.
            timer = gr.Timer(value=AUTO_REFRESH_SECONDS)
            timer.tick(fn=refresh_ui, inputs=[folder_dropdown], outputs=refresh_outputs)

    return demo
# Script entry point: build the UI and start the Gradio server.
if __name__ == "__main__":
    build_app().launch()