Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import ccxt | |
| import pandas as pd | |
| from datetime import datetime, timedelta | |
| from huggingface_hub import HfApi | |
| import os | |
| from tqdm import tqdm | |
| import time | |
| import logging | |
| import threading | |
| import schedule | |
# Logging: INFO and above reach the root handler; this module logs via `logger`.
logging.basicConfig(level="INFO")
logger = logging.getLogger(__name__)

# --- Configuration -----------------------------------------------------------
# Hugging Face dataset repository that receives the generated CSV files.
HF_REPO_ID = "Yllvar/qubit-historical-data"
# Trading pairs to fetch, in ccxt's BASE/QUOTE symbol notation.
SYMBOLS = [
    "BTC/USDT",
    "ETH/USDT",
    "ADA/USDT",
    "SOL/USDT",
    "XRP/USDT",
]
# Candle intervals requested from the exchange.
TIMEFRAMES = ["1h", "1d"]
# Each value is passed as ccxt's `defaultType` option when the exchange is built.
DATA_TYPES = ["spot", "futures"]

# Progress log shared with the Gradio UI; fetch_and_upload_data() rebinds it
# to a new list as it runs.
latest_output = []
# Expected spacing between consecutive candles (ms), plus the wording used in
# the log message, for the timeframes whose spacing is verified before upload.
_INTERVAL_CHECKS = {
    '1h': ('hourly', 60 * 60 * 1000),
    '1d': ('daily', 24 * 60 * 60 * 1000),
}


def _intervals_consistent(timestamps, timeframe):
    """Return False if the median candle spacing deviates >10% from `timeframe`.

    Timeframes without an entry in ``_INTERVAL_CHECKS`` are not verified, and a
    series with fewer than two candles has no spacing to measure; both count
    as consistent.
    """
    check = _INTERVAL_CHECKS.get(timeframe)
    if check is None or len(timestamps) < 2:
        return True
    ratio = timestamps.diff().median() / check[1]
    return 0.9 < ratio < 1.1  # Allow 10% deviation


def _fetch_ohlcv_history(exchange, data_type, symbol, timeframe, since, log):
    """Page through ``exchange.fetch_ohlcv`` from `since` until no new candles arrive.

    Returns:
        (candles, complete): the accumulated candle rows, and ``complete`` is
        False if paging stopped early on a request error or a malformed
        response, in which case ``candles`` may be truncated.
    """
    candles = []
    cursor = since
    last_timestamp = None
    while True:
        try:
            batch = exchange.fetch_ohlcv(symbol, timeframe, since=cursor, limit=1000)
        except Exception as e:
            log(f"Error fetching {symbol}: {str(e)}")
            return candles, False
        # An empty page, or one ending on the candle we already have, means
        # there is nothing newer to fetch.
        if not batch or batch[-1][0] == last_timestamp:
            return candles, True
        # Each candle must have 6 fields (the DataFrame below names them
        # timestamp, open, high, low, close, volume).
        if not all(len(candle) == 6 for candle in batch):
            log(f"Invalid data structure for {symbol} {timeframe} {data_type}")
            return candles, False
        candles.extend(batch)
        last_timestamp = batch[-1][0]
        cursor = last_timestamp + 1
        time.sleep(exchange.rateLimit / 1000)


def _process_series(api, exchange, data_type, symbol, timeframe, since, log):
    """Fetch, validate, save to CSV and upload one (market type, symbol, timeframe) series."""
    log(f"Processing {data_type} {symbol} {timeframe}...")
    ohlcv, complete = _fetch_ohlcv_history(exchange, data_type, symbol, timeframe, since, log)
    if not ohlcv:
        log(f"No data available for {symbol} {timeframe} {data_type}")
        return
    if not complete:
        # The CSV goes to a fixed path in the dataset repo, so uploading a
        # truncated history would overwrite the last complete one.
        log(f"Skipping upload for {symbol} {timeframe} {data_type}: fetch was incomplete")
        return

    df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    if df['timestamp'].isna().any() or df['open'].isna().any():
        log(f"Invalid data detected for {symbol} {timeframe} {data_type}")
        return
    if not _intervals_consistent(df['timestamp'], timeframe):
        label = _INTERVAL_CHECKS[timeframe][0]
        log(f"Inconsistent {label} intervals detected for {symbol} {timeframe} {data_type}")
        return

    df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
    df['symbol'] = symbol
    df['market_type'] = data_type
    df['timeframe'] = timeframe
    df['exchange'] = 'Binance'

    base_symbol = symbol.split('/')[0]
    filename = f"{base_symbol}_USDT_Binance_{data_type}_{timeframe}.csv"
    subfolder = f"data/{data_type}"
    os.makedirs(subfolder, exist_ok=True)
    filepath = os.path.join(subfolder, filename)
    df.to_csv(filepath, index=False)

    api.upload_file(
        path_or_fileobj=filepath,
        # Repo paths always use '/', independent of the local OS separator.
        path_in_repo=f"{subfolder}/{filename}",
        repo_id=HF_REPO_ID,
        repo_type="dataset",
        commit_message=f"Update {filename}",
    )
    log(f"Successfully uploaded {filepath} ({len(df)} records)")


def fetch_and_upload_data():
    """Fetch one year of Binance OHLCV candles and upload them as CSV files.

    For every combination of ``DATA_TYPES`` x ``SYMBOLS`` x ``TIMEFRAMES`` the
    candles are paged in with ccxt ``fetch_ohlcv``, validated, written to
    ``data/<data_type>/<BASE>_USDT_Binance_<data_type>_<timeframe>.csv`` and
    uploaded to the ``HF_REPO_ID`` dataset repo under that same relative path.
    A series whose fetch stopped early (error or malformed page) is not
    uploaded, so a previously uploaded complete file is not overwritten.

    Every progress message is appended to this run's log, published to the
    module-level ``latest_output`` (read by the UI) and sent to ``logger``.

    Returns:
        str: the newline-joined progress log, or a single ``"Error: ..."``
        line if ``HF_TOKEN`` is missing or an unexpected exception aborts the
        whole run. Per-series failures are logged and do not abort the run.
    """
    output_messages = []

    def log(msg):
        """Record `msg` in this run's log, the shared UI log and the module logger."""
        global latest_output
        output_messages.append(msg)
        # Publish a fresh copy so the UI never holds the list being appended to.
        latest_output = output_messages.copy()
        logger.info(msg)

    try:
        hf_token = os.environ.get("HF_TOKEN")
        if not hf_token:
            error_msg = "Error: HF_TOKEN not found in environment variables"
            log(error_msg)
            return error_msg
        log("Data fetching process started...")
        api = HfApi(token=hf_token)  # one client for the whole run
        # Start of the history window: one year ago, in epoch milliseconds.
        since = int((datetime.now() - timedelta(days=365)).timestamp() * 1000)

        for data_type in DATA_TYPES:
            # NOTE(review): confirm ccxt accepts 'futures' as a binance
            # defaultType (its docs use 'future') and that 'BTC/USDT' resolves
            # to a futures market here rather than the spot one; otherwise the
            # "futures" CSVs may contain spot candles.
            exchange = ccxt.binance({
                'enableRateLimit': True,
                'options': {
                    'defaultType': data_type
                }
            })
            try:
                exchange.load_markets()
            except Exception as e:
                # A failure for one market type should not abort the others.
                log(f"Error loading {data_type} markets: {str(e)}")
                continue

            for symbol in SYMBOLS:
                if symbol not in exchange.symbols:
                    log(f"Skipping {symbol} - not available for {data_type}")
                    continue
                for timeframe in TIMEFRAMES:
                    try:
                        _process_series(api, exchange, data_type, symbol, timeframe, since, log)
                    except Exception as e:
                        log(f"Error processing {symbol} {timeframe} {data_type}: {str(e)}")
                time.sleep(5)  # Be kind to the API

        return "\n".join(output_messages)
    except Exception as e:
        error_msg = f"Error: {str(e)}"
        log(error_msg)
        return error_msg
def scheduled_job():
    """Run one fetch-and-upload pass (at startup and on the 24h schedule).

    ``fetch_and_upload_data()`` reports a fatal failure by returning a string
    that starts with ``"Error:"`` instead of raising. Nothing else consumes
    that return value here, so such failures are written to the process log;
    without this they would be silently dropped.
    """
    logger.info("Running scheduled data fetch...")
    result = fetch_and_upload_data()
    if isinstance(result, str) and result.startswith("Error:"):
        logger.error("Scheduled data fetch failed: %s", result)
    else:
        logger.info("Scheduled data fetch finished")
def run_schedule():
    """Drive the `schedule` registry forever; meant to be a thread's target.

    Once per second, ``schedule.run_pending()`` is called so that any job that
    has come due runs on this thread. The loop never returns.
    """
    tick_seconds = 1
    while True:
        schedule.run_pending()
        time.sleep(tick_seconds)
def get_latest_output():
    """Return the shared progress log as a single newline-joined string."""
    return "\n".join(latest_output)


# Create Gradio interface
with gr.Blocks() as iface:
    gr.Markdown("# Crypto Data Fetcher and Uploader")
    gr.Markdown("Automatically fetches cryptocurrency data every 24 hours and uploads to Hugging Face dataset")
    # `every` only re-evaluates `value` when `value` is a callable, so the
    # getter is passed here to make the box poll the shared log every
    # 5 seconds. (A `.change` listener never fires on its own, so it cannot
    # drive polling.) The box is display-only.
    output_text = gr.Textbox(
        label="Output Log",
        lines=20,
        value=get_latest_output,
        every=5,
        interactive=False,
    )
if __name__ == "__main__":
    # Schedule the job to run every 24 hours
    schedule.every(24).hours.do(scheduled_job)

    def _run_first_job_then_schedule():
        """Run one fetch immediately, then hand over to the 24h scheduler loop."""
        scheduled_job()
        run_schedule()

    # The initial fetch runs in the background so the UI starts right away and
    # can show its progress. The scheduler loop only starts once that first
    # run has finished, so the two never overlap.
    scheduler_thread = threading.Thread(target=_run_first_job_then_schedule, daemon=True)
    scheduler_thread.start()

    # Launch the Gradio interface
    iface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        debug=False
    )