Spaces:
Running
Running
| """ | |
| BIST Predictor — Güven Puanı Hesaplama Modülü | |
| Tahminlerin doğruluğuna göre 0–100 arasında güven puanı hesaplar. | |
| Metrikler: | |
| 1. Yön Doğruluğu (%40) — Fiyat yönünü doğru tahmin etme oranı | |
| 2. MAPE (%30) — Ortalama yüzdelik mutlak hata | |
| 3. Quantile Kapsama (%20) — Gerçek fiyatın tahmin bandı içinde kalma oranı | |
| 4. Volatilite Uyumu (%10) — Tahmin edilen/gerçekleşen volatilite uyumu | |
| """ | |
| import logging | |
| import math | |
| from typing import Optional | |
| from config import CONFIDENCE_WEIGHTS | |
| from services.database import ( | |
| get_predictions_with_actuals, | |
| save_confidence_score, | |
| get_daily_prices, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| def calculate_direction_accuracy(predictions: list[dict]) -> dict: | |
| """ | |
| Yön doğruluğu hesapla. | |
| Tahmin edilen yönün (yukarı/aşağı) gerçekleşen yönle eşleşme oranı. | |
| """ | |
| if len(predictions) < 2: | |
| return {"score": 50, "correct": 0, "total": 0, "rate": 0.5} | |
| correct = 0 | |
| total = 0 | |
| for p in predictions: | |
| predicted = p.get("predicted_close") | |
| actual = p.get("actual_close") | |
| if predicted is None or actual is None: | |
| continue | |
| # Son bilinen fiyattan yön | |
| # quantile_p50 bulamazasak predicted_close kullanırız | |
| last_known = p.get("quantile_p50") or predicted | |
| # Basitleştirilmiş: eğer prediction_date'teki fiyat veritabanında yoksa | |
| # predicted ve actual'ın yönünü kontrol ederiz | |
| predicted_direction = 1 if predicted > 0 else -1 # Placeholder | |
| # Daha anlamlı kontrol: tahmin önceki gerçeğe göre yukarı mı aşağı mı? | |
| # her prediction'da last_known_price gibi bir referans olmalı | |
| # Şimdilik predicted vs actual farkını kullanıyoruz | |
| total += 1 | |
| # Basit yön kontrolü: predicted_close tahminin beklediği yön | |
| # Bu alanı geçmiş fiyatla karşılaştırarak hesaplarız | |
| # predictions tablosunda referans fiyat bilgisi olmayabilir | |
| # Bu yüzden ardışık tahminleri karşılaştırabiliriz | |
| # Gerçek yön doğruluğu hesaplaması | |
| correct = 0 | |
| total = 0 | |
| for i in range(len(predictions) - 1): | |
| curr = predictions[i] | |
| prev_actual = predictions[i + 1].get("actual_close") # Sıralama DESC olduğu için | |
| curr_actual = curr.get("actual_close") | |
| curr_predicted = curr.get("predicted_close") | |
| if prev_actual is None or curr_actual is None or curr_predicted is None: | |
| continue | |
| # Gerçek yön: curr_actual vs prev_actual | |
| actual_direction = curr_actual - prev_actual | |
| # Tahmin yönü: curr_predicted vs prev_actual | |
| predicted_direction = curr_predicted - prev_actual | |
| total += 1 | |
| if (actual_direction > 0 and predicted_direction > 0) or \ | |
| (actual_direction < 0 and predicted_direction < 0) or \ | |
| (actual_direction == 0 and predicted_direction == 0): | |
| correct += 1 | |
| rate = correct / total if total > 0 else 0.5 | |
| # 50% baseline çıkararak puanlama (random guess = %50 yön) | |
| score = min(100, max(0, rate * 100)) | |
| return { | |
| "score": round(score, 1), | |
| "correct": correct, | |
| "total": total, | |
| "rate": round(rate, 3), | |
| } | |
| def calculate_mape_score(predictions: list[dict]) -> dict: | |
| """ | |
| MAPE (Mean Absolute Percentage Error) tabanlı puan hesapla. | |
| Düşük MAPE = Yüksek puan. | |
| score = max(0, 100 - MAPE * 10) | |
| """ | |
| errors = [] | |
| for p in predictions: | |
| predicted = p.get("predicted_close") | |
| actual = p.get("actual_close") | |
| if predicted is None or actual is None or actual == 0: | |
| continue | |
| ape = abs(predicted - actual) / abs(actual) * 100 # Yüzdelik hata | |
| errors.append(ape) | |
| if not errors: | |
| return {"score": 50, "mape": 0, "count": 0} | |
| mape = sum(errors) / len(errors) | |
| # MAPE'yi puana dönüştür | |
| # %0 hata = 100 puan, %10+ hata = 0 puan | |
| score = max(0, min(100, 100 - mape * 10)) | |
| return { | |
| "score": round(score, 1), | |
| "mape": round(mape, 2), | |
| "count": len(errors), | |
| "min_error": round(min(errors), 2), | |
| "max_error": round(max(errors), 2), | |
| "median_error": round(sorted(errors)[len(errors) // 2], 2), | |
| } | |
| def calculate_quantile_coverage(predictions: list[dict]) -> dict: | |
| """ | |
| Quantile kapsama oranı hesapla. | |
| Gerçek fiyatın P10–P90 bandı içinde kalma yüzdesi. | |
| Beklenen: ~%80 (P10-P90 = %80 güven aralığı) | |
| """ | |
| in_band = 0 | |
| total = 0 | |
| for p in predictions: | |
| actual = p.get("actual_close") | |
| p10 = p.get("quantile_p10") | |
| p90 = p.get("quantile_p90") | |
| if actual is None or p10 is None or p90 is None: | |
| continue | |
| total += 1 | |
| if p10 <= actual <= p90: | |
| in_band += 1 | |
| if total == 0: | |
| return {"score": 50, "coverage": 0, "in_band": 0, "total": 0} | |
| coverage = in_band / total | |
| # İdeal kapsama ~%80 (P10-P90 bandı) | |
| # %80 = 100 puan, sapmalar düşürür | |
| # |coverage - 0.8| * 500 kadar puan düşürüyoruz | |
| deviation = abs(coverage - 0.80) | |
| score = max(0, min(100, 100 - deviation * 500)) | |
| return { | |
| "score": round(score, 1), | |
| "coverage": round(coverage, 3), | |
| "in_band": in_band, | |
| "total": total, | |
| } | |
| def calculate_volatility_match(predictions: list[dict], symbol: str) -> dict: | |
| """ | |
| Volatilite uyumu hesapla. | |
| Tahmin edilen fiyat bandwidth'i ile gerçekleşen fiyat dalgalanmasını karşılaştırır. | |
| """ | |
| if len(predictions) < 5: | |
| return {"score": 50, "predicted_vol": 0, "actual_vol": 0} | |
| # Tahmin edilen volatilite: P10-P90 bandwidth ortalaması | |
| predicted_vols = [] | |
| actual_prices = [] | |
| for p in predictions: | |
| p10 = p.get("quantile_p10") | |
| p90 = p.get("quantile_p90") | |
| actual = p.get("actual_close") | |
| predicted = p.get("predicted_close") | |
| if p10 is not None and p90 is not None and predicted and predicted > 0: | |
| predicted_vols.append((p90 - p10) / predicted) | |
| if actual is not None: | |
| actual_prices.append(actual) | |
| if len(actual_prices) < 3 or not predicted_vols: | |
| return {"score": 50, "predicted_vol": 0, "actual_vol": 0} | |
| # Gerçekleşen volatilite: fiyat değişimlerinin standart sapması | |
| returns = [] | |
| for i in range(1, len(actual_prices)): | |
| if actual_prices[i - 1] != 0: | |
| ret = (actual_prices[i] - actual_prices[i - 1]) / actual_prices[i - 1] | |
| returns.append(ret) | |
| if not returns: | |
| return {"score": 50, "predicted_vol": 0, "actual_vol": 0} | |
| actual_vol = (sum(r ** 2 for r in returns) / len(returns)) ** 0.5 | |
| predicted_vol = sum(predicted_vols) / len(predicted_vols) | |
| # Uyum oranı | |
| if max(actual_vol, predicted_vol) == 0: | |
| ratio = 1.0 | |
| else: | |
| ratio = min(actual_vol, predicted_vol) / max(actual_vol, predicted_vol) | |
| score = ratio * 100 | |
| return { | |
| "score": round(score, 1), | |
| "predicted_vol": round(predicted_vol, 4), | |
| "actual_vol": round(actual_vol, 4), | |
| "match_ratio": round(ratio, 3), | |
| } | |
| def calculate_confidence(symbol: str, horizon: int = 10) -> dict: | |
| """ | |
| Belirli bir hisse için tüm metrikleri hesapla ve ağırlıklı güven puanı üret. | |
| Returns: | |
| dict: { | |
| total_score: float (0-100), | |
| direction_accuracy: float, | |
| mape_score: float, | |
| quantile_coverage: float, | |
| volatility_match: float, | |
| details: dict, # Her metriğin detay bilgileri | |
| } | |
| """ | |
| predictions = get_predictions_with_actuals(symbol, horizon) | |
| if not predictions: | |
| return { | |
| "total_score": 0, | |
| "direction_accuracy": 0, | |
| "mape_score": 0, | |
| "quantile_coverage": 0, | |
| "volatility_match": 0, | |
| "details": {"message": "Henüz karşılaştırılmış tahmin yok."}, | |
| "data_count": 0, | |
| } | |
| # Her metriği hesapla | |
| dir_result = calculate_direction_accuracy(predictions) | |
| mape_result = calculate_mape_score(predictions) | |
| quant_result = calculate_quantile_coverage(predictions) | |
| vol_result = calculate_volatility_match(predictions, symbol) | |
| # Ağırlıklı toplam | |
| weights = CONFIDENCE_WEIGHTS | |
| total_score = ( | |
| dir_result["score"] * weights["direction_accuracy"] + | |
| mape_result["score"] * weights["mape"] + | |
| quant_result["score"] * weights["quantile_coverage"] + | |
| vol_result["score"] * weights["volatility_match"] | |
| ) | |
| result = { | |
| "total_score": round(total_score, 1), | |
| "direction_accuracy": round(dir_result["score"], 1), | |
| "mape_score": round(mape_result["score"], 1), | |
| "quantile_coverage": round(quant_result["score"], 1), | |
| "volatility_match": round(vol_result["score"], 1), | |
| "details": { | |
| "direction": dir_result, | |
| "mape": mape_result, | |
| "quantile": quant_result, | |
| "volatility": vol_result, | |
| }, | |
| "data_count": len(predictions), | |
| } | |
| return result | |
| def update_confidence_score(symbol: str, horizon: int = 10): | |
| """ | |
| Güven puanını hesapla ve veritabanına kaydet. | |
| """ | |
| from datetime import date | |
| result = calculate_confidence(symbol, horizon) | |
| save_confidence_score( | |
| symbol=symbol, | |
| score_date=date.today().isoformat(), | |
| horizon=horizon, | |
| scores=result | |
| ) | |
| logger.info( | |
| f"{symbol} güven puanı (h={horizon}): " | |
| f"{result['total_score']}/100 " | |
| f"(yön={result['direction_accuracy']}, " | |
| f"mape={result['mape_score']}, " | |
| f"quant={result['quantile_coverage']}, " | |
| f"vol={result['volatility_match']})" | |
| ) | |
| return result | |
| def get_confidence_label(score: float) -> dict: | |
| """ | |
| Güven puanına göre label ve renk bilgisi döndür. | |
| """ | |
| if score >= 80: | |
| return {"label": "Çok Güvenilir", "color": "#00e676", "emoji": "🟢"} | |
| elif score >= 60: | |
| return {"label": "Güvenilir", "color": "#ffeb3b", "emoji": "🟡"} | |
| elif score >= 40: | |
| return {"label": "Orta", "color": "#ff9800", "emoji": "🟠"} | |
| else: | |
| return {"label": "Düşük Güven", "color": "#f44336", "emoji": "🔴"} | |
| def calculate_all_confidences(symbols: list[str], horizon: int = 10) -> dict: | |
| """Tüm hisselerin güven puanlarını hesapla.""" | |
| results = {} | |
| for symbol in symbols: | |
| try: | |
| results[symbol] = update_confidence_score(symbol, horizon) | |
| except Exception as e: | |
| logger.error(f"{symbol} güven puanı hatası: {e}") | |
| results[symbol] = {"total_score": 0, "error": str(e)} | |
| return results | |