bistPredictor / backend /models /confidence.py
BIST Predictor Dev
Initial commit - Clean HF release
1802f47
"""
BIST Predictor — Güven Puanı Hesaplama Modülü
Tahminlerin doğruluğuna göre 0–100 arasında güven puanı hesaplar.
Metrikler:
1. Yön Doğruluğu (%40) — Fiyat yönünü doğru tahmin etme oranı
2. MAPE (%30) — Ortalama yüzdelik mutlak hata
3. Quantile Kapsama (%20) — Gerçek fiyatın tahmin bandı içinde kalma oranı
4. Volatilite Uyumu (%10) — Tahmin edilen/gerçekleşen volatilite uyumu
"""
import logging
import math
from typing import Optional
from config import CONFIDENCE_WEIGHTS
from services.database import (
get_predictions_with_actuals,
save_confidence_score,
get_daily_prices,
)
logger = logging.getLogger(__name__)
def calculate_direction_accuracy(predictions: list[dict]) -> dict:
"""
Yön doğruluğu hesapla.
Tahmin edilen yönün (yukarı/aşağı) gerçekleşen yönle eşleşme oranı.
"""
if len(predictions) < 2:
return {"score": 50, "correct": 0, "total": 0, "rate": 0.5}
correct = 0
total = 0
for p in predictions:
predicted = p.get("predicted_close")
actual = p.get("actual_close")
if predicted is None or actual is None:
continue
# Son bilinen fiyattan yön
# quantile_p50 bulamazasak predicted_close kullanırız
last_known = p.get("quantile_p50") or predicted
# Basitleştirilmiş: eğer prediction_date'teki fiyat veritabanında yoksa
# predicted ve actual'ın yönünü kontrol ederiz
predicted_direction = 1 if predicted > 0 else -1 # Placeholder
# Daha anlamlı kontrol: tahmin önceki gerçeğe göre yukarı mı aşağı mı?
# her prediction'da last_known_price gibi bir referans olmalı
# Şimdilik predicted vs actual farkını kullanıyoruz
total += 1
# Basit yön kontrolü: predicted_close tahminin beklediği yön
# Bu alanı geçmiş fiyatla karşılaştırarak hesaplarız
# predictions tablosunda referans fiyat bilgisi olmayabilir
# Bu yüzden ardışık tahminleri karşılaştırabiliriz
# Gerçek yön doğruluğu hesaplaması
correct = 0
total = 0
for i in range(len(predictions) - 1):
curr = predictions[i]
prev_actual = predictions[i + 1].get("actual_close") # Sıralama DESC olduğu için
curr_actual = curr.get("actual_close")
curr_predicted = curr.get("predicted_close")
if prev_actual is None or curr_actual is None or curr_predicted is None:
continue
# Gerçek yön: curr_actual vs prev_actual
actual_direction = curr_actual - prev_actual
# Tahmin yönü: curr_predicted vs prev_actual
predicted_direction = curr_predicted - prev_actual
total += 1
if (actual_direction > 0 and predicted_direction > 0) or \
(actual_direction < 0 and predicted_direction < 0) or \
(actual_direction == 0 and predicted_direction == 0):
correct += 1
rate = correct / total if total > 0 else 0.5
# 50% baseline çıkararak puanlama (random guess = %50 yön)
score = min(100, max(0, rate * 100))
return {
"score": round(score, 1),
"correct": correct,
"total": total,
"rate": round(rate, 3),
}
def calculate_mape_score(predictions: list[dict]) -> dict:
"""
MAPE (Mean Absolute Percentage Error) tabanlı puan hesapla.
Düşük MAPE = Yüksek puan.
score = max(0, 100 - MAPE * 10)
"""
errors = []
for p in predictions:
predicted = p.get("predicted_close")
actual = p.get("actual_close")
if predicted is None or actual is None or actual == 0:
continue
ape = abs(predicted - actual) / abs(actual) * 100 # Yüzdelik hata
errors.append(ape)
if not errors:
return {"score": 50, "mape": 0, "count": 0}
mape = sum(errors) / len(errors)
# MAPE'yi puana dönüştür
# %0 hata = 100 puan, %10+ hata = 0 puan
score = max(0, min(100, 100 - mape * 10))
return {
"score": round(score, 1),
"mape": round(mape, 2),
"count": len(errors),
"min_error": round(min(errors), 2),
"max_error": round(max(errors), 2),
"median_error": round(sorted(errors)[len(errors) // 2], 2),
}
def calculate_quantile_coverage(predictions: list[dict]) -> dict:
"""
Quantile kapsama oranı hesapla.
Gerçek fiyatın P10–P90 bandı içinde kalma yüzdesi.
Beklenen: ~%80 (P10-P90 = %80 güven aralığı)
"""
in_band = 0
total = 0
for p in predictions:
actual = p.get("actual_close")
p10 = p.get("quantile_p10")
p90 = p.get("quantile_p90")
if actual is None or p10 is None or p90 is None:
continue
total += 1
if p10 <= actual <= p90:
in_band += 1
if total == 0:
return {"score": 50, "coverage": 0, "in_band": 0, "total": 0}
coverage = in_band / total
# İdeal kapsama ~%80 (P10-P90 bandı)
# %80 = 100 puan, sapmalar düşürür
# |coverage - 0.8| * 500 kadar puan düşürüyoruz
deviation = abs(coverage - 0.80)
score = max(0, min(100, 100 - deviation * 500))
return {
"score": round(score, 1),
"coverage": round(coverage, 3),
"in_band": in_band,
"total": total,
}
def calculate_volatility_match(predictions: list[dict], symbol: str) -> dict:
"""
Volatilite uyumu hesapla.
Tahmin edilen fiyat bandwidth'i ile gerçekleşen fiyat dalgalanmasını karşılaştırır.
"""
if len(predictions) < 5:
return {"score": 50, "predicted_vol": 0, "actual_vol": 0}
# Tahmin edilen volatilite: P10-P90 bandwidth ortalaması
predicted_vols = []
actual_prices = []
for p in predictions:
p10 = p.get("quantile_p10")
p90 = p.get("quantile_p90")
actual = p.get("actual_close")
predicted = p.get("predicted_close")
if p10 is not None and p90 is not None and predicted and predicted > 0:
predicted_vols.append((p90 - p10) / predicted)
if actual is not None:
actual_prices.append(actual)
if len(actual_prices) < 3 or not predicted_vols:
return {"score": 50, "predicted_vol": 0, "actual_vol": 0}
# Gerçekleşen volatilite: fiyat değişimlerinin standart sapması
returns = []
for i in range(1, len(actual_prices)):
if actual_prices[i - 1] != 0:
ret = (actual_prices[i] - actual_prices[i - 1]) / actual_prices[i - 1]
returns.append(ret)
if not returns:
return {"score": 50, "predicted_vol": 0, "actual_vol": 0}
actual_vol = (sum(r ** 2 for r in returns) / len(returns)) ** 0.5
predicted_vol = sum(predicted_vols) / len(predicted_vols)
# Uyum oranı
if max(actual_vol, predicted_vol) == 0:
ratio = 1.0
else:
ratio = min(actual_vol, predicted_vol) / max(actual_vol, predicted_vol)
score = ratio * 100
return {
"score": round(score, 1),
"predicted_vol": round(predicted_vol, 4),
"actual_vol": round(actual_vol, 4),
"match_ratio": round(ratio, 3),
}
def calculate_confidence(symbol: str, horizon: int = 10) -> dict:
"""
Belirli bir hisse için tüm metrikleri hesapla ve ağırlıklı güven puanı üret.
Returns:
dict: {
total_score: float (0-100),
direction_accuracy: float,
mape_score: float,
quantile_coverage: float,
volatility_match: float,
details: dict, # Her metriğin detay bilgileri
}
"""
predictions = get_predictions_with_actuals(symbol, horizon)
if not predictions:
return {
"total_score": 0,
"direction_accuracy": 0,
"mape_score": 0,
"quantile_coverage": 0,
"volatility_match": 0,
"details": {"message": "Henüz karşılaştırılmış tahmin yok."},
"data_count": 0,
}
# Her metriği hesapla
dir_result = calculate_direction_accuracy(predictions)
mape_result = calculate_mape_score(predictions)
quant_result = calculate_quantile_coverage(predictions)
vol_result = calculate_volatility_match(predictions, symbol)
# Ağırlıklı toplam
weights = CONFIDENCE_WEIGHTS
total_score = (
dir_result["score"] * weights["direction_accuracy"] +
mape_result["score"] * weights["mape"] +
quant_result["score"] * weights["quantile_coverage"] +
vol_result["score"] * weights["volatility_match"]
)
result = {
"total_score": round(total_score, 1),
"direction_accuracy": round(dir_result["score"], 1),
"mape_score": round(mape_result["score"], 1),
"quantile_coverage": round(quant_result["score"], 1),
"volatility_match": round(vol_result["score"], 1),
"details": {
"direction": dir_result,
"mape": mape_result,
"quantile": quant_result,
"volatility": vol_result,
},
"data_count": len(predictions),
}
return result
def update_confidence_score(symbol: str, horizon: int = 10):
"""
Güven puanını hesapla ve veritabanına kaydet.
"""
from datetime import date
result = calculate_confidence(symbol, horizon)
save_confidence_score(
symbol=symbol,
score_date=date.today().isoformat(),
horizon=horizon,
scores=result
)
logger.info(
f"{symbol} güven puanı (h={horizon}): "
f"{result['total_score']}/100 "
f"(yön={result['direction_accuracy']}, "
f"mape={result['mape_score']}, "
f"quant={result['quantile_coverage']}, "
f"vol={result['volatility_match']})"
)
return result
def get_confidence_label(score: float) -> dict:
"""
Güven puanına göre label ve renk bilgisi döndür.
"""
if score >= 80:
return {"label": "Çok Güvenilir", "color": "#00e676", "emoji": "🟢"}
elif score >= 60:
return {"label": "Güvenilir", "color": "#ffeb3b", "emoji": "🟡"}
elif score >= 40:
return {"label": "Orta", "color": "#ff9800", "emoji": "🟠"}
else:
return {"label": "Düşük Güven", "color": "#f44336", "emoji": "🔴"}
def calculate_all_confidences(symbols: list[str], horizon: int = 10) -> dict:
"""Tüm hisselerin güven puanlarını hesapla."""
results = {}
for symbol in symbols:
try:
results[symbol] = update_confidence_score(symbol, horizon)
except Exception as e:
logger.error(f"{symbol} güven puanı hatası: {e}")
results[symbol] = {"total_score": 0, "error": str(e)}
return results