#!/usr/bin/env python3 """ Sistema de execução massiva de testes com paralelismo """ import asyncio import logging import time import threading from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Dict, List, Any, Optional from datetime import datetime import uuid import sys import os # Adiciona path do projeto sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from graphs.main_graph import AgentGraphManager from testes.test_validator import TestValidator from utils.config import AVAILABLE_MODELS class MassiveTestRunner: """ Executor de testes massivos com paralelismo otimizado """ def __init__(self, max_workers: int = 5): """ Inicializa o test runner Args: max_workers: Número máximo de workers paralelos """ self.max_workers = max_workers logging.info(f"🔧 MassiveTestRunner inicializado com {max_workers} workers paralelos") self.validator = TestValidator() self.results = { 'session_info': {}, 'group_results': [], 'individual_results': [], 'summary': {} } self.status = { 'current_status': 'idle', 'progress': 0, 'current_group': None, 'completed_tests': 0, 'total_tests': 0, 'start_time': None, 'estimated_remaining': None, 'errors': [], 'current_test': None, 'running_tests': {}, # {thread_id: {start_time, group_id, iteration, task, future}} 'cancelled_tests': set(), 'timeout_tests': set() } self._lock = threading.Lock() self._cancel_event = threading.Event() self._test_timeout = 360 # 1.5 minutos timeout por teste self._active_futures = {} # {thread_id: future} para cancelamento real async def run_test_session(self, session: Dict[str, Any], validation_method: str = 'llm', expected_content: str = None) -> Dict[str, Any]: """ Executa sessão completa de testes Args: session: Dados da sessão de teste validation_method: Método de validação ('llm' ou 'keyword') expected_content: Conteúdo esperado (para validação keyword) Returns: Resultados completos dos testes """ try: print(f"\n🔥 MASSIVE TEST RUNNER INICIADO") print(f"📋 Sessão: {session['id']}") print(f"❓ Pergunta: {session['question']}") print(f"👥 Grupos: {len(session['groups'])}") total_tests = sum(group['iterations'] for group in session['groups']) print(f"🔢 Total de testes: {total_tests}") print(f"⚡ Workers paralelos: {self.max_workers}") print("-" * 60) logging.info(f"🚀 Iniciando sessão de testes: {session['id']}") # Atualiza status with self._lock: self.status.update({ 'current_status': 'initializing', 'start_time': time.time(), 'total_tests': total_tests }) # Armazena informações da sessão self.results['session_info'] = { 'id': session['id'], 'question': session['question'], 'validation_method': validation_method, 'expected_content': expected_content, 'total_groups': len(session['groups']), 'total_tests': self.status['total_tests'], 'started_at': datetime.now().isoformat() } # Executa grupos de teste group_results = [] for group_idx, group in enumerate(session['groups']): print(f"\n📊 EXECUTANDO GRUPO {group_idx + 1}/{len(session['groups'])}") print(f"🤖 Modelo SQL: {group['sql_model_name']}") print(f"🔄 Processing Agent: {'✅ ' + group['processing_model_name'] if group['processing_enabled'] else '❌ Desativado'}") print(f"🔢 Iterações: {group['iterations']}") print(f"⏰ {datetime.now().strftime('%H:%M:%S')}") logging.info(f"📊 Executando grupo {group_idx + 1}/{len(session['groups'])}: {group['sql_model_name']}") with self._lock: self.status['current_group'] = group_idx + 1 self.status['current_status'] = 'running_group' # Executa testes do grupo em paralelo group_result = await self._run_group_tests( session['question'], group, validation_method, expected_content ) group_results.append(group_result) self.results['group_results'] = group_results # Atualiza progresso completed_so_far = sum(len(gr['individual_results']) for gr in group_results) with self._lock: self.status['completed_tests'] = completed_so_far self.status['progress'] = (completed_so_far / self.status['total_tests']) * 100 # Estima tempo restante if self.status['start_time']: elapsed = time.time() - self.status['start_time'] if completed_so_far > 0: avg_time_per_test = elapsed / completed_so_far remaining_tests = self.status['total_tests'] - completed_so_far self.status['estimated_remaining'] = avg_time_per_test * remaining_tests # Gera resumo final self._generate_summary() with self._lock: self.status['current_status'] = 'completed' self.status['progress'] = 100 self.status['end_time'] = time.time() total_time = self.status['end_time'] - self.status['start_time'] self.status['total_execution_time'] = total_time logging.info(f"✅ Sessão de testes concluída: {session['id']}") logging.info(f"📊 Resumo final: {self.status['total_tests']} testes em {total_time:.2f}s") logging.info(f"🎯 Taxa geral de sucesso: {self.results['summary'].get('overall_success_rate', 0)}%") return self.results except Exception as e: logging.error(f"❌ Erro na sessão de testes: {e}") with self._lock: self.status['current_status'] = 'error' self.status['errors'].append(str(e)) raise async def _run_group_tests(self, question: str, group: Dict[str, Any], validation_method: str, expected_content: str) -> Dict[str, Any]: """ Executa testes de um grupo específico com paralelismo REAL Args: question: Pergunta do teste group: Configuração do grupo validation_method: Método de validação expected_content: Conteúdo esperado Returns: Resultados do grupo """ print(f"🔄 Executando {group['iterations']} testes em paralelo (máx {self.max_workers} simultâneos)") logging.info(f"🔄 Executando {group['iterations']} testes para grupo {group['id']}") # Cria semáforo para controle de concorrência semaphore = asyncio.Semaphore(self.max_workers) print(f"⚡ Iniciando {group['iterations']} testes com paralelismo REAL...") start_time = time.time() # VOLTA AO PARALELISMO ORIGINAL QUE FUNCIONAVA print(f"🚀 Executando {group['iterations']} testes em paralelo (máx {self.max_workers} simultâneos)") # Cria tasks para execução paralela (COMO ESTAVA ANTES) semaphore = asyncio.Semaphore(self.max_workers) tasks = [] print(f"⚡ Criando {group['iterations']} tasks paralelas...") for iteration in range(group['iterations']): task = self._run_single_test( semaphore, question, group, iteration + 1, validation_method, expected_content ) tasks.append(task) print(f"🚀 Executando {len(tasks)} testes em paralelo...") start_time = time.time() # Executa testes em paralelo (COMO ESTAVA ANTES) individual_results = await asyncio.gather(*tasks, return_exceptions=True) execution_time = time.time() - start_time print(f"✅ Grupo {group['id']} concluído em {execution_time:.2f}s") execution_time = time.time() - start_time print(f"✅ Grupo {group['id']} concluído em {execution_time:.2f}s ({group['iterations']} testes)") # Filtra resultados válidos valid_results = [] errors = [] for result in individual_results: if isinstance(result, Exception): errors.append(str(result)) logging.error(f"Erro em teste individual: {result}") else: valid_results.append(result) self.results['individual_results'].append(result) # Calcula estatísticas do grupo group_stats = self._calculate_group_stats(valid_results, group) group_stats['errors'] = errors group_stats['error_count'] = len(errors) logging.info(f"✅ Grupo {group['id']} concluído: {len(valid_results)} sucessos, {len(errors)} erros") return group_stats async def _run_single_test(self, semaphore: asyncio.Semaphore, question: str, group: Dict[str, Any], iteration: int, validation_method: str, expected_content: str) -> Dict[str, Any]: """ Executa um teste individual com paralelismo real Args: semaphore: Semáforo para controle de concorrência question: Pergunta do teste group: Configuração do grupo iteration: Número da iteração validation_method: Método de validação expected_content: Conteúdo esperado Returns: Resultado do teste individual """ async with semaphore: try: start_time = time.time() # Cria thread_id único para este teste thread_id = f"test_{group['id']}_{iteration}_{uuid.uuid4().hex[:8]}" # Registra teste como em execução with self._lock: self.status['running_tests'][thread_id] = { 'start_time': start_time, 'group_id': group['id'], 'iteration': iteration, 'question': question[:50] + '...' if len(question) > 50 else question } self.status['current_test'] = thread_id print(f"🔄 [{datetime.now().strftime('%H:%M:%S')}] 🚀 INICIANDO {thread_id} (Worker {asyncio.current_task().get_name() if asyncio.current_task() else 'unknown'})") logging.info(f"🔄 Iniciando teste {thread_id} - Grupo {group['id']}, Iteração {iteration}") # Verifica se foi cancelado antes de começar if thread_id in self.status['cancelled_tests']: print(f"🚫 Teste {thread_id} cancelado antes de iniciar") return self._create_cancelled_result(thread_id, group, iteration, start_time) # Registra task para cancelamento (NOVO) current_task = asyncio.current_task() with self._lock: self._active_futures[thread_id] = current_task # Executa em thread separada para paralelismo real (COMO ESTAVA ANTES) loop = asyncio.get_event_loop() def run_sync_test(): """Executa teste de forma síncrona em thread separada""" try: # Verifica cancelamento antes de iniciar (NOVO) if thread_id in self.status['cancelled_tests']: return {'cancelled': True, 'reason': 'cancelled_before_start'} # Cria novo loop para esta thread new_loop = asyncio.new_event_loop() asyncio.set_event_loop(new_loop) # Inicializa AgentGraphManager para este teste graph_manager = AgentGraphManager() # Executa query result = new_loop.run_until_complete( graph_manager.process_query( user_input=question, selected_model=group['sql_model_name'], processing_enabled=group['processing_enabled'], processing_model=group['processing_model_name'] if group['processing_enabled'] else None, thread_id=thread_id ) ) new_loop.close() # Verifica cancelamento após execução (NOVO) if thread_id in self.status['cancelled_tests']: return {'cancelled': True, 'reason': 'cancelled_after_execution'} return result except Exception as e: logging.error(f"Erro em thread separada para {thread_id}: {e}") return {'error': str(e)} # Executa em ThreadPoolExecutor para paralelismo real (COMO ESTAVA ANTES) with ThreadPoolExecutor(max_workers=1) as executor: future = loop.run_in_executor(executor, run_sync_test) # Aguarda com verificação de cancelamento (NOVO) while not future.done(): await asyncio.sleep(0.1) # Verifica a cada 100ms if thread_id in self.status['cancelled_tests']: future.cancel() print(f"🚫 Cancelando future do teste {thread_id}") try: await future except: pass return self._create_cancelled_result(thread_id, group, iteration, start_time, 'user_cancelled') result = await future execution_time = time.time() - start_time # Remove teste da lista de execução e limpa future with self._lock: if thread_id in self.status['running_tests']: del self.status['running_tests'][thread_id] if self.status['current_test'] == thread_id: self.status['current_test'] = None if thread_id in self._active_futures: del self._active_futures[thread_id] # Verifica tipo de resultado if isinstance(result, dict): if result.get('cancelled'): print(f"🚫 [{datetime.now().strftime('%H:%M:%S')}] CANCELADO {thread_id} - {result.get('reason', 'unknown')}") logging.info(f"🚫 Teste {thread_id} cancelado") return self._create_cancelled_result(thread_id, group, iteration, start_time, result.get('reason')) elif result.get('timeout'): print(f"⏰ [{datetime.now().strftime('%H:%M:%S')}] TIMEOUT {thread_id} após {result.get('duration')}s") logging.warning(f"⏰ Teste {thread_id} timeout") return self._create_timeout_result(thread_id, group, iteration, start_time, result.get('duration')) elif result.get('error'): print(f"❌ [{datetime.now().strftime('%H:%M:%S')}] ERRO {thread_id}: {result['error']}") logging.error(f"❌ Teste {thread_id} erro: {result['error']}") print(f"✅ [{datetime.now().strftime('%H:%M:%S')}] 🎉 CONCLUÍDO {thread_id} em {execution_time:.2f}s") logging.info(f"✅ Teste {thread_id} concluído em {execution_time:.2f}s") # Valida resultado validation_result = await self.validator.validate_result( question=question, sql_query=result.get('sql_query_extracted', ''), response=result.get('response', ''), method=validation_method, expected_content=expected_content ) # Monta resultado individual individual_result = { 'group_id': group['id'], 'iteration': iteration, 'thread_id': thread_id, 'timestamp': datetime.now().isoformat(), 'execution_time': round(execution_time, 2), 'question': question, 'sql_model': group['sql_model_name'], 'processing_enabled': group['processing_enabled'], 'processing_model': group['processing_model_name'], 'sql_query': result.get('sql_query_extracted', ''), 'response': result.get('response', ''), 'error': result.get('error'), 'success': not bool(result.get('error')), 'validation': validation_result } # Atualiza progresso with self._lock: self.status['completed_tests'] += 1 progress = (self.status['completed_tests'] / self.status['total_tests']) * 100 self.status['progress'] = progress # Estima tempo restante if self.status['start_time']: elapsed = time.time() - self.status['start_time'] if self.status['completed_tests'] > 0: avg_time_per_test = elapsed / self.status['completed_tests'] remaining_tests = self.status['total_tests'] - self.status['completed_tests'] self.status['estimated_remaining'] = avg_time_per_test * remaining_tests # Print visual do progresso remaining_min = int(self.status['estimated_remaining'] // 60) remaining_sec = int(self.status['estimated_remaining'] % 60) print(f"📊 [{datetime.now().strftime('%H:%M:%S')}] Progresso: {self.status['completed_tests']}/{self.status['total_tests']} ({progress:.1f}%) - Restam ~{remaining_min}m{remaining_sec}s") logging.info(f"📊 Progresso: {self.status['completed_tests']}/{self.status['total_tests']} ({progress:.1f}%)") return individual_result except Exception as e: logging.error(f"❌ Erro em teste individual (grupo {group['id']}, iteração {iteration}): {e}") # Atualiza progresso mesmo com erro with self._lock: self.status['completed_tests'] += 1 self.status['errors'].append(f"Grupo {group['id']}, Iteração {iteration}: {e}") return { 'group_id': group['id'], 'iteration': iteration, 'thread_id': f"error_{group['id']}_{iteration}", 'timestamp': datetime.now().isoformat(), 'execution_time': time.time() - start_time, 'question': question, 'sql_model': group['sql_model_name'], 'processing_enabled': group['processing_enabled'], 'processing_model': group['processing_model_name'], 'sql_query': '', 'response': '', 'error': str(e), 'success': False, 'validation': {'valid': False, 'score': 0, 'reason': f'Erro de execução: {e}'} } def _calculate_group_stats(self, results: List[Dict[str, Any]], group: Dict[str, Any]) -> Dict[str, Any]: """ Calcula estatísticas de um grupo Args: results: Resultados individuais do grupo group: Configuração do grupo Returns: Estatísticas do grupo """ if not results: return { 'group_id': group['id'], 'group_config': group, 'total_tests': 0, 'success_rate': 0, 'validation_rate': 0, 'consistency_rate': 0, 'avg_execution_time': 0, 'individual_results': [] } total_tests = len(results) successful_tests = sum(1 for r in results if r.get('success', False)) valid_responses = sum(1 for r in results if r.get('validation', {}).get('valid', False)) # Calcula consistência (respostas similares) responses = [r.get('response', '') for r in results if r.get('success', False)] sql_queries = [r.get('sql_query', '') for r in results if r.get('success', False)] response_consistency = self._calculate_consistency(responses) sql_consistency = self._calculate_consistency(sql_queries) avg_execution_time = sum(r.get('execution_time', 0) for r in results) / total_tests return { 'group_id': group['id'], 'group_config': group, 'total_tests': total_tests, 'successful_tests': successful_tests, 'valid_responses': valid_responses, 'success_rate': round((successful_tests / total_tests) * 100, 2), 'validation_rate': round((valid_responses / total_tests) * 100, 2), 'response_consistency': round(response_consistency * 100, 2), 'sql_consistency': round(sql_consistency * 100, 2), 'avg_execution_time': round(avg_execution_time, 2), 'individual_results': results } def _calculate_consistency(self, items: List[str]) -> float: """ Calcula taxa de consistência entre itens Args: items: Lista de strings para comparar Returns: Taxa de consistência (0-1) """ if len(items) <= 1: return 1.0 # Conta ocorrências únicas unique_items = set(items) most_common_count = max(items.count(item) for item in unique_items) return most_common_count / len(items) def _generate_summary(self): """Gera resumo geral dos testes""" group_results = self.results.get('group_results', []) if not group_results: self.results['summary'] = {} return total_tests = sum(gr['total_tests'] for gr in group_results) total_successful = sum(gr['successful_tests'] for gr in group_results) total_valid = sum(gr['valid_responses'] for gr in group_results) avg_success_rate = sum(gr['success_rate'] for gr in group_results) / len(group_results) avg_validation_rate = sum(gr['validation_rate'] for gr in group_results) / len(group_results) avg_response_consistency = sum(gr['response_consistency'] for gr in group_results) / len(group_results) avg_sql_consistency = sum(gr['sql_consistency'] for gr in group_results) / len(group_results) self.results['summary'] = { 'total_groups': len(group_results), 'total_tests': total_tests, 'total_successful': total_successful, 'total_valid': total_valid, 'overall_success_rate': round((total_successful / total_tests) * 100, 2), 'overall_validation_rate': round((total_valid / total_tests) * 100, 2), 'avg_response_consistency': round(avg_response_consistency, 2), 'avg_sql_consistency': round(avg_sql_consistency, 2), 'best_performing_group': max(group_results, key=lambda x: x['validation_rate']), 'most_consistent_group': max(group_results, key=lambda x: x['response_consistency']) } def get_status(self) -> Dict[str, Any]: """Retorna status atual dos testes""" with self._lock: status = self.status.copy() # Adiciona informações dos testes em execução status['running_tests_count'] = len(self.status['running_tests']) status['running_tests_details'] = list(self.status['running_tests'].values()) return status def cancel_current_test(self, thread_id: str = None) -> bool: """ Cancela teste específico ou o mais antigo em execução Args: thread_id: ID do teste específico para cancelar (opcional) Returns: True se cancelou algum teste """ with self._lock: if thread_id: if thread_id in self.status['running_tests']: self.status['cancelled_tests'].add(thread_id) print(f"🚫 Teste {thread_id} marcado para cancelamento") logging.info(f"Teste {thread_id} cancelado pelo usuário") return True else: # Cancela o teste mais antigo if self.status['running_tests']: oldest_test = min( self.status['running_tests'].items(), key=lambda x: x[1]['start_time'] ) thread_id = oldest_test[0] self.status['cancelled_tests'].add(thread_id) print(f"🚫 Teste mais antigo {thread_id} marcado para cancelamento") logging.info(f"Teste mais antigo {thread_id} cancelado pelo usuário") return True return False def cancel_all_tests(self) -> int: """ Cancela todos os testes em execução Returns: Número de testes cancelados """ with self._lock: running_count = len(self.status['running_tests']) for thread_id in self.status['running_tests'].keys(): self.status['cancelled_tests'].add(thread_id) print(f"🚫 {running_count} testes marcados para cancelamento") logging.info(f"{running_count} testes cancelados pelo usuário") return running_count def skip_stuck_tests(self, max_duration: int = 120) -> int: """ Marca testes travados (que excedem tempo limite) para cancelamento Args: max_duration: Tempo máximo em segundos Returns: Número de testes marcados como travados """ current_time = time.time() stuck_count = 0 with self._lock: for thread_id, test_info in self.status['running_tests'].items(): if current_time - test_info['start_time'] > max_duration: if thread_id not in self.status['cancelled_tests']: self.status['timeout_tests'].add(thread_id) self.status['cancelled_tests'].add(thread_id) stuck_count += 1 print(f"⏰ Teste {thread_id} marcado como travado (>{max_duration}s)") logging.warning(f"Teste {thread_id} travado - timeout após {max_duration}s") return stuck_count def _create_cancelled_result(self, thread_id: str, group: Dict[str, Any], iteration: int, start_time: float, reason: str = 'user_cancelled') -> Dict[str, Any]: """Cria resultado para teste cancelado""" execution_time = time.time() - start_time return { 'thread_id': thread_id, 'group_id': group['id'], 'iteration': iteration, 'success': False, 'cancelled': True, 'cancel_reason': reason, 'execution_time': execution_time, 'sql_query': None, 'final_response': f"Teste cancelado: {reason}", 'validation_valid': False, 'validation_score': 0, 'error': None, 'timestamp': datetime.now().isoformat() } def _create_timeout_result(self, thread_id: str, group: Dict[str, Any], iteration: int, start_time: float, duration: int) -> Dict[str, Any]: """Cria resultado para teste com timeout""" execution_time = time.time() - start_time return { 'thread_id': thread_id, 'group_id': group['id'], 'iteration': iteration, 'success': False, 'timeout': True, 'timeout_duration': duration, 'execution_time': execution_time, 'sql_query': None, 'final_response': f"Teste travado - timeout após {duration}s", 'validation_valid': False, 'validation_score': 0, 'error': f"Timeout após {duration}s", 'timestamp': datetime.now().isoformat() } def get_results(self) -> Dict[str, Any]: """Retorna resultados dos testes""" return self.results