import gradio as gr
import ccxt
import pandas as pd
from datetime import datetime, timedelta
from huggingface_hub import HfApi
import os
from tqdm import tqdm
import time
import logging
import threading
import schedule

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration
HF_REPO_ID = "Yllvar/qubit-historical-data"
SYMBOLS = ['BTC/USDT', 'ETH/USDT', 'ADA/USDT', 'SOL/USDT', 'XRP/USDT']
TIMEFRAMES = ['1h', '1d']
# NOTE(review): each value is passed to ccxt as binance's `defaultType`.
# ccxt documents 'future' / 'swap' / 'delivery' rather than 'futures', and the
# plain 'BASE/USDT' symbols above may resolve to *spot* markets. Confirm that
# the "futures" CSVs really contain derivatives candles.
DATA_TYPES = ['spot', 'futures']

# Column order of the rows returned by ccxt's fetch_ohlcv.
OHLCV_COLUMNS = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
# Maximum number of candles requested per fetch_ohlcv call.
FETCH_LIMIT = 1000
# Expected candle spacing in milliseconds, plus a label for log messages.
# Only these timeframes get a spacing check; any others are uploaded unchecked.
INTERVAL_CHECKS = {
    '1h': (60 * 60 * 1000, 'hourly'),
    '1d': (24 * 60 * 60 * 1000, 'daily'),
}

# Snapshot of the current (or most recent) run's log, polled by the Gradio UI.
# It is always rebound to a fresh list, never mutated in place, so the UI thread
# never observes a half-updated list.
latest_output = []


def _record(messages, msg, level=logging.INFO):
    """Append ``msg`` to the run log, publish a snapshot to the UI, and log it."""
    global latest_output
    messages.append(msg)
    latest_output = messages.copy()
    logger.log(level, msg)


def _fetch_ohlcv(exchange, symbol, timeframe, data_type, since, messages):
    """Page through ``exchange.fetch_ohlcv`` starting at ``since`` (epoch ms).

    Paging stops when the exchange returns no candles, or when it returns a
    page ending on the same candle as the previous page. It also stops when a
    page contains a malformed candle or when the request raises. The error
    cases are written to ``messages``.

    Returns:
        All rows fetched so far, including any rows fetched before an error.
    """
    ohlcv = []
    current_since = since
    last_timestamp = None
    while True:
        try:
            data = exchange.fetch_ohlcv(
                symbol, timeframe, since=current_since, limit=FETCH_LIMIT
            )
        except Exception as e:
            _record(messages, f"Error fetching {symbol}: {str(e)}", logging.WARNING)
            break
        if not data or data[-1][0] == last_timestamp:
            break
        # Validate data structure
        if not all(len(candle) == len(OHLCV_COLUMNS) for candle in data):
            _record(
                messages,
                f"Invalid data structure for {symbol} {timeframe} {data_type}",
                logging.WARNING,
            )
            break
        ohlcv.extend(data)
        last_timestamp = data[-1][0]
        # Request the next page starting just after the newest candle we have.
        current_since = last_timestamp + 1
        time.sleep(exchange.rateLimit / 1000)
    return ohlcv


def _interval_problem(df, timeframe):
    """Return a label such as 'hourly' if candle spacing is off by more than 10%.

    The median gap between consecutive timestamps is compared with the
    expected spacing from ``INTERVAL_CHECKS``. Timeframes not in that table
    are not checked, and the function returns None for them.
    """
    check = INTERVAL_CHECKS.get(timeframe)
    if check is None:
        return None
    expected_ms, label = check
    ratio = df['timestamp'].diff().median() / expected_ms
    # With fewer than two candles the median is NaN, which also fails this
    # test, so such a series is rejected.
    if 0.9 < ratio < 1.1:  # Allow 10% deviation
        return None
    return label


def _process_market(api, exchange, data_type, symbol, timeframe, since, messages):
    """Fetch, validate, save to CSV and upload one (market type, symbol, timeframe) series."""
    _record(messages, f"Processing {data_type} {symbol} {timeframe}...")

    ohlcv = _fetch_ohlcv(exchange, symbol, timeframe, data_type, since, messages)
    if not ohlcv:
        _record(messages, f"No data available for {symbol} {timeframe} {data_type}")
        return

    df = pd.DataFrame(ohlcv, columns=OHLCV_COLUMNS)

    # Reject the series if any OHLCV value is missing.
    if df[OHLCV_COLUMNS].isna().any().any():
        _record(
            messages,
            f"Invalid data detected for {symbol} {timeframe} {data_type}",
            logging.WARNING,
        )
        return

    # Verify timeframe consistency
    label = _interval_problem(df, timeframe)
    if label is not None:
        _record(
            messages,
            f"Inconsistent {label} intervals detected for {symbol} {timeframe} {data_type}",
            logging.WARNING,
        )
        return

    df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
    df['symbol'] = symbol
    df['market_type'] = data_type
    df['timeframe'] = timeframe
    df['exchange'] = 'Binance'

    # Save to CSV
    base_symbol, quote_symbol = symbol.split('/')[:2]
    filename = f"{base_symbol}_{quote_symbol}_Binance_{data_type}_{timeframe}.csv"
    subfolder = f"data/{data_type}"
    os.makedirs(subfolder, exist_ok=True)
    filepath = os.path.join(subfolder, filename)
    df.to_csv(filepath, index=False)

    # Repo paths always use '/', whatever the local OS separator is.
    api.upload_file(
        path_or_fileobj=filepath,
        path_in_repo=f"{subfolder}/{filename}",
        repo_id=HF_REPO_ID,
        repo_type="dataset",
        commit_message=f"Update {filename}",
    )
    _record(messages, f"Successfully uploaded {filepath} ({len(df)} records)")


def fetch_and_upload_data():
    """Fetch one year of Binance OHLCV data for every configured series and upload it.

    Works through every combination of ``DATA_TYPES`` x ``SYMBOLS`` x
    ``TIMEFRAMES``. Each series is written to ``data/<data_type>/`` as a CSV and
    uploaded to the ``HF_REPO_ID`` dataset. Progress goes to ``latest_output``
    (for the UI) and to the module logger.

    A failure in one series, or in setting up one market type, is logged and
    the run moves on to the next.

    Returns:
        The newline-joined run log on completion, or a single "Error: ..."
        string if ``HF_TOKEN`` is missing or an unexpected error aborts the run.
    """
    global latest_output
    output_messages = []
    try:
        hf_token = os.environ.get("HF_TOKEN")
        if not hf_token:
            error_msg = "Error: HF_TOKEN not found in environment variables"
            latest_output = [error_msg]
            logger.error(error_msg)
            return error_msg

        _record(output_messages, "Data fetching process started...")

        api = HfApi(token=hf_token)
        # One year back from now, as epoch milliseconds (the unit ccxt expects).
        since = int((datetime.now() - timedelta(days=365)).timestamp() * 1000)

        for data_type in DATA_TYPES:
            try:
                exchange = ccxt.binance({
                    'enableRateLimit': True,
                    'options': {
                        'defaultType': data_type
                    }
                })
                exchange.load_markets()
            except Exception as e:
                # Skip only this market type; the others may still work.
                _record(
                    output_messages,
                    f"Error initializing {data_type} exchange: {str(e)}",
                    logging.ERROR,
                )
                continue

            for symbol in SYMBOLS:
                if symbol not in exchange.symbols:
                    _record(output_messages, f"Skipping {symbol} - not available for {data_type}")
                    continue

                for timeframe in TIMEFRAMES:
                    try:
                        _process_market(
                            api, exchange, data_type, symbol, timeframe, since, output_messages
                        )
                    except Exception as e:
                        _record(
                            output_messages,
                            f"Error processing {symbol} {timeframe} {data_type}: {str(e)}",
                            logging.ERROR,
                        )

            time.sleep(5)  # Be kind to the API

        return "\n".join(output_messages)

    except Exception as e:
        error_msg = f"Error: {str(e)}"
        logger.exception("Data fetch run aborted")
        # Keep the progress so far visible in the UI, then add the error.
        latest_output = output_messages + [error_msg]
        return error_msg


def scheduled_job():
    """Run one fetch-and-upload pass without letting an error reach the scheduler."""
    logger.info("Running scheduled data fetch...")
    try:
        fetch_and_upload_data()
    except Exception:
        # An exception escaping schedule.run_pending() would end the scheduler
        # thread and silently stop all future runs.
        logger.exception("Scheduled data fetch failed")


def run_schedule():
    """Run pending scheduled jobs forever, checking once per second."""
    while True:
        schedule.run_pending()
        time.sleep(1)


def _run_initial_then_schedule():
    """Run the first job immediately, then hand over to the scheduler loop.

    Because both run on the same thread, the initial run and later scheduled
    runs never overlap.
    """
    scheduled_job()
    run_schedule()


def get_latest_output():
    """Return the latest run log as one newline-joined string for the UI."""
    return "\n".join(latest_output)


# Create Gradio interface
with gr.Blocks() as iface:
    gr.Markdown("# Crypto Data Fetcher and Uploader")
    gr.Markdown("Automatically fetches cryptocurrency data every 24 hours and uploads to Hugging Face dataset")
    # Gradio re-evaluates a callable `value` every `every` seconds.
    # `every` has no effect when `value` is static.
    output_text = gr.Textbox(
        label="Output Log",
        lines=20,
        value=get_latest_output,
        every=5,  # Updates every 5 seconds
        interactive=False,
    )


if __name__ == "__main__":
    # Schedule the job to run every 24 hours
    schedule.every(24).hours.do(scheduled_job)

    # Run the first job and then the scheduler on a background thread, so the
    # UI starts immediately instead of waiting for a full fetch to finish.
    scheduler_thread = threading.Thread(target=_run_initial_then_schedule, daemon=True)
    scheduler_thread.start()

    # Periodic `every=` updates need the queue enabled on older Gradio versions.
    iface.queue()

    # Launch the Gradio interface
    iface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        debug=False
    )