|
|
|
|
|
import os |
|
|
import pandas as pd |
|
|
from evidently.report import Report |
|
|
from evidently.metric_preset import DataDriftPreset |
|
|
from app.monitoring.governance import Governance |
|
|
|
|
|
# Path to the training-time baseline dataset. Not read anywhere in this
# chunk — presumably a caller loads it to supply reference_data. TODO confirm.
REFERENCE_DATA_PATH = "models/v1/reference_data.csv"


# Directory and file where the Evidently HTML drift report is written.
REPORT_DIR = "reports/evidently"


REPORT_PATH = os.path.join(REPORT_DIR, "drift_report.html")


# Thresholds handed to Governance. Judging by the keys: "psi" looks like a
# population-stability-index ceiling, "accuracy_drop" a maximum tolerated
# accuracy decrease, and "f1" a minimum acceptable F1 score.
# NOTE(review): exact semantics live in Governance.check_metrics — confirm.
thresholds = {


    "psi": 0.2,


    "accuracy_drop": 0.05,


    "f1": 0.7


}


# Module-level governance checker shared by every run_drift_check call.
governance = Governance(thresholds=thresholds)
|
|
|
|
|
|
|
|
def run_drift_check(current_data: pd.DataFrame, reference_data: pd.DataFrame, model_version="v1"):
    """
    Run Evidently's DataDriftPreset on current vs reference data,
    save an HTML report, and run governance checks.

    Args:
        current_data: Recent production data to test for drift.
        reference_data: Baseline data drift is measured against.
        model_version: Version label forwarded to governance checks.

    Returns:
        Tuple ``(alerts, drift_scores)`` where ``alerts`` is whatever
        ``governance.check_metrics`` returns and ``drift_scores`` maps
        column name (or ``"dataset"``) to a float drift score.
    """
    os.makedirs(REPORT_DIR, exist_ok=True)

    report = Report(metrics=[DataDriftPreset()])
    report.run(current_data=current_data, reference_data=reference_data)
    report.save_html(REPORT_PATH)

    # Some Evidently versions may not expose as_dict(); fall back to {}.
    report_dict = report.as_dict() if hasattr(report, "as_dict") else {}
    drift_scores = _extract_drift_scores(report_dict)

    alerts = governance.check_metrics(report_dict, model_version=model_version)

    return alerts, drift_scores


def _extract_drift_scores(report_dict: dict) -> dict:
    """
    Pull per-column (and dataset-level) drift scores out of an Evidently
    report dict. Missing, None, or NaN scores default to 0.0 so downstream
    threshold comparisons never see non-numeric values.
    """
    drift_scores: dict = {}

    for metric in report_dict.get("metrics", []):
        result = metric.get("result", {})

        drift_by_columns = result.get("drift_by_columns", {})
        if drift_by_columns:
            for col, info in drift_by_columns.items():
                score = info.get("drift_score", 0.0)
                # pd.isna covers both None and float NaN.
                if score is None or pd.isna(score):
                    score = 0.0
                drift_scores[col] = float(score)

        elif metric.get("metric") == "DatasetDriftMetric":
            # The key can be present but None on some versions; the old
            # .get(key, 0.0) default only covered a *missing* key and
            # float(None) would raise TypeError.
            share = result.get("share_of_drifted_columns")
            drift_scores["dataset"] = float(share) if share is not None else 0.0

    return drift_scores
|
|
|