Spaces:
Configuration error
Configuration error
| import unittest | |
| import pandas as pd | |
| import numpy as np | |
| import tensorflow as tf | |
| from unittest.mock import patch, MagicMock | |
| from src.prediction import predict_trajectory | |
| from src.optimization import OrbitalPathOptimizer | |
| from src.quantum_alerts import QuantumCollisionAlertSystem | |
| from src.telemetry import TelemetryMonitor | |
| from src.graph_analysis import build_satellite_graph | |
| class TestIntegration(unittest.TestCase): | |
| """Pruebas de integraci贸n para verificar la interacci贸n entre componentes.""" | |
| def setUp(self): | |
| """Configuraci贸n inicial para las pruebas.""" | |
| # Inicializar componentes | |
| self.optimizer = OrbitalPathOptimizer() | |
| self.alert_system = QuantumCollisionAlertSystem() | |
| self.telemetry = TelemetryMonitor() | |
| # Datos de prueba | |
| self.satellite_data = pd.DataFrame({ | |
| "satellite_id": [101, 102], | |
| "feature": [0.5, 0.8] | |
| }) | |
| self.debris_data = pd.DataFrame({ | |
| "debris_id": [201, 202], | |
| "feature": [0.3, 0.7], | |
| "x": [1000, 1050], | |
| "y": [2000, 2050], | |
| "z": [3000, 3050] | |
| }) | |
| self.tle_data = { | |
| "satellite_id": "12345", | |
| "data": [0.1, 0.2, 0.3, 0.4, 0.5] | |
| } | |
| self.trajectory_df = pd.DataFrame({ | |
| "x": np.linspace(950, 1100, 5), | |
| "y": np.linspace(1950, 2100, 5), | |
| "z": np.linspace(2950, 3100, 5) | |
| }) | |
| def test_prediction_to_collision_alert(self, mock_model): | |
| """Prueba la integraci贸n entre predicci贸n y alertas de colisi贸n.""" | |
| # Configurar el mock para la predicci贸n | |
| mock_output = tf.constant([[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]]) | |
| mock_model.return_value = mock_output | |
| # Realizar la predicci贸n | |
| prediction_result = predict_trajectory(self.tle_data) | |
| # Verificar que la predicci贸n tiene el formato esperado | |
| self.assertEqual(prediction_result["satellite_id"], "12345") | |
| self.assertIn("trajectory", prediction_result) | |
| # Convertir la predicci贸n a un DataFrame para el sistema de alertas | |
| # (Simulando la conversi贸n que se har铆a en un flujo real) | |
| trajectory_points = [] | |
| for batch in prediction_result["trajectory"]: | |
| for point in batch: | |
| trajectory_points.append({"x": point[0], "y": point[1], "z": point[2]}) | |
| trajectory_df = pd.DataFrame(trajectory_points) | |
| # Calcular la probabilidad de colisi贸n | |
| collision_prob = self.alert_system.calculate_collision_probability(trajectory_df) | |
| # Verificar que la probabilidad est谩 en el rango esperado | |
| self.assertGreaterEqual(collision_prob, 0.0) | |
| self.assertLessEqual(collision_prob, 1.0) | |
| def test_graph_to_optimization(self): | |
| """Prueba la integraci贸n entre an谩lisis de grafos y optimizaci贸n de rutas.""" | |
| # Construir el grafo | |
| graph = build_satellite_graph(self.satellite_data, self.debris_data) | |
| # Verificar que el grafo se construy贸 correctamente | |
| self.assertIsNotNone(graph) | |
| # Usar los mismos datos para optimizar una ruta | |
| satellite_state = {"position": (950.0, 1950.0, 2950.0)} | |
| optimized_route = self.optimizer.optimize_route(satellite_state, self.debris_data) | |
| # Verificar que se obtuvo una ruta | |
| self.assertIsInstance(optimized_route, list) | |
| def test_optimization_to_telemetry(self): | |
| """Prueba la integraci贸n entre optimizaci贸n y telemetr铆a.""" | |
| # Optimizar una ruta | |
| satellite_state = {"position": (950.0, 1950.0, 2950.0)} | |
| optimized_route = self.optimizer.optimize_route(satellite_state, self.debris_data) | |
| # Convertir la ruta a un formato compatible con telemetr铆a | |
| # (Simulando la conversi贸n que se har铆a en un flujo real) | |
| predictions = tf.constant([[point[0], point[1], point[2]] for point in optimized_route]) | |
| ground_truth = tf.constant([[point[0]+0.1, point[1]+0.1, point[2]+0.1] for point in optimized_route]) | |
| # Actualizar m茅tricas de telemetr铆a | |
| self.telemetry.update_metrics(predictions, ground_truth) | |
| # Obtener m茅tricas | |
| metrics = self.telemetry.get_prometheus_metrics() | |
| # Verificar que las m茅tricas est谩n presentes | |
| self.assertIn('position_error', metrics) | |
| self.assertIn('velocity_error', metrics) | |
| if __name__ == '__main__': | |
| unittest.main() |