Spaces:
Sleeping
Sleeping
| import os | |
| from fastapi import FastAPI, HTTPException, Request, Header | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| import time | |
| import threading | |
| import uuid | |
| import sys | |
| import asyncio | |
| import yfinance as yf | |
| from datetime import datetime | |
| import concurrent.futures | |
| BACKGROUND_TASKS = {} | |
| RATE_LIMITS = {} | |
| # Global ThreadPoolExecutor for heavy synchronous math (XGBoost/CVXPY) | |
| engine_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) | |
| try: | |
| import core_engine | |
| except ImportError: | |
| core_engine = None | |
| from config import OUTPUT_DIR | |
| import access_manager | |
| BASE_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| STATIC_DIR = os.path.join(BASE_DIR, "static") | |
| app = FastAPI(title="Portfolio Engine API") | |
| # Mount static files | |
| app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") | |
| class AuthRequest(BaseModel): | |
| key: str | |
| class KeyGenRequest(BaseModel): | |
| admin_key: str | |
| new_key: str | |
| class PortfolioRequest(BaseModel): | |
| tickers: List[str] | |
| capital: float = 100000.0 | |
| risk_input: int = 5 | |
| model: int = 1 | |
| allocation_engine: int = 1 | |
| allow_shorting: bool = True | |
| tax_enabled: bool = False | |
| garch_enabled: bool = True | |
| custom_constraints: Optional[List[dict]] = None | |
| async def read_login(): | |
| return FileResponse(os.path.join(STATIC_DIR, "login.html")) | |
| async def read_index(): | |
| return FileResponse(os.path.join(STATIC_DIR, "index.html")) | |
| # Simple cache for market ticker | |
| ticker_cache = {"timestamp": 0, "data": None} | |
| TICKER_CACHE_TTL = 300 # 5 minutes | |
| async def market_ticker(): | |
| global ticker_cache | |
| if time.time() - ticker_cache["timestamp"] < TICKER_CACHE_TTL and ticker_cache["data"]: | |
| return ticker_cache["data"] | |
| symbols = { | |
| "S&P 500": "^GSPC", | |
| "NASDAQ": "^IXIC", | |
| "BTC": "BTC-USD", | |
| "GOLD": "GC=F", | |
| "VIX": "^VIX" | |
| } | |
| def fetch_market_data(): | |
| try: | |
| tickers = yf.Tickers(" ".join(list(symbols.values()))) | |
| res = [] | |
| for name, symbol in symbols.items(): | |
| hist = tickers.tickers[symbol].history(period="5d") | |
| if len(hist) >= 2: | |
| current = float(hist['Close'].iloc[-1]) | |
| prev = float(hist['Close'].iloc[-2]) | |
| pct_change = ((current - prev) / prev) * 100 | |
| res.append({ | |
| "name": name, | |
| "price": round(current, 2), | |
| "change": round(pct_change, 2) | |
| }) | |
| return res | |
| except Exception as e: | |
| return [] | |
| fetched_results = await asyncio.to_thread(fetch_market_data) | |
| if fetched_results: | |
| ticker_cache["data"] = fetched_results | |
| ticker_cache["timestamp"] = time.time() | |
| return fetched_results or ticker_cache.get("data", []) | |
| async def authenticate(req: AuthRequest, request: Request): | |
| ip = request.client.host | |
| if access_manager.validate_key(req.key, ip): | |
| return {"status": "success", "message": "Access Granted"} | |
| raise HTTPException(status_code=401, detail="Invalid or Expired Access Key") | |
| async def create_otk(req: KeyGenRequest): | |
| new_key = access_manager.generate_otk(req.admin_key, req.new_key) | |
| if new_key: | |
| return {"status": "success", "message": f"OTK '{new_key}' created."} | |
| raise HTTPException(status_code=401, detail="Invalid Admin Key") | |
| async def telegram_webhook(request: Request): | |
| try: | |
| data = await request.json() | |
| if "message" in data and "text" in data["message"]: | |
| text = data["message"]["text"] | |
| chat_id = str(data["message"]["chat"]["id"]) | |
| # Security: Only process commands from the master Chat ID | |
| expected_chat_id = os.getenv("TELEGRAM_CHAT_ID") | |
| if expected_chat_id and chat_id == expected_chat_id: | |
| access_manager.handle_telegram_command(text) | |
| return {"status": "ok"} | |
| except Exception as e: | |
| # Return 200 so Telegram doesn't retry on parsing errors | |
| return {"status": "error", "detail": str(e)} | |
| async def telegram_setup(request: Request): | |
| token = os.getenv("TELEGRAM_BOT_TOKEN") | |
| if not token: | |
| return {"error": "TELEGRAM_BOT_TOKEN not set"} | |
| import urllib.request | |
| import urllib.parse | |
| import json | |
| # We construct the webhook URL from the current request base | |
| base_url = str(request.base_url).rstrip("/") | |
| # Hardcode https because Render load balancers sometimes mask the protocol | |
| if "onrender.com" in base_url: | |
| base_url = base_url.replace("http://", "https://") | |
| webhook_url = f"{base_url}/api/telegram_webhook" | |
| url = f"https://api.telegram.org/bot{token}/setWebhook" | |
| try: | |
| import requests | |
| res = requests.post(url, json={"url": webhook_url}, timeout=10) | |
| return {"status": "Webhook Configured", "url": webhook_url, "tg_res": res.json()} | |
| except Exception as e: | |
| return {"error": str(e)} | |
| async def preview_portfolio(req: PortfolioRequest, x_access_key: Optional[str] = Header(None)): | |
| if not access_manager.validate_key(x_access_key): | |
| raise HTTPException(status_code=401, detail="Unauthorized") | |
| try: | |
| import requests | |
| HF_URL = os.environ.get("HF_MATH_URL", "https://engineportf-math-backend.hf.space") | |
| HF_SECRET = os.environ.get("HF_SECRET_KEY", "EngineSecret2026") | |
| headers = { | |
| "X-API-Key": HF_SECRET, | |
| "Content-Type": "application/json" | |
| } | |
| # Send request to Hugging Face backend | |
| loop = asyncio.get_running_loop() | |
| def _call_hf_preview(): | |
| res = requests.post(f"{HF_URL}/api/preview", json=req.model_dump() if hasattr(req, "model_dump") else req.dict(), headers=headers, timeout=60) | |
| if res.status_code != 200: | |
| raise Exception(f"HF Engine Error: {res.text}") | |
| return res.json() | |
| result = await loop.run_in_executor(engine_executor, _call_hf_preview) | |
| return { | |
| "status": "success", | |
| "target_weights": result.get("target_weights", {}), | |
| "efficient_frontier": result.get("efficient_frontier", {"vols": [], "rets": []}) | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def generate_portfolio(req: PortfolioRequest, request: Request, x_access_key: Optional[str] = Header(None)): | |
| ip = request.client.host | |
| if time.time() - RATE_LIMITS.get(ip, 0) < 60: | |
| raise HTTPException(status_code=429, detail="Too Many Requests. Please wait 60 seconds between optimizations.") | |
| if not access_manager.validate_key(x_access_key, ip=ip): | |
| raise HTTPException(status_code=401, detail="Unauthorized") | |
| RATE_LIMITS[ip] = time.time() | |
| task_id = str(uuid.uuid4()) | |
| BACKGROUND_TASKS[task_id] = {"status": "running", "message": "Initializing...", "target_weights": {}} | |
| def _run_optimization(tid, request_obj): | |
| try: | |
| import requests | |
| import time | |
| HF_URL = os.environ.get("HF_MATH_URL", "https://engineportf-math-backend.hf.space") | |
| HF_SECRET = os.environ.get("HF_SECRET_KEY", "EngineSecret2026") | |
| headers = { | |
| "X-API-Key": HF_SECRET, | |
| "Content-Type": "application/json" | |
| } | |
| BACKGROUND_TASKS[tid]["message"] = "Initializing on Hugging Face..." | |
| payload = request_obj.model_dump() if hasattr(request_obj, "model_dump") else request_obj.dict() | |
| response = requests.post(f"{HF_URL}/api/generate", json=payload, headers=headers, timeout=60) | |
| if response.status_code != 200: | |
| raise Exception(f"HF Engine Error: {response.text}") | |
| init_data = response.json() | |
| if init_data.get("status") != "queued": | |
| raise Exception("Failed to queue task on Hugging Face.") | |
| hf_task_id = init_data.get("task_id") | |
| # Poll Hugging Face | |
| while True: | |
| time.sleep(2.0) | |
| status_res = requests.get(f"{HF_URL}/api/status/{hf_task_id}", headers=headers, timeout=20) | |
| if status_res.status_code != 200: | |
| continue # Network blip, retry | |
| status_data = status_res.json() | |
| if status_data.get("status") == "completed": | |
| # Task finished, retrieve HTML | |
| report_html = status_data.get("report_html", "") | |
| if report_html: | |
| report_path = os.path.join(OUTPUT_DIR, "portfolio_report.html") | |
| os.makedirs(OUTPUT_DIR, exist_ok=True) | |
| with open(report_path, "w", encoding="utf-8") as f: | |
| f.write(report_html) | |
| BACKGROUND_TASKS[tid]["status"] = "completed" | |
| BACKGROUND_TASKS[tid]["message"] = "Report generated." | |
| BACKGROUND_TASKS[tid]["target_weights"] = status_data.get("target_weights", {}) | |
| break | |
| elif status_data.get("status") == "error": | |
| raise Exception(status_data.get("message", "Unknown error on HF backend.")) | |
| else: | |
| # Still running | |
| BACKGROUND_TASKS[tid]["message"] = "Calculating (Running on HF 16GB Cluster)..." | |
| except Exception as e: | |
| BACKGROUND_TASKS[tid]["status"] = "error" | |
| BACKGROUND_TASKS[tid]["message"] = str(e) | |
| engine_executor.submit(_run_optimization, task_id, req) | |
| return { | |
| "status": "queued", | |
| "task_id": task_id, | |
| "message": "Optimization started in background." | |
| } | |
| async def get_task_status(task_id: str, x_access_key: Optional[str] = Header(None)): | |
| if not access_manager.validate_key(x_access_key, silent=True): | |
| raise HTTPException(status_code=401, detail="Unauthorized") | |
| task = BACKGROUND_TASKS.get(task_id) | |
| if not task: | |
| raise HTTPException(status_code=404, detail="Task not found") | |
| return task | |
| async def get_report(): | |
| report_path = os.path.join(OUTPUT_DIR, "portfolio_report.html") | |
| if os.path.exists(report_path): | |
| return FileResponse(report_path) | |
| raise HTTPException(status_code=404, detail="Report not generated yet.") | |
| def _alert_daemon(): | |
| """Background daemon to check for market drops and ping HF.""" | |
| import time | |
| import requests | |
| HF_URL = os.environ.get("HF_MATH_URL", "https://engineportf-math-backend.hf.space") | |
| loops = 0 | |
| while True: | |
| try: | |
| # Wake up every 15 minutes (900 seconds) | |
| time.sleep(900) | |
| loops += 1 | |
| # 1. Ping Hugging Face to keep the 16GB math cluster awake | |
| try: | |
| requests.get(HF_URL, timeout=10) | |
| except Exception: | |
| pass | |
| # 2. Every 4th loop (1 hour), check for SPY drops | |
| if loops % 4 == 0: | |
| ticker = yf.Ticker("SPY") | |
| hist = ticker.history(period="2d") | |
| if len(hist) >= 2: | |
| current = float(hist['Close'].iloc[-1]) | |
| prev = float(hist['Close'].iloc[-2]) | |
| pct_change = ((current - prev) / prev) * 100 | |
| if pct_change <= -5.0: | |
| access_manager.send_telegram_alert(f"🚨 **MARKET ALERT**\nSPY has dropped by {pct_change:.2f}%!\nCheck the portfolio engine.") | |
| except Exception as e: | |
| pass # Suppress daemon errors | |
| if __name__ == "__main__": | |
| import uvicorn | |
| # Start the background alert daemon | |
| daemon_thread = threading.Thread(target=_alert_daemon, daemon=True) | |
| daemon_thread.start() | |
| uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True) | |