```python from fastapi import FastAPI, UploadFile, File, HTTPException, Depends, APIRouter from fastapi.middleware.cors import CORSMiddleware from fastapi.security import APIKeyHeader from typing import Optional import io from .config import get_settings from .signal_engine import generate_signal_from_image, generate_signal_from_market from .timeseries_analysis import fetch_ohlcv, compute_technicals from .sentiment_analysis import get_crypto_sentiment from .models_registry import model_registry from pydantic import BaseModel app = FastAPI(title="CryptoSignal Sleuth Pro API") # Create a router for scalp signals scalp_router = APIRouter() DEFAULT_CRYPTO = ["BTCUSDT", "ETHUSDT", "SOLUSDT"] DEFAULT_FOREX = ["EURUSD", "GBPUSD", "XAUUSD", "USDJPY", "NAS100"] @scalp_router.post("/scalp/signals") async def get_scalp_signals( timeframe: str = "5m", asset_type: str = "forex" ): if asset_type == "crypto": symbols = DEFAULT_CRYPTO elif asset_type == "forex": symbols = DEFAULT_FOREX else: # mixed symbols = DEFAULT_CRYPTO + DEFAULT_FOREX from datetime import datetime, timezone from .signal_engine import generate_signal_from_market try: signals = [] for symbol in symbols: signal = await generate_signal_from_market( symbol=symbol, timeframe=timeframe, asset_type="crypto" if symbol.endswith("USDT") else "forex" ) signals.append({ **signal, "symbol": symbol, "asset_type": "crypto" if symbol.endswith("USDT") else "forex", "timeframe": timeframe, "updated_at": datetime.now(timezone.utc).isoformat() }) return {"signals": signals} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) app.include_router(scalp_router) # CORS settings app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) api_key_header = APIKeyHeader(name="X-API-KEY") async def verify_api_key(api_key: str = Depends(api_key_header)): settings = get_settings() if settings.WEBHOOK_API_KEY and api_key != settings.WEBHOOK_API_KEY: raise HTTPException(status_code=403, detail="Invalid API Key") return api_key @app.post("/api/analyze-chart") async def analyze_chart( file: UploadFile = File(...), symbol: Optional[str] = "BTCUSDT" ): if not file.content_type.startswith("image/"): raise HTTPException(400, detail="File must be an image") image_bytes = await file.read() signal = await generate_signal_from_image(image_bytes, symbol) return signal @app.get("/api/market-analysis") async def market_analysis( symbol: str, timeframe: str = "1h" ): try: ohlcv = await fetch_ohlcv(symbol, timeframe) technicals = compute_technicals(ohlcv) sentiment = await get_crypto_sentiment(symbol[:3]) return { "symbol": symbol, "timeframe": timeframe, "technicals": technicals, "sentiment": sentiment } except Exception as e: raise HTTPException(500, detail=str(e)) @app.post("/api/webhook") async def webhook_handler( payload: dict, api_key: str = Depends(verify_api_key) ): # Process webhook payload here return {"status": "received", "data": payload} @app.get("/api/health") async def health_check(): return {"status": "ok"} @app.get("/api/models") async def list_models(): settings = get_settings() return { "llm_model": settings.INFERENCE_LLM_MODEL if settings.USE_INFERENCE_API == "1" else settings.LOCAL_LLM_MODEL, "using_inference_api": settings.USE_INFERENCE_API == "1", "models": { "inference": settings.INFERENCE_LLM_MODEL, "local": settings.LOCAL_LLM_MODEL } } @app.post("/api/analyze/screenshot") async def analyze_screenshot(file: UploadFile = File(...)): try: # Ensure allowed type if file.content_type not in [ "image/png", "image/jpeg", "image/jpg", "image/webp", "image/heic", ]: raise HTTPException(status_code=400, detail="Unsupported file type") # Read the image image_bytes = await file.read() # Pass to your signal engine signal = await generate_signal_from_image(image_bytes) return signal except Exception as e: raise HTTPException(status_code=500, detail=str(e)) class MarketRequest(BaseModel): symbol: str timeframe: str = "1h" asset_type: str = "crypto" # "crypto" | "forex" @app.post("/api/analyze/market") async def analyze_market(request: MarketRequest): try: ohlcv = await fetch_ohlcv( symbol=request.symbol, timeframe=request.timeframe, asset_type=request.asset_type ) technicals = compute_technicals(ohlcv) return await generate_signal_from_market( symbol=request.symbol, timeframe=request.timeframe, asset_type=request.asset_type, ohlcv=ohlcv, technicals=technicals ) except Exception as e: raise HTTPException(500, detail=str(e)) @app.post("/api/webhook/signal") async def webhook_signal( payload: dict, api_key: str = Depends(verify_api_key) ): # Basic validation if not payload.get("direction"): raise HTTPException(400, detail="Missing required field: direction") # Log the signal print(f"Received signal: {payload}") return {"status": "received", "signal": payload} ```