# Source: Hugging Face upload by songhieng ("Upload 72 files", commit 7e825f9).
"""
Utility functions for the MLOps platform.
"""
import os
import json
import hashlib
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional
def generate_run_id() -> str:
    """Generate a unique run ID of the form ``run_<YYYYmmdd_HHMMSS>_<hex6>``.

    The 6-character suffix is derived from OS entropy rather than the
    clock, so two calls within the same clock tick still produce
    distinct IDs (hashing the timestamp itself could collide).
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # md5 here is only a cheap way to turn random bytes into hex; not security-relevant.
    random_suffix = hashlib.md5(os.urandom(16)).hexdigest()[:6]
    return f"run_{timestamp}_{random_suffix}"
def save_config(config: Dict[str, Any], output_dir: str) -> str:
    """Write *config* as pretty-printed JSON to ``<output_dir>/config.json``.

    The directory is created if it does not exist. Non-serializable
    values are stringified via ``default=str``. Returns the file path.
    """
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    config_path = target_dir / "config.json"
    payload = json.dumps(config, indent=2, ensure_ascii=False, default=str)
    config_path.write_text(payload, encoding='utf-8')
    return str(config_path)
def load_config(config_path: str) -> Dict[str, Any]:
    """Read and return the JSON configuration stored at *config_path*."""
    raw = Path(config_path).read_text(encoding='utf-8')
    return json.loads(raw)
def get_model_size(model_path: str) -> str:
    """Return the total on-disk size of *model_path* as a human-readable string.

    Walks the directory tree, sums all file sizes, then scales the total
    into the largest unit (B/KB/MB/GB/TB) that keeps it below 1024.
    """
    size = float(sum(
        os.path.getsize(os.path.join(root, fname))
        for root, _dirs, files in os.walk(model_path)
        for fname in files
    ))
    units = ['B', 'KB', 'MB', 'GB']
    idx = 0
    # Divide by 1024 until the value fits the current unit or we run out.
    while idx < len(units) and size >= 1024.0:
        size /= 1024.0
        idx += 1
    if idx < len(units):
        return f"{size:.2f} {units[idx]}"
    return f"{size:.2f} TB"
def validate_csv_file(file_path: str, required_columns: Optional[list] = None) -> Dict[str, Any]:
    """
    Validate a CSV file for training.

    Args:
        file_path: Path to the CSV file to inspect.
        required_columns: Column names that must be present. Defaults to
            ``['text', 'label']``.

    Returns:
        Dict with 'valid' (bool), 'errors' (list), 'warnings' (list), 'info' (dict)
    """
    import pandas as pd
    result: Dict[str, Any] = {
        'valid': True,
        'errors': [],
        'warnings': [],
        'info': {}
    }
    if required_columns is None:
        required_columns = ['text', 'label']
    try:
        df = pd.read_csv(file_path)
        result['info']['rows'] = len(df)
        result['info']['columns'] = list(df.columns)
        # Check required columns
        for col in required_columns:
            if col not in df.columns:
                result['errors'].append(f"Missing required column: '{col}'")
                result['valid'] = False
        # A header-only file cannot be used for training.
        if result['valid'] and len(df) == 0:
            result['errors'].append("File contains no data rows")
            result['valid'] = False
        if result['valid']:
            # Check for missing values (cast numpy int for clean serialization)
            for col in required_columns:
                missing = int(df[col].isna().sum())
                if missing > 0:
                    result['warnings'].append(f"Column '{col}' has {missing} missing values")
            # Check label distribution
            if 'label' in df.columns:
                label_counts = df['label'].value_counts().to_dict()
                result['info']['label_distribution'] = label_counts
                # Check for class imbalance (majority class > 5x minority class)
                if len(label_counts) > 1:
                    min_count = min(label_counts.values())
                    max_count = max(label_counts.values())
                    if max_count > min_count * 5:
                        result['warnings'].append(
                            f"Severe class imbalance detected: {label_counts}"
                        )
    except Exception as e:
        # Any parse or I/O failure invalidates the file; surface the reason.
        result['valid'] = False
        result['errors'].append(f"Error reading file: {str(e)}")
    return result
def format_duration(seconds: float) -> str:
    """Render *seconds* as a short human-readable duration (s, m, or h)."""
    # Pick the coarsest applicable unit, largest first.
    if seconds >= 3600:
        return f"{seconds / 3600:.1f}h"
    if seconds >= 60:
        return f"{seconds / 60:.1f}m"
    return f"{seconds:.1f}s"
def ensure_dir(path: str) -> str:
    """Create *path* (including parents) if needed and return it unchanged."""
    Path(path).mkdir(parents=True, exist_ok=True)
    return path
class ExperimentLogger:
    """Simple experiment logger for tracking training runs.

    Each run gets a subdirectory under ``log_dir`` holding an optional
    ``config.json`` (written at start) and a ``metrics.json`` (written
    when the run ends). Metrics are buffered in memory until ``end_run``.
    """
    def __init__(self, log_dir: str = "experiments"):
        """Create the logger, ensuring *log_dir* exists."""
        self.log_dir = ensure_dir(log_dir)
        self.current_run: Optional[str] = None  # name of the active run, if any
        self.metrics: list = []                 # buffered metric entries for the active run

    def start_run(self, run_name: Optional[str] = None, config: Optional[Dict] = None) -> str:
        """Start a new experiment run.

        Args:
            run_name: Explicit run name; auto-generated when omitted.
            config: Optional configuration persisted as ``config.json``.

        Returns:
            The name of the started run.
        """
        self.current_run = run_name or generate_run_id()
        run_dir = ensure_dir(os.path.join(self.log_dir, self.current_run))
        if config:
            save_config(config, run_dir)
        self.metrics = []  # discard any leftovers from a previous run
        return self.current_run

    def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) -> None:
        """Buffer *metrics* in memory, tagged with a timestamp and optional *step*."""
        entry = {
            'timestamp': datetime.now().isoformat(),
            'step': step,
            **metrics
        }
        self.metrics.append(entry)

    def end_run(self) -> None:
        """End the current run, flushing buffered metrics to ``metrics.json``."""
        if self.current_run and self.metrics:
            metrics_path = os.path.join(
                self.log_dir, self.current_run, "metrics.json"
            )
            with open(metrics_path, 'w', encoding='utf-8') as f:
                json.dump(self.metrics, f, indent=2)
        # Reset state even when there was nothing to save.
        self.current_run = None
        self.metrics = []

    def get_all_runs(self) -> list:
        """Return an info dict per run directory, sorted by name, newest first."""
        runs = []
        for item in os.listdir(self.log_dir):
            run_path = os.path.join(self.log_dir, item)
            if os.path.isdir(run_path):
                runs.append({
                    'name': item,
                    'path': run_path,
                    'has_config': os.path.exists(os.path.join(run_path, "config.json")),
                    'has_metrics': os.path.exists(os.path.join(run_path, "metrics.json"))
                })
        return sorted(runs, key=lambda x: x['name'], reverse=True)