"""
Implements the AutoML Agent for Tensorus.

This agent performs basic hyperparameter optimization using random search.
It trains a simple dummy model on synthetic data, evaluates its performance,
and logs the results, including storing trial results in TensorStorage.

Future Enhancements:
- Implement more advanced search strategies (Bayesian Optimization, Hyperband).
- Allow configuration of different model architectures.
- Integrate with real datasets from TensorStorage.
- Implement early stopping and other training optimizations.
- Store the best model's state_dict (requires a serialization strategy).
- Parallelize trials for faster search.
- Use dedicated hyperparameter optimization libraries (Optuna, Ray Tune).
"""
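
# Minimal usage sketch (hypothetical values; see the __main__ block below for a
# fuller, runnable example):
#     agent = AutoMLAgent(TensorStorage(),
#                         {'lr': lambda: 10**random.uniform(-4, -2)},
#                         input_dim=10, output_dim=1)
#     best_params = agent.hyperparameter_search(trials=5)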

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
import logging
import time
from typing import Dict, Any, Callable, Tuple, Optional

from tensor_storage import TensorStorage

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class DummyMLP(nn.Module):
    """A simple Multi-Layer Perceptron for regression/classification."""

    def __init__(self, input_dim: int, output_dim: int, hidden_size: int = 64, activation_fn: Callable = nn.ReLU):
        super().__init__()
        self.layer_1 = nn.Linear(input_dim, hidden_size)
        self.activation = activation_fn()
        self.layer_2 = nn.Linear(hidden_size, output_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.activation(self.layer_1(x))
        x = self.layer_2(x)
        return x
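
# Shape sanity check (a sketch): DummyMLP maps (batch, input_dim) -> (batch, output_dim).
#     model = DummyMLP(input_dim=10, output_dim=1, hidden_size=32, activation_fn=nn.Tanh)
#     out = model(torch.randn(4, 10))  # out.shape == torch.Size([4, 1])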


class AutoMLAgent:
    """Performs random search hyperparameter optimization."""

    def __init__(self,
                 tensor_storage: TensorStorage,
                 search_space: Dict[str, Callable[[], Any]],
                 input_dim: int,
                 output_dim: int,
                 task_type: str = 'regression',
                 results_dataset: str = "automl_results"):
        """
        Initializes the AutoML Agent.

        Args:
            tensor_storage: An instance of TensorStorage.
            search_space: Dictionary defining the hyperparameter search space.
                Keys are param names (e.g., 'lr', 'hidden_size'); values are
                zero-argument functions that sample a value for that param
                (e.g., lambda: 10**random.uniform(-4, -2)).
            input_dim: Input dimension for the dummy model.
            output_dim: Output dimension for the dummy model.
            task_type: Type of task ('regression' or 'classification'); influences
                the loss function and data generation.
            results_dataset: Name of the dataset in TensorStorage to store trial results.
        """
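        # Example search space (a sketch; these are exactly the keys consumed by
        # _build_dummy_model and _train_and_evaluate):
        #     {'lr': lambda: 10**random.uniform(-4, -2),
        #      'hidden_size': lambda: random.choice([32, 64]),
        #      'activation': lambda: random.choice(['relu', 'tanh'])}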
        if not isinstance(tensor_storage, TensorStorage):
            raise TypeError("tensor_storage must be an instance of TensorStorage")
        if task_type not in ['regression', 'classification']:
            raise ValueError("task_type must be 'regression' or 'classification'")

        self.tensor_storage = tensor_storage
        self.search_space = search_space
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.task_type = task_type
        self.results_dataset = results_dataset

        # Ensure the results dataset exists, creating it if necessary.
        try:
            self.tensor_storage.get_dataset(self.results_dataset)
        except ValueError:
            logger.info(f"Dataset '{self.results_dataset}' not found. Creating it.")
            self.tensor_storage.create_dataset(self.results_dataset)

        self.best_score: Optional[float] = None
        self.best_params: Optional[Dict[str, Any]] = None
        # Regression minimizes validation loss; classification maximizes accuracy.
        self.higher_score_is_better = (task_type == 'classification')

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logger.info(f"AutoML Agent using device: {self.device}")
        logger.info(f"AutoML Agent initialized for {task_type} task. Results stored in '{results_dataset}'.")

    def _generate_synthetic_data(self, n_samples: int = 500, batch_size: int = 32
                                 ) -> Tuple[torch.utils.data.DataLoader, torch.utils.data.DataLoader, nn.Module]:
        """Generates synthetic train/val DataLoaders plus a matching loss function."""
        X = torch.randn(n_samples, self.input_dim, device=self.device)

        if self.task_type == 'regression':
            # Linear ground truth with additive Gaussian noise.
            true_weight = torch.randn(self.input_dim, self.output_dim, device=self.device) * 2
            true_bias = torch.randn(self.output_dim, device=self.device)
            y = X @ true_weight + true_bias + torch.randn(n_samples, self.output_dim, device=self.device) * 0.5
            loss_fn = nn.MSELoss()
        else:
            # Class labels are the argmax of random linear logits.
            if self.output_dim <= 1:
                raise ValueError("Output dimension must be > 1 for the classification task example.")
            true_weight = torch.randn(self.input_dim, self.output_dim, device=self.device)
            logits = X @ true_weight
            y = torch.softmax(logits, dim=1).argmax(dim=1)
            loss_fn = nn.CrossEntropyLoss()

        # 80/20 train/validation split.
        split_idx = int(n_samples * 0.8)
        X_train, X_val = X[:split_idx], X[split_idx:]
        y_train, y_val = y[:split_idx], y[split_idx:]

        train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
        val_dataset = torch.utils.data.TensorDataset(X_val, y_val)

        train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size)

        return train_loader, val_loader, loss_fn

    def _build_dummy_model(self, params: Dict[str, Any]) -> nn.Module:
        """Builds the dummy MLP model based on hyperparameters."""
        hidden_size = params.get('hidden_size', 64)
        activation_name = params.get('activation', 'relu')

        # Map the activation name to its module class, falling back to ReLU.
        act_fn_map = {'relu': nn.ReLU, 'tanh': nn.Tanh, 'sigmoid': nn.Sigmoid}
        activation_fn = act_fn_map.get(activation_name.lower(), nn.ReLU)

        model = DummyMLP(
            input_dim=self.input_dim,
            output_dim=self.output_dim,
            hidden_size=hidden_size,
            activation_fn=activation_fn
        ).to(self.device)
        return model
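
    # For example (a sketch), _build_dummy_model({'hidden_size': 128, 'activation': 'tanh'})
    # returns an input_dim -> 128 -> output_dim MLP with Tanh activations on self.device;
    # missing keys fall back to hidden_size=64 and ReLU.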

    def _train_and_evaluate(self, params: Dict[str, Any], num_epochs: int = 5) -> Optional[float]:
        """Trains and evaluates a model with the given hyperparameters.

        Returns the validation score (loss for regression, accuracy for
        classification), or None if the trial failed.
        """
        logger.debug(f"Training trial with params: {params}")
        start_time = time.time()

        try:
            # Build the model for this configuration.
            model = self._build_dummy_model(params)

            # Generate fresh synthetic data and the matching loss function.
            train_loader, val_loader, loss_fn = self._generate_synthetic_data()

            # Optimizer with the sampled learning rate.
            lr = params.get('lr', 1e-3)
            optimizer = optim.Adam(model.parameters(), lr=lr)

            # Training loop.
            model.train()
            for epoch in range(num_epochs):
                epoch_loss = 0.0
                for batch_X, batch_y in train_loader:
                    optimizer.zero_grad()
                    outputs = model(batch_X)
                    loss = loss_fn(outputs, batch_y)

                    # Abort the trial if training diverges.
                    if not torch.isfinite(loss):
                        logger.warning(f"Trial failed: Non-finite loss detected during training epoch {epoch}. Params: {params}")
                        return None

                    loss.backward()
                    optimizer.step()
                    epoch_loss += loss.item()

            # Validation.
            model.eval()
            total_val_loss = 0.0
            total_correct = 0
            total_samples = 0
            with torch.no_grad():
                for batch_X, batch_y in val_loader:
                    outputs = model(batch_X)
                    loss = loss_fn(outputs, batch_y)
                    total_val_loss += loss.item()

                    if self.task_type == 'classification':
                        predicted = outputs.argmax(dim=1)
                        total_correct += (predicted == batch_y).sum().item()
                        total_samples += batch_y.size(0)

            avg_val_loss = total_val_loss / len(val_loader)
            duration = time.time() - start_time
            logger.debug(f"Trial completed in {duration:.2f}s. Val Loss: {avg_val_loss:.4f}")

            # Score: validation loss for regression (lower is better),
            # accuracy for classification (higher is better).
            if self.task_type == 'regression':
                score = avg_val_loss
            else:
                accuracy = total_correct / total_samples if total_samples > 0 else 0.0
                score = accuracy
                logger.debug(f"  Trial Val Accuracy: {accuracy:.4f}")

            return score

        except Exception as e:
            logger.error(f"Trial failed with exception for params {params}: {e}", exc_info=True)
            return None

    def hyperparameter_search(self, trials: int, num_epochs_per_trial: int = 5) -> Optional[Dict[str, Any]]:
        """
        Performs random search for the specified number of trials.

        Args:
            trials: The number of hyperparameter configurations to try.
            num_epochs_per_trial: Number of epochs to train each model configuration.

        Returns:
            The dictionary of hyperparameters that achieved the best score, or None if no trial succeeded.
        """
        logger.info(f"--- Starting Hyperparameter Search ({trials} trials) ---")
        self.best_score = None
        self.best_params = None

        for i in range(trials):
            # Sample one configuration from the search space.
            current_params = {name: sampler() for name, sampler in self.search_space.items()}
            logger.info(f"Trial {i+1}/{trials}: Testing params: {current_params}")

            # Train and evaluate this configuration.
            score = self._train_and_evaluate(current_params, num_epochs=num_epochs_per_trial)

            # Persist the trial result (NaN marks a failed trial).
            score_tensor = torch.tensor(float('nan') if score is None else score)
            trial_metadata = {
                "trial_id": i + 1,
                "params": current_params,
                "score": score,
                "task_type": self.task_type,
                "search_timestamp_utc": time.time()
            }
            try:
                record_id = self.tensor_storage.insert(
                    self.results_dataset,
                    score_tensor,
                    trial_metadata
                )
                logger.debug(f"Stored trial {i+1} results (Score: {score}) with record ID: {record_id}")
            except Exception as e:
                logger.error(f"Failed to store trial {i+1} results in TensorStorage: {e}")

            # Track the best configuration seen so far.
            if score is not None:
                is_better = False
                if self.best_score is None:
                    is_better = True
                elif self.higher_score_is_better and score > self.best_score:
                    is_better = True
                elif not self.higher_score_is_better and score < self.best_score:
                    is_better = True

                if is_better:
                    self.best_score = score
                    self.best_params = current_params
                    logger.info(f"*** New best score found! Trial {i+1}: Score={score:.4f}, Params={current_params} ***")

        logger.info("--- Hyperparameter Search Finished ---")
        if self.best_params:
            logger.info(f"Best score overall: {self.best_score:.4f}")
            logger.info(f"Best hyperparameters found: {self.best_params}")
        else:
            logger.warning("No successful trials completed. Could not determine best parameters.")

        return self.best_params


if __name__ == "__main__":
    print("--- Starting AutoML Agent Example ---")

    # Instantiate storage.
    storage = TensorStorage()

    # Define a search space for a regression task; each value is a
    # zero-argument sampler.
    search_space_reg = {
        'lr': lambda: 10**random.uniform(-5, -2),
        'hidden_size': lambda: random.choice([32, 64, 128, 256]),
        'activation': lambda: random.choice(['relu', 'tanh'])
    }
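
    # A classification variant would look similar (a sketch; note output_dim
    # must be > 1 for the synthetic classification data):
    #     search_space_cls = {
    #         'lr': lambda: 10**random.uniform(-4, -2),
    #         'hidden_size': lambda: random.choice([64, 128]),
    #         'activation': lambda: random.choice(['relu', 'sigmoid'])
    #     }
    #     cls_agent = AutoMLAgent(storage, search_space_cls, input_dim=10,
    #                             output_dim=3, task_type='classification')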

    # Instantiate the agent for a regression task.
    input_dim = 10
    output_dim = 1
    automl_agent = AutoMLAgent(
        tensor_storage=storage,
        search_space=search_space_reg,
        input_dim=input_dim,
        output_dim=output_dim,
        task_type='regression',
        results_dataset="automl_regression_results"
    )

    # Run the search.
    num_trials = 20
    num_epochs = 10
    best_hyperparams = automl_agent.hyperparameter_search(trials=num_trials, num_epochs_per_trial=num_epochs)

    # Inspect what was stored.
    print("\n--- Checking TensorStorage contents (Sample) ---")
    try:
        results_count = len(storage.get_dataset(automl_agent.results_dataset))
        print(f"Found {results_count} trial records in '{automl_agent.results_dataset}'.")

        if results_count > 0:
            print("\nExample trial record (metadata):")
            # Sample one stored trial record.
            sample_trial = storage.sample_dataset(automl_agent.results_dataset, 1)
            if sample_trial:
                print(f"  Metadata: {sample_trial[0]['metadata']}")
                print(f"  Score Tensor: {sample_trial[0]['tensor']}")

    except ValueError as e:
        print(f"Could not retrieve dataset '{automl_agent.results_dataset}': {e}")
    except Exception as e:
        print(f"An error occurred checking storage: {e}")

    print("\n--- AutoML Agent Example Finished ---")