# HPLL-DataReview / app.py
# Source: Hugging Face Space by gionuibk
# Last commit: 6eb712d (verified) — "πŸ¦† Add SQL Query Tab (DuckDB)"
import streamlit as st
import pandas as pd
import os
from huggingface_hub import HfApi, hf_hub_download
import glob
import time
import plotly.express as px
from concurrent.futures import ThreadPoolExecutor
import duckdb
# Config
# Primary dataset: the S3 inventory shards live ONLY here, under config/inventory_parts/.
MAIN_DATASET = "gionuibk/hyperliquidL2Book"
# Auto-discovered from previous step
# Every dataset repo that is scanned for downloaded parquet files.
ALL_DATASETS = [
    'gionuibk/hyperliquidL2Book',
    'gionuibk/hyperliquid-explorer-raw',
    'gionuibk/hyperliquid-node-fills',
    'gionuibk/hyperliquid-node-fills-by-block',
    'gionuibk/hyperliquid-node-trades',
    'gionuibk/hyperliquid-replica-cmds',
    'gionuibk/hyperliquid-misc-events',
    'gionuibk/hyperliquid-l4-data'
]
# Optional HF auth token; None means anonymous access (fine for public datasets).
HF_TOKEN = os.environ.get("HF_TOKEN")
# Persistent disk on the Space — hf_hub_download reuses files cached here.
CACHE_DIR = "/data/cache"
os.makedirs(CACHE_DIR, exist_ok=True)
# Must be the first Streamlit call in the script.
st.set_page_config(page_title="HPLL Data Review", layout="wide", page_icon="πŸ“Š")
@st.cache_data(ttl=300, show_spinner="Fetching Inventory...")
def load_s3_inventory():
    """Download and concatenate the S3 inventory parquet shards.

    The shards live exclusively in MAIN_DATASET under config/inventory_parts/.

    Returns:
        DataFrame with one row per S3 object plus a derived `date` column,
        or an empty DataFrame when no inventory shards exist / all fail.
    """
    # st.toast removed to support caching
    api = HfApi(token=HF_TOKEN)
    # Inventory is ONLY in the main dataset
    files = api.list_repo_files(repo_id=MAIN_DATASET, repo_type="dataset")
    inv_files = [f for f in files if f.startswith("config/inventory_parts/")]
    if not inv_files:
        return pd.DataFrame()

    def download_and_load(f):
        # Best-effort: one failed shard should not abort the whole inventory load.
        try:
            local = hf_hub_download(repo_id=MAIN_DATASET, filename=f, repo_type="dataset", local_dir=CACHE_DIR, token=HF_TOKEN)
            return pd.read_parquet(local)
        except Exception as e:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit propagate;
            # log the failure instead of silently dropping the shard.
            print(f"Failed to load inventory part {f}: {e}")
            return None

    with ThreadPoolExecutor(max_workers=10) as executor:
        results = executor.map(download_and_load, inv_files)
        dfs = [r for r in results if r is not None]
    if dfs:
        full_df = pd.concat(dfs, ignore_index=True)
        # 'modified' appears to be unix-seconds in the inventory parquet — TODO confirm
        full_df['date'] = pd.to_datetime(full_df['modified'], unit='s').dt.date
        return full_df
    return pd.DataFrame()
@st.cache_data(ttl=300, show_spinner="Scanning ALL Datasets...")
def load_all_downloaded_data():
    """Scan every repo in ALL_DATASETS for downloaded parquet files.

    Returns:
        DataFrame with one row per parquet file found under data/, carrying
        dataset, path, inferred category, filename, timestamp and date.
        Empty DataFrame when nothing is found.
    """
    api = HfApi(token=HF_TOKEN)
    all_rows = []

    def _infer_category(dataset_id, parts, filename):
        # Derive a coarse category from the repo path, or fall back to the repo/file name.
        if len(parts) > 2:
            return parts[1]  # data/<category>/<file>.parquet layout
        # Fallback: Infer category from dataset name or filename
        if "node-fills" in dataset_id or "batch_upto" in filename:
            return "node_fills"
        if "explorer" in dataset_id:
            return "explorer_blocks"
        if "trades" in dataset_id:
            return "node_trades"
        if "l4" in dataset_id:
            return "l4_data"
        return "other"

    def _infer_timestamp(filename):
        # Best-effort unix timestamp from the filename; 0 when nothing parses.
        try:
            # Case 1: Standard indexer format: name_TIMESTAMP.parquet
            # Case 2: Legacy node files: batch_upto_YYYYMMDD_HH.lz4_TIMESTAMP.parquet
            tokens = filename.replace('.parquet', '').split('_')
            # Try last token first (a unix timestamp is usually 10 digits)
            if tokens[-1].isdigit() and len(tokens[-1]) > 9:
                return int(tokens[-1])
            # Case 3: Parse YYYYMMDD from batch_upto_20250525...
            for t in tokens:
                if t.isdigit() and len(t) == 8 and t.startswith("202"):
                    return pd.Timestamp(t).timestamp()
        except Exception:
            # Narrowed from a bare `except:`; malformed names simply yield 0.
            pass
        return 0

    def scan_dataset(dataset_id):
        try:
            files = api.list_repo_files(repo_id=dataset_id, repo_type="dataset")
            # Legacy datasets often store files directly in data/ without subfolders
            # Pattern: data/filename.parquet
            data_files = [f for f in files if f.startswith("data/") and f.endswith(".parquet") and "inventory" not in f and "config" not in f]
            ds_rows = []
            for f in data_files:
                parts = f.split('/')  # ['data', 'file.parquet'] or ['data', 'category', 'file.parquet']
                filename = parts[-1]
                ts = _infer_timestamp(filename)
                ds_rows.append({
                    "dataset": dataset_id,
                    "path": f,
                    "category": _infer_category(dataset_id, parts, filename),
                    "filename": filename,
                    "timestamp": ts,
                    # Bug fix: pd.to_datetime on a scalar returns a Timestamp, which has
                    # .date() — the Series-only `.dt` accessor used before raised
                    # AttributeError, which the outer except caught, silently returning
                    # an empty scan for every dataset that had any timestamped file.
                    "date": pd.to_datetime(ts, unit='s').date() if ts > 0 else None
                })
            return ds_rows
        except Exception as e:
            print(f"Error scanning {dataset_id}: {e}")
            return []

    with ThreadPoolExecutor(max_workers=8) as executor:
        for res in executor.map(scan_dataset, ALL_DATASETS):
            all_rows.extend(res)
    return pd.DataFrame(all_rows)
# Main UI
st.title("πŸ“Š Global Data Audit (8 Datasets)")
# Two banners: what we compare (S3 inventory target) against (downloaded repos).
col1, col2 = st.columns([1, 1])
with col1:
    st.info(f"Source 1: **S3 Inventory** (Target)\n\nValues read from `{MAIN_DATASET}`")
with col2:
    st.success(f"Source 2: **Downloaded Ecosystem**\n\nScanning {len(ALL_DATASETS)} datasets")
# Load
# Both calls are cached (ttl=300), so reruns within 5 minutes are cheap.
df_inv = load_s3_inventory()
df_down = load_all_downloaded_data()
# --- METRICS ---
st.divider()
if df_inv.empty:
    st.warning("⚠️ Inventory Missing.")
else:
    c1, c2, c3, c4 = st.columns(4)
    c1.metric("Total S3 Files", f"{len(df_inv):,}")
    # 'size' is per-object size summed and shown as decimal GB — presumably bytes; TODO confirm
    c2.metric("Total S3 Size", f"{df_inv['size'].sum()/1e9:.2f} GB")
    if not df_down.empty:
        c3.metric("Downloaded Files", f"{len(df_down):,}")
        c4.metric("Datasets Active", f"{df_down['dataset'].nunique()}/{len(ALL_DATASETS)}")
    else:
        c3.metric("Downloaded Files", "0")
# --- CROSS MATCHING ---
st.subheader("🌐 Global Coverage Map")
# 1. Map S3 Categories
def map_prefix(p):
    """Collapse an S3 object key into one of the coarse audit categories."""
    # First matching substring wins; "node_fills" also covers node_fills_by_block keys.
    known_categories = ("market_data", "explorer_blocks", "node_fills", "node_trades", "misc_events")
    for category in known_categories:
        if category in p:
            return category
    return "other"
# Tag each inventory row with its coarse category derived from the S3 key.
# Bug fix: when the inventory is missing, df_inv is an empty frame with NO
# columns, so the previously unconditional `df_inv['key']` raised KeyError
# right after the "Inventory Missing" warning. Guard it and supply the
# columns the grouping code below expects so the page still renders.
if not df_inv.empty:
    df_inv['agg_category'] = df_inv['key'].apply(map_prefix)
else:
    df_inv = pd.DataFrame(columns=['key', 'agg_category', 'date'])
# 2. Map Dataset Categories
# We need to map dataset specific categories to the S3 agg_category
def map_ds_cat(row):
    """Map a downloaded-file row onto the same coarse categories as the S3 side."""
    haystack = (row['dataset'] + row['category']).lower()
    # Ordered rules: the first rule whose needle appears in the haystack wins.
    # Note "block" is checked before "fill", matching the original precedence.
    rules = [
        (("market_data", "l2book"), "market_data"),
        (("explorer", "block"), "explorer_blocks"),
        (("fill",), "node_fills"),
        (("trade",), "node_trades"),
        (("misc",), "misc_events"),
    ]
    for needles, category in rules:
        if any(needle in haystack for needle in needles):
            return category
    return "other"
# Tag downloaded rows with the same coarse category scheme (row-wise apply).
if not df_down.empty:
    df_down['agg_category'] = df_down.apply(map_ds_cat, axis=1)
# 3. Group & Merge
# Per (category, date) file counts on the S3 side.
s3_grp = df_inv.groupby(['agg_category', 'date']).size().reset_index(name='s3_files')
down_grp = pd.DataFrame()
if not df_down.empty:
    down_grp = df_down.groupby(['agg_category', 'date']).size().reset_index(name='down_files')
if not down_grp.empty:
    # Outer merge keeps dates present on only one side; fillna(0) makes the
    # count columns float-typed — downstream only plots them, so that is fine.
    merged = pd.merge(s3_grp, down_grp, on=['agg_category', 'date'], how='outer').fillna(0)
else:
    # NOTE(review): `merged` aliases `s3_grp` here, so adding the column
    # mutates s3_grp too — harmless since s3_grp is not used afterwards.
    merged = s3_grp
    merged['down_files'] = 0
# Categories alphabetical, newest dates first within each category.
merged = merged.sort_values(['agg_category', 'date'], ascending=[True, False])
# 4. Display Categories
# One expander per coarse category, each with a grouped S3-vs-downloaded bar chart.
cats = merged['agg_category'].unique()
for cat in cats:
    with st.expander(f"πŸ“‚ {cat.upper()}", expanded=True):
        sub = merged[merged['agg_category'] == cat]
        # Chart
        fig = px.bar(
            sub, x='date', y=['s3_files', 'down_files'],
            barmode='group',
            title=f"Coverage: {cat.upper()}",
            color_discrete_map={'s3_files': '#FFA07A', 'down_files': '#90EE90'} # Salmon vs LightGreen
        )
        st.plotly_chart(fig, use_container_width=True)
        # Show which datasets contribute to this category
        if not df_down.empty:
            contributors = df_down[df_down['agg_category'] == cat]['dataset'].unique()
            if len(contributors) > 0:
                st.caption(f"βœ… Data found in: {', '.join(contributors)}")
            else:
                st.caption("❌ No downloaded data found in any dataset.")
st.divider()
st.subheader("πŸ” Dataset Inspector")
# Raw per-file table for a single selected dataset.
ds_choice = st.selectbox("Select Dataset to Inspect", ALL_DATASETS)
if not df_down.empty:
    ds_subset = df_down[df_down['dataset'] == ds_choice]
    if ds_subset.empty:
        st.warning(f"No parquet files found in {ds_choice}")
    else:
        st.dataframe(ds_subset, use_container_width=True)
# ===================== SQL QUERY TAB =====================
# Ad-hoc DuckDB queries over parquet files served from the HF Hub over HTTPS.
st.divider()
st.subheader("πŸ¦† SQL Query (DuckDB)")
st.caption("Query any HF Parquet file remotely. **Fast** - runs on server, not your local machine.")
# Helper to build URL
def hf_parquet_url(repo_id, filename):
    """Return the direct `resolve/main` download URL for *filename* in dataset *repo_id*.

    Bug fix: the original f-string ended in a literal placeholder instead of
    interpolating ``filename``, so every generated URL pointed at a
    non-existent object and the default SQL query could never succeed.
    """
    return f"https://huggingface.co/datasets/{repo_id}/resolve/main/{filename}"
# Dataset + File Selection
col_ds, col_file = st.columns(2)
with col_ds:
    # The extra v2 repo is queryable here even though it is not in the audit list.
    sql_dataset = st.selectbox("Dataset", ALL_DATASETS + ['gionuibk/hyperliquidL2Book-v2'], key="sql_ds")
with col_file:
    # Fetch file list for selected dataset (cached)
    @st.cache_data(ttl=600)
    def get_parquet_files(ds):
        """List up to 100 parquet files in dataset *ds*; empty list on any API error."""
        try:
            api = HfApi(token=HF_TOKEN)
            files = api.list_repo_files(repo_id=ds, repo_type="dataset")
            return [f for f in files if f.endswith('.parquet')][:100]  # Limit to 100
        except Exception:
            # Narrowed from a bare `except:` so Ctrl-C/SystemExit are not swallowed;
            # listing failures (missing repo, auth) still degrade to an empty list.
            return []
    available_files = get_parquet_files(sql_dataset)
    sql_file = st.selectbox("File", available_files if available_files else ["(No files found)"], key="sql_file")
# SQL Input
# Pre-fill the editor with a working query against the selected file.
example_url = hf_parquet_url(sql_dataset, sql_file) if sql_file and sql_file != "(No files found)" else "URL"
default_sql = f"SELECT * FROM read_parquet('{example_url}') LIMIT 10"
sql_input = st.text_area("SQL Query", value=default_sql, height=100)
# Execute Button
if st.button("πŸš€ Run Query", type="primary"):
    if sql_input.strip():
        with st.spinner("Executing query..."):
            con = None
            try:
                con = duckdb.connect(':memory:')
                # httpfs lets DuckDB read parquet straight from https:// URLs.
                con.execute("INSTALL httpfs; LOAD httpfs;")
                result = con.execute(sql_input).fetchdf()
                st.success(f"βœ… Query returned {len(result)} rows.")
                st.dataframe(result, use_container_width=True)
                # Download button
                csv = result.to_csv(index=False)
                st.download_button("⬇️ Download CSV", csv, "query_result.csv", "text/csv")
            except Exception as e:
                st.error(f"❌ Query Error: {e}")
            finally:
                # Fix: the original leaked the in-memory connection on every click.
                if con is not None:
                    con.close()
    else:
        st.warning("Please enter a SQL query.")
st.write(f"Last updated: {time.strftime('%H:%M:%S')}")
if st.button("Refresh"):
    st.rerun()