crypt / webui /test_binance.py
heyunfei's picture
Upload 56 files
85653bc verified
#!/usr/bin/env python3
"""
测试币安API连接和数据获取功能
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from binance.client import Client
import pandas as pd
def test_binance_connection():
"""测试币安连接"""
try:
# 初始化客户端(使用公开API)
client = Client("", "")
# 测试获取服务器时间
server_time = client.get_server_time()
print(f"✅ 币安服务器连接成功,服务器时间: {server_time}")
# 测试获取交易对信息
exchange_info = client.get_exchange_info()
print(f"✅ 获取交易所信息成功,共有 {len(exchange_info['symbols'])} 个交易对")
# 测试获取K线数据
klines = client.get_klines(symbol='BTCUSDT', interval='1h', limit=10)
print(f"✅ 获取BTCUSDT K线数据成功,获取到 {len(klines)} 条数据")
# 转换为DataFrame并显示
df = pd.DataFrame(klines, columns=[
'timestamp', 'open', 'high', 'low', 'close', 'volume',
'close_time', 'quote_asset_volume', 'number_of_trades',
'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore'
])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
print("\n最新的5条K线数据:")
print(df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].tail())
return True
except Exception as e:
print(f"❌ 币安连接测试失败: {e}")
return False
if __name__ == "__main__":
print("🚀 开始测试币安API连接...")
success = test_binance_connection()
if success:
print("\n✅ 所有测试通过!币安API集成准备就绪。")
else:
print("\n❌ 测试失败,请检查网络连接。")