Spaces:
Sleeping
Sleeping
| """ | |
| Utility functions for the MLOps platform. | |
| """ | |
| import os | |
| import json | |
| import hashlib | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, Any, Optional | |
def generate_run_id() -> str:
    """Generate a unique run ID based on the current timestamp.

    Returns:
        A string of the form ``run_YYYYMMDD_HHMMSS_xxxxxx`` where the
        suffix is 6 random hexadecimal characters.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # os.urandom provides real entropy; the previous md5-of-timestamp
    # suffix was a deterministic function of the clock, so two calls in
    # the same instant produced identical IDs.
    random_suffix = os.urandom(3).hex()
    return f"run_{timestamp}_{random_suffix}"
def save_config(config: Dict[str, Any], output_dir: str) -> str:
    """Write *config* as pretty-printed JSON into ``output_dir``.

    The directory is created if it does not already exist.  Values that
    are not natively JSON-serializable are stringified via ``default=str``.

    Returns:
        The path of the written ``config.json`` file.
    """
    os.makedirs(output_dir, exist_ok=True)
    destination = os.path.join(output_dir, "config.json")
    with open(destination, "w", encoding="utf-8") as handle:
        json.dump(config, handle, indent=2, ensure_ascii=False, default=str)
    return destination
def load_config(config_path: str) -> Dict[str, Any]:
    """Read a JSON configuration file and return its contents as a dict."""
    with open(config_path, encoding="utf-8") as handle:
        return json.load(handle)
def get_model_size(model_path: str) -> str:
    """Return the total on-disk size of *model_path* in human-readable form.

    Walks the directory tree, sums the size of every file, and formats
    the total with two decimals in the largest unit that keeps the value
    below 1024 (B/KB/MB/GB), falling back to TB.
    """
    total = float(sum(
        os.path.getsize(os.path.join(root, name))
        for root, _dirs, files in os.walk(model_path)
        for name in files
    ))
    for unit in ("B", "KB", "MB", "GB"):
        if total < 1024.0:
            return f"{total:.2f} {unit}"
        total /= 1024.0
    return f"{total:.2f} TB"
def validate_csv_file(file_path: str, required_columns: Optional[list] = None) -> Dict[str, Any]:
    """
    Validate a CSV file for training.

    Args:
        file_path: Path to the CSV file to check.
        required_columns: Columns that must be present; defaults to
            ``['text', 'label']`` when omitted.

    Returns:
        Dict with 'valid' (bool), 'errors' (list), 'warnings' (list), 'info' (dict)
    """
    import pandas as pd

    result: Dict[str, Any] = {
        'valid': True,
        'errors': [],
        'warnings': [],
        'info': {}
    }
    if required_columns is None:
        required_columns = ['text', 'label']
    try:
        df = pd.read_csv(file_path)
        result['info']['rows'] = len(df)
        result['info']['columns'] = list(df.columns)
        # Presence check: every required column must exist, else invalid.
        for col in required_columns:
            if col not in df.columns:
                result['errors'].append(f"Missing required column: '{col}'")
                result['valid'] = False
        if result['valid']:
            # Missing-value check — a warning only, not a validation failure.
            for col in required_columns:
                # cast: Series.isna().sum() yields a numpy integer, which
                # would otherwise leak into the (JSON-bound) result dict
                missing = int(df[col].isna().sum())
                if missing > 0:
                    result['warnings'].append(f"Column '{col}' has {missing} missing values")
            # Label distribution plus a crude imbalance heuristic:
            # warn when the majority class exceeds 5x the minority class.
            if 'label' in df.columns:
                label_counts = df['label'].value_counts().to_dict()
                result['info']['label_distribution'] = label_counts
                if len(label_counts) > 1:
                    min_count = min(label_counts.values())
                    max_count = max(label_counts.values())
                    if max_count > min_count * 5:
                        result['warnings'].append(
                            f"Severe class imbalance detected: {label_counts}"
                        )
    except Exception as e:  # broad by design: any read/parse failure marks the file invalid
        result['valid'] = False
        result['errors'].append(f"Error reading file: {str(e)}")
    return result
def format_duration(seconds: float) -> str:
    """Render a duration as a short string in seconds, minutes, or hours.

    One decimal place is kept; the unit thresholds are 60s and 3600s.
    """
    if seconds >= 3600:
        return f"{seconds / 3600:.1f}h"
    if seconds >= 60:
        return f"{seconds / 60:.1f}m"
    return f"{seconds:.1f}s"
def ensure_dir(path: str) -> str:
    """Make sure *path* exists as a directory, creating parents as needed.

    Returns:
        The same *path*, unchanged, for call-chaining convenience.
    """
    Path(path).mkdir(parents=True, exist_ok=True)
    return path
class ExperimentLogger:
    """Simple experiment logger for tracking training runs.

    Each run gets its own directory under ``log_dir`` containing an
    optional ``config.json`` (written at start) and a ``metrics.json``
    (written when the run ends).
    """

    def __init__(self, log_dir: str = "experiments"):
        self.log_dir = ensure_dir(log_dir)
        self.current_run: Optional[str] = None  # name of the active run, if any
        self.metrics: list = []  # metric entries accumulated for the active run

    def start_run(self, run_name: Optional[str] = None,
                  config: Optional[Dict] = None) -> str:
        """Start a new experiment run.

        Args:
            run_name: Explicit run name; auto-generated when omitted.
            config: Optional configuration saved to the run's ``config.json``.

        Returns:
            The run name actually used.
        """
        self.current_run = run_name or generate_run_id()
        run_dir = ensure_dir(os.path.join(self.log_dir, self.current_run))
        if config:
            save_config(config, run_dir)
        self.metrics = []
        return self.current_run

    def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) -> None:
        """Append one timestamped metrics entry (with optional step) in memory."""
        entry = {
            'timestamp': datetime.now().isoformat(),
            'step': step,
            **metrics
        }
        self.metrics.append(entry)

    def end_run(self) -> None:
        """End the current run and persist its metrics to ``metrics.json``.

        A run with no logged metrics produces no file; run state is reset
        either way.
        """
        if self.current_run and self.metrics:
            metrics_path = os.path.join(
                self.log_dir, self.current_run, "metrics.json"
            )
            with open(metrics_path, 'w', encoding='utf-8') as f:
                json.dump(self.metrics, f, indent=2)
        self.current_run = None
        self.metrics = []

    def get_all_runs(self) -> list:
        """Return info dicts for every run directory, sorted by name descending.

        Each dict has 'name', 'path', 'has_config', and 'has_metrics' keys.
        """
        runs = []
        for item in os.listdir(self.log_dir):
            run_path = os.path.join(self.log_dir, item)
            if not os.path.isdir(run_path):
                continue
            runs.append({
                'name': item,
                'path': run_path,
                'has_config': os.path.exists(os.path.join(run_path, "config.json")),
                'has_metrics': os.path.exists(os.path.join(run_path, "metrics.json")),
            })
        return sorted(runs, key=lambda x: x['name'], reverse=True)