| |
| """FastAPI backend for the professional crypto dashboard.""" |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import logging |
| import re |
| from datetime import datetime |
| from typing import Any, Dict, List, Optional |
|
|
| from fastapi import HTTPException, WebSocket, WebSocketDisconnect |
| from fastapi import FastAPI |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import FileResponse |
| from pydantic import BaseModel, Field |
|
|
| from ai_models import ( |
| analyze_chart_points, |
| analyze_crypto_sentiment, |
| analyze_financial_sentiment, |
| analyze_market_text, |
| analyze_news_item, |
| analyze_social_sentiment, |
| registry_status, |
| summarize_text, |
| ) |
| from collectors.aggregator import ( |
| CollectorError, |
| MarketDataCollector, |
| NewsCollector, |
| ProviderStatusCollector, |
| ) |
| from config import COIN_SYMBOL_MAPPING, get_settings |
|
|
settings = get_settings()
logger = logging.getLogger("crypto.api")

# Resolve the configured level name case-insensitively.  A lowercase name such
# as "info" would otherwise resolve to the ``logging.info`` *function*, which
# ``basicConfig`` rejects; anything that is not a numeric level falls back to
# INFO.
_configured_level = getattr(logging, str(settings.log_level).upper(), logging.INFO)
if not isinstance(_configured_level, int):
    _configured_level = logging.INFO
logging.basicConfig(level=_configured_level)
|
|
# Application object that every route below is registered on.
app = FastAPI(
    title="Crypto Intelligence Dashboard API",
    description="Professional API for cryptocurrency intelligence",
    version="2.0.0",
)

# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- confirm this is intended outside of local development.
_CORS_OPTIONS: Dict[str, Any] = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)

# Module-level collector instances shared by the HTTP routes and the
# WebSocket pusher.
market_collector = MarketDataCollector()
news_collector = NewsCollector()
provider_collector = ProviderStatusCollector()
|
|
|
|
class CoinSummary(BaseModel):
    # Headline market fields for a single coin; only ``symbol`` is typed as
    # non-optional.

    # Identity
    name: Optional[str]
    symbol: str

    # Market figures
    price: Optional[float]
    change_24h: Optional[float]
    market_cap: Optional[float]
    volume_24h: Optional[float]
    rank: Optional[int]

    # Freshness of the figures above
    last_updated: Optional[datetime]
|
|
|
|
class CoinDetail(CoinSummary):
    # Extends CoinSummary with descriptive, supply and all-time price fields.

    # Descriptive metadata
    id: Optional[str]
    description: Optional[str]
    homepage: Optional[str]

    # Supply figures
    circulating_supply: Optional[float]
    total_supply: Optional[float]

    # Price extremes (presumably all-time high / low -- confirm with the collector)
    ath: Optional[float]
    atl: Optional[float]
|
|
|
|
class MarketStats(BaseModel):
    # Market-wide aggregate figures; every field is optional-typed.

    # Capitalisation and volume
    total_market_cap: Optional[float]
    total_volume_24h: Optional[float]
    market_cap_change_percentage_24h: Optional[float]

    # Dominance of the two named coins
    btc_dominance: Optional[float]
    eth_dominance: Optional[float]

    # Counts
    active_cryptocurrencies: Optional[int]
    markets: Optional[int]

    # Integer timestamp (unit not visible here -- presumably epoch seconds; verify)
    updated_at: Optional[int]
|
|
|
|
class NewsItem(BaseModel):
    # A single news article; ``analysis`` is the only field with a default.

    id: Optional[str]
    title: str
    body: Optional[str]
    url: Optional[str]
    source: Optional[str]
    categories: Optional[str]
    published_at: Optional[datetime]

    # Free-form analysis payload attached to the article, absent by default.
    analysis: Optional[Dict[str, Any]] = None
|
|
|
|
class ProviderInfo(BaseModel):
    # Status record for one upstream data provider.

    # Identity
    provider_id: str
    name: str
    category: Optional[str]

    # Outcome of the most recent status check
    status: str
    status_code: Optional[int]
    latency_ms: Optional[float]

    # Error text, absent by default.
    error: Optional[str] = None
|
|
|
|
class ChartDataPoint(BaseModel):
    # One (timestamp, price) sample of a price series; both fields are required.
    timestamp: datetime
    price: float
|
|
|
|
class ChartAnalysisRequest(BaseModel):
    # Request body for POST /api/charts/analyze.

    # Coin symbol, 2-10 characters.
    symbol: str = Field(..., min_length=2, max_length=10)

    # Digits followed by one of h / d / w (e.g. "7d"); defaults to "7d".
    timeframe: str = Field(default="7d", pattern=r"^[0-9]+[hdw]$")

    # Optional indicator names; the handler echoes them back in the insights.
    indicators: Optional[List[str]] = None
|
|
|
|
class SentimentRequest(BaseModel):
    # Request body for POST /api/sentiment/analyze.

    # Text to analyse, at least 5 characters.
    text: str = Field(..., min_length=5)

    # Analyzer selector: one of auto / crypto / financial / social.
    mode: str = Field(default="auto", pattern=r"^(auto|crypto|financial|social)$")
|
|
|
|
class NewsSummaryRequest(BaseModel):
    # Request body for POST /api/news/summarize.

    # Article title, at least 5 characters.
    title: str = Field(..., min_length=5)

    # Optional article body and source name.
    body: Optional[str] = None
    source: Optional[str] = None
|
|
|
|
class QueryRequest(BaseModel):
    # Request body for POST /api/query.

    # Free-text question, at least 3 characters.
    query: str = Field(..., min_length=3)

    # Optional overrides: when given, these take precedence over the symbol
    # and task detected from ``query``.
    symbol: Optional[str] = None
    task: Optional[str] = None

    # Free-form options; not read by the handler visible in this module.
    options: Optional[Dict[str, Any]] = None
|
|
|
|
class QueryResponse(BaseModel):
    # Response envelope returned by POST /api/query.

    success: bool
    # Task label the query was handled as (e.g. "price", "general").
    type: str
    # Short human-readable description of the result.
    message: str
    # Task-specific result payload.
    data: Dict[str, Any]
|
|
|
|
class HealthResponse(BaseModel):
    # Response returned by GET /api/health.

    # Overall state: "ok" or "degraded".
    status: str
    # API version string.
    version: str
    # Time at which the check was produced.
    timestamp: datetime
    # Per-service status entries keyed by service name.
    services: Dict[str, Any]
|
|
|
|
def _handle_collector_error(exc: CollectorError) -> None:
    """Translate a collector failure into an HTTP 503 response.

    Always raises; it never returns normally.

    Args:
        exc: The collector exception to report.

    Raises:
        HTTPException: Status 503 with the error text and the provider name
            (``None`` when the exception carries no ``provider`` attribute).
    """
    detail = {"error": str(exc), "provider": getattr(exc, "provider", None)}
    # Chain the original exception so the upstream failure stays in tracebacks.
    raise HTTPException(status_code=503, detail=detail) from exc
|
|
|
|
@app.get("/")
async def serve_dashboard() -> FileResponse:
    """Serve the dashboard HTML page at the site root."""
    # Relative path: resolved against the process working directory.
    dashboard_page = "unified_dashboard.html"
    return FileResponse(dashboard_page)
|
|
|
|
@app.get("/api/health", response_model=HealthResponse)
async def health_check() -> HealthResponse:
    """Probe the collectors concurrently and report per-service health.

    The overall status is "ok" only when the market-data probe succeeds;
    otherwise it is "degraded".
    """

    async def _probe(awaitable):
        # Turn any failure into a status dict so that one failing service
        # cannot fail the health endpoint itself.
        try:
            await awaitable
        except Exception as exc:
            return {"status": "error", "detail": str(exc)}
        return {"status": "ok"}

    probes = [
        asyncio.create_task(_probe(market_collector.get_top_coins(limit=1))),
        asyncio.create_task(_probe(news_collector.get_latest_news(limit=1))),
        asyncio.create_task(_probe(provider_collector.get_providers_status())),
    ]
    market_status, news_status, providers_status = await asyncio.gather(*probes)

    ai_status = registry_status()

    overall = "degraded"
    if market_status.get("status") == "ok":
        overall = "ok"

    service_report = {
        "market_data": market_status,
        "news": news_status,
        "providers": providers_status,
        "ai_models": ai_status,
    }
    return HealthResponse(
        status=overall,
        version=app.version,
        timestamp=datetime.utcnow(),
        services=service_report,
    )
|
|
|
|
@app.get("/api/coins/top", response_model=Dict[str, Any])
async def get_top_coins(limit: int = 10) -> Dict[str, Any]:
    """Return the top ``limit`` coins from the market collector.

    A ``CollectorError`` is reported as HTTP 503.
    """
    try:
        top_coins = await market_collector.get_top_coins(limit=limit)
    except CollectorError as exc:
        _handle_collector_error(exc)
    else:
        return {"success": True, "coins": top_coins, "count": len(top_coins)}
|
|
|
|
@app.get("/api/coins/{symbol}", response_model=Dict[str, Any])
async def get_coin_details(symbol: str) -> Dict[str, Any]:
    """Return the collector's detail record for ``symbol``.

    A ``CollectorError`` is reported as HTTP 503.
    """
    try:
        details = await market_collector.get_coin_details(symbol)
    except CollectorError as exc:
        _handle_collector_error(exc)
    else:
        return {"success": True, "coin": details}
|
|
|
|
@app.get("/api/market/stats", response_model=Dict[str, Any])
async def get_market_statistics() -> Dict[str, Any]:
    """Return market-wide statistics from the market collector.

    A ``CollectorError`` is reported as HTTP 503.
    """
    try:
        market_stats = await market_collector.get_market_stats()
    except CollectorError as exc:
        _handle_collector_error(exc)
    else:
        return {"success": True, "stats": market_stats}
|
|
|
|
@app.get("/api/news/latest", response_model=Dict[str, Any])
async def get_latest_news(limit: int = 10, enrich: bool = False) -> Dict[str, Any]:
    """Return the latest news items, optionally with an attached analysis.

    When ``enrich`` is true each item is copied and given an ``"analysis"``
    key produced by ``analyze_news_item``.  A ``CollectorError`` is reported
    as HTTP 503.
    """

    def _with_analysis(article: Dict[str, Any]) -> Dict[str, Any]:
        # Analyse first, then copy the article alongside the result.
        analysis = analyze_news_item(article)
        return {**article, "analysis": analysis}

    try:
        articles = await news_collector.get_latest_news(limit=limit)
        if enrich:
            articles = [_with_analysis(article) for article in articles]
        return {"success": True, "news": articles, "count": len(articles)}
    except CollectorError as exc:
        _handle_collector_error(exc)
|
|
|
|
@app.post("/api/news/summarize", response_model=Dict[str, Any])
async def summarize_news(request: NewsSummaryRequest) -> Dict[str, Any]:
    """Analyse a client-supplied article and return the analysis."""
    article_fields = request.dict()
    return {"success": True, "analysis": analyze_news_item(article_fields)}
|
|
|
|
@app.get("/api/providers", response_model=Dict[str, Any])
async def get_providers() -> Dict[str, Any]:
    """Return the status of every provider known to the provider collector.

    Returns:
        ``{"success": True, "providers": [...], "total": <count>}``.

    Raises:
        HTTPException: Status 503 when the collector raises
            ``CollectorError``, matching the other collector-backed endpoints.
    """
    try:
        providers = await provider_collector.get_providers_status()
    except CollectorError as exc:
        _handle_collector_error(exc)
    return {"success": True, "providers": providers, "total": len(providers)}
|
|
|
|
@app.get("/api/charts/price/{symbol}", response_model=Dict[str, Any])
async def get_price_history(symbol: str, timeframe: str = "7d") -> Dict[str, Any]:
    """Return the price history of ``symbol`` over ``timeframe``.

    The symbol is upper-cased in the response.  A ``CollectorError`` is
    reported as HTTP 503.
    """
    try:
        series = await market_collector.get_price_history(symbol, timeframe)
    except CollectorError as exc:
        _handle_collector_error(exc)
    else:
        return {
            "success": True,
            "symbol": symbol.upper(),
            "timeframe": timeframe,
            "data": series,
        }
|
|
|
|
@app.post("/api/charts/analyze", response_model=Dict[str, Any])
async def analyze_chart(request: ChartAnalysisRequest) -> Dict[str, Any]:
    """Fetch the price history for a symbol and return chart insights.

    A ``CollectorError`` while fetching history is reported as HTTP 503.
    Requested indicator names are echoed back under ``insights["indicators"]``.
    """
    symbol, timeframe = request.symbol, request.timeframe

    try:
        series = await market_collector.get_price_history(symbol, timeframe)
    except CollectorError as exc:
        _handle_collector_error(exc)

    insights = analyze_chart_points(symbol, timeframe, series)
    requested_indicators = request.indicators
    if requested_indicators:
        insights["indicators"] = requested_indicators

    return {
        "success": True,
        "symbol": symbol.upper(),
        "timeframe": timeframe,
        "insights": insights,
    }
|
|
|
|
@app.post("/api/sentiment/analyze", response_model=Dict[str, Any])
async def run_sentiment_analysis(request: SentimentRequest) -> Dict[str, Any]:
    """Run the analyzer selected by ``request.mode`` over the request text.

    Whitespace-only text is rejected with HTTP 400.  Any mode other than
    crypto / financial / social uses ``analyze_market_text``.  In "auto" mode
    a non-empty ``signals`` entry of a dict result is also copied to the top
    level of the response.
    """
    text = request.text.strip()
    if not text:
        raise HTTPException(status_code=400, detail="Text is required for sentiment analysis")

    mode = request.mode or "auto"
    analyzers = {
        "crypto": analyze_crypto_sentiment,
        "financial": analyze_financial_sentiment,
        "social": analyze_social_sentiment,
    }
    analyzer = analyzers.get(mode, analyze_market_text)
    payload = analyzer(text)

    response: Dict[str, Any] = {"success": True, "mode": mode, "result": payload}
    is_auto_dict = mode == "auto" and isinstance(payload, dict)
    if is_auto_dict and payload.get("signals"):
        response["signals"] = payload["signals"]
    return response
|
|
|
|
| def _detect_task(query: str, explicit: Optional[str] = None) -> str: |
| if explicit: |
| return explicit |
| lowered = query.lower() |
| if "price" in lowered: |
| return "price" |
| if "sentiment" in lowered: |
| return "sentiment" |
| if "summar" in lowered: |
| return "summary" |
| if any(word in lowered for word in ("should i", "invest", "decision")): |
| return "decision" |
| return "general" |
|
|
|
|
| def _extract_symbol(query: str) -> Optional[str]: |
| lowered = query.lower() |
| for coin_id, symbol in COIN_SYMBOL_MAPPING.items(): |
| if coin_id in lowered or symbol.lower() in lowered: |
| return symbol |
|
|
| known_symbols = {symbol.lower() for symbol in COIN_SYMBOL_MAPPING.values()} |
| for token in re.findall(r"\b([a-z]{2,5})\b", lowered): |
| if token in known_symbols: |
| return token.upper() |
| return None |
|
|
|
|
@app.post("/api/query", response_model=QueryResponse)
async def process_query(request: QueryRequest) -> QueryResponse:
    """Answer a free-text query by routing it to the matching task.

    The task is ``request.task`` when given, otherwise it is detected from the
    query text; the symbol is ``request.symbol`` when given, otherwise it is
    extracted from the query text.

    Returns:
        A ``QueryResponse`` whose ``type`` is one of price / sentiment /
        summary / decision / general.

    Raises:
        HTTPException: Status 400 when a price query has no symbol; status 503
            when a collector fails, matching the other collector-backed
            endpoints.
    """
    task = _detect_task(request.query, request.task)
    symbol = request.symbol or _extract_symbol(request.query)

    if task == "price":
        if not symbol:
            raise HTTPException(status_code=400, detail="Symbol required for price queries")
        try:
            coin = await market_collector.get_coin_details(symbol)
        except CollectorError as exc:
            _handle_collector_error(exc)
        # Fall back to the requested symbol when the record lacks a key,
        # rather than failing the request with a KeyError.
        message = f"{coin.get('name', symbol)} ({coin.get('symbol', symbol)}) latest market data"
        return QueryResponse(success=True, type="price", message=message, data=coin)

    if task == "sentiment":
        sentiment = {
            "crypto": analyze_crypto_sentiment(request.query),
            "financial": analyze_financial_sentiment(request.query),
            "social": analyze_social_sentiment(request.query),
        }
        return QueryResponse(success=True, type="sentiment", message="Sentiment analysis", data=sentiment)

    if task == "summary":
        summary = summarize_text(request.query)
        return QueryResponse(success=True, type="summary", message="Summarized text", data=summary)

    if task == "decision":
        # The three lookups are independent, so run them concurrently.
        market_task = asyncio.create_task(market_collector.get_market_stats())
        news_task = asyncio.create_task(news_collector.get_latest_news(limit=3))
        coins_task = asyncio.create_task(market_collector.get_top_coins(limit=5))
        try:
            stats, latest_news, coins = await asyncio.gather(market_task, news_task, coins_task)
        except CollectorError as exc:
            _handle_collector_error(exc)
        sentiment = analyze_market_text(request.query)
        data = {
            "market_stats": stats,
            "top_coins": coins,
            "news": latest_news,
            "analysis": sentiment,
        }
        return QueryResponse(success=True, type="decision", message="Composite decision support", data=data)

    # Any other task label, including unrecognised explicit ones, is handled
    # as a general analysis.
    sentiment = analyze_market_text(request.query)
    return QueryResponse(success=True, type="general", message="General analysis", data=sentiment)
|
|
|
|
class WebSocketManager:
    """Track WebSocket clients and push periodic market updates to each one.

    Every connected socket has its own background task that sends an
    ``"update"`` message every ``interval`` seconds.
    """

    def __init__(self) -> None:
        # Maps each connected socket to the task that pushes updates to it.
        self.connections: Dict[WebSocket, asyncio.Task] = {}
        # Seconds to wait between update attempts.
        self.interval = 10

    async def connect(self, websocket: WebSocket) -> None:
        """Accept the socket, greet the client, then start its update task."""
        await websocket.accept()
        # Send the greeting before starting the pusher: it is then always the
        # first frame, it cannot interleave with an update, and no task is
        # left running if the greeting itself fails.
        await websocket.send_json({"type": "connected", "timestamp": datetime.utcnow().isoformat()})
        self.connections[websocket] = asyncio.create_task(self._push_updates(websocket))

    async def disconnect(self, websocket: WebSocket) -> None:
        """Stop the socket's update task and close the socket (best effort)."""
        task = self.connections.pop(websocket, None)
        if task:
            task.cancel()
        try:
            await websocket.close()
        except Exception as exc:
            # Closing is best effort; record the failure instead of hiding it.
            logger.debug("WebSocket close failed: %s", exc)

    async def _build_payload(self) -> Dict[str, Any]:
        """Collect market data, stats and news, and score the news headlines."""
        coins = await market_collector.get_top_coins(limit=5)
        stats = await market_collector.get_market_stats()
        news = await news_collector.get_latest_news(limit=3)
        sentiment = analyze_crypto_sentiment(" ".join(item.get("title", "") for item in news))
        return {
            "market_data": coins,
            "stats": stats,
            "news": news,
            "sentiment": sentiment,
            "timestamp": datetime.utcnow().isoformat(),
        }

    async def _push_updates(self, websocket: WebSocket) -> None:
        """Send an update every ``interval`` seconds until cancelled or failed.

        A ``CollectorError`` skips that round and retries after the interval,
        so a transient upstream failure does not silence an open connection.
        Any other error ends the loop.  Cancellation is not caught and ends
        the task.
        """
        while True:
            try:
                payload = await self._build_payload()
                await websocket.send_json({"type": "update", "payload": payload})
            except CollectorError as exc:
                logger.warning("Skipping WebSocket update; collector failed: %s", exc)
            except Exception as exc:
                logger.warning("WebSocket send failed: %s", exc)
                break
            await asyncio.sleep(self.interval)


manager = WebSocketManager()
|
|
|
|
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket) -> None:
    """Keep a dashboard WebSocket open until the client disconnects.

    Incoming text frames are read and discarded; updates are pushed by the
    manager.  The manager is always told to disconnect on the way out.
    """
    await manager.connect(websocket)
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Normal termination: the client went away.
        pass
    finally:
        await manager.disconnect(websocket)
|
|
|
|
@app.on_event("startup")
async def startup_event() -> None:
    """Log the API version when the application starts."""
    api_version = app.version
    logger.info("Starting Crypto Intelligence Dashboard API version %s", api_version)
|
|
|
|
if __name__ == "__main__":
    # Imported only when run as a script, so importing this module does not
    # import uvicorn.
    from uvicorn import run as _serve

    # Listen on all interfaces, port 7860.
    _serve(app, host="0.0.0.0", port=7860)
|
|