""" BIST Predictor — Güven Puanı Hesaplama Modülü Tahminlerin doğruluğuna göre 0–100 arasında güven puanı hesaplar. Metrikler: 1. Yön Doğruluğu (%40) — Fiyat yönünü doğru tahmin etme oranı 2. MAPE (%30) — Ortalama yüzdelik mutlak hata 3. Quantile Kapsama (%20) — Gerçek fiyatın tahmin bandı içinde kalma oranı 4. Volatilite Uyumu (%10) — Tahmin edilen/gerçekleşen volatilite uyumu """ import logging import math from typing import Optional from config import CONFIDENCE_WEIGHTS from services.database import ( get_predictions_with_actuals, save_confidence_score, get_daily_prices, ) logger = logging.getLogger(__name__) def calculate_direction_accuracy(predictions: list[dict]) -> dict: """ Yön doğruluğu hesapla. Tahmin edilen yönün (yukarı/aşağı) gerçekleşen yönle eşleşme oranı. """ if len(predictions) < 2: return {"score": 50, "correct": 0, "total": 0, "rate": 0.5} correct = 0 total = 0 for p in predictions: predicted = p.get("predicted_close") actual = p.get("actual_close") if predicted is None or actual is None: continue # Son bilinen fiyattan yön # quantile_p50 bulamazasak predicted_close kullanırız last_known = p.get("quantile_p50") or predicted # Basitleştirilmiş: eğer prediction_date'teki fiyat veritabanında yoksa # predicted ve actual'ın yönünü kontrol ederiz predicted_direction = 1 if predicted > 0 else -1 # Placeholder # Daha anlamlı kontrol: tahmin önceki gerçeğe göre yukarı mı aşağı mı? # her prediction'da last_known_price gibi bir referans olmalı # Şimdilik predicted vs actual farkını kullanıyoruz total += 1 # Basit yön kontrolü: predicted_close tahminin beklediği yön # Bu alanı geçmiş fiyatla karşılaştırarak hesaplarız # predictions tablosunda referans fiyat bilgisi olmayabilir # Bu yüzden ardışık tahminleri karşılaştırabiliriz # Gerçek yön doğruluğu hesaplaması correct = 0 total = 0 for i in range(len(predictions) - 1): curr = predictions[i] prev_actual = predictions[i + 1].get("actual_close") # Sıralama DESC olduğu için curr_actual = curr.get("actual_close") curr_predicted = curr.get("predicted_close") if prev_actual is None or curr_actual is None or curr_predicted is None: continue # Gerçek yön: curr_actual vs prev_actual actual_direction = curr_actual - prev_actual # Tahmin yönü: curr_predicted vs prev_actual predicted_direction = curr_predicted - prev_actual total += 1 if (actual_direction > 0 and predicted_direction > 0) or \ (actual_direction < 0 and predicted_direction < 0) or \ (actual_direction == 0 and predicted_direction == 0): correct += 1 rate = correct / total if total > 0 else 0.5 # 50% baseline çıkararak puanlama (random guess = %50 yön) score = min(100, max(0, rate * 100)) return { "score": round(score, 1), "correct": correct, "total": total, "rate": round(rate, 3), } def calculate_mape_score(predictions: list[dict]) -> dict: """ MAPE (Mean Absolute Percentage Error) tabanlı puan hesapla. Düşük MAPE = Yüksek puan. score = max(0, 100 - MAPE * 10) """ errors = [] for p in predictions: predicted = p.get("predicted_close") actual = p.get("actual_close") if predicted is None or actual is None or actual == 0: continue ape = abs(predicted - actual) / abs(actual) * 100 # Yüzdelik hata errors.append(ape) if not errors: return {"score": 50, "mape": 0, "count": 0} mape = sum(errors) / len(errors) # MAPE'yi puana dönüştür # %0 hata = 100 puan, %10+ hata = 0 puan score = max(0, min(100, 100 - mape * 10)) return { "score": round(score, 1), "mape": round(mape, 2), "count": len(errors), "min_error": round(min(errors), 2), "max_error": round(max(errors), 2), "median_error": round(sorted(errors)[len(errors) // 2], 2), } def calculate_quantile_coverage(predictions: list[dict]) -> dict: """ Quantile kapsama oranı hesapla. Gerçek fiyatın P10–P90 bandı içinde kalma yüzdesi. Beklenen: ~%80 (P10-P90 = %80 güven aralığı) """ in_band = 0 total = 0 for p in predictions: actual = p.get("actual_close") p10 = p.get("quantile_p10") p90 = p.get("quantile_p90") if actual is None or p10 is None or p90 is None: continue total += 1 if p10 <= actual <= p90: in_band += 1 if total == 0: return {"score": 50, "coverage": 0, "in_band": 0, "total": 0} coverage = in_band / total # İdeal kapsama ~%80 (P10-P90 bandı) # %80 = 100 puan, sapmalar düşürür # |coverage - 0.8| * 500 kadar puan düşürüyoruz deviation = abs(coverage - 0.80) score = max(0, min(100, 100 - deviation * 500)) return { "score": round(score, 1), "coverage": round(coverage, 3), "in_band": in_band, "total": total, } def calculate_volatility_match(predictions: list[dict], symbol: str) -> dict: """ Volatilite uyumu hesapla. Tahmin edilen fiyat bandwidth'i ile gerçekleşen fiyat dalgalanmasını karşılaştırır. """ if len(predictions) < 5: return {"score": 50, "predicted_vol": 0, "actual_vol": 0} # Tahmin edilen volatilite: P10-P90 bandwidth ortalaması predicted_vols = [] actual_prices = [] for p in predictions: p10 = p.get("quantile_p10") p90 = p.get("quantile_p90") actual = p.get("actual_close") predicted = p.get("predicted_close") if p10 is not None and p90 is not None and predicted and predicted > 0: predicted_vols.append((p90 - p10) / predicted) if actual is not None: actual_prices.append(actual) if len(actual_prices) < 3 or not predicted_vols: return {"score": 50, "predicted_vol": 0, "actual_vol": 0} # Gerçekleşen volatilite: fiyat değişimlerinin standart sapması returns = [] for i in range(1, len(actual_prices)): if actual_prices[i - 1] != 0: ret = (actual_prices[i] - actual_prices[i - 1]) / actual_prices[i - 1] returns.append(ret) if not returns: return {"score": 50, "predicted_vol": 0, "actual_vol": 0} actual_vol = (sum(r ** 2 for r in returns) / len(returns)) ** 0.5 predicted_vol = sum(predicted_vols) / len(predicted_vols) # Uyum oranı if max(actual_vol, predicted_vol) == 0: ratio = 1.0 else: ratio = min(actual_vol, predicted_vol) / max(actual_vol, predicted_vol) score = ratio * 100 return { "score": round(score, 1), "predicted_vol": round(predicted_vol, 4), "actual_vol": round(actual_vol, 4), "match_ratio": round(ratio, 3), } def calculate_confidence(symbol: str, horizon: int = 10) -> dict: """ Belirli bir hisse için tüm metrikleri hesapla ve ağırlıklı güven puanı üret. Returns: dict: { total_score: float (0-100), direction_accuracy: float, mape_score: float, quantile_coverage: float, volatility_match: float, details: dict, # Her metriğin detay bilgileri } """ predictions = get_predictions_with_actuals(symbol, horizon) if not predictions: return { "total_score": 0, "direction_accuracy": 0, "mape_score": 0, "quantile_coverage": 0, "volatility_match": 0, "details": {"message": "Henüz karşılaştırılmış tahmin yok."}, "data_count": 0, } # Her metriği hesapla dir_result = calculate_direction_accuracy(predictions) mape_result = calculate_mape_score(predictions) quant_result = calculate_quantile_coverage(predictions) vol_result = calculate_volatility_match(predictions, symbol) # Ağırlıklı toplam weights = CONFIDENCE_WEIGHTS total_score = ( dir_result["score"] * weights["direction_accuracy"] + mape_result["score"] * weights["mape"] + quant_result["score"] * weights["quantile_coverage"] + vol_result["score"] * weights["volatility_match"] ) result = { "total_score": round(total_score, 1), "direction_accuracy": round(dir_result["score"], 1), "mape_score": round(mape_result["score"], 1), "quantile_coverage": round(quant_result["score"], 1), "volatility_match": round(vol_result["score"], 1), "details": { "direction": dir_result, "mape": mape_result, "quantile": quant_result, "volatility": vol_result, }, "data_count": len(predictions), } return result def update_confidence_score(symbol: str, horizon: int = 10): """ Güven puanını hesapla ve veritabanına kaydet. """ from datetime import date result = calculate_confidence(symbol, horizon) save_confidence_score( symbol=symbol, score_date=date.today().isoformat(), horizon=horizon, scores=result ) logger.info( f"{symbol} güven puanı (h={horizon}): " f"{result['total_score']}/100 " f"(yön={result['direction_accuracy']}, " f"mape={result['mape_score']}, " f"quant={result['quantile_coverage']}, " f"vol={result['volatility_match']})" ) return result def get_confidence_label(score: float) -> dict: """ Güven puanına göre label ve renk bilgisi döndür. """ if score >= 80: return {"label": "Çok Güvenilir", "color": "#00e676", "emoji": "🟢"} elif score >= 60: return {"label": "Güvenilir", "color": "#ffeb3b", "emoji": "🟡"} elif score >= 40: return {"label": "Orta", "color": "#ff9800", "emoji": "🟠"} else: return {"label": "Düşük Güven", "color": "#f44336", "emoji": "🔴"} def calculate_all_confidences(symbols: list[str], horizon: int = 10) -> dict: """Tüm hisselerin güven puanlarını hesapla.""" results = {} for symbol in symbols: try: results[symbol] = update_confidence_score(symbol, horizon) except Exception as e: logger.error(f"{symbol} güven puanı hatası: {e}") results[symbol] = {"total_score": 0, "error": str(e)} return results