zirobtc commited on
Commit
23349c5
·
1 Parent(s): 2063c38

Upload folder using huggingface_hub

Browse files
data/data_loader.py CHANGED
@@ -571,7 +571,7 @@ class OracleDataset(Dataset):
571
  buyers_seen_global = set()
572
  prev_holders_count = 0
573
 
574
- for snapshot_data in cached_holders_list:
575
  if not isinstance(snapshot_data, dict):
576
  continue
577
  ts_value = snapshot_data.get('timestamp')
@@ -582,6 +582,10 @@ class OracleDataset(Dataset):
582
  trades_win = [e for e in trade_events if e.get('success', False) and window_start < e['timestamp'] <= ts_value]
583
  xfers_win = [e for e in transfer_events if window_start < e['timestamp'] <= ts_value]
584
 
 
 
 
 
585
  if 'holders' not in snapshot_data or not isinstance(snapshot_data['holders'], list):
586
  continue
587
 
@@ -612,7 +616,11 @@ class OracleDataset(Dataset):
612
  'relative_ts': ts_value - t0_timestamp,
613
  'holders': holder_entries_ts
614
  }
615
- _register_event_fn(hs_event, self._event_execution_sort_key(ts_value, signature='HolderSnapshot') if hasattr(self, '_event_execution_sort_key') else (ts_value, 0, 0, 0, 'HolderSnapshot'))
 
 
 
 
616
 
617
  holder_pct_map_ts = {d['wallet']: d['holding_pct'] for d in holder_entries_ts}
618
  top10_holder_pct = sum(d['holding_pct'] for d in holder_entries_ts[:10]) if holder_entries_ts else 0.0
@@ -678,7 +686,11 @@ class OracleDataset(Dataset):
678
  'total_txns': float(total_txns),
679
  'global_fees_paid': float(global_fees_paid)
680
  }
681
- _register_event_fn(oc_event, self._event_execution_sort_key(ts_value, signature='OnChain_Snapshot') if hasattr(self, '_event_execution_sort_key') else (ts_value, 0, 0, 0, 'OnChain_Snapshot'))
 
 
 
 
682
 
683
  def _calculate_deployed_token_stats(self, profiles: Dict[str, Dict[str, Any]], T_cutoff: datetime.datetime):
684
  """
@@ -1132,27 +1144,13 @@ class OracleDataset(Dataset):
1132
 
1133
  end_ts = int(T_cutoff.timestamp())
1134
 
1135
- # Find the first available price for forward-filling
1136
- # Use the earliest trade price from intervals >= start_ts
1137
- last_price = None
1138
  for interval_ts in sorted_intervals:
1139
- if interval_ts >= start_ts:
1140
- last_price = trades_by_interval[interval_ts][0]
1141
- break
1142
-
1143
- # Fallback to first trade price if no intervals after start_ts
1144
- if last_price is None:
1145
- last_price = aggregation_trades[0]['price_usd']
1146
-
1147
- for ts in range(start_ts, end_ts + 1, interval_seconds):
1148
- if ts in trades_by_interval:
1149
- prices = trades_by_interval[ts]
1150
  open_price = prices[0]
1151
  close_price = prices[-1]
1152
- full_ohlc.append((ts, open_price, close_price))
1153
- last_price = close_price
1154
- else:
1155
- full_ohlc.append((ts, last_price, last_price))
1156
  return full_ohlc
1157
 
1158
  def __getitem__(self, idx: int) -> Optional[Dict[str, Any]]:
@@ -2151,8 +2149,8 @@ class OracleDataset(Dataset):
2151
  closes_raw = [s[2] for s in segment]
2152
  chart_event = {
2153
  'event_type': 'Chart_Segment',
2154
- 'timestamp': last_ts,
2155
- 'relative_ts': last_ts - t0_timestamp,
2156
  'opens': self._normalize_price_series(opens_raw),
2157
  'closes': self._normalize_price_series(closes_raw),
2158
  'i': interval_label
@@ -2164,21 +2162,9 @@ class OracleDataset(Dataset):
2164
  chart_events_1s = []
2165
  chart_events_30s = []
2166
 
2167
- # Prepare 1s OHLC from cache if available
2168
- ohlc_1s_precomputed = None
2169
- if cached_ohlc_1s is not None and len(cached_ohlc_1s) > 0:
2170
- # Calculate limit based on T_cutoff relative to t0
2171
- duration_limit = int(T_cutoff.timestamp() - t0_timestamp) + 1
2172
- limit = min(duration_limit, len(cached_ohlc_1s))
2173
-
2174
- if limit > 0:
2175
- slice_tensor = cached_ohlc_1s[:limit]
2176
- ohlc_1s_precomputed = [
2177
- (t0_timestamp + i, float(row[0]), float(row[1]))
2178
- for i, row in enumerate(slice_tensor)
2179
- ]
2180
-
2181
- chart_events_1s = _emit_chart_segments(high_def_chart_trades, HIGH_DEF_INTERVAL, precomputed_ohlc=ohlc_1s_precomputed)
2182
  chart_events_30s = _emit_chart_segments(middle_chart_trades, MIDDLE_INTERVAL)
2183
 
2184
  # 5. Process Other Records (Pool, Liquidity, Fees, Burns, Locks, Migrations)
@@ -2364,6 +2350,53 @@ class OracleDataset(Dataset):
2364
  )
2365
  )
2366
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2367
  # 6. Generate Snapshots
2368
  self._generate_onchain_snapshots(
2369
  token_address, int(t0_timestamp), T_cutoff,
@@ -2373,7 +2406,7 @@ class OracleDataset(Dataset):
2373
  wallet_data,
2374
  total_supply_dec,
2375
  _register_event,
2376
- cached_holders_list=cached_holders_list
2377
  )
2378
 
2379
  # Choose exactly one chart resolution per sample:
@@ -2733,31 +2766,10 @@ class OracleDataset(Dataset):
2733
  t0_val = _timestamp_to_order_value(t0)
2734
  last_trade_ts_val = max(trade_ts_values)
2735
 
 
 
2736
  duration_seconds = int(last_trade_ts_val - t0_val) + 120
2737
- ohlc_1s = torch.zeros((duration_seconds, 2), dtype=torch.float32)
2738
-
2739
- trades.sort(key=lambda x: _timestamp_to_order_value(x['timestamp']))
2740
- trades_by_sec = defaultdict(list)
2741
- for t in trades:
2742
- if not t.get('success', False) or float(t.get('price_usd', 0.0) or 0.0) <= 0:
2743
- continue
2744
- ts = _timestamp_to_order_value(t['timestamp'])
2745
- sec_idx = int(ts - t0_val)
2746
- if 0 <= sec_idx < duration_seconds:
2747
- trades_by_sec[sec_idx].append(t['price_usd'])
2748
-
2749
- last_close = float(trades[0]['price_usd'])
2750
- for i in range(duration_seconds):
2751
- if i in trades_by_sec:
2752
- prices = trades_by_sec[i]
2753
- op, cl = prices[0], prices[-1]
2754
- last_close = cl
2755
- else:
2756
- op = cl = last_close
2757
- ohlc_1s[i, 0] = float(op)
2758
- ohlc_1s[i, 1] = float(cl)
2759
-
2760
- raw_data['ohlc_1s'] = ohlc_1s
2761
 
2762
  # Generate holder snapshots from deterministic trade-ledger reconstruction.
2763
  interval = 300
@@ -3065,7 +3077,7 @@ class OracleDataset(Dataset):
3065
  pooler=pooler,
3066
  sample_idx=idx,
3067
  cached_holders_list=holder_snapshots_list,
3068
- cached_ohlc_1s=ohlc_1s,
3069
  quality_score=None # Will be injected by cache_dataset.py
3070
  )
3071
 
 
571
  buyers_seen_global = set()
572
  prev_holders_count = 0
573
 
574
+ for i, snapshot_data in enumerate(cached_holders_list):
575
  if not isinstance(snapshot_data, dict):
576
  continue
577
  ts_value = snapshot_data.get('timestamp')
 
582
  trades_win = [e for e in trade_events if e.get('success', False) and window_start < e['timestamp'] <= ts_value]
583
  xfers_win = [e for e in transfer_events if window_start < e['timestamp'] <= ts_value]
584
 
585
 + # SPARSE SNAPSHOTS: Skip if absolutely nothing happened in this 5-minute window
586
+ if not trades_win and not xfers_win:
587
+ continue
588
+
589
  if 'holders' not in snapshot_data or not isinstance(snapshot_data['holders'], list):
590
  continue
591
 
 
616
  'relative_ts': ts_value - t0_timestamp,
617
  'holders': holder_entries_ts
618
  }
619
+ _register_event_fn(
620
+ hs_event,
621
+ self._event_execution_sort_key(ts_value, slot=10**12, transaction_index=10**9, signature='HolderSnapshot')
622
+ if hasattr(self, '_event_execution_sort_key') else (ts_value, 10**12, 10**9, 0, 'HolderSnapshot')
623
+ )
624
 
625
  holder_pct_map_ts = {d['wallet']: d['holding_pct'] for d in holder_entries_ts}
626
  top10_holder_pct = sum(d['holding_pct'] for d in holder_entries_ts[:10]) if holder_entries_ts else 0.0
 
686
  'total_txns': float(total_txns),
687
  'global_fees_paid': float(global_fees_paid)
688
  }
689
+ _register_event_fn(
690
+ oc_event,
691
+ self._event_execution_sort_key(ts_value, slot=10**12, transaction_index=10**9, signature='OnChain_Snapshot')
692
+ if hasattr(self, '_event_execution_sort_key') else (ts_value, 10**12, 10**9, 0, 'OnChain_Snapshot')
693
+ )
694
 
695
  def _calculate_deployed_token_stats(self, profiles: Dict[str, Dict[str, Any]], T_cutoff: datetime.datetime):
696
  """
 
1144
 
1145
  end_ts = int(T_cutoff.timestamp())
1146
 
 
 
 
1147
  for interval_ts in sorted_intervals:
1148
+ if start_ts <= interval_ts <= end_ts:
1149
+ prices = trades_by_interval[interval_ts]
 
 
 
 
 
 
 
 
 
1150
  open_price = prices[0]
1151
  close_price = prices[-1]
1152
+ full_ohlc.append((interval_ts, open_price, close_price))
1153
+
 
 
1154
  return full_ohlc
1155
 
1156
  def __getitem__(self, idx: int) -> Optional[Dict[str, Any]]:
 
2149
  closes_raw = [s[2] for s in segment]
2150
  chart_event = {
2151
  'event_type': 'Chart_Segment',
2152
+ 'timestamp': int(last_ts),
2153
+ 'relative_ts': int(last_ts) - int(t0_timestamp),
2154
  'opens': self._normalize_price_series(opens_raw),
2155
  'closes': self._normalize_price_series(closes_raw),
2156
  'i': interval_label
 
2162
  chart_events_1s = []
2163
  chart_events_30s = []
2164
 
2165
+ # Build chart candidates (registration deferred until we choose exactly one interval mode)
2166
+ # We process sparse native charts using _generate_ohlc for both 1s and 30s
2167
+ chart_events_1s = _emit_chart_segments(high_def_chart_trades, HIGH_DEF_INTERVAL)
 
 
 
 
 
 
 
 
 
 
 
 
2168
  chart_events_30s = _emit_chart_segments(middle_chart_trades, MIDDLE_INTERVAL)
2169
 
2170
  # 5. Process Other Records (Pool, Liquidity, Fees, Burns, Locks, Migrations)
 
2350
  )
2351
  )
2352
 
2353
+ # --- ADD DYNAMIC T_CUTOFF SNAPSHOT ---
2354
+ # Evaluate balances exactly up to T_cutoff using the filtered trade_records
2355
+ wallet_balances_raw = {}
2356
+ for trade in trade_records:
2357
+ if not trade.get('success', False):
2358
+ continue
2359
+ maker = trade.get('maker')
2360
+ if not maker:
2361
+ continue
2362
+ try:
2363
+ trade_type = int(trade.get('trade_type', 0))
2364
+ base_amount_raw = int(trade.get('base_amount', 0))
2365
+ except:
2366
+ continue
2367
+ if trade_type not in (0, 1) or base_amount_raw < 0:
2368
+ continue
2369
+ signed_delta = base_amount_raw if trade_type == 0 else -base_amount_raw
2370
+ wallet_balances_raw[maker] = wallet_balances_raw.get(maker, 0) + signed_delta
2371
+
2372
+ positive_holders_raw = [(w, b) for w, b in wallet_balances_raw.items() if b > 0]
2373
+ positive_holders_raw.sort(key=lambda item: (-item[1], item[0]))
2374
+ holders_topk_raw = positive_holders_raw[:HOLDER_SNAPSHOT_TOP_K]
2375
+
2376
+ cutoff_ts_epoch = int(T_cutoff.timestamp())
2377
+ token_scale = 10 ** base_decimals if base_decimals else 1
2378
+
2379
+ cutoff_snapshot = {
2380
+ 'timestamp': cutoff_ts_epoch,
2381
+ 'holders': [
2382
+ {
2383
+ 'wallet_address': w,
2384
+ 'current_balance': float(b) / float(token_scale)
2385
+ }
2386
+ for w, b in holders_topk_raw
2387
+ ]
2388
+ }
2389
+
2390
+ # Create a local copy of cached_holders_list up to T_cutoff
2391
+ local_holders_list = [
2392
+ snap for snap in (cached_holders_list or [])
2393
+ if snap.get('timestamp', 0) < cutoff_ts_epoch
2394
+ ]
2395
+
2396
+ # Append our precise T_cutoff snapshot at the end
2397
+ if not local_holders_list or local_holders_list[-1]['timestamp'] != cutoff_ts_epoch:
2398
+ local_holders_list.append(cutoff_snapshot)
2399
+
2400
  # 6. Generate Snapshots
2401
  self._generate_onchain_snapshots(
2402
  token_address, int(t0_timestamp), T_cutoff,
 
2406
  wallet_data,
2407
  total_supply_dec,
2408
  _register_event,
2409
+ cached_holders_list=local_holders_list
2410
  )
2411
 
2412
  # Choose exactly one chart resolution per sample:
 
2766
  t0_val = _timestamp_to_order_value(t0)
2767
  last_trade_ts_val = max(trade_ts_values)
2768
 
2769
+ # Disable dense OHLC 1s precomputation.
2770
+ # Chart_Segment will now generate sparse OHLC at runtime.
2771
  duration_seconds = int(last_trade_ts_val - t0_val) + 120
2772
+ raw_data['ohlc_1s'] = None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2773
 
2774
  # Generate holder snapshots from deterministic trade-ledger reconstruction.
2775
  interval = 300
 
3077
  pooler=pooler,
3078
  sample_idx=idx,
3079
  cached_holders_list=holder_snapshots_list,
3080
+ cached_ohlc_1s=None,
3081
  quality_score=None # Will be injected by cache_dataset.py
3082
  )
3083
 
data/ohlc_stats.npz CHANGED
@@ -1,3 +1,3 @@
1
  version https://git-lfs.github.com/spec/v1
2
- oid sha256:720bc76c8bfa9f5c1dbb0b2a3a575b8226c761160136c4b108c08ae6cbb6c299
3
  size 1660
 
1
  version https://git-lfs.github.com/spec/v1
2
+ oid sha256:87f6e823bed45b3e399d6fe2ab46f3297d80a623e5a41cca785a60c5a7db067d
3
  size 1660
ingest.sh CHANGED
@@ -20,7 +20,7 @@ error() { echo -e "${RED}[ERROR]${NC} $1"; exit 1; }
20
  #===============================================================================
21
  header "Step 5-6/7: Processing Epochs (Download → Ingest → Delete)"
22
 
23
- EPOCHS=(844 845 846 847 848 849)
24
 
25
 
26
  log "Processing epochs one at a time to minimize disk usage..."
@@ -41,7 +41,7 @@ for epoch in "${EPOCHS[@]}"; do
41
 
42
  # Step 2: Ingest (always pass --merge-neo4j; auto-detect handles empty DB)
43
  log " [2/3] Ingesting epoch ${epoch} into databases..."
44
- python scripts/ingest_epoch.py --epoch "$epoch" --merge-neo4j -c || {
45
  error "Ingestion failed for epoch ${epoch}. Cannot continue."
46
  }
47
 
 
20
  #===============================================================================
21
  header "Step 5-6/7: Processing Epochs (Download → Ingest → Delete)"
22
 
23
+ EPOCHS=(844)
24
 
25
 
26
  log "Processing epochs one at a time to minimize disk usage..."
 
41
 
42
  # Step 2: Ingest (always pass --merge-neo4j; auto-detect handles empty DB)
43
  log " [2/3] Ingesting epoch ${epoch} into databases..."
44
+ python scripts/ingest_epoch.py --epoch "$epoch" --merge-neo4j || {
45
  error "Ingestion failed for epoch ${epoch}. Cannot continue."
46
  }
47
 
log.log CHANGED
@@ -1,3 +1,3 @@
1
  version https://git-lfs.github.com/spec/v1
2
- oid sha256:27af4460defe3832c7bb709538efa4908bd370b202bad245473b23b14648e2db
3
- size 2881
 
1
  version https://git-lfs.github.com/spec/v1
2
+ oid sha256:c605ecab2de1c8c8442dda85ada5345b9d6ba43aae4095130f1d92ce6261c127
3
+ size 44400
sample_3nUWyakgm159j1vT_0.json ADDED
The diff for this file is too large to render. See raw diff
 
test_loader.py ADDED
@@ -0,0 +1,7 @@
 
 
 
 
 
 
 
 
1
+ import datetime
2
+ from data.data_loader import OracleDataset
3
+ from data.data_fetcher import DataFetcher
4
+ from clickhouse_driver import Client as ClickHouseClient
5
+ from neo4j import GraphDatabase
6
+
7
+ # Since database clients and other dependencies are required, we can simply run the pre-existing cache script on a small sample to test