File size: 1,977 Bytes
700e2b6
b4fadea
 
 
 
91a9dcd
b4fadea
 
 
 
 
91a9dcd
 
 
 
 
 
 
 
 
e105368
 
 
 
 
700e2b6
e105368
 
 
91a9dcd
 
e105368
 
88260af
 
 
700e2b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e105368
88260af
 
91a9dcd
88260af
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# app/monitoring/drift.py
import os
import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
from app.monitoring.governance import Governance

REFERENCE_DATA_PATH = "models/v1/reference_data.csv"
REPORT_DIR = "reports/evidently"
REPORT_PATH = os.path.join(REPORT_DIR, "drift_report.html")

# Thresholds configuration
thresholds = {
    "psi": 0.2,
    "accuracy_drop": 0.05,
    "f1": 0.7
}

governance = Governance(thresholds=thresholds)


def run_drift_check(current_data: pd.DataFrame, reference_data: pd.DataFrame, model_version="v1"):
    """
    Run Evidently DataDriftPreset on current vs reference data,
    save HTML report, and run governance checks.
    Returns a tuple: (alerts, drift_scores)
    """
    os.makedirs(REPORT_DIR, exist_ok=True)

    report = Report(metrics=[DataDriftPreset()])
    report.run(current_data=current_data, reference_data=reference_data)
    report.save_html(REPORT_PATH)

    # Extract numeric drift scores per column
    report_dict = report.as_dict() if hasattr(report, "as_dict") else {}
    drift_scores = {}

    metrics_list = report_dict.get("metrics", [])

    for metric in metrics_list:
        result = metric.get("result", {})
        # Check column-level drift
        drift_by_columns = result.get("drift_by_columns", {})
        if drift_by_columns:
            for col, info in drift_by_columns.items():
                score = info.get("drift_score", 0.0)
                if score is None or not pd.notna(score):
                    score = 0.0
                drift_scores[col] = float(score)
        # fallback: Dataset-level drift metric (PSI share)
        elif metric.get("metric") == "DatasetDriftMetric":
            drift_scores["dataset"] = float(result.get("share_of_drifted_columns", 0.0))

    # Run governance checks (keeps existing alerts)
    alerts = governance.check_metrics(report_dict, model_version=model_version)

    return alerts, drift_scores