| import os |
| import json |
| from typing import List, Dict, Any, Union |
| import pandas as pd |
|
|
def save_market_data(data: List[List[Union[int, float]]], file_path: str) -> bool:
    """
    Save market data to a JSON file

    Missing parent directories of ``file_path`` are created first. Failures
    are not raised: the error is printed and reported via the return value.

    Args:
        data: List of OHLCV candles
        file_path: Path to save the data

    Returns:
        bool: True if saved successfully, False if any error occurred
    """
    try:
        # A bare filename has an empty dirname, and os.makedirs('') raises
        # FileNotFoundError, so only create the directory when there is one.
        parent_dir = os.path.dirname(file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        # JSON is UTF-8 by convention; do not depend on the locale encoding.
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f)

        return True
    except Exception as e:
        print(f"Error saving market data: {str(e)}")
        return False
|
|
def load_market_data(file_path: str) -> List[List[Union[int, float]]]:
    """
    Load market data from a JSON file

    Failures are not raised: a missing, unreadable or malformed file, or a
    file whose top-level JSON value is not a list, prints an error and
    yields an empty list.

    Args:
        file_path: Path to the data file

    Returns:
        List of OHLCV candles, or an empty list if loading failed
    """
    try:
        # JSON is UTF-8 by convention; do not depend on the locale encoding.
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except Exception as e:
        print(f"Error loading market data: {str(e)}")
        return []

    # Any valid JSON document parses (object, null, number, ...), but callers
    # rely on getting a list back, so reject every other top-level type.
    if not isinstance(data, list):
        print(
            "Error loading market data: "
            f"expected a JSON list in {file_path}, got {type(data).__name__}"
        )
        return []

    return data
|
|
def get_available_data(directory: str) -> List[Dict[str, Any]]:
    """
    Get list of available data files

    Scans ``directory`` (creating it if it does not exist) for ``.json``
    files whose underscore-separated name has the form
    ``<exchange>_<symbol part>[_<symbol part>...]_<timeframe>[_...]``.
    The first recognised timeframe token at position 2 or later ends the
    symbol; any parts after it are ignored. Files that do not match are
    skipped.

    Args:
        directory: Directory to scan

    Returns:
        List of data file information. Each entry is a dict with the keys
        'exchange', 'symbol' (symbol parts joined with '/'), 'timeframe',
        'filename', 'size_kb' (file size in KiB, rounded to 2 decimals),
        'modified' (value of os.path.getmtime) and 'candle_count'
        (0 when the file could not be read as a list). An empty list is
        returned if the directory cannot be created or listed.
    """
    suffix = '.json'
    known_timeframes = ('1m', '5m', '15m', '30m', '1h', '4h', '1d', '1w', '1M')
    result = []

    try:
        os.makedirs(directory, exist_ok=True)

        for file in os.listdir(directory):
            if not file.endswith(suffix):
                continue

            # Strip only the trailing extension; str.replace would also
            # remove '.json' occurring in the middle of the name.
            parts = file[:-len(suffix)].split('_')
            if len(parts) < 3:
                continue

            # The symbol needs at least one part, so the timeframe token can
            # appear at index 2 at the earliest.
            timeframe_index = next(
                (i for i in range(2, len(parts)) if parts[i] in known_timeframes),
                None,
            )
            if timeframe_index is None:
                continue

            file_path = os.path.join(directory, file)
            try:
                file_size = os.path.getsize(file_path)
                modified_time = os.path.getmtime(file_path)
            except OSError:
                # The file vanished or is a dangling link: skip just this
                # entry instead of discarding the whole listing.
                continue

            # Counting candles is best-effort: a file that cannot be loaded
            # as a list is still listed, with a count of zero.
            try:
                data = load_market_data(file_path)
                candle_count = len(data) if isinstance(data, list) else 0
            except Exception:
                candle_count = 0

            result.append({
                'exchange': parts[0],
                'symbol': '/'.join(parts[1:timeframe_index]),
                'timeframe': parts[timeframe_index],
                'filename': file,
                'size_kb': round(file_size / 1024, 2),
                'modified': modified_time,
                'candle_count': candle_count
            })

        return result
    except Exception as e:
        print(f"Error getting available data: {str(e)}")
        return []
|
|