binance-data-backend / modal_worker.py
Vycka12
Fix kwarg timeframe to klines_tf in VPIN pre-scan
21223a3
import modal
import pandas as pd
import requests
import io
import zipfile
import gc
import os
import numpy as np
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
# Modal app
app = modal.App("binance-data-dashboard")
image = modal.Image.debian_slim().pip_install("pandas", "requests", "python-dateutil", "ccxt", "huggingface_hub", "numpy")
def upload_to_hf(content_or_path, filename, repo_id, token, is_file=False):
"""Uploads content or a file directly to Hugging Face repository."""
from huggingface_hub import HfApi
import io
try:
api = HfApi(token=token)
try:
api.create_repo(repo_id=repo_id, repo_type="dataset", exist_ok=True)
except:
pass
if is_file:
api.upload_file(
path_or_fileobj=content_or_path,
path_in_repo=filename,
repo_id=repo_id,
repo_type="dataset"
)
else:
api.upload_file(
path_or_fileobj=io.BytesIO(content_or_path.encode('utf-8')),
path_in_repo=filename,
repo_id=repo_id,
repo_type="dataset"
)
print(f" [HF] Successfully uploaded to {repo_id}/{filename}")
return True, f"https://huggingface.co/datasets/{repo_id}/blob/main/{filename}"
except Exception as e:
print(f" [HF] Upload failed: {e}")
return False, str(e)
def yield_vision_zips(base_url, data_type, clean_symbol, start_dt, end_dt, klines_tf=None, usecols=None):
"""
Generator that yields monthly DataFrames one by one.
"""
# Build the path segment
if klines_tf:
path_segment = f"klines/{clean_symbol}/{klines_tf}"
file_prefix = f"{clean_symbol}-{klines_tf}"
else:
path_segment = f"{data_type}/{clean_symbol}"
file_prefix = f"{clean_symbol}-{data_type}"
# 1) Monthly archives
monthly_done = []
current_month = start_dt.replace(day=1)
while current_month <= end_dt.replace(day=1):
next_month = current_month + relativedelta(months=1)
if next_month <= datetime.now().replace(day=1):
m_str = current_month.strftime("%Y-%m")
url = f"{base_url}/monthly/{path_segment}/{file_prefix}-{m_str}.zip"
try:
res = requests.get(url, timeout=30)
if res.status_code == 200:
if len(res.content) < 100:
print(f" [SKIP] {url}: File too small ({len(res.content)} bytes)")
continue
try:
with zipfile.ZipFile(io.BytesIO(res.content)) as z:
namelist = z.namelist()
if not namelist:
print(f" [SKIP] {url}: No files inside ZIP")
continue
with z.open(namelist[0]) as f:
df = pd.read_csv(f, header=None, usecols=usecols)
if not df.empty and not str(df.iloc[0, 0]).isdigit():
df = df.iloc[1:].reset_index(drop=True)
yield df, m_str
monthly_done.append(current_month)
print(f" [OK] Month {m_str}: {len(df)} rows")
except zipfile.BadZipFile:
print(f" [ERR] {url}: Invalid ZIP file content")
else:
print(f" [SKIP] Month {m_str}: HTTP {res.status_code}")
except Exception as e:
print(f" [ERR] Month {m_str}: {e}")
current_month = next_month
# 2) Daily archives
temp_date = start_dt
while temp_date <= end_dt:
is_covered = any(
m_dt <= temp_date < (m_dt + relativedelta(months=1))
for m_dt in monthly_done
)
if not is_covered and temp_date < datetime.now():
d_str = temp_date.strftime("%Y-%m-%d")
url = f"{base_url}/daily/{path_segment}/{file_prefix}-{d_str}.zip"
try:
res = requests.get(url, timeout=15)
if res.status_code == 200:
with zipfile.ZipFile(io.BytesIO(res.content)) as z:
with z.open(z.namelist()[0]) as f:
df = pd.read_csv(f, header=None, usecols=usecols)
if not str(df.iloc[0, 0]).isdigit():
df = df.iloc[1:].reset_index(drop=True)
yield df, d_str
except:
pass
temp_date += timedelta(days=1)
def download_vision_zips(base_url, data_type, clean_symbol, start_dt, end_dt, klines_tf=None):
"""Backwards compatibility for simple downloads."""
all_dfs = []
for df, label in yield_vision_zips(base_url, data_type, clean_symbol, start_dt, end_dt, klines_tf):
all_dfs.append(df)
if all_dfs:
return pd.concat(all_dfs, ignore_index=True)
return pd.DataFrame()
# ============================================================
# CLOUD FUNCTIONS
# ============================================================
@app.function(image=image, timeout=7200, cpu=1.0, memory=51200)
def fetch_klines_cloud(symbol: str, timeframe: str, start_date: str, end_date: str, hf_repo: str = None, hf_token: str = None):
"""Download Klines (OHLCV) in the cloud with chunked processing."""
print(f"[CLOUD] Klines (Chunked): {symbol} {timeframe} | {start_date} -> {end_date}")
clean_symbol = symbol.replace("/", "").replace(":", "")
base_url = "https://data.binance.vision/data/futures/um"
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
all_csv_chunks = []
total_rows = 0
cols = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
for df_chunk, label in yield_vision_zips(base_url, "klines", clean_symbol, start_dt, end_dt, klines_tf=timeframe, usecols=[0,1,2,3,4,5]):
df_chunk.columns = cols
df_chunk['timestamp'] = pd.to_datetime(pd.to_numeric(df_chunk['timestamp']), unit='ms')
df_chunk = df_chunk[(df_chunk['timestamp'] >= pd.to_datetime(start_dt)) &
(df_chunk['timestamp'] <= pd.to_datetime(end_dt) + timedelta(days=1))]
if not df_chunk.empty:
all_csv_chunks.append(df_chunk.to_csv(index=False, header=(total_rows == 0)))
total_rows += len(df_chunk)
del df_chunk
gc.collect()
if total_rows == 0:
return {"success": False, "message": "Klines nerasta."}
csv_string = "".join(all_csv_chunks)
print(f"[CLOUD] Klines done: {total_rows} rows")
hf_url = None
if hf_repo and hf_token:
filename = f"{clean_symbol}_{timeframe}_{start_date}_{end_date}_klines.csv"
success, url_or_err = upload_to_hf(csv_string, filename, hf_repo, hf_token)
if success: hf_url = url_or_err
last_df = pd.read_csv(io.StringIO(all_csv_chunks[-1]))
preview = last_df.tail(100).to_dict(orient="records")
return {"success": True, "row_count": total_rows, "preview": preview, "csv_data": csv_string, "hf_url": hf_url}
@app.function(image=image, timeout=7200, cpu=1.0, memory=51200)
def fetch_aggtrades_cloud(symbol: str, start_date: str, end_date: str, hf_repo: str = None, hf_token: str = None):
"""Download AggTrades in the cloud with strict memory management."""
print(f"[CLOUD] AggTrades (Disk-Backed): {symbol} | {start_date} -> {end_date}")
clean_symbol = symbol.replace("/", "").replace(":", "")
base_url = "https://data.binance.vision/data/futures/um"
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
temp_path = f"/tmp/{clean_symbol}_aggtrades.csv"
total_rows = 0
cols = ['agg_trade_id', 'price', 'quantity', 'first_trade_id', 'last_trade_id', 'timestamp', 'is_buyer_maker']
with open(temp_path, "w") as f:
for df_chunk, label in yield_vision_zips(base_url, "aggTrades", clean_symbol, start_dt, end_dt):
df_chunk.columns = cols
df_chunk['timestamp'] = pd.to_datetime(pd.to_numeric(df_chunk['timestamp']), unit='ms')
df_chunk = df_chunk[(df_chunk['timestamp'] >= pd.to_datetime(start_dt)) &
(df_chunk['timestamp'] <= pd.to_datetime(end_dt) + timedelta(days=1))]
if not df_chunk.empty:
df_chunk.to_csv(f, index=False, header=(total_rows == 0))
total_rows += len(df_chunk)
# Strict Cleanup
del df_chunk
gc.collect()
if total_rows == 0:
if os.path.exists(temp_path): os.remove(temp_path)
return {"success": False, "message": "AggTrades nerasta."}
print(f"[CLOUD] AggTrades done: {total_rows} rows. Saved to disk.")
hf_url = None
if hf_repo and hf_token:
filename = f"{clean_symbol}_{start_date}_{end_date}_aggTrades.csv"
success, url_or_err = upload_to_hf(temp_path, filename, hf_repo, hf_token, is_file=True)
if success: hf_url = url_or_err
# Preview
preview_df = pd.read_csv(temp_path).tail(100)
preview = preview_df.to_dict(orient="records")
# Return limited data as string for immediate use, or signal to use HF
with open(temp_path, "r") as f:
# We only return the CSV string if it's reasonably small, otherwise use HF
# But per user req "csv_data" is usually expected.
# However, for huge files (>50MB), returning as string might crash the client.
csv_data = f.read() if os.path.getsize(temp_path) < 50_000_000 else "FILE_TOO_LARGE_USE_HF"
if os.path.exists(temp_path): os.remove(temp_path)
return {"success": True, "row_count": total_rows, "preview": preview, "csv_data": csv_data, "hf_url": hf_url}
@app.function(image=image, timeout=7200, cpu=1.0, memory=51200)
def fetch_liquidations_cloud(symbol: str, start_date: str, end_date: str, hf_repo: str = None, hf_token: str = None):
"""Download Liquidations in the cloud with strict memory management."""
print(f"[CLOUD] Liquidations (Disk-Backed): {symbol} | {start_date} -> {end_date}")
clean_symbol = symbol.replace("/", "").replace(":", "")
base_url = "https://data.binance.vision/data/futures/um"
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
temp_path = f"/tmp/{clean_symbol}_liquidations.csv"
total_rows = 0
cols = ['symbol', 'side', 'order_type', 'time_in_force', 'original_quantity', 'price',
'average_price', 'order_status', 'last_fill_quantity', 'accumulated_fill_quantity', 'timestamp']
with open(temp_path, "w") as f:
for df_chunk, label in yield_vision_zips(base_url, "liquidationOrders", clean_symbol, start_dt, end_dt):
df_chunk.columns = cols
df_chunk['timestamp'] = pd.to_datetime(pd.to_numeric(df_chunk['timestamp']), unit='ms')
df_chunk = df_chunk[(df_chunk['timestamp'] >= pd.to_datetime(start_dt)) &
(df_chunk['timestamp'] <= pd.to_datetime(end_dt) + timedelta(days=1))]
if not df_chunk.empty:
df_chunk.to_csv(f, index=False, header=(total_rows == 0))
total_rows += len(df_chunk)
del df_chunk
gc.collect()
if total_rows == 0:
if os.path.exists(temp_path): os.remove(temp_path)
return {"success": False, "message": "Liquidations nerasta."}
print(f"[CLOUD] Liquidations done: {total_rows} rows. Saved to disk.")
hf_url = None
if hf_repo and hf_token:
filename = f"{clean_symbol}_{start_date}_{end_date}_liquidations.csv"
success, url_or_err = upload_to_hf(temp_path, filename, hf_repo, hf_token, is_file=True)
if success: hf_url = url_or_err
preview_df = pd.read_csv(temp_path).tail(100)
preview = preview_df.to_dict(orient="records")
with open(temp_path, "r") as f:
csv_data = f.read() if os.path.getsize(temp_path) < 50_000_000 else "FILE_TOO_LARGE_USE_HF"
if os.path.exists(temp_path): os.remove(temp_path)
return {"success": True, "row_count": total_rows, "preview": preview, "csv_data": csv_data, "hf_url": hf_url}
@app.function(image=image, timeout=7200, cpu=1.0, memory=51200)
def fetch_dollar_bars_cloud(symbol: str, start_date: str, end_date: str, threshold: float = 1_000_000, hf_repo: str = None, hf_token: str = None):
"""Download AggTrades and generate Dollar Bars with chunked processing."""
print(f"[CLOUD] Dollar Bars (Chunked): {symbol} | {start_date} -> {end_date} | Threshold: {threshold}")
clean_symbol = symbol.replace("/", "").replace(":", "")
base_url = "https://data.binance.vision/data/futures/um"
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
bars = []
# State tracking
s = {
'current_sum': 0.0, 'b_open': None, 'b_high': -float('inf'),
'b_low': float('inf'), 'b_vol': 0.0, 'b_ts': None
}
for df_chunk, label in yield_vision_zips(base_url, "aggTrades", clean_symbol, start_dt, end_dt):
print(f" [DBARS] Processing {label}...")
df_chunk.columns = ['agg_trade_id', 'price', 'quantity', 'first_trade_id', 'last_trade_id', 'timestamp', 'is_buyer_maker']
df_chunk['timestamp'] = pd.to_datetime(pd.to_numeric(df_chunk['timestamp']), unit='ms')
df_chunk['price'] = pd.to_numeric(df_chunk['price'])
df_chunk['quantity'] = pd.to_numeric(df_chunk['quantity'])
df_chunk['dollar_value'] = df_chunk['price'] * df_chunk['quantity']
for _, row in df_chunk.iterrows():
if s['b_open'] is None:
s['b_open'] = row['price']
s['b_ts'] = row['timestamp']
s['b_high'] = max(s['b_high'], row['price'])
s['b_low'] = min(s['b_low'], row['price'])
s['b_vol'] += row['quantity']
s['current_sum'] += row['dollar_value']
if s['current_sum'] >= threshold:
bars.append({
'timestamp': s['b_ts'], 'open': s['b_open'], 'high': s['b_high'],
'low': s['b_low'], 'close': row['price'], 'volume': s['b_vol'],
'dollar_volume': s['current_sum']
})
s['current_sum'] = 0.0
s['b_open'], s['b_high'], s['b_low'], s['b_vol'] = None, -float('inf'), float('inf'), 0.0
del df_chunk
gc.collect()
if not bars:
return {"success": False, "message": "Dollar Bars nebuvo sugeneruoti."}
result_df = pd.DataFrame(bars)
print(f"[CLOUD] Dollar Bars done: {len(result_df)} bars")
csv_string = result_df.to_csv(index=False)
hf_url = None
if hf_repo and hf_token:
filename = f"{clean_symbol}_{start_date}_{end_date}_dollarBars_{int(threshold)}.csv"
success, url_or_err = upload_to_hf(csv_string, filename, hf_repo, hf_token)
if success: hf_url = url_or_err
preview = result_df.tail(100).to_dict(orient="records")
return {"success": True, "row_count": len(result_df), "preview": preview, "csv_data": csv_string, "hf_url": hf_url}
@app.function(image=image, timeout=7200, cpu=1.0, memory=51200)
def fetch_vpin_cloud(symbol: str, start_date: str, end_date: str, buckets_per_day: int = 50, hf_repo: str = None, hf_token: str = None):
"""
Download AggTrades and calculate VPIN.
Strictly sequential processing: one month at a time.
"""
import numpy as np
import pandas as pd
import gc
print(f"[CLOUD] VPIN (Strict Sequential): {symbol} | {start_date} -> {end_date} | Buckets/Day: {buckets_per_day}")
clean_symbol = symbol.replace("/", "").replace(":", "")
base_url = "https://data.binance.vision/data/futures/um"
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
total_days = (end_dt - start_dt).days + 1
# To avoid the slow AggTrades pre-scan, we use 1m Klines *only* for the volume estimate
# because bucket_size must be fixed for the indicator to be valid across time.
print(" [VPIN] Step 1: Sequential Volume Estimation (Fast 1m Klines)...")
total_vol = 0
for df_k, label in yield_vision_zips(base_url, "klines", clean_symbol, start_dt, end_dt, klines_tf="1m", usecols=[5]):
total_vol += pd.to_numeric(df_k.iloc[:, 0]).sum()
del df_k
gc.collect()
if total_vol == 0:
return {"success": False, "message": "Volume data not found."}
bucket_size = total_vol / (total_days * buckets_per_day)
print(f" [VPIN] Step 2: Sequential AggTrades processing | Target Bucket: {bucket_size:,.2f}")
vpin_results = []
# Persistent state for VPIN calculation across months
state = {
'residual_vol': 0.0, 'residual_buy': 0.0, 'residual_sell': 0.0,
'current_bucket_id': 0,
'recent_imbalances': [] # Rolling window of imbalances
}
for df_chunk, label in yield_vision_zips(base_url, "aggTrades", clean_symbol, start_dt, end_dt):
row_count = len(df_chunk)
print(f" [VPIN] Processing {label} ({row_count:,.0f} rows)...")
# 1. Prepare data (Columns: 1=Price, 2=Qty, 5=TS, 6=IsMaker)
prices = pd.to_numeric(df_chunk.iloc[:, 1]).values
quants = pd.to_numeric(df_chunk.iloc[:, 2]).values
times = pd.to_numeric(df_chunk.iloc[:, 5]).values
is_maker = df_chunk.iloc[:, 6].values
# Buy/Sell classification
buys = np.where(~is_maker, quants, 0.0)
sells = np.where(is_maker, quants, 0.0)
# 2. Vectorized Bucket Assignment
cum_vol = np.cumsum(quants) + state['residual_vol']
bucket_ids = (cum_vol // bucket_size).astype(int)
# 3. Aggregate by Bucket using Pandas (Vectorized & Fast)
df_work = pd.DataFrame({
'bid': bucket_ids, 'p': prices, 'q': quants,
't': times, 'b': buys, 's': sells
})
# Group by Bucket ID
grouped = df_work.groupby('bid')
aggs = grouped.agg({
't': 'first', 'p': ['first', 'max', 'min', 'last'],
'q': 'sum', 'b': 'sum', 's': 'sum'
})
aggs.columns = ['ts', 'open', 'high', 'low', 'close', 'vol', 'buy', 'sell']
# 4. Handle Partial Buckets at month boundaries
# The last bucket ID in this chunk might be incomplete
last_bid = bucket_ids[-1]
is_complete = (grouped.size().index < last_bid).values
complete_buckets = aggs[is_complete].copy()
# Process complete buckets
if not complete_buckets.empty:
for _, row in complete_buckets.iterrows():
imbalance = abs(row['buy'] - row['sell'])
state['recent_imbalances'].append(imbalance)
if len(state['recent_imbalances']) > buckets_per_day:
state['recent_imbalances'].pop(0)
vpin_val = None
if len(state['recent_imbalances']) == buckets_per_day:
vpin_val = sum(state['recent_imbalances']) / (buckets_per_day * bucket_size)
vpin_results.append({
'timestamp': pd.to_datetime(row['ts'], unit='ms'),
'open': row['open'], 'high': row['high'], 'low': row['low'],
'close': row['close'], 'volume': row['vol'], 'vpin': vpin_val
})
# Save residue for next month
# Residue is the data from the last (incomplete) bucket ID
residue_mask = (bucket_ids == last_bid)
state['residual_vol'] = quants[residue_mask].sum()
state['residual_buy'] = buys[residue_mask].sum()
state['residual_sell'] = sells[residue_mask].sum()
# Note: Open/High/Low for residue would need more state if we wanted perfect OHLC accuracy
# but for VPIN the volumes are the primary concern.
# Heartbeat log to show activity
print(f" [OK] Finished {label}. Buckets formed: {len(complete_buckets)}. Cumulative Buckets: {len(vpin_results)}")
# Immediate cleanup
del df_chunk, df_work, grouped, aggs, complete_buckets
gc.collect()
if not vpin_results:
return {"success": False, "message": "Buckets were not formed."}
df_final = pd.DataFrame(vpin_results)
print(f"[CLOUD] VPIN Complete: {len(df_final)} buckets.")
csv_string = df_final.to_csv(index=False)
hf_url = None
if hf_repo and hf_token:
filename = f"{clean_symbol}_{start_date}_{end_date}_VPIN.csv"
success, url_or_err = upload_to_hf(csv_string, filename, hf_repo, hf_token)
if success: hf_url = url_or_err
return {"success": True, "row_count": len(df_final), "preview": vpin_results[-100:], "csv_data": csv_string, "hf_url": hf_url}
@app.local_entrypoint()
def main(symbol="BTCUSDT", timeframe="15m", start="2024-01-01", end="2024-02-01"):
# result = fetch_klines_cloud.remote(symbol, timeframe, start, end)
result = fetch_vpin_cloud.remote(symbol, start, end)
if result.get("success"):
filename = f"{symbol}_vpin_{start}_{end}.csv"
with open(filename, "w") as f:
f.write(result["csv_data"])
print(f"File saved: {filename} ({result['row_count']} rows)")
else:
print(f"Error: {result.get('message')}")