# ech0-prime-agi/learning/architecture_search.py
"""
Neural Architecture Search (NAS) and hyperparameter optimization system.
"""
import json
import random
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from skopt import gp_minimize
from skopt.space import Categorical, Integer, Real
from skopt.utils import use_named_args
@dataclass
class ArchitectureSpec:
"""Specification for a neural architecture"""
layers: List[Dict[str, Any]]
connections: List[Tuple[int, int]] # (from_layer, to_layer)
input_size: int
output_size: int
    hyperparameters: Dict[str, Any] = field(default_factory=dict)  # Learned hyperparameters (optional)
@dataclass
class ArchitecturePerformance:
"""Performance metrics for an architecture"""
architecture: ArchitectureSpec
accuracy: float
loss: float
training_time: float
memory_usage: float
parameter_count: int
timestamp: float
class BayesianOptimizer:
"""
Bayesian Optimization for Neural Architecture Search.
Uses Gaussian Processes and acquisition functions.
"""
def __init__(self, search_space: Dict[str, Any], objective_function: Callable):
self.search_space = search_space
self.objective = objective_function
self.observations = [] # List of (params, score) tuples
self.gp = None
self.best_params = None
self.best_score = float('inf')
# Define skopt search space
self.skopt_space = self._create_skopt_space()
def _create_skopt_space(self):
"""Create skopt search space from parameter definitions"""
space = []
for param_name, param_config in self.search_space.items():
param_type = param_config['type']
param_range = param_config['range']
if param_type == 'int':
space.append(Integer(param_range[0], param_range[1], name=param_name))
elif param_type == 'float':
space.append(Real(param_range[0], param_range[1], name=param_name))
elif param_type == 'categorical':
space.append(Categorical(param_range, name=param_name))
return space
def _objective_wrapper(self, **params):
"""Wrapper for skopt objective function"""
return self.objective(params)
def optimize(self, n_calls: int = 50, n_initial_points: int = 10) -> Dict[str, Any]:
"""
Run Bayesian optimization.
Args:
n_calls: Total number of function evaluations
n_initial_points: Number of random initial points
Returns:
Best parameters found
"""
print(f"Starting Bayesian optimization with {n_calls} evaluations...")
# Create decorated objective function
decorated_objective = use_named_args(self.skopt_space)(self._objective_wrapper)
result = gp_minimize(
func=decorated_objective,
dimensions=self.skopt_space,
n_calls=n_calls,
n_initial_points=n_initial_points,
random_state=42,
verbose=True
)
best_params = dict(zip([dim.name for dim in self.skopt_space], result.x))
best_score = result.fun
print(f"Bayesian optimization completed. Best score: {best_score:.4f}")
return {
'best_params': best_params,
'best_score': best_score,
'all_scores': result.func_vals,
            'all_params': [dict(zip([dim.name for dim in self.skopt_space], x))
                           for x in result.x_iters]
        }
class ArchitectureGenerator:
"""
Generates neural architectures from optimized parameters.
"""
def __init__(self):
self.operation_types = {
0: 'conv',
1: 'linear',
2: 'pool',
3: 'dropout',
4: 'batch_norm',
5: 'attention',
6: 'residual'
}
self.activation_types = ['relu', 'tanh', 'sigmoid', 'gelu']
def generate_architecture(self, params: Dict[str, Any]) -> ArchitectureSpec:
"""
Generate architecture from optimized parameters.
"""
num_layers = params['num_layers']
input_size = params.get('input_size', 784)
output_size = params.get('output_size', 10)
layers = []
connections = []
# Generate layers
for i in range(num_layers):
layer_type = self.operation_types[params[f'layer_{i}_type']]
layer_config = {
'type': layer_type,
'index': i
}
# Add type-specific parameters
if layer_type == 'conv':
layer_config.update({
'out_channels': params[f'layer_{i}_out_channels'],
'kernel_size': params[f'layer_{i}_kernel_size'],
'stride': params[f'layer_{i}_stride']
})
elif layer_type == 'linear':
layer_config.update({
'out_features': params[f'layer_{i}_out_features']
})
elif layer_type == 'attention':
layer_config.update({
'num_heads': params[f'layer_{i}_num_heads'],
'head_dim': params[f'layer_{i}_head_dim']
})
# Add activation
layer_config['activation'] = self.activation_types[params[f'layer_{i}_activation']]
layers.append(layer_config)
# Generate connections (simplified: chain topology)
for i in range(num_layers - 1):
connections.append((i, i + 1))
return ArchitectureSpec(
layers=layers,
connections=connections,
input_size=input_size,
output_size=output_size,
hyperparameters=params
)
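
# Sketch of the flat parameter encoding the generator expects (hypothetical
# values; every layer slot carries a full key set even if its type ignores some).
def _demo_generate_architecture() -> ArchitectureSpec:
    params = {'num_layers': 2, 'input_size': 784, 'output_size': 10}
    for i in range(2):
        params.update({
            f'layer_{i}_type': 1,                      # 1 -> 'linear'
            f'layer_{i}_out_channels': 32, f'layer_{i}_kernel_size': 3,
            f'layer_{i}_stride': 1, f'layer_{i}_out_features': 128,
            f'layer_{i}_num_heads': 4, f'layer_{i}_head_dim': 32,
            f'layer_{i}_activation': 0,                # 0 -> 'relu'
        })
    return ArchitectureGenerator().generate_architecture(params)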
class MultiObjectiveEvaluator:
"""
Evaluates architectures on multiple objectives.
"""
def __init__(self):
self.objectives = ['accuracy', 'efficiency', 'complexity']
self.weights = {'accuracy': 0.5, 'efficiency': 0.3, 'complexity': 0.2}
def evaluate(self, architecture: ArchitectureSpec, device: str = 'cpu') -> ArchitecturePerformance:
"""
Evaluate architecture on multiple objectives.
"""
# Create actual PyTorch model from architecture
model = self._build_model(architecture)
# Simulate training and evaluation (in real implementation, this would train)
accuracy = self._simulate_accuracy(model, architecture)
loss = self._simulate_loss(model, architecture)
training_time = self._estimate_training_time(architecture)
memory_usage = self._estimate_memory_usage(model)
parameter_count = sum(p.numel() for p in model.parameters())
return ArchitecturePerformance(
architecture=architecture,
accuracy=accuracy,
loss=loss,
training_time=training_time,
memory_usage=memory_usage,
parameter_count=parameter_count,
timestamp=time.time()
)
def _build_model(self, architecture: ArchitectureSpec) -> nn.Module:
"""Build PyTorch model from architecture specification"""
layers = []
for layer_spec in architecture.layers:
layer_type = layer_spec['type']
if layer_type == 'conv':
conv = nn.Conv2d(
in_channels=layer_spec.get('in_channels', 1),
out_channels=layer_spec['out_channels'],
kernel_size=layer_spec['kernel_size'],
stride=layer_spec['stride']
)
layers.append(conv)
elif layer_type == 'linear':
linear = nn.Linear(
in_features=layer_spec.get('in_features', 784),
out_features=layer_spec['out_features']
)
layers.append(linear)
elif layer_type == 'pool':
layers.append(nn.MaxPool2d(2, 2))
elif layer_type == 'dropout':
layers.append(nn.Dropout(0.5))
elif layer_type == 'batch_norm':
layers.append(nn.BatchNorm2d(layer_spec.get('num_features', 32)))
# Add activation
activation = layer_spec.get('activation', 'relu')
if activation == 'relu':
layers.append(nn.ReLU())
elif activation == 'tanh':
layers.append(nn.Tanh())
elif activation == 'sigmoid':
layers.append(nn.Sigmoid())
elif activation == 'gelu':
layers.append(nn.GELU())
# Add final classification layer
layers.append(nn.Linear(architecture.layers[-1].get('out_features', 128),
architecture.output_size))
return nn.Sequential(*layers)
def _simulate_accuracy(self, model: nn.Module, architecture: ArchitectureSpec) -> float:
"""Simulate model accuracy (placeholder for actual training)"""
# Simple heuristic based on architecture complexity
complexity_score = len(architecture.layers) / 10.0
parameter_penalty = sum(p.numel() for p in model.parameters()) / 1000000.0
# Simulate accuracy with some noise
base_accuracy = 0.8 - complexity_score * 0.1 - parameter_penalty * 0.05
noise = np.random.normal(0, 0.02)
accuracy = np.clip(base_accuracy + noise, 0.1, 0.95)
return float(accuracy)
    def _simulate_loss(self, model: nn.Module, architecture: ArchitectureSpec) -> float:
        """Simulate training loss (placeholder, inversely tied to the accuracy heuristic)"""
        simulated_accuracy = self._simulate_accuracy(model, architecture)
        return float(max(1.0 - simulated_accuracy + np.random.normal(0, 0.1), 0.01))
def _estimate_training_time(self, architecture: ArchitectureSpec) -> float:
"""Estimate training time based on architecture"""
base_time = 10.0 # Base training time in seconds
complexity_factor = len(architecture.layers) * 2.0
parameter_factor = sum(layer.get('out_features', layer.get('out_channels', 32))
for layer in architecture.layers) / 1000.0
return base_time + complexity_factor + parameter_factor
def _estimate_memory_usage(self, model: nn.Module) -> float:
"""Estimate memory usage"""
param_memory = sum(p.numel() * p.element_size() for p in model.parameters())
buffer_memory = sum(b.numel() * b.element_size() for b in model.buffers())
# Convert to MB
return (param_memory + buffer_memory) / (1024 * 1024)
def compute_pareto_front(self, performances: List[ArchitecturePerformance]) -> List[ArchitecturePerformance]:
"""
Compute Pareto front for multi-objective optimization.
"""
        def dominates(p1: ArchitecturePerformance, p2: ArchitecturePerformance) -> bool:
            """Check if p1 dominates p2: no worse in every objective, strictly better in at least one"""
            no_worse = (p1.accuracy >= p2.accuracy and
                        p1.training_time <= p2.training_time and
                        p1.memory_usage <= p2.memory_usage)
            strictly_better = (p1.accuracy > p2.accuracy or
                               p1.training_time < p2.training_time or
                               p1.memory_usage < p2.memory_usage)
            return no_worse and strictly_better
pareto_front = []
for perf in performances:
is_dominated = False
for other in performances:
if other != perf and dominates(other, perf):
is_dominated = True
break
if not is_dominated:
pareto_front.append(perf)
return pareto_front
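
# Small illustrative check (hypothetical numbers): b is dominated by a, so the
# returned front should contain a and c only.
def _demo_pareto_front() -> List[ArchitecturePerformance]:
    spec = ArchitectureSpec(layers=[], connections=[], input_size=784, output_size=10)

    def mk(acc: float, t: float, mem: float) -> ArchitecturePerformance:
        return ArchitecturePerformance(
            architecture=spec, accuracy=acc, loss=1.0 - acc, training_time=t,
            memory_usage=mem, parameter_count=0, timestamp=time.time())

    a, b, c = mk(0.90, 20.0, 10.0), mk(0.85, 25.0, 12.0), mk(0.80, 10.0, 5.0)
    return MultiObjectiveEvaluator().compute_pareto_front([a, b, c])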
class NASController:
"""
Neural Architecture Search controller using Bayesian optimization.
"""
def __init__(self, max_layers: int = 10, search_budget: int = 100):
self.max_layers = max_layers
self.search_budget = search_budget
# Initialize components
self.generator = ArchitectureGenerator()
self.evaluator = MultiObjectiveEvaluator()
# Search space definition
self.search_space = self._define_search_space()
# Results storage
self.evaluated_architectures = []
self.pareto_front = []
def _define_search_space(self) -> Dict[str, Any]:
"""Define the search space for architecture optimization"""
        space = {
            'num_layers': {'type': 'int', 'range': [3, self.max_layers]},
            # Fixed for MNIST; encoded as single-value categoricals because
            # skopt's Integer/Real dimensions require low < high
            'input_size': {'type': 'categorical', 'range': [784]},
            'output_size': {'type': 'categorical', 'range': [10]}
        }
# Add parameters for each possible layer
for i in range(self.max_layers):
space.update({
f'layer_{i}_type': {'type': 'int', 'range': [0, 6]}, # Operation types
f'layer_{i}_out_channels': {'type': 'int', 'range': [16, 256]},
f'layer_{i}_kernel_size': {'type': 'int', 'range': [1, 7]},
f'layer_{i}_stride': {'type': 'int', 'range': [1, 3]},
f'layer_{i}_out_features': {'type': 'int', 'range': [32, 1024]},
f'layer_{i}_num_heads': {'type': 'int', 'range': [1, 16]},
f'layer_{i}_head_dim': {'type': 'int', 'range': [16, 128]},
f'layer_{i}_activation': {'type': 'int', 'range': [0, 3]} # Activation types
})
return space
def objective_function(self, params: Dict[str, Any]) -> float:
"""
Objective function for Bayesian optimization.
Combines multiple objectives into a single score.
"""
# Generate architecture
architecture = self.generator.generate_architecture(params)
# Evaluate architecture
performance = self.evaluator.evaluate(architecture)
# Store evaluation
self.evaluated_architectures.append(performance)
# Compute weighted objective (higher accuracy, lower time/memory)
weights = {'accuracy': 1.0, 'time': -0.1, 'memory': -0.1}
objective = (
weights['accuracy'] * performance.accuracy +
weights['time'] * (performance.training_time / 100.0) + # Normalize
weights['memory'] * (performance.memory_usage / 100.0) # Normalize
)
return -objective # Minimize (BO minimizes)
def search(self, num_candidates: int = 50) -> Dict[str, Any]:
"""
Perform neural architecture search using Bayesian optimization.
Args:
num_candidates: Number of architectures to evaluate
Returns:
Search results including best architecture and Pareto front
"""
print(f"Starting NAS with Bayesian optimization ({num_candidates} candidates)...")
# Create Bayesian optimizer
optimizer = BayesianOptimizer(self.search_space, self.objective_function)
# Run optimization
optimization_result = optimizer.optimize(n_calls=num_candidates)
# Update Pareto front
self.pareto_front = self.evaluator.compute_pareto_front(self.evaluated_architectures)
# Get best architecture
        best_performance = max(
            self.evaluated_architectures,
            key=lambda p: p.accuracy
            - 0.1 * (p.training_time / 100.0)
            - 0.1 * (p.memory_usage / 100.0)  # same normalization as objective_function
        )
results = {
'best_architecture': best_performance.architecture,
'best_performance': {
'accuracy': best_performance.accuracy,
'loss': best_performance.loss,
'training_time': best_performance.training_time,
'memory_usage': best_performance.memory_usage,
'parameter_count': best_performance.parameter_count
},
'pareto_front': [{
'accuracy': p.accuracy,
'training_time': p.training_time,
'memory_usage': p.memory_usage,
'architecture': p.architecture
} for p in self.pareto_front],
'total_evaluated': len(self.evaluated_architectures),
'optimization_result': optimization_result,
            'completed_at': time.time()
}
print(f"NAS completed. Best accuracy: {best_performance.accuracy:.4f}")
print(f"Pareto front size: {len(self.pareto_front)}")
return results
    def fine_tune_architecture(self, architecture: ArchitectureSpec,
                               training_data: DataLoader,
                               epochs: int = 10) -> nn.Module:
"""
Fine-tune a discovered architecture on real data.
Args:
architecture: Architecture to fine-tune
training_data: Training data loader
epochs: Number of fine-tuning epochs
Returns:
Fine-tuned model
"""
print("Fine-tuning discovered architecture...")
# Build model
model = self.evaluator._build_model(architecture)
# Simple training loop (in real implementation, use proper training)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
model.train()
for epoch in range(epochs):
total_loss = 0.0
correct = 0
total = 0
for inputs, labels in training_data:
optimizer.zero_grad()
# Forward pass
try:
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
total_loss += loss.item()
_, predicted = outputs.max(1)
total += labels.size(0)
correct += predicted.eq(labels).sum().item()
except Exception as e:
print(f"Training error: {e}")
continue
if total > 0:
accuracy = 100. * correct / total
avg_loss = total_loss / len(training_data)
print(f"Epoch {epoch+1}/{epochs}: Loss={avg_loss:.4f}, Accuracy={accuracy:.2f}%")
return model
def get_search_statistics(self) -> Dict[str, Any]:
"""Get statistics about the architecture search process"""
if not self.evaluated_architectures:
return {'status': 'no_search_performed'}
accuracies = [p.accuracy for p in self.evaluated_architectures]
training_times = [p.training_time for p in self.evaluated_architectures]
memory_usages = [p.memory_usage for p in self.evaluated_architectures]
return {
'total_architectures': len(self.evaluated_architectures),
'pareto_front_size': len(self.pareto_front),
'accuracy_stats': {
'mean': np.mean(accuracies),
'std': np.std(accuracies),
'min': np.min(accuracies),
'max': np.max(accuracies)
},
'training_time_stats': {
'mean': np.mean(training_times),
'std': np.std(training_times),
'total': np.sum(training_times)
},
'memory_stats': {
'mean': np.mean(memory_usages),
'std': np.std(memory_usages),
'max': np.max(memory_usages)
}
}
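
# Hedged usage sketch: fine-tune a one-layer spec on synthetic MNIST-shaped
# tensors (random data, purely illustrative).
def _demo_fine_tune() -> nn.Module:
    from torch.utils.data import TensorDataset
    spec = ArchitectureSpec(
        layers=[{'type': 'linear', 'out_features': 128, 'activation': 'relu', 'index': 0}],
        connections=[], input_size=784, output_size=10)
    x, y = torch.randn(64, 784), torch.randint(0, 10, (64,))
    loader = DataLoader(TensorDataset(x, y), batch_size=16)
    return NASController().fine_tune_architecture(spec, loader, epochs=1)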
class ArchitectureSearchSystem:
"""
Complete Neural Architecture Search system with Bayesian optimization.
"""
def __init__(self, max_layers: int = 8, search_budget: int = 100):
self.controller = NASController(max_layers=max_layers, search_budget=search_budget)
self.search_history = []
def comprehensive_search(self, num_candidates: int = 50) -> Dict[str, Any]:
"""
Perform comprehensive architecture search.
Args:
num_candidates: Number of architectures to evaluate
Returns:
Complete search results
"""
print("🚀 Starting comprehensive neural architecture search...")
start_time = time.time()
# Run search
results = self.controller.search(num_candidates)
# Add metadata
results.update({
'search_duration': time.time() - start_time,
'system_info': {
'max_layers': self.controller.max_layers,
'search_space_size': len(self.controller.search_space),
'num_candidates': num_candidates
},
'search_timestamp': time.time()
})
# Store in history
self.search_history.append(results)
print(f"✅ NAS completed in {results['search_duration']:.2f} seconds")
print(f"📊 Evaluated {results['total_evaluated']} architectures")
print(f"🏆 Best accuracy: {results['best_performance']['accuracy']:.4f}")
return results
def get_search_history(self) -> List[Dict]:
"""Get history of all architecture searches"""
return self.search_history
    def export_results(self, results: Dict, filename: Optional[str] = None) -> str:
"""Export search results to JSON file"""
if filename is None:
timestamp = int(time.time())
filename = f"nas_results_{timestamp}.json"
        # Convert numpy/torch objects to JSON-safe types
        def serialize_obj(obj):
            if isinstance(obj, np.ndarray):
                return obj.tolist()
            elif isinstance(obj, (np.integer, np.floating)):
                return obj.item()
            elif isinstance(obj, torch.Tensor):
                return obj.tolist()
            elif hasattr(obj, '__dict__'):
                return str(obj)  # Simplified serialization for specs/objects
            else:
                return str(obj)  # Fallback: json.dumps must receive something serializable
serializable_results = json.loads(json.dumps(results, default=serialize_obj))
with open(filename, 'w') as f:
json.dump(serializable_results, f, indent=2)
print(f"📄 Results exported to {filename}")
return filename
# Legacy compatibility functions
def sample_architecture():
"""Legacy function for backward compatibility"""
system = ArchitectureSearchSystem()
results = system.comprehensive_search(num_candidates=10)
return results['best_architecture']
def comprehensive_search():
"""Legacy function for backward compatibility"""
system = ArchitectureSearchSystem()
return system.comprehensive_search()
class RNNController(nn.Module):
    """
    LSTM-based NAS controller that samples architectures layer by layer.
    (The class wrapper and state sizes below are reconstructed from how the
    methods use them; treat the 32-dim state as an assumption.)
    """
    def __init__(self, max_layers: int = 10):
        super().__init__()
        self.max_layers = max_layers
        self.hidden = None
        # Sizes inferred from the hard-coded 32-dim states used below
        self.controller = nn.LSTMCell(32, 32)
        self.layer_decision = nn.Linear(32, 2)       # stop / continue
        self.operation_decision = nn.Linear(32, 5)   # matches the 5 operation types below

    def reset(self):
        """Reset controller state"""
        self.hidden = None

    def sample_architecture(self) -> ArchitectureSpec:
        """Sample an architecture from the controller"""
        self.reset()
        layers = []
        connections = []
        layer_count = 0
        while layer_count < self.max_layers:
            # Decide whether to add a layer
            if self.hidden is None:
                hidden = torch.zeros(1, 32)
                cell = torch.zeros(1, 32)
            else:
                hidden, cell = self.hidden
            logits = self.layer_decision(hidden)
            probs = torch.softmax(logits, dim=-1)
            decision = torch.multinomial(probs, 1).item()
            if decision == 0:  # Stop
                break
            # Choose operation for this layer
            op_logits = self.operation_decision(hidden)
            op_probs = torch.softmax(op_logits, dim=-1)
            operation = torch.multinomial(op_probs, 1).item()
            # Add layer
            layer_spec = {
                "type": self._operation_to_type(operation),
                "params": self._get_operation_params(operation)
            }
            layers.append(layer_spec)
            # Decide connections (simplified: connect to previous layer)
            if layer_count > 0:
                connections.append((layer_count - 1, layer_count))
            # Update controller state
            self.hidden = self.controller(torch.zeros(1, 32), (hidden, cell))
            layer_count += 1
        return ArchitectureSpec(
            layers=layers,
            connections=connections,
            input_size=784,  # MNIST-like
            output_size=10
        )

    def _operation_to_type(self, op_idx: int) -> str:
        """Convert operation index to layer type"""
        operations = ["conv", "linear", "pool", "dropout", "batch_norm"]
        return operations[op_idx % len(operations)]

    def _get_operation_params(self, op_idx: int) -> Dict[str, Any]:
        """Get parameters for operation"""
        if op_idx == 0:  # conv
            return {"out_channels": 32, "kernel_size": 3}
        elif op_idx == 1:  # linear
            return {"out_features": 128}
        else:
            return {}
class ArchitectureBuilder:
"""
Builds PyTorch models from architecture specifications.
"""
def __init__(self):
self.operation_map = {
"conv": self._build_conv,
"linear": self._build_linear,
"pool": self._build_pool,
"dropout": self._build_dropout,
"batch_norm": self._build_batch_norm
}
def build_model(self, spec: ArchitectureSpec) -> nn.Module:
"""Build PyTorch model from specification"""
layers = []
# Input layer
current_size = spec.input_size
for i, layer_spec in enumerate(spec.layers):
layer_type = layer_spec["type"]
params = layer_spec["params"]
if layer_type in self.operation_map:
layer = self.operation_map[layer_type](current_size, params)
layers.append(layer)
# Update current size (simplified)
if layer_type == "linear":
current_size = params.get("out_features", current_size)
elif layer_type == "conv":
# Simplified size calculation
current_size = params.get("out_channels", 32)
# Output layer
if layers:
layers.append(nn.Linear(current_size, spec.output_size))
else:
layers.append(nn.Linear(spec.input_size, spec.output_size))
return nn.Sequential(*layers)
def _build_conv(self, input_size: int, params: Dict) -> nn.Module:
"""Build convolutional layer"""
out_channels = params.get("out_channels", 32)
kernel_size = params.get("kernel_size", 3)
return nn.Conv2d(input_size, out_channels, kernel_size, padding=kernel_size//2)
def _build_linear(self, input_size: int, params: Dict) -> nn.Module:
"""Build linear layer"""
out_features = params.get("out_features", 128)
return nn.Linear(input_size, out_features)
def _build_pool(self, input_size: int, params: Dict) -> nn.Module:
"""Build pooling layer"""
return nn.AdaptiveAvgPool2d((1, 1))
def _build_dropout(self, input_size: int, params: Dict) -> nn.Module:
"""Build dropout layer"""
p = params.get("p", 0.5)
return nn.Dropout(p)
def _build_batch_norm(self, input_size: int, params: Dict) -> nn.Module:
"""Build batch normalization"""
return nn.BatchNorm2d(input_size)
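
# Hedged sketch: build a runnable MLP from a one-layer spec (conv/pool specs
# would need image-shaped inputs that this flat builder does not validate).
def _demo_builder() -> nn.Module:
    spec = ArchitectureSpec(
        layers=[{"type": "linear", "params": {"out_features": 128}}],
        connections=[], input_size=784, output_size=10)
    model = ArchitectureBuilder().build_model(spec)
    _ = model(torch.randn(4, 784))  # sanity forward pass: (4, 784) -> (4, 10)
    return model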
class RandomSearchHyperOptimizer:
    """
    Random-search hyperparameter tuner with an observation log; a
    Gaussian-process surrogate could later replace the random suggest step.
    """
    def __init__(self, param_space: Dict[str, Tuple[float, float]]):
        self.param_space = param_space
        self.observations = []
        self.gp_model = None  # Would hold a Gaussian-process surrogate
    def suggest(self) -> Dict[str, float]:
        """Suggest next hyperparameters to try (uniform random sampling; a GP
        acquisition step would take over once enough observations exist)."""
        return {name: random.uniform(low, high)
                for name, (low, high) in self.param_space.items()}
def observe(self, params: Dict[str, float], score: float):
"""Observe result of parameter evaluation"""
self.observations.append((params, score))
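
# Hedged sketch of the suggest/observe loop (dummy objective; illustrative only).
def _demo_random_search() -> Dict[str, float]:
    tuner = RandomSearchHyperOptimizer({"learning_rate": (1e-5, 1e-1)})
    best, best_score = None, float('-inf')
    for _ in range(10):
        params = tuner.suggest()
        score = -abs(params["learning_rate"] - 1e-3)  # dummy: prefer lr near 1e-3
        tuner.observe(params, score)
        if score > best_score:
            best, best_score = params, score
    return best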
class EvolutionarySearch:
"""
Evolutionary algorithm for architecture search.
"""
def __init__(self, population_size: int = 20, mutation_rate: float = 0.1):
self.population_size = population_size
self.mutation_rate = mutation_rate
self.population = []
self.fitness_scores = {}
def initialize_population(self, base_spec: ArchitectureSpec):
"""Initialize population with variations of base architecture"""
self.population = []
for _ in range(self.population_size):
# Create variation of base spec
variation = self._mutate_architecture(base_spec)
self.population.append(variation)
    def evolve(self, generations: int = 10, fitness_fn: Callable[[ArchitectureSpec], float] = None):
        """Run evolutionary search"""
        def evaluate(population):
            return {id(spec): (fitness_fn(spec) if fitness_fn else random.random())
                    for spec in population}

        for generation in range(generations):
            # Evaluate fitness
            fitness_scores = evaluate(self.population)
            # Select best
            sorted_pop = sorted(self.population,
                                key=lambda x: fitness_scores[id(x)],
                                reverse=True)
            # Keep top half
            elite = sorted_pop[:self.population_size // 2]
            # Create offspring through mutation
            offspring = [self._mutate_architecture(parent) for parent in elite]
            # New population
            self.population = elite + offspring
        # Offspring from the final generation were never scored, so re-evaluate
        # before returning the best individual
        fitness_scores = evaluate(self.population)
        return max(self.population, key=lambda x: fitness_scores[id(x)])
def _mutate_architecture(self, spec: ArchitectureSpec) -> ArchitectureSpec:
"""Mutate an architecture specification"""
        new_spec = ArchitectureSpec(
            layers=[dict(layer) for layer in spec.layers],
            connections=list(spec.connections),
            input_size=spec.input_size,
            output_size=spec.output_size,
            hyperparameters=dict(spec.hyperparameters)
        )
# Random mutations
if random.random() < self.mutation_rate:
# Add/remove layer
if random.random() < 0.5 and len(new_spec.layers) < 10:
# Add layer
new_layer = {"type": "linear", "params": {"out_features": 64}}
new_spec.layers.append(new_layer)
elif len(new_spec.layers) > 1:
# Remove layer
idx = random.randint(0, len(new_spec.layers) - 1)
del new_spec.layers[idx]
return new_spec
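
# Hedged usage sketch: evolve a tiny population with a dummy fitness that
# prefers fewer layers (illustrative objective only).
def _demo_evolutionary() -> ArchitectureSpec:
    base = ArchitectureSpec(
        layers=[{"type": "linear", "params": {"out_features": 128}}],
        connections=[], input_size=784, output_size=10)
    search = EvolutionarySearch(population_size=8, mutation_rate=0.5)
    search.initialize_population(base)
    return search.evolve(generations=3, fitness_fn=lambda s: 1.0 / (1 + len(s.layers)))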
class DARTSSearch(nn.Module):
    """
    Differentiable Architecture Search (DARTS) implementation.
    Subclasses nn.Module so the architecture parameters (alphas) are
    registered and reachable through .parameters().
    """
    def __init__(self, num_operations: int = 4, num_nodes: int = 4):
        super().__init__()
        self.num_operations = num_operations
        self.num_nodes = num_nodes
        # Architecture parameters (learnable)
        self.alpha_normal = nn.Parameter(torch.randn(num_nodes, num_operations))
        self.alpha_reduce = nn.Parameter(torch.randn(num_nodes, num_operations))
def get_operations(self, node_idx: int, normal: bool = True) -> List[Tuple[float, nn.Module]]:
"""Get weighted operations for a node"""
alphas = self.alpha_normal if normal else self.alpha_reduce
operations = [
(alphas[node_idx, 0], lambda: nn.Identity()),
(alphas[node_idx, 1], lambda: nn.Conv2d(32, 32, 3, padding=1)),
(alphas[node_idx, 2], lambda: nn.Conv2d(32, 32, 5, padding=2)),
(alphas[node_idx, 3], lambda: nn.AvgPool2d(3, stride=1, padding=1))
]
return operations
def derive_final_architecture(self) -> ArchitectureSpec:
"""Derive final discrete architecture from learned parameters"""
layers = []
# Normal cells
for node in range(self.num_nodes):
# Find best operation
weights = torch.softmax(self.alpha_normal[node], dim=0)
best_op = torch.argmax(weights).item()
layer_spec = {
"type": self._operation_idx_to_type(best_op),
"params": self._get_operation_params(best_op)
}
layers.append(layer_spec)
return ArchitectureSpec(
layers=layers,
connections=[(i, i+1) for i in range(len(layers)-1)],
input_size=784,
output_size=10
)
def _operation_idx_to_type(self, idx: int) -> str:
"""Convert operation index to type"""
types = ["identity", "conv3", "conv5", "pool"]
return types[idx]
def _get_operation_params(self, idx: int) -> Dict[str, Any]:
"""Get parameters for operation"""
if idx == 1: # conv3
return {"out_channels": 32, "kernel_size": 3, "padding": 1}
elif idx == 2: # conv5
return {"out_channels": 32, "kernel_size": 5, "padding": 2}
elif idx == 3: # pool
return {"kernel_size": 3, "stride": 1, "padding": 1}
else:
return {}
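
# Hedged sketch of the DARTS continuous relaxation: a node's output is the
# softmax-weighted sum of every candidate operation applied to its input
# (channel count 32 matches the hard-coded candidate ops above).
def _demo_darts_mixed_op() -> torch.Tensor:
    darts = DARTSSearch()
    x = torch.randn(1, 32, 8, 8)
    ops = darts.get_operations(node_idx=0, normal=True)
    weights = torch.softmax(torch.stack([alpha for alpha, _ in ops]), dim=0)
    # Gradients flow back to alpha_normal through these weights
    return sum(w * make_op()(x) for w, (_, make_op) in zip(weights, ops))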
class MultiMethodSearchSystem:
    """
    Architecture search system combining controller-based NAS, evolutionary
    search, DARTS, and random-search hyperparameter tuning.
    """
    def __init__(self):
        self.controller = RNNController()
        self.builder = ArchitectureBuilder()
        self.bayesian_opt = RandomSearchHyperOptimizer({
            "learning_rate": (1e-5, 1e-1),
            "batch_size": (16, 256),
            "hidden_size": (64, 512)
        })
        self.evolutionary = EvolutionarySearch()
        self.darts = DARTSSearch()
def search_controller_based(self, num_samples: int = 10,
evaluation_fn: Callable[[nn.Module], float] = None) -> ArchitectureSpec:
"""
Search using controller-based NAS.
"""
best_spec = None
best_score = float('-inf')
for _ in range(num_samples):
# Sample architecture
spec = self.controller.sample_architecture()
# Build and evaluate
model = self.builder.build_model(spec)
if evaluation_fn:
score = evaluation_fn(model)
else:
score = random.random() # Placeholder
if score > best_score:
best_spec = spec
best_score = score
return best_spec
def search_evolutionary(self, base_spec: ArchitectureSpec, generations: int = 5,
evaluation_fn: Callable[[ArchitectureSpec], float] = None) -> ArchitectureSpec:
"""
Search using evolutionary algorithms.
"""
self.evolutionary.initialize_population(base_spec)
return self.evolutionary.evolve(generations, evaluation_fn)
def search_darts(self, train_loader, val_loader, num_epochs: int = 10) -> ArchitectureSpec:
"""
Search using DARTS.
"""
optimizer = optim.Adam([
{'params': self.darts.parameters(), 'lr': 1e-3}
])
# Simplified DARTS training loop
for epoch in range(num_epochs):
for batch_x, batch_y in train_loader:
optimizer.zero_grad()
                # Forward through the DARTS supernet would go here; the constant
                # placeholder below produces no gradient for the alphas
                loss = torch.tensor(1.0, requires_grad=True)  # Placeholder
loss.backward()
optimizer.step()
return self.darts.derive_final_architecture()
def optimize_hyperparameters(self, model_class: Callable, train_fn: Callable,
num_trials: int = 20) -> Dict[str, float]:
"""
Optimize hyperparameters using Bayesian optimization.
"""
best_params = None
best_score = float('-inf')
for trial in range(num_trials):
# Suggest parameters
params = self.bayesian_opt.suggest()
# Evaluate
model = model_class(**params)
score = train_fn(model, params)
# Observe result
self.bayesian_opt.observe(params, score)
if score > best_score:
best_params = params
best_score = score
return best_params
def comprehensive_search(self, base_spec: ArchitectureSpec = None,
evaluation_fn: Callable = None) -> Dict[str, Any]:
"""
Comprehensive architecture search using multiple methods.
"""
results = {}
# Create base spec if not provided
if base_spec is None:
base_spec = ArchitectureSpec(
layers=[{"type": "linear", "params": {"out_features": 128}}],
connections=[],
input_size=784,
output_size=10
)
# Controller-based search
results["controller"] = self.search_controller_based(5, evaluation_fn)
# Evolutionary search
results["evolutionary"] = self.search_evolutionary(base_spec, 3, evaluation_fn)
# DARTS (simplified)
results["darts"] = self.darts.derive_final_architecture()
# Find best overall
best_score = float('-inf')
best_spec = None
for method, spec in results.items():
if evaluation_fn:
score = evaluation_fn(self.builder.build_model(spec))
else:
score = random.random()
if score > best_score:
best_score = score
best_spec = spec
results["best"] = best_spec
results["best_score"] = best_score
return results
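
# Hedged smoke test (assumes CPU execution and the simulated evaluator above;
# candidate count kept small for a quick run).
if __name__ == "__main__":
    system = ArchitectureSearchSystem()
    results = system.comprehensive_search(num_candidates=15)
    system.export_results(results)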