Spaces:
Running
Running
File size: 2,809 Bytes
38593e7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
"""
MLflow Logging for Drift Detection
Handles logging drift metrics and alerts to MLflow experiment runs.
"""
from typing import Dict, Optional
from loguru import logger
try:
import mlflow
except ImportError:
mlflow = None
class DriftLogger:
"""
Logs drift detection results to MLflow.
"""
def __init__(self, log_artifacts: bool = False):
"""
Initialize drift logger.
"""
self.log_artifacts = log_artifacts
self.has_mlflow = mlflow is not None
def log_drift_results(
self,
drift_results: Dict,
step: Optional[int] = None,
prefix: str = "drift",
) -> None:
"""
Log drift detection results to MLflow.
"""
if not self.has_mlflow:
logger.debug("MLflow not available")
return
try:
overall = drift_results.get("overall", {})
mlflow.log_metric(f"{prefix}/drifted", float(overall.get("drifted", False)), step=step)
mlflow.log_metric(
f"{prefix}/num_drifts", float(overall.get("num_drifts", 0)), step=step
)
for drift_type, result in drift_results.items():
if drift_type == "overall":
continue
if "p_value" in result:
mlflow.log_metric(
f"{prefix}/{drift_type}/p_value", result["p_value"], step=step
)
logger.debug("Logged drift results to MLflow")
except Exception as e:
logger.warning(f"Failed to log drift to MLflow: {e}")
def log_baseline_statistics(
self,
baseline_stats: Dict,
prefix: str = "baseline",
) -> None:
"""
Log baseline statistics to MLflow.
"""
if not self.has_mlflow:
return
try:
metrics = {
f"{prefix}/num_samples": baseline_stats.get("num_samples"),
f"{prefix}/text_length_mean": baseline_stats.get("text_length_mean"),
f"{prefix}/word_count_mean": baseline_stats.get("word_count_mean"),
}
for metric_name, value in metrics.items():
if value is not None:
mlflow.log_metric(metric_name, float(value))
mlflow.log_param(f"{prefix}/language", baseline_stats.get("language", "unknown"))
logger.debug("Logged baseline to MLflow")
except Exception as e:
logger.warning(f"Failed to log baseline: {e}")
def log_alert(self, message: str, severity: str = "warning") -> None:
"""
Log drift alert message.
"""
logger_func = getattr(logger, severity, logger.warning)
logger_func(f"DRIFT ALERT: {message}")
|