Spaces:
Running
Running
"""
MELODIX - Test de Integración

Prueba el flujo completo contra la API: registro, login, perfil y créditos,
subida de un archivo, seguimiento del procesamiento e historial de canciones.

Uso: python tests/integration_test.py
"""
| import requests | |
| import os | |
| import sys | |
| import time | |
# ==========================================
# CONFIGURATION
# ==========================================
# Root URL of the API under test; override it with the TEST_BASE_URL
# environment variable.
BASE_URL = os.getenv("TEST_BASE_URL", "http://127.0.0.1:8000")

# Account that the script registers and then logs in with.
TEST_USER = "testuser_integration"
TEST_PASS = "password123"
TEST_EMAIL = "test_integration@example.com"

# Path of the WAV fixture that is generated and then uploaded.
TEST_FILE = "tests/test.wav"

# Make sure the directory that will hold the fixture exists.
# exist_ok=True avoids the check-then-create race of a separate
# os.path.exists() test and is a no-op when the directory is already there.
os.makedirs(os.path.dirname(TEST_FILE) or ".", exist_ok=True)
| # ========================================== | |
| # FUNCIONES AUXILIARES | |
| # ========================================== | |
def create_test_audio(path=None, duration=2, sample_rate=44100, frequency=440):
    """Write a mono, 16-bit PCM WAV file containing a sine tone.

    Args:
        path: Destination file. Defaults to the module-level ``TEST_FILE``.
        duration: Length of the tone in seconds.
        sample_rate: Samples per second written to the WAV header.
        frequency: Tone frequency in Hz (440 Hz is A4).

    The amplitude is fixed at 50% of the 16-bit full scale. The chosen path
    is printed once the file has been written.
    """
    import math
    import struct
    import wave

    if path is None:
        path = TEST_FILE

    num_samples = max(0, int(sample_rate * duration))
    amplitude = 32767.0 * 0.5  # 50% of full scale
    # Phase advance per sample; hoisted because it is loop-invariant.
    phase_step = 2.0 * math.pi * frequency / sample_rate
    samples = [int(amplitude * math.sin(phase_step * i)) for i in range(num_samples)]

    with wave.open(path, 'w') as wav_file:
        wav_file.setnchannels(1)  # mono
        wav_file.setsampwidth(2)  # 16 bits per sample
        wav_file.setframerate(sample_rate)
        # Pack every sample as little-endian int16 and write them in a single
        # call instead of one writeframesraw() call per sample.
        wav_file.writeframes(struct.pack(f'<{num_samples}h', *samples))

    print(f" Archivo de test creado: {path}")
def print_separator(title=""):
    """Print a banner: a blank line, a 60-character rule, an optional title
    line, and a closing rule.

    Args:
        title: Text shown between the two rules; omitted when empty.
    """
    rule = "=" * 60
    print("\n" + rule)
    if title:
        print(f" {title}")
    print(rule)
| # ========================================== | |
| # TEST DE INTEGRACIÓN | |
| # ========================================== | |
def test_integration():
    """Run the end-to-end MELODIX API check and print a per-stage report.

    Stages, in order: create the WAV fixture, register the test user, log in,
    read the user profile, upload the fixture, verify that one credit was
    deducted, poll the processing task, and fetch the song history.

    A failure in any stage up to and including the upload aborts the run,
    because later stages need its output. Failures in the credit-deduction,
    task-polling and history stages are reported and recorded, and the final
    summary marks each of those stages with its real outcome instead of an
    unconditional check mark.

    Returns:
        None. All results are reported on stdout.
    """
    print_separator("MELODIX - TEST DE INTEGRACIÓN")
    print(f"Base URL: {BASE_URL}")

    # Outcomes of the stages that do not abort the run; used by the summary.
    credits_ok = False
    task_ok = False
    history_ok = False

    # ------------------------------------------
    # Stage 0: create the audio fixture
    # ------------------------------------------
    print("\n[0] Creando archivo de audio de test...")
    try:
        create_test_audio()
        print(" ✓ Archivo creado exitosamente")
    except Exception as e:
        print(f" ✗ Error creando archivo: {e}")
        return

    # ------------------------------------------
    # Stage 1: registration
    # ------------------------------------------
    print_separator("TEST 1: REGISTRO")
    payload = {
        "username": TEST_USER,
        "email": TEST_EMAIL,
        "password": TEST_PASS,
    }
    try:
        res = requests.post(f"{BASE_URL}/auth/register", json=payload, timeout=10)
        if res.status_code == 200:
            print(" ✓ Registro exitoso")
            user_data = res.json()
            print(f" User ID: {user_data.get('id')}")
            print(f" Créditos iniciales: {user_data.get('credits')}")
        elif res.status_code == 400:
            # A 400 is treated as "user already registered" so the script
            # can be re-run against the same database.
            print(" ⚠ Usuario ya existe (esperado en reintentos)")
        else:
            print(f" ✗ FALLO Registro: {res.status_code} - {res.text}")
            return
    except requests.exceptions.ConnectionError:
        print(" ✗ ERROR DE CONEXIÓN: ¿Está corriendo la API?")
        print(" Ejecuta: iniciar_melodix.bat")
        return
    except Exception as e:
        print(f" ✗ ERROR: {e}")
        return

    # ------------------------------------------
    # Stage 2: login
    # ------------------------------------------
    print_separator("TEST 2: LOGIN")
    data = {
        "username": TEST_USER,
        "password": TEST_PASS,
        "device_id": "test_device_001",
        "device_name": "Integration Test Script",
    }
    try:
        # Sent as form data (data=), not JSON.
        res = requests.post(f"{BASE_URL}/auth/token", data=data, timeout=10)
        if res.status_code != 200:
            print(f" ✗ FALLO Login: {res.status_code} - {res.text}")
            return
        token_data = res.json()
        token = token_data["access_token"]
        print(" ✓ Token obtenido exitosamente")
        print(f" Token type: {token_data.get('token_type')}")
        print(f" Expires in: {token_data.get('expires_in')}s")
    except Exception as e:
        print(f" ✗ ERROR LOGIN: {e}")
        return
    headers = {"Authorization": f"Bearer {token}"}

    # ------------------------------------------
    # Stage 3: user profile / available credits
    # ------------------------------------------
    print_separator("TEST 3: PERFIL DE USUARIO")
    try:
        res = requests.get(f"{BASE_URL}/auth/users/me", headers=headers, timeout=10)
        if res.status_code != 200:
            # Without this check an auth/server error would be misreported
            # below as "not enough credits".
            print(f" ✗ FALLO PERFIL: {res.status_code} - {res.text}")
            return
        user_data = res.json()
        print(f" Username: {user_data.get('username')}")
        print(f" Email: {user_data.get('email')}")
        print(f" Créditos: {user_data.get('credits')}")
        # "or 0" also covers a response where the key is present but null.
        credits_before = user_data.get('credits') or 0
        if credits_before < 1:
            print(" ✗ SIN CRÉDITOS SUFICIENTES PARA PRUEBA DE SUBIDA")
            print(" Tip: Ejecuta en la DB: UPDATE users SET credits = 50 WHERE username = 'testuser_integration';")
            return
    except Exception as e:
        print(f" ✗ ERROR: {e}")
        return

    # ------------------------------------------
    # Stage 4: upload
    # ------------------------------------------
    print_separator("TEST 4: SUBIDA DE CANCIÓN")
    try:
        with open(TEST_FILE, "rb") as f:
            files = {"file": f}
            data_upload = {
                "stems": 2,
                "device_id": "test_device_001",
                "trim_start": "",
                "trim_end": "",
            }
            print(f" Subiendo archivo: {TEST_FILE}")
            res = requests.post(
                f"{BASE_URL}/songs/upload",
                headers=headers,
                files=files,
                data=data_upload,
                timeout=30,
            )
        if res.status_code == 200:
            data_res = res.json()
            celery_id = data_res.get('celery_id')
            task_id = data_res.get('task_id')
            print(" ✓ SUBIDA EXITOSA")
            print(f" Task ID: {task_id}")
            print(f" Celery ID: {celery_id}")
            print(f" Créditos restantes: {data_res.get('credits_left')}")
        else:
            print(f" ✗ FALLO SUBIDA: {res.status_code} - {res.text}")
            return
    except Exception as e:
        print(f" ✗ ERROR SUBIDA: {e}")
        return

    # ------------------------------------------
    # Stage 5: credit deduction (non-fatal)
    # ------------------------------------------
    print_separator("TEST 5: VERIFICAR CRÉDITOS")
    try:
        res = requests.get(f"{BASE_URL}/auth/users/me", headers=headers, timeout=10)
        credits_after = res.json().get('credits', 0)
        print(f" Créditos antes: {credits_before}")
        print(f" Créditos después: {credits_after}")
        if credits_after == credits_before - 1:
            print(" ✓ Crédito descontado correctamente")
            credits_ok = True
        else:
            print(" ✗ ERROR: Créditos no descontados correctamente")
    except Exception as e:
        print(f" ✗ ERROR: {e}")

    # ------------------------------------------
    # Stage 6: poll the processing task
    # ------------------------------------------
    print_separator("TEST 6: ESTADO DE LA TAREA")
    if not celery_id:
        print(" ✗ No hay Celery ID para consultar")
        return
    max_attempts = 30
    poll_interval = 2  # seconds to wait between status requests
    for attempt in range(1, max_attempts + 1):
        try:
            res = requests.get(
                f"{BASE_URL}/songs/status/{celery_id}",
                headers=headers,
                timeout=10,
            )
            status_data = res.json()
            estado = status_data.get('estado')
            progreso = status_data.get('progreso', 0)
            mensaje = status_data.get('mensaje', '')
            print(f" Intento {attempt}/{max_attempts}: Estado = {estado}, Progreso = {progreso}%")
            if estado == "SUCCESS":
                print(" ✓ TAREA COMPLETADA EXITOSAMENTE")
                print(f" Folder: {status_data.get('folder')}")
                resultado = status_data.get('resultado')
                if resultado:
                    print(f" BPM: {resultado.get('bpm')}")
                    print(f" Acordes: {len(resultado.get('acordes', []))} detectados")
                task_ok = True
                break
            if estado == "FAILURE":
                print(f" ✗ TAREA FALLÓ: {mensaje}")
                break
            if estado == "PROGRESS":
                print(f" → Progreso: {mensaje}")
            else:
                print(f" → Estado: {estado}")
        except Exception as e:
            # Best effort: a transient error consumes one attempt and the
            # loop keeps polling.
            print(f" ✗ ERROR CONSULTANDO ESTADO: {e}")
        time.sleep(poll_interval)
    else:
        # Reached only when no attempt ended in SUCCESS or FAILURE.
        print(f" ⚠ Timeout: La tarea tardó más de {max_attempts * poll_interval} segundos")

    # ------------------------------------------
    # Stage 7: history (non-fatal)
    # ------------------------------------------
    print_separator("TEST 7: HISTORIAL DE CANCIONES")
    try:
        res = requests.get(
            f"{BASE_URL}/songs/history",
            headers=headers,
            timeout=10,
        )
        if res.status_code == 200:
            history = res.json()
            print(f" Total canciones: {history.get('total')}")
            tasks = history.get('tasks', [])
            if tasks:
                latest = tasks[0]
                print(" Última canción:")
                print(f" - Nombre: {latest.get('song_name')}")
                print(f" - Stems: {latest.get('stems')}")
                print(f" - Estado: {latest.get('status')}")
            print(" ✓ Historial obtenido correctamente")
            history_ok = True
        else:
            print(f" ✗ FALLO HISTORIAL: {res.status_code}")
    except Exception as e:
        print(f" ✗ ERROR: {e}")

    # ------------------------------------------
    # Final summary
    # ------------------------------------------
    def mark(ok):
        """Return the check or cross glyph for a stage outcome."""
        return "✓" if ok else "✗"

    print_separator("RESUMEN DE TESTS")
    # Reaching this point means the aborting stages (0-4) all succeeded.
    print(" ✓ Registro/Login")
    print(" ✓ Verificación de créditos")
    print(" ✓ Subida de archivo")
    print(f" {mark(credits_ok)} Descuento de créditos")
    print(f" {mark(task_ok)} Consulta de estado")
    print(f" {mark(history_ok)} Historial de canciones")
    if credits_ok and task_ok and history_ok:
        print_separator("¡TESTS COMPLETADOS!")
    else:
        print_separator("TESTS COMPLETADOS CON FALLOS")
    print()
# Script entry point: run the integration flow only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    test_integration()