"""
Utility functions for the MLOps platform.
"""

import os
import json
import hashlib
import secrets
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional


def generate_run_id() -> str:
    """Generate a run ID of the form ``run_YYYYMMDD_HHMMSS_xxxxxx``.

    The first part is the local wall-clock time; the trailing six hex
    characters are random so that runs started within the same second do
    not collide.
    """
    # Read the clock once so both parts of the ID describe the same instant.
    now = datetime.now()
    # secrets gives a genuinely random suffix; hashing the timestamp added no
    # entropy beyond the clock value itself.
    random_suffix = secrets.token_hex(3)
    return f"run_{now.strftime('%Y%m%d_%H%M%S')}_{random_suffix}"


def save_config(config: Dict[str, Any], output_dir: str) -> str:
    """Save configuration to ``<output_dir>/config.json``.

    Args:
        config: Mapping to serialise. Values json cannot encode natively are
            written as their ``str()`` representation.
        output_dir: Target directory; created if it does not exist.

    Returns:
        Path of the written JSON file.
    """
    os.makedirs(output_dir, exist_ok=True)
    config_path = os.path.join(output_dir, "config.json")
    with open(config_path, 'w', encoding='utf-8') as f:
        json.dump(config, f, indent=2, ensure_ascii=False, default=str)
    return config_path


def load_config(config_path: str) -> Dict[str, Any]:
    """Load configuration from a UTF-8 encoded JSON file.

    Args:
        config_path: Path of the JSON file to read.

    Returns:
        The parsed JSON content.
    """
    with open(config_path, 'r', encoding='utf-8') as f:
        return json.load(f)


def get_model_size(model_path: str) -> str:
    """Get the size of a model in human-readable format.

    Args:
        model_path: A model directory (sizes of all files below it are
            summed) or a single model file.

    Returns:
        The size formatted with two decimals and a unit from B to TB,
        e.g. ``"2.00 KB"``. A path that does not exist yields ``"0.00 B"``.
    """
    if os.path.isfile(model_path):
        # os.walk yields nothing for a plain file, which used to report
        # "0.00 B" for single-file models.
        total_size = float(os.path.getsize(model_path))
    else:
        total_size = 0.0
        for dirpath, _dirnames, filenames in os.walk(model_path):
            for filename in filenames:
                filepath = os.path.join(dirpath, filename)
                try:
                    total_size += os.path.getsize(filepath)
                except OSError:
                    # Dangling symlink or file removed mid-walk: leave it out
                    # of the total instead of failing the whole report.
                    continue

    # Convert to human-readable format
    for unit in ('B', 'KB', 'MB', 'GB'):
        if total_size < 1024.0:
            return f"{total_size:.2f} {unit}"
        total_size /= 1024.0
    return f"{total_size:.2f} TB"


def validate_csv_file(file_path: str, required_columns: Optional[list] = None) -> Dict[str, Any]:
    """
    Validate a CSV file for training.

    Args:
        file_path: Path of the CSV file to check.
        required_columns: Column names that must be present. Defaults to
            ``['text', 'label']``.

    Returns:
        Dict with 'valid' (bool), 'errors' (list), 'warnings' (list), 'info' (dict)
    """
    # Imported lazily so the rest of the module works without pandas.
    import pandas as pd

    result = {
        'valid': True,
        'errors': [],
        'warnings': [],
        'info': {}
    }

    if required_columns is None:
        required_columns = ['text', 'label']

    try:
        df = pd.read_csv(file_path)
        result['info']['rows'] = len(df)
        result['info']['columns'] = list(df.columns)

        # Check required columns
        for col in required_columns:
            if col not in df.columns:
                result['errors'].append(f"Missing required column: '{col}'")
                result['valid'] = False

        if result['valid']:
            # Missing values are reported as warnings, not errors.
            for col in required_columns:
                missing = df[col].isna().sum()
                if missing > 0:
                    result['warnings'].append(f"Column '{col}' has {missing} missing values")

            # Check label distribution
            if 'label' in df.columns:
                label_counts = df['label'].value_counts().to_dict()
                result['info']['label_distribution'] = label_counts

                # Warn when the largest class is more than 5x the smallest.
                if len(label_counts) > 1:
                    min_count = min(label_counts.values())
                    max_count = max(label_counts.values())
                    if max_count > min_count * 5:
                        result['warnings'].append(
                            f"Severe class imbalance detected: {label_counts}"
                        )

    except Exception as e:
        # Any failure while reading or inspecting the file is reported in the
        # result rather than raised.
        result['valid'] = False
        result['errors'].append(f"Error reading file: {str(e)}")

    return result


def format_duration(seconds: float) -> str:
    """Format a duration in seconds as ``"12.3s"``, ``"4.5m"`` or ``"1.2h"``.

    Values below 60 are shown in seconds, below 3600 in minutes, and
    everything else in hours, each with one decimal.
    """
    if seconds < 60:
        return f"{seconds:.1f}s"
    if seconds < 3600:
        return f"{seconds / 60:.1f}m"
    return f"{seconds / 3600:.1f}h"


def ensure_dir(path: str) -> str:
    """Ensure directory exists, create if not, and return ``path``."""
    os.makedirs(path, exist_ok=True)
    return path


def _json_default(value: Any) -> Any:
    """Convert a metric value json cannot encode into something it can.

    Objects exposing ``tolist()`` are converted through it; anything else
    falls back to ``str()``.
    """
    to_list = getattr(value, "tolist", None)
    if callable(to_list):
        return to_list()
    return str(value)


class ExperimentLogger:
    """Simple experiment logger for tracking training runs.

    Each run gets its own sub-directory of ``log_dir`` holding an optional
    ``config.json`` and, once the run ends, a ``metrics.json``.
    """

    def __init__(self, log_dir: str = "experiments"):
        self.log_dir = ensure_dir(log_dir)
        self.current_run = None  # name of the active run, or None
        self.metrics = []  # entries logged for the active run

    def start_run(self, run_name: Optional[str] = None, config: Optional[Dict] = None):
        """Start a new experiment run.

        If a run is still active it is ended first, so metrics already
        logged for it are written out rather than dropped.

        Args:
            run_name: Name of the run; a generated run ID is used when empty.
            config: Optional configuration saved into the run directory.

        Returns:
            The name of the started run.
        """
        if self.current_run is not None:
            self.end_run()

        self.current_run = run_name or generate_run_id()
        run_dir = ensure_dir(os.path.join(self.log_dir, self.current_run))

        if config:
            save_config(config, run_dir)

        self.metrics = []
        return self.current_run

    def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None):
        """Log metrics for the current step.

        The entry records the current time and ``step``; keys in ``metrics``
        named 'timestamp' or 'step' override those fields.
        """
        entry = {
            'timestamp': datetime.now().isoformat(),
            'step': step,
            **metrics
        }
        self.metrics.append(entry)

    def end_run(self):
        """End the current run and save metrics.

        ``metrics.json`` is written only when a run is active and at least
        one entry was logged. The logger is reset afterwards.
        """
        if self.current_run and self.metrics:
            metrics_path = os.path.join(
                self.log_dir, self.current_run, "metrics.json"
            )
            # Serialise before opening the file so a failure cannot leave a
            # truncated metrics.json behind.
            payload = json.dumps(self.metrics, indent=2, default=_json_default)
            with open(metrics_path, 'w', encoding='utf-8') as f:
                f.write(payload)

        self.current_run = None
        self.metrics = []

    def get_all_runs(self) -> list:
        """Get list of all experiment runs.

        Returns:
            One dict per sub-directory of the log directory with the keys
            'name', 'path', 'has_config' and 'has_metrics', sorted by name
            in descending order.
        """
        runs = []
        for item in os.listdir(self.log_dir):
            run_path = os.path.join(self.log_dir, item)
            if not os.path.isdir(run_path):
                continue
            runs.append({
                'name': item,
                'path': run_path,
                'has_config': os.path.exists(os.path.join(run_path, "config.json")),
                'has_metrics': os.path.exists(os.path.join(run_path, "metrics.json")),
            })
        return sorted(runs, key=lambda x: x['name'], reverse=True)