ModelB_API / app.py
Miruzen's picture
Update app.py
687eaa2 verified
raw
history blame
9.72 kB
# app.py
from fastapi import FastAPI
from pydantic import BaseModel
import pandas as pd
import numpy as np
import yfinance as yf
from datetime import datetime, timedelta
import logging
import os
# -----------------------
# Logging
# -----------------------
# Root logging configuration: timestamp, level and message separated by em dashes.
# (The separators were previously mis-encoded UTF-8 and showed up as garbage in every log line.)
logging.basicConfig(level=logging.INFO, format="%(asctime)s — %(levelname)s — %(message)s")
# Named logger used by the helpers and endpoints of this module.
logger = logging.getLogger("modelb_api")
# -----------------------
# App & config
# -----------------------
# FastAPI application object; title, description and version are passed straight to FastAPI.
# The title's en dash was previously mis-encoded UTF-8.
app = FastAPI(
    title="Model B – EMA & Dynamic Scaling API",
    description="API untuk menghitung EMA, normalisasi, dan analisis tren otomatis berdasarkan data yfinance",
    version="2.1",
)
# Yahoo Finance ticker symbol that every download in this module requests.
PAIR = "EURUSD=X"
# Number of calendar days of history fetched as a safety margin so that EMA50 can be computed;
# also the default look-back window of get_dynamic_minmax.
# NOTE: 60 calendar days contain only about 43 weekdays, which on its own is fewer than the
# 50 rows an EMA50 seed needs.
BASE_WINDOW = 60
# -----------------------
# Request schema
# -----------------------
class DateRange(BaseModel):
    # Request body accepted by the /analyze and /summary endpoints.
    # Both fields are declared as plain strings; the endpoints parse them
    # with pandas.to_datetime and report an error if parsing fails.
    start_date: str  # first requested day, ISO date 'YYYY-MM-DD'
    end_date: str  # last requested day, ISO date 'YYYY-MM-DD'
# -----------------------
# Utility functions
# -----------------------
def ema_manual(prices, span):
    """
    Compute an exponential moving average seeded with a simple moving average.

    Args:
        prices: Iterable of numeric values, in series order.
        span: EMA period, an int greater than zero (e.g. 20 or 50).

    Returns:
        A list of floats with the same length as ``prices``. The first
        ``span - 1`` entries are NaN, the entry at index ``span - 1`` is the
        mean of the first ``span`` prices (the seed), and every later entry is
        ``alpha * price + (1 - alpha) * previous_ema`` with
        ``alpha = 2 / (span + 1)``. When fewer than ``span`` prices are given
        every entry is NaN; an empty input yields an empty list.

    Raises:
        ValueError: If ``span`` is not greater than zero.
    """
    # Validate first so that a bad span is reported even when there is no data.
    if span <= 0:
        raise ValueError("span must be > 0")

    values = np.asarray(prices, dtype=float)
    n = len(values)
    if n == 0:
        return []

    result = np.full(n, np.nan)

    # Not enough data: by design the caller still gets a NaN-filled list of the same length.
    if n < span:
        logger.warning(
            "Not enough data for EMA span=%s (have %s < needed %s), returning NaNs.",
            span, n, span,
        )
        return result.tolist()

    alpha = 2.0 / (span + 1.0)

    # Seed the recursion with the SMA of the first `span` prices at index span - 1.
    previous = float(values[:span].mean())
    result[span - 1] = previous

    # Recursive EMA for every price after the seed.
    for i in range(span, n):
        previous = alpha * values[i] + (1.0 - alpha) * previous
        result[i] = previous

    return result.tolist()
def get_dynamic_minmax(pair=PAIR, window_days=BASE_WINDOW):
    """
    Download a recent price window and return the min/max Close for normalization.

    Args:
        pair: Ticker symbol passed to ``yf.download``.
        window_days: Number of calendar days before today (UTC) to request.

    Returns:
        Tuple ``(close_min, close_max)`` of floats taken from the non-NaN
        Close values of the downloaded window.

    Raises:
        ValueError: If the download is empty or contains no usable Close value.
        Exception: Any exception raised by ``yf.download`` is logged and re-raised.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    # datetime.utcnow() is deprecated; an aware "now" in UTC yields the same calendar date.
    today = datetime.now(timezone.utc).date()
    start = today - timedelta(days=window_days)
    logger.info("Fetching recent data for dynamic min/max: %s -> %s", start, today)
    try:
        # One day is added to `end` so that today's bar is requested as well
        # (yfinance documents `end` as exclusive).
        df = yf.download(pair, start=start, end=today + timedelta(days=1), auto_adjust=True, progress=False)
    except Exception as e:
        logger.error("yfinance download failed for dynamic minmax: %s", e, exc_info=True)
        raise
    if df is None or df.empty:
        raise ValueError("Failed to fetch recent prices for dynamic min/max")

    close = df["Close"]
    # yfinance can return MultiIndex (field, ticker) columns even for a single
    # ticker; in that case df["Close"] is a one-column DataFrame, not a Series.
    if isinstance(close, pd.DataFrame):
        close = close.iloc[:, 0]
    close = pd.to_numeric(close, errors="coerce").dropna()
    if close.empty:
        # Without this guard min()/max() would be NaN and poison the normalization.
        raise ValueError("Failed to fetch recent prices for dynamic min/max")

    close_min = float(close.min())
    close_max = float(close.max())
    logger.info("dynamic min/max: %s / %s", close_min, close_max)
    return close_min, close_max
def normalize_close(value, close_min, close_max):
    """
    Min-max scale ``value`` against the range ``[close_min, close_max]``.

    Returns ``(value - close_min) / (close_max - close_min)`` as a float.
    When both bounds are equal the range is degenerate and 0.5 is returned.
    The result is not clipped, so values outside the range map outside [0, 1].
    """
    degenerate_range = close_min == close_max
    if not degenerate_range:
        scaled = (value - close_min) / (close_max - close_min)
        return float(scaled)
    return 0.5
def analyze_trend(latest_row):
    """
    Build a simple trend analysis from EMA20, EMA50 and the close price.

    Args:
        latest_row: Mapping-like object with a ``.get`` method (e.g. a dict or
            a pandas Series) that may contain the keys ``"EMA20"``, ``"EMA50"``
            and ``"close"``.

    Returns:
        Dict with the keys:
        - ``trend``: "bullish" (EMA20 > EMA50), "bearish" (EMA20 < EMA50) or "neutral".
        - ``strength``: "strong" (gap > 0.3 %), "moderate" (gap > 0.1 %) or "weak".
        - ``price_position``: where the close sits relative to both EMAs, or
          "unknown" when the close is missing/NaN.
        - ``ema_gap_percent``: ``|EMA20 - EMA50| / |EMA50| * 100`` rounded to 4
          decimals (0.0 when EMA50 is zero).
        When either EMA is missing, None or NaN, the first three values are
        "unknown" and ``ema_gap_percent`` is None.
    """
    ema20 = _as_float(latest_row.get("EMA20", np.nan))
    ema50 = _as_float(latest_row.get("EMA50", np.nan))
    close = _as_float(latest_row.get("close", np.nan))

    if np.isnan(ema20) or np.isnan(ema50):
        return {"trend": "unknown", "strength": "unknown", "price_position": "unknown", "ema_gap_percent": None}

    if ema20 > ema50:
        trend = "bullish"
    elif ema20 < ema50:
        trend = "bearish"
    else:
        trend = "neutral"

    # EMA50 is known to be non-NaN here; only a zero denominator needs guarding.
    gap_pct = 0.0 if ema50 == 0 else abs(ema20 - ema50) / abs(ema50) * 100.0

    if gap_pct > 0.3:
        strength = "strong"
    elif gap_pct > 0.1:
        strength = "moderate"
    else:
        strength = "weak"

    if np.isnan(close):
        # Comparisons with NaN are always False, which previously misreported
        # a missing close as sitting between the EMAs.
        price_position = "unknown"
    elif close > ema20 and close > ema50:
        price_position = "above both EMA — possible continuation"
    elif close < ema20 and close < ema50:
        price_position = "below both EMA — possible correction"
    else:
        price_position = "between EMAs — indecision zone"

    return {
        "trend": trend,
        "strength": strength,
        "price_position": price_position,
        "ema_gap_percent": round(gap_pct, 4),
    }


def _as_float(value):
    """Coerce ``value`` to float; None and non-numeric values become NaN."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return float("nan")
# -----------------------
# Endpoints
# -----------------------
@app.post("/analyze")
def analyze_ema_endpoint(input_data: DateRange):
    """
    Return time series data (date, close, EMA20, EMA50, norm_close) for plotting.

    Extra history before start_date is downloaded automatically so that EMA50
    is already seeded when the requested window begins; the returned series is
    then limited to dates on or after start_date.

    Every failure is reported as ``{"status": "error", "message": ...}``.
    """
    logger.info("Received /analyze request: %s -> %s", input_data.start_date, input_data.end_date)
    try:
        start_date, end_date = _parse_request_dates(input_data)
    except Exception as e:  # request boundary: any parsing failure becomes an error payload
        logger.error("Invalid date format: %s", e)
        return {"status": "error", "message": "Invalid date format. Use YYYY-MM-DD."}
    if start_date > end_date:
        logger.error("Inverted date range: %s > %s", start_date, end_date)
        return {"status": "error", "message": "start_date must not be after end_date."}

    try:
        df = _fetch_close_frame(start_date, end_date)
    except Exception as e:
        logger.error("yfinance download failed: %s", e, exc_info=True)
        return {"status": "error", "message": f"Failed to fetch data from yfinance: {e}"}
    if df.empty:
        logger.warning("No data returned from yfinance for the requested range")
        return {"status": "error", "message": "No price data for requested dates"}

    logger.info("Downloaded rows: %d", len(df))
    if len(df) < _EMA_LONG_SPAN:
        msg = f"Insufficient data points after extension ({len(df)}). Need at least 50 for EMA50."
        logger.error(msg)
        return {"status": "error", "message": msg}

    df = _add_emas(df)
    # The warm-up rows only exist to seed the EMAs; keep the requested window.
    df = df[df["date"] >= pd.Timestamp(start_date)].reset_index(drop=True)
    if df.empty:
        return {"status": "error", "message": "After computing EMAs no usable rows remain."}

    # Dynamic min/max for normalization; fall back to the series' own range
    # when the recent-window fetch fails or yields a non-finite bound.
    try:
        close_min, close_max = get_dynamic_minmax()
        if not (np.isfinite(close_min) and np.isfinite(close_max)):
            raise ValueError("dynamic min/max is not finite")
    except Exception as e:
        logger.warning("Dynamic min/max fetch failed: %s", e)
        close_min, close_max = float(df["close"].min()), float(df["close"].max())

    closes = [float(x) for x in df["close"].tolist()]
    chart_data = {
        "dates": df["date"].dt.strftime("%Y-%m-%d").tolist(),
        "close": closes,
        "ema20": [float(x) for x in df["EMA20"].tolist()],
        "ema50": [float(x) for x in df["EMA50"].tolist()],
        "norm_close": [float(normalize_close(x, close_min, close_max)) for x in closes],
        "min_close": float(close_min),
        "max_close": float(close_max),
    }
    logger.info("Analyze success -> points: %d", len(df))
    return {
        "status": "ok",
        "pair": PAIR,
        "requested_start": str(start_date),
        "requested_end": str(end_date),
        "data_points": len(df),
        "chart_data": chart_data,
    }


@app.post("/summary")
def ema_summary_endpoint(input_data: DateRange):
    """
    Return last available close, EMA20, EMA50 and a short trend analysis dictionary.

    The values come from the most recent downloaded row up to end_date.
    Every failure is reported as ``{"status": "error", "message": ...}``.
    """
    logger.info("Received /summary request: %s -> %s", input_data.start_date, input_data.end_date)
    try:
        start_date, end_date = _parse_request_dates(input_data)
    except Exception as e:  # request boundary: any parsing failure becomes an error payload
        logger.error("Invalid date input for /summary: %s", e)
        return {"status": "error", "message": "Invalid date format. Use YYYY-MM-DD."}
    if start_date > end_date:
        logger.error("Inverted date range for /summary: %s > %s", start_date, end_date)
        return {"status": "error", "message": "start_date must not be after end_date."}

    try:
        df = _fetch_close_frame(start_date, end_date)
    except Exception as e:
        logger.error("yfinance download failed for /summary: %s", e, exc_info=True)
        return {"status": "error", "message": f"Failed to fetch data from yfinance: {e}"}
    if df.empty:
        return {"status": "error", "message": "No price data for requested dates"}

    if len(df) < _EMA_LONG_SPAN:
        return {"status": "error", "message": f"Insufficient data (have {len(df)} rows). Need >=50 for EMA50."}

    df = _add_emas(df)
    if df.empty:
        return {"status": "error", "message": "No usable rows after EMA computation."}

    latest = df.iloc[-1]
    analysis = analyze_trend(latest)
    return {
        "status": "ok",
        "pair": PAIR,
        "as_of_date": latest["date"].strftime("%Y-%m-%d"),
        "close": float(latest["close"]),
        "ema20": float(latest["EMA20"]),
        "ema50": float(latest["EMA50"]),
        "trend_analysis": analysis,
    }


# EMA periods served by both endpoints; the long span is also the minimum row count.
_EMA_SHORT_SPAN = 20
_EMA_LONG_SPAN = 50


def _parse_request_dates(input_data):
    """Parse start_date/end_date into ``datetime.date`` objects; raise ValueError if either is unusable."""
    start_ts = pd.to_datetime(input_data.start_date)
    end_ts = pd.to_datetime(input_data.end_date)
    # pandas turns an empty string into NaT instead of raising.
    if pd.isna(start_ts) or pd.isna(end_ts):
        raise ValueError("empty or unparseable date")
    return start_ts.date(), end_ts.date()


def _fetch_close_frame(start_date, end_date):
    """Download closes for PAIR from a warm-up window before start_date through end_date.

    Returns a DataFrame with columns ``date`` (midnight-normalized, tz-naive)
    and ``close`` (float) with NaN closes removed; the frame is empty when
    yfinance returns nothing. Exceptions from ``yf.download`` propagate.
    """
    # BASE_WINDOW (60 calendar days) holds only about 43 weekday rows, fewer
    # than the EMA50 seed needs, so request at least twice the long span.
    warmup_days = max(BASE_WINDOW, 2 * _EMA_LONG_SPAN)
    raw = yf.download(
        PAIR,
        start=start_date - timedelta(days=warmup_days),
        end=end_date + timedelta(days=1),  # one extra day so end_date itself is requested
        auto_adjust=True,
        progress=False,
    )
    if raw is None or raw.empty:
        return pd.DataFrame(columns=["date", "close"])

    close = raw["Close"]
    # yfinance can return MultiIndex (field, ticker) columns even for a single
    # ticker; in that case raw["Close"] is a one-column DataFrame.
    if isinstance(close, pd.DataFrame):
        close = close.iloc[:, 0]

    dates = pd.DatetimeIndex(pd.to_datetime(raw.index))
    if dates.tz is not None:
        # Drop the timezone so the dates compare cleanly with naive request dates.
        dates = dates.tz_localize(None)

    frame = pd.DataFrame(
        {
            "date": dates.normalize(),
            "close": pd.to_numeric(close, errors="coerce").to_numpy(dtype=float),
        }
    )
    # A NaN close would propagate through every later step of the EMA recursion.
    return frame.dropna(subset=["close"]).reset_index(drop=True)


def _add_emas(frame):
    """Return a copy of ``frame`` with EMA20/EMA50 columns, without the rows where either is NaN."""
    closes = frame["close"].tolist()
    enriched = frame.copy()
    enriched["EMA20"] = ema_manual(closes, _EMA_SHORT_SPAN)
    enriched["EMA50"] = ema_manual(closes, _EMA_LONG_SPAN)
    return enriched.dropna(subset=["EMA20", "EMA50"]).reset_index(drop=True)
@app.get("/")
def root():
    # Landing route: returns a static message confirming the API is running.
    # The trailing emoji was previously mis-encoded UTF-8.
    return {"message": "Model B API (EMA + Trend Summary) aktif 🚀"}