Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| from typing import List, Dict, Any, Union | |
| import pandas as pd | |
def save_market_data(data: List[List[Union[int, float]]], file_path: str) -> bool:
    """
    Save market data to a JSON file

    Args:
        data: List of OHLCV candles
        file_path: Path to save the data. Missing parent directories are
            created; a bare file name is written to the current directory.

    Returns:
        bool: True if saved successfully, False if any error occurred
            (the error is printed, not raised)
    """
    try:
        # os.path.dirname() returns '' for a bare file name and
        # os.makedirs('') raises FileNotFoundError, so only create the
        # parent directory when the path actually has one.
        parent_dir = os.path.dirname(file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with open(file_path, 'w') as f:
            json.dump(data, f)
        return True
    except Exception as e:
        # Best-effort contract: report the failure and signal it via the
        # return value instead of propagating.
        print(f"Error saving market data: {str(e)}")
        return False
def load_market_data(file_path: str) -> List[List[Union[int, float]]]:
    """
    Read market data back from a JSON file

    Args:
        file_path: Location of the JSON file to read

    Returns:
        The decoded JSON content (expected to be a list of OHLCV candles),
        or an empty list if the file could not be opened or parsed
    """
    try:
        with open(file_path, 'r') as source:
            return json.load(source)
    except Exception as exc:
        # Failures are reported on stdout; the caller just sees "no data".
        print(f"Error loading market data: {exc!s}")
    return []
# Timeframe tokens recognised inside data file names.
_KNOWN_TIMEFRAMES = frozenset(['1m', '5m', '15m', '30m', '1h', '4h', '1d', '1w', '1M'])
_DATA_FILE_SUFFIX = '.json'


def _parse_data_filename(filename: str) -> Union[tuple, None]:
    """Split a data file name into (exchange, symbol, timeframe); None if it does not match."""
    if not filename.endswith(_DATA_FILE_SUFFIX):
        return None

    # Strip only the trailing extension; str.replace() would also remove a
    # '.json' that happens to occur in the middle of the name.
    parts = filename[:-len(_DATA_FILE_SUFFIX)].split('_')
    if len(parts) < 3:
        return None

    # parts[0] is the exchange and the symbol takes at least parts[1], so the
    # first timeframe token is searched for from index 2 onwards. Anything
    # after the timeframe token is ignored.
    for index in range(2, len(parts)):
        if parts[index] in _KNOWN_TIMEFRAMES:
            # Symbol pieces were split on '_'; rejoin them with '/'.
            return parts[0], '/'.join(parts[1:index]), parts[index]
    return None


def get_available_data(directory: str) -> List[Dict[str, Any]]:
    """
    Get list of available data files

    Scans ``directory`` (creating it if it does not exist) for files named
    ``<exchange>_<symbol part>[_<symbol part>...]_<timeframe>[_...].json``
    and describes each match. Files that do not follow this pattern are
    skipped.

    Args:
        directory: Directory to scan

    Returns:
        List of data file information, ordered by file name. Each entry is
        a dict with the keys 'exchange', 'symbol', 'timeframe', 'filename',
        'size_kb', 'modified' (value of os.path.getmtime) and
        'candle_count'. An empty list is returned if scanning fails; the
        error is printed, not raised.
    """
    result = []
    try:
        # Create directory if it doesn't exist
        os.makedirs(directory, exist_ok=True)

        # Sorted so the result order does not depend on the file system.
        for file in sorted(os.listdir(directory)):
            parsed = _parse_data_filename(file)
            if parsed is None:
                continue
            exchange_id, symbol, timeframe = parsed

            # Get file stats
            file_path = os.path.join(directory, file)
            file_size = os.path.getsize(file_path)
            modified_time = os.path.getmtime(file_path)

            # Get candle count; an unreadable or non-sized payload counts as
            # zero instead of aborting the whole listing. Only Exception is
            # caught so KeyboardInterrupt/SystemExit still propagate.
            try:
                candle_count = len(load_market_data(file_path))
            except Exception:
                candle_count = 0

            result.append({
                'exchange': exchange_id,
                'symbol': symbol,
                'timeframe': timeframe,
                'filename': file,
                'size_kb': round(file_size / 1024, 2),
                'modified': modified_time,
                'candle_count': candle_count
            })
        return result
    except Exception as e:
        print(f"Error getting available data: {str(e)}")
        return []