Yllvar's picture
Update app.py
bb68b44 verified
import gradio as gr
import ccxt
import pandas as pd
from datetime import datetime, timedelta
from huggingface_hub import HfApi
import os
from tqdm import tqdm
import time
import logging
import threading
import schedule
# Logging: INFO-level root configuration and a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Hugging Face dataset repository that receives the generated CSV files.
HF_REPO_ID = "Yllvar/qubit-historical-data"
# Trading pairs to download, in ccxt's "BASE/QUOTE" symbol format (all USDT-quoted).
SYMBOLS = [f"{base}/USDT" for base in ('BTC', 'ETH', 'ADA', 'SOL', 'XRP')]
# Candle timeframes requested for every pair.
TIMEFRAMES = ['1h', '1d']
# Market types; each is passed to ccxt as the exchange's `defaultType` option.
DATA_TYPES = ['spot', 'futures']

# Snapshot of the current run's progress messages, read by the web UI.
latest_output = []
# Expected spacing between consecutive candles (milliseconds) and the word used
# in the warning message, for the timeframes whose continuity is checked before
# upload. Timeframes not listed here are uploaded without this check.
_EXPECTED_INTERVALS = {
    '1h': (60 * 60 * 1000, 'hourly'),
    '1d': (24 * 60 * 60 * 1000, 'daily'),
}


def _fetch_ohlcv_history(exchange, symbol, timeframe, since, data_type, log):
    """Page through ``exchange.fetch_ohlcv`` from *since* until no new candles arrive.

    Each request asks for up to 1000 candles starting one millisecond after the
    last candle already received. Pagination stops when a page is empty, when
    the last timestamp stops advancing, when a candle does not have exactly six
    fields, or when a request raises; the last two cases are reported via *log*.

    Returns:
        The candles collected so far (possibly an empty list).
    """
    candles = []
    current_since = since
    last_timestamp = None
    while True:
        try:
            page = exchange.fetch_ohlcv(symbol, timeframe, since=current_since, limit=1000)
        except Exception as e:
            log(f"Error fetching {symbol}: {str(e)}")
            break
        if not page or page[-1][0] == last_timestamp:
            break
        if not all(len(candle) == 6 for candle in page):
            log(f"Invalid data structure for {symbol} {timeframe} {data_type}")
            break
        candles.extend(page)
        last_timestamp = page[-1][0]
        current_since = last_timestamp + 1
        # Extra pause on top of ccxt's own throttling (enableRateLimit).
        time.sleep(exchange.rateLimit / 1000)
    return candles


def _interval_mismatch(timestamps, timeframe):
    """Return 'hourly'/'daily' if candle spacing deviates from *timeframe*, else None.

    The median gap between consecutive timestamps must lie within 10% of the
    expected interval. Timeframes absent from _EXPECTED_INTERVALS, and series
    with fewer than two candles (no gap to measure), are not checked.
    """
    expected = _EXPECTED_INTERVALS.get(timeframe)
    if expected is None or len(timestamps) < 2:
        return None
    expected_ms, label = expected
    ratio = timestamps.diff().median() / expected_ms
    return None if 0.9 < ratio < 1.1 else label


def _process_series(exchange, api, data_type, symbol, timeframe, since, log):
    """Fetch, validate, save to CSV and upload one (market type, symbol, timeframe) series.

    Validation failures are reported via *log* and skip the upload; any other
    exception propagates to the caller.
    """
    log(f"Processing {data_type} {symbol} {timeframe}...")
    ohlcv = _fetch_ohlcv_history(exchange, symbol, timeframe, since, data_type, log)
    if not ohlcv:
        log(f"No data available for {symbol} {timeframe} {data_type}")
        return

    df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    # Reject series with missing timestamps or prices instead of uploading gaps.
    if df[['timestamp', 'open', 'high', 'low', 'close']].isna().any().any():
        log(f"Invalid data detected for {symbol} {timeframe} {data_type}")
        return

    mismatch = _interval_mismatch(df['timestamp'], timeframe)
    if mismatch is not None:
        log(f"Inconsistent {mismatch} intervals detected for {symbol} {timeframe} {data_type}")
        return

    df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
    df['symbol'] = symbol
    df['market_type'] = data_type
    df['timeframe'] = timeframe
    df['exchange'] = 'Binance'

    base_symbol = symbol.split('/')[0]
    filename = f"{base_symbol}_USDT_Binance_{data_type}_{timeframe}.csv"
    subfolder = f"data/{data_type}"
    os.makedirs(subfolder, exist_ok=True)
    filepath = os.path.join(subfolder, filename)
    df.to_csv(filepath, index=False)

    api.upload_file(
        path_or_fileobj=filepath,
        # Repository paths always use '/', independent of the local OS separator.
        path_in_repo=f"{subfolder}/{filename}",
        repo_id=HF_REPO_ID,
        repo_type="dataset",
        commit_message=f"Update {filename}",
    )
    log(f"Successfully uploaded {filepath} ({len(df)} records)")


def fetch_and_upload_data():
    """Fetch ~365 days of Binance OHLCV candles and upload them as CSVs to HF_REPO_ID.

    For every market type in DATA_TYPES, symbol in SYMBOLS and timeframe in
    TIMEFRAMES, candles are paginated from Binance via ccxt and validated. They
    are then written to ``data/<type>/<BASE>_USDT_Binance_<type>_<timeframe>.csv``
    and uploaded to the same path in the Hugging Face dataset. The HF token is
    read from the ``HF_TOKEN`` environment variable.

    Every progress message is mirrored into the module-level ``latest_output``
    list (as a fresh copy) so the web UI can poll it while the run is going.

    Returns:
        The full progress log joined with newlines. A single ``"Error: ..."``
        string is returned if HF_TOKEN is missing or an unexpected error
        aborts the run.
    """
    global latest_output
    output_messages = []

    def log(msg):
        """Record *msg* in this run's log, publish a snapshot for the UI, and log it."""
        global latest_output
        output_messages.append(msg)
        # Rebind to a copy so readers never observe a list being mutated.
        latest_output = output_messages.copy()
        logging.getLogger(__name__).info(msg)

    try:
        hf_token = os.environ.get("HF_TOKEN")
        if not hf_token:
            error_msg = "Error: HF_TOKEN not found in environment variables"
            latest_output = [error_msg]
            return error_msg

        log("Data fetching process started...")
        api = HfApi(token=hf_token)
        since = int((datetime.now() - timedelta(days=365)).timestamp() * 1000)

        for data_type in DATA_TYPES:
            # NOTE(review): ccxt's Binance documentation lists defaultType values such
            # as 'spot', 'margin', 'future' and 'delivery'; 'futures' may not select a
            # derivatives market, and unified perpetual symbols are usually written
            # 'BTC/USDT:USDT'. Confirm this pass really fetches futures data.
            exchange = ccxt.binance({
                'enableRateLimit': True,
                'options': {'defaultType': data_type},
            })
            exchange.load_markets()
            for symbol in SYMBOLS:
                if symbol not in exchange.symbols:
                    log(f"Skipping {symbol} - not available for {data_type}")
                    continue
                for timeframe in TIMEFRAMES:
                    try:
                        _process_series(exchange, api, data_type, symbol, timeframe, since, log)
                    except Exception as e:
                        # One failing series must not abort the remaining ones.
                        log(f"Error processing {symbol} {timeframe} {data_type}: {str(e)}")
                time.sleep(5)  # Be kind to the API between symbols

        return "\n".join(output_messages)
    except Exception as e:
        error_msg = f"Error: {str(e)}"
        # Keep the progress made so far visible next to the fatal error.
        latest_output = output_messages + [error_msg]
        logging.getLogger(__name__).exception("Data fetch aborted")
        return error_msg
def scheduled_job():
    """Run one fetch-and-upload pass; used at startup and by the 24-hour schedule.

    fetch_and_upload_data reports problems through its return value, which the
    scheduler ignores, so the final line of the run's output is logged here.
    Exceptions are caught because the `schedule` library does not catch job
    exceptions: one escaping would propagate out of run_pending() and stop the
    scheduler thread.
    """
    logger.info("Running scheduled data fetch...")
    try:
        result = fetch_and_upload_data()
    except Exception:
        logger.exception("Scheduled data fetch failed")
        return
    lines = result.splitlines() if result else []
    logger.info("Scheduled data fetch finished: %s", lines[-1] if lines else "(no output)")
def run_schedule():
    """Poll the `schedule` registry forever, running any job that has come due.

    Intended as the target of a daemon thread; it checks once per second and
    never returns.
    """
    poll_seconds = 1
    while True:
        schedule.run_pending()
        time.sleep(poll_seconds)
def get_latest_output():
    """Return the most recent progress log as newline-joined text."""
    return "\n".join(latest_output)


# Create Gradio interface
with gr.Blocks() as iface:
    gr.Markdown("# Crypto Data Fetcher and Uploader")
    gr.Markdown("Automatically fetches cryptocurrency data every 24 hours and uploads to Hugging Face dataset")
    # `every` re-evaluates `value` only when `value` is a callable, so the log
    # function itself is passed as the value to get a periodic refresh.
    output_text = gr.Textbox(
        value=get_latest_output,
        label="Output Log",
        lines=20,
        every=5,  # refresh every 5 seconds while a client is connected
        interactive=False,  # read-only log
    )

# Periodic `every` updates run through the queue (required on older Gradio
# releases, already the default on newer ones).
iface.queue()
if __name__ == "__main__":
    # Re-run the fetch/upload pass every 24 hours.
    schedule.every(24).hours.do(scheduled_job)
    # Run the first pass in the background so the web UI starts immediately and
    # can display its progress, instead of blocking launch until the pass ends.
    threading.Thread(target=scheduled_job, name="initial-fetch", daemon=True).start()
    # Poll the schedule in its own daemon thread so it never blocks the server.
    scheduler_thread = threading.Thread(target=run_schedule, name="scheduler", daemon=True)
    scheduler_thread.start()
    # Launch the Gradio interface (blocks the main thread).
    iface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        debug=False,
    )