DataGraph / testes /app_teste.py
rwayz's picture
ss
7094511
#!/usr/bin/env python3
"""
Sistema de Testes Massivos para AgentGraph
Interface HTML isolada para testes de consistência e performance
"""
import os
import sys
import asyncio
import logging
from flask import Flask, render_template, request, jsonify, send_file
from datetime import datetime
import json
# Adiciona o diretório pai ao path para importar módulos do AgentGraph
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from testes.test_runner import MassiveTestRunner
from testes.test_validator import TestValidator
from testes.report_generator import ReportGenerator
from utils.config import AVAILABLE_MODELS
# Configuração do Flask
app = Flask(__name__,
template_folder='templates',
static_folder='static')
# Configuração de logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Instância global do test runner
test_runner = None
current_test_session = None
@app.route('/')
def index():
"""Página principal do sistema de testes"""
return render_template('index.html',
available_models=AVAILABLE_MODELS)
@app.route('/api/models')
def get_models():
"""Retorna modelos disponíveis"""
return jsonify({
'success': True,
'models': AVAILABLE_MODELS
})
@app.route('/api/create_test_session', methods=['POST'])
def create_test_session():
"""Cria nova sessão de teste"""
global current_test_session
try:
data = request.json
question = data.get('question', '').strip()
if not question:
return jsonify({
'success': False,
'error': 'Pergunta é obrigatória'
})
# Cria nova sessão
current_test_session = {
'id': datetime.now().strftime('%Y%m%d_%H%M%S'),
'question': question,
'groups': [],
'created_at': datetime.now().isoformat(),
'status': 'created'
}
logging.info(f"Nova sessão de teste criada: {current_test_session['id']}")
return jsonify({
'success': True,
'session_id': current_test_session['id'],
'message': 'Sessão de teste criada com sucesso'
})
except Exception as e:
logging.error(f"Erro ao criar sessão de teste: {e}")
return jsonify({
'success': False,
'error': str(e)
})
@app.route('/api/add_test_group', methods=['POST'])
def add_test_group():
"""Adiciona grupo de teste à sessão atual"""
global current_test_session
try:
if not current_test_session:
return jsonify({
'success': False,
'error': 'Nenhuma sessão de teste ativa'
})
data = request.json
# Validação dos dados
required_fields = ['sql_model', 'iterations']
for field in required_fields:
if field not in data:
return jsonify({
'success': False,
'error': f'Campo obrigatório: {field}'
})
sql_model = data['sql_model']
iterations = int(data['iterations'])
processing_enabled = data.get('processing_enabled', False)
processing_model = data.get('processing_model', None)
# Validações
if sql_model not in AVAILABLE_MODELS.values():
return jsonify({
'success': False,
'error': 'Modelo SQL inválido'
})
if iterations < 1 or iterations > 100:
return jsonify({
'success': False,
'error': 'Número de iterações deve ser entre 1 e 100'
})
if processing_enabled and processing_model not in AVAILABLE_MODELS.values():
return jsonify({
'success': False,
'error': 'Modelo de processamento inválido'
})
# Cria grupo
group = {
'id': len(current_test_session['groups']) + 1,
'sql_model': sql_model,
'sql_model_name': next(k for k, v in AVAILABLE_MODELS.items() if v == sql_model),
'processing_enabled': processing_enabled,
'processing_model': processing_model,
'processing_model_name': next((k for k, v in AVAILABLE_MODELS.items() if v == processing_model), None) if processing_model else None,
'iterations': iterations,
'created_at': datetime.now().isoformat()
}
current_test_session['groups'].append(group)
logging.info(f"Grupo adicionado à sessão {current_test_session['id']}: {group}")
return jsonify({
'success': True,
'group': group,
'total_groups': len(current_test_session['groups']),
'message': 'Grupo adicionado com sucesso'
})
except Exception as e:
logging.error(f"Erro ao adicionar grupo: {e}")
return jsonify({
'success': False,
'error': str(e)
})
@app.route('/api/get_session_info')
def get_session_info():
"""Retorna informações da sessão atual"""
global current_test_session
if not current_test_session:
return jsonify({
'success': False,
'error': 'Nenhuma sessão ativa'
})
return jsonify({
'success': True,
'session': current_test_session
})
@app.route('/api/run_tests', methods=['POST'])
def run_tests():
"""Executa os testes da sessão atual"""
global test_runner, current_test_session
try:
if not current_test_session:
return jsonify({
'success': False,
'error': 'Nenhuma sessão de teste ativa'
})
if not current_test_session['groups']:
return jsonify({
'success': False,
'error': 'Nenhum grupo de teste configurado'
})
data = request.json
validation_method = data.get('validation_method', 'llm')
expected_content = data.get('expected_content', '') if validation_method == 'keyword' else None
# Atualiza status
current_test_session['status'] = 'running'
current_test_session['started_at'] = datetime.now().isoformat()
# Cria test runner
test_runner = MassiveTestRunner(max_workers=5) # Paralelismo de 5 workers
# Executa testes de forma assíncrona
def run_async_tests():
print(f"\n🚀 INICIANDO EXECUÇÃO DE TESTES - {datetime.now().strftime('%H:%M:%S')}")
print(f"📊 Sessão: {current_test_session['id']}")
print(f"❓ Pergunta: {current_test_session['question']}")
print(f"👥 Grupos: {len(current_test_session['groups'])}")
print(f"🔢 Total de testes: {sum(g['iterations'] for g in current_test_session['groups'])}")
print("=" * 60)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
test_runner.run_test_session(
current_test_session,
validation_method,
expected_content
)
)
# Marca como concluído após execução
current_test_session['status'] = 'completed'
current_test_session['completed_at'] = datetime.now().isoformat()
print(f"\n🎉 TESTES CONCLUÍDOS COM SUCESSO - {datetime.now().strftime('%H:%M:%S')}")
print(f"✅ Sessão: {current_test_session['id']}")
print("=" * 60)
logging.info(f"✅ Testes concluídos com sucesso: {current_test_session['id']}")
return result
except Exception as e:
current_test_session['status'] = 'error'
current_test_session['error'] = str(e)
print(f"\n❌ ERRO NA EXECUÇÃO DOS TESTES - {datetime.now().strftime('%H:%M:%S')}")
print(f"💥 Erro: {e}")
print("=" * 60)
logging.error(f"❌ Erro na execução dos testes: {e}")
raise
finally:
loop.close()
# Inicia execução em thread separada
import threading
test_thread = threading.Thread(target=run_async_tests)
test_thread.daemon = False # Não daemon para garantir conclusão
test_thread.start()
return jsonify({
'success': True,
'message': 'Testes iniciados com sucesso',
'session_id': current_test_session['id']
})
except Exception as e:
logging.error(f"Erro ao executar testes: {e}")
current_test_session['status'] = 'error'
return jsonify({
'success': False,
'error': str(e)
})
@app.route('/api/test_status')
def get_test_status():
"""Retorna status atual dos testes"""
global test_runner, current_test_session
if not current_test_session:
return jsonify({
'success': False,
'error': 'Nenhuma sessão ativa'
})
status_info = {
'session_id': current_test_session['id'],
'status': current_test_session.get('status', 'unknown'),
'progress': 0,
'current_group': None,
'completed_tests': 0,
'total_tests': sum(group['iterations'] for group in current_test_session['groups']),
'estimated_remaining': None
}
if test_runner:
try:
runner_status = test_runner.get_status()
status_info.update(runner_status)
# Se o runner indica que terminou, atualiza a sessão
if runner_status.get('current_status') == 'completed' and current_test_session.get('status') != 'completed':
current_test_session['status'] = 'completed'
current_test_session['completed_at'] = datetime.now().isoformat()
logging.info(f"🎉 Sessão {current_test_session['id']} marcada como concluída")
except Exception as e:
logging.error(f"Erro ao obter status do runner: {e}")
status_info['error'] = str(e)
# Adiciona informações de testes em execução
if test_runner:
status_info['running_tests'] = runner_status.get('running_tests_details', [])
status_info['running_tests_count'] = runner_status.get('running_tests_count', 0)
status_info['current_test'] = runner_status.get('current_test')
status_info['cancelled_tests'] = len(runner_status.get('cancelled_tests', []))
status_info['timeout_tests'] = len(runner_status.get('timeout_tests', []))
return jsonify({
'success': True,
'status': status_info
})
@app.route('/api/cancel_test', methods=['POST'])
def cancel_current_test():
"""Cancela o teste atual ou específico"""
global test_runner
if not test_runner:
return jsonify({
'success': False,
'error': 'Nenhum teste em execução'
})
try:
data = request.get_json() or {}
thread_id = data.get('thread_id') # Opcional - cancela teste específico
cancelled = test_runner.cancel_current_test(thread_id)
if cancelled:
return jsonify({
'success': True,
'message': f'Teste {"específico" if thread_id else "atual"} cancelado com sucesso',
'cancelled_test': thread_id
})
else:
return jsonify({
'success': False,
'error': 'Nenhum teste encontrado para cancelar'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erro ao cancelar teste: {str(e)}'
})
@app.route('/api/cancel_all_tests', methods=['POST'])
def cancel_all_tests():
"""Cancela todos os testes em execução"""
global test_runner
if not test_runner:
return jsonify({
'success': False,
'error': 'Nenhum teste em execução'
})
try:
cancelled_count = test_runner.cancel_all_tests()
return jsonify({
'success': True,
'message': f'{cancelled_count} testes cancelados',
'cancelled_count': cancelled_count
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erro ao cancelar testes: {str(e)}'
})
@app.route('/api/skip_stuck_tests', methods=['POST'])
def skip_stuck_tests():
"""Marca testes travados para cancelamento"""
global test_runner
if not test_runner:
return jsonify({
'success': False,
'error': 'Nenhum teste em execução'
})
try:
data = request.get_json() or {}
max_duration = data.get('max_duration', 120) # 2 minutos padrão
stuck_count = test_runner.skip_stuck_tests(max_duration)
return jsonify({
'success': True,
'message': f'{stuck_count} testes travados marcados para cancelamento',
'stuck_count': stuck_count,
'max_duration': max_duration
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erro ao marcar testes travados: {str(e)}'
})
@app.route('/api/test_results')
def get_test_results():
"""Retorna resultados dos testes"""
global test_runner, current_test_session
if not test_runner:
return jsonify({
'success': False,
'error': 'Nenhum teste executado'
})
if not current_test_session:
return jsonify({
'success': False,
'error': 'Nenhuma sessão ativa'
})
try:
results = test_runner.get_results()
# Verifica se há resultados válidos
if not results or not results.get('group_results'):
return jsonify({
'success': False,
'error': 'Resultados ainda não disponíveis'
})
logging.info(f"📊 Enviando resultados: {len(results.get('group_results', []))} grupos, {len(results.get('individual_results', []))} testes individuais")
return jsonify({
'success': True,
'results': results
})
except Exception as e:
logging.error(f"Erro ao obter resultados: {e}")
return jsonify({
'success': False,
'error': str(e)
})
@app.route('/api/download_csv')
def download_csv():
"""Gera e baixa relatório CSV"""
global test_runner
if not test_runner:
return jsonify({
'success': False,
'error': 'Nenhum teste executado'
})
try:
# Gera relatório
report_generator = ReportGenerator()
csv_path = report_generator.generate_csv_report(test_runner.get_results())
return send_file(
csv_path,
as_attachment=True,
download_name=f'teste_agentgraph_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
)
except Exception as e:
logging.error(f"Erro ao gerar CSV: {e}")
return jsonify({
'success': False,
'error': str(e)
})
if __name__ == '__main__':
print("🧪 Sistema de Testes Massivos - AgentGraph")
print("=" * 50)
print("🌐 Acesse: http://localhost:5001")
print("📊 Interface de testes disponível")
print("=" * 50)
app.run(
host='0.0.0.0',
port=5001,
debug=True,
threaded=True
)