Spaces:
Sleeping
Sleeping
| import os | |
# Cap every native math backend at a single thread to prevent OpenBLAS/MKL
# deadlock inside FastAPI threads.
os.environ.update(
    dict.fromkeys(
        (
            "OMP_NUM_THREADS",
            "OPENBLAS_NUM_THREADS",
            "MKL_NUM_THREADS",
            "VECLIB_MAXIMUM_THREADS",
            "NUMEXPR_NUM_THREADS",
        ),
        "1",
    )
)
| from fastapi import FastAPI, HTTPException, Request, Header | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| import time | |
| import threading | |
| import uuid | |
| import sys | |
| import yfinance as yf | |
| from datetime import datetime | |
| import traceback | |
# Load variables from a .env file when python-dotenv is installed; the
# module still imports without it.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass

# In-memory registry of optimization jobs, keyed by task id. Entries are
# created by generate_portfolio and read by get_task_status.
BACKGROUND_TASKS = {}

# Optional diagnostics tracer. NOTE: `tracer` is None when the diagnostics
# module cannot be imported, so users of it must cope with None.
try:
    from diagnostics import TraceManager
    tracer = TraceManager()
except ImportError:
    tracer = None

# Optional Hugging Face client; `has_hf_hub` gates the chat endpoint.
try:
    from huggingface_hub import InferenceClient
    has_hf_hub = True
except ImportError:
    has_hf_hub = False

# Optional compute engine. NOTE: `core_engine` is None when the import fails.
try:
    import core_engine
except ImportError:
    core_engine = None

from config import OUTPUT_DIR
import access_manager

# Static assets live in a "static" directory next to this file.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
STATIC_DIR = os.path.join(BASE_DIR, "static")

app = FastAPI(title="Portfolio Engine API")

# Mount static files
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
class AuthRequest(BaseModel):
    """Request body for api_auth."""

    # Access key that api_auth passes to access_manager.validate_key.
    key: str
class KeyGenRequest(BaseModel):
    """Key-generation request carrying an admin key and an explicit new key.

    NOTE(review): nothing in the visible part of this file references this
    model (admin_generate uses AdminKeyGenRequest) — confirm it is still needed.
    """

    admin_key: str
    new_key: str
class PortfolioRequest(BaseModel):
    """Optimization parameters accepted by the preview and generate handlers.

    The handlers copy these fields into the ``overrides`` dict handed to
    ``core_engine.run_engine``.
    """

    # Symbols to optimize over; forwarded as overrides['tickers'].
    tickers: List[str]
    capital: float = 100000.0
    # Handlers translate this through a 1..10 lookup table into
    # 'risk_factor'; values outside the table fall back to 3.0.
    risk_input: int = 5
    model: int = 1
    allocation_engine: int = 1
    # True maps to single_asset_min = -1.0, False to 0.0.
    allow_shorting: bool = True
    tax_enabled: bool = False
    garch_enabled: bool = True
    # Forwarded untouched as overrides['custom_constraints'].
    custom_constraints: Optional[List[dict]] = None
class ChatHistoryItem(BaseModel):
    """One prior chat turn; replayed verbatim into the model's message list."""

    # Used directly as the message "role" by chat_with_portfolio.
    role: str
    content: str
class ChatRequest(BaseModel):
    """Request body for chat_with_portfolio."""

    # The new user message, sent as the final "user" turn.
    message: str
    # Earlier turns, replayed between the system prompt and `message`.
    history: List[ChatHistoryItem] = []
    # Appended to the system prompt when non-empty.
    portfolio_context: dict
async def read_login():
    """Serve login.html from the static directory."""
    login_page = os.path.join(STATIC_DIR, "login.html")
    return FileResponse(login_page)
async def read_index():
    """Serve index.html from the static directory."""
    index_page = os.path.join(STATIC_DIR, "index.html")
    return FileResponse(index_page)
async def read_admin():
    """Serve admin.html from the static directory."""
    admin_page = os.path.join(STATIC_DIR, "admin.html")
    return FileResponse(admin_page)
# Simple cache for market ticker
ticker_cache = {"timestamp": 0, "data": None}
TICKER_CACHE_TTL = 300  # 5 minutes


async def market_ticker():
    """Return the latest price and daily percentage change for a few indices.

    Results are cached in ``ticker_cache`` for ``TICKER_CACHE_TTL`` seconds.
    When the refresh fails or yields nothing, the last good snapshot is
    returned instead, and an empty list when there is none.

    Returns:
        A list of ``{"name", "price", "change"}`` dicts; always a list.
    """
    import logging

    cached = ticker_cache["data"]
    if cached and time.time() - ticker_cache["timestamp"] < TICKER_CACHE_TTL:
        return cached

    symbols = {
        "S&P 500": "^GSPC",
        "NASDAQ": "^IXIC",
        "BTC": "BTC-USD",
        "GOLD": "GC=F",
        "VIX": "^VIX"
    }
    results = []
    try:
        # Fetch data for all symbols at once to save time
        tickers = yf.Tickers(" ".join(symbols.values()))
        for name, symbol in symbols.items():
            hist = tickers.tickers[symbol].history(period="5d")
            if len(hist) < 2:
                # Need two closes to compute a day-over-day change.
                continue
            current = float(hist['Close'].iloc[-1])
            prev = float(hist['Close'].iloc[-2])
            pct_change = ((current - prev) / prev) * 100
            results.append({
                "name": name,
                "price": round(current, 2),
                "change": round(pct_change, 2)
            })
    except Exception:
        # Graceful fallback if yfinance fails: keep whatever was collected
        # so far, but record the failure instead of hiding it.
        logging.getLogger(__name__).warning("Market ticker refresh failed", exc_info=True)
    else:
        # Only a non-empty refresh may replace the cached snapshot, so an
        # empty response never wipes out good data.
        if results:
            ticker_cache["data"] = results
            ticker_cache["timestamp"] = time.time()

    # The cache slot starts as None; never leak that to the caller.
    return results or ticker_cache["data"] or []
async def api_auth(req: AuthRequest, request: Request):
    """Validate an access key, passing the caller's IP to the access manager.

    The IP is the first entry of ``X-Forwarded-For`` when that header is
    present, otherwise the socket peer, otherwise the literal "Unknown".

    Raises:
        HTTPException: 401 when the key is rejected.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        ip = forwarded.split(",")[0].strip()
    elif request.client:
        ip = request.client.host
    else:
        ip = "Unknown"

    if not access_manager.validate_key(req.key, ip):
        raise HTTPException(status_code=401, detail="Invalid or Expired Access Key")
    return {"status": "success", "message": "Access Granted"}
class AdminKeyGenRequest(BaseModel):
    """Request body for admin_generate."""

    admin_key: str
    # Passed to access_manager.generate_otk as its `hours` argument.
    hours: int = 1
class AdminRevokeRequest(BaseModel):
    """Request body for admin_revoke and admin_revoke_all."""

    admin_key: str
    # Key to revoke. Required by the schema, but admin_revoke_all ignores it.
    target_key: str
async def admin_generate(req: AdminKeyGenRequest):
    """Create a one-time key through the access manager.

    Raises:
        HTTPException: 401 when the access manager returns no key.
    """
    issued_key = access_manager.generate_otk(req.admin_key, hours=req.hours)
    if not issued_key:
        raise HTTPException(status_code=401, detail="Invalid Admin Key")
    return {
        "status": "success",
        "key": issued_key,
        "message": f"OTK '{issued_key}' created.",
    }
async def admin_revoke(req: AdminRevokeRequest):
    """Revoke a single key through the access manager.

    Raises:
        HTTPException: 401 when the access manager reports failure.
    """
    revoked = access_manager.revoke_otk(req.admin_key, req.target_key)
    if not revoked:
        raise HTTPException(status_code=401, detail="Invalid Admin Key or Key Not Found")
    return {"status": "success", "message": f"Key '{req.target_key}' revoked."}
async def admin_list_keys(admin_key: str = Header(...)):
    """Return every key known to the access manager.

    The admin key is checked before the key store is queried, so an
    unauthorized caller never triggers the lookup.

    Raises:
        HTTPException: 401 when ``admin_key`` is not the master key.
    """
    if admin_key != access_manager.MASTER_KEY:
        raise HTTPException(status_code=401, detail="Invalid Admin Key")
    return {"keys": access_manager.get_all_keys(admin_key)}
async def admin_revoke_all(req: AdminRevokeRequest):
    """Revoke every key that is not already marked revoked.

    Only ``req.admin_key`` is used; ``req.target_key`` is ignored.

    Raises:
        HTTPException: 401 when ``req.admin_key`` is not the master key.
    """
    if req.admin_key != access_manager.MASTER_KEY:
        raise HTTPException(status_code=401, detail="Invalid Admin Key")

    revoked_total = 0
    all_keys = access_manager.get_all_keys(req.admin_key)
    for key_name, key_info in all_keys.items():
        if key_info.get("revoked", False):
            continue
        access_manager.revoke_otk(req.admin_key, key_name)
        revoked_total += 1
    return {"status": "success", "revoked_count": revoked_total}
async def admin_get_logs(admin_key: str = Header(...)):
    """Return access-log lines from the last 24 hours (at most the last 500).

    Lines whose leading timestamp cannot be parsed are kept rather than
    dropped. A missing log file yields an empty list.

    Raises:
        HTTPException: 401 when ``admin_key`` is not the master key.
    """
    if admin_key != access_manager.MASTER_KEY:
        raise HTTPException(status_code=401, detail="Invalid Admin Key")

    from collections import deque
    from datetime import datetime, timedelta
    # NOTE(review): the rest of this file takes OUTPUT_DIR from `config`;
    # confirm `constants` is the intended source for the log location.
    from constants import OUTPUT_DIR

    log_file = os.path.join(OUTPUT_DIR, "access.log")
    # Bounded buffer: only the newest 500 entries are ever returned, so
    # older ones are discarded while reading instead of afterwards.
    recent = deque(maxlen=500)
    if os.path.exists(log_file):
        cutoff = datetime.now() - timedelta(hours=24)
        with open(log_file, "r", encoding="utf-8") as f:
            for line in f:
                entry = line.strip()
                if not entry:
                    continue
                # Parse the datetime from the log line (e.g. "2026-06-12 22:33:45,491 - ...")
                try:
                    dt_str = line.split(" - ")[0].split(",")[0]
                    log_dt = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
                except ValueError:
                    # If parsing fails, just append it
                    recent.append(entry)
                    continue
                if log_dt >= cutoff:
                    recent.append(entry)
    # Return last 500 lines max to prevent huge payloads
    return {"logs": list(recent)}
async def preview_portfolio(req: PortfolioRequest, x_access_key: Optional[str] = Header(None)):
    """Run the engine in preview mode and return weights plus the frontier.

    Raises:
        HTTPException: 401 when the access key is rejected; 500 when the
            compute engine is unavailable or raises.
    """
    if not access_manager.validate_key(x_access_key):
        raise HTTPException(status_code=401, detail="Unauthorized")
    # core_engine is None when its import failed; say so explicitly rather
    # than surfacing a cryptic attribute error.
    if core_engine is None:
        raise HTTPException(status_code=500, detail="Compute engine is not available on this server.")

    # Risk slider (1..10) -> engine risk factor; unknown values fall back to 3.0.
    risk_factors = {1: 0.1, 2: 0.5, 3: 1.0, 4: 2.0, 5: 3.0, 6: 5.0, 7: 7.5, 8: 10.0, 9: 15.0, 10: 25.0}
    overrides = {
        'tickers': req.tickers,
        'capital': req.capital,
        'risk_input': req.risk_input,
        'risk_factor': risk_factors.get(req.risk_input, 3.0),
        'model': req.model,
        'allocation_engine': req.allocation_engine,
        'single_asset_min': -1.0 if req.allow_shorting else 0.0,
        'tax_enabled': req.tax_enabled,
        'garch_enabled': req.garch_enabled,
        'custom_constraints': req.custom_constraints
    }
    try:
        result = core_engine.run_engine(overrides=overrides, serve=False, preview_only=True)
        return {
            "status": "success",
            "target_weights": result.get("target_weights", {}),
            "efficient_frontier": result.get("efficient_frontier", {"vols": [], "rets": []})
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def generate_portfolio(req: PortfolioRequest, x_access_key: Optional[str] = Header(None)):
    """Queue a full optimization in a background thread and return its task id.

    The worker either runs ``core_engine`` in this process or, when the
    ``RENDER`` or ``IS_PROXY`` environment variable equals "true", forwards
    the request to the remote backend at ``HF_BACKEND_URL`` and polls it.
    Progress is published through ``BACKGROUND_TASKS[task_id]``.

    Raises:
        HTTPException: 401 when the access key is rejected.
    """
    import logging

    if not access_manager.validate_key(x_access_key):
        raise HTTPException(status_code=401, detail="Unauthorized")

    log = logging.getLogger(__name__)
    task_id = str(uuid.uuid4())
    BACKGROUND_TASKS[task_id] = {"status": "running", "message": "Initializing...", "target_weights": {}}

    def _flag(tid, name, detail):
        """Record a diagnostics flag; a no-op when no tracer is available."""
        if tracer is not None:
            tracer.add_flag(tid, name, detail)

    def _run_local(tid, overrides):
        """Run the compute engine in this process and publish its weights."""
        if core_engine is None:
            raise RuntimeError("core_engine is not available on this server.")
        _flag(tid, "BACKEND_START", "Running compute engine locally")
        # We are the backend (either HF or localhost). Run the heavy math.
        result = core_engine.run_engine(overrides=overrides, serve=False, task_id=tid)
        _flag(tid, "HF_BACKEND_COMPLETE", "Math engine finished")
        task = BACKGROUND_TASKS[tid]
        task["target_weights"] = result.get("target_weights", {})
        task["message"] = "Report generated."
        task["status"] = "completed"

    def _run_via_proxy(tid, request):
        """Hand the job to the remote backend, then poll it until it settles."""
        import requests

        hf_url = os.getenv("HF_BACKEND_URL", "https://engineportf-portfolio-opt.hf.space").rstrip('/')
        # Use the master key to bypass HF API restrictions and authenticate internally
        # NOTE(security): the fallback value is a credential committed to
        # source control; it should come from the environment only.
        hf_key = os.getenv("HF_MASTER_KEY", "Ir_yad")
        auth_headers = {"X-Access-Key": hf_key}
        _flag(tid, "PROXY_START", f"Forwarding to HF backend: {hf_url}")

        payload = request.model_dump() if hasattr(request, 'model_dump') else request.dict()
        try:
            proxy_res = requests.post(
                f"{hf_url}/api/generate",
                json=payload,
                headers=auth_headers,
                timeout=120
            )
        except requests.exceptions.Timeout as exc:
            raise RuntimeError("Hugging Face Backend timed out while queuing the optimization. This is likely due to the free tier spinning up from sleep.") from exc
        except requests.exceptions.RequestException as req_e:
            raise RuntimeError(f"Hugging Face Backend connection failed: {req_e}") from req_e
        if not proxy_res.ok:
            raise RuntimeError(f"Hugging Face Backend Error ({proxy_res.status_code}) at {hf_url}/api/generate. Check HF_BACKEND_URL.")

        remote_task_id = proxy_res.json().get("task_id")
        if not remote_task_id:
            raise RuntimeError("Failed to get remote task ID from Hugging Face.")
        # Only claim a successful hand-off once a task id actually came back.
        _flag(tid, "PROXY_HANDOFF_SUCCESS", f"HF Task ID: {remote_task_id}")

        task = BACKGROUND_TASKS[tid]
        failures = 0  # consecutive failed polls (network error or non-2xx)
        while True:
            time.sleep(2)
            try:
                status_res = requests.get(
                    f"{hf_url}/api/status/{remote_task_id}",
                    headers=auth_headers,
                    timeout=15
                )
            except requests.exceptions.RequestException as e:
                failures += 1
                if failures > 10:
                    _flag(tid, "PROXY_POLL_TIMEOUT", f"Poll failed 10 times: {e}")
                    task["status"] = "error"
                    task["message"] = "Hugging Face Backend is unreachable (Timeout). It might have crashed or restarted."
                    break
                continue

            if not status_res.ok:
                _flag(tid, "PROXY_CONNECTION_LOST", f"Status Code: {status_res.status_code}")
                failures += 1
                if failures > 10:
                    raise RuntimeError(f"Lost connection to Hugging Face backend. (HTTP {status_res.status_code})")
                continue

            # Reset only on a usable answer. Resetting on any HTTP response
            # would keep the non-2xx counter from ever reaching the limit.
            failures = 0
            s_data = status_res.json()
            remote_status = s_data["status"]

            if remote_status == "completed":
                task["target_weights"] = s_data.get("target_weights", {})
                # Download the completed HTML report from HF to Render
                report_res = requests.get(f"{hf_url}/report", timeout=60)
                if report_res.ok:
                    report_path = os.path.join(OUTPUT_DIR, "portfolio_report.html")
                    os.makedirs(OUTPUT_DIR, exist_ok=True)
                    with open(report_path, "wb") as f:
                        f.write(report_res.content)
                # Publish completion last so a client that sees "completed"
                # never requests the report before it has been written.
                task["message"] = s_data["message"]
                task["status"] = remote_status
                break

            if remote_status == "error":
                error_msg = s_data.get("message", "Unknown error from HF")
                _flag(tid, "PROXY_POLL_ERROR", error_msg)
                task["status"] = "error"
                task["message"] = error_msg
                break

            # Still in progress: mirror the remote status and message.
            task["status"] = remote_status
            task["message"] = s_data["message"]

    def _run_optimization(tid, request):
        """Thread entry point: run the job and record any failure on the task."""
        try:
            overrides = {
                'tickers': request.tickers,
                'capital': request.capital,
                'risk_input': request.risk_input,
                'risk_factor': {1:0.1, 2:0.5, 3:1.0, 4:2.0, 5:3.0, 6:5.0, 7:7.5, 8:10.0, 9:15.0, 10:25.0}.get(request.risk_input, 3.0),
                'model': request.model,
                'allocation_engine': request.allocation_engine,
                'single_asset_min': -1.0 if request.allow_shorting else 0.0,
                'tax_enabled': request.tax_enabled,
                'garch_enabled': request.garch_enabled,
                'custom_constraints': request.custom_constraints
            }
            if tracer is not None:
                tracer.start_trace(tid)
            # The overrides carry the universe under 'tickers'.
            _flag(tid, "TASK_INIT", f"Target Universe: {overrides.get('tickers')}")

            # Determine if we are acting as a Proxy (e.g., on Render) or if we should run the math locally.
            is_proxy = os.getenv("RENDER") == "true" or os.getenv("IS_PROXY") == "true"
            if is_proxy:
                _run_via_proxy(tid, request)
            else:
                _run_local(tid, overrides)
        except Exception as e:
            error_trace = traceback.format_exc()
            # Mark the task failed before any logging or tracing, so a
            # problem there cannot leave the task stuck in "running".
            BACKGROUND_TASKS[tid]["status"] = "error"
            BACKGROUND_TASKS[tid]["message"] = f"Error: {str(e)} (Check console logs for details)"
            log.error("Optimization failed: %s", error_trace)
            _flag(tid, "FATAL_ERROR", f"{str(e)}\n\nTraceback:\n{error_trace}")

    threading.Thread(target=_run_optimization, args=(task_id, req)).start()
    return {
        "status": "queued",
        "task_id": task_id,
        "message": "Optimization started in background."
    }
async def chat_with_portfolio(req: ChatRequest):
    """Answer a chat message through the Hugging Face inference client.

    The system prompt is extended with the portfolio context when one is
    supplied, followed by the prior turns and the new user message.

    Returns:
        ``{"status": "success", "response": ...}`` on success, or
        ``{"status": "error", "detail": ...}`` when the token is missing
        or the inference call fails.

    Raises:
        HTTPException: 500 when huggingface_hub is not installed.

    NOTE(review): unlike the portfolio handlers, this one performs no
    access-key check — confirm that is intentional.
    """
    import logging

    if not has_hf_hub:
        raise HTTPException(status_code=500, detail="huggingface_hub is not installed on the server.")

    hf_token = os.environ.get("HF_TOKEN", "")
    if not hf_token:
        return {"status": "error", "detail": "AI is disabled. Please add 'HF_TOKEN' to your Hugging Face Space Secrets to enable the AI."}

    system_prompt = (
        "You are an elite quantitative analyst AI assistant built into the Portfolio Engine. "
        "Your goal is to help the user analyze their portfolio, explain mathematical models, "
        "and act as a highly intelligent conversational partner. "
        "You can answer general finance questions, write python code, or explain concepts simply if asked. "
        "Never give explicit financial advice (e.g. 'You must buy this stock'). "
        "Be helpful, natural, and adapt to the user's tone. "
        "DO NOT blindly recite or summarize the portfolio context unless the user specifically asks a question about their portfolio. "
        "If they just say 'hello' or make a general statement, greet them naturally without mentioning the data."
    )
    context_str = f"\n\nUser's Current Portfolio Context:\n{req.portfolio_context}" if req.portfolio_context else ""

    messages = [{"role": "system", "content": system_prompt + context_str}]
    messages.extend({"role": h.role, "content": h.content} for h in req.history)
    messages.append({"role": "user", "content": req.message})

    try:
        # InferenceClient is the module-level import guarded by has_hf_hub.
        client = InferenceClient(token=hf_token)
        response = client.chat_completion(
            messages=messages,
            model="Qwen/Qwen2.5-72B-Instruct",
            max_tokens=2048,
            temperature=0.3
        )
        # content can be None on an empty completion; normalize to "".
        answer = (response.choices[0].message.content or "").strip()
    except Exception as client_err:
        logging.error(f"InferenceClient failed: {client_err}")
        return {"status": "error", "detail": f"AI temporarily unavailable: {client_err}"}
    return {"status": "success", "response": answer}
async def get_all_traces():
    """Return every trace held by the diagnostics tracer.

    Raises:
        HTTPException: 503 when the diagnostics module is not installed
            (``tracer`` is None in that case).
    """
    if tracer is None:
        raise HTTPException(status_code=503, detail="Diagnostics are not available on this server.")
    return tracer.traces
async def get_task_trace(task_id: str):
    """Return the diagnostics trace recorded for one task.

    Raises:
        HTTPException: 503 when the diagnostics module is not installed
            (``tracer`` is None in that case).
    """
    if tracer is None:
        raise HTTPException(status_code=503, detail="Diagnostics are not available on this server.")
    return {"task_id": task_id, "trace": tracer.get_trace(task_id)}
async def get_task_status(task_id: str, x_access_key: Optional[str] = Header(None)):
    """Return the BACKGROUND_TASKS entry for ``task_id``.

    Raises:
        HTTPException: 401 when the access key is rejected; 404 when the
            task id is unknown.
    """
    authorized = access_manager.validate_key(x_access_key, silent=True)
    if not authorized:
        raise HTTPException(status_code=401, detail="Unauthorized")

    entry = BACKGROUND_TASKS.get(task_id)
    if entry:
        return entry
    raise HTTPException(status_code=404, detail="Task not found")
async def get_report():
    """Serve the generated HTML report from OUTPUT_DIR.

    Raises:
        HTTPException: 404 when the report file does not exist.
    """
    report_path = os.path.join(OUTPUT_DIR, "portfolio_report.html")
    if not os.path.exists(report_path):
        raise HTTPException(status_code=404, detail="Report not generated yet.")
    return FileResponse(report_path)
def _alert_daemon():
    """Background daemon to check for market drops.

    Once an hour it compares the last two SPY closes and sends a Telegram
    alert through ``access_manager`` when the change is -5% or worse.
    Failures are logged and the loop keeps running; it never returns.
    """
    import logging

    log = logging.getLogger(__name__)
    while True:
        try:
            # Wake up every 1 hour (3600 seconds)
            time.sleep(3600)
            # Simple check for SPY drops
            ticker = yf.Ticker("SPY")
            hist = ticker.history(period="2d")
            if len(hist) < 2:
                continue
            current = float(hist['Close'].iloc[-1])
            prev = float(hist['Close'].iloc[-2])
            pct_change = ((current - prev) / prev) * 100
            if pct_change <= -5.0:
                access_manager.send_telegram_alert(f"🚨 **MARKET ALERT**\nSPY has dropped by {pct_change:.2f}%!\nCheck the portfolio engine.")
        except Exception:
            # Best effort: the daemon must survive, but a failing check
            # should leave a record rather than vanish.
            log.warning("Market alert check failed", exc_info=True)
def _keep_alive_daemon(
    url="https://michaliskoustis2005-byte-portfolio-engine.hf.space/",
    interval=300,
    timeout=30,
):
    """Background daemon to keep the Hugging Face Space awake.

    Args:
        url: Public URL to ping.
        interval: Seconds to sleep before each ping (default 5 minutes).
        timeout: Seconds to wait for a response, so that a stalled
            connection cannot block the thread indefinitely.

    Failures are logged and the loop keeps running; it never returns.
    """
    import logging
    import requests

    log = logging.getLogger(__name__)
    while True:
        try:
            # Wake up every 5 minutes (300 seconds)
            time.sleep(interval)
            # Ping the public URL to keep the load balancer active
            requests.get(url, timeout=timeout)
        except Exception:
            log.debug("Keep-alive ping failed", exc_info=True)
def startup_event():
    """Launch the alert and keep-alive loops as daemon threads.

    NOTE(review): no startup-hook registration for this function is
    visible in this listing — confirm how it is invoked.
    """
    import threading

    # Alert daemon first, then keep-alive.
    for daemon_target in (_alert_daemon, _keep_alive_daemon):
        worker = threading.Thread(target=daemon_target, daemon=True)
        worker.start()
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on every interface,
    # port 8000, with auto-reload enabled.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000, "reload": True}
    uvicorn.run("app:app", **server_options)