Spaces:
Running
Running
| from datetime import datetime | |
| import json | |
| from pathlib import Path | |
| import dagshub | |
| from loguru import logger | |
| from mlflow.tracking import MlflowClient | |
| import numpy as np | |
| import pandas as pd | |
| import typer | |
| from turing import config | |
| from turing.modeling.model_selector import get_best_model_by_tag | |
| from turing.monitoring.baseline_manager import extract_baseline_statistics | |
| from turing.monitoring.drift_detector import DriftDetector | |
| from turing.monitoring.feedback_manager import load_feedback_for_language | |
| from turing.monitoring.synthetic_data_generator import SyntheticDataGenerator | |
# Typer application object; the script entry point at the bottom of the
# module invokes it to run the command-line interface.
app = typer.Typer()
def load_training_data(dataset_name: str, language: str):
    """
    Load training data for a specific programming language.

    Searches ``config.INTERIM_DATA_DIR / "features" / dataset_name``
    recursively for files matching ``{language}_train*.csv`` and reads the
    first match in sorted path order.

    Args:
        dataset_name: Dataset name (e.g., 'clean-k5000')
        language: Programming language (java, python, pharo)

    Returns:
        Tuple of (texts, labels). ``texts`` is a list built from the
        ``config.INPUT_COLUMN`` column. ``labels`` is a NumPy array built
        from the ``config.LABEL_COLUMN`` column; when the labels are stored
        as strings, each one is parsed as a Python literal first.

    Raises:
        FileNotFoundError: If the dataset directory or a matching training
            file does not exist.
        ValueError: If a string-encoded label is not a valid Python literal
            (``SyntaxError`` may also be raised for malformed literals).
    """
    # Function-scope import: only the label-parsing step below needs it.
    import ast

    dataset_path = config.INTERIM_DATA_DIR / "features" / dataset_name
    if not dataset_path.exists():
        raise FileNotFoundError(f"Dataset path not found: {dataset_path}")

    # Sort the matches so the chosen file does not depend on the order in
    # which the filesystem happens to list directory entries.
    train_file = next(iter(sorted(dataset_path.rglob(f"{language}_train*.csv"))), None)
    if not train_file:
        raise FileNotFoundError(f"Training file not found for {language} in {dataset_path}")

    logger.info(f"Loading training data from: {train_file}")
    df = pd.read_csv(train_file)

    X_train = df[config.INPUT_COLUMN].tolist()
    label_column = df[config.LABEL_COLUMN]

    # The first row decides whether labels are string-encoded; an empty file
    # has no first row, so it falls through to the raw-values branch instead
    # of raising IndexError.
    if len(label_column) > 0 and isinstance(label_column.iloc[0], str):
        # NOTE(security): this used to call eval() on every CSV cell, which
        # executes arbitrary code found in the data file. literal_eval only
        # accepts Python literals (lists, tuples, numbers, strings, ...).
        y_train = np.array([ast.literal_eval(label) for label in label_column])
    else:
        y_train = label_column.values

    logger.success(f"Loaded {len(X_train)} training samples for {language}")
    return X_train, y_train
def print_drift_report(drift_results: dict, drift_type: str, report_lines: list | None = None):
    """
    Format and display drift detection results for a specific drift type.

    Every report line is logged with ``logger.info`` and, when
    ``report_lines`` is provided, also appended to that list.

    Args:
        drift_results: Dictionary with drift detection metrics and alerts.
            Each entry other than ``"overall"`` is read through the keys
            ``p_value``, ``check_result``, ``statistic``, ``drifted``,
            ``alert`` and ``method``; the ``"overall"`` entry supplies the
            summary (``drifted``, ``alert``, ``num_drifts``, ``methods``).
        drift_type: Name of drift type tested (e.g., 'none', 'text_length_short')
        report_lines: Optional list to collect formatted report lines
    """

    def log_and_collect(msg: str):
        logger.info(msg)
        if report_lines is not None:
            report_lines.append(msg)

    def is_flag(value) -> bool:
        return isinstance(value, (bool, np.bool_))

    def format_number(value) -> str:
        # Real numbers keep the fixed 6-decimal rendering; anything else is
        # shown verbatim rather than raising from the ".6f" format spec.
        if isinstance(value, (int, float, np.integer, np.floating)) and not is_flag(value):
            return f"{value:.6f}"
        return str(value)

    rule = "=" * 60
    log_and_collect(f"\n{rule}")
    log_and_collect(f"DRIFT DETECTION REPORT - {drift_type.upper()}")
    log_and_collect(f"{rule}")

    for metric_name, result in drift_results.items():
        if metric_name == "overall":
            continue

        p_value = None
        check_passed = None
        if "p_value" in result:
            p_value = result["p_value"]
        else:
            # Fallback for results that carry no p-value. A boolean "passed"
            # flag is reported as a flag: labelling it "P-value" would print
            # a misleading 1.000000 / 0.000000.
            check_result = result.get("check_result")
            fallback = check_result.get("passed") if isinstance(check_result, dict) else None
            if is_flag(fallback):
                check_passed = bool(fallback)
            else:
                p_value = fallback

        statistic = result.get("statistic", None)
        drifted = result.get("drifted", False)
        alert = result.get("alert", False)

        # A critical alert takes precedence over a plain drift.
        if alert:
            status = "ALERT"
        elif drifted:
            status = "DRIFT"
        else:
            status = "OK"

        log_and_collect(f"\n{metric_name.upper()}")
        log_and_collect(f" Status: {status}")
        if p_value is not None:
            log_and_collect(f" P-value: {format_number(p_value)}")
        if check_passed is not None:
            log_and_collect(f" Check passed: {check_passed}")
        if statistic is not None:
            log_and_collect(f" Statistic: {format_number(statistic)}")
        log_and_collect(f" Drift detected: {drifted}")
        log_and_collect(f" Critical alert: {alert}")
        log_and_collect(f" Method: {result.get('method', 'unknown')}")

    overall = drift_results.get("overall", {})
    overall_drifted = overall.get("drifted", False)
    overall_alert = overall.get("alert", False)
    drift_count = overall.get("num_drifts", 0)

    log_and_collect(f"\n{rule}")
    log_and_collect("OVERALL SUMMARY")
    log_and_collect(f" Drift detected: {overall_drifted}")
    log_and_collect(f" Critical alert: {overall_alert}")
    log_and_collect(f" Number of drifted metrics: {drift_count}")
    log_and_collect(f" Methods used: {overall.get('methods', [])}")
    log_and_collect(f"{rule}\n")
def save_drift_report(
    language: str,
    dataset_name: str,
    baseline_stats: dict,
    test_results: dict,
    report_text: str,
):
    """
    Save drift detection report to TXT and JSON files.

    Both files are written to ``config.REPORTS_DIR / "monitoring"`` (created
    if missing) as ``drift_report_{language}.txt`` and
    ``drift_report_{language}.json``, and carry the same timestamp.

    Args:
        language: Programming language tested
        dataset_name: Name of dataset used
        baseline_stats: Baseline statistics dictionary; must contain the keys
            ``text_length_mean``, ``text_length_std``, ``word_count_mean``,
            ``word_count_std``, ``label_counts``, ``num_samples`` and
            ``n_labels``
        test_results: Dictionary with test results for each drift type
        report_text: Formatted report text

    Raises:
        KeyError: If ``baseline_stats`` lacks one of the keys listed above.
    """

    def to_builtin_scalar(value):
        # Map NumPy scalar types onto the built-in types json can encode.
        if isinstance(value, np.bool_):
            return bool(value)
        if isinstance(value, np.integer):
            return int(value)
        if isinstance(value, np.floating):
            return float(value)
        return value

    def convert_numpy_types(obj):
        if isinstance(obj, dict):
            # Keys are converted too: json.dump rejects NumPy integer keys.
            return {to_builtin_scalar(k): convert_numpy_types(v) for k, v in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [convert_numpy_types(item) for item in obj]
        if isinstance(obj, np.ndarray):
            return convert_numpy_types(obj.tolist())
        return to_builtin_scalar(obj)

    monitoring_dir = config.REPORTS_DIR / "monitoring"
    monitoring_dir.mkdir(parents=True, exist_ok=True)

    # Computed once so the TXT and JSON reports agree on the timestamp.
    timestamp = datetime.now().isoformat()
    section_break = "\n" + "=" * 80 + "\n\n"

    report_file = monitoring_dir / f"drift_report_{language}.txt"
    with open(report_file, "w", encoding="utf-8") as f:
        f.write("DRIFT DETECTION REPORT\n")
        f.write(f"Language: {language}\n")
        f.write(f"Dataset: {dataset_name}\n")
        f.write(f"Timestamp: {timestamp}\n")
        f.write(f"P-value threshold: {config.DRIFT_P_VALUE_THRESHOLD}\n")
        f.write(f"Alert threshold: {config.DRIFT_ALERT_THRESHOLD}\n")
        f.write(section_break)
        f.write("BASELINE STATISTICS\n")
        f.write(
            f" Text length: mean={baseline_stats['text_length_mean']:.2f}, std={baseline_stats['text_length_std']:.2f}\n"
        )
        f.write(
            f" Word count: mean={baseline_stats['word_count_mean']:.2f}, std={baseline_stats['word_count_std']:.2f}\n"
        )
        f.write(f" Label counts: {baseline_stats['label_counts']}\n")
        f.write(f" Number of samples: {baseline_stats['num_samples']}\n")
        f.write(section_break)
        f.write(report_text)

    json_file = monitoring_dir / f"drift_report_{language}.json"
    baseline_keys = (
        "text_length_mean",
        "text_length_std",
        "word_count_mean",
        "word_count_std",
        "label_counts",
        "num_samples",
        "n_labels",
    )
    report_data = {
        "language": language,
        "dataset": dataset_name,
        "timestamp": timestamp,
        "config": {
            "p_value_threshold": config.DRIFT_P_VALUE_THRESHOLD,
            "alert_threshold": config.DRIFT_ALERT_THRESHOLD,
        },
        "baseline": {key: baseline_stats[key] for key in baseline_keys},
        "test_results": test_results,
    }

    # Convert the whole payload, not just the test results: NumPy values in
    # the baseline section would otherwise make json.dump raise TypeError.
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(convert_numpy_types(report_data), f, indent=2)

    logger.success("Report saved to:")
    logger.info(f" Text: {report_file}")
    logger.info(f" JSON: {json_file}")
def _configure_dagshub_auth(repo_owner: str, repo_name: str) -> None:
    """Set up DagsHub/MLflow authentication for the current environment."""
    import os

    dagshub_username = os.getenv("DAGSHUB_USERNAME")
    dagshub_token = os.getenv("DAGSHUB_TOKEN")
    mlflow_uri = os.getenv("MLFLOW_TRACKING_URI")
    is_ci_environment = os.getenv("CI") or os.getenv("GITHUB_ACTIONS")

    if dagshub_username and dagshub_token and mlflow_uri:
        # Non-interactive mode (e.g. GitHub Actions): pass the credentials
        # through the MLflow environment variables and skip dagshub.init().
        logger.info("Using DagsHub credentials from environment variables")
        os.environ["MLFLOW_TRACKING_USERNAME"] = dagshub_username
        os.environ["MLFLOW_TRACKING_PASSWORD"] = dagshub_token
        logger.info("Skipping dagshub.init() - using environment credentials directly")
        return

    if is_ci_environment:
        # In CI without credentials, skip the interactive OAuth flow.
        logger.warning("CI environment detected but credentials not found. Proceeding without dagshub.init()")
        return

    # Interactive mode - try to initialize with OAuth; a failure is only
    # logged so the run can still proceed.
    logger.info("Initializing DagsHub interactively")
    try:
        dagshub.init(repo_owner=repo_owner, repo_name=repo_name, mlflow=True)
    except Exception as e:
        logger.warning(f"DagsHub initialization failed: {e}")


# Register the function on the Typer app; without this the app has no
# command to run when the module is executed as a script.
@app.command()
def verify(
    language: str = typer.Option("java", help="Language to test (java, python, pharo)"),
    repo_owner: str = typer.Option("se4ai2526-uniba", help="DagsHub repository owner"),
    repo_name: str = typer.Option("Turing", help="DagsHub repository name"),
    n_samples: int = typer.Option(100, help="Number of samples for synthetic data generation"),
    use_feedback: bool = typer.Option(False, help="Include user feedback rows in drift analysis"),
    feedback_path: Path = typer.Option(
        config.PROJ_ROOT / "turing" / "monitoring" / "feedback" / "feedback_data.csv",
        help="Path to user feedback CSV",
    ),
):
    """
    Verify drift detection on best model's training dataset.

    Looks up the best model for the language, loads the training data of the
    dataset recorded in its MLflow run tags, extracts baseline statistics,
    runs the drift detector on synthetic batches (and optionally on user
    feedback), then saves TXT and JSON reports.
    """
    logger.info("Starting drift detection verification...")
    logger.info("Configuration:")
    logger.info(f" Language: {language}")
    logger.info(f" P-value threshold: {config.DRIFT_P_VALUE_THRESHOLD}")
    logger.info(f" Alert threshold: {config.DRIFT_ALERT_THRESHOLD}")
    logger.info(f" Baseline cache: {config.BASELINE_CACHE_DIR}")

    _configure_dagshub_auth(repo_owner, repo_name)

    logger.info(f"\n[1/6] Searching for best model for {language}...")
    best_model_info = get_best_model_by_tag(language=language)
    if not best_model_info:
        logger.error(f"No best model found for {language}")
        return
    run_id = best_model_info["run_id"]

    logger.info(f"\n[2/6] Retrieving dataset information from MLflow run {run_id}...")
    client = MlflowClient()
    run = client.get_run(run_id)
    dataset_name = run.data.tags.get("dataset_name", None)
    if not dataset_name:
        logger.error("Dataset name not found in run tags")
        return
    logger.success(f"Found dataset: {dataset_name}")

    logger.info("\n[3/6] Loading training data...")
    try:
        X_train, y_train = load_training_data(dataset_name, language)
        y_train = np.asarray(y_train)  # Ensure y_train is np.ndarray
    except Exception as e:
        logger.error(f"Failed to load training data: {e}")
        return

    logger.info("\n[4/6] Extracting baseline statistics...")
    baseline_stats = extract_baseline_statistics(X_train, y_train, language)
    logger.success("Baseline extracted:")
    logger.info(
        f" Text length: mean={baseline_stats['text_length_mean']:.2f}, std={baseline_stats['text_length_std']:.2f}"
    )
    logger.info(
        f" Word count: mean={baseline_stats['word_count_mean']:.2f}, std={baseline_stats['word_count_std']:.2f}"
    )
    logger.info(f" Label counts: {baseline_stats['label_counts']}")

    logger.info("\n[5/6] Initializing drift detection components...")
    drift_detector = DriftDetector()
    synthetic_generator = SyntheticDataGenerator(seed=42)

    # Feedback is optional: a load failure is logged and the run continues
    # with the synthetic test cases only.
    feedback_texts, feedback_labels = [], np.array([])
    if use_feedback:
        try:
            feedback_texts, feedback_labels = load_feedback_for_language(feedback_path, language)
        except Exception as e:
            logger.warning(f"Feedback load skipped: {e}")

    logger.info("\n[6/6] Testing drift detection on different data types...\n")
    test_cases = [
        ("NORMAL DATA (no drift expected)", "none"),
        ("SHORT TEXT DRIFT", "text_length_short"),
        ("LONG TEXT DRIFT", "text_length_long"),
        ("CORRUPTED VOCABULARY DRIFT", "corrupted_vocab"),
        ("CLASS IMBALANCE DRIFT", "class_imbalance"),
    ]
    if use_feedback and len(feedback_texts) > 0:
        test_cases.append(("USER FEEDBACK", "feedback"))

    all_test_results = {}
    all_report_lines = []
    banner = "#" * 60
    for test_name, drift_type in test_cases:
        logger.info(f"\n{banner}")
        logger.info(f"Test: {test_name}")
        logger.info(f"{banner}")

        if drift_type == "feedback":
            # Feedback rows are used as the production batch as-is.
            production_texts = feedback_texts
            production_labels = feedback_labels
        else:
            production_texts, production_labels = synthetic_generator.generate_synthetic_batch(
                reference_texts=X_train,
                reference_labels=y_train,
                drift_type=drift_type,
                batch_size=n_samples,
            )

        drift_results = drift_detector.detect_all_drifts(
            production_texts=production_texts,
            production_labels=production_labels,
            reference_texts=X_train,
            reference_labels=y_train,
        )
        all_test_results[drift_type] = drift_results
        print_drift_report(drift_results, drift_type, report_lines=all_report_lines)

    logger.info("\nSaving drift detection report...")
    report_text = "\n".join(all_report_lines)
    save_drift_report(
        language=language,
        dataset_name=dataset_name,
        baseline_stats=baseline_stats,
        test_results=all_test_results,
        report_text=report_text,
    )
    logger.success("\nDrift detection verification completed!")
# Script entry point: run the Typer CLI only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    app()