File size: 6,283 Bytes
7e825f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""

Utility functions for the MLOps platform.

"""

import os
import json
import hashlib
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional


def generate_run_id() -> str:
    """Generate a unique run ID based on timestamp."""
    # Timestamp gives human-readable ordering; an md5-derived suffix
    # disambiguates runs started within the same second.
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    suffix = hashlib.md5(str(datetime.now().timestamp()).encode()).hexdigest()
    return f"run_{stamp}_{suffix[:6]}"


def save_config(config: Dict[str, Any], output_dir: str) -> str:
    """Serialize *config* to ``config.json`` inside *output_dir*.

    The directory is created if missing. Non-JSON-native values are
    stringified via ``default=str``. Returns the path written.
    """
    os.makedirs(output_dir, exist_ok=True)
    destination = os.path.join(output_dir, "config.json")
    payload = json.dumps(config, indent=2, ensure_ascii=False, default=str)
    with open(destination, 'w', encoding='utf-8') as handle:
        handle.write(payload)
    return destination


def load_config(config_path: str) -> Dict[str, Any]:
    """Read a JSON configuration file and return its contents as a dict."""
    with open(config_path, 'r', encoding='utf-8') as handle:
        data = json.load(handle)
    return data


def get_model_size(model_path: str) -> str:
    """Return the total size of a model directory in human-readable form.

    Recursively sums the sizes of all files under *model_path* and
    formats the total with the largest unit that keeps the value below
    1024 (B, KB, MB, GB, then TB).

    Args:
        model_path: Directory to measure. A non-existent or empty
            directory yields "0.00 B".

    Returns:
        Size string such as "12.34 MB".
    """
    total_size = 0
    for dirpath, _dirnames, filenames in os.walk(model_path):
        for filename in filenames:
            filepath = os.path.join(dirpath, filename)
            try:
                total_size += os.path.getsize(filepath)
            except OSError:
                # Broken symlink or file deleted mid-walk: skip it rather
                # than abort the whole size computation.
                continue

    # Convert to human-readable format.
    for unit in ['B', 'KB', 'MB', 'GB']:
        if total_size < 1024.0:
            return f"{total_size:.2f} {unit}"
        total_size /= 1024.0
    return f"{total_size:.2f} TB"


def validate_csv_file(file_path: str, required_columns: Optional[list] = None) -> Dict[str, Any]:
    """Validate a CSV file for training.

    Args:
        file_path: Path to the CSV file to check.
        required_columns: Columns that must be present; defaults to
            ``['text', 'label']``.

    Returns:
        Dict with 'valid' (bool), 'errors' (list), 'warnings' (list),
        'info' (dict). Read failures are reported via 'errors' rather
        than raised.
    """
    # Imported lazily so the module stays importable without pandas.
    import pandas as pd

    result = {
        'valid': True,
        'errors': [],
        'warnings': [],
        'info': {}
    }

    if required_columns is None:
        required_columns = ['text', 'label']

    try:
        df = pd.read_csv(file_path)
        result['info']['rows'] = len(df)
        result['info']['columns'] = list(df.columns)

        # Check required columns.
        for col in required_columns:
            if col not in df.columns:
                result['errors'].append(f"Missing required column: '{col}'")
                result['valid'] = False

        if result['valid']:
            # Check for missing values in the required columns.
            for col in required_columns:
                missing = df[col].isna().sum()
                if missing > 0:
                    result['warnings'].append(f"Column '{col}' has {missing} missing values")

            # Check label distribution.
            if 'label' in df.columns:
                label_counts = df['label'].value_counts().to_dict()
                result['info']['label_distribution'] = label_counts

                # Warn when the majority class outnumbers the minority 5:1.
                if len(label_counts) > 1:
                    min_count = min(label_counts.values())
                    max_count = max(label_counts.values())
                    if max_count > min_count * 5:
                        result['warnings'].append(
                            f"Severe class imbalance detected: {label_counts}"
                        )

    except Exception as e:
        # Any parse/IO failure marks the file invalid instead of raising.
        result['valid'] = False
        result['errors'].append(f"Error reading file: {str(e)}")

    return result


def format_duration(seconds: float) -> str:
    """Render a duration in seconds as a short human-readable string.

    Uses seconds below one minute, minutes below one hour, hours
    otherwise — always with one decimal place.
    """
    if seconds < 60:
        return f"{seconds:.1f}s"
    if seconds < 3600:
        return f"{seconds / 60:.1f}m"
    return f"{seconds / 3600:.1f}h"


def ensure_dir(path: str) -> str:
    """Create *path* (with parents) if it does not exist; return it unchanged."""
    Path(path).mkdir(parents=True, exist_ok=True)
    return path


class ExperimentLogger:
    """Simple experiment logger for tracking training runs.

    Each run gets a subdirectory under ``log_dir`` containing an
    optional ``config.json`` (written at start) and a ``metrics.json``
    (written when the run ends). Metrics are buffered in memory between
    ``start_run`` and ``end_run``.
    """

    def __init__(self, log_dir: str = "experiments"):
        # Root directory for all runs; created eagerly so later calls
        # can assume it exists.
        self.log_dir = ensure_dir(log_dir)
        # Name of the active run, or None when no run is in progress.
        self.current_run: Optional[str] = None
        # In-memory metric entries for the active run.
        self.metrics: list = []

    def start_run(self, run_name: Optional[str] = None, config: Optional[Dict] = None) -> str:
        """Start a new experiment run and return its name.

        Args:
            run_name: Explicit run name; auto-generated when omitted.
            config: Optional configuration, saved as ``config.json`` in
                the run directory.
        """
        self.current_run = run_name or generate_run_id()
        run_dir = ensure_dir(os.path.join(self.log_dir, self.current_run))

        if config:
            save_config(config, run_dir)

        self.metrics = []
        return self.current_run

    def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) -> None:
        """Buffer a metrics snapshot; persisted only when ``end_run`` is called."""
        entry = {
            'timestamp': datetime.now().isoformat(),
            'step': step,
            **metrics
        }
        self.metrics.append(entry)

    def end_run(self) -> None:
        """End the current run, writing buffered metrics to ``metrics.json``.

        Nothing is written when there is no active run or no metrics
        were logged; state is reset either way.
        """
        if self.current_run and self.metrics:
            metrics_path = os.path.join(
                self.log_dir, self.current_run, "metrics.json"
            )
            with open(metrics_path, 'w', encoding='utf-8') as f:
                json.dump(self.metrics, f, indent=2)

        self.current_run = None
        self.metrics = []

    def get_all_runs(self) -> list:
        """Return an info dict per run directory, sorted by name descending."""
        runs = []
        for item in os.listdir(self.log_dir):
            run_path = os.path.join(self.log_dir, item)
            if not os.path.isdir(run_path):
                continue
            runs.append({
                'name': item,
                'path': run_path,
                'has_config': os.path.exists(os.path.join(run_path, "config.json")),
                'has_metrics': os.path.exists(os.path.join(run_path, "metrics.json")),
            })
        return sorted(runs, key=lambda r: r['name'], reverse=True)