zirobtc committed on
Commit
697fba5
·
1 Parent(s): 4ec4791

Upload folder using huggingface_hub

Browse files
data/data_loader.py CHANGED
@@ -534,8 +534,12 @@ class OracleDataset(Dataset):
534
  wallet_data: Dict[str, Any],
535
  total_supply_dec: float,
536
  _register_event_fn,
537
- cached_holders_list: List[List[str]] = None
538
  ) -> None:
 
 
 
 
539
  # Prepare helper sets and maps (static sniper set based on earliest buyers)
540
  all_buy_trades = sorted([e for e in trade_events if e.get('trade_direction') == 0 and e.get('success', False)], key=lambda x: x['timestamp'])
541
  sniper_wallets = []
@@ -571,32 +575,38 @@ class OracleDataset(Dataset):
571
  xfers_win = [e for e in transfer_events if window_start < e['timestamp'] <= ts_value]
572
 
573
  # Per-snapshot holder distribution at ts_value
574
- holder_records_ts = []
575
- holders_end = 0
576
- if cached_holders_list is not None and i < len(cached_holders_list):
577
- # Use cached list of addresses
578
- holder_records_ts = [{'wallet_address': addr, 'current_balance': 0} for addr in cached_holders_list[i]]
579
- holders_end = len(cached_holders_list[i])
580
- elif self.fetcher:
581
- cutoff_dt_ts = datetime.datetime.fromtimestamp(ts_value, tz=datetime.timezone.utc)
582
- holders_end, holder_records_ts = self.fetcher.fetch_holder_snapshot_stats_for_token(
583
- token_address,
584
- cutoff_dt_ts,
585
- limit=HOLDER_SNAPSHOT_TOP_K
586
  )
587
- else:
588
- holder_records_ts = []
589
- holders_end = 0
 
 
 
 
 
 
 
 
 
590
  holder_entries_ts = []
591
  for rec in holder_records_ts:
592
- addr = rec.get('wallet_address')
593
- try:
594
- bal = float(rec.get('current_balance', 0.0) or 0.0)
595
- except (TypeError, ValueError):
596
- bal = 0.0
 
 
 
 
 
597
  pct = (bal / total_supply_dec) if total_supply_dec and total_supply_dec > 0 else 0.0
598
  if addr and pct > 0.0:
599
  holder_entries_ts.append({'wallet': addr, 'holding_pct': pct})
 
600
  holder_entries_ts.sort(key=lambda d: d['holding_pct'], reverse=True)
601
 
602
  # Emit HolderSnapshot for this ts_value
@@ -1346,18 +1356,21 @@ class OracleDataset(Dataset):
1346
  elapsed = (T_cutoff - mint_timestamp).total_seconds()
1347
  snap_idx = int(elapsed // 300)
1348
  holder_records = []
1349
- cached_holders_list = raw_data.get('holder_snapshots_list', [])
1350
- if 0 <= snap_idx < len(cached_holders_list):
1351
- # Handle both old format (list of addresses) and new format (dict with holders)
1352
- snapshot_data = cached_holders_list[snap_idx]
1353
- if isinstance(snapshot_data, dict):
1354
- # New format: {'timestamp': int, 'holders': [...]}
1355
- holder_records = [{'wallet_address': h.get('wallet_address')} for h in snapshot_data.get('holders', [])]
1356
- else:
1357
- # Old format: list of addresses
1358
- holder_records = [{'wallet_address': addr} for addr in snapshot_data]
 
1359
  for holder in holder_records:
1360
- _add_wallet(holder.get('wallet_address'), wallets_to_fetch)
 
 
1361
  _timings['wallet_collection'] = _time.perf_counter() - _t0
1362
  _timings['num_wallets'] = len(wallets_to_fetch)
1363
 
@@ -1877,7 +1890,7 @@ class OracleDataset(Dataset):
1877
  future_trades_for_labels: List[Dict[str, Any]],
1878
  pooler: EmbeddingPooler,
1879
  sample_idx: Optional[int] = None,
1880
- cached_holders_list: List[List[str]] = None,
1881
  cached_ohlc_1s: Optional[torch.Tensor] = None,
1882
  quality_score: Optional[float] = None
1883
  ) -> Optional[Dict[str, Any]]:
@@ -2087,7 +2100,7 @@ class OracleDataset(Dataset):
2087
  HIGH_DEF_INTERVAL = ("1s", 1)
2088
  MIDDLE_INTERVAL = ("30s", 30)
2089
 
2090
- def _emit_chart_segments(trades: List[Dict[str, Any]], interval: tuple, signature_prefix: str, precomputed_ohlc: List[tuple] = None):
2091
  if not trades and precomputed_ohlc is None:
2092
  return []
2093
  interval_label, interval_seconds = interval
@@ -2114,12 +2127,12 @@ class OracleDataset(Dataset):
2114
  'i': interval_label
2115
  }
2116
  emitted_events.append(chart_event)
2117
- _register_event(chart_event, _event_execution_sort_key(last_ts, signature=f"{signature_prefix}-{idx}"))
2118
  return emitted_events
2119
 
2120
- # Emit charts
2121
- chart_events = []
2122
-
 
2123
  # Prepare 1s OHLC from cache if available
2124
  ohlc_1s_precomputed = None
2125
  if cached_ohlc_1s is not None and len(cached_ohlc_1s) > 0:
@@ -2134,8 +2147,8 @@ class OracleDataset(Dataset):
2134
  for i, row in enumerate(slice_tensor)
2135
  ]
2136
 
2137
- chart_events.extend(_emit_chart_segments(high_def_chart_trades, HIGH_DEF_INTERVAL, "chart-hd", precomputed_ohlc=ohlc_1s_precomputed))
2138
- chart_events.extend(_emit_chart_segments(middle_chart_trades, MIDDLE_INTERVAL, "chart-mid"))
2139
 
2140
  # 5. Process Other Records (Pool, Liquidity, Fees, Burns, Locks, Migrations)
2141
  pool_meta_by_address = {}
@@ -2331,7 +2344,20 @@ class OracleDataset(Dataset):
2331
  _register_event,
2332
  cached_holders_list=cached_holders_list
2333
  )
2334
-
 
 
 
 
 
 
 
 
 
 
 
 
 
2335
  # 7. Finalize Sequence with Dynamic Sampling
2336
  event_sequence_entries.sort(key=lambda x: x[0])
2337
  raw_event_sequence = [entry[1] for entry in event_sequence_entries]
@@ -2628,7 +2654,7 @@ class OracleDataset(Dataset):
2628
 
2629
  raw_data['ohlc_1s'] = ohlc_1s
2630
 
2631
- # Generate holder snapshots
2632
  interval = 300
2633
  num_intervals = (duration_seconds // interval) + 1
2634
  snapshot_stats = torch.zeros((num_intervals, 6), dtype=torch.float32)
@@ -2640,69 +2666,96 @@ class OracleDataset(Dataset):
2640
  if bucket_idx >= 0:
2641
  buckets[bucket_idx].append(t)
2642
 
2643
- # Batch-fetch ALL holder counts in ONE query instead of N per-interval queries
2644
- # Original code did N sequential fetch_holder_snapshot_stats_for_token() calls
2645
- # which each execute a window function query. This batch replaces them all.
2646
- holder_counts_by_interval = {}
2647
- try:
2648
- max_snapshot_ts = t0 + datetime.timedelta(seconds=num_intervals * interval)
2649
- batch_holder_query = """
2650
- SELECT
2651
- intDiv(toUInt64(%(t0_epoch)s) + 300 * number, 1) as snap_epoch,
2652
- number as interval_idx
2653
- FROM numbers(%(num_intervals)s)
2654
- """
2655
- # Use a simpler approach: query holder counts for each interval boundary
2656
- # by fetching ALL holdings for this token once, then bucketing in Python
2657
- all_holdings = self.fetcher.db_client.execute("""
2658
- SELECT wallet_address, current_balance, updated_at
2659
- FROM wallet_holdings
2660
- WHERE mint_address = %(token)s
2661
- ORDER BY wallet_address, updated_at
2662
- """, {'token': token_address})
2663
 
2664
- if all_holdings:
2665
- # Build point-in-time holder counts per interval
2666
- # For each interval, count wallets with balance > 0 as of that timestamp
2667
- from collections import defaultdict as _dd
2668
- wallet_latest = {} # wallet -> (balance, updated_at)
2669
- # Sort by updated_at
2670
- all_holdings.sort(key=lambda x: x[2])
2671
- # Process holdings chronologically and snapshot at each interval boundary
2672
- holding_idx = 0
2673
- for i in range(num_intervals):
2674
- snap_ts = t0 + datetime.timedelta(seconds=(i + 1) * interval)
2675
- # Apply all holdings up to this timestamp
2676
- while holding_idx < len(all_holdings) and all_holdings[holding_idx][2] <= snap_ts:
2677
- wallet, balance, _ = all_holdings[holding_idx]
2678
- wallet_latest[wallet] = balance
2679
- holding_idx += 1
2680
- # Count wallets with positive balance
2681
- holder_counts_by_interval[i] = sum(1 for b in wallet_latest.values() if b and b > 0)
2682
- except Exception as e:
2683
- pass # Fall back to 0 counts
2684
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2685
  holder_snapshots_list = []
 
2686
  for i in range(num_intervals):
2687
  bucket_trades = buckets[i]
2688
  vol = sum(t.get('total_usd', 0.0) for t in bucket_trades)
2689
  tx = len(bucket_trades)
2690
  buys = sum(1 for t in bucket_trades if t.get('trade_direction') == 0 or t.get('trade_type') == 0)
2691
  sells = tx - buys
2692
-
2693
- count = holder_counts_by_interval.get(i, 0)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2694
 
2695
  snapshot_stats[i, 0] = float(vol)
2696
  snapshot_stats[i, 1] = float(tx)
2697
  snapshot_stats[i, 2] = float(buys)
2698
  snapshot_stats[i, 3] = float(sells)
2699
  snapshot_stats[i, 4] = float(count)
2700
- snapshot_stats[i, 5] = 0.0 # top10_pct not available in batch mode
2701
 
2702
  snapshot_ts = t0 + datetime.timedelta(seconds=(i+1)*interval)
2703
  holder_snapshots_list.append({
2704
  'timestamp': int(snapshot_ts.timestamp()),
2705
- 'holders': []
 
 
 
 
 
 
2706
  })
2707
 
2708
  raw_data['snapshots_5m'] = snapshot_stats
@@ -2728,9 +2781,12 @@ class OracleDataset(Dataset):
2728
  if liq.get('lp_provider'):
2729
  all_wallets.add(liq['lp_provider'])
2730
  for snapshot in holder_snapshots_list:
2731
- for holder in snapshot.get('holders', []):
2732
- if holder.get('wallet_address'):
2733
- all_wallets.add(holder['wallet_address'])
 
 
 
2734
 
2735
  all_wallets.discard(None)
2736
  all_wallets.discard('')
@@ -2826,11 +2882,18 @@ class OracleDataset(Dataset):
2826
  # Get holder snapshot at T_cutoff
2827
  elapsed = (T_cutoff - t0).total_seconds()
2828
  snap_idx = int(elapsed // 300)
2829
- if 0 <= snap_idx < len(holder_snapshots_list):
2830
- snapshot_data = holder_snapshots_list[snap_idx]
2831
- for holder in snapshot_data.get('holders', []):
2832
- if holder.get('wallet_address'):
2833
- wallets_to_fetch.add(holder['wallet_address'])
 
 
 
 
 
 
 
2834
 
2835
  wallets_to_fetch.discard(None)
2836
  wallets_to_fetch.discard('')
@@ -2896,7 +2959,7 @@ class OracleDataset(Dataset):
2896
  future_trades_for_labels=raw_data['trades'],
2897
  pooler=pooler,
2898
  sample_idx=idx,
2899
- cached_holders_list=None, # Force DB fetch for holder snapshots
2900
  cached_ohlc_1s=ohlc_1s,
2901
  quality_score=None # Will be injected by cache_dataset.py
2902
  )
 
534
  wallet_data: Dict[str, Any],
535
  total_supply_dec: float,
536
  _register_event_fn,
537
+ cached_holders_list: List[Dict[str, Any]] = None
538
  ) -> None:
539
+ if cached_holders_list is None:
540
+ raise RuntimeError(
541
+ f"Missing holder_snapshots_list for token {token_address} in _generate_onchain_snapshots."
542
+ )
543
  # Prepare helper sets and maps (static sniper set based on earliest buyers)
544
  all_buy_trades = sorted([e for e in trade_events if e.get('trade_direction') == 0 and e.get('success', False)], key=lambda x: x['timestamp'])
545
  sniper_wallets = []
 
575
  xfers_win = [e for e in transfer_events if window_start < e['timestamp'] <= ts_value]
576
 
577
  # Per-snapshot holder distribution at ts_value
578
+ if i >= len(cached_holders_list):
579
+ raise RuntimeError(
580
+ f"holder_snapshots_list too short for token {token_address}: need index {i}, len={len(cached_holders_list)}."
 
 
 
 
 
 
 
 
 
581
  )
582
+ snapshot_data = cached_holders_list[i]
583
+ if not isinstance(snapshot_data, dict):
584
+ raise RuntimeError(
585
+ f"Invalid holder snapshot entry type for token {token_address} at index {i}: {type(snapshot_data).__name__}."
586
+ )
587
+ if 'holders' not in snapshot_data or not isinstance(snapshot_data['holders'], list):
588
+ raise RuntimeError(
589
+ f"Malformed holder snapshot for token {token_address} at index {i}: missing list 'holders'."
590
+ )
591
+
592
+ holder_records_ts = snapshot_data['holders']
593
+ holders_end = 0
594
  holder_entries_ts = []
595
  for rec in holder_records_ts:
596
+ if not isinstance(rec, dict):
597
+ raise RuntimeError(
598
+ f"Malformed holder record for token {token_address} at index {i}: expected dict."
599
+ )
600
+ if 'wallet_address' not in rec or 'current_balance' not in rec:
601
+ raise RuntimeError(
602
+ f"Malformed holder record for token {token_address} at index {i}: requires wallet_address/current_balance."
603
+ )
604
+ addr = rec['wallet_address']
605
+ bal = float(rec['current_balance'])
606
  pct = (bal / total_supply_dec) if total_supply_dec and total_supply_dec > 0 else 0.0
607
  if addr and pct > 0.0:
608
  holder_entries_ts.append({'wallet': addr, 'holding_pct': pct})
609
+ holders_end += 1
610
  holder_entries_ts.sort(key=lambda d: d['holding_pct'], reverse=True)
611
 
612
  # Emit HolderSnapshot for this ts_value
 
1356
  elapsed = (T_cutoff - mint_timestamp).total_seconds()
1357
  snap_idx = int(elapsed // 300)
1358
  holder_records = []
1359
+ cached_holders_list = raw_data.get('holder_snapshots_list')
1360
+ if not isinstance(cached_holders_list, list):
1361
+ raise RuntimeError("Invalid cache: holder_snapshots_list must be a list.")
1362
+ if not (0 <= snap_idx < len(cached_holders_list)):
1363
+ raise RuntimeError(
1364
+ f"Invalid cache: holder_snapshots_list index out of range (snap_idx={snap_idx}, len={len(cached_holders_list)})."
1365
+ )
1366
+ snapshot_data = cached_holders_list[snap_idx]
1367
+ if not isinstance(snapshot_data, dict) or not isinstance(snapshot_data.get('holders'), list):
1368
+ raise RuntimeError("Invalid cache: holder snapshot entry must be a dict with list field 'holders'.")
1369
+ holder_records = snapshot_data['holders']
1370
  for holder in holder_records:
1371
+ if not isinstance(holder, dict) or 'wallet_address' not in holder or 'current_balance' not in holder:
1372
+ raise RuntimeError("Invalid cache: each holder record must include wallet_address and current_balance.")
1373
+ _add_wallet(holder['wallet_address'], wallets_to_fetch)
1374
  _timings['wallet_collection'] = _time.perf_counter() - _t0
1375
  _timings['num_wallets'] = len(wallets_to_fetch)
1376
 
 
1890
  future_trades_for_labels: List[Dict[str, Any]],
1891
  pooler: EmbeddingPooler,
1892
  sample_idx: Optional[int] = None,
1893
+ cached_holders_list: List[Dict[str, Any]] = None,
1894
  cached_ohlc_1s: Optional[torch.Tensor] = None,
1895
  quality_score: Optional[float] = None
1896
  ) -> Optional[Dict[str, Any]]:
 
2100
  HIGH_DEF_INTERVAL = ("1s", 1)
2101
  MIDDLE_INTERVAL = ("30s", 30)
2102
 
2103
+ def _emit_chart_segments(trades: List[Dict[str, Any]], interval: tuple, precomputed_ohlc: List[tuple] = None):
2104
  if not trades and precomputed_ohlc is None:
2105
  return []
2106
  interval_label, interval_seconds = interval
 
2127
  'i': interval_label
2128
  }
2129
  emitted_events.append(chart_event)
 
2130
  return emitted_events
2131
 
2132
+ # Build chart candidates (registration deferred until we choose exactly one interval mode)
2133
+ chart_events_1s = []
2134
+ chart_events_30s = []
2135
+
2136
  # Prepare 1s OHLC from cache if available
2137
  ohlc_1s_precomputed = None
2138
  if cached_ohlc_1s is not None and len(cached_ohlc_1s) > 0:
 
2147
  for i, row in enumerate(slice_tensor)
2148
  ]
2149
 
2150
+ chart_events_1s = _emit_chart_segments(high_def_chart_trades, HIGH_DEF_INTERVAL, precomputed_ohlc=ohlc_1s_precomputed)
2151
+ chart_events_30s = _emit_chart_segments(middle_chart_trades, MIDDLE_INTERVAL)
2152
 
2153
  # 5. Process Other Records (Pool, Liquidity, Fees, Burns, Locks, Migrations)
2154
  pool_meta_by_address = {}
 
2344
  _register_event,
2345
  cached_holders_list=cached_holders_list
2346
  )
2347
+
2348
+ # Choose exactly one chart resolution per sample:
2349
+ # - no pressure -> 1s
2350
+ # - pressure -> 30s
2351
+ non_chart_event_count = len(event_sequence_entries)
2352
+ would_exceed = (non_chart_event_count + len(chart_events_1s)) > self.max_seq_len
2353
+ selected_chart_events = chart_events_30s if would_exceed else chart_events_1s
2354
+ selected_chart_signature = "chart-mid" if would_exceed else "chart-hd"
2355
+ for chart_idx, chart_event in enumerate(selected_chart_events):
2356
+ _register_event(
2357
+ chart_event,
2358
+ _event_execution_sort_key(chart_event['timestamp'], signature=f"{selected_chart_signature}-{chart_idx}")
2359
+ )
2360
+
2361
  # 7. Finalize Sequence with Dynamic Sampling
2362
  event_sequence_entries.sort(key=lambda x: x[0])
2363
  raw_event_sequence = [entry[1] for entry in event_sequence_entries]
 
2654
 
2655
  raw_data['ohlc_1s'] = ohlc_1s
2656
 
2657
+ # Generate holder snapshots from deterministic trade-ledger reconstruction.
2658
  interval = 300
2659
  num_intervals = (duration_seconds // interval) + 1
2660
  snapshot_stats = torch.zeros((num_intervals, 6), dtype=torch.float32)
 
2666
  if bucket_idx >= 0:
2667
  buckets[bucket_idx].append(t)
2668
 
2669
+ raw_total_supply = raw_data.get('total_supply')
2670
+ raw_decimals = raw_data.get('decimals')
2671
+ if raw_total_supply is None or raw_decimals is None:
2672
+ raise RuntimeError("Missing token total_supply/decimals required for holder snapshot reconstruction.")
2673
+ total_supply_raw = int(raw_total_supply)
2674
+ token_decimals = int(raw_decimals)
2675
+ if total_supply_raw <= 0:
2676
+ raise RuntimeError(f"Invalid total_supply for {token_address}: {total_supply_raw}")
2677
+ if token_decimals < 0:
2678
+ raise RuntimeError(f"Invalid decimals for {token_address}: {token_decimals}")
2679
+ token_scale = 10 ** token_decimals
2680
+
2681
+ def _strict_int(v: Any, field_name: str) -> int:
2682
+ if v is None:
2683
+ raise RuntimeError(f"Missing {field_name} in trade record for {token_address}.")
2684
+ try:
2685
+ return int(v)
2686
+ except Exception as e:
2687
+ raise RuntimeError(f"Invalid {field_name} in trade record for {token_address}: {v}") from e
 
2688
 
2689
+ def _trade_sort_key_for_ledger(trade: Dict[str, Any]) -> tuple:
2690
+ return (
2691
+ _timestamp_to_order_value(trade.get('timestamp')),
2692
+ _strict_int(trade.get('slot', 0), 'slot'),
2693
+ _strict_int(trade.get('transaction_index', 0), 'transaction_index'),
2694
+ _strict_int(trade.get('instruction_index', 0), 'instruction_index'),
2695
+ str(trade.get('signature') or '')
2696
+ )
 
 
 
 
 
 
 
 
 
 
 
 
2697
 
2698
+ ledger_trades = []
2699
+ for trade in trades:
2700
+ if not trade.get('success', False):
2701
+ continue
2702
+ maker = trade.get('maker')
2703
+ if not maker:
2704
+ raise RuntimeError(f"Missing maker in successful trade for {token_address}.")
2705
+ trade_type = _strict_int(trade.get('trade_type'), 'trade_type')
2706
+ if trade_type not in (0, 1):
2707
+ raise RuntimeError(f"Invalid trade_type={trade_type} for {token_address}; expected 0/1.")
2708
+ base_amount_raw = _strict_int(trade.get('base_amount'), 'base_amount')
2709
+ if base_amount_raw < 0:
2710
+ raise RuntimeError(f"Invalid negative base_amount={base_amount_raw} for {token_address}.")
2711
+ ledger_trades.append((trade, maker, trade_type, base_amount_raw))
2712
+
2713
+ ledger_trades.sort(key=lambda x: _trade_sort_key_for_ledger(x[0]))
2714
+ wallet_balances_raw: Dict[str, int] = {}
2715
+ ledger_idx = 0
2716
  holder_snapshots_list = []
2717
+
2718
  for i in range(num_intervals):
2719
  bucket_trades = buckets[i]
2720
  vol = sum(t.get('total_usd', 0.0) for t in bucket_trades)
2721
  tx = len(bucket_trades)
2722
  buys = sum(1 for t in bucket_trades if t.get('trade_direction') == 0 or t.get('trade_type') == 0)
2723
  sells = tx - buys
2724
+ snapshot_ts_epoch = t0_val + ((i + 1) * interval)
2725
+
2726
+ while ledger_idx < len(ledger_trades):
2727
+ trade, maker, trade_type, base_amount_raw = ledger_trades[ledger_idx]
2728
+ trade_ts = _timestamp_to_order_value(trade.get('timestamp'))
2729
+ if trade_ts > snapshot_ts_epoch:
2730
+ break
2731
+ signed_delta = base_amount_raw if trade_type == 0 else -base_amount_raw
2732
+ wallet_balances_raw[maker] = wallet_balances_raw.get(maker, 0) + signed_delta
2733
+ ledger_idx += 1
2734
+
2735
+ positive_holders_raw = [(wallet, bal) for wallet, bal in wallet_balances_raw.items() if bal > 0]
2736
+ positive_holders_raw.sort(key=lambda item: (-item[1], item[0]))
2737
+ holders_topk_raw = positive_holders_raw[:HOLDER_SNAPSHOT_TOP_K]
2738
+ count = len(positive_holders_raw)
2739
+ top10_sum_raw = sum(bal for _, bal in positive_holders_raw[:10])
2740
+ top10_pct = float(top10_sum_raw) / float(total_supply_raw)
2741
 
2742
  snapshot_stats[i, 0] = float(vol)
2743
  snapshot_stats[i, 1] = float(tx)
2744
  snapshot_stats[i, 2] = float(buys)
2745
  snapshot_stats[i, 3] = float(sells)
2746
  snapshot_stats[i, 4] = float(count)
2747
+ snapshot_stats[i, 5] = float(top10_pct)
2748
 
2749
  snapshot_ts = t0 + datetime.timedelta(seconds=(i+1)*interval)
2750
  holder_snapshots_list.append({
2751
  'timestamp': int(snapshot_ts.timestamp()),
2752
+ 'holders': [
2753
+ {
2754
+ 'wallet_address': wallet,
2755
+ 'current_balance': float(balance_raw) / float(token_scale)
2756
+ }
2757
+ for wallet, balance_raw in holders_topk_raw
2758
+ ]
2759
  })
2760
 
2761
  raw_data['snapshots_5m'] = snapshot_stats
 
2781
  if liq.get('lp_provider'):
2782
  all_wallets.add(liq['lp_provider'])
2783
  for snapshot in holder_snapshots_list:
2784
+ if not isinstance(snapshot, dict) or not isinstance(snapshot.get('holders'), list):
2785
+ raise RuntimeError("Invalid holder_snapshots_list entry during wallet collection.")
2786
+ for holder in snapshot['holders']:
2787
+ if not isinstance(holder, dict) or 'wallet_address' not in holder or 'current_balance' not in holder:
2788
+ raise RuntimeError("Invalid holder record during wallet collection.")
2789
+ all_wallets.add(holder['wallet_address'])
2790
 
2791
  all_wallets.discard(None)
2792
  all_wallets.discard('')
 
2882
  # Get holder snapshot at T_cutoff
2883
  elapsed = (T_cutoff - t0).total_seconds()
2884
  snap_idx = int(elapsed // 300)
2885
+ if not (0 <= snap_idx < len(holder_snapshots_list)):
2886
+ raise RuntimeError(
2887
+ f"holder_snapshots_list index out of range in __cacheitem_context__ "
2888
+ f"(snap_idx={snap_idx}, len={len(holder_snapshots_list)})."
2889
+ )
2890
+ snapshot_data = holder_snapshots_list[snap_idx]
2891
+ if not isinstance(snapshot_data, dict) or not isinstance(snapshot_data.get('holders'), list):
2892
+ raise RuntimeError("Invalid holder snapshot entry in __cacheitem_context__.")
2893
+ for holder in snapshot_data['holders']:
2894
+ if not isinstance(holder, dict) or 'wallet_address' not in holder or 'current_balance' not in holder:
2895
+ raise RuntimeError("Invalid holder record in __cacheitem_context__.")
2896
+ wallets_to_fetch.add(holder['wallet_address'])
2897
 
2898
  wallets_to_fetch.discard(None)
2899
  wallets_to_fetch.discard('')
 
2959
  future_trades_for_labels=raw_data['trades'],
2960
  pooler=pooler,
2961
  sample_idx=idx,
2962
+ cached_holders_list=holder_snapshots_list,
2963
  cached_ohlc_1s=ohlc_1s,
2964
  quality_score=None # Will be injected by cache_dataset.py
2965
  )
data/ohlc_stats.npz CHANGED
@@ -1,3 +1,3 @@
1
  version https://git-lfs.github.com/spec/v1
2
- oid sha256:1d6de1f1622fbf061842fa4227e2b98784c9cec39ed647f4b87df2ad5eef6e47
3
  size 1660
 
1
  version https://git-lfs.github.com/spec/v1
2
+ oid sha256:f6a84b63ec605e83a655f404bc89d825aa8ffbb5ac3ea24c7d2197324646d016
3
  size 1660
sample_5eeBz7qY2sc8iGRc_0.json ADDED
The diff for this file is too large to render. See raw diff