Spaces:
Runtime error
Runtime error
import glob
import os
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor

import duckdb
import pandas as pd
import plotly.express as px
import streamlit as st
from huggingface_hub import HfApi, hf_hub_download
# --- Config ---
MAIN_DATASET = "gionuibk/hyperliquidL2Book"

# Datasets audited by this page (auto-discovered in a previous step).
# MAIN_DATASET is listed first and is the only repo holding the S3 inventory.
ALL_DATASETS = [
    MAIN_DATASET,
    'gionuibk/hyperliquid-explorer-raw',
    'gionuibk/hyperliquid-node-fills',
    'gionuibk/hyperliquid-node-fills-by-block',
    'gionuibk/hyperliquid-node-trades',
    'gionuibk/hyperliquid-replica-cmds',
    'gionuibk/hyperliquid-misc-events',
    'gionuibk/hyperliquid-l4-data',
]

# Read from the environment; None means anonymous Hub access.
HF_TOKEN = os.environ.get("HF_TOKEN")


def _resolve_cache_dir(preferred="/data/cache"):
    """Return a writable directory for downloaded files, creating it if needed.

    ``preferred`` is tried first. If it cannot be created or is not writable
    (e.g. no ``/data`` mount -- NOTE(review): presumably the Space's persistent
    storage; confirm), fall back to a folder in the system temp dir instead of
    crashing the app at import time.
    """
    try:
        os.makedirs(preferred, exist_ok=True)
        if os.access(preferred, os.W_OK):
            return preferred
    except OSError as e:
        print(f"Cache dir {preferred} unavailable: {e}")
    fallback = os.path.join(tempfile.gettempdir(), "hpll_cache")
    os.makedirs(fallback, exist_ok=True)
    return fallback


CACHE_DIR = _resolve_cache_dir()

st.set_page_config(page_title="HPLL Data Review", layout="wide", page_icon="π")
def load_s3_inventory():
    """Load the S3 inventory snapshot stored in MAIN_DATASET.

    Lists the files under ``config/inventory_parts/`` in MAIN_DATASET,
    downloads them in parallel into CACHE_DIR and concatenates them. A
    ``date`` column is derived from ``modified`` interpreted as Unix seconds.

    Returns:
        pandas.DataFrame with one row per inventory entry. If the repo cannot
        be listed or no part loads, an EMPTY frame that still has the columns
        the page uses (``key``, ``size``, ``modified``, ``date``), so the
        later ``df_inv['key']`` access and groupbys work on the empty case.
    """
    # No st.* calls in here (an st.toast was removed to keep this cacheable).
    empty = pd.DataFrame(columns=["key", "size", "modified", "date"])
    api = HfApi(token=HF_TOKEN)
    try:
        # Inventory is ONLY in the main dataset.
        files = api.list_repo_files(repo_id=MAIN_DATASET, repo_type="dataset")
    except Exception as e:
        print(f"Error listing inventory in {MAIN_DATASET}: {e}")
        return empty
    inv_files = [f for f in files if f.startswith("config/inventory_parts/")]
    if not inv_files:
        return empty

    def download_and_load(f):
        # Best-effort: one unreadable part must not sink the whole inventory.
        try:
            local = hf_hub_download(repo_id=MAIN_DATASET, filename=f, repo_type="dataset",
                                    local_dir=CACHE_DIR, token=HF_TOKEN)
            return pd.read_parquet(local)
        except Exception as e:
            print(f"Error loading inventory part {f}: {e}")
            return None

    with ThreadPoolExecutor(max_workers=10) as executor:
        dfs = [df for df in executor.map(download_and_load, inv_files) if df is not None]
    if not dfs:
        return empty
    full_df = pd.concat(dfs, ignore_index=True)
    full_df['date'] = pd.to_datetime(full_df['modified'], unit='s').dt.date
    return full_df
def _infer_category(dataset_id, parts):
    """Return the category of a file whose repo path was split into ``parts``.

    ``data/<category>/...`` layouts use the sub-folder name; flat legacy
    ``data/<file>`` layouts fall back to keywords in the dataset id/filename.
    """
    if len(parts) > 2:
        return parts[1]
    filename = parts[-1]
    if "node-fills" in dataset_id or "batch_upto" in filename:
        return "node_fills"
    if "explorer" in dataset_id:
        return "explorer_blocks"
    if "trades" in dataset_id:
        return "node_trades"
    if "l4" in dataset_id:
        return "l4_data"
    return "other"


def _infer_timestamp(filename):
    """Best-effort Unix timestamp (seconds) encoded in a parquet filename.

    Recognised forms:
      * ``name_<TIMESTAMP>.parquet`` -- last ``_`` token is 10+ digits;
      * ``batch_upto_YYYYMMDD_HH.lz4_...`` -- an 8-digit token starting "202".
    Returns 0 when nothing parseable is found.
    """
    tokens = filename.replace('.parquet', '').split('_')
    try:
        # Try the last token first (most common place for the timestamp).
        if tokens[-1].isdigit() and len(tokens[-1]) > 9:
            return int(tokens[-1])
        for t in tokens:
            if t.isdigit() and len(t) == 8 and t.startswith("202"):
                return pd.Timestamp(t).timestamp()
    except ValueError:
        # Non-ASCII digits or an impossible date such as "20251399".
        pass
    return 0


def _timestamp_to_date(ts):
    """Convert Unix seconds to a ``datetime.date``; None if unset or out of range."""
    if ts <= 0:
        return None
    try:
        # Scalar input yields a Timestamp, so use .date() (a scalar has no .dt).
        return pd.to_datetime(ts, unit='s').date()
    except (ValueError, OverflowError):
        # e.g. a millisecond value read as seconds overflows pandas' range
        # (OutOfBoundsDatetime is a ValueError subclass).
        return None


def load_all_downloaded_data():
    """Index the parquet files under ``data/`` in every repo of ALL_DATASETS.

    Repos are listed in parallel. Each ``data/...parquet`` path not containing
    "inventory" or "config" yields a row with: dataset, path, category,
    filename, timestamp (0 if unknown) and date (None if unknown).

    Returns:
        pandas.DataFrame of those rows (column-less if no file was found).
        A repo that fails to scan is logged and contributes no rows.
    """
    api = HfApi(token=HF_TOKEN)

    def scan_dataset(dataset_id):
        # Best-effort per repo: log and skip instead of failing the page.
        try:
            files = api.list_repo_files(repo_id=dataset_id, repo_type="dataset")
            # Legacy datasets often store files directly in data/ (no sub-folder).
            data_files = [
                f for f in files
                if f.startswith("data/") and f.endswith(".parquet")
                and "inventory" not in f and "config" not in f
            ]
            rows = []
            for f in data_files:
                parts = f.split('/')  # ['data', file] or ['data', category, ..., file]
                filename = parts[-1]
                ts = _infer_timestamp(filename)
                rows.append({
                    "dataset": dataset_id,
                    "path": f,
                    "category": _infer_category(dataset_id, parts),
                    "filename": filename,
                    "timestamp": ts,
                    "date": _timestamp_to_date(ts),
                })
            return rows
        except Exception as e:
            print(f"Error scanning {dataset_id}: {e}")
            return []

    all_rows = []
    with ThreadPoolExecutor(max_workers=8) as executor:
        for rows in executor.map(scan_dataset, ALL_DATASETS):
            all_rows.extend(rows)
    return pd.DataFrame(all_rows)
# --- Main UI ---
# Dataset count derived from ALL_DATASETS so the title cannot drift from the list.
st.title(f"π Global Data Audit ({len(ALL_DATASETS)} Datasets)")
col1, col2 = st.columns([1, 1])
with col1:
    st.info(f"Source 1: **S3 Inventory** (Target)\n\nValues read from `{MAIN_DATASET}`")
with col2:
    st.success(f"Source 2: **Downloaded Ecosystem**\n\nScanning {len(ALL_DATASETS)} datasets")

# Load both sides of the comparison.
df_inv = load_s3_inventory()
df_down = load_all_downloaded_data()

# --- METRICS ---
st.divider()
if df_inv.empty:
    st.warning("β οΈ Inventory Missing.")
else:
    c1, c2, c3, c4 = st.columns(4)
    c1.metric("Total S3 Files", f"{len(df_inv):,}")
    c2.metric("Total S3 Size", f"{df_inv['size'].sum()/1e9:.2f} GB")
    if not df_down.empty:
        c3.metric("Downloaded Files", f"{len(df_down):,}")
        c4.metric("Datasets Active", f"{df_down['dataset'].nunique()}/{len(ALL_DATASETS)}")
    else:
        # Keep the 4th column populated rather than leaving it blank.
        c3.metric("Downloaded Files", "0")
        c4.metric("Datasets Active", f"0/{len(ALL_DATASETS)}")

# --- CROSS MATCHING ---
st.subheader("π Global Coverage Map")
# 1. Map S3 Categories
def map_prefix(p):
    """Bucket an S3 object key into a coverage category.

    Substrings are tested in a fixed order and the first hit wins; each bucket
    name is also the substring searched for. ``node_fills`` therefore also
    catches ``node_fills_by_block``. Keys matching none of them are "other".
    """
    buckets = ("market_data", "explorer_blocks", "node_fills", "node_trades", "misc_events")
    return next((bucket for bucket in buckets if bucket in p), "other")
# Attach the aggregate coverage category derived from each inventory S3 key.
df_inv = df_inv.assign(agg_category=df_inv['key'].apply(map_prefix))
# 2. Map Dataset Categories
# Map each downloaded file (dataset id + inferred category) onto the same
# aggregate buckets used for S3 keys so the two sides can be merged.
def map_ds_cat(row):
    """Return the aggregate coverage category for one downloaded-file row.

    Args:
        row: mapping/Series with ``dataset`` and ``category`` entries.

    The file's own ``category`` is checked first and the dataset id only as a
    fallback, so e.g. a ``node_fills`` folder inside the L2Book repo is not
    swallowed by the repo-name rule. Within each text "fill" is tested before
    "block", so ``hyperliquid-node-fills-by-block`` lands in ``node_fills``
    (as node_fills_by_block does on the S3 side), not ``explorer_blocks``.
    Returns "other" when nothing matches.
    """
    rules = (
        (("market_data", "l2book"), "market_data"),
        (("fill",), "node_fills"),
        (("explorer", "block"), "explorer_blocks"),
        (("trade",), "node_trades"),
        (("misc",), "misc_events"),
    )
    for text in (row['category'], row['dataset']):
        lowered = str(text).lower()
        for keywords, bucket in rules:
            if any(k in lowered for k in keywords):
                return bucket
    return "other"
if not df_down.empty:
    df_down['agg_category'] = df_down.apply(map_ds_cat, axis=1)

# 3. Group & Merge: number of files per (category, day) on each side.
s3_grp = df_inv.groupby(['agg_category', 'date']).size().reset_index(name='s3_files')
down_grp = pd.DataFrame()
if not df_down.empty:
    down_grp = df_down.groupby(['agg_category', 'date']).size().reset_index(name='down_files')
if not down_grp.empty:
    merged = pd.merge(s3_grp, down_grp, on=['agg_category', 'date'], how='outer')
else:
    merged = s3_grp.assign(down_files=0)
# The outer merge leaves NaN where only one side has data; restore integer counts.
count_cols = ['s3_files', 'down_files']
merged[count_cols] = merged[count_cols].fillna(0).astype(int)
merged = merged.sort_values(['agg_category', 'date'], ascending=[True, False])

# 4. Display Categories
for cat in merged['agg_category'].unique():
    with st.expander(f"π {cat.upper()}", expanded=True):
        sub = merged[merged['agg_category'] == cat]
        fig = px.bar(
            sub, x='date', y=['s3_files', 'down_files'],
            barmode='group',
            title=f"Coverage: {cat.upper()}",
            color_discrete_map={'s3_files': '#FFA07A', 'down_files': '#90EE90'}  # Salmon vs LightGreen
        )
        st.plotly_chart(fig, use_container_width=True)
        # Show which datasets contribute to this category (none if nothing downloaded).
        contributors = (
            df_down.loc[df_down['agg_category'] == cat, 'dataset'].unique()
            if not df_down.empty else []
        )
        if len(contributors) > 0:
            st.caption(f"β Data found in: {', '.join(contributors)}")
        else:
            st.caption("β No downloaded data found in any dataset.")

st.divider()
st.subheader("π Dataset Inspector")
ds_choice = st.selectbox("Select Dataset to Inspect", ALL_DATASETS)
# With nothing downloaded at all, report that instead of rendering nothing.
ds_subset = df_down[df_down['dataset'] == ds_choice] if not df_down.empty else df_down
if ds_subset.empty:
    st.warning(f"No parquet files found in {ds_choice}")
else:
    st.dataframe(ds_subset, use_container_width=True)

# ===================== SQL QUERY TAB =====================
st.divider()
st.subheader("π¦ SQL Query (DuckDB)")
st.caption("Query any HF Parquet file remotely. **Fast** - runs on server, not your local machine.")
# Helper to build URL
def hf_parquet_url(repo_id, filename, revision="main"):
    """Return the direct-download ("resolve") URL of a file in an HF dataset repo.

    Args:
        repo_id: dataset repo id, e.g. ``"user/name"``.
        filename: path of the file inside the repo.
        revision: branch, tag or commit to resolve (default ``"main"``).

    The filename is percent-encoded (``/`` kept) and the revision fully
    encoded, as huggingface_hub's ``hf_hub_url`` does. This also stops
    characters like spaces, ``#`` or ``'`` from breaking the URL or the SQL
    string literal it gets embedded in.
    """
    from urllib.parse import quote

    return (
        f"https://huggingface.co/datasets/{repo_id}/resolve/"
        f"{quote(revision, safe='')}/{quote(filename)}"
    )
# Dataset + File Selection
col_ds, col_file = st.columns(2)
with col_ds:
    sql_dataset = st.selectbox("Dataset", ALL_DATASETS + ['gionuibk/hyperliquidL2Book-v2'], key="sql_ds")
with col_file:
    # Every widget interaction reruns the script; cache the listing so the repo
    # isn't re-listed each time. Exceptions are not cached, so failures retry.
    @st.cache_data(ttl=600, show_spinner=False)
    def get_parquet_files(ds):
        """Return up to 100 ``.parquet`` paths in dataset repo ``ds``."""
        api = HfApi(token=HF_TOKEN)
        files = api.list_repo_files(repo_id=ds, repo_type="dataset")
        return [f for f in files if f.endswith('.parquet')][:100]  # Limit to 100

    try:
        available_files = get_parquet_files(sql_dataset)
    except Exception as e:
        print(f"Error listing files in {sql_dataset}: {e}")
        available_files = []
    sql_file = st.selectbox("File", available_files if available_files else ["(No files found)"], key="sql_file")

# SQL Input (pre-filled with a query against the selected file)
example_url = hf_parquet_url(sql_dataset, sql_file) if sql_file and sql_file != "(No files found)" else "URL"
default_sql = f"SELECT * FROM read_parquet('{example_url}') LIMIT 10"
sql_input = st.text_area("SQL Query", value=default_sql, height=100)

# Execute Button
if st.button("π Run Query", type="primary"):
    if sql_input.strip():
        with st.spinner("Executing query..."):
            try:
                # NOTE(review): this executes arbitrary user-supplied SQL with the
                # server's privileges (DuckDB can read local files, not only URLs).
                # Consider restricting DuckDB's filesystem access on a public Space.
                # `with` closes the connection once the result has been fetched.
                with duckdb.connect(':memory:') as con:
                    con.execute("INSTALL httpfs; LOAD httpfs;")
                    result = con.execute(sql_input).fetchdf()
                st.success(f"β Query returned {len(result)} rows.")
                st.dataframe(result, use_container_width=True)
                # Download button
                csv = result.to_csv(index=False)
                st.download_button("β¬οΈ Download CSV", csv, "query_result.csv", "text/csv")
            except Exception as e:
                st.error(f"β Query Error: {e}")
    else:
        st.warning("Please enter a SQL query.")

st.write(f"Last updated: {time.strftime('%H:%M:%S')}")
if st.button("Refresh"):
    # Drop cached results so the rerun actually re-fetches remote listings.
    st.cache_data.clear()
    st.rerun()