Spaces:
Sleeping
Sleeping
Upload streaming_loader.py with huggingface_hub
Browse files — streaming_loader.py (+9 −4)
streaming_loader.py
CHANGED
|
@@ -19,7 +19,8 @@ class StreamingDataLoader:
|
|
| 19 |
model_type: str = "deeplob",
|
| 20 |
batch_size: int = 32,
|
| 21 |
chunk_size: int = 500, # Reduced to ensure frequent yields
|
| 22 |
-
buffer_size: int = 200
|
|
|
|
| 23 |
"""
|
| 24 |
Args:
|
| 25 |
repo_id: HF Dataset ID
|
|
@@ -27,12 +28,14 @@ class StreamingDataLoader:
|
|
| 27 |
batch_size: Training batch size
|
| 28 |
chunk_size: Rows per processing chunk
|
| 29 |
buffer_size: Overlap size to maintain rolling stats continuity
|
|
|
|
| 30 |
"""
|
| 31 |
self.repo_id = repo_id
|
| 32 |
self.model_type = model_type
|
| 33 |
self.batch_size = batch_size
|
| 34 |
self.chunk_size = chunk_size
|
| 35 |
self.buffer_size = buffer_size
|
|
|
|
| 36 |
|
| 37 |
self.processor = AlphaDataProcessor()
|
| 38 |
|
|
@@ -40,7 +43,7 @@ class StreamingDataLoader:
|
|
| 40 |
"""
|
| 41 |
Yields batches of (X, y) tensors from the stream.
|
| 42 |
"""
|
| 43 |
-
print(f"📡 Connecting to HF Dataset Stream: {self.repo_id}")
|
| 44 |
token = os.environ.get("HF_TOKEN")
|
| 45 |
|
| 46 |
try:
|
|
@@ -70,16 +73,18 @@ class StreamingDataLoader:
|
|
| 70 |
f for f in files
|
| 71 |
if (f.startswith("data/bar/") or f.startswith("data/candles/"))
|
| 72 |
and f.endswith(".parquet")
|
|
|
|
| 73 |
]
|
| 74 |
-
print(f"📊 Found {len(target_files)} Bar/Candle files for LSTM.")
|
| 75 |
else:
|
| 76 |
# Use L2 Snapshots for DeepLOB/TRM (Support both v1 'order_book_snapshot' and v2 'l2book')
|
| 77 |
target_files = [
|
| 78 |
f for f in files
|
| 79 |
if ("order_book_snapshot" in f or "l2book" in f)
|
| 80 |
and f.endswith(".parquet")
|
|
|
|
| 81 |
]
|
| 82 |
-
print(f"📊 Found {len(target_files)} Snapshot/L2Book files for {self.model_type}.")
|
| 83 |
|
| 84 |
if not target_files:
|
| 85 |
raise RuntimeError(f"No valid training files found for {self.model_type} in {self.repo_id}")
|
|
|
|
| 19 |
model_type: str = "deeplob",
|
| 20 |
batch_size: int = 32,
|
| 21 |
chunk_size: int = 500, # Reduced to ensure frequent yields
|
| 22 |
+
buffer_size: int = 200, # Reduced buffer
|
| 23 |
+
coin: str = "ETH"): # Filter by Symbol (CRITICAL FIX)
|
| 24 |
"""
|
| 25 |
Args:
|
| 26 |
repo_id: HF Dataset ID
|
|
|
|
| 28 |
batch_size: Training batch size
|
| 29 |
chunk_size: Rows per processing chunk
|
| 30 |
buffer_size: Overlap size to maintain rolling stats continuity
|
| 31 |
+
coin: Symbol to filter (e.g. "ETH", "BTC")
|
| 32 |
"""
|
| 33 |
self.repo_id = repo_id
|
| 34 |
self.model_type = model_type
|
| 35 |
self.batch_size = batch_size
|
| 36 |
self.chunk_size = chunk_size
|
| 37 |
self.buffer_size = buffer_size
|
| 38 |
+
self.coin = coin
|
| 39 |
|
| 40 |
self.processor = AlphaDataProcessor()
|
| 41 |
|
|
|
|
| 43 |
"""
|
| 44 |
Yields batches of (X, y) tensors from the stream.
|
| 45 |
"""
|
| 46 |
+
print(f"📡 Connecting to HF Dataset Stream: {self.repo_id} (Filter: {self.coin})")
|
| 47 |
token = os.environ.get("HF_TOKEN")
|
| 48 |
|
| 49 |
try:
|
|
|
|
| 73 |
f for f in files
|
| 74 |
if (f.startswith("data/bar/") or f.startswith("data/candles/"))
|
| 75 |
and f.endswith(".parquet")
|
| 76 |
+
and self.coin in f # STRICT FILTER
|
| 77 |
]
|
| 78 |
+
print(f"📊 Found {len(target_files)} Bar/Candle files for LSTM (Symbol: {self.coin}).")
|
| 79 |
else:
|
| 80 |
# Use L2 Snapshots for DeepLOB/TRM (Support both v1 'order_book_snapshot' and v2 'l2book')
|
| 81 |
target_files = [
|
| 82 |
f for f in files
|
| 83 |
if ("order_book_snapshot" in f or "l2book" in f)
|
| 84 |
and f.endswith(".parquet")
|
| 85 |
+
and self.coin in f # STRICT FILTER
|
| 86 |
]
|
| 87 |
+
print(f"📊 Found {len(target_files)} Snapshot/L2Book files for {self.model_type} (Symbol: {self.coin}).")
|
| 88 |
|
| 89 |
if not target_files:
|
| 90 |
raise RuntimeError(f"No valid training files found for {self.model_type} in {self.repo_id}")
|