#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Sistema de Otimização de Performance para Ensemble AI Implementa cache inteligente, processamento paralelo e otimizações avançadas """ import asyncio import hashlib import json import logging import time from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass, asdict from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Callable, Tuple from collections import defaultdict, deque import threading import weakref import pickle import os from pathlib import Path @dataclass class CacheEntry: """Entrada do cache com metadados""" key: str value: Any timestamp: datetime access_count: int last_access: datetime ttl: Optional[timedelta] size_bytes: int hit_count: int = 0 @dataclass class PerformanceMetrics: """Métricas de performance do sistema""" cache_hits: int = 0 cache_misses: int = 0 total_requests: int = 0 avg_response_time: float = 0.0 parallel_executions: int = 0 memory_usage_mb: float = 0.0 cpu_usage_percent: float = 0.0 active_threads: int = 0 queue_size: int = 0 class IntelligentCache: """Cache inteligente com estratégias adaptativas""" def __init__(self, max_size: int = 1000, default_ttl: timedelta = timedelta(hours=1)): self.max_size = max_size self.default_ttl = default_ttl self.cache: Dict[str, CacheEntry] = {} self.access_order = deque() # Para LRU self.size_tracker = 0 self.lock = threading.RLock() # Estatísticas self.hits = 0 self.misses = 0 self.evictions = 0 # Cache persistente self.persistent_cache_dir = Path("cache/ai_cache") self.persistent_cache_dir.mkdir(parents=True, exist_ok=True) self.enable_persistence = True # Estratégias de eviction self.eviction_strategies = { 'lru': self._evict_lru, 'lfu': self._evict_lfu, 'ttl': self._evict_expired, 'size': self._evict_largest, 'adaptive': self._evict_adaptive } self.current_strategy = 'adaptive' def get(self, key: str) -> Optional[Any]: """Recupera item do cache""" with self.lock: cache_key = self._generate_key(key) if cache_key in self.cache: entry = self.cache[cache_key] # Verificar TTL if self._is_expired(entry): self._remove_entry(cache_key) self.misses += 1 return None # Atualizar estatísticas de acesso entry.access_count += 1 entry.hit_count += 1 entry.last_access = datetime.now() # Atualizar ordem LRU if cache_key in self.access_order: self.access_order.remove(cache_key) self.access_order.append(cache_key) self.hits += 1 return entry.value self.misses += 1 # Tentar cache persistente if self.enable_persistence: persistent_value = self._load_from_persistent_cache(cache_key) if persistent_value is not None: # Recarregar no cache em memória self.put(key, persistent_value) return persistent_value return None def put(self, key: str, value: Any, ttl: Optional[timedelta] = None) -> None: """Armazena item no cache""" with self.lock: cache_key = self._generate_key(key) # Calcular tamanho try: size_bytes = len(pickle.dumps(value)) except: size_bytes = 1024 # Estimativa padrão # Verificar se precisa fazer eviction while len(self.cache) >= self.max_size or self.size_tracker + size_bytes > self.max_size * 10000: if not self._evict_one(): break # Não conseguiu fazer eviction # Criar entrada entry = CacheEntry( key=cache_key, value=value, timestamp=datetime.now(), access_count=1, last_access=datetime.now(), ttl=ttl or self.default_ttl, size_bytes=size_bytes ) # Remover entrada existente se houver if cache_key in self.cache: old_entry = self.cache[cache_key] self.size_tracker -= old_entry.size_bytes # Adicionar nova entrada self.cache[cache_key] = entry self.size_tracker += size_bytes # Atualizar ordem LRU if cache_key in self.access_order: self.access_order.remove(cache_key) self.access_order.append(cache_key) # Salvar no cache persistente if self.enable_persistence: self._save_to_persistent_cache(cache_key, value) def _generate_key(self, key: str) -> str: """Gera chave hash para o cache""" return hashlib.md5(key.encode()).hexdigest() def _is_expired(self, entry: CacheEntry) -> bool: """Verifica se entrada expirou""" if entry.ttl is None: return False return datetime.now() - entry.timestamp > entry.ttl def _evict_one(self) -> bool: """Remove uma entrada usando estratégia atual""" strategy_func = self.eviction_strategies.get(self.current_strategy, self._evict_lru) return strategy_func() def _evict_lru(self) -> bool: """Remove entrada menos recentemente usada""" if not self.access_order: return False key_to_remove = self.access_order.popleft() self._remove_entry(key_to_remove) return True def _evict_lfu(self) -> bool: """Remove entrada menos frequentemente usada""" if not self.cache: return False # Encontrar entrada com menor access_count min_access_key = min(self.cache.keys(), key=lambda k: self.cache[k].access_count) self._remove_entry(min_access_key) return True def _evict_expired(self) -> bool: """Remove entradas expiradas""" expired_keys = [k for k, v in self.cache.items() if self._is_expired(v)] if not expired_keys: return False for key in expired_keys: self._remove_entry(key) return True def _evict_largest(self) -> bool: """Remove entrada com maior tamanho""" if not self.cache: return False largest_key = max(self.cache.keys(), key=lambda k: self.cache[k].size_bytes) self._remove_entry(largest_key) return True def _evict_adaptive(self) -> bool: """Estratégia adaptativa de eviction""" # Primeiro tentar remover expirados if self._evict_expired(): return True # Se cache está muito cheio, remover os maiores if len(self.cache) > self.max_size * 0.9: return self._evict_largest() # Caso contrário, usar LRU return self._evict_lru() def _remove_entry(self, key: str) -> None: """Remove entrada do cache""" if key in self.cache: entry = self.cache[key] self.size_tracker -= entry.size_bytes del self.cache[key] self.evictions += 1 if key in self.access_order: self.access_order.remove(key) def _save_to_persistent_cache(self, key: str, value: Any) -> None: """Salva no cache persistente""" try: cache_file = self.persistent_cache_dir / f"{key}.pkl" with open(cache_file, 'wb') as f: pickle.dump({ 'value': value, 'timestamp': datetime.now(), 'key': key }, f) except Exception as e: logging.warning(f"Erro ao salvar cache persistente: {e}") def _load_from_persistent_cache(self, key: str) -> Optional[Any]: """Carrega do cache persistente""" try: cache_file = self.persistent_cache_dir / f"{key}.pkl" if cache_file.exists(): with open(cache_file, 'rb') as f: data = pickle.load(f) # Verificar se não expirou (24 horas) if datetime.now() - data['timestamp'] < timedelta(hours=24): return data['value'] else: # Remover arquivo expirado cache_file.unlink() except Exception as e: logging.warning(f"Erro ao carregar cache persistente: {e}") return None def get_stats(self) -> Dict[str, Any]: """Retorna estatísticas do cache""" with self.lock: total_requests = self.hits + self.misses hit_rate = (self.hits / total_requests * 100) if total_requests > 0 else 0 return { 'hits': self.hits, 'misses': self.misses, 'hit_rate': hit_rate, 'evictions': self.evictions, 'current_size': len(self.cache), 'max_size': self.max_size, 'memory_usage_bytes': self.size_tracker, 'strategy': self.current_strategy } def clear(self) -> None: """Limpa o cache""" with self.lock: self.cache.clear() self.access_order.clear() self.size_tracker = 0 class ParallelProcessor: """Processador paralelo para análises de IA""" def __init__(self, max_workers: int = 4): self.max_workers = max_workers self.executor = ThreadPoolExecutor(max_workers=max_workers) self.active_tasks = set() self.task_queue = asyncio.Queue() self.metrics = PerformanceMetrics() self.lock = threading.Lock() async def process_parallel(self, tasks: List[Callable], timeout: float = 30.0) -> List[Any]: """Processa tarefas em paralelo""" if not tasks: return [] start_time = time.time() # Submeter tarefas futures = [] for task in tasks: future = self.executor.submit(task) futures.append(future) with self.lock: self.active_tasks.add(future) self.metrics.parallel_executions += 1 # Aguardar resultados results = [] try: for future in as_completed(futures, timeout=timeout): try: result = future.result() results.append(result) except Exception as e: logging.error(f"Erro em tarefa paralela: {e}") results.append(None) finally: with self.lock: self.active_tasks.discard(future) except TimeoutError: logging.warning(f"Timeout em processamento paralelo após {timeout}s") # Cancelar tarefas pendentes for future in futures: future.cancel() with self.lock: self.active_tasks.discard(future) # Atualizar métricas processing_time = time.time() - start_time with self.lock: self.metrics.avg_response_time = ( self.metrics.avg_response_time * 0.9 + processing_time * 0.1 ) self.metrics.active_threads = len(self.active_tasks) return results def get_metrics(self) -> PerformanceMetrics: """Retorna métricas de performance""" with self.lock: return PerformanceMetrics( cache_hits=self.metrics.cache_hits, cache_misses=self.metrics.cache_misses, total_requests=self.metrics.total_requests, avg_response_time=self.metrics.avg_response_time, parallel_executions=self.metrics.parallel_executions, active_threads=len(self.active_tasks), queue_size=self.task_queue.qsize() if hasattr(self.task_queue, 'qsize') else 0 ) def shutdown(self): """Encerra o processador""" self.executor.shutdown(wait=True) class PerformanceOptimizer: """Sistema principal de otimização de performance""" def __init__(self, cache_size: int = 1000, max_workers: int = 4): self.cache = IntelligentCache(max_size=cache_size) self.parallel_processor = ParallelProcessor(max_workers=max_workers) self.metrics_history = deque(maxlen=1000) self.optimization_rules = [] self.logger = logging.getLogger(__name__) # Configurações adaptativas self.adaptive_config = { 'cache_ttl_base': timedelta(hours=1), 'parallel_threshold': 3, # Número mínimo de tarefas para paralelizar 'timeout_base': 30.0, 'memory_threshold_mb': 500 } # Inicializar regras de otimização self._initialize_optimization_rules() def _initialize_optimization_rules(self): """Inicializa regras de otimização adaptativa""" self.optimization_rules = [ self._rule_adjust_cache_ttl, self._rule_adjust_parallel_threshold, self._rule_memory_management, self._rule_timeout_adjustment ] async def optimize_analysis(self, analysis_func: Callable, text: str, use_cache: bool = True, force_parallel: bool = False) -> Any: """Otimiza execução de análise com cache e paralelização""" start_time = time.time() # Gerar chave de cache cache_key = f"analysis_{hashlib.md5(text.encode()).hexdigest()}" # Tentar cache primeiro if use_cache: cached_result = self.cache.get(cache_key) if cached_result is not None: self.logger.debug(f"Cache hit para análise: {cache_key[:8]}...") return cached_result # Executar análise try: if force_parallel or self._should_use_parallel(): # Análise paralela (se aplicável) result = await self._execute_parallel_analysis(analysis_func, text) else: # Análise sequencial result = await self._execute_sequential_analysis(analysis_func, text) # Armazenar no cache if use_cache and result is not None: ttl = self._calculate_adaptive_ttl(text, result) self.cache.put(cache_key, result, ttl) # Registrar métricas processing_time = time.time() - start_time self._record_metrics(processing_time, use_cache, cached_result is not None) return result except Exception as e: self.logger.error(f"Erro na análise otimizada: {e}") raise async def _execute_sequential_analysis(self, analysis_func: Callable, text: str) -> Any: """Executa análise sequencial""" if asyncio.iscoroutinefunction(analysis_func): return await analysis_func(text) else: return analysis_func(text) async def _execute_parallel_analysis(self, analysis_func: Callable, text: str) -> Any: """Executa análise paralela (quando aplicável)""" # Para análises que podem ser paralelizadas (ex: múltiplos modelos) # Por enquanto, executa sequencialmente return await self._execute_sequential_analysis(analysis_func, text) def _should_use_parallel(self) -> bool: """Determina se deve usar processamento paralelo""" # Lógica para decidir paralelização current_load = len(self.parallel_processor.active_tasks) return current_load < self.adaptive_config['parallel_threshold'] def _calculate_adaptive_ttl(self, text: str, result: Any) -> timedelta: """Calcula TTL adaptativo baseado no conteúdo""" base_ttl = self.adaptive_config['cache_ttl_base'] # Ajustar baseado no tamanho do texto text_factor = min(2.0, len(text) / 1000) # Textos maiores = TTL maior # Ajustar baseado na confiança do resultado confidence_factor = 1.0 if hasattr(result, 'confidence'): confidence_factor = result.confidence # Alta confiança = TTL maior adjusted_ttl = base_ttl * text_factor * confidence_factor return max(timedelta(minutes=5), min(timedelta(hours=6), adjusted_ttl)) def _record_metrics(self, processing_time: float, used_cache: bool, cache_hit: bool): """Registra métricas de performance""" metrics = { 'timestamp': datetime.now(), 'processing_time': processing_time, 'used_cache': used_cache, 'cache_hit': cache_hit, 'memory_usage': self._get_memory_usage() } self.metrics_history.append(metrics) # Aplicar regras de otimização self._apply_optimization_rules() def _get_memory_usage(self) -> float: """Estima uso de memória em MB""" try: import psutil process = psutil.Process() return process.memory_info().rss / 1024 / 1024 except ImportError: return 0.0 def _apply_optimization_rules(self): """Aplica regras de otimização adaptativa""" for rule in self.optimization_rules: try: rule() except Exception as e: self.logger.warning(f"Erro ao aplicar regra de otimização: {e}") def _rule_adjust_cache_ttl(self): """Regra: Ajustar TTL do cache baseado na taxa de hit""" if len(self.metrics_history) < 10: return recent_metrics = list(self.metrics_history)[-10:] hit_rate = sum(1 for m in recent_metrics if m['cache_hit']) / len(recent_metrics) if hit_rate > 0.8: # Alta taxa de hit - aumentar TTL self.adaptive_config['cache_ttl_base'] *= 1.1 elif hit_rate < 0.3: # Baixa taxa de hit - diminuir TTL self.adaptive_config['cache_ttl_base'] *= 0.9 # Limitar TTL self.adaptive_config['cache_ttl_base'] = max( timedelta(minutes=10), min(timedelta(hours=4), self.adaptive_config['cache_ttl_base']) ) def _rule_adjust_parallel_threshold(self): """Regra: Ajustar threshold de paralelização""" if len(self.metrics_history) < 20: return recent_metrics = list(self.metrics_history)[-20:] avg_processing_time = sum(m['processing_time'] for m in recent_metrics) / len(recent_metrics) if avg_processing_time > 5.0: # Processamento lento - mais paralelização self.adaptive_config['parallel_threshold'] = max(1, self.adaptive_config['parallel_threshold'] - 1) elif avg_processing_time < 1.0: # Processamento rápido - menos paralelização self.adaptive_config['parallel_threshold'] = min(8, self.adaptive_config['parallel_threshold'] + 1) def _rule_memory_management(self): """Regra: Gerenciar memória""" current_memory = self._get_memory_usage() if current_memory > self.adaptive_config['memory_threshold_mb']: # Limpar cache parcialmente self.cache.clear() self.logger.info(f"Cache limpo devido ao uso de memória: {current_memory:.1f}MB") def _rule_timeout_adjustment(self): """Regra: Ajustar timeouts""" if len(self.metrics_history) < 15: return recent_metrics = list(self.metrics_history)[-15:] avg_time = sum(m['processing_time'] for m in recent_metrics) / len(recent_metrics) # Ajustar timeout baseado no tempo médio self.adaptive_config['timeout_base'] = max(10.0, min(60.0, avg_time * 3)) def get_performance_report(self) -> Dict[str, Any]: """Gera relatório completo de performance""" cache_stats = self.cache.get_stats() processor_metrics = self.parallel_processor.get_metrics() # Estatísticas históricas if self.metrics_history: recent_metrics = list(self.metrics_history)[-50:] avg_processing_time = sum(m['processing_time'] for m in recent_metrics) / len(recent_metrics) cache_hit_rate = sum(1 for m in recent_metrics if m['cache_hit']) / len(recent_metrics) * 100 else: avg_processing_time = 0.0 cache_hit_rate = 0.0 return { 'cache': cache_stats, 'parallel_processing': asdict(processor_metrics), 'adaptive_config': { k: str(v) if isinstance(v, timedelta) else v for k, v in self.adaptive_config.items() }, 'performance_summary': { 'avg_processing_time': avg_processing_time, 'cache_hit_rate': cache_hit_rate, 'total_analyses': len(self.metrics_history), 'memory_usage_mb': self._get_memory_usage() } } def cleanup(self): """Limpeza de recursos""" self.parallel_processor.shutdown() self.cache.clear() # Instância global do otimizador performance_optimizer = PerformanceOptimizer() # Função de conveniência async def optimize_ai_analysis(analysis_func: Callable, text: str, use_cache: bool = True) -> Any: """Função principal para análise otimizada""" return await performance_optimizer.optimize_analysis(analysis_func, text, use_cache) if __name__ == "__main__": # Teste do sistema de otimização async def test_analysis(text: str): await asyncio.sleep(0.1) # Simular processamento return {'result': f'Análise de: {text[:20]}...', 'confidence': 0.8} async def test_optimizer(): print("Testando sistema de otimização...") # Teste de cache result1 = await optimize_ai_analysis(test_analysis, "Texto de teste para análise") result2 = await optimize_ai_analysis(test_analysis, "Texto de teste para análise") # Deve usar cache print(f"Resultado 1: {result1}") print(f"Resultado 2: {result2}") # Relatório de performance report = performance_optimizer.get_performance_report() print(f"\nRelatório de Performance:") print(f"Cache Hit Rate: {report['performance_summary']['cache_hit_rate']:.1f}%") print(f"Tempo Médio: {report['performance_summary']['avg_processing_time']:.3f}s") performance_optimizer.cleanup() # Executar teste asyncio.run(test_optimizer())