|
|
""" |
|
|
Neural Architecture Search (NAS) and hyperparameter optimization system. |
|
|
""" |
|
|
import json
import math
import random
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any, Callable

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from scipy.optimize import minimize
from scipy.stats import norm
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern
from skopt import gp_minimize
from skopt.space import Integer, Categorical, Real
from skopt.utils import use_named_args
|
|
|
|
|
|
|
|
@dataclass
class ArchitectureSpec:
    """Specification for a neural architecture.

    Attributes:
        layers: Ordered layer configurations (each a dict with at least a
            'type' key).
        connections: Directed edges (src_index, dst_index) between layers.
        input_size: Flattened input dimensionality.
        output_size: Number of output units/classes.
        hyperparameters: Raw parameter dict the spec was generated from.
            Defaults to an empty dict -- several call sites in this module
            construct specs without it, which previously raised TypeError
            because the field was required.
    """
    layers: List[Dict[str, Any]]
    connections: List[Tuple[int, int]]
    input_size: int
    output_size: int
    # default_factory avoids a shared mutable default across instances.
    hyperparameters: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
|
|
@dataclass
class ArchitecturePerformance:
    """Performance metrics for an architecture"""
    # The evaluated architecture itself.
    architecture: ArchitectureSpec
    # Simulated accuracy/loss produced by MultiObjectiveEvaluator (no real
    # training is performed in this module).
    accuracy: float
    loss: float
    # Analytic estimate of training time (arbitrary units, see
    # MultiObjectiveEvaluator._estimate_training_time).
    training_time: float
    # Estimated parameter + buffer memory in MiB
    # (see MultiObjectiveEvaluator._estimate_memory_usage).
    memory_usage: float
    # Total number of learnable parameters in the built model.
    parameter_count: int
    # time.time() when the evaluation finished.
    timestamp: float
|
|
|
|
|
|
|
|
class BayesianOptimizer:
    """Bayesian optimization for Neural Architecture Search.

    Thin wrapper around scikit-optimize's ``gp_minimize`` (Gaussian-process
    surrogate + acquisition function) that exposes a parameter-dict
    interface instead of skopt's positional one.
    """

    def __init__(self, search_space: Dict[str, Any], objective_function: Callable):
        # Declarative space: {param_name: {'type': ..., 'range': ...}}.
        self.search_space = search_space
        self.objective = objective_function
        self.observations = []
        self.gp = None
        self.best_params = None
        self.best_score = float('inf')
        # Pre-build the skopt dimension list once, up front.
        self.skopt_space = self._create_skopt_space()

    def _create_skopt_space(self):
        """Translate the declarative search space into skopt dimensions."""
        dimensions = []
        for name, config in self.search_space.items():
            kind = config['type']
            bounds = config['range']
            if kind == 'int':
                dimensions.append(Integer(bounds[0], bounds[1], name=name))
            elif kind == 'float':
                dimensions.append(Real(bounds[0], bounds[1], name=name))
            elif kind == 'categorical':
                dimensions.append(Categorical(bounds, name=name))
            # Unknown parameter types are silently skipped.
        return dimensions

    def _objective_wrapper(self, **params):
        """Adapt skopt's keyword-argument call style to a single dict."""
        return self.objective(params)

    def optimize(self, n_calls: int = 50, n_initial_points: int = 10) -> Dict[str, Any]:
        """Run Bayesian optimization.

        Args:
            n_calls: Total number of function evaluations
            n_initial_points: Number of random initial points

        Returns:
            Dict with the best parameters/score plus the full evaluation
            history ('all_scores', 'all_params').
        """
        print(f"Starting Bayesian optimization with {n_calls} evaluations...")

        wrapped = use_named_args(self.skopt_space)(self._objective_wrapper)

        result = gp_minimize(
            func=wrapped,
            dimensions=self.skopt_space,
            n_calls=n_calls,
            n_initial_points=n_initial_points,
            random_state=42,
            verbose=True,
        )

        names = [dim.name for dim in self.skopt_space]
        best_params = dict(zip(names, result.x))
        best_score = result.fun

        print(f"Bayesian optimization completed. Best score: {best_score:.4f}")

        return {
            'best_params': best_params,
            'best_score': best_score,
            'all_scores': result.func_vals,
            'all_params': [dict(zip(names, candidate))
                           for candidate in result.x_iters],
        }
|
|
|
|
|
|
|
|
class ArchitectureGenerator:
    """Translates a flat, optimizer-produced parameter dict into an
    ArchitectureSpec with a simple sequential topology."""

    def __init__(self):
        # Integer codes used by the optimizer, mapped to layer type names.
        self.operation_types = {
            0: 'conv',
            1: 'linear',
            2: 'pool',
            3: 'dropout',
            4: 'batch_norm',
            5: 'attention',
            6: 'residual'
        }
        # Activation functions, indexed by the optimizer's integer choice.
        self.activation_types = ['relu', 'tanh', 'sigmoid', 'gelu']

    def generate_architecture(self, params: Dict[str, Any]) -> ArchitectureSpec:
        """Build an ArchitectureSpec from optimized parameters."""
        depth = params['num_layers']
        layers = [self._layer_from_params(params, idx) for idx in range(depth)]
        # Simple chain topology: every layer feeds the next one.
        chain = [(idx, idx + 1) for idx in range(depth - 1)]

        return ArchitectureSpec(
            layers=layers,
            connections=chain,
            input_size=params.get('input_size', 784),
            output_size=params.get('output_size', 10),
            hyperparameters=params
        )

    def _layer_from_params(self, params: Dict[str, Any], idx: int) -> Dict[str, Any]:
        """Decode one layer's configuration from the flat parameter dict.

        Only the keys relevant to the sampled layer type are read; the
        optimizer's other per-layer parameters are ignored.
        """
        kind = self.operation_types[params[f'layer_{idx}_type']]
        config = {'type': kind, 'index': idx}

        if kind == 'conv':
            config['out_channels'] = params[f'layer_{idx}_out_channels']
            config['kernel_size'] = params[f'layer_{idx}_kernel_size']
            config['stride'] = params[f'layer_{idx}_stride']
        elif kind == 'linear':
            config['out_features'] = params[f'layer_{idx}_out_features']
        elif kind == 'attention':
            config['num_heads'] = params[f'layer_{idx}_num_heads']
            config['head_dim'] = params[f'layer_{idx}_head_dim']

        config['activation'] = self.activation_types[params[f'layer_{idx}_activation']]
        return config
|
|
|
|
|
|
|
|
class MultiObjectiveEvaluator:
    """
    Evaluates architectures on multiple objectives.

    Accuracy and loss are *simulated* (no actual training happens), while
    training time and memory are analytic estimates, keeping each
    evaluation cheap enough for large-scale search.
    """

    def __init__(self):
        # Objective names and weights kept for reference; the scalarization
        # actually used during search lives in NASController.
        self.objectives = ['accuracy', 'efficiency', 'complexity']
        self.weights = {'accuracy': 0.5, 'efficiency': 0.3, 'complexity': 0.2}

    def evaluate(self, architecture: "ArchitectureSpec", device: str = 'cpu') -> "ArchitecturePerformance":
        """
        Evaluate architecture on multiple objectives.

        Note: ``device`` is currently unused -- evaluation is simulated and
        never leaves the CPU.
        """
        model = self._build_model(architecture)

        accuracy = self._simulate_accuracy(model, architecture)
        loss = self._simulate_loss(model, architecture)
        training_time = self._estimate_training_time(architecture)
        memory_usage = self._estimate_memory_usage(model)
        parameter_count = sum(p.numel() for p in model.parameters())

        return ArchitecturePerformance(
            architecture=architecture,
            accuracy=accuracy,
            loss=loss,
            training_time=training_time,
            memory_usage=memory_usage,
            parameter_count=parameter_count,
            timestamp=time.time()
        )

    def _build_model(self, architecture: "ArchitectureSpec") -> nn.Module:
        """Build a PyTorch model from the architecture specification.

        NOTE(review): input sizes use fixed fallbacks (in_channels=1,
        in_features=784), so consecutive layers are not guaranteed to be
        shape-compatible. That is tolerable here because the model is only
        used for parameter counting and simulated scoring, never a forward
        pass -- confirm before reusing it for real training.
        """
        layers = []

        for layer_spec in architecture.layers:
            layer_type = layer_spec['type']

            if layer_type == 'conv':
                layers.append(nn.Conv2d(
                    in_channels=layer_spec.get('in_channels', 1),
                    out_channels=layer_spec['out_channels'],
                    kernel_size=layer_spec['kernel_size'],
                    stride=layer_spec['stride']
                ))
            elif layer_type == 'linear':
                layers.append(nn.Linear(
                    in_features=layer_spec.get('in_features', 784),
                    out_features=layer_spec['out_features']
                ))
            elif layer_type == 'pool':
                layers.append(nn.MaxPool2d(2, 2))
            elif layer_type == 'dropout':
                layers.append(nn.Dropout(0.5))
            elif layer_type == 'batch_norm':
                layers.append(nn.BatchNorm2d(layer_spec.get('num_features', 32)))
            # 'attention'/'residual' types have no concrete module here.

            # Every layer is followed by its activation (default: ReLU).
            activation = layer_spec.get('activation', 'relu')
            if activation == 'relu':
                layers.append(nn.ReLU())
            elif activation == 'tanh':
                layers.append(nn.Tanh())
            elif activation == 'sigmoid':
                layers.append(nn.Sigmoid())
            elif activation == 'gelu':
                layers.append(nn.GELU())

        # Final projection to the requested output size. Robustness fix:
        # fall back to the raw input size when the spec has no layers
        # (previously layers[-1] raised IndexError).
        if architecture.layers:
            final_in = architecture.layers[-1].get('out_features', 128)
        else:
            final_in = architecture.input_size
        layers.append(nn.Linear(final_in, architecture.output_size))

        return nn.Sequential(*layers)

    def _simulate_accuracy(self, model: nn.Module, architecture: "ArchitectureSpec") -> float:
        """Simulate model accuracy (placeholder for actual training).

        Deeper/larger models are penalised; Gaussian noise keeps the search
        from being fully deterministic.
        """
        complexity_score = len(architecture.layers) / 10.0
        parameter_penalty = sum(p.numel() for p in model.parameters()) / 1000000.0

        base_accuracy = 0.8 - complexity_score * 0.1 - parameter_penalty * 0.05
        noise = np.random.normal(0, 0.02)
        accuracy = np.clip(base_accuracy + noise, 0.1, 0.95)

        return float(accuracy)

    def _simulate_loss(self, model: nn.Module, architecture: "ArchitectureSpec") -> float:
        """Simulate training loss.

        NOTE(review): layer configs never contain an 'accuracy' key, so the
        .get() below always yields its 0.8 default and the simulated loss is
        ~0.2 plus noise; kept as-is to preserve the existing simulation.
        Robustness fix: an empty layer list no longer raises IndexError.
        """
        first_layer = architecture.layers[0] if architecture.layers else {}
        return float(1.0 - first_layer.get('accuracy', 0.8) + np.random.normal(0, 0.1))

    def _estimate_training_time(self, architecture: "ArchitectureSpec") -> float:
        """Estimate training time based on depth and per-layer widths."""
        base_time = 10.0
        complexity_factor = len(architecture.layers) * 2.0
        parameter_factor = sum(layer.get('out_features', layer.get('out_channels', 32))
                               for layer in architecture.layers) / 1000.0

        return base_time + complexity_factor + parameter_factor

    def _estimate_memory_usage(self, model: nn.Module) -> float:
        """Estimate memory usage of parameters + buffers, in MiB."""
        param_memory = sum(p.numel() * p.element_size() for p in model.parameters())
        buffer_memory = sum(b.numel() * b.element_size() for b in model.buffers())

        return (param_memory + buffer_memory) / (1024 * 1024)

    def compute_pareto_front(self, performances: "List[ArchitecturePerformance]") -> "List[ArchitecturePerformance]":
        """
        Compute the Pareto front for multi-objective optimization.

        Bug fix: dominance now requires being at least as good in *all*
        objectives AND strictly better in at least one. The previous check
        used only >=/<=, so two distinct candidates with identical objective
        values each "dominated" the other and both were dropped from the
        front.
        """
        def dominates(p1: "ArchitecturePerformance", p2: "ArchitecturePerformance") -> bool:
            """True if p1 Pareto-dominates p2."""
            no_worse = (p1.accuracy >= p2.accuracy and
                        p1.training_time <= p2.training_time and
                        p1.memory_usage <= p2.memory_usage)
            strictly_better = (p1.accuracy > p2.accuracy or
                               p1.training_time < p2.training_time or
                               p1.memory_usage < p2.memory_usage)
            return no_worse and strictly_better

        pareto_front = []

        for perf in performances:
            is_dominated = False
            for other in performances:
                if other != perf and dominates(other, perf):
                    is_dominated = True
                    break

            if not is_dominated:
                pareto_front.append(perf)

        return pareto_front
|
|
|
|
|
|
|
|
class NASController:
    """
    Neural Architecture Search controller using Bayesian optimization.

    Orchestrates the search: defines the parameter space, turns sampled
    parameters into architectures (ArchitectureGenerator), scores them
    (MultiObjectiveEvaluator), and tracks every evaluated candidate.
    """

    def __init__(self, max_layers: int = 10, search_budget: int = 100):
        # Upper bound on network depth explored by the search.
        self.max_layers = max_layers
        # NOTE: stored for callers, but the number of evaluations actually
        # performed is controlled by search()'s num_candidates argument.
        self.search_budget = search_budget

        self.generator = ArchitectureGenerator()
        self.evaluator = MultiObjectiveEvaluator()

        self.search_space = self._define_search_space()

        # History of every evaluated candidate and the current Pareto front.
        self.evaluated_architectures = []
        self.pareto_front = []

    def _define_search_space(self) -> Dict[str, Any]:
        """Define the search space for architecture optimization.

        input_size/output_size use degenerate [x, x] ranges so they flow
        through the optimizer unchanged.
        """
        space = {
            'num_layers': {'type': 'int', 'range': [3, self.max_layers]},
            'input_size': {'type': 'int', 'range': [784, 784]},
            'output_size': {'type': 'int', 'range': [10, 10]}
        }

        # Per-layer decision variables; parameters irrelevant to the layer
        # type eventually chosen are simply ignored by the generator.
        for i in range(self.max_layers):
            space.update({
                f'layer_{i}_type': {'type': 'int', 'range': [0, 6]},
                f'layer_{i}_out_channels': {'type': 'int', 'range': [16, 256]},
                f'layer_{i}_kernel_size': {'type': 'int', 'range': [1, 7]},
                f'layer_{i}_stride': {'type': 'int', 'range': [1, 3]},
                f'layer_{i}_out_features': {'type': 'int', 'range': [32, 1024]},
                f'layer_{i}_num_heads': {'type': 'int', 'range': [1, 16]},
                f'layer_{i}_head_dim': {'type': 'int', 'range': [16, 128]},
                f'layer_{i}_activation': {'type': 'int', 'range': [0, 3]}
            })

        return space

    def _scalarize(self, performance) -> float:
        """Collapse a multi-objective performance into one scalar (higher is
        better).

        Consistency fix: this helper is now shared by objective_function()
        and search(). Previously search() ranked candidates with raw
        (unnormalized) time/memory weights while the optimizer minimized a
        /100-normalized combination, so the reported "best" architecture
        could differ from the one the optimizer actually preferred.
        """
        weights = {'accuracy': 1.0, 'time': -0.1, 'memory': -0.1}
        return (
            weights['accuracy'] * performance.accuracy +
            weights['time'] * (performance.training_time / 100.0) +
            weights['memory'] * (performance.memory_usage / 100.0)
        )

    def objective_function(self, params: Dict[str, Any]) -> float:
        """
        Objective function for Bayesian optimization.
        Combines multiple objectives into a single score.
        """
        architecture = self.generator.generate_architecture(params)
        performance = self.evaluator.evaluate(architecture)

        self.evaluated_architectures.append(performance)

        # gp_minimize minimizes, so negate the "higher is better" score.
        return -self._scalarize(performance)

    def search(self, num_candidates: int = 50) -> Dict[str, Any]:
        """
        Perform neural architecture search using Bayesian optimization.

        Args:
            num_candidates: Number of architectures to evaluate

        Returns:
            Search results including best architecture and Pareto front
        """
        print(f"Starting NAS with Bayesian optimization ({num_candidates} candidates)...")

        optimizer = BayesianOptimizer(self.search_space, self.objective_function)
        optimization_result = optimizer.optimize(n_calls=num_candidates)

        self.pareto_front = self.evaluator.compute_pareto_front(self.evaluated_architectures)

        # Pick the best candidate with the SAME scalarization the optimizer
        # minimized (see _scalarize).
        best_performance = max(self.evaluated_architectures, key=self._scalarize)

        results = {
            'best_architecture': best_performance.architecture,
            'best_performance': {
                'accuracy': best_performance.accuracy,
                'loss': best_performance.loss,
                'training_time': best_performance.training_time,
                'memory_usage': best_performance.memory_usage,
                'parameter_count': best_performance.parameter_count
            },
            'pareto_front': [{
                'accuracy': p.accuracy,
                'training_time': p.training_time,
                'memory_usage': p.memory_usage,
                'architecture': p.architecture
            } for p in self.pareto_front],
            'total_evaluated': len(self.evaluated_architectures),
            'optimization_result': optimization_result,
            # NOTE: this is a completion timestamp, not a duration.
            'search_time': time.time()
        }

        print(f"NAS completed. Best accuracy: {best_performance.accuracy:.4f}")
        print(f"Pareto front size: {len(self.pareto_front)}")

        return results

    def fine_tune_architecture(self, architecture: "ArchitectureSpec",
                               training_data: "torch.utils.data.DataLoader",
                               epochs: int = 10) -> nn.Module:
        """
        Fine-tune a discovered architecture on real data.

        Args:
            architecture: Architecture to fine-tune
            training_data: Training data loader
            epochs: Number of fine-tuning epochs

        Returns:
            Fine-tuned model
        """
        print("Fine-tuning discovered architecture...")

        model = self.evaluator._build_model(architecture)

        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.CrossEntropyLoss()

        model.train()
        for epoch in range(epochs):
            total_loss = 0.0
            correct = 0
            total = 0

            for inputs, labels in training_data:
                optimizer.zero_grad()

                # Generated architectures are not guaranteed to be
                # shape-compatible (see _build_model), so failures on a
                # batch are logged and skipped rather than aborting.
                try:
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    loss.backward()
                    optimizer.step()

                    total_loss += loss.item()
                    _, predicted = outputs.max(1)
                    total += labels.size(0)
                    correct += predicted.eq(labels).sum().item()

                except Exception as e:
                    print(f"Training error: {e}")
                    continue

            if total > 0:
                accuracy = 100. * correct / total
                avg_loss = total_loss / len(training_data)
                print(f"Epoch {epoch+1}/{epochs}: Loss={avg_loss:.4f}, Accuracy={accuracy:.2f}%")

        return model

    def get_search_statistics(self) -> Dict[str, Any]:
        """Get statistics about the architecture search process"""
        if not self.evaluated_architectures:
            return {'status': 'no_search_performed'}

        accuracies = [p.accuracy for p in self.evaluated_architectures]
        training_times = [p.training_time for p in self.evaluated_architectures]
        memory_usages = [p.memory_usage for p in self.evaluated_architectures]

        return {
            'total_architectures': len(self.evaluated_architectures),
            'pareto_front_size': len(self.pareto_front),
            'accuracy_stats': {
                'mean': np.mean(accuracies),
                'std': np.std(accuracies),
                'min': np.min(accuracies),
                'max': np.max(accuracies)
            },
            'training_time_stats': {
                'mean': np.mean(training_times),
                'std': np.std(training_times),
                'total': np.sum(training_times)
            },
            'memory_stats': {
                'mean': np.mean(memory_usages),
                'std': np.std(memory_usages),
                'max': np.max(memory_usages)
            }
        }
|
|
|
|
|
|
|
|
class ArchitectureSearchSystem:
    """
    Complete Neural Architecture Search system with Bayesian optimization.

    Thin facade over NASController that adds timing, bookkeeping of past
    searches, and JSON export of results.
    """

    def __init__(self, max_layers: int = 8, search_budget: int = 100):
        self.controller = NASController(max_layers=max_layers, search_budget=search_budget)
        # One entry per completed comprehensive_search() call.
        self.search_history = []

    def comprehensive_search(self, num_candidates: int = 50) -> Dict[str, Any]:
        """
        Perform comprehensive architecture search.

        Args:
            num_candidates: Number of architectures to evaluate

        Returns:
            Complete search results
        """
        print("🚀 Starting comprehensive neural architecture search...")

        start_time = time.time()

        results = self.controller.search(num_candidates)

        # Annotate the controller's results with system-level metadata.
        results.update({
            'search_duration': time.time() - start_time,
            'system_info': {
                'max_layers': self.controller.max_layers,
                'search_space_size': len(self.controller.search_space),
                'num_candidates': num_candidates
            },
            'search_timestamp': time.time()
        })

        self.search_history.append(results)

        print(f"✅ NAS completed in {results['search_duration']:.2f} seconds")
        print(f"📊 Evaluated {results['total_evaluated']} architectures")
        print(f"🏆 Best accuracy: {results['best_performance']['accuracy']:.4f}")

        return results

    def get_search_history(self) -> List[Dict]:
        """Get history of all architecture searches"""
        return self.search_history

    def export_results(self, results: Dict, filename: str = None) -> str:
        """Export search results to a JSON file.

        Args:
            results: Result dict (e.g. from comprehensive_search()).
            filename: Target path; auto-generated from the current time
                when omitted.

        Returns:
            The path the results were written to.
        """
        if filename is None:
            timestamp = int(time.time())
            filename = f"nas_results_{timestamp}.json"

        def serialize_obj(obj):
            # Fallback serializer: arrays/tensors become lists; arbitrary
            # objects are stringified (lossy, but keeps the export robust).
            if isinstance(obj, np.ndarray):
                return obj.tolist()
            elif isinstance(obj, torch.Tensor):
                return obj.tolist()
            elif hasattr(obj, '__dict__'):
                return str(obj)
            else:
                return obj

        # Round-trip through json to apply the fallback serializer deeply.
        serializable_results = json.loads(json.dumps(results, default=serialize_obj))

        with open(filename, 'w') as f:
            json.dump(serializable_results, f, indent=2)

        # Bug fix: this message previously printed the literal text
        # "(unknown)" instead of the actual output path.
        print(f"📄 Results exported to {filename}")
        return filename
|
|
|
|
|
|
|
|
|
|
|
def sample_architecture():
    """Legacy function for backward compatibility"""
    # NOTE(review): at runtime this resolves to the *second*
    # ArchitectureSearchSystem definition in this module, whose
    # comprehensive_search() signature does not accept num_candidates --
    # confirm which system class is intended.
    searcher = ArchitectureSearchSystem()
    outcome = searcher.comprehensive_search(num_candidates=10)
    return outcome['best_architecture']
|
|
|
|
|
def comprehensive_search():
    """Legacy function for backward compatibility"""
    # Delegate to a fresh search system with default settings.
    return ArchitectureSearchSystem().comprehensive_search()
|
|
|
|
|
def reset(self):
    """Reset controller state"""
    # NOTE(review): this looks like an orphaned method of a removed
    # controller class -- it is defined at module level yet takes ``self``
    # and mutates ``self.hidden``. Confirm its intended owner (see the
    # companion sample_architecture(self) below, which calls it).
    self.hidden = None
|
|
|
|
|
def sample_architecture(self) -> ArchitectureSpec:
    """Sample an architecture from the controller"""
    # NOTE(review): orphaned method -- defined at module level but written
    # as a method of a recurrent controller class. It reads
    # self.layer_decision, self.operation_decision, self.controller and
    # self.max_layers, none of which are defined by any class in this
    # file. Confirm where this (and reset/_operation_to_type/
    # _get_operation_params nearby) should live.
    self.reset()

    layers = []
    connections = []
    layer_count = 0

    # Autoregressively decide, layer by layer, whether to stop and which
    # operation to emit, up to self.max_layers layers.
    while layer_count < self.max_layers:
        # Lazily initialise the recurrent hidden/cell state on first step.
        if self.hidden is None:
            hidden = torch.zeros(1, 32)
            cell = torch.zeros(1, 32)
        else:
            hidden, cell = self.hidden

        # "Continue or stop?" decision sampled from the policy logits.
        logits = self.layer_decision(hidden)
        probs = torch.softmax(logits, dim=-1)
        decision = torch.multinomial(probs, 1).item()

        if decision == 0:
            break

        # Choose which operation the new layer performs.
        op_logits = self.operation_decision(hidden)
        op_probs = torch.softmax(op_logits, dim=-1)
        operation = torch.multinomial(op_probs, 1).item()

        layer_spec = {
            "type": self._operation_to_type(operation),
            "params": self._get_operation_params(operation)
        }
        layers.append(layer_spec)

        # Chain topology: connect each new layer to its predecessor.
        if layer_count > 0:
            connections.append((layer_count - 1, layer_count))

        # Advance the recurrent state; presumably self.controller is an
        # nn.LSTMCell given the (input, (hidden, cell)) call -- TODO confirm.
        self.hidden = self.controller(torch.zeros(1, 32), (hidden, cell))
        layer_count += 1

    # NOTE(review): ArchitectureSpec declares a required 'hyperparameters'
    # field, which this call omits -- as written this raises TypeError.
    return ArchitectureSpec(
        layers=layers,
        connections=connections,
        input_size=784,
        output_size=10
    )
|
|
|
|
|
def _operation_to_type(self, op_idx: int) -> str: |
|
|
"""Convert operation index to layer type""" |
|
|
operations = ["conv", "linear", "pool", "dropout", "batch_norm"] |
|
|
return operations[op_idx % len(operations)] |
|
|
|
|
|
def _get_operation_params(self, op_idx: int) -> Dict[str, Any]: |
|
|
"""Get parameters for operation""" |
|
|
if op_idx == 0: |
|
|
return {"out_channels": 32, "kernel_size": 3} |
|
|
elif op_idx == 1: |
|
|
return {"out_features": 128} |
|
|
else: |
|
|
return {} |
|
|
|
|
|
|
|
|
class ArchitectureBuilder:
    """
    Builds PyTorch models from architecture specifications.
    """

    def __init__(self):
        # Dispatch table: layer-type name -> builder method.
        self.operation_map = {
            "conv": self._build_conv,
            "linear": self._build_linear,
            "pool": self._build_pool,
            "dropout": self._build_dropout,
            "batch_norm": self._build_batch_norm
        }

    def build_model(self, spec: ArchitectureSpec) -> nn.Module:
        """Build PyTorch model from specification"""
        modules = []
        # Running feature size fed into the next layer.
        width = spec.input_size

        for layer_spec in spec.layers:
            kind = layer_spec["type"]
            params = layer_spec["params"]

            builder = self.operation_map.get(kind)
            if builder is None:
                continue  # unknown layer types are silently skipped
            modules.append(builder(width, params))

            # Only linear and conv layers change the running feature size.
            if kind == "linear":
                width = params.get("out_features", width)
            elif kind == "conv":
                width = params.get("out_channels", 32)

        # Final projection onto the requested output size.
        if modules:
            modules.append(nn.Linear(width, spec.output_size))
        else:
            modules.append(nn.Linear(spec.input_size, spec.output_size))

        return nn.Sequential(*modules)

    def _build_conv(self, input_size: int, params: Dict) -> nn.Module:
        """Build a conv layer; ``input_size`` is used as in_channels."""
        channels = params.get("out_channels", 32)
        kernel = params.get("kernel_size", 3)
        # Same-size padding for odd kernels.
        return nn.Conv2d(input_size, channels, kernel, padding=kernel // 2)

    def _build_linear(self, input_size: int, params: Dict) -> nn.Module:
        """Build a fully-connected layer."""
        return nn.Linear(input_size, params.get("out_features", 128))

    def _build_pool(self, input_size: int, params: Dict) -> nn.Module:
        """Build a pooling layer (always global average pooling)."""
        return nn.AdaptiveAvgPool2d((1, 1))

    def _build_dropout(self, input_size: int, params: Dict) -> nn.Module:
        """Build a dropout layer (default p=0.5)."""
        return nn.Dropout(params.get("p", 0.5))

    def _build_batch_norm(self, input_size: int, params: Dict) -> nn.Module:
        """Build batch normalization over ``input_size`` channels."""
        return nn.BatchNorm2d(input_size)
|
|
|
|
|
|
|
|
class BayesianOptimizer:
    """
    Bayesian optimization for hyperparameter tuning.

    NOTE(review): this redefinition shadows the skopt-backed
    ``BayesianOptimizer`` declared earlier in this module, and its
    constructor signature differs -- confirm which one callers expect.
    """

    def __init__(self, param_space: Dict[str, Tuple[float, float]]):
        # {name: (low, high)} bounds for every tunable parameter.
        self.param_space = param_space
        self.observations = []
        self.gp_model = None  # placeholder: no surrogate model is fitted yet

    def suggest(self) -> Dict[str, float]:
        """Suggest next hyperparameters to try"""
        def random_draw() -> Dict[str, float]:
            # Uniform sample inside each parameter's (low, high) interval.
            return {name: random.uniform(low, high)
                    for name, (low, high) in self.param_space.items()}

        if len(self.observations) < 5:
            # Too few observations for a surrogate model: pure exploration.
            return random_draw()

        # Placeholder: a GP-guided acquisition step would go here; the
        # "exploitation" branch currently also falls back to random sampling.
        return random_draw()

    def observe(self, params: Dict[str, float], score: float):
        """Observe result of parameter evaluation"""
        self.observations.append((params, score))
|
|
|
|
|
|
|
|
class EvolutionarySearch:
    """
    Evolutionary algorithm for architecture search.

    Maintains a population of ArchitectureSpec variants; each generation
    keeps the fittest half and refills the population with mutated copies.
    """

    def __init__(self, population_size: int = 20, mutation_rate: float = 0.1):
        self.population_size = population_size
        # Probability that _mutate_architecture actually changes a spec.
        self.mutation_rate = mutation_rate
        self.population = []
        self.fitness_scores = {}

    def initialize_population(self, base_spec: ArchitectureSpec):
        """Initialize population with variations of base architecture"""
        self.population = [self._mutate_architecture(base_spec)
                           for _ in range(self.population_size)]

    def evolve(self, generations: int = 10, fitness_fn: Callable[[ArchitectureSpec], float] = None):
        """Run evolutionary search and return the fittest final spec.

        Args:
            generations: Number of selection/mutation rounds.
            fitness_fn: Scores a spec (higher is better). When omitted,
                random scores are used and the search degenerates to a
                random walk.
        """
        def score(spec):
            return fitness_fn(spec) if fitness_fn else random.random()

        for generation in range(generations):
            # Evaluate the current population.
            fitness_scores = {id(spec): score(spec) for spec in self.population}

            # Keep the better half (elitism)...
            ranked = sorted(self.population,
                            key=lambda x: fitness_scores[id(x)],
                            reverse=True)
            elite = ranked[:self.population_size // 2]

            # ...and refill with mutated children of the elite.
            offspring = [self._mutate_architecture(parent) for parent in elite]
            self.population = elite + offspring

        # Bug fix: score the *final* population before picking a winner.
        # Previously the last generation's score table was reused, so the
        # fresh offspring (absent from it) silently defaulted to fitness 0,
        # and with generations=0 the table did not exist at all.
        final_scores = {id(spec): score(spec) for spec in self.population}
        return max(self.population, key=lambda x: final_scores[id(x)])

    def _mutate_architecture(self, spec: ArchitectureSpec) -> ArchitectureSpec:
        """Mutate an architecture specification (returns a new spec).

        Bug fix: the copy now forwards ``hyperparameters`` -- the dataclass
        declared it as a required field, so omitting it raised TypeError on
        every call.
        """
        new_spec = ArchitectureSpec(
            layers=spec.layers.copy(),
            connections=spec.connections.copy(),
            input_size=spec.input_size,
            output_size=spec.output_size,
            hyperparameters=dict(spec.hyperparameters)
        )

        if random.random() < self.mutation_rate:
            if random.random() < 0.5 and len(new_spec.layers) < 10:
                # Growth mutation: append a small linear layer.
                new_layer = {"type": "linear", "params": {"out_features": 64}}
                new_spec.layers.append(new_layer)
            elif len(new_spec.layers) > 1:
                # Shrink mutation: drop a randomly chosen layer.
                idx = random.randint(0, len(new_spec.layers) - 1)
                del new_spec.layers[idx]

        return new_spec
|
|
|
|
|
|
|
|
class DARTSSearch(nn.Module):
    """
    Differentiable Architecture Search (DARTS) implementation.

    Bug fix: now subclasses ``nn.Module`` so the alpha tensors are
    registered parameters and ``.parameters()`` works -- the training loop
    in ArchitectureSearchSystem.search_darts passes
    ``self.darts.parameters()`` to the optimizer, which previously raised
    AttributeError because this class exposed no such method.
    """

    def __init__(self, num_operations: int = 4, num_nodes: int = 4):
        super().__init__()
        self.num_operations = num_operations
        self.num_nodes = num_nodes

        # Architecture mixing weights: one row per node, one logit per
        # candidate operation; separate tables for normal/reduce cells.
        self.alpha_normal = nn.Parameter(torch.randn(num_nodes, num_operations))
        self.alpha_reduce = nn.Parameter(torch.randn(num_nodes, num_operations))

    def get_operations(self, node_idx: int, normal: bool = True) -> List[Tuple[float, nn.Module]]:
        """Get weighted candidate operations for a node.

        Returns (weight, module-factory) pairs; the factories are lazy so
        no modules are instantiated unless actually used.
        """
        alphas = self.alpha_normal if normal else self.alpha_reduce

        operations = [
            (alphas[node_idx, 0], lambda: nn.Identity()),
            (alphas[node_idx, 1], lambda: nn.Conv2d(32, 32, 3, padding=1)),
            (alphas[node_idx, 2], lambda: nn.Conv2d(32, 32, 5, padding=2)),
            (alphas[node_idx, 3], lambda: nn.AvgPool2d(3, stride=1, padding=1))
        ]

        return operations

    def derive_final_architecture(self) -> "ArchitectureSpec":
        """Derive the final discrete architecture from learned parameters.

        For each node, keep only the operation with the highest
        (softmaxed) normal-cell weight and chain the nodes sequentially.
        """
        layers = []

        for node in range(self.num_nodes):
            weights = torch.softmax(self.alpha_normal[node], dim=0)
            best_op = torch.argmax(weights).item()

            layers.append({
                "type": self._operation_idx_to_type(best_op),
                "params": self._get_operation_params(best_op)
            })

        # Bug fix: explicitly pass hyperparameters -- the dataclass declared
        # it as a required field, so omitting it raised TypeError here.
        return ArchitectureSpec(
            layers=layers,
            connections=[(i, i + 1) for i in range(len(layers) - 1)],
            input_size=784,
            output_size=10,
            hyperparameters={}
        )

    def _operation_idx_to_type(self, idx: int) -> str:
        """Convert operation index to type"""
        types = ["identity", "conv3", "conv5", "pool"]
        return types[idx]

    def _get_operation_params(self, idx: int) -> Dict[str, Any]:
        """Get parameters for operation"""
        if idx == 1:
            return {"out_channels": 32, "kernel_size": 3, "padding": 1}
        elif idx == 2:
            return {"out_channels": 32, "kernel_size": 5, "padding": 2}
        elif idx == 3:
            return {"kernel_size": 3, "stride": 1, "padding": 1}
        else:
            return {}
|
|
|
|
|
|
|
|
class ArchitectureSearchSystem:
    """
    Complete architecture search system with multiple methods.

    NOTE(review): this class redefines (and therefore shadows) the
    ``ArchitectureSearchSystem`` declared earlier in this module; only this
    later definition is visible at runtime. Confirm the duplication is
    intentional.
    """

    def __init__(self):
        # NOTE(review): NASController (defined above) exposes no
        # sample_architecture() method, yet search_controller_based() calls
        # it below -- this appears to expect a different controller class
        # (see the orphaned module-level sample_architecture(self)).
        self.controller = NASController()
        self.builder = ArchitectureBuilder()
        self.bayesian_opt = BayesianOptimizer({
            "learning_rate": (1e-5, 1e-1),
            "batch_size": (16, 256),
            "hidden_size": (64, 512)
        })
        self.evolutionary = EvolutionarySearch()
        self.darts = DARTSSearch()

    def search_controller_based(self, num_samples: int = 10,
                                evaluation_fn: Callable[[nn.Module], float] = None) -> ArchitectureSpec:
        """
        Search using controller-based NAS.
        """
        best_spec = None
        best_score = float('-inf')

        for _ in range(num_samples):
            # Sample a candidate architecture from the controller policy.
            spec = self.controller.sample_architecture()

            # Build and score it; fall back to a random score when no
            # evaluation function is supplied.
            model = self.builder.build_model(spec)
            if evaluation_fn:
                score = evaluation_fn(model)
            else:
                score = random.random()

            if score > best_score:
                best_spec = spec
                best_score = score

        return best_spec

    def search_evolutionary(self, base_spec: ArchitectureSpec, generations: int = 5,
                            evaluation_fn: Callable[[ArchitectureSpec], float] = None) -> ArchitectureSpec:
        """
        Search using evolutionary algorithms.
        """
        self.evolutionary.initialize_population(base_spec)
        return self.evolutionary.evolve(generations, evaluation_fn)

    def search_darts(self, train_loader, val_loader, num_epochs: int = 10) -> ArchitectureSpec:
        """
        Search using DARTS.

        NOTE(review): the loop below optimizes a constant dummy loss, so the
        architecture parameters are never actually trained; ``val_loader``
        is unused.
        """
        optimizer = optim.Adam([
            {'params': self.darts.parameters(), 'lr': 1e-3}
        ])

        for epoch in range(num_epochs):
            for batch_x, batch_y in train_loader:
                optimizer.zero_grad()

                # Placeholder loss; its gradient does not depend on the batch.
                loss = torch.tensor(1.0, requires_grad=True)
                loss.backward()
                optimizer.step()

        return self.darts.derive_final_architecture()

    def optimize_hyperparameters(self, model_class: Callable, train_fn: Callable,
                                 num_trials: int = 20) -> Dict[str, float]:
        """
        Optimize hyperparameters using Bayesian optimization.
        """
        best_params = None
        best_score = float('-inf')

        for trial in range(num_trials):
            # Ask the optimizer for the next candidate configuration.
            params = self.bayesian_opt.suggest()

            # Train a fresh model with it and record the resulting score.
            model = model_class(**params)
            score = train_fn(model, params)

            self.bayesian_opt.observe(params, score)

            if score > best_score:
                best_params = params
                best_score = score

        return best_params

    def comprehensive_search(self, base_spec: ArchitectureSpec = None,
                             evaluation_fn: Callable = None) -> Dict[str, Any]:
        """
        Comprehensive architecture search using multiple methods.
        """
        results = {}

        # Default seed architecture when the caller provides none.
        # NOTE(review): ArchitectureSpec declares a required
        # 'hyperparameters' field, which this call omits -- confirm.
        if base_spec is None:
            base_spec = ArchitectureSpec(
                layers=[{"type": "linear", "params": {"out_features": 128}}],
                connections=[],
                input_size=784,
                output_size=10
            )

        results["controller"] = self.search_controller_based(5, evaluation_fn)

        results["evolutionary"] = self.search_evolutionary(base_spec, 3, evaluation_fn)

        results["darts"] = self.darts.derive_final_architecture()

        # Pick the overall winner across the three methods.
        best_score = float('-inf')
        best_spec = None

        for method, spec in results.items():
            if evaluation_fn:
                score = evaluation_fn(self.builder.build_model(spec))
            else:
                score = random.random()

            if score > best_score:
                best_score = score
                best_spec = spec

        results["best"] = best_spec
        results["best_score"] = best_score

        return results
|
|
|