import os

# Cap the BLAS / OpenMP thread pools at one thread *before* any numeric library
# is imported below, to prevent OpenBLAS/MKL deadlock inside FastAPI threads.
for _thread_env_var in (
    "OMP_NUM_THREADS",
    "OPENBLAS_NUM_THREADS",
    "MKL_NUM_THREADS",
    "VECLIB_MAXIMUM_THREADS",
    "NUMEXPR_NUM_THREADS",
):
    os.environ[_thread_env_var] = "1"
del _thread_env_var

import sys
import threading
import time
import traceback
import uuid
from datetime import datetime
from typing import List, Optional

import yfinance as yf
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

# Optional: populate os.environ from a local .env file when python-dotenv exists.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass

# In-memory registry of background optimisation jobs, keyed by task id string.
BACKGROUND_TASKS = {}

# Optional diagnostics tracer; stays None when the `diagnostics` module is absent.
try:
    from diagnostics import TraceManager
    tracer = TraceManager()
except ImportError:
    tracer = None

# Feature flag for the AI chat endpoint: True only if huggingface_hub imports.
try:
    from huggingface_hub import InferenceClient
    has_hf_hub = True
except ImportError:
    has_hf_hub = False

# The heavy optimisation engine is optional (e.g. absent on a pure proxy host).
try:
    import core_engine
except ImportError:
    core_engine = None

from config import OUTPUT_DIR
import access_manager

# Absolute paths so the app works regardless of the process's working directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
STATIC_DIR = os.path.join(BASE_DIR, "static")

app = FastAPI(title="Portfolio Engine API")

# Serve the front-end assets (HTML/JS/CSS) under /static.
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")


class AuthRequest(BaseModel):
    """Body of POST /api/auth: the access key to validate."""

    key: str


class KeyGenRequest(BaseModel):
    """Admin key-creation payload (not referenced by the routes in this file)."""

    admin_key: str
    new_key: str


class PortfolioRequest(BaseModel):
    """Optimisation parameters shared by /api/preview and /api/generate."""

    tickers: List[str]
    capital: float = 100000.0
    # UI risk level; the routes map 1-10 onto the engine's `risk_factor`.
    risk_input: int = 5
    model: int = 1
    allocation_engine: int = 1
    # True -> per-asset minimum weight of -1.0, False -> 0.0 (long-only).
    allow_shorting: bool = True
    tax_enabled: bool = False
    garch_enabled: bool = True
    custom_constraints: Optional[List[dict]] = None


class ChatHistoryItem(BaseModel):
    """One prior chat turn, forwarded verbatim to the chat model."""

    role: str
    content: str


class ChatRequest(BaseModel):
    """Body of POST /api/chat: new message, prior turns, and portfolio context."""

    message: str
    history: List[ChatHistoryItem] = []
    portfolio_context: dict


def _static_page(filename):
    """Return a FileResponse for *filename* located inside STATIC_DIR."""
    return FileResponse(os.path.join(STATIC_DIR, filename))


@app.get("/")
async def read_login():
    """Serve the login page, which is the site's entry point."""
    return _static_page("login.html")


@app.get("/main")
async def read_index():
    """Serve the main application page."""
    return _static_page("index.html")
import logging

# Module logger. The background optimisation worker's error handler logs through
# it, so it must exist (an undefined name there left failed tasks "running").
logger = logging.getLogger(__name__)


@app.get("/admin")
async def read_admin():
    """Serve the admin console page."""
    return FileResponse(os.path.join(STATIC_DIR, "admin.html"))


# ---------------------------------------------------------------------------
# Market ticker
# ---------------------------------------------------------------------------

# Simple in-process cache for the market ticker strip.
ticker_cache = {"timestamp": 0, "data": None}
TICKER_CACHE_TTL = 300  # 5 minutes

# Display name -> Yahoo Finance symbol shown in the ticker strip.
MARKET_TICKER_SYMBOLS = {
    "S&P 500": "^GSPC",
    "NASDAQ": "^IXIC",
    "BTC": "BTC-USD",
    "GOLD": "GC=F",
    "VIX": "^VIX",
}


@app.get("/api/market_ticker")
def market_ticker():
    """Return last price and close-to-close % change for a few market symbols.

    Declared as a plain ``def`` so FastAPI runs the blocking yfinance network
    calls in its threadpool rather than on the event loop.

    Returns:
        A list of ``{"name", "price", "change"}`` dicts. Fresh results are
        cached for ``TICKER_CACHE_TTL`` seconds. If a fetch fails or yields
        nothing, the last cached list (possibly stale) is returned, or ``[]``
        if nothing was ever cached.
    """
    cached = ticker_cache["data"]
    if cached and time.time() - ticker_cache["timestamp"] < TICKER_CACHE_TTL:
        return cached

    results = []
    try:
        # One Tickers object for every symbol, then a history call per symbol.
        tickers = yf.Tickers(" ".join(MARKET_TICKER_SYMBOLS.values()))
        for name, symbol in MARKET_TICKER_SYMBOLS.items():
            # Drop NaN closes: NaN is not JSON-serialisable in the response.
            closes = tickers.tickers[symbol].history(period="5d")["Close"].dropna()
            if len(closes) < 2:
                continue
            current = float(closes.iloc[-1])
            prev = float(closes.iloc[-2])
            if prev == 0:
                continue
            results.append({
                "name": name,
                "price": round(current, 2),
                "change": round((current - prev) / prev * 100, 2),
            })
    except Exception:
        # Best effort: fall back to stale data below, but leave a log trail.
        logger.warning("Market ticker fetch failed", exc_info=True)
    else:
        # Only a complete, non-empty fetch replaces the cache, so one bad
        # fetch never discards the last good data.
        if results:
            ticker_cache["data"] = results
            ticker_cache["timestamp"] = time.time()

    return results or ticker_cache["data"] or []


# ---------------------------------------------------------------------------
# Authentication / admin
# ---------------------------------------------------------------------------

@app.post("/api/auth")
async def api_auth(req: AuthRequest, request: Request):
    """Validate an access key, passing the caller's IP to access_manager.

    The IP is the first X-Forwarded-For entry when present, otherwise the
    socket peer. NOTE(review): X-Forwarded-For is client-controlled unless a
    trusted reverse proxy overwrites it.

    Raises:
        HTTPException: 401 when the key is invalid or expired.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        ip = forwarded.split(",")[0].strip()
    else:
        ip = request.client.host if request.client else "Unknown"
    if access_manager.validate_key(req.key, ip):
        return {"status": "success", "message": "Access Granted"}
    raise HTTPException(status_code=401, detail="Invalid or Expired Access Key")


class AdminKeyGenRequest(BaseModel):
    """Body of POST /api/admin/generate."""

    admin_key: str
    hours: int = 1


class AdminRevokeRequest(BaseModel):
    """Body of POST /api/admin/revoke."""

    admin_key: str
    target_key: str


class AdminRevokeAllRequest(BaseModel):
    """Body of POST /api/admin/revoke_all.

    ``target_key`` is accepted for backward compatibility with clients that
    reused the single-revoke payload, but it is ignored.
    """

    admin_key: str
    target_key: Optional[str] = None


def _require_master_key(admin_key):
    """Raise HTTP 401 unless *admin_key* equals ``access_manager.MASTER_KEY``.

    Uses a constant-time comparison so response timing does not leak how much
    of a guessed key matched.
    """
    import hmac

    master = access_manager.MASTER_KEY
    if not (
        isinstance(admin_key, str)
        and isinstance(master, str)
        and hmac.compare_digest(admin_key.encode("utf-8"), master.encode("utf-8"))
    ):
        raise HTTPException(status_code=401, detail="Invalid Admin Key")


@app.post("/api/admin/generate")
async def admin_generate(req: AdminKeyGenRequest):
    """Create a one-time key valid for ``req.hours`` hours (admin only)."""
    new_key = access_manager.generate_otk(req.admin_key, hours=req.hours)
    if new_key:
        return {"status": "success", "key": new_key, "message": f"OTK '{new_key}' created."}
    raise HTTPException(status_code=401, detail="Invalid Admin Key")


@app.post("/api/admin/revoke")
async def admin_revoke(req: AdminRevokeRequest):
    """Revoke a single key (admin only)."""
    if access_manager.revoke_otk(req.admin_key, req.target_key):
        return {"status": "success", "message": f"Key '{req.target_key}' revoked."}
    raise HTTPException(status_code=401, detail="Invalid Admin Key or Key Not Found")


@app.get("/api/admin/keys")
async def admin_list_keys(admin_key: str = Header(...)):
    """List all keys. The admin key is checked before the key store is read."""
    _require_master_key(admin_key)
    return {"keys": access_manager.get_all_keys(admin_key)}


@app.post("/api/admin/revoke_all")
async def admin_revoke_all(req: AdminRevokeAllRequest):
    """Revoke every key whose record is not already marked ``revoked``.

    Returns:
        ``{"status": "success", "revoked_count": <number newly revoked>}``.
    """
    _require_master_key(req.admin_key)
    keys = access_manager.get_all_keys(req.admin_key) or {}
    count = 0
    # Iterate over a snapshot in case revoke_otk mutates the returned mapping.
    for key_id, info in list(keys.items()):
        if not info.get("revoked", False):
            access_manager.revoke_otk(req.admin_key, key_id)
            count += 1
    return {"status": "success", "revoked_count": count}


@app.get("/api/admin/logs")
def admin_get_logs(admin_key: str = Header(...)):
    """Return at most the last 500 access-log lines from the past 24 hours.

    Lines whose leading timestamp cannot be parsed (e.g. wrapped traceback
    lines) are always included. Declared as ``def`` so the file read runs in
    the threadpool.
    """
    _require_master_key(admin_key)

    from collections import deque
    from datetime import timedelta

    try:
        # The access log location historically came from `constants`; fall back
        # to config.OUTPUT_DIR (imported at module level) if that module is absent.
        from constants import OUTPUT_DIR as log_dir
    except ImportError:
        log_dir = OUTPUT_DIR

    log_file = os.path.join(log_dir, "access.log")
    # Bounded buffer: keeps only the newest 500 matches, never the whole file.
    logs = deque(maxlen=500)
    if os.path.exists(log_file):
        cutoff = datetime.now() - timedelta(hours=24)
        with open(log_file, "r", encoding="utf-8", errors="replace") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue
                # Expected prefix, e.g. "2026-06-12 22:33:45,491 - ..."
                dt_str = line.split(" - ")[0].split(",")[0]
                try:
                    log_dt = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
                except ValueError:
                    # Unparseable timestamp: keep the line rather than drop it.
                    logs.append(line)
                    continue
                if log_dt >= cutoff:
                    logs.append(line)
    return {"logs": list(logs)}


# ---------------------------------------------------------------------------
# Portfolio optimisation
# ---------------------------------------------------------------------------

# UI risk level (1-10) -> value passed to the engine as `risk_factor`.
RISK_FACTOR_BY_INPUT = {
    1: 0.1, 2: 0.5, 3: 1.0, 4: 2.0, 5: 3.0,
    6: 5.0, 7: 7.5, 8: 10.0, 9: 15.0, 10: 25.0,
}
DEFAULT_RISK_FACTOR = 3.0

# Proxy polling parameters.
HF_POLL_INTERVAL_SECONDS = 2
HF_MAX_POLL_FAILURES = 10


def _build_overrides(req):
    """Translate a PortfolioRequest into the overrides dict for core_engine.run_engine."""
    return {
        'tickers': req.tickers,
        'capital': req.capital,
        'risk_input': req.risk_input,
        'risk_factor': RISK_FACTOR_BY_INPUT.get(req.risk_input, DEFAULT_RISK_FACTOR),
        'model': req.model,
        'allocation_engine': req.allocation_engine,
        'single_asset_min': -1.0 if req.allow_shorting else 0.0,
        'tax_enabled': req.tax_enabled,
        'garch_enabled': req.garch_enabled,
        'custom_constraints': req.custom_constraints,
    }


def _require_engine():
    """Return the core_engine module, or raise RuntimeError if it failed to import."""
    if core_engine is None:
        raise RuntimeError("Compute engine (core_engine) is not available on this server.")
    return core_engine


def _trace_start(task_id):
    """Start a diagnostics trace for *task_id*; no-op when tracing is unavailable."""
    if tracer is not None:
        tracer.start_trace(task_id)


def _trace_flag(task_id, flag, message):
    """Add a diagnostics flag to *task_id*'s trace; no-op when tracing is unavailable."""
    if tracer is not None:
        tracer.add_flag(task_id, flag, message)


@app.post("/api/preview")
def preview_portfolio(req: PortfolioRequest, x_access_key: Optional[str] = Header(None)):
    """Synchronously compute target weights and the efficient frontier (no report).

    Declared as ``def`` so the engine computation runs in FastAPI's threadpool
    instead of blocking the event loop.

    Raises:
        HTTPException: 401 for a bad access key; 500 with the error text if the
        engine is missing or fails.
    """
    if not access_manager.validate_key(x_access_key):
        raise HTTPException(status_code=401, detail="Unauthorized")
    try:
        result = _require_engine().run_engine(
            overrides=_build_overrides(req), serve=False, preview_only=True
        )
        return {
            "status": "success",
            "target_weights": result.get("target_weights", {}),
            "efficient_frontier": result.get("efficient_frontier", {"vols": [], "rets": []}),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


def _run_locally(tid, overrides):
    """Run the optimisation in-process and record the result in BACKGROUND_TASKS."""
    _trace_flag(tid, "BACKEND_START", "Running compute engine locally")
    result = _require_engine().run_engine(overrides=overrides, serve=False, task_id=tid)
    _trace_flag(tid, "HF_BACKEND_COMPLETE", "Math engine finished")
    task = BACKGROUND_TASKS[tid]
    # Publish the payload before flipping the status, so a status poll never
    # sees "completed" with empty weights.
    task["target_weights"] = result.get("target_weights", {})
    task["message"] = "Report generated."
    task["status"] = "completed"


def _download_remote_report(hf_url):
    """Copy the rendered HTML report from the remote backend into OUTPUT_DIR.

    A non-2xx response is logged and skipped. Network errors propagate to
    the caller.
    """
    import requests

    report_res = requests.get(f"{hf_url}/report", timeout=60)
    if not report_res.ok:
        logger.warning("Report download from %s failed (HTTP %s)", hf_url, report_res.status_code)
        return
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    report_path = os.path.join(OUTPUT_DIR, "portfolio_report.html")
    with open(report_path, "wb") as f:
        f.write(report_res.content)


def _proxy_to_hf(tid, request):
    """Forward the job to the remote (Hugging Face) backend and mirror its status.

    Submits the request to ``{HF_BACKEND_URL}/api/generate``, then polls
    ``/api/status/<remote id>`` every HF_POLL_INTERVAL_SECONDS, copying status
    and message into ``BACKGROUND_TASKS[tid]``. More than HF_MAX_POLL_FAILURES
    consecutive failed polls ends the task with an error.

    Raises:
        RuntimeError: on submission failure, a missing remote task id, or
        persistent HTTP errors while polling.
    """
    import requests

    hf_url = os.getenv("HF_BACKEND_URL", "https://engineportf-portfolio-opt.hf.space").rstrip('/')
    # Use the master key to bypass HF API restrictions and authenticate internally.
    # NOTE(review): a hard-coded fallback credential ships in source; consider
    # requiring HF_MASTER_KEY to be set and dropping this default.
    hf_key = os.getenv("HF_MASTER_KEY", "Ir_yad")
    headers = {"X-Access-Key": hf_key}
    _trace_flag(tid, "PROXY_START", f"Forwarding to HF backend: {hf_url}")

    payload = request.model_dump() if hasattr(request, 'model_dump') else request.dict()
    try:
        proxy_res = requests.post(f"{hf_url}/api/generate", json=payload, headers=headers, timeout=120)
    except requests.exceptions.Timeout as exc:
        raise RuntimeError(
            "Hugging Face Backend timed out while queuing the optimization. "
            "This is likely due to the free tier spinning up from sleep."
        ) from exc
    except requests.exceptions.RequestException as exc:
        raise RuntimeError(f"Hugging Face Backend connection failed: {exc}") from exc

    if not proxy_res.ok:
        raise RuntimeError(
            f"Hugging Face Backend Error ({proxy_res.status_code}) at {hf_url}/api/generate. "
            "Check HF_BACKEND_URL."
        )

    remote_task_id = proxy_res.json().get("task_id")
    if not remote_task_id:
        raise RuntimeError("Failed to get remote task ID from Hugging Face.")
    _trace_flag(tid, "PROXY_HANDOFF_SUCCESS", f"HF Task ID: {remote_task_id}")

    task = BACKGROUND_TASKS[tid]
    failures = 0
    while True:
        time.sleep(HF_POLL_INTERVAL_SECONDS)
        try:
            status_res = requests.get(
                f"{hf_url}/api/status/{remote_task_id}", headers=headers, timeout=15
            )
        except requests.exceptions.RequestException as e:
            failures += 1
            if failures > HF_MAX_POLL_FAILURES:
                _trace_flag(tid, "PROXY_POLL_TIMEOUT", f"Poll failed 10 times: {e}")
                task["status"] = "error"
                task["message"] = "Hugging Face Backend is unreachable (Timeout). It might have crashed or restarted."
                return
            continue

        if not status_res.ok:
            _trace_flag(tid, "PROXY_CONNECTION_LOST", f"Status Code: {status_res.status_code}")
            failures += 1
            if failures > HF_MAX_POLL_FAILURES:
                task["status"] = "error"
                task["message"] = f"Lost connection to Hugging Face backend (HTTP {status_res.status_code})."
                raise RuntimeError(f"Lost connection to Hugging Face backend. (HTTP {status_res.status_code})")
            continue

        # Only a successful response resets the counter; resetting on any
        # response let persistent HTTP errors (e.g. 404 after a remote restart)
        # poll forever.
        failures = 0
        s_data = status_res.json()
        remote_status = s_data["status"]

        if remote_status == "completed":
            # Fetch the report BEFORE publishing "completed", so a client that
            # polls status and then requests /report does not race the download.
            _download_remote_report(hf_url)
            task["target_weights"] = s_data.get("target_weights", {})

        task["status"] = remote_status
        task["message"] = s_data.get("message", "")

        if remote_status == "completed":
            return
        if remote_status == "error":
            error_msg = s_data.get("message", "Unknown error from HF")
            _trace_flag(tid, "PROXY_POLL_ERROR", error_msg)
            task["status"] = "error"
            task["message"] = error_msg
            return


def _run_optimization(tid, request):
    """Background-thread entry point: run or proxy one optimisation job.

    Runs the engine in-process unless the env var RENDER or IS_PROXY is "true",
    in which case the job is forwarded to the remote backend. Any exception
    marks the task as "error".
    """
    try:
        overrides = _build_overrides(request)
        _trace_start(tid)
        _trace_flag(tid, "TASK_INIT", f"Target Universe: {request.tickers}")

        is_proxy = os.getenv("RENDER") == "true" or os.getenv("IS_PROXY") == "true"
        if is_proxy:
            _proxy_to_hf(tid, request)
        else:
            # We are the backend (either HF or localhost). Run the heavy math.
            _run_locally(tid, overrides)
    except Exception as e:
        error_trace = traceback.format_exc()
        # Record the failure first, so a failure while logging or tracing can't
        # leave the task stuck in "running".
        task = BACKGROUND_TASKS[tid]
        task["status"] = "error"
        task["message"] = f"Error: {str(e)} (Check console logs for details)"
        logger.error("Optimization failed: %s", error_trace)
        _trace_flag(tid, "FATAL_ERROR", f"{str(e)}\n\nTraceback:\n{error_trace}")


@app.post("/api/generate")
async def generate_portfolio(req: PortfolioRequest, x_access_key: Optional[str] = Header(None)):
    """Queue a full optimisation on a background thread.

    Returns:
        ``{"status": "queued", "task_id": ..., "message": ...}``. Poll
        ``/api/status/{task_id}`` for progress.
    """
    if not access_manager.validate_key(x_access_key):
        raise HTTPException(status_code=401, detail="Unauthorized")

    task_id = str(uuid.uuid4())
    BACKGROUND_TASKS[task_id] = {"status": "running", "message": "Initializing...", "target_weights": {}}
    threading.Thread(target=_run_optimization, args=(task_id, req)).start()
    return {
        "status": "queued",
        "task_id": task_id,
        "message": "Optimization started in background."
    }


# ---------------------------------------------------------------------------
# AI chat
# ---------------------------------------------------------------------------

CHAT_SYSTEM_PROMPT = (
    "You are an elite quantitative analyst AI assistant built into the Portfolio Engine. "
    "Your goal is to help the user analyze their portfolio, explain mathematical models, "
    "and act as a highly intelligent conversational partner. "
    "You can answer general finance questions, write python code, or explain concepts simply if asked. "
    "Never give explicit financial advice (e.g. 'You must buy this stock'). "
    "Be helpful, natural, and adapt to the user's tone. "
    "DO NOT blindly recite or summarize the portfolio context unless the user specifically asks a question about their portfolio. "
    "If they just say 'hello' or make a general statement, greet them naturally without mentioning the data."
)


@app.post("/api/chat")
def chat_with_portfolio(req: ChatRequest):
    """Answer a chat message via the Hugging Face InferenceClient.

    Declared as ``def`` so the blocking inference call runs in the threadpool.
    Returns ``{"status": "success", "response": ...}`` or, when the AI is
    disabled or the call fails, ``{"status": "error", "detail": ...}``.
    NOTE(review): this endpoint is unauthenticated and spends the server's
    HF_TOKEN quota; client-supplied history roles are forwarded unchecked.

    Raises:
        HTTPException: 500 if huggingface_hub is not installed.
    """
    if not has_hf_hub:
        raise HTTPException(status_code=500, detail="huggingface_hub is not installed on the server.")

    hf_token = os.environ.get("HF_TOKEN", "")
    if not hf_token:
        return {"status": "error", "detail": "AI is disabled. Please add 'HF_TOKEN' to your Hugging Face Space Secrets to enable the AI."}

    context_str = f"\n\nUser's Current Portfolio Context:\n{req.portfolio_context}" if req.portfolio_context else ""
    messages = [{"role": "system", "content": CHAT_SYSTEM_PROMPT + context_str}]
    messages.extend({"role": h.role, "content": h.content} for h in req.history)
    messages.append({"role": "user", "content": req.message})

    try:
        from huggingface_hub import InferenceClient

        client = InferenceClient(token=hf_token)
        response = client.chat_completion(
            messages=messages,
            model="Qwen/Qwen2.5-72B-Instruct",
            max_tokens=2048,
            temperature=0.3
        )
        return {"status": "success", "response": response.choices[0].message.content.strip()}
    except Exception as client_err:
        logger.error("InferenceClient failed: %s", client_err)
        return {"status": "error", "detail": f"AI temporarily unavailable: {client_err}"}


# ---------------------------------------------------------------------------
# Diagnostics / status / report
# ---------------------------------------------------------------------------

@app.get("/api/traces")
async def get_all_traces():
    """Return every recorded trace, or ``{}`` when tracing is unavailable.

    NOTE(review): unauthenticated; traces may contain error tracebacks.
    """
    if tracer is None:
        return {}
    return tracer.traces


@app.get("/api/trace/{task_id}")
async def get_task_trace(task_id: str):
    """Return the trace for one task (``trace`` is None when tracing is unavailable)."""
    trace = tracer.get_trace(task_id) if tracer is not None else None
    return {"task_id": task_id, "trace": trace}


@app.get("/api/status/{task_id}")
async def get_task_status(task_id: str, x_access_key: Optional[str] = Header(None)):
    """Return the BACKGROUND_TASKS entry for *task_id* (401 bad key, 404 unknown id)."""
    if not access_manager.validate_key(x_access_key, silent=True):
        raise HTTPException(status_code=401, detail="Unauthorized")
    task = BACKGROUND_TASKS.get(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task


@app.get("/report")
async def get_report():
    """Serve the most recently generated HTML report, or 404 if none exists."""
    report_path = os.path.join(OUTPUT_DIR, "portfolio_report.html")
    if os.path.exists(report_path):
        return FileResponse(report_path)
    raise HTTPException(status_code=404, detail="Report not generated yet.")


# ---------------------------------------------------------------------------
# Background daemons
# ---------------------------------------------------------------------------

MARKET_ALERT_INTERVAL_SECONDS = 3600  # 1 hour
MARKET_ALERT_DROP_PCT = -5.0
KEEP_ALIVE_INTERVAL_SECONDS = 300  # 5 minutes
# Overridable via env; defaults to the original hard-coded Space URL.
KEEP_ALIVE_URL = os.getenv(
    "KEEP_ALIVE_URL", "https://michaliskoustis2005-byte-portfolio-engine.hf.space/"
)


def _alert_daemon():
    """Hourly: send a Telegram alert if SPY's last close fell >= 5% vs the prior one."""
    while True:
        try:
            time.sleep(MARKET_ALERT_INTERVAL_SECONDS)
            closes = yf.Ticker("SPY").history(period="2d")["Close"].dropna()
            if len(closes) >= 2:
                current = float(closes.iloc[-1])
                prev = float(closes.iloc[-2])
                pct_change = ((current - prev) / prev) * 100
                if pct_change <= MARKET_ALERT_DROP_PCT:
                    access_manager.send_telegram_alert(
                        f"🚨 **MARKET ALERT**\nSPY has dropped by {pct_change:.2f}%!\nCheck the portfolio engine."
                    )
        except Exception:
            # Best effort: keep the daemon alive, but don't hide the failure.
            logger.warning("Market alert check failed", exc_info=True)


def _keep_alive_daemon():
    """Every 5 minutes, ping KEEP_ALIVE_URL to keep the Hugging Face Space awake."""
    import requests

    while True:
        try:
            time.sleep(KEEP_ALIVE_INTERVAL_SECONDS)
            # Timeout so a hung connection cannot stall the keep-alive forever.
            requests.get(KEEP_ALIVE_URL, timeout=30)
        except Exception:
            logger.debug("Keep-alive ping failed", exc_info=True)


@app.on_event("startup")
def startup_event():
    """Start the alert and keep-alive daemons as daemon threads."""
    threading.Thread(target=_alert_daemon, daemon=True).start()
    threading.Thread(target=_keep_alive_daemon, daemon=True).start()


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)