Spaces:
Running
Running
| """ | |
| MELODIX - Test de Integracion para el Flujo de Migracion de Dispositivo | |
| Este script verifica: | |
| 1. Registro e inicio de sesion de un usuario de prueba. | |
| 2. Inicio de una sesion de migracion y subida del mapa de metadatos. | |
| 3. Subida fisica de stems individuales (.flac y .json). | |
| 4. Finalizacion de la copia de seguridad. | |
| 5. Verificacion del estado del respaldo. | |
| 6. Recuperacion de presigned URLs y metadatos. | |
| 7. Confirmacion de descarga y limpieza automatica en B2. | |
| Uso: python tests/test_migration_flow.py | |
| """ | |
| import requests | |
| import os | |
| import time | |
| import uuid | |
| BASE_URL = os.getenv("TEST_BASE_URL", "http://127.0.0.1:8000") | |
| unique_id = str(uuid.uuid4())[:8] | |
| TEST_USER = f"testuser_mig_{unique_id}" | |
| TEST_PASS = "migrationpass123" | |
| TEST_EMAIL = f"test_migration_{unique_id}@example.com" | |
| def print_separator(title=""): | |
| print("\n" + "=" * 60) | |
| if title: | |
| print(f" {title}") | |
| print("=" * 60) | |
| def run_migration_test(): | |
| print_separator("TEST DE MIGRACION DE DISPOSITIVO") | |
| print(f"Base URL: {BASE_URL}") | |
| # 1. Registro | |
| print("\n[1] Registrando usuario de prueba...") | |
| payload = { | |
| "username": TEST_USER, | |
| "email": TEST_EMAIL, | |
| "password": TEST_PASS | |
| } | |
| res = requests.post(f"{BASE_URL}/auth/register", json=payload, timeout=10) | |
| if res.status_code == 200: | |
| print(" [OK] Registro exitoso") | |
| elif res.status_code == 400: | |
| print(" [WARN] El usuario ya existe (esperado en ejecuciones repetidas)") | |
| else: | |
| print(f" [FAIL] Error en registro: {res.status_code} - {res.text}") | |
| return False | |
| # 2. Login | |
| print("\n[2] Iniciando sesion...") | |
| data = { | |
| "username": TEST_EMAIL, | |
| "password": TEST_PASS, | |
| "device_id": "test_migration_device_001", | |
| "device_name": "Migration Test Script" | |
| } | |
| res = requests.post(f"{BASE_URL}/auth/token", data=data, timeout=10) | |
| if res.status_code != 200: | |
| print(f" [FAIL] Error en login: {res.status_code} - {res.text}") | |
| return False | |
| token = res.json()["access_token"] | |
| headers = {"Authorization": f"Bearer {token}"} | |
| print(" [OK] Token obtenido exitosamente") | |
| # 3. Inicializar respaldo (POST /migration/backup) | |
| print("\n[3] Creando sesion de respaldo (POST /migration/backup)...") | |
| dummy_meta = { | |
| "task_123": { | |
| "task_id": "task_123", | |
| "songName": "Cancion de Prueba", | |
| "artist": "Artista de Prueba", | |
| "stems": ["vocals", "vocals_pitch"] | |
| } | |
| } | |
| res = requests.post(f"{BASE_URL}/migration/backup", json={"metadata_json": dummy_meta}, headers=headers, timeout=10) | |
| if res.status_code != 200: | |
| print(f" [FAIL] Error creando sesion: {res.status_code} - {res.text}") | |
| return False | |
| backup_data = res.json() | |
| backup_id = backup_data["backup_id"] | |
| print(f" [OK] Sesion de respaldo creada. Backup ID: {backup_id}") | |
| # Create dummy files | |
| dummy_flac_content = b"fake flac audio content" | |
| dummy_json_content = b'{"pitch_data": [1.0, 1.2, 1.1]}' | |
| # 4. Subir archivos (POST /migration/upload/...) | |
| print("\n[4] Subiendo archivo stems (vocals.flac)...") | |
| res = requests.post( | |
| f"{BASE_URL}/migration/upload/{backup_id}/task_123/vocals", | |
| files={"file": ("vocals.flac", dummy_flac_content, "audio/flac")}, | |
| headers=headers, | |
| timeout=15 | |
| ) | |
| if res.status_code != 200: | |
| print(f" [FAIL] Error subiendo vocals.flac: {res.status_code} - {res.text}") | |
| return False | |
| print(" [OK] vocals.flac subido correctamente") | |
| print("\n[5] Subiendo archivo afinacion (vocals_pitch.json)...") | |
| res = requests.post( | |
| f"{BASE_URL}/migration/upload/{backup_id}/task_123/vocals_pitch", | |
| files={"file": ("vocals_pitch.json", dummy_json_content, "application/json")}, | |
| headers=headers, | |
| timeout=15 | |
| ) | |
| if res.status_code != 200: | |
| print(f" [FAIL] Error subiendo vocals_pitch.json: {res.status_code} - {res.text}") | |
| return False | |
| print(" [OK] vocals_pitch.json subido correctamente") | |
| # 6. Finalizar respaldo (POST /migration/finalize) | |
| print("\n[6] Finalizando sesion de respaldo...") | |
| res = requests.post(f"{BASE_URL}/migration/finalize/{backup_id}", headers=headers, timeout=10) | |
| if res.status_code != 200: | |
| print(f" [FAIL] Error al finalizar respaldo: {res.status_code} - {res.text}") | |
| return False | |
| print(" [OK] Sesion de respaldo finalizada correctamente") | |
| # 7. Consultar estado (GET /migration/status) | |
| print("\n[7] Consultando estado del respaldo (GET /migration/status)...") | |
| res = requests.get(f"{BASE_URL}/migration/status", headers=headers, timeout=10) | |
| if res.status_code != 200: | |
| print(f" [FAIL] Error al consultar estado: {res.status_code} - {res.text}") | |
| return False | |
| status_info = res.json() | |
| print(" Respuesta del estado:", status_info) | |
| if not status_info.get("backup_available"): | |
| print(" [FAIL] ERROR: No hay respaldo disponible en el estado") | |
| return False | |
| if status_info.get("files_count") != 2: | |
| print(f" [FAIL] ERROR: Recuento de archivos incorrecto ({status_info.get('files_count')} en lugar de 2)") | |
| return False | |
| print(" [OK] Estado del respaldo verificado exitosamente") | |
| # 8. Restaurar respaldo (GET /migration/restore) | |
| print("\n[8] Restaurando respaldo (GET /migration/restore)...") | |
| res = requests.get(f"{BASE_URL}/migration/restore", headers=headers, timeout=10) | |
| if res.status_code != 200: | |
| print(f" [FAIL] Error al restaurar: {res.status_code} - {res.text}") | |
| return False | |
| restore_info = res.json() | |
| print(" [OK] Datos de restauracion obtenidos") | |
| print(" Metadata recuperada:", restore_info.get("metadata_json")) | |
| files = restore_info.get("files", []) | |
| print(f" Archivos a descargar ({len(files)}):") | |
| for f in files: | |
| print(f" - {f['task_id']}/{f['stem_name']}: {f['url'][:60]}...") | |
| # Descargar de forma simulada para validar la firma de la URL | |
| dl_res = requests.get(f['url'], timeout=10) | |
| if dl_res.status_code != 200: | |
| print(f" [FAIL] ERROR al descargar archivo desde URL firmada: {dl_res.status_code}") | |
| return False | |
| print(" [OK] Descarga simulada OK") | |
| # 9. Completar migracion (POST /migration/complete) | |
| print("\n[9] Confirmando restauracion completa (POST /migration/complete)...") | |
| res = requests.post(f"{BASE_URL}/migration/complete", headers=headers, timeout=10) | |
| if res.status_code != 200: | |
| print(f" [FAIL] Error al completar restauracion: {res.status_code} - {res.text}") | |
| return False | |
| print(" [OK] Restauracion confirmada y completada") | |
| # 10. Re-verificar estado | |
| print("\n[10] Re-verificando estado tras completado...") | |
| res = requests.get(f"{BASE_URL}/migration/status", headers=headers, timeout=10) | |
| if res.status_code != 200: | |
| print(f" [FAIL] Error al consultar estado post-restauracion: {res.status_code}") | |
| return False | |
| post_status = res.json() | |
| print(" Respuesta del estado:", post_status) | |
| if post_status.get("backup_available"): | |
| print(" [FAIL] ERROR: El respaldo sigue estando disponible despues de confirmarse la restauracion") | |
| return False | |
| print("\n" + "=" * 60) | |
| print(" [SUCCESS] TODAS LAS PRUEBAS DE MIGRACION PASARON CON EXITO!") | |
| print("=" * 60 + "\n") | |
| return True | |
| if __name__ == "__main__": | |
| run_migration_test() | |