Spaces:
Sleeping
Sleeping
| # app.py | |
| from fastapi import FastAPI | |
| from pydantic import BaseModel | |
| import pandas as pd | |
| import numpy as np | |
| import yfinance as yf | |
| from datetime import datetime, timedelta | |
| import logging | |
| import os | |
| # ----------------------- | |
| # Logging | |
| # ----------------------- | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s β %(levelname)s β %(message)s") | |
| logger = logging.getLogger("modelb_api") | |
| # ----------------------- | |
| # App & config | |
| # ----------------------- | |
| app = FastAPI( | |
| title="Model B β EMA & Dynamic Scaling API", | |
| description="API untuk menghitung EMA, normalisasi, dan analisis tren otomatis berdasarkan data yfinance", | |
| version="2.1" | |
| ) | |
| PAIR = "EURUSD=X" | |
| BASE_WINDOW = 60 # jumlah hari historical yang diambil untuk jaga-jaga agar EMA50 bisa dihitung | |
| # ----------------------- | |
| # Request schema | |
| # ----------------------- | |
| class DateRange(BaseModel): | |
| start_date: str # ISO date 'YYYY-MM-DD' | |
| end_date: str # ISO date 'YYYY-MM-DD' | |
| # ----------------------- | |
| # Utility functions | |
| # ----------------------- | |
| def ema_manual(prices, span): | |
| """ | |
| Manual EMA calculation: | |
| - prices: iterable numeric | |
| - span: int (e.g. 20 or 50) | |
| Returns list of same length; first (span-1) entries are NaN, entry at index span-1 is SMA seed. | |
| """ | |
| prices = np.asarray(prices, dtype=float) | |
| n = len(prices) | |
| if n == 0: | |
| return [np.nan] * 0 | |
| if span <= 0: | |
| raise ValueError("span must be > 0") | |
| ema = [np.nan] * n | |
| alpha = 2.0 / (span + 1.0) | |
| # Not enough data -> return NaNs (keputusan: tetap mengembalikan NaN untuk indeks sebelum span-1) | |
| if n < span: | |
| logger.warning(f"Not enough data for EMA span={span} (have {n} < needed {span}), returning NaNs.") | |
| return ema | |
| # seed with SMA at index span-1 | |
| seed = float(np.mean(prices[:span])) | |
| ema[span - 1] = seed | |
| # recursive EMA | |
| for i in range(span, n): | |
| ema[i] = alpha * prices[i] + (1.0 - alpha) * ema[i - 1] | |
| return ema | |
| def get_dynamic_minmax(pair=PAIR, window_days=BASE_WINDOW): | |
| """Download recent window and return min/max of Close to use for normalization.""" | |
| today = datetime.utcnow().date() | |
| start = today - timedelta(days=window_days) | |
| logger.info(f"Fetching recent data for dynamic min/max: {start} -> {today}") | |
| try: | |
| df = yf.download(pair, start=start, end=today + timedelta(days=1), auto_adjust=True, progress=False) | |
| except Exception as e: | |
| logger.error("yfinance download failed for dynamic minmax: %s", e, exc_info=True) | |
| raise | |
| if df is None or df.empty: | |
| raise ValueError("Failed to fetch recent prices for dynamic min/max") | |
| close_min = float(df["Close"].min()) | |
| close_max = float(df["Close"].max()) | |
| logger.info("dynamic min/max: %s / %s", close_min, close_max) | |
| return close_min, close_max | |
| def normalize_close(value, close_min, close_max): | |
| if close_max == close_min: | |
| return 0.5 | |
| return float((value - close_min) / (close_max - close_min)) | |
| def analyze_trend(latest_row): | |
| """ | |
| Return simple analysis dict based on EMA20 vs EMA50 and gap percent. | |
| """ | |
| ema20 = latest_row.get("EMA20", np.nan) | |
| ema50 = latest_row.get("EMA50", np.nan) | |
| close = latest_row.get("close", np.nan) | |
| if np.isnan(ema20) or np.isnan(ema50): | |
| return {"trend": "unknown", "strength": "unknown", "price_position": "unknown", "ema_gap_percent": None} | |
| if ema20 > ema50: | |
| trend = "bullish" | |
| elif ema20 < ema50: | |
| trend = "bearish" | |
| else: | |
| trend = "neutral" | |
| if ema50 == 0 or np.isnan(ema50): | |
| gap_pct = 0.0 | |
| else: | |
| gap_pct = abs(ema20 - ema50) / abs(ema50) * 100.0 | |
| if gap_pct > 0.3: | |
| strength = "strong" | |
| elif gap_pct > 0.1: | |
| strength = "moderate" | |
| else: | |
| strength = "weak" | |
| if close > ema20 and close > ema50: | |
| price_position = "above both EMA β possible continuation" | |
| elif close < ema20 and close < ema50: | |
| price_position = "below both EMA β possible correction" | |
| else: | |
| price_position = "between EMAs β indecision zone" | |
| return { | |
| "trend": trend, | |
| "strength": strength, | |
| "price_position": price_position, | |
| "ema_gap_percent": round(gap_pct, 4) | |
| } | |
| # ----------------------- | |
| # Endpoints | |
| # ----------------------- | |
| def analyze_ema_endpoint(input_data: DateRange): | |
| """ | |
| Return time series data (date, close, EMA20, EMA50, norm_close) for plotting. | |
| If requested window is too short to compute EMA50, endpoint automatically uses earlier data | |
| by extending start_date backward by BASE_WINDOW days. | |
| """ | |
| try: | |
| logger.info("Received /analyze request: %s -> %s", input_data.start_date, input_data.end_date) | |
| start_date = pd.to_datetime(input_data.start_date).date() | |
| end_date = pd.to_datetime(input_data.end_date).date() | |
| except Exception as e: | |
| logger.error("Invalid date format: %s", e) | |
| return {"status": "error", "message": "Invalid date format. Use YYYY-MM-DD."} | |
| # extend start to ensure enough history (for EMA50) | |
| extended_start = start_date - timedelta(days=BASE_WINDOW) | |
| try: | |
| df_raw = yf.download(PAIR, start=extended_start, end=end_date + timedelta(days=1), auto_adjust=True, progress=False) | |
| except Exception as e: | |
| logger.error("yfinance download failed: %s", e, exc_info=True) | |
| return {"status": "error", "message": f"Failed to fetch data from yfinance: {e}"} | |
| if df_raw is None or df_raw.empty: | |
| logger.warning("No data returned from yfinance for the requested range") | |
| return {"status": "error", "message": "No price data for requested dates"} | |
| df = df_raw.reset_index()[["Date", "Close"]].rename(columns={"Date": "date", "Close": "close"}) | |
| df["date"] = pd.to_datetime(df["date"]).dt.normalize() | |
| logger.info("Downloaded rows: %d", len(df)) | |
| if len(df) < 50: | |
| msg = f"Insufficient data points after extension ({len(df)}). Need at least 50 for EMA50." | |
| logger.error(msg) | |
| return {"status": "error", "message": msg} | |
| # compute EMAs | |
| close_list = df["close"].tolist() | |
| df["EMA20"] = ema_manual(close_list, 20) | |
| df["EMA50"] = ema_manual(close_list, 50) | |
| # drop rows where EMA values are NaN (i.e., before we have enough seed) | |
| df = df.dropna(subset=["EMA20", "EMA50"]).reset_index(drop=True) | |
| if df.empty: | |
| return {"status": "error", "message": "After computing EMAs no usable rows remain."} | |
| # dynamic min/max and normalization | |
| try: | |
| close_min, close_max = get_dynamic_minmax() | |
| except Exception as e: | |
| logger.warning("Dynamic min/max fetch failed: %s", e) | |
| close_min, close_max = float(df["close"].min()), float(df["close"].max()) | |
| df["norm_close"] = df["close"].apply(lambda x: normalize_close(x, close_min, close_max)) | |
| chart_data = { | |
| "dates": df["date"].dt.strftime("%Y-%m-%d").tolist(), | |
| "close": [float(x) for x in df["close"].tolist()], | |
| "ema20": [float(x) for x in df["EMA20"].tolist()], | |
| "ema50": [float(x) for x in df["EMA50"].tolist()], | |
| "norm_close": [float(x) for x in df["norm_close"].tolist()], | |
| "min_close": float(close_min), | |
| "max_close": float(close_max), | |
| } | |
| logger.info("Analyze success -> points: %d", len(df)) | |
| return { | |
| "status": "ok", | |
| "pair": PAIR, | |
| "requested_start": str(start_date), | |
| "requested_end": str(end_date), | |
| "data_points": len(df), | |
| "chart_data": chart_data | |
| } | |
| def ema_summary_endpoint(input_data: DateRange): | |
| """ | |
| Return last available close, EMA20, EMA50 and a short trend analysis dictionary. | |
| """ | |
| try: | |
| logger.info("Received /summary request: %s -> %s", input_data.start_date, input_data.end_date) | |
| start_date = pd.to_datetime(input_data.start_date).date() | |
| end_date = pd.to_datetime(input_data.end_date).date() | |
| except Exception as e: | |
| logger.error("Invalid date input for /summary: %s", e) | |
| return {"status": "error", "message": "Invalid date format. Use YYYY-MM-DD."} | |
| extended_start = start_date - timedelta(days=BASE_WINDOW) | |
| try: | |
| df_raw = yf.download(PAIR, start=extended_start, end=end_date + timedelta(days=1), auto_adjust=True, progress=False) | |
| except Exception as e: | |
| logger.error("yfinance download failed for /summary: %s", e, exc_info=True) | |
| return {"status": "error", "message": f"Failed to fetch data from yfinance: {e}"} | |
| if df_raw is None or df_raw.empty: | |
| return {"status": "error", "message": "No price data for requested dates"} | |
| df = df_raw.reset_index()[["Date", "Close"]].rename(columns={"Date": "date", "Close": "close"}) | |
| df["date"] = pd.to_datetime(df["date"]).dt.normalize() | |
| if len(df) < 50: | |
| return {"status": "error", "message": f"Insufficient data (have {len(df)} rows). Need >=50 for EMA50."} | |
| close_list = df["close"].tolist() | |
| df["EMA20"] = ema_manual(close_list, 20) | |
| df["EMA50"] = ema_manual(close_list, 50) | |
| df = df.dropna(subset=["EMA20", "EMA50"]).reset_index(drop=True) | |
| if df.empty: | |
| return {"status": "error", "message": "No usable rows after EMA computation."} | |
| latest = df.iloc[-1] | |
| analysis = analyze_trend(latest) | |
| return { | |
| "status": "ok", | |
| "pair": PAIR, | |
| "as_of_date": latest["date"].strftime("%Y-%m-%d"), | |
| "close": float(latest["close"]), | |
| "ema20": float(latest["EMA20"]), | |
| "ema50": float(latest["EMA50"]), | |
| "trend_analysis": analysis | |
| } | |
| def root(): | |
| return {"message": "Model B API (EMA + Trend Summary) aktif π"} | |