Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Sistema de Otimização de Performance para Ensemble AI | |
| Implementa cache inteligente, processamento paralelo e otimizações avançadas | |
| """ | |
| import asyncio | |
| import hashlib | |
| import json | |
| import logging | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from dataclasses import dataclass, asdict | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Any, Callable, Tuple | |
| from collections import defaultdict, deque | |
| import threading | |
| import weakref | |
| import pickle | |
| import os | |
| from pathlib import Path | |
@dataclass
class CacheEntry:
    """A single in-memory cache entry plus the metadata used for expiry and eviction."""
    key: str  # hashed key under which the entry is stored
    value: Any  # the cached object
    timestamp: datetime  # creation time; the TTL is measured from here
    access_count: int  # accesses so far (1 on insert); compared by LFU eviction
    last_access: datetime  # time of the most recent read or write
    ttl: Optional[timedelta]  # lifetime of the entry; None means it never expires
    size_bytes: int  # approximate size (pickled length) counted against the byte budget
    hit_count: int = 0  # number of cache hits served by this entry
@dataclass
class PerformanceMetrics:
    """Snapshot of system performance counters; every field defaults to zero."""
    cache_hits: int = 0
    cache_misses: int = 0
    total_requests: int = 0
    avg_response_time: float = 0.0  # seconds (ParallelProcessor keeps a moving average)
    parallel_executions: int = 0  # incremented once per task submitted by ParallelProcessor
    memory_usage_mb: float = 0.0
    cpu_usage_percent: float = 0.0
    active_threads: int = 0
    queue_size: int = 0
class IntelligentCache:
    """Thread-safe in-memory cache with pluggable eviction and optional disk persistence.

    Keys are hashed (MD5) before use. In-memory entries expire after their TTL.
    When storing a value would exceed ``max_size`` entries, or the approximate
    byte budget of ``max_size * 10000`` bytes (pickled sizes), entries are
    evicted with the strategy named by ``current_strategy``. While
    ``enable_persistence`` is true, every ``put`` is also written to
    ``cache/ai_cache/<hash>.pkl``. A memory miss falls back to that file, and
    files older than 24 hours are discarded.

    SECURITY: persisted files are read back with ``pickle.load``. Anyone who
    can write into the cache directory can run code in this process. Keep that
    directory private and never point it at shared or untrusted storage.
    """

    def __init__(self, max_size: int = 1000, default_ttl: timedelta = timedelta(hours=1)):
        """Create an empty cache.

        Args:
            max_size: Maximum number of in-memory entries. Also scales the
                byte budget (``max_size * 10000`` bytes).
            default_ttl: TTL applied when ``put`` is called without one.

        Side effect: creates ``cache/ai_cache`` relative to the current
        working directory.
        """
        self.max_size = max_size
        self.default_ttl = default_ttl
        self.cache: Dict[str, CacheEntry] = {}
        self.access_order = deque()  # LRU order of hashed keys; least recently used on the left
        self.size_tracker = 0  # sum of size_bytes over all in-memory entries
        self.lock = threading.RLock()  # guards all mutable cache state

        # Statistics
        self.hits = 0
        self.misses = 0
        self.evictions = 0  # counts every removal through _remove_entry (eviction or expiry)

        # Persistent (on-disk) cache
        self.persistent_cache_dir = Path("cache/ai_cache")
        self.persistent_cache_dir.mkdir(parents=True, exist_ok=True)
        self.enable_persistence = True

        # Eviction strategies, selected by name through current_strategy
        self.eviction_strategies = {
            'lru': self._evict_lru,
            'lfu': self._evict_lfu,
            'ttl': self._evict_expired,
            'size': self._evict_largest,
            'adaptive': self._evict_adaptive
        }
        self.current_strategy = 'adaptive'

    def get(self, key: str) -> Optional[Any]:
        """Return the value cached for ``key``, or ``None`` on a miss.

        An expired in-memory entry is dropped and counted as a miss. On a
        memory miss (counted as a miss even if the disk has the value), the
        persistent cache is consulted when enabled. A value found there is
        reloaded into memory with the default TTL.

        Note: a cached ``None`` cannot be distinguished from a miss.
        """
        with self.lock:
            cache_key = self._generate_key(key)
            entry = self.cache.get(cache_key)

            if entry is not None:
                if self._is_expired(entry):
                    self._remove_entry(cache_key)
                    self.misses += 1
                    return None

                entry.access_count += 1
                entry.hit_count += 1
                entry.last_access = datetime.now()
                self._touch(cache_key)
                self.hits += 1
                return entry.value

            self.misses += 1

            if self.enable_persistence:
                persistent_value = self._load_from_persistent_cache(cache_key)
                if persistent_value is not None:
                    # Reload into memory only. Writing it back to disk would reset the
                    # file timestamp and keep the persisted copy alive indefinitely.
                    self._store(cache_key, persistent_value, None, persist=False)
                    return persistent_value

            return None

    def put(self, key: str, value: Any, ttl: Optional[timedelta] = None) -> None:
        """Store ``value`` under ``key``, and on disk when persistence is enabled.

        Args:
            key: Raw key; it is hashed before storage.
            value: Object to cache. Its pickled length is used as its size.
            ttl: Lifetime of the in-memory entry. ``None`` uses ``default_ttl``.
        """
        with self.lock:
            cache_key = self._generate_key(key)
            self._store(cache_key, value, ttl, persist=self.enable_persistence)

    def _store(self, cache_key: str, value: Any, ttl: Optional[timedelta], persist: bool) -> None:
        """Insert an already-hashed key into memory, evicting as needed; optionally persist it.

        Must be called with ``self.lock`` held.
        """
        try:
            size_bytes = len(pickle.dumps(value))
        except Exception:
            size_bytes = 1024  # default estimate for values pickle cannot serialize

        # Drop the previous version first. Otherwise a replaced key counts twice
        # against the limits and can force an unrelated entry out. This is a
        # replacement, not an eviction, so the eviction counter is not touched.
        old_entry = self.cache.pop(cache_key, None)
        if old_entry is not None:
            self.size_tracker -= old_entry.size_bytes
            self._discard_from_order(cache_key)

        byte_budget = self.max_size * 10000
        while len(self.cache) >= self.max_size or self.size_tracker + size_bytes > byte_budget:
            if not self._evict_one():
                break  # nothing left to evict; store anyway

        now = datetime.now()
        self.cache[cache_key] = CacheEntry(
            key=cache_key,
            value=value,
            timestamp=now,
            access_count=1,
            last_access=now,
            # An explicit timedelta(0) is honoured instead of falling back to the default
            ttl=ttl if ttl is not None else self.default_ttl,
            size_bytes=size_bytes
        )
        self.size_tracker += size_bytes
        self.access_order.append(cache_key)

        if persist:
            self._save_to_persistent_cache(cache_key, value)

    def _touch(self, cache_key: str) -> None:
        """Mark ``cache_key`` as most recently used."""
        self._discard_from_order(cache_key)
        self.access_order.append(cache_key)

    def _discard_from_order(self, cache_key: str) -> None:
        """Remove ``cache_key`` from the LRU order if present."""
        try:
            self.access_order.remove(cache_key)
        except ValueError:
            pass

    def _generate_key(self, key: str) -> str:
        """Hash the raw key with MD5 (used for key shaping, not for security)."""
        return hashlib.md5(key.encode()).hexdigest()

    def _is_expired(self, entry: "CacheEntry") -> bool:
        """Return True when the entry has a TTL and has outlived it."""
        if entry.ttl is None:
            return False
        return datetime.now() - entry.timestamp > entry.ttl

    def _evict_one(self) -> bool:
        """Evict using the current strategy (LRU if the name is unknown); True if anything was removed."""
        strategy_func = self.eviction_strategies.get(self.current_strategy, self._evict_lru)
        return strategy_func()

    def _evict_lru(self) -> bool:
        """Remove the least recently used entry."""
        if not self.access_order:
            return False
        key_to_remove = self.access_order.popleft()
        self._remove_entry(key_to_remove)
        return True

    def _evict_lfu(self) -> bool:
        """Remove the entry with the lowest access_count."""
        if not self.cache:
            return False
        min_access_key = min(self.cache, key=lambda k: self.cache[k].access_count)
        self._remove_entry(min_access_key)
        return True

    def _evict_expired(self) -> bool:
        """Remove every expired entry; True if at least one was removed."""
        expired_keys = [k for k, v in self.cache.items() if self._is_expired(v)]
        if not expired_keys:
            return False
        for key in expired_keys:
            self._remove_entry(key)
        return True

    def _evict_largest(self) -> bool:
        """Remove the entry with the largest size_bytes."""
        if not self.cache:
            return False
        largest_key = max(self.cache, key=lambda k: self.cache[k].size_bytes)
        self._remove_entry(largest_key)
        return True

    def _evict_adaptive(self) -> bool:
        """Remove expired entries first; then the largest entry when the cache is
        more than 90% full; otherwise the LRU entry.

        NOTE(review): _store evicts when ``len(cache) >= max_size``, which always
        satisfies the 90% check. Count-triggered evictions therefore remove the
        largest entry; LRU is used only for byte-budget evictions at lower counts.
        """
        if self._evict_expired():
            return True
        if len(self.cache) > self.max_size * 0.9:
            return self._evict_largest()
        return self._evict_lru()

    def _remove_entry(self, key: str) -> None:
        """Drop ``key`` from memory, updating the byte total and the eviction counter."""
        entry = self.cache.pop(key, None)
        if entry is not None:
            self.size_tracker -= entry.size_bytes
            self.evictions += 1
        self._discard_from_order(key)

    def _save_to_persistent_cache(self, key: str, value: Any) -> None:
        """Best-effort write of ``value`` to ``<dir>/<key>.pkl``; failures are only logged."""
        cache_file = self.persistent_cache_dir / f"{key}.pkl"
        tmp_file = self.persistent_cache_dir / f"{key}.{os.getpid()}.tmp"
        try:
            with open(tmp_file, 'wb') as f:
                pickle.dump({
                    'value': value,
                    'timestamp': datetime.now(),
                    'key': key
                }, f)
            # Atomic rename, so readers never see a half-written .pkl file
            os.replace(tmp_file, cache_file)
        except Exception as e:
            logging.warning(f"Erro ao salvar cache persistente: {e}")
            try:
                tmp_file.unlink()
            except OSError:
                pass

    def _load_from_persistent_cache(self, key: str) -> Optional[Any]:
        """Load ``<dir>/<key>.pkl`` if it exists and is under 24 hours old.

        Expired files are deleted. Errors are logged and treated as a miss.
        SECURITY: uses ``pickle.load``, so only trusted files may live in the
        cache directory.
        """
        try:
            cache_file = self.persistent_cache_dir / f"{key}.pkl"
            if cache_file.exists():
                with open(cache_file, 'rb') as f:
                    data = pickle.load(f)
                # The persisted copy has a fixed 24-hour lifetime, independent of the entry TTL
                if datetime.now() - data['timestamp'] < timedelta(hours=24):
                    return data['value']
                cache_file.unlink()
        except Exception as e:
            logging.warning(f"Erro ao carregar cache persistente: {e}")
        return None

    def get_stats(self) -> Dict[str, Any]:
        """Return counters, hit rate (percent) and current memory-cache occupancy."""
        with self.lock:
            total_requests = self.hits + self.misses
            hit_rate = (self.hits / total_requests * 100) if total_requests > 0 else 0
            return {
                'hits': self.hits,
                'misses': self.misses,
                'hit_rate': hit_rate,
                'evictions': self.evictions,
                'current_size': len(self.cache),
                'max_size': self.max_size,
                'memory_usage_bytes': self.size_tracker,
                'strategy': self.current_strategy
            }

    def clear(self) -> None:
        """Empty the in-memory cache.

        Statistics counters and persisted files are left untouched.
        """
        with self.lock:
            self.cache.clear()
            self.access_order.clear()
            self.size_tracker = 0
class ParallelProcessor:
    """Runs blocking callables on a thread pool and awaits them from asyncio code."""

    def __init__(self, max_workers: int = 4):
        """Create the worker pool.

        Args:
            max_workers: Number of threads in the underlying ThreadPoolExecutor.
        """
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.active_tasks = set()  # submitted concurrent futures not yet collected
        self.task_queue = asyncio.Queue()  # only its size is read, in get_metrics()
        self.metrics = PerformanceMetrics()
        self.lock = threading.Lock()  # guards active_tasks and metrics

    async def process_parallel(self, tasks: List[Callable], timeout: float = 30.0) -> List[Any]:
        """Run zero-argument callables concurrently on the thread pool.

        Waiting is done through asyncio, so the event loop keeps serving other
        coroutines while the workers run.

        Args:
            tasks: Zero-argument callables to execute.
            timeout: Seconds to wait for all tasks. Tasks still unfinished then
                are cancelled if they have not started. A task already running
                keeps running in its worker thread.

        Returns:
            One result per task, in the same order as ``tasks``. A task that
            raised or did not finish in time yields ``None``.
        """
        if not tasks:
            return []

        start_time = time.time()

        # Submit all tasks
        futures = []
        for task in tasks:
            future = self.executor.submit(task)
            futures.append(future)
            with self.lock:
                self.active_tasks.add(future)
                self.metrics.parallel_executions += 1

        # Bridge to asyncio instead of blocking the loop on concurrent.futures.as_completed
        waiters = [asyncio.wrap_future(future) for future in futures]
        results: List[Any] = [None] * len(waiters)
        try:
            done, pending = await asyncio.wait(waiters, timeout=timeout)
            if pending:
                logging.warning(f"Timeout em processamento paralelo após {timeout}s")
                for waiter in pending:
                    waiter.cancel()  # propagates cancel() to the underlying concurrent future
            for index, waiter in enumerate(waiters):
                if waiter not in done or waiter.cancelled():
                    continue
                try:
                    results[index] = waiter.result()
                except Exception as e:
                    logging.error(f"Erro em tarefa paralela: {e}")
        finally:
            with self.lock:
                for future in futures:
                    self.active_tasks.discard(future)

        # Update metrics: exponential moving average of batch wall-clock time
        processing_time = time.time() - start_time
        with self.lock:
            self.metrics.avg_response_time = (
                self.metrics.avg_response_time * 0.9 + processing_time * 0.1
            )
            self.metrics.active_threads = len(self.active_tasks)

        return results

    def get_metrics(self) -> "PerformanceMetrics":
        """Return a snapshot copy of the current performance metrics."""
        with self.lock:
            return PerformanceMetrics(
                cache_hits=self.metrics.cache_hits,
                cache_misses=self.metrics.cache_misses,
                total_requests=self.metrics.total_requests,
                avg_response_time=self.metrics.avg_response_time,
                parallel_executions=self.metrics.parallel_executions,
                active_threads=len(self.active_tasks),
                queue_size=self.task_queue.qsize() if hasattr(self.task_queue, 'qsize') else 0
            )

    def shutdown(self):
        """Shut down the worker pool, waiting for running tasks to finish."""
        self.executor.shutdown(wait=True)
class PerformanceOptimizer:
    """Main performance-optimization facade.

    Combines result caching, optional parallel dispatch and self-tuning
    configuration driven by recently recorded metrics.
    """

    def __init__(self, cache_size: int = 1000, max_workers: int = 4):
        """Create the cache, the worker pool and the adaptive rule set.

        Args:
            cache_size: ``max_size`` of the underlying IntelligentCache.
            max_workers: Thread count of the underlying ParallelProcessor.
        """
        self.cache = IntelligentCache(max_size=cache_size)
        self.parallel_processor = ParallelProcessor(max_workers=max_workers)
        self.metrics_history = deque(maxlen=1000)  # one dict per analysis, newest last
        self.optimization_rules = []
        self.logger = logging.getLogger(__name__)

        # Tunables adjusted at runtime by the _rule_* methods
        self.adaptive_config = {
            'cache_ttl_base': timedelta(hours=1),
            # _should_use_parallel() allows parallel dispatch while fewer tasks than this are active
            'parallel_threshold': 3,
            'timeout_base': 30.0,
            'memory_threshold_mb': 500
        }

        self._initialize_optimization_rules()

    def _initialize_optimization_rules(self):
        """Register the adaptive rules run after every recorded analysis."""
        self.optimization_rules = [
            self._rule_adjust_cache_ttl,
            self._rule_adjust_parallel_threshold,
            self._rule_memory_management,
            self._rule_timeout_adjustment
        ]

    async def optimize_analysis(self, analysis_func: Callable,
                                text: str,
                                use_cache: bool = True,
                                force_parallel: bool = False) -> Any:
        """Run ``analysis_func(text)`` with caching and optional parallel dispatch.

        Args:
            analysis_func: Sync or async callable taking the text. A callable
                that returns an awaitable is awaited.
            text: Input text; its MD5 hash forms the cache key.
            use_cache: Read from and write to the cache.
            force_parallel: Take the parallel path regardless of current load.

        Returns:
            The analysis result (a cached one on a hit). ``None`` results are
            never cached.

        Raises:
            Whatever ``analysis_func`` raises; it is logged and re-raised.
        """
        start_time = time.time()

        text_hash = hashlib.md5(text.encode()).hexdigest()
        cache_key = f"analysis_{text_hash}"

        if use_cache:
            cached_result = self.cache.get(cache_key)
            if cached_result is not None:
                self.logger.debug(f"Cache hit para análise: {text_hash[:8]}...")
                # Record hits too; otherwise hit-rate metrics and the TTL rule never see one
                self._record_metrics(time.time() - start_time, use_cache, True)
                return cached_result

        try:
            if force_parallel or self._should_use_parallel():
                result = await self._execute_parallel_analysis(analysis_func, text)
            else:
                result = await self._execute_sequential_analysis(analysis_func, text)

            if use_cache and result is not None:
                ttl = self._calculate_adaptive_ttl(text, result)
                self.cache.put(cache_key, result, ttl)

            processing_time = time.time() - start_time
            self._record_metrics(processing_time, use_cache, False)
            return result

        except Exception as e:
            self.logger.error(f"Erro na análise otimizada: {e}")
            raise

    async def _execute_sequential_analysis(self, analysis_func: Callable, text: str) -> Any:
        """Call ``analysis_func(text)`` and await the result if it is awaitable.

        Checking the returned value instead of ``iscoroutinefunction`` also
        covers partials and objects with an async ``__call__``. Those would
        otherwise return an un-awaited coroutine.
        """
        import inspect  # local import: only needed here

        result = analysis_func(text)
        if inspect.isawaitable(result):
            result = await result
        return result

    async def _execute_parallel_analysis(self, analysis_func: Callable, text: str) -> Any:
        """Parallel path (e.g. for multiple models); currently runs sequentially."""
        return await self._execute_sequential_analysis(analysis_func, text)

    def _should_use_parallel(self) -> bool:
        """True while fewer tasks than ``parallel_threshold`` are active in the processor."""
        current_load = len(self.parallel_processor.active_tasks)
        return current_load < self.adaptive_config['parallel_threshold']

    def _calculate_adaptive_ttl(self, text: str, result: Any) -> timedelta:
        """Derive a cache TTL from text length and result confidence, clamped to [5 min, 6 h].

        Confidence is read from a ``'confidence'`` key (dict results) or a
        ``confidence`` attribute. Missing, non-numeric or non-finite values
        count as 1.0.
        """
        import math  # local import: only needed here

        base_ttl = self.adaptive_config['cache_ttl_base']

        # Longer texts get a longer TTL (factor capped at 2.0 from 2000 chars up)
        text_factor = min(2.0, len(text) / 1000)

        if isinstance(result, dict):
            confidence = result.get('confidence')
        else:
            confidence = getattr(result, 'confidence', None)

        confidence_factor = 1.0
        if confidence is not None:
            try:
                confidence_factor = float(confidence)
            except (TypeError, ValueError):
                confidence_factor = 1.0
            if not math.isfinite(confidence_factor):
                confidence_factor = 1.0  # timedelta * nan/inf would raise

        adjusted_ttl = base_ttl * text_factor * confidence_factor
        return max(timedelta(minutes=5), min(timedelta(hours=6), adjusted_ttl))

    def _record_metrics(self, processing_time: float, used_cache: bool, cache_hit: bool):
        """Append one metrics sample and run the adaptive optimization rules."""
        metrics = {
            'timestamp': datetime.now(),
            'processing_time': processing_time,
            'used_cache': used_cache,
            'cache_hit': cache_hit,
            'memory_usage': self._get_memory_usage()
        }
        self.metrics_history.append(metrics)
        self._apply_optimization_rules()

    def _get_memory_usage(self) -> float:
        """Process RSS in MB via psutil, or 0.0 when psutil is not installed."""
        try:
            import psutil
            process = psutil.Process()
            return process.memory_info().rss / 1024 / 1024
        except ImportError:
            return 0.0

    def _apply_optimization_rules(self):
        """Run every rule; a failing rule is logged and does not stop the others."""
        for rule in self.optimization_rules:
            try:
                rule()
            except Exception as e:
                self.logger.warning(f"Erro ao aplicar regra de otimização: {e}")

    def _rule_adjust_cache_ttl(self):
        """Scale the base TTL by the hit rate of the last 10 samples.

        Above 80% it grows by 10%, below 30% it shrinks by 10%. The result is
        clamped to [10 min, 4 h].
        """
        if len(self.metrics_history) < 10:
            return

        recent_metrics = list(self.metrics_history)[-10:]
        hit_rate = sum(1 for m in recent_metrics if m['cache_hit']) / len(recent_metrics)

        if hit_rate > 0.8:
            self.adaptive_config['cache_ttl_base'] *= 1.1
        elif hit_rate < 0.3:
            self.adaptive_config['cache_ttl_base'] *= 0.9

        self.adaptive_config['cache_ttl_base'] = max(
            timedelta(minutes=10),
            min(timedelta(hours=4), self.adaptive_config['cache_ttl_base'])
        )

    def _rule_adjust_parallel_threshold(self):
        """Nudge ``parallel_threshold`` (range 1..8) using the mean time of the last 20 samples.

        NOTE(review): _should_use_parallel() compares the *active load* against
        this value. Lowering it on slow processing therefore allows parallel
        dispatch less often, which is the opposite of the intent.
        """
        if len(self.metrics_history) < 20:
            return

        recent_metrics = list(self.metrics_history)[-20:]
        avg_processing_time = sum(m['processing_time'] for m in recent_metrics) / len(recent_metrics)

        if avg_processing_time > 5.0:
            self.adaptive_config['parallel_threshold'] = max(1, self.adaptive_config['parallel_threshold'] - 1)
        elif avg_processing_time < 1.0:
            self.adaptive_config['parallel_threshold'] = min(8, self.adaptive_config['parallel_threshold'] + 1)

    def _rule_memory_management(self):
        """Clear the whole in-memory cache when process RSS exceeds ``memory_threshold_mb``."""
        current_memory = self._get_memory_usage()
        if current_memory > self.adaptive_config['memory_threshold_mb']:
            self.cache.clear()
            self.logger.info(f"Cache limpo devido ao uso de memória: {current_memory:.1f}MB")

    def _rule_timeout_adjustment(self):
        """Set ``timeout_base`` to 3x the mean time of the last 15 samples, clamped to [10, 60] s."""
        if len(self.metrics_history) < 15:
            return

        recent_metrics = list(self.metrics_history)[-15:]
        avg_time = sum(m['processing_time'] for m in recent_metrics) / len(recent_metrics)
        self.adaptive_config['timeout_base'] = max(10.0, min(60.0, avg_time * 3))

    def get_performance_report(self) -> Dict[str, Any]:
        """Build a report of cache stats, processor metrics, config and recent averages.

        The averages cover the last 50 samples.
        """
        cache_stats = self.cache.get_stats()
        processor_metrics = self.parallel_processor.get_metrics()

        if self.metrics_history:
            recent_metrics = list(self.metrics_history)[-50:]
            avg_processing_time = sum(m['processing_time'] for m in recent_metrics) / len(recent_metrics)
            cache_hit_rate = sum(1 for m in recent_metrics if m['cache_hit']) / len(recent_metrics) * 100
        else:
            avg_processing_time = 0.0
            cache_hit_rate = 0.0

        return {
            'cache': cache_stats,
            'parallel_processing': asdict(processor_metrics),
            'adaptive_config': {
                k: str(v) if isinstance(v, timedelta) else v
                for k, v in self.adaptive_config.items()
            },
            'performance_summary': {
                'avg_processing_time': avg_processing_time,
                'cache_hit_rate': cache_hit_rate,
                'total_analyses': len(self.metrics_history),
                'memory_usage_mb': self._get_memory_usage()
            }
        }

    def cleanup(self):
        """Shut down the worker pool and empty the in-memory cache."""
        self.parallel_processor.shutdown()
        self.cache.clear()
# Shared optimizer used by optimize_ai_analysis(). It is created at import time,
# so importing this module creates the cache directory and a worker thread pool.
performance_optimizer = PerformanceOptimizer(cache_size=1000, max_workers=4)
# Convenience wrapper around the module-level optimizer
async def optimize_ai_analysis(analysis_func: Callable, text: str, use_cache: bool = True) -> Any:
    """Run ``analysis_func(text)`` through the shared global optimizer.

    Delegates to ``performance_optimizer.optimize_analysis`` and leaves
    ``force_parallel`` at its default.
    """
    optimizer = performance_optimizer
    return await optimizer.optimize_analysis(analysis_func, text, use_cache=use_cache)
if __name__ == "__main__":
    # Manual smoke test of the optimization system

    async def test_analysis(text: str):
        """Fake analysis: sleeps briefly and returns a dict with a confidence score."""
        await asyncio.sleep(0.1)  # simulate processing
        return {'result': f'Análise de: {text[:20]}...', 'confidence': 0.8}

    async def test_optimizer():
        """Run the same analysis twice (the second call should hit the cache) and print a report."""
        print("Testando sistema de otimização...")
        try:
            result1 = await optimize_ai_analysis(test_analysis, "Texto de teste para análise")
            result2 = await optimize_ai_analysis(test_analysis, "Texto de teste para análise")  # should use the cache

            print(f"Resultado 1: {result1}")
            print(f"Resultado 2: {result2}")

            report = performance_optimizer.get_performance_report()
            print("\nRelatório de Performance:")
            print(f"Cache Hit Rate: {report['performance_summary']['cache_hit_rate']:.1f}%")
            print(f"Tempo Médio: {report['performance_summary']['avg_processing_time']:.3f}s")
        finally:
            # Always release the worker pool, even if a step above fails
            performance_optimizer.cleanup()

    asyncio.run(test_optimizer())