BIST Predictor Dev
Add force_predict API endpoint for remote triggers
75bf2c0
"""
BIST Predictor β€” FastAPI Ana Uygulama
REST API + SSE + Statik Dosya Servisi
"""
import asyncio
import json
import logging
import sys
from contextlib import asynccontextmanager
from datetime import date, datetime
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, HTTPException, Query, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse
from sse_starlette.sse import EventSourceResponse
from config import ACTIVE_SYMBOLS, HORIZONS, DEFAULT_HORIZON, FRONTEND_DIR, HOST, PORT, COMMODITIES
from services.database import (
init_db, get_daily_prices, get_predictions, get_predictions_with_actuals,
get_all_confidence_scores, get_confidence_score, get_confidence_history,
get_dashboard_summary, get_system_logs, get_latest_price,
get_closing_prices_array, log_event,
)
from services.data_fetcher import (
fetch_and_store_history, fetch_latest_price, get_stock_info,
)
from services.scheduler import (
start_scheduler, stop_scheduler, get_scheduler_status,
run_initial_data_load, run_single_prediction,
job_daily_prediction, job_daily_comparison,
add_event_listener, remove_event_listener,
)
from models.predictor import is_model_loaded, get_model_info, predict_stock
from models.confidence import (
calculate_confidence, update_confidence_score, get_confidence_label,
calculate_all_confidences,
)
# ─── Logging ──────────────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("bist_predictor.log", encoding="utf-8"),
]
)
logger = logging.getLogger(__name__)
# ─── SSE Event Queue ─────────────────────────────────────────────────────────────
_sse_queues: list[asyncio.Queue] = []
def _broadcast_sse(event_type: str, data: dict):
"""SSE event'ini tüm bağlı client'lara yayınla."""
message = json.dumps({"type": event_type, "data": data, "timestamp": datetime.now().isoformat()})
for q in _sse_queues[:]:
try:
q.put_nowait(message)
except asyncio.QueueFull:
pass
# ─── Lifespan ─────────────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Uygulama başlatma ve kapatma."""
# Başlatma
logger.info("BIST Predictor başlatılıyor...")
init_db()
add_event_listener(_broadcast_sse)
start_scheduler()
log_event("app_start", "BIST Predictor başlatıldı")
logger.info("BIST Predictor hazΔ±r!")
yield
# Kapatma
stop_scheduler()
remove_event_listener(_broadcast_sse)
log_event("app_stop", "BIST Predictor kapatΔ±ldΔ±")
logger.info("BIST Predictor kapatΔ±ldΔ±.")
# ─── FastAPI Uygulama ────────────────────────────────────────────────────────────
app = FastAPI(
title="BIST Predictor",
description="TimesFM ile BIST hisse analizi ve tahmin sistemi",
version="1.0.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ─── SSE Endpoint ────────────────────────────────────────────────────────────────
@app.get("/api/stream")
async def stream_events():
"""Server-Sent Events stream."""
queue = asyncio.Queue(maxsize=100)
_sse_queues.append(queue)
async def event_generator():
try:
while True:
try:
message = await asyncio.wait_for(queue.get(), timeout=30)
yield {"data": message}
except asyncio.TimeoutError:
yield {"data": json.dumps({"type": "heartbeat", "timestamp": datetime.now().isoformat()})}
except asyncio.CancelledError:
pass
finally:
if queue in _sse_queues:
_sse_queues.remove(queue)
return EventSourceResponse(event_generator())
# ─── Dashboard API ───────────────────────────────────────────────────────────────
@app.get("/api/dashboard")
async def get_dashboard(horizon: int = DEFAULT_HORIZON):
"""Dashboard ΓΆzet verileri."""
try:
summary = get_dashboard_summary(horizon)
# VeritabanΔ±ndaki gΓΌncel gΓΌven puanlarΔ±nΔ± al
db_scores = {s["symbol"]: s for s in get_all_confidence_scores(horizon)}
scores = []
for symbol in ACTIVE_SYMBOLS:
# Puanı varsa kullan, yoksa boş oluştur
if symbol in db_scores:
s = db_scores[symbol]
else:
latest = get_latest_price(symbol)
s = {
"symbol": symbol,
"latest_price": latest["close"] if latest else None,
"total_score": 0,
}
# Δ°sim ve TΓΌr belirle
s["name"] = COMMODITIES.get(symbol, symbol.replace(".IS", ""))
s["asset_type"] = "Emtia" if symbol in COMMODITIES else "Hisse"
label_info = get_confidence_label(s.get("total_score", 0))
s.update(label_info)
# Tahminleri ekle ve değişimi hesapla
preds = get_predictions(s["symbol"], horizon, limit=150)
if preds:
latest_p_date = preds[0]["prediction_date"]
latest_batch = [p for p in preds if p["prediction_date"] == latest_p_date]
if latest_batch:
final_pred = sorted(latest_batch, key=lambda x: x["target_date"])[-1]
s["predicted_close"] = final_pred["predicted_close"]
if s.get("latest_price"):
s["predicted_change"] = (s["predicted_close"] - s["latest_price"]) / s["latest_price"] * 100
else:
s["predicted_change"] = 0
else:
s["predicted_change"] = 0
scores.append(s)
return {
"summary": summary,
"stocks": scores,
"horizons": HORIZONS,
"active_horizon": horizon,
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"Dashboard hatasΔ±: {e}")
raise HTTPException(500, str(e))
@app.get("/api/stocks")
async def list_stocks():
"""Takip edilen hisse listesi."""
stocks = []
for symbol in ACTIVE_SYMBOLS:
latest = get_latest_price(symbol)
confidence = get_confidence_score(symbol, DEFAULT_HORIZON)
stock = {
"symbol": symbol,
"name": COMMODITIES.get(symbol, symbol.replace(".IS", "")),
"asset_type": "Emtia" if symbol in COMMODITIES else "Hisse",
"latest_price": latest["close"] if latest else None,
"latest_date": latest["date"] if latest else None,
"confidence_score": confidence["total_score"] if confidence else None,
}
if confidence:
label = get_confidence_label(confidence["total_score"])
stock.update(label)
stocks.append(stock)
return {"stocks": stocks, "total": len(stocks)}
@app.get("/api/stock/{symbol}")
async def get_stock_detail(symbol: str, horizon: int = DEFAULT_HORIZON):
"""Belirli bir hissenin detay bilgisi."""
if symbol not in ACTIVE_SYMBOLS:
raise HTTPException(404, f"{symbol} takip listesinde bulunamadΔ±")
# Fiyat verileri
prices = get_daily_prices(symbol, limit=200)
latest = get_latest_price(symbol)
# Tahminler
predictions = get_predictions(symbol, horizon, limit=50)
# GΓΌven puanΔ±
confidence = get_confidence_score(symbol, horizon)
confidence_history = get_confidence_history(symbol, horizon)
# Karşılaştırma verileri
comparisons = get_predictions_with_actuals(symbol, horizon)
result = {
"symbol": symbol,
"name": COMMODITIES.get(symbol, symbol.replace(".IS", "")),
"asset_type": "Emtia" if symbol in COMMODITIES else "Hisse",
"latest_price": latest,
"prices": prices[:100], # Son 100 gΓΌn
"predictions": predictions,
"confidence": confidence,
"confidence_label": get_confidence_label(confidence["total_score"]) if confidence else None,
"confidence_history": confidence_history,
"comparisons": comparisons[:30],
"horizon": horizon,
}
return result
# ─── Tahmin API ──────────────────────────────────────────────────────────────────
@app.get("/api/predictions/{symbol}")
async def get_symbol_predictions(
symbol: str,
horizon: int = DEFAULT_HORIZON,
limit: int = 50
):
"""Belirli hisse iΓ§in tahminler."""
predictions = get_predictions(symbol, horizon, limit)
return {"symbol": symbol, "horizon": horizon, "predictions": predictions}
@app.post("/api/predict/{symbol}")
async def trigger_prediction(
symbol: str,
background_tasks: BackgroundTasks,
horizons: str = Query(default="10", description="Virgülle ayrılmış horizonlar: 10,30,90")
):
"""Manuel tahmin tetikle."""
if symbol not in ACTIVE_SYMBOLS:
raise HTTPException(404, f"{symbol} takip listesinde bulunamadΔ±")
horizon_list = [int(h.strip()) for h in horizons.split(",")]
def run_prediction():
try:
result = run_single_prediction(symbol, horizon_list)
_broadcast_sse("prediction_manual", {
"symbol": symbol,
"horizons": horizon_list,
"status": "completed",
})
except Exception as e:
logger.error(f"Manuel tahmin hatasΔ±: {e}")
_broadcast_sse("prediction_error", {
"symbol": symbol,
"error": str(e),
})
background_tasks.add_task(run_prediction)
return {
"message": f"{symbol} için tahmin başlatıldı",
"horizons": horizon_list,
"status": "processing",
}
@app.post("/api/predict-all")
async def trigger_all_predictions(background_tasks: BackgroundTasks):
"""TΓΌm hisseler iΓ§in tahmin tetikle."""
def run_all():
job_daily_prediction()
background_tasks.add_task(run_all)
return {"message": "Tüm hisseler için tahmin başlatıldı", "total": len(ACTIVE_SYMBOLS)}
# ─── GΓΌven PuanΔ± API ─────────────────────────────────────────────────────────────
@app.get("/api/confidence/{symbol}")
async def get_symbol_confidence(symbol: str, horizon: int = DEFAULT_HORIZON):
"""Belirli hisme iΓ§in gΓΌven puanΔ± detayΔ±."""
confidence = calculate_confidence(symbol, horizon)
label = get_confidence_label(confidence["total_score"])
return {
"symbol": symbol,
"horizon": horizon,
**confidence,
**label,
}
@app.get("/api/confidence-ranking")
async def get_confidence_ranking(horizon: int = DEFAULT_HORIZON):
"""TΓΌm hisselerin gΓΌven puanΔ± sΔ±ralamasΔ±."""
scores = get_all_confidence_scores(horizon)
for s in scores:
label = get_confidence_label(s.get("total_score", 0))
s.update(label)
return {"horizon": horizon, "ranking": scores}
# ─── KarşılaştΔ±rma API ───────────────────────────────────────────────────────────
@app.get("/api/comparison/{symbol}")
async def get_comparison(symbol: str, horizon: int = DEFAULT_HORIZON):
"""Tahmin vs gerçek fiyat karşılaştırması."""
comparisons = get_predictions_with_actuals(symbol, horizon)
# Δ°statistik hesapla
if comparisons:
errors = []
direction_correct = 0
total = 0
for c in comparisons:
if c.get("actual_close") and c.get("predicted_close"):
error = abs(c["predicted_close"] - c["actual_close"]) / c["actual_close"] * 100
errors.append(error)
total += 1
stats = {
"total_comparisons": len(comparisons),
"average_error_pct": round(sum(errors) / len(errors), 2) if errors else 0,
"min_error_pct": round(min(errors), 2) if errors else 0,
"max_error_pct": round(max(errors), 2) if errors else 0,
}
else:
stats = {"total_comparisons": 0}
return {
"symbol": symbol,
"horizon": horizon,
"comparisons": comparisons[:50],
"stats": stats,
}
# ─── Veri YΓΆnetimi API ───────────────────────────────────────────────────────────
@app.post("/api/data/load")
async def load_initial_data(background_tasks: BackgroundTasks):
"""Tüm hisselerin geçmiş verilerini çek."""
def run_load():
run_initial_data_load()
background_tasks.add_task(run_load)
return {"message": "Veri yükleme başlatıldı", "total": len(ACTIVE_SYMBOLS)}
@app.post("/api/data/update/{symbol}")
async def update_stock_data(symbol: str):
"""Belirli hissenin verisini gΓΌncelle."""
success = fetch_and_store_history(symbol, period="3mo")
if success:
return {"message": f"{symbol} verisi gΓΌncellendi"}
raise HTTPException(500, f"{symbol} verisi gΓΌncellenemedi")
@app.post("/api/data/compare")
async def trigger_comparison(background_tasks: BackgroundTasks):
"""Manuel karşılaştırma tetikle."""
def run_compare():
job_daily_comparison()
background_tasks.add_task(run_compare)
return {"message": "Karşılaştırma başlatıldı"}
# ─── Sistem API ───────────────────────────────────────────────────────────────────
@app.post("/api/admin/force_predict")
async def force_predict(background_tasks: BackgroundTasks):
"""Manuel tΓΌm tahminleri tetikle."""
def run_pred():
job_daily_prediction()
background_tasks.add_task(run_pred)
return {"message": "Tüm hisseler için tahmin süreci arka planda başlatıldı."}
@app.get("/api/system/status")
async def system_status():
"""Sistem durumu."""
return {
"model": get_model_info(),
"scheduler": get_scheduler_status(),
"active_symbols": len(ACTIVE_SYMBOLS),
"horizons": HORIZONS,
"timestamp": datetime.now().isoformat(),
}
@app.get("/api/system/logs")
async def system_logs(limit: int = 50):
"""Sistem loglarΔ±."""
logs = get_system_logs(limit)
return {"logs": logs}
# ─── Statik Dosya Servisi ────────────────────────────────────────────────────────
# Frontend dosyalarΔ±nΔ± serve et
if FRONTEND_DIR.exists():
app.mount("/", StaticFiles(directory=str(FRONTEND_DIR), html=True), name="frontend")
# ─── Γ‡alıştΔ±rma ──────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host=HOST,
port=PORT,
reload=False,
log_level="info",
)