# VN30 / app.py
# Mrlongpro's picture
# Update app.py
# 1c66561 verified
import os
import time
import threading
from contextlib import suppress
from dataclasses import dataclass, asdict
import importlib
import importlib.util
import json
from typing import Any, Dict, List, Optional
import pandas as pd
import requests
from fastapi import FastAPI, HTTPException
# FastAPI application object; the route decorators further down register on it.
app = FastAPI(title="VN30F1M Trading Forecast API", version="2.0.0")
# History endpoint queried by fetch_data().
URL = "https://histdatafeed.vps.com.vn/tradingview/history"
# Feed query parameters, each overridable through an environment variable.
SYMBOL = os.getenv("SYMBOL", "VN30F1M")
RESOLUTION = os.getenv("RESOLUTION", "1")
COUNTBACK = int(os.getenv("COUNTBACK", "330"))
# Sleep between iterations of update_loop(); /health also uses 4x this value
# as its freshness threshold.
UPDATE_SECONDS = int(os.getenv("UPDATE_SECONDS", "10"))
# Default minimum number of candles fetch_data() accepts.
MIN_BARS = int(os.getenv("MIN_BARS", "50"))
# Inputs to the position-sizing and PnL formulas
# (recommend_position_size / futures_pnl).
CONTRACT_MULTIPLIER = float(os.getenv("CONTRACT_MULTIPLIER", "100000"))
MARGIN_RATE = float(os.getenv("MARGIN_RATE", "0.13"))
MAX_RISK_PCT = float(os.getenv("MAX_RISK_PCT", "0.01"))
# Bearer token sent to the Hugging Face endpoints; no Authorization header is
# sent when it is empty.
HF_API_TOKEN = os.getenv("HF_API_TOKEN", "")
# Model tried first by query_huggingface_recommendation().
HF_MODEL_ID = os.getenv("HF_MODEL_ID", "HuggingFaceH4/zephyr-7b-beta")
# Comma-separated list of additional models tried after HF_MODEL_ID; blank
# entries are dropped.
HF_MODEL_CANDIDATES = [
m.strip()
for m in os.getenv(
"HF_MODEL_CANDIDATES",
"HuggingFaceH4/zephyr-7b-beta,google/flan-t5-base,tiiuae/falcon-7b-instruct",
).split(",")
if m.strip()
]
# Base URLs tried, in order, for every model id.
HF_INFERENCE_BASES = [
"https://router.huggingface.co/hf-inference/models",
"https://api-inference.huggingface.co/models",
]
# Boolean flags: "1", "true" or "yes" (case-insensitive) enable them.
USE_LOCAL_HF_MODEL = os.getenv("USE_LOCAL_HF_MODEL", "0").lower() in {"1", "true", "yes"}
HF_LOCAL_MODEL_ID = os.getenv("HF_LOCAL_MODEL_ID", HF_MODEL_ID)
# When enabled, the rule-based fallback result includes the collected
# per-endpoint error strings.
AI_DEBUG_ERRORS = os.getenv("AI_DEBUG_ERRORS", "0").lower() in {"1", "true", "yes"}
# URL requested by keepalive_loop() on every cycle; an empty value disables it.
KEEPALIVE_EXTERNAL_URL = os.getenv("KEEPALIVE_EXTERNAL_URL", "https://mrlongpro-vn30.hf.space/ping")
@dataclass
class ForecastState:
    """Latest forecast published by the background updater.

    The defaults describe the state before the first successful update.
    """

    # Signal label; compute_signal() produces WAIT, HOLD, BUY or SELL.
    signal: str = "WAIT"
    # Close price of the most recent candle used for the forecast.
    price: float = 0.0
    # Confidence of the signal, as returned by compute_signal().
    confidence: float = 0.0
    # Raw weighted score behind the signal.
    score: float = 0.0
    # Human-readable explanation of the signal.
    reason: str = "System warming up"
    # Unix timestamp (seconds) of the last successful update; 0 means never.
    updated_at: int = 0
# Shared forecast state: written by update_loop(), read by the API handlers.
state = ForecastState()
# Guards reads and writes of `state`.
state_lock = threading.Lock()
# Single requests session used for the feed, Hugging Face and keep-alive calls.
http = requests.Session()
# Handles to the background threads; None until they are first started.
update_thread: Optional[threading.Thread] = None
keepalive_thread: Optional[threading.Thread] = None
# Guards loading and use of the optional local Hugging Face model.
local_model_lock = threading.Lock()
# Populated by _ensure_local_hf_model(); None while no model is loaded.
local_tokenizer = None
local_model = None
# ===== INDICATORS =====
def rsi(series: pd.Series, period: int = 14) -> pd.Series:
    """Return the Relative Strength Index of ``series``.

    Gains and losses of the one-step differences are smoothed with an
    exponentially weighted mean (``alpha = 1 / period``, ``adjust=False``,
    ``min_periods=period``).

    Args:
        series: Price series.
        period: Smoothing period.

    Returns:
        A float series in the range 0..100. Entries are NaN while fewer than
        ``period`` differences are available, and where both the average gain
        and the average loss are zero (flat prices).
    """
    delta = series.diff()
    gain = delta.clip(lower=0)
    loss = -delta.clip(upper=0)
    avg_gain = gain.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
    avg_loss = loss.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
    # 100 - 100 / (1 + g / l) is algebraically 100 * g / (g + l). This form
    # needs no division by the average loss, so a move with no losses gives
    # 100 instead of a missing value, and the result stays a float series.
    return 100 * avg_gain / (avg_gain + avg_loss)
def build_features(data: Dict[str, List[float]]) -> pd.DataFrame:
    """Build the OHLCV frame and the indicator columns used by the forecast.

    Args:
        data: Mapping with the keys ``t``, ``o``, ``h``, ``l``, ``c``, ``v``,
            each a sequence of equal length.

    Returns:
        A DataFrame with the columns ``time``, ``open``, ``high``, ``low``,
        ``close``, ``volume`` plus ``ret_1``, ``ma20``, ``ma50``, ``std20``,
        ``zscore20``, ``rsi14``, ``mom10``, ``atr14`` and ``volatility``.
        Indicator values are NaN until their window is filled.
    """
    df = pd.DataFrame(
        {
            "time": data["t"],
            "open": data["o"],
            "high": data["h"],
            "low": data["l"],
            "close": data["c"],
            "volume": data["v"],
        }
    )
    close = df["close"]
    prev_close = close.shift()

    df["ret_1"] = close.pct_change()
    df["ma20"] = close.rolling(20).mean()
    df["ma50"] = close.rolling(50).mean()
    df["std20"] = close.rolling(20).std()
    # Mask a zero standard deviation with NaN via where(): unlike
    # replace(0, pd.NA) this keeps the column in float dtype.
    nonzero_std = df["std20"].where(df["std20"] != 0)
    df["zscore20"] = (close - df["ma20"]) / nonzero_std
    df["rsi14"] = rsi(close, 14)
    df["mom10"] = close.pct_change(10)

    # True range: the largest of the bar range and the two gaps to the
    # previous close.
    true_range = pd.concat(
        [
            (df["high"] - df["low"]).abs(),
            (df["high"] - prev_close).abs(),
            (df["low"] - prev_close).abs(),
        ],
        axis=1,
    ).max(axis=1)
    df["atr14"] = true_range.rolling(14).mean()
    df["volatility"] = df["ret_1"].rolling(20).std()
    return df
# ===== DATA FETCH =====
def fetch_data(retries: int = 4, min_bars: int = MIN_BARS) -> Optional[Dict[str, Any]]:
    """Fetch recent candles from the history feed, retrying on failure.

    Each attempt requests the last 12 hours for ``SYMBOL`` at ``RESOLUTION``
    with ``COUNTBACK`` bars. A response is accepted only when it is a mapping
    that contains all OHLCV arrays, the arrays have the same length, and at
    least ``min_bars`` candles are present.

    Args:
        retries: Maximum number of attempts.
        min_bars: Minimum number of candles required.

    Returns:
        The decoded response mapping, or ``None`` when every attempt failed.
    """
    required_keys = ("t", "o", "h", "l", "c", "v")
    for attempt in range(retries):
        # Recompute the window on every attempt so retries after a back-off
        # do not query a stale time range.
        now = int(time.time())
        try:
            params = {
                "symbol": SYMBOL,
                "resolution": RESOLUTION,
                "from": now - 3600 * 12,
                "to": now,
                "countback": COUNTBACK,
            }
            res = http.get(URL, params=params, timeout=8)
            res.raise_for_status()
            data = res.json()
            if not isinstance(data, dict) or not all(key in data for key in required_keys):
                raise ValueError("Response missing OHLCV keys")
            # Ragged arrays would later break DataFrame construction and
            # index-based access, so reject them here and retry.
            if len({len(data[key]) for key in required_keys}) != 1:
                raise ValueError("OHLCV arrays have mismatched lengths")
            n = len(data["c"])
            if n < min_bars:
                raise ValueError(f"Not enough candles ({n}/{min_bars})")
            return data
        except Exception as exc:
            if attempt == retries - 1:
                # Nothing left to retry: do not waste time sleeping.
                print(f"[fetch retry {attempt + 1}/{retries}] {exc}; giving up")
                break
            wait = 1.5**attempt
            print(f"[fetch retry {attempt + 1}/{retries}] {exc}; wait={wait:.1f}s")
            time.sleep(wait)
    return None
# ===== FORECAST ENGINE =====
def compute_signal(data: Dict[str, List[float]]) -> Dict[str, Any]:
    """Derive a trading signal from the latest candle's indicators.

    The score is a weighted sum of five votes in {-1, 0, 1}: price vs. MA20
    (0.35), MA20 vs. MA50 (0.25), 10-bar momentum (0.2), RSI14 below 30 /
    above 70 (0.1) and z-score below -1 / above 1 (0.1).

    Args:
        data: OHLCV mapping accepted by ``build_features``.

    Returns:
        A dict with ``signal`` (WAIT, HOLD, BUY or SELL), ``price``,
        ``confidence``, ``score``, ``reason`` and ``risk``. ``risk`` is
        ``None`` for WAIT; otherwise it holds ``stop_loss``, ``take_profit``,
        ``atr`` and ``volatility``. The stop and target are ``None`` for HOLD
        because no trade direction exists to place them against.
    """

    def waiting(price: float, reason: str) -> Dict[str, Any]:
        # Common shape of every "not ready yet" answer.
        return {
            "signal": "WAIT",
            "price": price,
            "confidence": 0.0,
            "score": 0.0,
            "reason": reason,
            "risk": None,
        }

    df = build_features(data)
    if df.empty:
        # No last row to read a price from; answer like any other warm-up.
        return waiting(0.0, "Warming up: need >=50 candles, got 0")

    last = df.iloc[-1]
    if len(df) < 50:
        return waiting(
            float(last["close"]),
            f"Warming up: need >=50 candles, got {len(df)}",
        )

    cols = ["ma20", "ma50", "zscore20", "rsi14", "mom10", "atr14", "volatility"]
    if last[cols].isna().any():
        return waiting(float(last["close"]), "Indicators not ready")

    trend_score = 1 if last["close"] > last["ma20"] else -1
    structure_score = 1 if last["ma20"] > last["ma50"] else -1
    momentum_score = 1 if last["mom10"] > 0 else -1
    if last["rsi14"] < 30:
        rsi_score = 1
    elif last["rsi14"] > 70:
        rsi_score = -1
    else:
        rsi_score = 0
    z_score = 1 if last["zscore20"] < -1 else (-1 if last["zscore20"] > 1 else 0)

    raw_score = (
        0.35 * trend_score
        + 0.25 * structure_score
        + 0.2 * momentum_score
        + 0.1 * rsi_score
        + 0.1 * z_score
    )
    confidence = min(1.0, abs(raw_score))

    if last["volatility"] > 0.02:
        signal = "HOLD"
        reason = "Volatility too high, avoid new position"
    elif abs(raw_score) < 0.25:
        signal = "HOLD"
        reason = "Conflicting signals"
    elif raw_score > 0:
        signal = "BUY"
        reason = "Trend and momentum are supportive"
    else:
        signal = "SELL"
        reason = "Trend and momentum are weakening"

    atr = float(last["atr14"])
    price = float(last["close"])

    # Only directional signals get price levels. Previously HOLD fell into
    # the SELL branch and reported short-side stop/target levels.
    if signal == "BUY":
        stop_loss = round(price - 1.5 * atr, 2)
        take_profit = round(price + 2.5 * atr, 2)
    elif signal == "SELL":
        stop_loss = round(price + 1.5 * atr, 2)
        take_profit = round(price - 2.5 * atr, 2)
    else:
        stop_loss = None
        take_profit = None

    return {
        "signal": signal,
        "price": price,
        "confidence": round(confidence, 4),
        "score": round(float(raw_score), 4),
        "reason": reason,
        "risk": {
            "stop_loss": stop_loss,
            "take_profit": take_profit,
            "atr": round(atr, 4),
            "volatility": round(float(last["volatility"]), 6),
        },
    }
def recommend_position_size(price: float, atr: float, account_balance: float) -> Dict[str, Any]:
    """Suggest a contract count limited by both risk budget and margin.

    - risk per contract   = stop distance (1.5 * ATR, floored at 1e-6)
                            * CONTRACT_MULTIPLIER
    - margin per contract = price * CONTRACT_MULTIPLIER * MARGIN_RATE
    - risk budget         = account_balance * MAX_RISK_PCT (never negative)

    Returns a dict with the chosen ``contracts`` (the smaller of the two
    limits, never below zero) together with the intermediate figures.
    """

    def affordable(budget: float, unit_cost: float) -> int:
        # Whole units of `unit_cost` that fit into `budget`.
        if unit_cost > 0:
            return int(budget // unit_cost)
        return 0

    stop_distance = max(atr * 1.5, 1e-6)
    budget_at_risk = max(account_balance * MAX_RISK_PCT, 0.0)
    loss_per_contract = stop_distance * CONTRACT_MULTIPLIER
    margin_needed = price * CONTRACT_MULTIPLIER * MARGIN_RATE

    limit_by_risk = affordable(budget_at_risk, loss_per_contract)
    limit_by_margin = affordable(account_balance, margin_needed)

    sizing: Dict[str, Any] = {}
    sizing["contracts"] = max(min(limit_by_risk, limit_by_margin), 0)
    sizing["risk_budget"] = round(budget_at_risk, 2)
    sizing["risk_per_contract"] = round(loss_per_contract, 2)
    sizing["margin_per_contract"] = round(margin_needed, 2)
    sizing["max_by_risk"] = limit_by_risk
    sizing["max_by_margin"] = limit_by_margin
    return sizing
def futures_pnl(side: str, entry: float, exit_price: float, contracts: int = 1) -> Dict[str, Any]:
    """Compute gross PnL and return on estimated margin for a closed trade.

    ``side`` equal to "LONG" (case-insensitive) profits when the price rises;
    any other value is treated as the opposite direction. A negative
    ``contracts`` value is counted as zero in the calculations but echoed
    unchanged in the result.
    """
    direction = side.upper()
    size = max(contracts, 0)

    if direction == "LONG":
        points = exit_price - entry
    else:
        points = entry - exit_price

    gross_pnl = points * CONTRACT_MULTIPLIER * size
    est_margin = entry * CONTRACT_MULTIPLIER * MARGIN_RATE * size

    # ROI is only defined when some margin is actually committed.
    roi_on_margin = round((gross_pnl / est_margin) * 100, 2) if est_margin > 0 else None

    return {
        "side": direction,
        "entry": entry,
        "exit": exit_price,
        "points": round(points, 2),
        "contracts": contracts,
        "gross_pnl_vnd": round(gross_pnl, 2),
        "estimated_margin_vnd": round(est_margin, 2),
        "roi_on_margin_pct": roi_on_margin,
    }
def build_market_snapshot(data: Dict[str, List[float]]) -> Dict[str, Any]:
    """Summarise the latest indicator values and the last 30 bars.

    Indicator entries are ``None`` when the underlying value is missing;
    ``return_30bars_pct`` is ``None`` when fewer than two bars are available.
    """
    frame = build_features(data)
    latest = frame.iloc[-1]
    window = frame.tail(30)

    def rounded_or_none(column: str, digits: int) -> Optional[float]:
        # Round the latest value of `column`, or report it as missing.
        value = latest[column]
        if pd.notna(value):
            return round(float(value), digits)
        return None

    if len(window) >= 2:
        window_close = window["close"]
        change_pct = round(float((window_close.iloc[-1] / window_close.iloc[0] - 1) * 100), 3)
    else:
        change_pct = None

    return {
        "price": round(float(latest["close"]), 2),
        "ma20": rounded_or_none("ma20", 2),
        "ma50": rounded_or_none("ma50", 2),
        "rsi14": rounded_or_none("rsi14", 2),
        "mom10": rounded_or_none("mom10", 6),
        "volatility20": rounded_or_none("volatility", 6),
        "return_30bars_pct": change_pct,
        "high_30": round(float(window["high"].max()), 2),
        "low_30": round(float(window["low"].min()), 2),
    }
def _parse_hf_text(body: Any) -> str:
    """Extract the generated text from a decoded inference response.

    Recognised shapes:
      * a non-empty list whose first item is a dict with ``generated_text``
        or ``summary_text`` (checked in that order);
      * a dict with ``generated_text``.

    Any other shape, including a list whose first item is not a dict, is
    returned as ``str(body)`` instead of raising.
    """
    if isinstance(body, list) and body and isinstance(body[0], dict):
        first = body[0]
        for key in ("generated_text", "summary_text"):
            if key in first:
                return str(first[key]).strip()
    if isinstance(body, dict) and "generated_text" in body:
        return str(body["generated_text"]).strip()
    return str(body)
def _ensure_local_hf_model() -> Dict[str, Any]:
    """Load the local tokenizer and model once and cache them in globals.

    Returns:
        ``{"ok": True, "reason": "cached" | "loaded"}`` on success, or
        ``{"ok": False, "reason": <message>}`` when ``transformers`` is not
        installed or loading fails. Failures are reported, never raised.
    """
    global local_tokenizer, local_model
    if importlib.util.find_spec("transformers") is None:
        return {
            "ok": False,
            "reason": "transformers chưa được cài trong môi trường hiện tại.",
        }
    with local_model_lock:
        if local_tokenizer is not None and local_model is not None:
            return {"ok": True, "reason": "cached"}
        try:
            # The import itself sits inside the try block: a package that is
            # discoverable but fails to import must also yield a status dict.
            transformers = importlib.import_module("transformers")
            tokenizer = transformers.AutoTokenizer.from_pretrained(HF_LOCAL_MODEL_ID)
            model = transformers.AutoModelForCausalLM.from_pretrained(HF_LOCAL_MODEL_ID)
        except Exception as exc:
            local_tokenizer = None
            local_model = None
            return {"ok": False, "reason": f"Không load được local model: {exc}"}
        # Publish both objects only after both loaded successfully.
        local_tokenizer = tokenizer
        local_model = model
        return {"ok": True, "reason": "loaded"}
def query_local_hf_recommendation(snapshot: Dict[str, Any], baseline: Dict[str, Any]) -> Dict[str, Any]:
    """Ask the locally loaded model for a recommendation.

    Args:
        snapshot: Market snapshot embedded in the prompt.
        baseline: Baseline signal embedded in the prompt.

    Returns:
        ``{"source": "huggingface-local", "model": ..., "text": ...}`` on
        success. When the model cannot be loaded or inference fails, the
        ``source`` is ``"local-fallback"`` and ``text`` carries the reason,
        so the caller can move on to another backend.
    """
    status = _ensure_local_hf_model()
    if not status["ok"]:
        return {"source": "local-fallback", "model": HF_LOCAL_MODEL_ID, "text": status["reason"]}
    messages = [
        {
            "role": "user",
            "content": (
                "Bạn là trợ lý giao dịch phái sinh VN30F1M. "
                "Hãy trả JSON ngắn gồm recommendation (BUY/SELL/HOLD), confidence (0-1), rationale, risk_note.\n"
                f"snapshot={snapshot}\n"
                f"baseline={baseline}\n"
            ),
        }
    ]
    try:
        with local_model_lock:
            inputs = local_tokenizer.apply_chat_template(
                messages,
                add_generation_prompt=True,
                tokenize=True,
                return_dict=True,
                return_tensors="pt",
            ).to(local_model.device)
            outputs = local_model.generate(**inputs, max_new_tokens=120)
            # Decode only the newly generated tokens, skipping the prompt.
            prompt_len = inputs["input_ids"].shape[-1]
            text = local_tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True)
    except Exception as exc:
        # Inference problems must not escape: report them through the same
        # fallback shape used for load failures.
        return {
            "source": "local-fallback",
            "model": HF_LOCAL_MODEL_ID,
            "text": f"Local model inference failed: {exc}",
        }
    return {"source": "huggingface-local", "model": HF_LOCAL_MODEL_ID, "text": text.strip()}
def query_huggingface_recommendation(snapshot: Dict[str, Any], baseline: Dict[str, Any]) -> Dict[str, Any]:
    """Obtain an AI recommendation, degrading step by step.

    Order of attempts:
      1. the local model, when ``USE_LOCAL_HF_MODEL`` is set;
      2. every (model, base URL) pair: ``HF_MODEL_ID`` first, then the other
         ``HF_MODEL_CANDIDATES``, each against every ``HF_INFERENCE_BASES``;
      3. a rule-based answer derived from ``baseline`` and ``snapshot``.

    Failures of step 2 are collected and attached to the rule-based answer
    as ``error_details`` when ``AI_DEBUG_ERRORS`` is enabled.
    """
    if USE_LOCAL_HF_MODEL:
        local_answer = query_local_hf_recommendation(snapshot=snapshot, baseline=baseline)
        if local_answer["source"] != "local-fallback":
            return local_answer

    auth_headers: Dict[str, str] = {}
    if HF_API_TOKEN:
        auth_headers = {"Authorization": f"Bearer {HF_API_TOKEN}"}

    prompt = "".join(
        [
            "Bạn là trợ lý giao dịch phái sinh VN30F1M. ",
            "Dựa trên snapshot thị trường và baseline signal, hãy trả về JSON ngắn gồm: ",
            "recommendation (BUY/SELL/HOLD), confidence (0-1), rationale (<=30 từ), risk_note.\n",
            f"snapshot={snapshot}\n",
            f"baseline={baseline}\n",
        ]
    )
    request_body = {
        "inputs": prompt,
        "parameters": {"max_new_tokens": 180, "return_full_text": False, "temperature": 0.2},
    }

    # Preferred model first, then the remaining candidates.
    model_order = [HF_MODEL_ID]
    model_order.extend(m for m in HF_MODEL_CANDIDATES if m != HF_MODEL_ID)
    targets = [(model_id, base) for model_id in model_order for base in HF_INFERENCE_BASES]

    failures: List[str] = []
    for model_id, base in targets:
        try:
            response = http.post(f"{base}/{model_id}", headers=auth_headers, json=request_body, timeout=25)
            if response.status_code in {404, 410, 503}:
                failures.append(f"{model_id}@{base}: HTTP {response.status_code}")
                continue
            response.raise_for_status()
            generated = _parse_hf_text(response.json())
            return {"source": "huggingface", "model": model_id, "endpoint": base, "text": generated}
        except Exception as exc:
            failures.append(f"{model_id}@{base}: {exc}")

    # Every remote attempt failed: answer from the baseline signal instead.
    baseline_signal = baseline.get("signal", "HOLD")
    baseline_conf = float(baseline.get("confidence", 0.0))
    risk_info = baseline.get("risk") or {}
    atr = risk_info.get("atr")
    volatility = risk_info.get("volatility", snapshot.get("volatility20"))

    fast_ma = snapshot.get("ma20") or 0
    slow_ma = snapshot.get("ma50") or 0
    if fast_ma >= slow_ma:
        trend = "uptrend"
    else:
        trend = "downtrend"

    if atr is not None:
        risk_note = f"ATR={atr}, volatility={volatility}. Ưu tiên 1/2 khối lượng chuẩn, luôn đặt SL/TP."
    else:
        risk_note = "Giảm khối lượng và luôn đặt stop-loss."

    answer: Dict[str, Any] = {
        "source": "rule-based-ai",
        "model": "internal-fallback",
        "recommendation": baseline_signal,
        "confidence": round(max(0.35, baseline_conf), 2),
        "rationale": f"Rule-AI fallback: {trend}, baseline={baseline_signal}.",
        "risk_note": risk_note,
        "text": "rule-based fallback",
    }
    if AI_DEBUG_ERRORS:
        answer["error_details"] = failures
    return answer
def normalize_ai_payload(ai_result: Dict[str, Any]) -> Dict[str, Any]:
    """Expose the AI answer as structured fields where possible.

    If ``ai_result`` already has ``recommendation``, ``confidence``,
    ``rationale`` and ``risk_note`` it is returned unchanged. Otherwise its
    ``text`` is parsed as JSON; when that fails, the span from the first
    ``{`` to the last ``}`` is tried, which covers answers wrapped in prose
    or code fences. Fields of a parsed JSON object are merged into a copy of
    ``ai_result``, except that ``source``, ``model``, ``endpoint`` and
    ``text`` are never taken from the model output.

    Returns:
        The merged dict, or ``ai_result`` itself when nothing could be parsed.
    """
    required = {"recommendation", "confidence", "rationale", "risk_note"}
    if required.issubset(ai_result.keys()):
        return ai_result

    text = ai_result.get("text")
    if not isinstance(text, str):
        return ai_result

    try:
        parsed = json.loads(text)
    except (ValueError, RecursionError):
        # Not pure JSON: look for an object embedded in surrounding text.
        start = text.find("{")
        end = text.rfind("}")
        if start == -1 or end <= start:
            return ai_result
        try:
            parsed = json.loads(text[start : end + 1])
        except (ValueError, RecursionError):
            return ai_result

    if not isinstance(parsed, dict):
        return ai_result

    # Bookkeeping fields describe where the answer came from; generated
    # content must not be able to overwrite them.
    protected = {"source", "model", "endpoint", "text"}
    merged = dict(ai_result)
    for key, value in parsed.items():
        if key in protected and key in ai_result:
            continue
        merged[key] = value
    return merged
# ===== HEARTBEAT =====
def update_loop() -> None:
    """Refresh the shared forecast state forever.

    Each cycle fetches candles (at least 10), computes the signal and copies
    it into ``state`` under ``state_lock``. Any error is printed and the loop
    continues after sleeping ``UPDATE_SECONDS``.
    """
    copied_fields = ("signal", "price", "confidence", "score", "reason")
    while True:
        try:
            candles = fetch_data(min_bars=10)
            if candles is None:
                raise RuntimeError("Cannot fetch data from feed")
            latest = compute_signal(candles)
            with state_lock:
                for field_name in copied_fields:
                    setattr(state, field_name, latest[field_name])
                state.updated_at = int(time.time())
            print(f"[UPDATE] {state.signal} @ {state.price} (conf={state.confidence})")
        except Exception as err:
            print(f"[LOOP ERROR] {err}")
        time.sleep(UPDATE_SECONDS)
def start_update_thread() -> None:
    """Start the forecast updater thread unless one is already running."""
    global update_thread
    current = update_thread
    if current is not None and current.is_alive():
        return
    update_thread = threading.Thread(
        target=update_loop,
        daemon=True,
        name="forecast-updater",
    )
    update_thread.start()
    print("[SYSTEM] update thread started")
def keepalive_loop() -> None:
    """Every 45 seconds, restart the updater if needed and send pings.

    Each cycle calls ``start_update_thread()``, requests the local
    ``/health`` endpoint on ports 7860 and 8000, and then
    ``KEEPALIVE_EXTERNAL_URL`` when it is set. Failures of the individual
    requests are ignored; any other error is printed.
    """
    local_health_urls = (
        "http://127.0.0.1:7860/health",
        "http://127.0.0.1:8000/health",
    )
    while True:
        try:
            start_update_thread()
            # Best-effort self-pings; a refused connection is not an error.
            for url in local_health_urls:
                with suppress(Exception):
                    http.get(url, timeout=3)
            if KEEPALIVE_EXTERNAL_URL:
                with suppress(Exception):
                    http.get(KEEPALIVE_EXTERNAL_URL, timeout=8)
        except Exception as exc:
            print(f"[KEEPALIVE ERROR] {exc}")
        time.sleep(45)
def start_background_services() -> None:
    """Start the updater thread and, if not already running, the keep-alive thread."""
    global keepalive_thread
    start_update_thread()
    running = keepalive_thread is not None and keepalive_thread.is_alive()
    if running:
        return
    keepalive_thread = threading.Thread(
        target=keepalive_loop,
        daemon=True,
        name="system-keepalive",
    )
    keepalive_thread.start()
@app.on_event("startup")
def on_startup() -> None:
    """Startup hook: launch the background threads."""
    start_background_services()
# ===== API =====
@app.get("/")
def root() -> Dict[str, Any]:
    """Return a status marker, the symbol and the current forecast state."""
    with state_lock:
        current = asdict(state)
    return {"status": "running", "symbol": SYMBOL, **current}
@app.get("/signal")
def signal() -> Dict[str, Any]:
    """Return the symbol together with the current forecast state."""
    with state_lock:
        current = asdict(state)
    return {"symbol": SYMBOL, **current}
@app.get("/forecast")
def forecast() -> Dict[str, Any]:
    """Fetch fresh candles and return the computed signal.

    Raises:
        HTTPException: 503 when the data feed returned nothing usable.
    """
    candles = fetch_data(min_bars=10)
    if candles is not None:
        return compute_signal(candles)
    raise HTTPException(status_code=503, detail="Data feed unavailable")
@app.get("/position-size")
def position_size(account_balance: float) -> Dict[str, Any]:
    """Recommend a contract count for the given account balance.

    Args:
        account_balance: Account balance; must be a finite number above zero.

    Returns:
        The current signal and reason, plus price, risk block and sizing
        recommendation. ``recommendation`` is ``None`` when the signal has
        no risk block.

    Raises:
        HTTPException: 400 for a non-finite or non-positive balance, 503
            when the data feed is unavailable.
    """
    import math

    # NaN and infinity pass a plain `<= 0` check and later fail inside the
    # sizing arithmetic (int() of NaN), so reject them here.
    if not math.isfinite(account_balance):
        raise HTTPException(status_code=400, detail="account_balance must be a finite number")
    if account_balance <= 0:
        raise HTTPException(status_code=400, detail="account_balance must be > 0")

    data = fetch_data(min_bars=10)
    if data is None:
        raise HTTPException(status_code=503, detail="Data feed unavailable")

    fc = compute_signal(data)
    if not fc.get("risk"):
        return {
            "signal": fc["signal"],
            "reason": fc["reason"],
            "recommendation": None,
        }

    rec = recommend_position_size(
        price=float(fc["price"]),
        atr=float(fc["risk"]["atr"]),
        account_balance=account_balance,
    )
    return {
        "signal": fc["signal"],
        "reason": fc["reason"],
        "price": fc["price"],
        "risk": fc["risk"],
        "recommendation": rec,
    }
@app.get("/pnl")
def pnl(side: str, entry: float, exit_price: float, contracts: int = 1) -> Dict[str, Any]:
    """Validate the trade parameters and return the PnL breakdown.

    Raises:
        HTTPException: 400 when ``side`` is not LONG/SHORT (case-insensitive)
            or ``contracts`` is not positive.
    """
    direction = side.upper()
    if direction != "LONG" and direction != "SHORT":
        raise HTTPException(status_code=400, detail="side must be LONG or SHORT")
    if not contracts > 0:
        raise HTTPException(status_code=400, detail="contracts must be > 0")
    return futures_pnl(
        side=direction,
        entry=entry,
        exit_price=exit_price,
        contracts=contracts,
    )
@app.get("/ai-recommendation")
def ai_recommendation() -> Dict[str, Any]:
    """Return the market snapshot, baseline signal and normalised AI answer.

    Raises:
        HTTPException: 503 when the data feed is unavailable.
    """
    candles = fetch_data(min_bars=30)
    if candles is None:
        raise HTTPException(status_code=503, detail="Data feed unavailable")

    baseline = compute_signal(candles)
    snapshot = build_market_snapshot(candles)
    raw_answer = query_huggingface_recommendation(snapshot=snapshot, baseline=baseline)
    ai_result = normalize_ai_payload(raw_answer)

    response: Dict[str, Any] = {"symbol": SYMBOL}
    response["snapshot"] = snapshot
    response["baseline_signal"] = baseline
    response["ai"] = ai_result
    response["note"] = "AI output chỉ để tham khảo, không phải khuyến nghị đầu tư chắc chắn."
    return response
@app.get("/hf-status")
def hf_status() -> Dict[str, Any]:
    """Report the Hugging Face configuration; the token itself is never returned."""
    token_hint = "Hugging Face Spaces -> Settings -> Variables and secrets -> Secrets -> HF_API_TOKEN"
    has_token = bool(HF_API_TOKEN)
    return dict(
        model_id=HF_MODEL_ID,
        local_model_id=HF_LOCAL_MODEL_ID,
        use_local_hf_model=USE_LOCAL_HF_MODEL,
        token_configured=has_token,
        inference_bases=HF_INFERENCE_BASES,
        keepalive_external_url=KEEPALIVE_EXTERNAL_URL,
        how_to_set_token=token_hint,
    )
@app.get("/ohlc")
def ohlc(limit: int = 200) -> List[Dict[str, Any]]:
    """Return the most recent candles as a list of OHLCV records.

    ``limit`` is clamped to the range 1..number of available candles.

    Raises:
        HTTPException: 503 when the data feed is unavailable.
    """
    candles = fetch_data(min_bars=1)
    if candles is None:
        raise HTTPException(status_code=503, detail="Data feed unavailable")

    total = len(candles["t"])
    take = min(max(limit, 1), total)
    # Output field name -> key of the matching array in the feed response.
    columns = (
        ("time", "t"),
        ("open", "o"),
        ("high", "h"),
        ("low", "l"),
        ("close", "c"),
        ("volume", "v"),
    )
    return [
        {field: candles[key][idx] for field, key in columns}
        for idx in range(total - take, total)
    ]
@app.get("/health")
def health() -> Dict[str, Any]:
    """Report forecast freshness and whether the background threads are alive.

    ``ok`` is true when the last update is at most ``UPDATE_SECONDS * 4``
    seconds old; it is false before the first update.
    """
    with state_lock:
        if state.updated_at:
            age = int(time.time()) - state.updated_at
        else:
            age = None

    fresh = age is not None and age <= UPDATE_SECONDS * 4
    worker_up = bool(update_thread and update_thread.is_alive())
    keepalive_up = bool(keepalive_thread and keepalive_thread.is_alive())
    return {
        "ok": fresh,
        "last_update_age_sec": age,
        "update_interval_sec": UPDATE_SECONDS,
        "worker_alive": worker_up,
        "keepalive_alive": keepalive_up,
    }
@app.get("/ping")
def ping() -> Dict[str, str]:
    """Liveness probe; always answers with a fixed payload."""
    reply: Dict[str, str] = {}
    reply["pong"] = "ok"
    return reply