| """ |
| Sistema de Inteligencia de Enjambre |
| Implementa algoritmos de optimizaci贸n colectiva inspirados en comportamientos naturales |
| """ |
|
|
| import numpy as np |
| import random |
| from typing import Dict, List, Tuple, Callable, Any, Optional |
| from dataclasses import dataclass |
| from abc import ABC, abstractmethod |
| from concurrent.futures import ThreadPoolExecutor |
| import threading |
| import time |
| from datetime import datetime |
|
|
| @dataclass |
| class Agent: |
| """Agente individual en el enjambre""" |
| id: str |
| position: np.ndarray |
| velocity: np.ndarray |
| best_position: np.ndarray |
| best_fitness: float |
| fitness: float |
| role: str = "explorer" |
| |
| @dataclass |
| class SwarmState: |
| """Estado actual del enjambre""" |
| global_best_position: np.ndarray |
| global_best_fitness: float |
| agents: List[Agent] |
| iteration: int |
| convergence_rate: float |
| diversity_measure: float |
|
|
| class OptimizationAlgorithm(ABC): |
| """Clase base para algoritmos de optimizaci贸n""" |
| |
| @abstractmethod |
| def update_agent(self, agent: Agent, swarm_state: SwarmState) -> Agent: |
| pass |
| |
| @abstractmethod |
| def get_algorithm_name(self) -> str: |
| pass |
|
|
| class ParticleSwarmOptimization(OptimizationAlgorithm): |
| """Algoritmo de Optimizaci贸n por Enjambre de Part铆culas""" |
| |
| def __init__(self, w: float = 0.729, c1: float = 1.494, c2: float = 1.494): |
| self.w = w |
| self.c1 = c1 |
| self.c2 = c2 |
| |
| def update_agent(self, agent: Agent, swarm_state: SwarmState) -> Agent: |
| """Actualiza posici贸n y velocidad de un agente PSO""" |
| r1, r2 = random.random(), random.random() |
| |
| |
| cognitive_component = self.c1 * r1 * (agent.best_position - agent.position) |
| social_component = self.c2 * r2 * (swarm_state.global_best_position - agent.position) |
| |
| agent.velocity = (self.w * agent.velocity + |
| cognitive_component + |
| social_component) |
| |
| |
| agent.position = agent.position + agent.velocity |
| |
| return agent |
| |
| def get_algorithm_name(self) -> str: |
| return "Particle Swarm Optimization" |
|
|
| class AntColonyOptimization(OptimizationAlgorithm): |
| """Algoritmo de Optimizaci贸n por Colonia de Hormigas""" |
| |
| def __init__(self, alpha: float = 1.0, beta: float = 2.0, rho: float = 0.1): |
| self.alpha = alpha |
| self.beta = beta |
| self.rho = rho |
| self.pheromone_matrix = None |
| |
| def update_agent(self, agent: Agent, swarm_state: SwarmState) -> Agent: |
| """Actualiza posici贸n basada en feromonas""" |
| if self.pheromone_matrix is None: |
| self._initialize_pheromone_matrix(len(agent.position)) |
| |
| |
| probabilities = self._calculate_probabilities(agent.position) |
| new_position = self._select_next_position(agent.position, probabilities) |
| |
| agent.position = new_position |
| return agent |
| |
| def _initialize_pheromone_matrix(self, size: int): |
| """Inicializa matriz de feromonas""" |
| self.pheromone_matrix = np.ones((size, size)) * 0.1 |
| |
| def _calculate_probabilities(self, position: np.ndarray) -> np.ndarray: |
| """Calcula probabilidades de transici贸n""" |
| if self.pheromone_matrix is None: |
| return np.ones(len(position)) / len(position) |
| |
| |
| pheromones = np.diag(self.pheromone_matrix) |
| heuristic = 1.0 / (1.0 + np.abs(position)) |
| |
| numerator = (pheromones ** self.alpha) * (heuristic ** self.beta) |
| return numerator / np.sum(numerator) |
| |
| def _select_next_position(self, current_pos: np.ndarray, |
| probabilities: np.ndarray) -> np.ndarray: |
| """Selecciona siguiente posici贸n""" |
| |
| direction = np.random.choice(len(probabilities), p=probabilities) |
| step = np.zeros_like(current_pos) |
| step[direction % len(current_pos)] = np.random.normal(0, 0.1) |
| |
| return current_pos + step |
| |
| def get_algorithm_name(self) -> str: |
| return "Ant Colony Optimization" |
|
|
| class GeneticAlgorithm(OptimizationAlgorithm): |
| """Algoritmo Gen茅tico""" |
| |
| def __init__(self, mutation_rate: float = 0.1, crossover_rate: float = 0.8): |
| self.mutation_rate = mutation_rate |
| self.crossover_rate = crossover_rate |
| |
| def update_agent(self, agent: Agent, swarm_state: SwarmState) -> Agent: |
| """Actualiza agente mediante operaciones gen茅ticas""" |
| |
| parents = self._select_parents(swarm_state.agents) |
| |
| |
| if random.random() < self.crossover_rate and len(parents) >= 2: |
| agent.position = self._crossover(parents[0].position, parents[1].position) |
| |
| |
| if random.random() < self.mutation_rate: |
| agent.position = self._mutate(agent.position) |
| |
| return agent |
| |
| def _select_parents(self, agents: List[Agent]) -> List[Agent]: |
| """Selecci贸n de padres por torneo""" |
| tournament_size = min(3, len(agents)) |
| tournament = random.sample(agents, tournament_size) |
| return sorted(tournament, key=lambda a: a.fitness, reverse=True)[:2] |
| |
| def _crossover(self, parent1: np.ndarray, parent2: np.ndarray) -> np.ndarray: |
| """Cruzamiento uniforme""" |
| mask = np.random.random(len(parent1)) < 0.5 |
| child = np.where(mask, parent1, parent2) |
| return child |
| |
| def _mutate(self, individual: np.ndarray) -> np.ndarray: |
| """Mutaci贸n gaussiana""" |
| mutation_mask = np.random.random(len(individual)) < self.mutation_rate |
| mutations = np.random.normal(0, 0.1, len(individual)) |
| return individual + mutation_mask * mutations |
| |
| def get_algorithm_name(self) -> str: |
| return "Genetic Algorithm" |
|
|
| class ConsensusBuilder: |
| """Constructor de consenso para decisiones colectivas""" |
| |
| def __init__(self, convergence_threshold: float = 0.01): |
| self.convergence_threshold = convergence_threshold |
| self.voting_history = [] |
| |
| def build_consensus(self, agent_opinions: List[np.ndarray]) -> Dict[str, Any]: |
| """Construye consenso a partir de opiniones de agentes""" |
| if not agent_opinions: |
| return {"consensus": None, "confidence": 0.0} |
| |
| |
| consensus = np.mean(agent_opinions, axis=0) |
| |
| |
| distances = [np.linalg.norm(opinion - consensus) |
| for opinion in agent_opinions] |
| avg_distance = np.mean(distances) |
| confidence = 1.0 / (1.0 + avg_distance) |
| |
| |
| outliers = [i for i, dist in enumerate(distances) |
| if dist > 2 * np.std(distances)] |
| |
| |
| if len(outliers) > 0 and len(agent_opinions) > 3: |
| |
| filtered_opinions = [op for i, op in enumerate(agent_opinions) |
| if i not in outliers[:len(outliers)//2]] |
| if filtered_opinions: |
| refined_consensus = np.mean(filtered_opinions, axis=0) |
| consensus = 0.7 * consensus + 0.3 * refined_consensus |
| |
| return { |
| "consensus": consensus, |
| "confidence": confidence, |
| "participation_rate": len(agent_opinions), |
| "outliers": len(outliers), |
| "convergence_achieved": avg_distance < self.convergence_threshold |
| } |
|
|
| class SwarmIntelligence: |
| """Sistema principal de inteligencia de enjambre""" |
| |
| def __init__(self, |
| swarm_size: int = 50, |
| dimensions: int = 10, |
| max_iterations: int = 100): |
| self.swarm_size = swarm_size |
| self.dimensions = dimensions |
| self.max_iterations = max_iterations |
| |
| |
| self.algorithms = { |
| 'pso': ParticleSwarmOptimization(), |
| 'aco': AntColonyOptimization(), |
| 'ga': GeneticAlgorithm() |
| } |
| |
| self.consensus_builder = ConsensusBuilder() |
| self.swarm_state = None |
| self.optimization_history = [] |
| |
| |
| self._running = False |
| self._lock = threading.Lock() |
| |
| def initialize_swarm(self, |
| bounds: Tuple[float, float] = (-10, 10), |
| fitness_function: Optional[Callable] = None) -> SwarmState: |
| """Inicializa el enjambre""" |
| agents = [] |
| |
| for i in range(self.swarm_size): |
| |
| position = np.random.uniform(bounds[0], bounds[1], self.dimensions) |
| velocity = np.random.uniform(-1, 1, self.dimensions) |
| |
| agent = Agent( |
| id=f"agent_{i}", |
| position=position, |
| velocity=velocity, |
| best_position=position.copy(), |
| best_fitness=float('-inf'), |
| fitness=float('-inf'), |
| role=self._assign_role(i) |
| ) |
| |
| |
| if fitness_function: |
| agent.fitness = fitness_function(agent.position) |
| agent.best_fitness = agent.fitness |
| |
| agents.append(agent) |
| |
| |
| best_agent = max(agents, key=lambda a: a.fitness) |
| |
| self.swarm_state = SwarmState( |
| global_best_position=best_agent.position.copy(), |
| global_best_fitness=best_agent.fitness, |
| agents=agents, |
| iteration=0, |
| convergence_rate=0.0, |
| diversity_measure=self._calculate_diversity(agents) |
| ) |
| |
| return self.swarm_state |
| |
| def optimize(self, |
| fitness_function: Callable[[np.ndarray], float], |
| algorithm: str = 'pso', |
| bounds: Tuple[float, float] = (-10, 10), |
| parallel: bool = True) -> Dict[str, Any]: |
| """Ejecuta optimizaci贸n con el algoritmo especificado""" |
| |
| if algorithm not in self.algorithms: |
| raise ValueError(f"Algorithm {algorithm} not available") |
| |
| if self.swarm_state is None: |
| self.initialize_swarm(bounds, fitness_function) |
| |
| optimizer = self.algorithms[algorithm] |
| self._running = True |
| |
| try: |
| for iteration in range(self.max_iterations): |
| if not self._running: |
| break |
| |
| |
| if parallel: |
| self._update_agents_parallel(optimizer, fitness_function) |
| else: |
| self._update_agents_sequential(optimizer, fitness_function) |
| |
| |
| self._update_swarm_state(iteration) |
| |
| |
| self.optimization_history.append({ |
| 'iteration': iteration, |
| 'best_fitness': self.swarm_state.global_best_fitness, |
| 'diversity': self.swarm_state.diversity_measure, |
| 'algorithm': algorithm |
| }) |
| |
| |
| if self._check_convergence(): |
| break |
| |
| finally: |
| self._running = False |
| |
| return self._get_optimization_results() |
| |
| def collective_decision_making(self, |
| decision_space: List[np.ndarray], |
| agent_preferences: Dict[str, np.ndarray]) -> Dict[str, Any]: |
| """Toma de decisiones colectiva""" |
| |
| if not self.swarm_state: |
| raise ValueError("Swarm not initialized") |
| |
| |
| agent_opinions = [] |
| |
| for agent in self.swarm_state.agents: |
| if agent.id in agent_preferences: |
| |
| opinion = agent_preferences[agent.id] |
| else: |
| |
| opinion = self._generate_agent_opinion(agent, decision_space) |
| |
| agent_opinions.append(opinion) |
| |
| |
| consensus_result = self.consensus_builder.build_consensus(agent_opinions) |
| |
| |
| decision_quality = self._assess_decision_quality( |
| consensus_result, agent_opinions |
| ) |
| |
| return { |
| 'decision': consensus_result['consensus'], |
| 'confidence': consensus_result['confidence'], |
| 'quality_metrics': decision_quality, |
| 'participation': len(agent_opinions), |
| 'algorithm_used': 'collective_consensus' |
| } |
| |
| def adaptive_swarm_behavior(self, |
| environment_feedback: Dict[str, float]) -> None: |
| """Adapta comportamiento del enjambre seg煤n retroalimentaci贸n""" |
| |
| if not self.swarm_state: |
| return |
| |
| |
| performance_score = environment_feedback.get('performance', 0.5) |
| complexity_score = environment_feedback.get('complexity', 0.5) |
| resource_availability = environment_feedback.get('resources', 1.0) |
| |
| |
| self._adapt_agent_roles(performance_score, complexity_score) |
| |
| |
| self._adapt_algorithm_parameters(performance_score, resource_availability) |
| |
| |
| if performance_score < 0.3: |
| self._restructure_swarm() |
| |
| def get_swarm_analytics(self) -> Dict[str, Any]: |
| """Obtiene analytics del enjambre""" |
| if not self.swarm_state: |
| return {"status": "Swarm not initialized"} |
| |
| |
| fitnesses = [agent.fitness for agent in self.swarm_state.agents] |
| |
| |
| convergence_analysis = self._analyze_convergence() |
| |
| |
| exploration_metrics = self._calculate_exploration_metrics() |
| |
| return { |
| 'swarm_size': len(self.swarm_state.agents), |
| 'current_iteration': self.swarm_state.iteration, |
| 'best_fitness': self.swarm_state.global_best_fitness, |
| 'average_fitness': np.mean(fitnesses), |
| 'fitness_std': np.std(fitnesses), |
| 'diversity_measure': self.swarm_state.diversity_measure, |
| 'convergence_rate': self.swarm_state.convergence_rate, |
| 'convergence_analysis': convergence_analysis, |
| 'exploration_metrics': exploration_metrics, |
| 'agent_roles': self._get_role_distribution() |
| } |
| |
| |
| def _assign_role(self, agent_index: int) -> str: |
| """Asigna rol inicial a agente""" |
| roles = ['explorer', 'exploiter', 'scout', 'coordinator'] |
| return roles[agent_index % len(roles)] |
| |
| def _calculate_diversity(self, agents: List[Agent]) -> float: |
| """Calcula medida de diversidad del enjambre""" |
| if len(agents) < 2: |
| return 0.0 |
| |
| positions = np.array([agent.position for agent in agents]) |
| center = np.mean(positions, axis=0) |
| distances = [np.linalg.norm(pos - center) for pos in positions] |
| |
| return np.mean(distances) |
| |
| def _update_agents_parallel(self, |
| optimizer: OptimizationAlgorithm, |
| fitness_function: Callable) -> None: |
| """Actualiza agentes en paralelo""" |
| with ThreadPoolExecutor(max_workers=4) as executor: |
| futures = [] |
| |
| for agent in self.swarm_state.agents: |
| future = executor.submit( |
| self._update_single_agent, |
| agent, optimizer, fitness_function |
| ) |
| futures.append(future) |
| |
| |
| for i, future in enumerate(futures): |
| self.swarm_state.agents[i] = future.result() |
| |
| def _update_agents_sequential(self, |
| optimizer: OptimizationAlgorithm, |
| fitness_function: Callable) -> None: |
| """Actualiza agentes secuencialmente""" |
| for i, agent in enumerate(self.swarm_state.agents): |
| self.swarm_state.agents[i] = self._update_single_agent( |
| agent, optimizer, fitness_function |
| ) |
| |
| def _update_single_agent(self, |
| agent: Agent, |
| optimizer: OptimizationAlgorithm, |
| fitness_function: Callable) -> Agent: |
| """Actualiza un agente individual""" |
| |
| updated_agent = optimizer.update_agent(agent, self.swarm_state) |
| |
| |
| updated_agent.fitness = fitness_function(updated_agent.position) |
| |
| |
| if updated_agent.fitness > updated_agent.best_fitness: |
| updated_agent.best_fitness = updated_agent.fitness |
| updated_agent.best_position = updated_agent.position.copy() |
| |
| return updated_agent |
| |
| def _update_swarm_state(self, iteration: int) -> None: |
| """Actualiza estado global del enjambre""" |
| |
| best_agent = max(self.swarm_state.agents, key=lambda a: a.fitness) |
| |
| if best_agent.fitness > self.swarm_state.global_best_fitness: |
| self.swarm_state.global_best_fitness = best_agent.fitness |
| self.swarm_state.global_best_position = best_agent.position.copy() |
| |
| |
| self.swarm_state.iteration = iteration |
| self.swarm_state.diversity_measure = self._calculate_diversity( |
| self.swarm_state.agents |
| ) |
| self.swarm_state.convergence_rate = self._calculate_convergence_rate() |
| |
| def _calculate_convergence_rate(self) -> float: |
| """Calcula tasa de convergencia""" |
| if len(self.optimization_history) < 2: |
| return 0.0 |
| |
| recent_improvements = [] |
| for i in range(1, min(10, len(self.optimization_history))): |
| current = self.optimization_history[-i]['best_fitness'] |
| previous = self.optimization_history[-i-1]['best_fitness'] |
| improvement = current - previous |
| recent_improvements.append(improvement) |
| |
| return np.mean(recent_improvements) if recent_improvements else 0.0 |
| |
| def _check_convergence(self, tolerance: float = 1e-6) -> bool: |
| """Verifica convergencia del enjambre""" |
| if len(self.optimization_history) < 10: |
| return False |
| |
| recent_bests = [ |
| entry['best_fitness'] |
| for entry in self.optimization_history[-10:] |
| ] |
| |
| return np.std(recent_bests) < tolerance |
| |
| def _get_optimization_results(self) -> Dict[str, Any]: |
| """Obtiene resultados de optimizaci贸n""" |
| return { |
| 'best_position': self.swarm_state.global_best_position, |
| 'best_fitness': self.swarm_state.global_best_fitness, |
| 'iterations': self.swarm_state.iteration, |
| 'convergence_achieved': self._check_convergence(), |
| 'final_diversity': self.swarm_state.diversity_measure, |
| 'optimization_history': self.optimization_history |
| } |
| |
| def _generate_agent_opinion(self, |
| agent: Agent, |
| decision_space: List[np.ndarray]) -> np.ndarray: |
| """Genera opini贸n de agente para decisi贸n colectiva""" |
| |
| distances = [ |
| np.linalg.norm(agent.position - option) |
| for option in decision_space |
| ] |
| |
| |
| noise_factor = { |
| 'explorer': 0.3, |
| 'exploiter': 0.1, |
| 'scout': 0.5, |
| 'coordinator': 0.2 |
| }.get(agent.role, 0.2) |
| |
| best_option_idx = np.argmin(distances) |
| opinion = decision_space[best_option_idx].copy() |
| |
| |
| noise = np.random.normal(0, noise_factor, len(opinion)) |
| opinion += noise |
| |
| return opinion |
| |
| def _assess_decision_quality(self, |
| consensus_result: Dict[str, Any], |
| agent_opinions: List[np.ndarray]) -> Dict[str, float]: |
| """Eval煤a calidad de decisi贸n colectiva""" |
| if not agent_opinions: |
| return {"quality_score": 0.0} |
| |
| consensus = consensus_result['consensus'] |
| |
| |
| quality_metrics = {} |
| |
| |
| satisfactions = [ |
| 1.0 / (1.0 + np.linalg.norm(opinion - consensus)) |
| for opinion in agent_opinions |
| ] |
| quality_metrics['average_satisfaction'] = np.mean(satisfactions) |
| |
| |
| quality_metrics['fairness'] = 1.0 - np.std(satisfactions) |
| |
| |
| outlier_opinions = [ |
| op for op in agent_opinions |
| if np.linalg.norm(op - consensus) > 2 * np.std([ |
| np.linalg.norm(o - consensus) for o in agent_opinions |
| ]) |
| ] |
| quality_metrics['robustness'] = 1.0 - len(outlier_opinions) / len(agent_opinions) |
| |
| |
| quality_metrics['quality_score'] = ( |
| 0.4 * quality_metrics['average_satisfaction'] + |
| 0.3 * quality_metrics['fairness'] + |
| 0.3 * quality_metrics['robustness'] |
| ) |
| |
| return quality_metrics |
| |
| def _adapt_agent_roles(self, performance_score: float, complexity_score: float): |
| """Adapta roles de agentes seg煤n performance""" |
| if performance_score < 0.4: |
| |
| for agent in self.swarm_state.agents[:len(self.swarm_state.agents)//3]: |
| agent.role = 'explorer' |
| elif performance_score > 0.8: |
| |
| for agent in self.swarm_state.agents[:len(self.swarm_state.agents)//2]: |
| agent.role = 'exploiter' |
| |
| def _adapt_algorithm_parameters(self, performance_score: float, resource_availability: float): |
| """Adapta par谩metros de algoritmos""" |
| |
| if 'pso' in self.algorithms: |
| pso = self.algorithms['pso'] |
| if performance_score < 0.5: |
| pso.w *= 1.1 |
| pso.c1 *= 0.9 |
| else: |
| pso.w *= 0.95 |
| pso.c2 *= 1.05 |
| |
| def _restructure_swarm(self): |
| """Reestructura enjambre si performance es muy baja""" |
| |
| worst_agents = sorted( |
| self.swarm_state.agents, |
| key=lambda a: a.fitness |
| )[:len(self.swarm_state.agents)//3] |
| |
| for agent in worst_agents: |
| agent.position = np.random.uniform(-10, 10, self.dimensions) |
| agent.velocity = np.random.uniform(-1, 1, self.dimensions) |
| agent.fitness = float('-inf') |
| |
| def _analyze_convergence(self) -> Dict[str, Any]: |
| """Analiza patrones de convergencia""" |
| if len(self.optimization_history) < 5: |
| return {"status": "Insufficient data"} |
| |
| |
| fitness_trend = [entry['best_fitness'] for entry in self.optimization_history[-10:]] |
| diversity_trend = [entry['diversity'] for entry in self.optimization_history[-10:]] |
| |
| return { |
| 'fitness_improvement_rate': np.mean(np.diff(fitness_trend)), |
| 'diversity_trend': 'decreasing' if np.mean(np.diff(diversity_trend)) < 0 else 'stable', |
| 'convergence_speed': 'fast' if abs(np.mean(np.diff(fitness_trend))) > 0.1 else 'slow', |
| 'premature_convergence_risk': 'high' if np.mean(diversity_trend) < 0.1 else 'low' |
| } |
| |
| def _calculate_exploration_metrics(self) -> Dict[str, float]: |
| """Calcula m茅tricas de exploraci贸n""" |
| if not self.swarm_state: |
| return {} |
| |
| positions = np.array([agent.position for agent in self.swarm_state.agents]) |
| |
| |
| ranges = np.max(positions, axis=0) - np.min(positions, axis=0) |
| space_coverage = np.mean(ranges) |
| |
| |
| center = np.mean(positions, axis=0) |
| distances_to_center = [np.linalg.norm(pos - center) for pos in positions] |
| distribution_uniformity = 1.0 - np.std(distances_to_center) / np.mean(distances_to_center) |
| |
| return { |
| 'space_coverage': space_coverage, |
| 'distribution_uniformity': distribution_uniformity, |
| 'exploration_pressure': np.mean([ |
| 1.0 if agent.role in ['explorer', 'scout'] else 0.0 |
| for agent in self.swarm_state.agents |
| ]) |
| } |
| |
| def _get_role_distribution(self) -> Dict[str, int]: |
| """Obtiene distribuci贸n de roles en el enjambre""" |
| if not self.swarm_state: |
| return {} |
| |
| roles = {} |
| for agent in self.swarm_state.agents: |
| roles[agent.role] = roles.get(agent.role, 0) + 1 |
| |
| return roles |
| |
| def stop_optimization(self): |
| """Detiene optimizaci贸n en curso""" |
| self._running = False |
|
|