engineportf's picture
Initial Deployment from Local Engine
208fbf8 verified
Raw
History Blame Contribute Delete
21.3 kB
import os
# Set thread limits to prevent OpenBLAS/MKL deadlock inside FastAPI threads
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"
from fastapi import FastAPI, HTTPException, Request, Header
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel
from typing import List, Optional
import time
import threading
import uuid
import sys
import yfinance as yf
from datetime import datetime
import traceback
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
BACKGROUND_TASKS = {}
try:
from diagnostics import TraceManager
tracer = TraceManager()
except ImportError:
tracer = None
try:
from huggingface_hub import InferenceClient
has_hf_hub = True
except ImportError:
has_hf_hub = False
try:
import core_engine
except ImportError:
core_engine = None
from config import OUTPUT_DIR
import access_manager
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
STATIC_DIR = os.path.join(BASE_DIR, "static")
app = FastAPI(title="Portfolio Engine API")
# Mount static files
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
class AuthRequest(BaseModel):
key: str
class KeyGenRequest(BaseModel):
admin_key: str
new_key: str
class PortfolioRequest(BaseModel):
tickers: List[str]
capital: float = 100000.0
risk_input: int = 5
model: int = 1
allocation_engine: int = 1
allow_shorting: bool = True
tax_enabled: bool = False
garch_enabled: bool = True
custom_constraints: Optional[List[dict]] = None
class ChatHistoryItem(BaseModel):
role: str
content: str
class ChatRequest(BaseModel):
message: str
history: List[ChatHistoryItem] = []
portfolio_context: dict
@app.get("/")
async def read_login():
return FileResponse(os.path.join(STATIC_DIR, "login.html"))
@app.get("/main")
async def read_index():
return FileResponse(os.path.join(STATIC_DIR, "index.html"))
@app.get("/admin")
async def read_admin():
return FileResponse(os.path.join(STATIC_DIR, "admin.html"))
# Simple cache for market ticker
ticker_cache = {"timestamp": 0, "data": None}
TICKER_CACHE_TTL = 300 # 5 minutes
@app.get("/api/market_ticker")
async def market_ticker():
global ticker_cache
if time.time() - ticker_cache["timestamp"] < TICKER_CACHE_TTL and ticker_cache["data"]:
return ticker_cache["data"]
symbols = {
"S&P 500": "^GSPC",
"NASDAQ": "^IXIC",
"BTC": "BTC-USD",
"GOLD": "GC=F",
"VIX": "^VIX"
}
results = []
try:
# Fetch data for all symbols at once to save time
tickers = yf.Tickers(" ".join(list(symbols.values())))
for name, symbol in symbols.items():
hist = tickers.tickers[symbol].history(period="5d")
if len(hist) >= 2:
current = float(hist['Close'].iloc[-1])
prev = float(hist['Close'].iloc[-2])
pct_change = ((current - prev) / prev) * 100
results.append({
"name": name,
"price": round(current, 2),
"change": round(pct_change, 2)
})
ticker_cache["data"] = results
ticker_cache["timestamp"] = time.time()
except Exception as e:
# Graceful fallback if yfinance fails
pass
return results or ticker_cache.get("data", [])
@app.post("/api/auth")
async def api_auth(req: AuthRequest, request: Request):
forwarded = request.headers.get("X-Forwarded-For")
ip = forwarded.split(",")[0].strip() if forwarded else (request.client.host if request.client else "Unknown")
if access_manager.validate_key(req.key, ip):
return {"status": "success", "message": "Access Granted"}
raise HTTPException(status_code=401, detail="Invalid or Expired Access Key")
class AdminKeyGenRequest(BaseModel):
admin_key: str
hours: int = 1
class AdminRevokeRequest(BaseModel):
admin_key: str
target_key: str
@app.post("/api/admin/generate")
async def admin_generate(req: AdminKeyGenRequest):
new_key = access_manager.generate_otk(req.admin_key, hours=req.hours)
if new_key:
return {"status": "success", "key": new_key, "message": f"OTK '{new_key}' created."}
raise HTTPException(status_code=401, detail="Invalid Admin Key")
@app.post("/api/admin/revoke")
async def admin_revoke(req: AdminRevokeRequest):
success = access_manager.revoke_otk(req.admin_key, req.target_key)
if success:
return {"status": "success", "message": f"Key '{req.target_key}' revoked."}
raise HTTPException(status_code=401, detail="Invalid Admin Key or Key Not Found")
@app.get("/api/admin/keys")
async def admin_list_keys(admin_key: str = Header(...)):
keys = access_manager.get_all_keys(admin_key)
if admin_key != access_manager.MASTER_KEY:
raise HTTPException(status_code=401, detail="Invalid Admin Key")
return {"keys": keys}
@app.post("/api/admin/revoke_all")
async def admin_revoke_all(req: AdminRevokeRequest):
if req.admin_key != access_manager.MASTER_KEY:
raise HTTPException(status_code=401, detail="Invalid Admin Key")
keys = access_manager.get_all_keys(req.admin_key)
count = 0
for k, v in keys.items():
if not v.get("revoked", False):
access_manager.revoke_otk(req.admin_key, k)
count += 1
return {"status": "success", "revoked_count": count}
@app.get("/api/admin/logs")
async def admin_get_logs(admin_key: str = Header(...)):
if admin_key != access_manager.MASTER_KEY:
raise HTTPException(status_code=401, detail="Invalid Admin Key")
from constants import OUTPUT_DIR
import os
from datetime import datetime, timedelta
log_file = os.path.join(OUTPUT_DIR, "access.log")
logs = []
if os.path.exists(log_file):
cutoff = datetime.now() - timedelta(hours=24)
with open(log_file, "r", encoding="utf-8") as f:
for line in f:
if not line.strip(): continue
# Parse the datetime from the log line (e.g. "2026-06-12 22:33:45,491 - ...")
try:
dt_str = line.split(" - ")[0].split(",")[0]
log_dt = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
if log_dt >= cutoff:
logs.append(line.strip())
except:
# If parsing fails, just append it
logs.append(line.strip())
# Return last 500 lines max to prevent huge payloads
return {"logs": logs[-500:]}
@app.post("/api/preview")
async def preview_portfolio(req: PortfolioRequest, x_access_key: Optional[str] = Header(None)):
if not access_manager.validate_key(x_access_key):
raise HTTPException(status_code=401, detail="Unauthorized")
try:
overrides = {
'tickers': req.tickers,
'capital': req.capital,
'risk_input': req.risk_input,
'risk_factor': {1:0.1, 2:0.5, 3:1.0, 4:2.0, 5:3.0, 6:5.0, 7:7.5, 8:10.0, 9:15.0, 10:25.0}.get(req.risk_input, 3.0),
'model': req.model,
'allocation_engine': req.allocation_engine,
'single_asset_min': -1.0 if req.allow_shorting else 0.0,
'tax_enabled': req.tax_enabled,
'garch_enabled': req.garch_enabled,
'custom_constraints': req.custom_constraints
}
result = core_engine.run_engine(overrides=overrides, serve=False, preview_only=True)
return {
"status": "success",
"target_weights": result.get("target_weights", {}),
"efficient_frontier": result.get("efficient_frontier", {"vols": [], "rets": []})
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/generate")
async def generate_portfolio(req: PortfolioRequest, x_access_key: Optional[str] = Header(None)):
if not access_manager.validate_key(x_access_key):
raise HTTPException(status_code=401, detail="Unauthorized")
task_id = str(uuid.uuid4())
BACKGROUND_TASKS[task_id] = {"status": "running", "message": "Initializing...", "target_weights": {}}
def _run_optimization(tid, request):
try:
overrides = {
'tickers': request.tickers,
'capital': request.capital,
'risk_input': request.risk_input,
'risk_factor': {1:0.1, 2:0.5, 3:1.0, 4:2.0, 5:3.0, 6:5.0, 7:7.5, 8:10.0, 9:15.0, 10:25.0}.get(request.risk_input, 3.0),
'model': request.model,
'allocation_engine': request.allocation_engine,
'single_asset_min': -1.0 if request.allow_shorting else 0.0,
'tax_enabled': request.tax_enabled,
'garch_enabled': request.garch_enabled,
'custom_constraints': request.custom_constraints
}
tracer.start_trace(tid)
tracer.add_flag(tid, "TASK_INIT", f"Target Universe: {overrides.get('universe')}")
# Determine if we are acting as a Proxy (e.g., on Render) or if we should run the math locally.
is_proxy = os.getenv("RENDER") == "true" or os.getenv("IS_PROXY") == "true"
is_backend = not is_proxy
if is_backend:
tracer.add_flag(tid, "BACKEND_START", "Running compute engine locally")
# We are the backend (either HF or localhost). Run the heavy math.
result = core_engine.run_engine(overrides=overrides, serve=False, task_id=tid)
tracer.add_flag(tid, "HF_BACKEND_COMPLETE", "Math engine finished")
BACKGROUND_TASKS[tid]["status"] = "completed"
BACKGROUND_TASKS[tid]["message"] = "Report generated."
BACKGROUND_TASKS[tid]["target_weights"] = result.get("target_weights", {})
else:
import requests
hf_url = os.getenv("HF_BACKEND_URL", "https://engineportf-portfolio-opt.hf.space").rstrip('/')
# Use the master key to bypass HF API restrictions and authenticate internally
hf_key = os.getenv("HF_MASTER_KEY", "Ir_yad")
tracer.add_flag(tid, "PROXY_START", f"Forwarding to HF backend: {hf_url}")
try:
proxy_res = requests.post(
f"{hf_url}/api/generate",
json=request.model_dump() if hasattr(request, 'model_dump') else request.dict(),
headers={"X-Access-Key": hf_key},
timeout=120
)
except requests.exceptions.Timeout:
raise Exception("Hugging Face Backend timed out while queuing the optimization. This is likely due to the free tier spinning up from sleep.")
except requests.exceptions.RequestException as req_e:
raise Exception(f"Hugging Face Backend connection failed: {req_e}")
if not proxy_res.ok:
raise Exception(f"Hugging Face Backend Error ({proxy_res.status_code}) at {hf_url}/api/generate. Check HF_BACKEND_URL.")
proxy_data = proxy_res.json()
remote_task_id = proxy_data.get("task_id")
tracer.add_flag(tid, "PROXY_HANDOFF_SUCCESS", f"HF Task ID: {remote_task_id}")
if not remote_task_id:
raise Exception("Failed to get remote task ID from Hugging Face.")
import time
retries = 0
while True:
time.sleep(2)
try:
status_res = requests.get(
f"{hf_url}/api/status/{remote_task_id}",
headers={"X-Access-Key": hf_key},
timeout=15
)
retries = 0 # Reset on success
except requests.exceptions.RequestException as e:
retries += 1
if retries > 10:
tracer.add_flag(tid, "PROXY_POLL_TIMEOUT", f"Poll failed 10 times: {e}")
BACKGROUND_TASKS[tid]["status"] = "error"
BACKGROUND_TASKS[tid]["message"] = "Hugging Face Backend is unreachable (Timeout). It might have crashed or restarted."
break
continue
if status_res.ok:
s_data = status_res.json()
BACKGROUND_TASKS[tid]["status"] = s_data["status"]
BACKGROUND_TASKS[tid]["message"] = s_data["message"]
if s_data["status"] == "completed":
BACKGROUND_TASKS[tid]["target_weights"] = s_data.get("target_weights", {})
# Download the completed HTML report from HF to Render
report_res = requests.get(f"{hf_url}/report")
if report_res.ok:
report_path = os.path.join(OUTPUT_DIR, "portfolio_report.html")
os.makedirs(OUTPUT_DIR, exist_ok=True)
with open(report_path, "wb") as f:
f.write(report_res.content)
break
elif s_data["status"] == "error":
error_msg = s_data.get("message", "Unknown error from HF")
tracer.add_flag(tid, "PROXY_POLL_ERROR", error_msg)
BACKGROUND_TASKS[tid]["status"] = "error"
BACKGROUND_TASKS[tid]["message"] = error_msg
break
else:
tracer.add_flag(tid, "PROXY_CONNECTION_LOST", f"Status Code: {status_res.status_code}")
retries += 1
if retries > 10:
BACKGROUND_TASKS[tid]["status"] = "error"
BACKGROUND_TASKS[tid]["message"] = f"Lost connection to Hugging Face backend (HTTP {status_res.status_code})."
raise Exception(f"Lost connection to Hugging Face backend. (HTTP {status_res.status_code})")
continue
except Exception as e:
error_trace = traceback.format_exc()
tracer.add_flag(tid, "FATAL_ERROR", f"{str(e)}\n\nTraceback:\n{error_trace}")
logger.error(f"Optimization failed: {error_trace}")
BACKGROUND_TASKS[tid]["status"] = "error"
BACKGROUND_TASKS[tid]["message"] = f"Error: {str(e)} (Check console logs for details)"
threading.Thread(target=_run_optimization, args=(task_id, req)).start()
return {
"status": "queued",
"task_id": task_id,
"message": "Optimization started in background."
}
@app.post("/api/chat")
async def chat_with_portfolio(req: ChatRequest):
import logging
if not has_hf_hub:
raise HTTPException(status_code=500, detail="huggingface_hub is not installed on the server.")
try:
hf_token = os.environ.get("HF_TOKEN", "")
if not hf_token:
return {"status": "error", "detail": "AI is disabled. Please add 'HF_TOKEN' to your Hugging Face Space Secrets to enable the AI."}
system_prompt = (
"You are an elite quantitative analyst AI assistant built into the Portfolio Engine. "
"Your goal is to help the user analyze their portfolio, explain mathematical models, "
"and act as a highly intelligent conversational partner. "
"You can answer general finance questions, write python code, or explain concepts simply if asked. "
"Never give explicit financial advice (e.g. 'You must buy this stock'). "
"Be helpful, natural, and adapt to the user's tone. "
"DO NOT blindly recite or summarize the portfolio context unless the user specifically asks a question about their portfolio. "
"If they just say 'hello' or make a general statement, greet them naturally without mentioning the data."
)
# Use direct REST API to bypass strict huggingface_hub token scope validation
import requests
try:
from huggingface_hub import InferenceClient
client = InferenceClient(token=hf_token)
context_str = f"\n\nUser's Current Portfolio Context:\n{req.portfolio_context}" if req.portfolio_context else ""
messages = [
{"role": "system", "content": system_prompt + context_str}
]
for h in req.history:
messages.append({"role": h.role, "content": h.content})
messages.append({"role": "user", "content": req.message})
response = client.chat_completion(
messages=messages,
model="Qwen/Qwen2.5-72B-Instruct",
max_tokens=2048,
temperature=0.3
)
return {"status": "success", "response": response.choices[0].message.content.strip()}
except Exception as client_err:
logging.error(f"InferenceClient failed: {client_err}")
return {"status": "error", "detail": f"AI temporarily unavailable: {client_err}"}
except Exception as e:
logging.error(f"AI Chat error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/traces")
async def get_all_traces():
return tracer.traces
@app.get("/api/trace/{task_id}")
async def get_task_trace(task_id: str):
return {"task_id": task_id, "trace": tracer.get_trace(task_id)}
@app.get("/api/status/{task_id}")
async def get_task_status(task_id: str, x_access_key: Optional[str] = Header(None)):
if not access_manager.validate_key(x_access_key, silent=True):
raise HTTPException(status_code=401, detail="Unauthorized")
task = BACKGROUND_TASKS.get(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@app.get("/report")
async def get_report():
report_path = os.path.join(OUTPUT_DIR, "portfolio_report.html")
if os.path.exists(report_path):
return FileResponse(report_path)
raise HTTPException(status_code=404, detail="Report not generated yet.")
def _alert_daemon():
"""Background daemon to check for market drops."""
import time
while True:
try:
# Wake up every 1 hour (3600 seconds)
time.sleep(3600)
# Simple check for SPY drops
ticker = yf.Ticker("SPY")
hist = ticker.history(period="2d")
if len(hist) >= 2:
current = float(hist['Close'].iloc[-1])
prev = float(hist['Close'].iloc[-2])
pct_change = ((current - prev) / prev) * 100
if pct_change <= -5.0:
access_manager.send_telegram_alert(f"🚨 **MARKET ALERT**\nSPY has dropped by {pct_change:.2f}%!\nCheck the portfolio engine.")
except Exception as e:
pass # Suppress daemon errors
def _keep_alive_daemon():
"""Background daemon to keep the Hugging Face Space awake."""
import time
import requests
while True:
try:
# Wake up every 5 minutes (300 seconds)
time.sleep(300)
# Ping the public URL to keep the load balancer active
requests.get("https://michaliskoustis2005-byte-portfolio-engine.hf.space/")
except Exception:
pass
@app.on_event("startup")
def startup_event():
import threading
# Start the background alert daemon
alert_thread = threading.Thread(target=_alert_daemon, daemon=True)
alert_thread.start()
# Start the keep-alive daemon
keep_alive_thread = threading.Thread(target=_keep_alive_daemon, daemon=True)
keep_alive_thread.start()
if __name__ == "__main__":
import uvicorn
uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)