| """HuggingFace Cryptocurrency Data Engine - Main Application""" |
| from __future__ import annotations |
| import time |
| import logging |
| from contextlib import asynccontextmanager |
| from fastapi import FastAPI, HTTPException, Query, Request |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import JSONResponse |
| from slowapi import Limiter, _rate_limit_exceeded_handler |
| from slowapi.util import get_remote_address |
| from slowapi.errors import RateLimitExceeded |
|
|
| from core.config import settings, get_supported_symbols, get_supported_intervals |
| from core.aggregator import get_aggregator |
| from core.cache import cache, cache_key, get_or_set |
| from core.models import ( |
| OHLCVResponse, PricesResponse, SentimentResponse, |
| MarketOverviewResponse, HealthResponse, ErrorResponse, ErrorDetail, CacheInfo |
| ) |
|
|
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
| ) |
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| limiter = Limiter(key_func=get_remote_address) |
|
|
|
|
| @asynccontextmanager |
| async def lifespan(app: FastAPI): |
| """Lifecycle manager for the application""" |
| logger.info("Starting HuggingFace Crypto Data Engine...") |
| logger.info(f"Version: {settings.VERSION}") |
| logger.info(f"Environment: {settings.ENV}") |
|
|
| |
| aggregator = get_aggregator() |
|
|
| yield |
|
|
| |
| logger.info("Shutting down...") |
| await aggregator.close() |
|
|
|
|
| |
| app = FastAPI( |
| title="HuggingFace Cryptocurrency Data Engine", |
| description="Comprehensive cryptocurrency data aggregator with multi-provider support", |
| version=settings.VERSION, |
| lifespan=lifespan |
| ) |
|
|
| |
| app.state.limiter = limiter |
| app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) |
|
|
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
|
|
| @app.exception_handler(Exception) |
| async def global_exception_handler(request: Request, exc: Exception): |
| """Global exception handler""" |
| logger.error(f"Unhandled exception: {exc}", exc_info=True) |
|
|
| return JSONResponse( |
| status_code=500, |
| content=ErrorResponse( |
| error=ErrorDetail( |
| code="INTERNAL_ERROR", |
| message=str(exc) |
| ), |
| timestamp=int(time.time() * 1000) |
| ).dict() |
| ) |
|
|
|
|
| @app.get("/") |
| async def root(): |
| """Root endpoint""" |
| return { |
| "service": "HuggingFace Cryptocurrency Data Engine", |
| "version": settings.VERSION, |
| "status": "online", |
| "endpoints": { |
| "health": "/api/health", |
| "ohlcv": "/api/ohlcv", |
| "prices": "/api/prices", |
| "sentiment": "/api/sentiment", |
| "market": "/api/market/overview", |
| "docs": "/docs" |
| } |
| } |
|
|
|
|
| @app.get("/api/health", response_model=HealthResponse) |
| @limiter.limit(f"{settings.RATE_LIMIT_HEALTH or 999999}/minute") |
| async def health_check(request: Request): |
| """Health check endpoint with provider status""" |
| aggregator = get_aggregator() |
|
|
| |
| providers = await aggregator.get_all_provider_health() |
|
|
| |
| online_count = sum(1 for p in providers if p.status == "online") |
| if online_count == 0: |
| overall_status = "unhealthy" |
| elif online_count < len(providers) / 2: |
| overall_status = "degraded" |
| else: |
| overall_status = "healthy" |
|
|
| |
| cache_stats = cache.get_stats() |
|
|
| return HealthResponse( |
| status=overall_status, |
| uptime=aggregator.get_uptime(), |
| version=settings.VERSION, |
| providers=providers, |
| cache=CacheInfo(**cache_stats) |
| ) |
|
|
|
|
| @app.get("/api/ohlcv", response_model=OHLCVResponse) |
| @limiter.limit(f"{settings.RATE_LIMIT_OHLCV}/minute") |
| async def get_ohlcv( |
| request: Request, |
| symbol: str = Query(..., description="Symbol (e.g., BTC, BTCUSDT, BTC/USDT)"), |
| interval: str = Query("1h", description="Interval (1m, 5m, 15m, 1h, 4h, 1d, 1w)"), |
| limit: int = Query(100, ge=1, le=1000, description="Number of candles (1-1000)") |
| ): |
| """Get OHLCV candlestick data with multi-provider fallback""" |
|
|
| |
| if interval not in get_supported_intervals(): |
| raise HTTPException( |
| status_code=400, |
| detail=f"Invalid interval. Supported: {', '.join(get_supported_intervals())}" |
| ) |
|
|
| |
| normalized_symbol = symbol.upper().replace("/", "").replace("-", "") |
|
|
| |
| key = cache_key("ohlcv", symbol=normalized_symbol, interval=interval, limit=limit) |
|
|
| async def fetch(): |
| aggregator = get_aggregator() |
| data, source = await aggregator.fetch_ohlcv(normalized_symbol, interval, limit) |
| return {"data": data, "source": source} |
|
|
| try: |
| |
| result = await get_or_set(key, settings.CACHE_TTL_OHLCV, fetch) |
|
|
| return OHLCVResponse( |
| data=result["data"], |
| symbol=normalized_symbol, |
| interval=interval, |
| count=len(result["data"]), |
| source=result["source"], |
| timestamp=int(time.time() * 1000) |
| ) |
|
|
| except Exception as e: |
| logger.error(f"OHLCV fetch failed: {e}") |
| raise HTTPException( |
| status_code=503, |
| detail=ErrorDetail( |
| code="PROVIDER_ERROR", |
| message=f"All data providers failed: {str(e)}" |
| ).dict() |
| ) |
|
|
|
|
| @app.get("/api/prices", response_model=PricesResponse) |
| @limiter.limit(f"{settings.RATE_LIMIT_PRICES}/minute") |
| async def get_prices( |
| request: Request, |
| symbols: str = Query(None, description="Comma-separated symbols (e.g., BTC,ETH,SOL)"), |
| convert: str = Query("USDT", description="Convert to currency (USD, USDT)") |
| ): |
| """Get real-time prices with multi-provider aggregation""" |
|
|
| |
| if symbols: |
| symbol_list = [s.strip().upper() for s in symbols.split(",")] |
| else: |
| |
| symbol_list = get_supported_symbols() |
|
|
| |
| key = cache_key("prices", symbols=",".join(sorted(symbol_list))) |
|
|
| async def fetch(): |
| aggregator = get_aggregator() |
| data, source = await aggregator.fetch_prices(symbol_list) |
| return {"data": data, "source": source} |
|
|
| try: |
| |
| result = await get_or_set(key, settings.CACHE_TTL_PRICES, fetch) |
|
|
| return PricesResponse( |
| data=result["data"], |
| timestamp=int(time.time() * 1000), |
| source=result["source"] |
| ) |
|
|
| except Exception as e: |
| logger.error(f"Price fetch failed: {e}") |
| raise HTTPException( |
| status_code=503, |
| detail=ErrorDetail( |
| code="PROVIDER_ERROR", |
| message=f"All price providers failed: {str(e)}" |
| ).dict() |
| ) |
|
|
|
|
| @app.get("/api/sentiment", response_model=SentimentResponse) |
| @limiter.limit(f"{settings.RATE_LIMIT_SENTIMENT}/minute") |
| async def get_sentiment(request: Request): |
| """Get market sentiment data (Fear & Greed Index)""" |
|
|
| if not settings.ENABLE_SENTIMENT: |
| raise HTTPException( |
| status_code=503, |
| detail="Sentiment analysis is disabled" |
| ) |
|
|
| |
| key = cache_key("sentiment") |
|
|
| async def fetch(): |
| aggregator = get_aggregator() |
| return await aggregator.fetch_sentiment() |
|
|
| try: |
| |
| data = await get_or_set(key, settings.CACHE_TTL_SENTIMENT, fetch) |
|
|
| return SentimentResponse( |
| data=data, |
| timestamp=int(time.time() * 1000) |
| ) |
|
|
| except Exception as e: |
| logger.error(f"Sentiment fetch failed: {e}") |
| raise HTTPException( |
| status_code=503, |
| detail=ErrorDetail( |
| code="PROVIDER_ERROR", |
| message=f"Failed to fetch sentiment: {str(e)}" |
| ).dict() |
| ) |
|
|
|
|
| @app.get("/api/market/overview", response_model=MarketOverviewResponse) |
| @limiter.limit(f"{settings.RATE_LIMIT_SENTIMENT}/minute") |
| async def get_market_overview(request: Request): |
| """Get market overview with global statistics""" |
|
|
| |
| key = cache_key("market_overview") |
|
|
| async def fetch(): |
| aggregator = get_aggregator() |
| return await aggregator.fetch_market_overview() |
|
|
| try: |
| |
| data = await get_or_set(key, settings.CACHE_TTL_MARKET, fetch) |
|
|
| return MarketOverviewResponse( |
| data=data, |
| timestamp=int(time.time() * 1000) |
| ) |
|
|
| except Exception as e: |
| logger.error(f"Market overview fetch failed: {e}") |
| raise HTTPException( |
| status_code=503, |
| detail=ErrorDetail( |
| code="PROVIDER_ERROR", |
| message=f"Failed to fetch market overview: {str(e)}" |
| ).dict() |
| ) |
|
|
|
|
| @app.post("/api/cache/clear") |
| async def clear_cache(): |
| """Clear all cached data""" |
| cache.clear() |
| return {"success": True, "message": "Cache cleared"} |
|
|
|
|
| @app.get("/api/cache/stats") |
| async def cache_stats(): |
| """Get cache statistics""" |
| return cache.get_stats() |
|
|
|
|
| if __name__ == "__main__": |
| import uvicorn |
|
|
| uvicorn.run( |
| "main:app", |
| host=settings.HOST, |
| port=settings.PORT, |
| reload=(settings.ENV == "development"), |
| log_level="info" |
| ) |
|
|