Spaces:
Running
Running
| import pandas as pd | |
| from pathlib import Path | |
| from typing import Optional | |
| try: | |
| from evidently.report import Report | |
| from evidently.metric_preset import DataDriftPreset, DataQualityPreset | |
| EVIDENTLY_AVAILABLE = True | |
| except ImportError: | |
| EVIDENTLY_AVAILABLE = False | |
class EvidentlyMonitor:
    """Run Evidently reports that compare new data against a fixed reference.

    The reference frame passed to the constructor is kept on the instance and
    used as ``reference_data`` for every report the instance produces.
    """

    def __init__(self, reference_data: pd.DataFrame):
        """Store the reference frame.

        Args:
            reference_data: Frame that later batches are compared against.

        Raises:
            ImportError: If the module-level Evidently import did not succeed.
        """
        if not EVIDENTLY_AVAILABLE:
            raise ImportError("Evidently not installed. Run: pip install evidently")
        self.reference_data = reference_data

    def _run_report(self, metrics: list, current_data: pd.DataFrame):
        """Build a ``Report`` from *metrics* and run it against the reference."""
        report = Report(metrics=metrics)
        report.run(reference_data=self.reference_data, current_data=current_data)
        return report

    def generate_drift_report(self, current_data: pd.DataFrame, output_path: Optional[Path] = None):
        """Generate an Evidently data drift and data quality report.

        Args:
            current_data: Frame to compare against the stored reference data.
            output_path: Optional destination for the HTML export. A ``Path``
                or a plain string is accepted; missing parent directories are
                created. When ``None`` nothing is written.

        Returns:
            The ``Report`` object after it has been run.
        """
        report = self._run_report(
            [DataDriftPreset(), DataQualityPreset()],
            current_data,
        )
        if output_path is not None:
            # Coerce so callers handing in a str do not fail on `.parent`.
            target = Path(output_path)
            target.parent.mkdir(parents=True, exist_ok=True)
            report.save_html(str(target))
        return report

    def get_drift_metrics(self, current_data: pd.DataFrame) -> dict:
        """Run the data drift preset and return the report as a dictionary.

        Args:
            current_data: Frame to compare against the stored reference data.

        Returns:
            The value of ``report.as_dict()`` for a drift-only report.
        """
        return self._run_report([DataDriftPreset()], current_data).as_dict()