"""Streamlit dashboard: global audit of Hyperliquid data coverage.

Compares two sources:
  1. The S3 inventory parquet parts stored under ``config/inventory_parts/``
     in the main dataset (the download *target*).
  2. The parquet files actually present across all 8 Hugging Face datasets
     (what has been *downloaded*).
It renders per-category coverage charts, a per-dataset file inspector, and an
ad-hoc DuckDB SQL console that queries remote parquet files over HTTPS.
"""
import streamlit as st
import pandas as pd
import os
from huggingface_hub import HfApi, hf_hub_download
import glob
import time
import plotly.express as px
from concurrent.futures import ThreadPoolExecutor
import duckdb

# Config
MAIN_DATASET = "gionuibk/hyperliquidL2Book"  # Auto-discovered from previous step
ALL_DATASETS = [
    'gionuibk/hyperliquidL2Book',
    'gionuibk/hyperliquid-explorer-raw',
    'gionuibk/hyperliquid-node-fills',
    'gionuibk/hyperliquid-node-fills-by-block',
    'gionuibk/hyperliquid-node-trades',
    'gionuibk/hyperliquid-replica-cmds',
    'gionuibk/hyperliquid-misc-events',
    'gionuibk/hyperliquid-l4-data',
]
HF_TOKEN = os.environ.get("HF_TOKEN")  # None => anonymous access to public repos
CACHE_DIR = "/data/cache"
os.makedirs(CACHE_DIR, exist_ok=True)

st.set_page_config(page_title="HPLL Data Review", layout="wide", page_icon="📊")


@st.cache_data(ttl=300, show_spinner="Fetching Inventory...")
def load_s3_inventory():
    """Download and concatenate all S3 inventory parquet parts.

    The inventory lives ONLY in the main dataset under
    ``config/inventory_parts/``. Parts are fetched in parallel; any part that
    fails to download/parse is skipped (best-effort).

    Returns:
        DataFrame with the inventory rows plus a derived ``date`` column
        (from the ``modified`` unix-seconds column), or an empty DataFrame
        when no inventory parts exist.
    """
    # st.toast removed to support caching
    api = HfApi(token=HF_TOKEN)
    files = api.list_repo_files(repo_id=MAIN_DATASET, repo_type="dataset")
    inv_files = [f for f in files if f.startswith("config/inventory_parts/")]
    if not inv_files:
        return pd.DataFrame()

    def download_and_load(f):
        # Best-effort: a single bad/missing part must not kill the whole load.
        try:
            local = hf_hub_download(
                repo_id=MAIN_DATASET, filename=f, repo_type="dataset",
                local_dir=CACHE_DIR, token=HF_TOKEN,
            )
            return pd.read_parquet(local)
        except Exception:
            return None

    with ThreadPoolExecutor(max_workers=10) as executor:
        results = executor.map(download_and_load, inv_files)
    dfs = [r for r in results if r is not None]

    if dfs:
        full_df = pd.concat(dfs, ignore_index=True)
        # 'modified' is a unix timestamp in seconds (vectorized: .dt is valid here)
        full_df['date'] = pd.to_datetime(full_df['modified'], unit='s').dt.date
        return full_df
    return pd.DataFrame()


@st.cache_data(ttl=300, show_spinner="Scanning ALL Datasets...")
def load_all_downloaded_data():
    """Scan every dataset in ALL_DATASETS for downloaded parquet files.

    For each ``data/*.parquet`` file, infers a category (from the path,
    the dataset name, or the filename) and a unix timestamp (from the
    filename's trailing token or an embedded YYYYMMDD date).

    Returns:
        DataFrame with columns: dataset, path, category, filename,
        timestamp, date. Empty DataFrame when nothing is found.
    """
    api = HfApi(token=HF_TOKEN)
    all_rows = []

    def scan_dataset(dataset_id):
        try:
            files = api.list_repo_files(repo_id=dataset_id, repo_type="dataset")
            # Legacy datasets often store files directly in data/ without subfolders
            # Pattern: data/filename.parquet
            data_files = [
                f for f in files
                if f.startswith("data/") and f.endswith(".parquet")
                and "inventory" not in f and "config" not in f
            ]
            ds_rows = []
            for f in data_files:
                # Infer metadata
                parts = f.split('/')  # ['data', 'filename.parquet'] or ['data', 'category', 'filename.parquet']
                filename = parts[-1]

                # Category Inference: prefer the subfolder, else fall back to
                # heuristics on the dataset id / filename.
                if len(parts) > 2:
                    category = parts[1]
                else:
                    if "node-fills" in dataset_id or "batch_upto" in filename:
                        category = "node_fills"
                    elif "explorer" in dataset_id:
                        category = "explorer_blocks"
                    elif "trades" in dataset_id:
                        category = "node_trades"
                    elif "l4" in dataset_id:
                        category = "l4_data"
                    else:
                        category = "other"

                # Timestamp Inference (best-effort; ts stays 0 when nothing matches)
                ts = 0
                try:
                    # Case 1: Standard indexer format: name_TIMESTAMP.parquet
                    # Case 2: Legacy node files: batch_upto_YYYYMMDD_HH.lz4_TIMESTAMP.parquet
                    name_clean = filename.replace('.parquet', '')
                    tokens = name_clean.split('_')
                    # Try last token first (most common for timestamp)
                    if tokens[-1].isdigit() and len(tokens[-1]) > 9:  # Unix TS is usually 10 digits
                        ts = int(tokens[-1])
                    else:
                        # Case 3: Parse YYYYMMDD from batch_upto_20250525...
                        for t in tokens:
                            if t.isdigit() and len(t) == 8 and t.startswith("202"):
                                # Found a date string 2024..., 2025...
                                ts = pd.Timestamp(t).timestamp()
                                break
                except Exception:
                    pass

                ds_rows.append({
                    "dataset": dataset_id,
                    "path": f,
                    "category": category,
                    "filename": filename,
                    "timestamp": ts,
                    # BUGFIX: pd.to_datetime on a scalar returns a Timestamp,
                    # which has no .dt accessor — use the .date() method.
                    # (The old .dt.date raised AttributeError, which the outer
                    # except swallowed, silently emptying every scan.)
                    "date": pd.to_datetime(ts, unit='s').date() if ts > 0 else None,
                })
            return ds_rows
        except Exception as e:
            print(f"Error scanning {dataset_id}: {e}")
            return []

    with ThreadPoolExecutor(max_workers=8) as executor:
        results = executor.map(scan_dataset, ALL_DATASETS)
        for res in results:
            all_rows.extend(res)
    return pd.DataFrame(all_rows)


# Main UI
st.title("📊 Global Data Audit (8 Datasets)")

col1, col2 = st.columns([1, 1])
with col1:
    st.info(f"Source 1: **S3 Inventory** (Target)\n\nValues read from `{MAIN_DATASET}`")
with col2:
    st.success(f"Source 2: **Downloaded Ecosystem**\n\nScanning {len(ALL_DATASETS)} datasets")

# Load
df_inv = load_s3_inventory()
df_down = load_all_downloaded_data()

# --- METRICS ---
st.divider()
if df_inv.empty:
    st.warning("⚠️ Inventory Missing.")
else:
    c1, c2, c3, c4 = st.columns(4)
    c1.metric("Total S3 Files", f"{len(df_inv):,}")
    c2.metric("Total S3 Size", f"{df_inv['size'].sum()/1e9:.2f} GB")
    if not df_down.empty:
        c3.metric("Downloaded Files", f"{len(df_down):,}")
        c4.metric("Datasets Active", f"{df_down['dataset'].nunique()}/{len(ALL_DATASETS)}")
    else:
        c3.metric("Downloaded Files", "0")

# --- CROSS MATCHING ---
st.subheader("🌐 Global Coverage Map")


# 1. Map S3 Categories
def map_prefix(p):
    """Map an S3 object key to one of the aggregate coverage categories."""
    if "market_data" in p:
        return "market_data"
    if "explorer_blocks" in p:
        return "explorer_blocks"
    if "node_fills" in p:
        return "node_fills"  # Covers node_fills and node_fills_by_block
    if "node_trades" in p:
        return "node_trades"
    if "misc_events" in p:
        return "misc_events"
    return "other"


df_inv['agg_category'] = df_inv['key'].apply(map_prefix)


# 2. Map Dataset Categories
# We need to map dataset specific categories to the S3 agg_category
def map_ds_cat(row):
    """Map a downloaded-file row (dataset id + inferred category) to the S3 agg_category."""
    txt = (row['dataset'] + row['category']).lower()
    if "market_data" in txt or "l2book" in txt:
        return "market_data"
    if "explorer" in txt or "block" in txt:
        return "explorer_blocks"
    if "fill" in txt:
        return "node_fills"
    if "trade" in txt:
        return "node_trades"
    if "misc" in txt:
        return "misc_events"
    return "other"


if not df_down.empty:
    df_down['agg_category'] = df_down.apply(map_ds_cat, axis=1)

# 3. Group & Merge
s3_grp = df_inv.groupby(['agg_category', 'date']).size().reset_index(name='s3_files')
down_grp = pd.DataFrame()
if not df_down.empty:
    down_grp = df_down.groupby(['agg_category', 'date']).size().reset_index(name='down_files')

if not down_grp.empty:
    merged = pd.merge(s3_grp, down_grp, on=['agg_category', 'date'], how='outer').fillna(0)
else:
    merged = s3_grp
    merged['down_files'] = 0

merged = merged.sort_values(['agg_category', 'date'], ascending=[True, False])

# 4. Display Categories
cats = merged['agg_category'].unique()
for cat in cats:
    with st.expander(f"📂 {cat.upper()}", expanded=True):
        sub = merged[merged['agg_category'] == cat]
        # Chart
        fig = px.bar(
            sub, x='date', y=['s3_files', 'down_files'],
            barmode='group',
            title=f"Coverage: {cat.upper()}",
            color_discrete_map={'s3_files': '#FFA07A', 'down_files': '#90EE90'}  # Salmon vs LightGreen
        )
        st.plotly_chart(fig, use_container_width=True)
        # Show which datasets contribute to this category
        if not df_down.empty:
            contributors = df_down[df_down['agg_category'] == cat]['dataset'].unique()
            if len(contributors) > 0:
                st.caption(f"✅ Data found in: {', '.join(contributors)}")
            else:
                st.caption("❌ No downloaded data found in any dataset.")

st.divider()
st.subheader("🔍 Dataset Inspector")
ds_choice = st.selectbox("Select Dataset to Inspect", ALL_DATASETS)
if not df_down.empty:
    ds_subset = df_down[df_down['dataset'] == ds_choice]
    if ds_subset.empty:
        st.warning(f"No parquet files found in {ds_choice}")
    else:
        st.dataframe(ds_subset, use_container_width=True)

# ===================== SQL QUERY TAB =====================
st.divider()
st.subheader("🦆 SQL Query (DuckDB)")
st.caption("Query any HF Parquet file remotely. **Fast** - runs on server, not your local machine.")


# Helper to build URL
def hf_parquet_url(repo_id, filename):
    """Return the 'resolve' URL for a parquet file in a HF dataset repo.

    BUGFIX: the filename was previously hard-coded as the literal
    '(unknown)' placeholder, so every generated URL was invalid.
    NOTE(review): private repos would additionally need an auth header;
    this URL form only works anonymously for public datasets — confirm.
    """
    return f"https://huggingface.co/datasets/{repo_id}/resolve/main/{filename}"


# Dataset + File Selection
col_ds, col_file = st.columns(2)
with col_ds:
    sql_dataset = st.selectbox("Dataset", ALL_DATASETS + ['gionuibk/hyperliquidL2Book-v2'], key="sql_ds")
with col_file:
    # Fetch file list for selected dataset (cached)
    @st.cache_data(ttl=600)
    def get_parquet_files(ds):
        """List up to 100 parquet files in dataset *ds* (empty list on any error)."""
        try:
            api = HfApi(token=HF_TOKEN)
            files = api.list_repo_files(repo_id=ds, repo_type="dataset")
            return [f for f in files if f.endswith('.parquet')][:100]  # Limit to 100
        except Exception:
            return []

    available_files = get_parquet_files(sql_dataset)
    sql_file = st.selectbox(
        "File",
        available_files if available_files else ["(No files found)"],
        key="sql_file",
    )

# SQL Input
example_url = hf_parquet_url(sql_dataset, sql_file) if sql_file and sql_file != "(No files found)" else "URL"
default_sql = f"SELECT * FROM read_parquet('{example_url}') LIMIT 10"
sql_input = st.text_area("SQL Query", value=default_sql, height=100)

# Execute Button
if st.button("🚀 Run Query", type="primary"):
    if sql_input.strip():
        with st.spinner("Executing query..."):
            try:
                con = duckdb.connect(':memory:')
                try:
                    # httpfs enables read_parquet over https://
                    con.execute("INSTALL httpfs; LOAD httpfs;")
                    result = con.execute(sql_input).fetchdf()
                finally:
                    con.close()  # always release the in-memory DB
                st.success(f"✅ Query returned {len(result)} rows.")
                st.dataframe(result, use_container_width=True)
                # Download button
                csv = result.to_csv(index=False)
                st.download_button("⬇️ Download CSV", csv, "query_result.csv", "text/csv")
            except Exception as e:
                st.error(f"❌ Query Error: {e}")
    else:
        st.warning("Please enter a SQL query.")

st.write(f"Last updated: {time.strftime('%H:%M:%S')}")
if st.button("Refresh"):
    st.rerun()