api / automl_agent.py
tensorus's picture
Upload 11 files
aa654a4 verified
# automl_agent.py
"""
Implements the AutoML Agent for Tensorus.
This agent performs basic hyperparameter optimization using random search.
It trains a simple dummy model on synthetic data, evaluates its performance,
and logs the results, including storing trial results in TensorStorage.
Future Enhancements:
- Implement more advanced search strategies (Bayesian Optimization, Hyperband).
- Allow configuration of different model architectures.
- Integrate with real datasets from TensorStorage.
- Implement early stopping and other training optimizations.
- Store best model state_dict (requires serialization strategy).
- Parallelize trials for faster search.
- Use dedicated hyperparameter optimization libraries (Optuna, Ray Tune).
"""
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
import logging
import time
from typing import Dict, Any, Callable, Tuple, Optional
from tensor_storage import TensorStorage # Import our storage module
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Dummy Model Definition ---
class DummyMLP(nn.Module):
"""A simple Multi-Layer Perceptron for regression/classification."""
def __init__(self, input_dim: int, output_dim: int, hidden_size: int = 64, activation_fn: Callable = nn.ReLU):
super().__init__()
self.layer_1 = nn.Linear(input_dim, hidden_size)
self.activation = activation_fn()
self.layer_2 = nn.Linear(hidden_size, output_dim)
def forward(self, x):
x = self.activation(self.layer_1(x))
x = self.layer_2(x)
return x
# --- AutoML Agent Class ---
class AutoMLAgent:
"""Performs random search hyperparameter optimization."""
def __init__(self,
tensor_storage: TensorStorage,
search_space: Dict[str, Callable[[], Any]],
input_dim: int,
output_dim: int,
task_type: str = 'regression', # 'regression' or 'classification'
results_dataset: str = "automl_results"):
"""
Initializes the AutoML Agent.
Args:
tensor_storage: An instance of TensorStorage.
search_space: Dictionary defining the hyperparameter search space.
Keys are param names (e.g., 'lr', 'hidden_size').
Values are functions that sample a value for that param (e.g., lambda: 10**random.uniform(-4,-2)).
input_dim: Input dimension for the dummy model.
output_dim: Output dimension for the dummy model.
task_type: Type of task ('regression' or 'classification'), influences loss and data generation.
results_dataset: Name of the dataset in TensorStorage to store trial results.
"""
if not isinstance(tensor_storage, TensorStorage):
raise TypeError("tensor_storage must be an instance of TensorStorage")
if task_type not in ['regression', 'classification']:
raise ValueError("task_type must be 'regression' or 'classification'")
self.tensor_storage = tensor_storage
self.search_space = search_space
self.input_dim = input_dim
self.output_dim = output_dim
self.task_type = task_type
self.results_dataset = results_dataset
# Ensure results dataset exists
try:
self.tensor_storage.get_dataset(self.results_dataset)
except ValueError:
logger.info(f"Dataset '{self.results_dataset}' not found. Creating it.")
self.tensor_storage.create_dataset(self.results_dataset)
# Track best results found during the search
self.best_score: Optional[float] = None # Use negative infinity for maximization tasks if needed
self.best_params: Optional[Dict[str, Any]] = None
# Assuming lower score is better (e.g., loss)
self.higher_score_is_better = False if task_type == 'regression' else True # Accuracy for classification
# Device configuration
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"AutoML Agent using device: {self.device}")
logger.info(f"AutoML Agent initialized for {task_type} task. Results stored in '{results_dataset}'.")
def _generate_synthetic_data(self, n_samples=500, batch_size=32) -> Tuple[Any, Any]:
"""Generates synthetic data loaders for training and validation."""
X = torch.randn(n_samples, self.input_dim, device=self.device)
if self.task_type == 'regression':
# Simple linear relationship with noise
true_weight = torch.randn(self.input_dim, self.output_dim, device=self.device) * 2
true_bias = torch.randn(self.output_dim, device=self.device)
y = X @ true_weight + true_bias + torch.randn(n_samples, self.output_dim, device=self.device) * 0.5
loss_fn = nn.MSELoss()
else: # classification
# Simple linear separation + softmax for multi-class
if self.output_dim <= 1:
raise ValueError("Output dimension must be > 1 for classification task example.")
true_weight = torch.randn(self.input_dim, self.output_dim, device=self.device)
logits = X @ true_weight
y = torch.softmax(logits, dim=1).argmax(dim=1) # Get class labels
loss_fn = nn.CrossEntropyLoss()
# Simple split
split_idx = int(n_samples * 0.8)
X_train, X_val = X[:split_idx], X[split_idx:]
y_train, y_val = y[:split_idx], y[split_idx:]
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
val_dataset = torch.utils.data.TensorDataset(X_val, y_val)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size)
return train_loader, val_loader, loss_fn
def _build_dummy_model(self, params: Dict[str, Any]) -> nn.Module:
"""Builds the dummy MLP model based on hyperparameters."""
hidden_size = params.get('hidden_size', 64) # Default if not in params
activation_name = params.get('activation', 'relu') # Default activation
act_fn_map = {'relu': nn.ReLU, 'tanh': nn.Tanh, 'sigmoid': nn.Sigmoid}
activation_fn = act_fn_map.get(activation_name.lower(), nn.ReLU) # Default to ReLU if unknown
model = DummyMLP(
input_dim=self.input_dim,
output_dim=self.output_dim,
hidden_size=hidden_size,
activation_fn=activation_fn
).to(self.device)
return model
def _train_and_evaluate(self, params: Dict[str, Any], num_epochs: int = 5) -> Optional[float]:
"""Trains and evaluates a model with given hyperparameters."""
logger.debug(f"Training trial with params: {params}")
start_time = time.time()
try:
# 1. Build Model
model = self._build_dummy_model(params)
# 2. Get Data and Loss Function
train_loader, val_loader, loss_fn = self._generate_synthetic_data()
# 3. Setup Optimizer
lr = params.get('lr', 1e-3) # Default LR
optimizer = optim.Adam(model.parameters(), lr=lr)
# 4. Training Loop
model.train()
for epoch in range(num_epochs):
epoch_loss = 0
for batch_X, batch_y in train_loader:
optimizer.zero_grad()
outputs = model(batch_X)
loss = loss_fn(outputs, batch_y)
# Check for NaN/inf loss
if not torch.isfinite(loss):
logger.warning(f"Trial failed: Non-finite loss detected during training epoch {epoch}. Params: {params}")
return None # Indicate failure
loss.backward()
optimizer.step()
epoch_loss += loss.item()
# logger.debug(f" Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_loss/len(train_loader):.4f}")
# 5. Evaluation Loop
model.eval()
total_val_loss = 0
total_correct = 0
total_samples = 0
with torch.no_grad():
for batch_X, batch_y in val_loader:
outputs = model(batch_X)
loss = loss_fn(outputs, batch_y)
total_val_loss += loss.item()
if self.task_type == 'classification':
predicted = outputs.argmax(dim=1)
total_correct += (predicted == batch_y).sum().item()
total_samples += batch_y.size(0)
avg_val_loss = total_val_loss / len(val_loader)
duration = time.time() - start_time
logger.debug(f"Trial completed in {duration:.2f}s. Val Loss: {avg_val_loss:.4f}")
# 6. Return Score
if self.task_type == 'regression':
score = avg_val_loss # Lower is better
else: # classification
accuracy = total_correct / total_samples if total_samples > 0 else 0
score = accuracy # Higher is better
logger.debug(f" Trial Val Accuracy: {accuracy:.4f}")
return score
except Exception as e:
logger.error(f"Trial failed with exception for params {params}: {e}", exc_info=True)
return None # Indicate failure
def hyperparameter_search(self, trials: int, num_epochs_per_trial: int = 5) -> Optional[Dict[str, Any]]:
"""
Performs random search for the specified number of trials.
Args:
trials: The number of hyperparameter configurations to try.
num_epochs_per_trial: Number of epochs to train each model configuration.
Returns:
The dictionary of hyperparameters that achieved the best score, or None if no trial succeeded.
"""
logger.info(f"--- Starting Hyperparameter Search ({trials} trials) ---")
self.best_score = None
self.best_params = None
for i in range(trials):
# 1. Sample hyperparameters
current_params = {name: sampler() for name, sampler in self.search_space.items()}
logger.info(f"Trial {i+1}/{trials}: Testing params: {current_params}")
# 2. Train and evaluate
score = self._train_and_evaluate(current_params, num_epochs=num_epochs_per_trial)
# 3. Store results in TensorStorage (even if trial failed, record params and score=None)
score_tensor = torch.tensor(float('nan') if score is None else score) # Store NaN for failed trials
trial_metadata = {
"trial_id": i + 1,
"params": current_params, # Store params dict directly in metadata
"score": score, # Store score also in metadata for easier querying
"task_type": self.task_type,
"search_timestamp_utc": time.time()
}
try:
record_id = self.tensor_storage.insert(
self.results_dataset,
score_tensor,
trial_metadata
)
logger.debug(f"Stored trial {i+1} results (Score: {score}) with record ID: {record_id}")
except Exception as e:
logger.error(f"Failed to store trial {i+1} results in TensorStorage: {e}")
# 4. Update best score if trial succeeded and is better
if score is not None:
is_better = False
if self.best_score is None:
is_better = True
elif self.higher_score_is_better and score > self.best_score:
is_better = True
elif not self.higher_score_is_better and score < self.best_score:
is_better = True
if is_better:
self.best_score = score
self.best_params = current_params
logger.info(f"*** New best score found! Trial {i+1}: Score={score:.4f}, Params={current_params} ***")
logger.info(f"--- Hyperparameter Search Finished ---")
if self.best_params:
logger.info(f"Best score overall: {self.best_score:.4f}")
logger.info(f"Best hyperparameters found: {self.best_params}")
# Optional: Here you could trigger saving the best model's state_dict
# e.g., self._save_best_model(self.best_params)
else:
logger.warning("No successful trials completed. Could not determine best parameters.")
return self.best_params
# --- Example Usage ---
if __name__ == "__main__":
print("--- Starting AutoML Agent Example ---")
# 1. Setup TensorStorage
storage = TensorStorage()
# 2. Define Search Space
# Simple example for MLP regression
search_space_reg = {
'lr': lambda: 10**random.uniform(-5, -2), # Log uniform for learning rate
'hidden_size': lambda: random.choice([32, 64, 128, 256]),
'activation': lambda: random.choice(['relu', 'tanh'])
}
# 3. Create the AutoML Agent
input_dim = 10
output_dim = 1 # Regression task
automl_agent = AutoMLAgent(
tensor_storage=storage,
search_space=search_space_reg,
input_dim=input_dim,
output_dim=output_dim,
task_type='regression',
results_dataset="automl_regression_results"
)
# 4. Run the hyperparameter search
num_trials = 20 # Number of random configurations to test
num_epochs = 10 # Epochs per trial (keep low for speed)
best_hyperparams = automl_agent.hyperparameter_search(trials=num_trials, num_epochs_per_trial=num_epochs)
# 5. Optional: Check TensorStorage for results
print("\n--- Checking TensorStorage contents (Sample) ---")
try:
results_count = len(storage.get_dataset(automl_agent.results_dataset))
print(f"Found {results_count} trial records in '{automl_agent.results_dataset}'.")
if results_count > 0:
print("\nExample trial record (metadata):")
# Sample one record to show structure
sample_trial = storage.sample_dataset(automl_agent.results_dataset, 1)
if sample_trial:
print(f" Metadata: {sample_trial[0]['metadata']}")
print(f" Score Tensor: {sample_trial[0]['tensor']}") # Should contain the score or NaN
# You could also query for the best score using NQLAgent if needed
# (e.g., find record where score = best_score) - requires parsing results first.
except ValueError as e:
print(f"Could not retrieve dataset '{automl_agent.results_dataset}': {e}")
except Exception as e:
print(f"An error occurred checking storage: {e}")
print("\n--- AutoML Agent Example Finished ---")