""" Sistema de Inteligencia de Enjambre Implementa algoritmos de optimización colectiva inspirados en comportamientos naturales """ import numpy as np import random from typing import Dict, List, Tuple, Callable, Any, Optional from dataclasses import dataclass from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor import threading import time from datetime import datetime @dataclass class Agent: """Agente individual en el enjambre""" id: str position: np.ndarray velocity: np.ndarray best_position: np.ndarray best_fitness: float fitness: float role: str = "explorer" @dataclass class SwarmState: """Estado actual del enjambre""" global_best_position: np.ndarray global_best_fitness: float agents: List[Agent] iteration: int convergence_rate: float diversity_measure: float class OptimizationAlgorithm(ABC): """Clase base para algoritmos de optimización""" @abstractmethod def update_agent(self, agent: Agent, swarm_state: SwarmState) -> Agent: pass @abstractmethod def get_algorithm_name(self) -> str: pass class ParticleSwarmOptimization(OptimizationAlgorithm): """Algoritmo de Optimización por Enjambre de Partículas""" def __init__(self, w: float = 0.729, c1: float = 1.494, c2: float = 1.494): self.w = w # factor de inercia self.c1 = c1 # coeficiente cognitivo self.c2 = c2 # coeficiente social def update_agent(self, agent: Agent, swarm_state: SwarmState) -> Agent: """Actualiza posición y velocidad de un agente PSO""" r1, r2 = random.random(), random.random() # Actualizar velocidad cognitive_component = self.c1 * r1 * (agent.best_position - agent.position) social_component = self.c2 * r2 * (swarm_state.global_best_position - agent.position) agent.velocity = (self.w * agent.velocity + cognitive_component + social_component) # Actualizar posición agent.position = agent.position + agent.velocity return agent def get_algorithm_name(self) -> str: return "Particle Swarm Optimization" class AntColonyOptimization(OptimizationAlgorithm): """Algoritmo de Optimización por Colonia de Hormigas""" def __init__(self, alpha: float = 1.0, beta: float = 2.0, rho: float = 0.1): self.alpha = alpha # importancia del feromona self.beta = beta # importancia de la heurística self.rho = rho # tasa de evaporación self.pheromone_matrix = None def update_agent(self, agent: Agent, swarm_state: SwarmState) -> Agent: """Actualiza posición basada en feromonas""" if self.pheromone_matrix is None: self._initialize_pheromone_matrix(len(agent.position)) # Selección probabilística basada en feromonas probabilities = self._calculate_probabilities(agent.position) new_position = self._select_next_position(agent.position, probabilities) agent.position = new_position return agent def _initialize_pheromone_matrix(self, size: int): """Inicializa matriz de feromonas""" self.pheromone_matrix = np.ones((size, size)) * 0.1 def _calculate_probabilities(self, position: np.ndarray) -> np.ndarray: """Calcula probabilidades de transición""" if self.pheromone_matrix is None: return np.ones(len(position)) / len(position) # Simplificación: usa feromonas locales pheromones = np.diag(self.pheromone_matrix) heuristic = 1.0 / (1.0 + np.abs(position)) # heurística simple numerator = (pheromones ** self.alpha) * (heuristic ** self.beta) return numerator / np.sum(numerator) def _select_next_position(self, current_pos: np.ndarray, probabilities: np.ndarray) -> np.ndarray: """Selecciona siguiente posición""" # Movimiento basado en probabilidades direction = np.random.choice(len(probabilities), p=probabilities) step = np.zeros_like(current_pos) step[direction % len(current_pos)] = np.random.normal(0, 0.1) return current_pos + step def get_algorithm_name(self) -> str: return "Ant Colony Optimization" class GeneticAlgorithm(OptimizationAlgorithm): """Algoritmo Genético""" def __init__(self, mutation_rate: float = 0.1, crossover_rate: float = 0.8): self.mutation_rate = mutation_rate self.crossover_rate = crossover_rate def update_agent(self, agent: Agent, swarm_state: SwarmState) -> Agent: """Actualiza agente mediante operaciones genéticas""" # Selección de padres parents = self._select_parents(swarm_state.agents) # Cruzamiento if random.random() < self.crossover_rate and len(parents) >= 2: agent.position = self._crossover(parents[0].position, parents[1].position) # Mutación if random.random() < self.mutation_rate: agent.position = self._mutate(agent.position) return agent def _select_parents(self, agents: List[Agent]) -> List[Agent]: """Selección de padres por torneo""" tournament_size = min(3, len(agents)) tournament = random.sample(agents, tournament_size) return sorted(tournament, key=lambda a: a.fitness, reverse=True)[:2] def _crossover(self, parent1: np.ndarray, parent2: np.ndarray) -> np.ndarray: """Cruzamiento uniforme""" mask = np.random.random(len(parent1)) < 0.5 child = np.where(mask, parent1, parent2) return child def _mutate(self, individual: np.ndarray) -> np.ndarray: """Mutación gaussiana""" mutation_mask = np.random.random(len(individual)) < self.mutation_rate mutations = np.random.normal(0, 0.1, len(individual)) return individual + mutation_mask * mutations def get_algorithm_name(self) -> str: return "Genetic Algorithm" class ConsensusBuilder: """Constructor de consenso para decisiones colectivas""" def __init__(self, convergence_threshold: float = 0.01): self.convergence_threshold = convergence_threshold self.voting_history = [] def build_consensus(self, agent_opinions: List[np.ndarray]) -> Dict[str, Any]: """Construye consenso a partir de opiniones de agentes""" if not agent_opinions: return {"consensus": None, "confidence": 0.0} # Calcula centroide como consenso inicial consensus = np.mean(agent_opinions, axis=0) # Mide dispersión para evaluar confianza distances = [np.linalg.norm(opinion - consensus) for opinion in agent_opinions] avg_distance = np.mean(distances) confidence = 1.0 / (1.0 + avg_distance) # Identifica agentes disidentes outliers = [i for i, dist in enumerate(distances) if dist > 2 * np.std(distances)] # Proceso iterativo de refinamiento if len(outliers) > 0 and len(agent_opinions) > 3: # Recalcula consenso sin outliers extremos filtered_opinions = [op for i, op in enumerate(agent_opinions) if i not in outliers[:len(outliers)//2]] if filtered_opinions: refined_consensus = np.mean(filtered_opinions, axis=0) consensus = 0.7 * consensus + 0.3 * refined_consensus return { "consensus": consensus, "confidence": confidence, "participation_rate": len(agent_opinions), "outliers": len(outliers), "convergence_achieved": avg_distance < self.convergence_threshold } class SwarmIntelligence: """Sistema principal de inteligencia de enjambre""" def __init__(self, swarm_size: int = 50, dimensions: int = 10, max_iterations: int = 100): self.swarm_size = swarm_size self.dimensions = dimensions self.max_iterations = max_iterations # Algoritmos disponibles self.algorithms = { 'pso': ParticleSwarmOptimization(), 'aco': AntColonyOptimization(), 'ga': GeneticAlgorithm() } self.consensus_builder = ConsensusBuilder() self.swarm_state = None self.optimization_history = [] # Control de ejecución self._running = False self._lock = threading.Lock() def initialize_swarm(self, bounds: Tuple[float, float] = (-10, 10), fitness_function: Optional[Callable] = None) -> SwarmState: """Inicializa el enjambre""" agents = [] for i in range(self.swarm_size): # Posición inicial aleatoria position = np.random.uniform(bounds[0], bounds[1], self.dimensions) velocity = np.random.uniform(-1, 1, self.dimensions) agent = Agent( id=f"agent_{i}", position=position, velocity=velocity, best_position=position.copy(), best_fitness=float('-inf'), fitness=float('-inf'), role=self._assign_role(i) ) # Evaluar fitness inicial if fitness_function: agent.fitness = fitness_function(agent.position) agent.best_fitness = agent.fitness agents.append(agent) # Encontrar mejor global best_agent = max(agents, key=lambda a: a.fitness) self.swarm_state = SwarmState( global_best_position=best_agent.position.copy(), global_best_fitness=best_agent.fitness, agents=agents, iteration=0, convergence_rate=0.0, diversity_measure=self._calculate_diversity(agents) ) return self.swarm_state def optimize(self, fitness_function: Callable[[np.ndarray], float], algorithm: str = 'pso', bounds: Tuple[float, float] = (-10, 10), parallel: bool = True) -> Dict[str, Any]: """Ejecuta optimización con el algoritmo especificado""" if algorithm not in self.algorithms: raise ValueError(f"Algorithm {algorithm} not available") if self.swarm_state is None: self.initialize_swarm(bounds, fitness_function) optimizer = self.algorithms[algorithm] self._running = True try: for iteration in range(self.max_iterations): if not self._running: break # Actualizar agentes if parallel: self._update_agents_parallel(optimizer, fitness_function) else: self._update_agents_sequential(optimizer, fitness_function) # Actualizar estado del enjambre self._update_swarm_state(iteration) # Guardar historial self.optimization_history.append({ 'iteration': iteration, 'best_fitness': self.swarm_state.global_best_fitness, 'diversity': self.swarm_state.diversity_measure, 'algorithm': algorithm }) # Verificar convergencia if self._check_convergence(): break finally: self._running = False return self._get_optimization_results() def collective_decision_making(self, decision_space: List[np.ndarray], agent_preferences: Dict[str, np.ndarray]) -> Dict[str, Any]: """Toma de decisiones colectiva""" if not self.swarm_state: raise ValueError("Swarm not initialized") # Recopilar opiniones de agentes agent_opinions = [] for agent in self.swarm_state.agents: if agent.id in agent_preferences: # Usar preferencias específicas del agente opinion = agent_preferences[agent.id] else: # Generar opinión basada en posición del agente opinion = self._generate_agent_opinion(agent, decision_space) agent_opinions.append(opinion) # Construir consenso consensus_result = self.consensus_builder.build_consensus(agent_opinions) # Métricas de calidad de decisión decision_quality = self._assess_decision_quality( consensus_result, agent_opinions ) return { 'decision': consensus_result['consensus'], 'confidence': consensus_result['confidence'], 'quality_metrics': decision_quality, 'participation': len(agent_opinions), 'algorithm_used': 'collective_consensus' } def adaptive_swarm_behavior(self, environment_feedback: Dict[str, float]) -> None: """Adapta comportamiento del enjambre según retroalimentación""" if not self.swarm_state: return # Analizar feedback del ambiente performance_score = environment_feedback.get('performance', 0.5) complexity_score = environment_feedback.get('complexity', 0.5) resource_availability = environment_feedback.get('resources', 1.0) # Adaptar roles de agentes self._adapt_agent_roles(performance_score, complexity_score) # Ajustar parámetros de algoritmos self._adapt_algorithm_parameters(performance_score, resource_availability) # Modificar estructura del enjambre si necesario if performance_score < 0.3: self._restructure_swarm() def get_swarm_analytics(self) -> Dict[str, Any]: """Obtiene analytics del enjambre""" if not self.swarm_state: return {"status": "Swarm not initialized"} # Estadísticas de agentes fitnesses = [agent.fitness for agent in self.swarm_state.agents] # Análisis de convergencia convergence_analysis = self._analyze_convergence() # Diversidad y exploración exploration_metrics = self._calculate_exploration_metrics() return { 'swarm_size': len(self.swarm_state.agents), 'current_iteration': self.swarm_state.iteration, 'best_fitness': self.swarm_state.global_best_fitness, 'average_fitness': np.mean(fitnesses), 'fitness_std': np.std(fitnesses), 'diversity_measure': self.swarm_state.diversity_measure, 'convergence_rate': self.swarm_state.convergence_rate, 'convergence_analysis': convergence_analysis, 'exploration_metrics': exploration_metrics, 'agent_roles': self._get_role_distribution() } # Métodos auxiliares privados def _assign_role(self, agent_index: int) -> str: """Asigna rol inicial a agente""" roles = ['explorer', 'exploiter', 'scout', 'coordinator'] return roles[agent_index % len(roles)] def _calculate_diversity(self, agents: List[Agent]) -> float: """Calcula medida de diversidad del enjambre""" if len(agents) < 2: return 0.0 positions = np.array([agent.position for agent in agents]) center = np.mean(positions, axis=0) distances = [np.linalg.norm(pos - center) for pos in positions] return np.mean(distances) def _update_agents_parallel(self, optimizer: OptimizationAlgorithm, fitness_function: Callable) -> None: """Actualiza agentes en paralelo""" with ThreadPoolExecutor(max_workers=4) as executor: futures = [] for agent in self.swarm_state.agents: future = executor.submit( self._update_single_agent, agent, optimizer, fitness_function ) futures.append(future) # Recopilar resultados for i, future in enumerate(futures): self.swarm_state.agents[i] = future.result() def _update_agents_sequential(self, optimizer: OptimizationAlgorithm, fitness_function: Callable) -> None: """Actualiza agentes secuencialmente""" for i, agent in enumerate(self.swarm_state.agents): self.swarm_state.agents[i] = self._update_single_agent( agent, optimizer, fitness_function ) def _update_single_agent(self, agent: Agent, optimizer: OptimizationAlgorithm, fitness_function: Callable) -> Agent: """Actualiza un agente individual""" # Actualizar posición con algoritmo updated_agent = optimizer.update_agent(agent, self.swarm_state) # Evaluar nuevo fitness updated_agent.fitness = fitness_function(updated_agent.position) # Actualizar mejor personal if updated_agent.fitness > updated_agent.best_fitness: updated_agent.best_fitness = updated_agent.fitness updated_agent.best_position = updated_agent.position.copy() return updated_agent def _update_swarm_state(self, iteration: int) -> None: """Actualiza estado global del enjambre""" # Encontrar nuevo mejor global best_agent = max(self.swarm_state.agents, key=lambda a: a.fitness) if best_agent.fitness > self.swarm_state.global_best_fitness: self.swarm_state.global_best_fitness = best_agent.fitness self.swarm_state.global_best_position = best_agent.position.copy() # Actualizar métricas self.swarm_state.iteration = iteration self.swarm_state.diversity_measure = self._calculate_diversity( self.swarm_state.agents ) self.swarm_state.convergence_rate = self._calculate_convergence_rate() def _calculate_convergence_rate(self) -> float: """Calcula tasa de convergencia""" if len(self.optimization_history) < 2: return 0.0 recent_improvements = [] for i in range(1, min(10, len(self.optimization_history))): current = self.optimization_history[-i]['best_fitness'] previous = self.optimization_history[-i-1]['best_fitness'] improvement = current - previous recent_improvements.append(improvement) return np.mean(recent_improvements) if recent_improvements else 0.0 def _check_convergence(self, tolerance: float = 1e-6) -> bool: """Verifica convergencia del enjambre""" if len(self.optimization_history) < 10: return False recent_bests = [ entry['best_fitness'] for entry in self.optimization_history[-10:] ] return np.std(recent_bests) < tolerance def _get_optimization_results(self) -> Dict[str, Any]: """Obtiene resultados de optimización""" return { 'best_position': self.swarm_state.global_best_position, 'best_fitness': self.swarm_state.global_best_fitness, 'iterations': self.swarm_state.iteration, 'convergence_achieved': self._check_convergence(), 'final_diversity': self.swarm_state.diversity_measure, 'optimization_history': self.optimization_history } def _generate_agent_opinion(self, agent: Agent, decision_space: List[np.ndarray]) -> np.ndarray: """Genera opinión de agente para decisión colectiva""" # Selecciona opción más cercana a posición del agente distances = [ np.linalg.norm(agent.position - option) for option in decision_space ] # Añade ruido basado en rol del agente noise_factor = { 'explorer': 0.3, 'exploiter': 0.1, 'scout': 0.5, 'coordinator': 0.2 }.get(agent.role, 0.2) best_option_idx = np.argmin(distances) opinion = decision_space[best_option_idx].copy() # Añadir ruido según rol noise = np.random.normal(0, noise_factor, len(opinion)) opinion += noise return opinion def _assess_decision_quality(self, consensus_result: Dict[str, Any], agent_opinions: List[np.ndarray]) -> Dict[str, float]: """Evalúa calidad de decisión colectiva""" if not agent_opinions: return {"quality_score": 0.0} consensus = consensus_result['consensus'] # Métricas de calidad quality_metrics = {} # 1. Satisfacción promedio satisfactions = [ 1.0 / (1.0 + np.linalg.norm(opinion - consensus)) for opinion in agent_opinions ] quality_metrics['average_satisfaction'] = np.mean(satisfactions) # 2. Equidad (distribución de satisfacciones) quality_metrics['fairness'] = 1.0 - np.std(satisfactions) # 3. Robustez (sensibilidad a outliers) outlier_opinions = [ op for op in agent_opinions if np.linalg.norm(op - consensus) > 2 * np.std([ np.linalg.norm(o - consensus) for o in agent_opinions ]) ] quality_metrics['robustness'] = 1.0 - len(outlier_opinions) / len(agent_opinions) # 4. Score compuesto quality_metrics['quality_score'] = ( 0.4 * quality_metrics['average_satisfaction'] + 0.3 * quality_metrics['fairness'] + 0.3 * quality_metrics['robustness'] ) return quality_metrics def _adapt_agent_roles(self, performance_score: float, complexity_score: float): """Adapta roles de agentes según performance""" if performance_score < 0.4: # Incrementar exploradores si performance es baja for agent in self.swarm_state.agents[:len(self.swarm_state.agents)//3]: agent.role = 'explorer' elif performance_score > 0.8: # Incrementar explotadores si performance es alta for agent in self.swarm_state.agents[:len(self.swarm_state.agents)//2]: agent.role = 'exploiter' def _adapt_algorithm_parameters(self, performance_score: float, resource_availability: float): """Adapta parámetros de algoritmos""" # Adaptar PSO if 'pso' in self.algorithms: pso = self.algorithms['pso'] if performance_score < 0.5: pso.w *= 1.1 # Incrementar exploración pso.c1 *= 0.9 # Reducir componente cognitivo else: pso.w *= 0.95 # Reducir exploración pso.c2 *= 1.05 # Incrementar componente social def _restructure_swarm(self): """Reestructura enjambre si performance es muy baja""" # Reinicializar 30% de agentes con peor performance worst_agents = sorted( self.swarm_state.agents, key=lambda a: a.fitness )[:len(self.swarm_state.agents)//3] for agent in worst_agents: agent.position = np.random.uniform(-10, 10, self.dimensions) agent.velocity = np.random.uniform(-1, 1, self.dimensions) agent.fitness = float('-inf') def _analyze_convergence(self) -> Dict[str, Any]: """Analiza patrones de convergencia""" if len(self.optimization_history) < 5: return {"status": "Insufficient data"} # Analizar tendencias fitness_trend = [entry['best_fitness'] for entry in self.optimization_history[-10:]] diversity_trend = [entry['diversity'] for entry in self.optimization_history[-10:]] return { 'fitness_improvement_rate': np.mean(np.diff(fitness_trend)), 'diversity_trend': 'decreasing' if np.mean(np.diff(diversity_trend)) < 0 else 'stable', 'convergence_speed': 'fast' if abs(np.mean(np.diff(fitness_trend))) > 0.1 else 'slow', 'premature_convergence_risk': 'high' if np.mean(diversity_trend) < 0.1 else 'low' } def _calculate_exploration_metrics(self) -> Dict[str, float]: """Calcula métricas de exploración""" if not self.swarm_state: return {} positions = np.array([agent.position for agent in self.swarm_state.agents]) # Cobertura del espacio de búsqueda ranges = np.max(positions, axis=0) - np.min(positions, axis=0) space_coverage = np.mean(ranges) # Distribución de agentes center = np.mean(positions, axis=0) distances_to_center = [np.linalg.norm(pos - center) for pos in positions] distribution_uniformity = 1.0 - np.std(distances_to_center) / np.mean(distances_to_center) return { 'space_coverage': space_coverage, 'distribution_uniformity': distribution_uniformity, 'exploration_pressure': np.mean([ 1.0 if agent.role in ['explorer', 'scout'] else 0.0 for agent in self.swarm_state.agents ]) } def _get_role_distribution(self) -> Dict[str, int]: """Obtiene distribución de roles en el enjambre""" if not self.swarm_state: return {} roles = {} for agent in self.swarm_state.agents: roles[agent.role] = roles.get(agent.role, 0) + 1 return roles def stop_optimization(self): """Detiene optimización en curso""" self._running = False