# SYNAPSIS / modules /swarm_intelligence.py
# Lukeetah's picture
# Update modules/swarm_intelligence.py
# 053d6b0 verified
"""
Sistema de Inteligencia de Enjambre
Implementa algoritmos de optimizaci贸n colectiva inspirados en comportamientos naturales
"""
import numpy as np
import random
from typing import Dict, List, Tuple, Callable, Any, Optional
from dataclasses import dataclass
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
import threading
import time
from datetime import datetime
@dataclass
class Agent:
    """A single member of the swarm.

    Stores the agent's current state together with the best state it has
    recorded so far.
    """

    # Identifier of the agent.
    id: str
    # Current position vector.
    position: np.ndarray
    # Current velocity vector.
    velocity: np.ndarray
    # Position stored alongside ``best_fitness``.
    best_position: np.ndarray
    # Best fitness value recorded for this agent.
    best_fitness: float
    # Fitness value of the current position.
    fitness: float
    # Role label of the agent; "explorer" unless another one is given.
    role: str = "explorer"
@dataclass
class SwarmState:
    """Snapshot of the whole swarm at a given iteration."""

    # Position stored alongside ``global_best_fitness``.
    global_best_position: np.ndarray
    # Best fitness value recorded for the swarm.
    global_best_fitness: float
    # Members of the swarm.
    agents: List[Agent]
    # Iteration counter of this snapshot.
    iteration: int
    # Convergence-rate metric of the swarm.
    convergence_rate: float
    # Diversity metric of the swarm.
    diversity_measure: float
class OptimizationAlgorithm(ABC):
    """Abstract base class for the optimisation algorithms."""

    @abstractmethod
    def update_agent(self, agent: Agent, swarm_state: SwarmState) -> Agent:
        """Produce the next state of ``agent`` given ``swarm_state``."""

    @abstractmethod
    def get_algorithm_name(self) -> str:
        """Return the display name of the algorithm."""
class ParticleSwarmOptimization(OptimizationAlgorithm):
    """Particle Swarm Optimization update rule."""

    def __init__(self, w: float = 0.729, c1: float = 1.494, c2: float = 1.494):
        # w: inertia weight, c1: cognitive coefficient, c2: social coefficient.
        self.w, self.c1, self.c2 = w, c1, c2

    def update_agent(self, agent: Agent, swarm_state: SwarmState) -> Agent:
        """Move ``agent`` one PSO step and return the same (mutated) object.

        The new velocity is the weighted sum of the previous velocity, the
        offset to the agent's own best position and the offset to the swarm's
        global best position; the position is then shifted by that velocity.
        """
        # One scalar random factor per component, drawn in this order.
        r1 = random.random()
        r2 = random.random()

        here = agent.position
        toward_personal_best = self.c1 * r1 * (agent.best_position - here)
        toward_global_best = self.c2 * r2 * (swarm_state.global_best_position - here)

        inertia = self.w * agent.velocity
        agent.velocity = inertia + toward_personal_best + toward_global_best
        agent.position = here + agent.velocity
        return agent

    def get_algorithm_name(self) -> str:
        """Return the display name of the algorithm."""
        return "Particle Swarm Optimization"
class AntColonyOptimization(OptimizationAlgorithm):
    """Ant Colony Optimization inspired update rule.

    ``rho`` is stored but no method of this class reads it, and the pheromone
    matrix is only ever set to its initial constant value.
    """

    def __init__(self, alpha: float = 1.0, beta: float = 2.0, rho: float = 0.1):
        self.alpha = alpha  # exponent applied to the pheromone term
        self.beta = beta  # exponent applied to the heuristic term
        self.rho = rho  # evaporation rate
        self.pheromone_matrix = None  # created lazily on the first update

    def update_agent(self, agent: Agent, swarm_state: SwarmState) -> Agent:
        """Perturb one coordinate of ``agent.position`` and return the agent.

        The coordinate is drawn according to the transition probabilities
        computed from the pheromone diagonal and the current position.
        """
        if self.pheromone_matrix is None:
            self._initialize_pheromone_matrix(len(agent.position))

        weights = self._calculate_probabilities(agent.position)
        agent.position = self._select_next_position(agent.position, weights)
        return agent

    def _initialize_pheromone_matrix(self, size: int):
        """Create a ``size`` x ``size`` pheromone matrix filled with 0.1."""
        self.pheromone_matrix = 0.1 * np.ones((size, size))

    def _calculate_probabilities(self, position: np.ndarray) -> np.ndarray:
        """Return one selection probability per coordinate of ``position``."""
        dims = len(position)
        if self.pheromone_matrix is None:
            # No pheromone information yet: every coordinate is equally likely.
            return np.ones(dims) / dims

        # Simplification: only the diagonal (local) pheromone values are used.
        trail = np.diag(self.pheromone_matrix)
        # Simple heuristic: coordinates close to zero get a larger weight.
        desirability = 1.0 / (1.0 + np.abs(position))
        weights = (trail ** self.alpha) * (desirability ** self.beta)
        return weights / weights.sum()

    def _select_next_position(self, current_pos: np.ndarray,
                              probabilities: np.ndarray) -> np.ndarray:
        """Return ``current_pos`` with one randomly chosen coordinate shifted.

        The coordinate index is sampled from ``probabilities`` and the shift
        is drawn from a normal distribution with mean 0 and scale 0.1.
        """
        chosen = np.random.choice(len(probabilities), p=probabilities)
        delta = np.zeros_like(current_pos)
        delta[chosen % len(current_pos)] = np.random.normal(0, 0.1)
        return current_pos + delta

    def get_algorithm_name(self) -> str:
        """Return the display name of the algorithm."""
        return "Ant Colony Optimization"
class GeneticAlgorithm(OptimizationAlgorithm):
    """Genetic-algorithm update rule (tournament, crossover, mutation)."""

    def __init__(self, mutation_rate: float = 0.1, crossover_rate: float = 0.8):
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate

    def update_agent(self, agent: Agent, swarm_state: SwarmState) -> Agent:
        """Apply crossover and mutation to ``agent.position``; return the agent."""
        parents = self._select_parents(swarm_state.agents)

        # The crossover draw is consumed even when fewer than two parents exist.
        crossover_roll = random.random()
        if crossover_roll < self.crossover_rate and len(parents) >= 2:
            first, second = parents[0], parents[1]
            agent.position = self._crossover(first.position, second.position)

        mutation_roll = random.random()
        if mutation_roll < self.mutation_rate:
            agent.position = self._mutate(agent.position)

        return agent

    def _select_parents(self, agents: List[Agent]) -> List[Agent]:
        """Return the (up to) two fittest agents of a random tournament.

        The tournament holds at most three agents sampled without replacement.
        """
        contenders = random.sample(agents, min(3, len(agents)))
        contenders.sort(key=lambda candidate: candidate.fitness, reverse=True)
        return contenders[:2]

    def _crossover(self, parent1: np.ndarray, parent2: np.ndarray) -> np.ndarray:
        """Uniform crossover: each gene comes from either parent with p=0.5."""
        take_first = np.random.random(len(parent1)) < 0.5
        return np.where(take_first, parent1, parent2)

    def _mutate(self, individual: np.ndarray) -> np.ndarray:
        """Gaussian mutation applied gene-wise with ``mutation_rate`` probability."""
        size = len(individual)
        selected = np.random.random(size) < self.mutation_rate
        noise = np.random.normal(0, 0.1, size)
        return individual + selected * noise

    def get_algorithm_name(self) -> str:
        """Return the display name of the algorithm."""
        return "Genetic Algorithm"
class ConsensusBuilder:
    """Builds a collective decision out of individual agent opinions."""

    def __init__(self, convergence_threshold: float = 0.01):
        # Mean distance to the consensus below which opinions count as converged.
        self.convergence_threshold = convergence_threshold
        # Kept for interface compatibility; nothing in this class writes to it.
        self.voting_history = []

    def build_consensus(self, agent_opinions: List[np.ndarray]) -> Dict[str, Any]:
        """Combine agent opinions into a single consensus vector.

        The consensus starts as the centroid of all opinions. Opinions whose
        distance to the centroid exceeds ``mean + 2 * std`` of all distances
        are treated as outliers; when there are more than three opinions, the
        most distant half of the outliers (rounded up) is left out of a second
        centroid, which is blended into the result with weight 0.3.

        Args:
            agent_opinions: One equally-shaped array per agent.

        Returns:
            A dict with keys ``consensus`` (array, or ``None`` without
            opinions), ``confidence`` (``1 / (1 + mean distance)``),
            ``participation_rate`` (number of opinions), ``outliers`` (number
            of outlier opinions) and ``convergence_achieved`` (whether the mean
            distance is below ``convergence_threshold``).
        """
        if len(agent_opinions) == 0:
            # Same keys as the regular result so callers can read any of them.
            return {
                "consensus": None,
                "confidence": 0.0,
                "participation_rate": 0,
                "outliers": 0,
                "convergence_achieved": False,
            }

        opinions = np.asarray(agent_opinions, dtype=float)
        count = len(opinions)
        consensus = opinions.mean(axis=0)

        # Distance of every opinion to the centroid, computed in one pass.
        offsets = (opinions - consensus).reshape(count, -1)
        distances = np.linalg.norm(offsets, axis=1)
        avg_distance = float(distances.mean())
        confidence = 1.0 / (1.0 + avg_distance)

        # The threshold is relative to the mean distance; using 2 * std alone
        # flags every opinion as soon as all distances are (nearly) equal.
        threshold = avg_distance + 2.0 * float(distances.std())
        outliers = np.flatnonzero(distances > threshold)

        if outliers.size > 0 and count > 3:
            # Rank outliers from most to least distant and drop the upper half
            # (rounded up, so a single outlier is actually excluded).
            ranked = outliers[np.argsort(distances[outliers])[::-1]]
            keep = np.ones(count, dtype=bool)
            keep[ranked[:(outliers.size + 1) // 2]] = False
            if keep.any():
                refined_consensus = opinions[keep].mean(axis=0)
                consensus = 0.7 * consensus + 0.3 * refined_consensus

        return {
            "consensus": consensus,
            "confidence": confidence,
            "participation_rate": count,
            "outliers": int(outliers.size),
            "convergence_achieved": bool(avg_distance < self.convergence_threshold),
        }
class SwarmIntelligence:
    """Main entry point of the swarm-intelligence system.

    Owns the swarm state, the available optimisation algorithms ('pso',
    'aco', 'ga'), a consensus builder and the optimisation history. Fitness is
    maximised: larger fitness values are considered better throughout.
    """

    # Worker threads used when agents are updated in parallel.
    _MAX_WORKERS = 4
    # Limits that adaptive tuning may not cross for the PSO inertia weight and
    # for the cognitive/social coefficients. Without them repeated feedback
    # compounds the multiplicative factors without bound.
    _INERTIA_RANGE = (0.4, 0.9)
    _COEFFICIENT_RANGE = (0.5, 2.5)

    def __init__(self,
                 swarm_size: int = 50,
                 dimensions: int = 10,
                 max_iterations: int = 100):
        """Store the swarm configuration and create the algorithm instances.

        Args:
            swarm_size: Number of agents created by ``initialize_swarm``.
            dimensions: Length of every position/velocity vector.
            max_iterations: Upper bound on iterations per ``optimize`` call.
        """
        self.swarm_size = swarm_size
        self.dimensions = dimensions
        self.max_iterations = max_iterations

        # Available algorithms, keyed by the name accepted by ``optimize``.
        self.algorithms = {
            'pso': ParticleSwarmOptimization(),
            'aco': AntColonyOptimization(),
            'ga': GeneticAlgorithm()
        }
        self.consensus_builder = ConsensusBuilder()
        self.swarm_state = None
        self.optimization_history = []

        # Execution control.
        self._running = False
        self._lock = threading.Lock()
        # Bounds of the most recent initialisation; reused when agents are
        # re-seeded so they stay inside the same search region.
        self._bounds: Tuple[float, float] = (-10, 10)

    def initialize_swarm(self,
                         bounds: Tuple[float, float] = (-10, 10),
                         fitness_function: Optional[Callable] = None) -> SwarmState:
        """Create a fresh swarm with random positions and velocities.

        Args:
            bounds: ``(low, high)`` range for the uniform initial positions.
            fitness_function: Optional callable mapping a position to a
                fitness value. Without it every agent starts at ``-inf``.

        Returns:
            The new swarm state, also stored in ``self.swarm_state``.

        Raises:
            ValueError: If ``swarm_size`` is smaller than 1.
        """
        if self.swarm_size < 1:
            raise ValueError("swarm_size must be at least 1")

        low, high = bounds
        self._bounds = (low, high)

        agents = []
        for index in range(self.swarm_size):
            position = np.random.uniform(low, high, self.dimensions)
            velocity = np.random.uniform(-1, 1, self.dimensions)
            if fitness_function is not None:
                fitness = fitness_function(position)
            else:
                fitness = float('-inf')
            agents.append(Agent(
                id=f"agent_{index}",
                position=position,
                velocity=velocity,
                best_position=position.copy(),
                best_fitness=fitness,
                fitness=fitness,
                role=self._assign_role(index)
            ))

        best_agent = max(agents, key=lambda a: a.fitness)
        self.swarm_state = SwarmState(
            global_best_position=best_agent.position.copy(),
            global_best_fitness=best_agent.fitness,
            agents=agents,
            iteration=0,
            convergence_rate=0.0,
            diversity_measure=self._calculate_diversity(agents)
        )
        return self.swarm_state

    def optimize(self,
                 fitness_function: Callable[[np.ndarray], float],
                 algorithm: str = 'pso',
                 bounds: Tuple[float, float] = (-10, 10),
                 parallel: bool = True) -> Dict[str, Any]:
        """Run the chosen algorithm until convergence or ``max_iterations``.

        The swarm is initialised with ``bounds`` only when no swarm exists yet;
        an existing swarm is reused as is. One history entry is appended per
        iteration.

        Args:
            fitness_function: Callable mapping a position to a fitness value.
            algorithm: Key of ``self.algorithms`` ('pso', 'aco' or 'ga').
            bounds: Range used if the swarm still has to be initialised.
            parallel: Update agents through a thread pool when true.

        Returns:
            The dict produced by ``_get_optimization_results``.

        Raises:
            ValueError: If ``algorithm`` is not a known key.
        """
        if algorithm not in self.algorithms:
            raise ValueError(f"Algorithm {algorithm} not available")
        if self.swarm_state is None:
            self.initialize_swarm(bounds, fitness_function)

        optimizer = self.algorithms[algorithm]
        # One pool for the whole run instead of a new pool per iteration.
        executor = ThreadPoolExecutor(max_workers=self._MAX_WORKERS) if parallel else None
        self._running = True
        try:
            for iteration in range(self.max_iterations):
                if not self._running:
                    break

                if executor is not None:
                    self._update_agents_parallel(optimizer, fitness_function, executor)
                else:
                    self._update_agents_sequential(optimizer, fitness_function)

                self._update_swarm_state(iteration)
                self.optimization_history.append({
                    'iteration': iteration,
                    'best_fitness': self.swarm_state.global_best_fitness,
                    'diversity': self.swarm_state.diversity_measure,
                    'algorithm': algorithm
                })

                if self._check_convergence():
                    break
        finally:
            self._running = False
            if executor is not None:
                executor.shutdown(wait=True)

        return self._get_optimization_results()

    def collective_decision_making(self,
                                   decision_space: List[np.ndarray],
                                   agent_preferences: Dict[str, np.ndarray]) -> Dict[str, Any]:
        """Derive a collective decision from the agents' opinions.

        Agents whose id appears in ``agent_preferences`` contribute that
        preference; all others contribute an opinion generated from their
        position and ``decision_space``.

        Raises:
            ValueError: If the swarm has not been initialised.
        """
        if self.swarm_state is None:
            raise ValueError("Swarm not initialized")

        agent_opinions = [
            agent_preferences[agent.id]
            if agent.id in agent_preferences
            else self._generate_agent_opinion(agent, decision_space)
            for agent in self.swarm_state.agents
        ]

        consensus_result = self.consensus_builder.build_consensus(agent_opinions)
        decision_quality = self._assess_decision_quality(
            consensus_result, agent_opinions
        )
        return {
            'decision': consensus_result['consensus'],
            'confidence': consensus_result['confidence'],
            'quality_metrics': decision_quality,
            'participation': len(agent_opinions),
            'algorithm_used': 'collective_consensus'
        }

    def adaptive_swarm_behavior(self,
                                environment_feedback: Dict[str, float]) -> None:
        """Adapt roles and PSO parameters to environment feedback.

        Reads the keys 'performance' (default 0.5), 'complexity' (default 0.5)
        and 'resources' (default 1.0). Does nothing without a swarm. When
        performance is below 0.3 the worst agents are re-seeded.
        """
        if self.swarm_state is None:
            return

        performance_score = environment_feedback.get('performance', 0.5)
        complexity_score = environment_feedback.get('complexity', 0.5)
        resource_availability = environment_feedback.get('resources', 1.0)

        self._adapt_agent_roles(performance_score, complexity_score)
        self._adapt_algorithm_parameters(performance_score, resource_availability)
        if performance_score < 0.3:
            self._restructure_swarm()

    def get_swarm_analytics(self) -> Dict[str, Any]:
        """Return summary statistics about the current swarm.

        Fitness mean and standard deviation are computed over agents with a
        finite fitness, so agents that have not been evaluated yet (fitness
        ``-inf``) do not turn the statistics into ``-inf``/``nan``.
        """
        if self.swarm_state is None:
            return {"status": "Swarm not initialized"}

        state = self.swarm_state
        fitnesses = np.array([agent.fitness for agent in state.agents], dtype=float)
        evaluated = fitnesses[np.isfinite(fitnesses)]
        if evaluated.size:
            average_fitness = float(evaluated.mean())
            fitness_std = float(evaluated.std())
        else:
            # Nothing evaluated yet: report the raw mean, spread is undefined.
            with np.errstate(invalid='ignore'):
                average_fitness = float(fitnesses.mean())
            fitness_std = float('nan')

        return {
            'swarm_size': len(state.agents),
            'current_iteration': state.iteration,
            'best_fitness': state.global_best_fitness,
            'average_fitness': average_fitness,
            'fitness_std': fitness_std,
            'diversity_measure': state.diversity_measure,
            'convergence_rate': state.convergence_rate,
            'convergence_analysis': self._analyze_convergence(),
            'exploration_metrics': self._calculate_exploration_metrics(),
            'agent_roles': self._get_role_distribution()
        }

    # Private helpers

    def _assign_role(self, agent_index: int) -> str:
        """Return the initial role for an agent, cycling through four roles."""
        roles = ['explorer', 'exploiter', 'scout', 'coordinator']
        return roles[agent_index % len(roles)]

    def _calculate_diversity(self, agents: List[Agent]) -> float:
        """Return the mean distance of the agents to their centroid.

        Fewer than two agents yield 0.0.
        """
        if len(agents) < 2:
            return 0.0
        positions = np.array([agent.position for agent in agents])
        center = positions.mean(axis=0)
        return float(np.linalg.norm(positions - center, axis=1).mean())

    def _update_agents_parallel(self,
                                optimizer: OptimizationAlgorithm,
                                fitness_function: Callable,
                                executor: Optional[ThreadPoolExecutor] = None) -> None:
        """Update every agent through a thread pool.

        Args:
            optimizer: Algorithm used to move each agent.
            fitness_function: Callable evaluating a position.
            executor: Pool to submit the work to. When omitted a temporary
                pool is created for this call only.
        """
        if executor is None:
            with ThreadPoolExecutor(max_workers=self._MAX_WORKERS) as pool:
                self._update_agents_parallel(optimizer, fitness_function, pool)
            return

        agents = self.swarm_state.agents
        futures = [
            executor.submit(self._update_single_agent, agent, optimizer, fitness_function)
            for agent in agents
        ]
        # result() re-raises any exception raised inside a worker.
        for index, future in enumerate(futures):
            agents[index] = future.result()

    def _update_agents_sequential(self,
                                  optimizer: OptimizationAlgorithm,
                                  fitness_function: Callable) -> None:
        """Update every agent one after another in the calling thread."""
        agents = self.swarm_state.agents
        for index, agent in enumerate(agents):
            agents[index] = self._update_single_agent(
                agent, optimizer, fitness_function
            )

    def _update_single_agent(self,
                             agent: Agent,
                             optimizer: OptimizationAlgorithm,
                             fitness_function: Callable) -> Agent:
        """Move one agent, re-evaluate it and refresh its personal best."""
        updated_agent = optimizer.update_agent(agent, self.swarm_state)
        updated_agent.fitness = fitness_function(updated_agent.position)
        if updated_agent.fitness > updated_agent.best_fitness:
            updated_agent.best_fitness = updated_agent.fitness
            updated_agent.best_position = updated_agent.position.copy()
        return updated_agent

    def _update_swarm_state(self, iteration: int) -> None:
        """Refresh the global best and the swarm metrics after an iteration."""
        state = self.swarm_state
        best_agent = max(state.agents, key=lambda a: a.fitness)
        if best_agent.fitness > state.global_best_fitness:
            state.global_best_fitness = best_agent.fitness
            state.global_best_position = best_agent.position.copy()

        state.iteration = iteration
        state.diversity_measure = self._calculate_diversity(state.agents)
        state.convergence_rate = self._calculate_convergence_rate()

    def _calculate_convergence_rate(self) -> float:
        """Return the mean step-to-step change of the recorded best fitness.

        Uses at most the last ten history entries; fewer than two entries
        yield 0.0.
        """
        if len(self.optimization_history) < 2:
            return 0.0
        recent_bests = [
            entry['best_fitness'] for entry in self.optimization_history[-10:]
        ]
        return float(np.mean(np.diff(recent_bests)))

    def _check_convergence(self, tolerance: float = 1e-6) -> bool:
        """Return whether the last ten best-fitness values are nearly constant.

        Needs at least ten history entries; convergence means their standard
        deviation is below ``tolerance``.
        """
        if len(self.optimization_history) < 10:
            return False
        recent_bests = [
            entry['best_fitness']
            for entry in self.optimization_history[-10:]
        ]
        return bool(np.std(recent_bests) < tolerance)

    def _get_optimization_results(self) -> Dict[str, Any]:
        """Assemble the result dict returned by ``optimize``.

        ``iterations`` is the value of ``swarm_state.iteration``, i.e. the
        zero-based index of the last iteration that was executed.
        """
        state = self.swarm_state
        return {
            'best_position': state.global_best_position,
            'best_fitness': state.global_best_fitness,
            'iterations': state.iteration,
            'convergence_achieved': self._check_convergence(),
            'final_diversity': state.diversity_measure,
            'optimization_history': self.optimization_history
        }

    def _generate_agent_opinion(self,
                                agent: Agent,
                                decision_space: List[np.ndarray]) -> np.ndarray:
        """Return the option closest to the agent plus role-dependent noise.

        Raises:
            ValueError: If ``decision_space`` is empty.
        """
        if len(decision_space) == 0:
            raise ValueError("decision_space must contain at least one option")

        distances = [
            np.linalg.norm(agent.position - option)
            for option in decision_space
        ]
        # Standard deviation of the noise, by role; unknown roles use 0.2.
        noise_factor = {
            'explorer': 0.3,
            'exploiter': 0.1,
            'scout': 0.5,
            'coordinator': 0.2
        }.get(agent.role, 0.2)

        # Convert to float before adding noise: an in-place add on an
        # integer-typed option fails because float noise cannot be cast into it.
        closest = np.asarray(decision_space[int(np.argmin(distances))], dtype=float)
        return closest + np.random.normal(0, noise_factor, closest.shape)

    def _assess_decision_quality(self,
                                 consensus_result: Dict[str, Any],
                                 agent_opinions: List[np.ndarray]) -> Dict[str, float]:
        """Score a collective decision.

        Returns a dict with 'average_satisfaction' (mean of
        ``1 / (1 + distance to consensus)``), 'fairness' (1 minus the standard
        deviation of the satisfactions), 'robustness' (share of opinions that
        are not outliers) and 'quality_score' (0.4/0.3/0.3 weighted sum of the
        three). Without opinions only ``{'quality_score': 0.0}`` is returned.
        """
        if len(agent_opinions) == 0:
            return {"quality_score": 0.0}

        consensus = consensus_result['consensus']
        # Every distance is computed exactly once and reused below.
        distances = np.array([
            np.linalg.norm(opinion - consensus) for opinion in agent_opinions
        ])
        satisfactions = 1.0 / (1.0 + distances)

        # An opinion is an outlier when it lies more than two standard
        # deviations above the mean distance.
        threshold = distances.mean() + 2.0 * distances.std()
        outlier_count = int(np.count_nonzero(distances > threshold))

        quality_metrics = {
            'average_satisfaction': float(satisfactions.mean()),
            'fairness': 1.0 - float(satisfactions.std()),
            'robustness': 1.0 - outlier_count / len(agent_opinions),
        }
        quality_metrics['quality_score'] = (
            0.4 * quality_metrics['average_satisfaction'] +
            0.3 * quality_metrics['fairness'] +
            0.3 * quality_metrics['robustness']
        )
        return quality_metrics

    def _adapt_agent_roles(self, performance_score: float, complexity_score: float):
        """Reassign roles according to performance.

        Below 0.4 the first third of the agents become explorers; above 0.8
        the first half become exploiters. ``complexity_score`` is accepted but
        not used.
        """
        agents = self.swarm_state.agents
        if performance_score < 0.4:
            for agent in agents[:len(agents) // 3]:
                agent.role = 'explorer'
        elif performance_score > 0.8:
            for agent in agents[:len(agents) // 2]:
                agent.role = 'exploiter'

    @staticmethod
    def _scale_within(value: float, factor: float, lower: float, upper: float) -> float:
        """Scale ``value`` by ``factor`` without crossing ``[lower, upper]``.

        A value that already lies outside the range is never moved further
        out, but it is not pulled back in either.
        """
        scaled = value * factor
        if factor > 1.0:
            return min(scaled, max(value, upper))
        return max(scaled, min(value, lower))

    def _adapt_algorithm_parameters(self, performance_score: float, resource_availability: float):
        """Tune the PSO parameters from the performance score.

        Below 0.5 the inertia weight grows and the cognitive coefficient
        shrinks; otherwise the inertia weight shrinks and the social
        coefficient grows. Each change stops at the class-level range limits.
        ``resource_availability`` is accepted but not used.
        """
        pso = self.algorithms.get('pso')
        if pso is None:
            return

        w_low, w_high = self._INERTIA_RANGE
        c_low, c_high = self._COEFFICIENT_RANGE
        if performance_score < 0.5:
            pso.w = self._scale_within(pso.w, 1.1, w_low, w_high)
            pso.c1 = self._scale_within(pso.c1, 0.9, c_low, c_high)
        else:
            pso.w = self._scale_within(pso.w, 0.95, w_low, w_high)
            pso.c2 = self._scale_within(pso.c2, 1.05, c_low, c_high)

    def _restructure_swarm(self):
        """Re-seed the worst third of the agents.

        Their positions are redrawn inside the bounds the swarm was
        initialised with, their velocities are redrawn from [-1, 1) and their
        fitness is reset to ``-inf`` until the next evaluation.
        """
        agents = self.swarm_state.agents
        low, high = self._bounds
        worst_agents = sorted(agents, key=lambda a: a.fitness)[:len(agents) // 3]
        for agent in worst_agents:
            agent.position = np.random.uniform(low, high, self.dimensions)
            agent.velocity = np.random.uniform(-1, 1, self.dimensions)
            agent.fitness = float('-inf')

    def _analyze_convergence(self) -> Dict[str, Any]:
        """Describe the trends of the last (up to ten) history entries.

        Fewer than five entries yield ``{'status': 'Insufficient data'}``.
        """
        if len(self.optimization_history) < 5:
            return {"status": "Insufficient data"}

        window = self.optimization_history[-10:]
        fitness_trend = [entry['best_fitness'] for entry in window]
        diversity_trend = [entry['diversity'] for entry in window]
        fitness_slope = np.mean(np.diff(fitness_trend))
        diversity_slope = np.mean(np.diff(diversity_trend))

        return {
            'fitness_improvement_rate': fitness_slope,
            'diversity_trend': 'decreasing' if diversity_slope < 0 else 'stable',
            'convergence_speed': 'fast' if abs(fitness_slope) > 0.1 else 'slow',
            'premature_convergence_risk': 'high' if np.mean(diversity_trend) < 0.1 else 'low'
        }

    def _calculate_exploration_metrics(self) -> Dict[str, float]:
        """Return coverage, uniformity and exploration-pressure metrics.

        'space_coverage' is the mean per-dimension extent of the positions,
        'distribution_uniformity' is 1 minus the coefficient of variation of
        the distances to the centroid, and 'exploration_pressure' is the share
        of agents whose role is 'explorer' or 'scout'.
        """
        if self.swarm_state is None:
            return {}

        agents = self.swarm_state.agents
        positions = np.array([agent.position for agent in agents])
        extents = positions.max(axis=0) - positions.min(axis=0)

        center = positions.mean(axis=0)
        distances = np.linalg.norm(positions - center, axis=1)
        mean_distance = float(distances.mean())
        if mean_distance > 0:
            distribution_uniformity = 1.0 - float(distances.std()) / mean_distance
        else:
            # All agents coincide: there is no spread to be uneven about, and
            # dividing by the zero mean would produce nan.
            distribution_uniformity = 1.0

        exploring = sum(agent.role in ('explorer', 'scout') for agent in agents)
        return {
            'space_coverage': float(extents.mean()),
            'distribution_uniformity': distribution_uniformity,
            'exploration_pressure': exploring / len(agents)
        }

    def _get_role_distribution(self) -> Dict[str, int]:
        """Return how many agents currently hold each role."""
        if self.swarm_state is None:
            return {}
        roles = {}
        for agent in self.swarm_state.agents:
            roles[agent.role] = roles.get(agent.role, 0) + 1
        return roles

    def stop_optimization(self):
        """Ask a running ``optimize`` loop to stop before its next iteration."""
        self._running = False