File size: 4,618 Bytes
3dc2617
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import unittest
import pandas as pd
import numpy as np
import tensorflow as tf
from unittest.mock import patch, MagicMock
from src.prediction import predict_trajectory
from src.optimization import OrbitalPathOptimizer
from src.quantum_alerts import QuantumCollisionAlertSystem
from src.telemetry import TelemetryMonitor
from src.graph_analysis import build_satellite_graph

class TestIntegration(unittest.TestCase):
    """Pruebas de integraci贸n para verificar la interacci贸n entre componentes."""
    
    def setUp(self):
        """Configuraci贸n inicial para las pruebas."""
        # Inicializar componentes
        self.optimizer = OrbitalPathOptimizer()
        self.alert_system = QuantumCollisionAlertSystem()
        self.telemetry = TelemetryMonitor()
        
        # Datos de prueba
        self.satellite_data = pd.DataFrame({
            "satellite_id": [101, 102],
            "feature": [0.5, 0.8]
        })
        self.debris_data = pd.DataFrame({
            "debris_id": [201, 202],
            "feature": [0.3, 0.7],
            "x": [1000, 1050],
            "y": [2000, 2050],
            "z": [3000, 3050]
        })
        self.tle_data = {
            "satellite_id": "12345",
            "data": [0.1, 0.2, 0.3, 0.4, 0.5]
        }
        self.trajectory_df = pd.DataFrame({
            "x": np.linspace(950, 1100, 5),
            "y": np.linspace(1950, 2100, 5),
            "z": np.linspace(2950, 3100, 5)
        })
    
    @patch('src.prediction.global_model')
    def test_prediction_to_collision_alert(self, mock_model):
        """Prueba la integraci贸n entre predicci贸n y alertas de colisi贸n."""
        # Configurar el mock para la predicci贸n
        mock_output = tf.constant([[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]])
        mock_model.return_value = mock_output
        
        # Realizar la predicci贸n
        prediction_result = predict_trajectory(self.tle_data)
        
        # Verificar que la predicci贸n tiene el formato esperado
        self.assertEqual(prediction_result["satellite_id"], "12345")
        self.assertIn("trajectory", prediction_result)
        
        # Convertir la predicci贸n a un DataFrame para el sistema de alertas
        # (Simulando la conversi贸n que se har铆a en un flujo real)
        trajectory_points = []
        for batch in prediction_result["trajectory"]:
            for point in batch:
                trajectory_points.append({"x": point[0], "y": point[1], "z": point[2]})
        
        trajectory_df = pd.DataFrame(trajectory_points)
        
        # Calcular la probabilidad de colisi贸n
        collision_prob = self.alert_system.calculate_collision_probability(trajectory_df)
        
        # Verificar que la probabilidad est谩 en el rango esperado
        self.assertGreaterEqual(collision_prob, 0.0)
        self.assertLessEqual(collision_prob, 1.0)
    
    def test_graph_to_optimization(self):
        """Prueba la integraci贸n entre an谩lisis de grafos y optimizaci贸n de rutas."""
        # Construir el grafo
        graph = build_satellite_graph(self.satellite_data, self.debris_data)
        
        # Verificar que el grafo se construy贸 correctamente
        self.assertIsNotNone(graph)
        
        # Usar los mismos datos para optimizar una ruta
        satellite_state = {"position": (950.0, 1950.0, 2950.0)}
        optimized_route = self.optimizer.optimize_route(satellite_state, self.debris_data)
        
        # Verificar que se obtuvo una ruta
        self.assertIsInstance(optimized_route, list)
    
    def test_optimization_to_telemetry(self):
        """Prueba la integraci贸n entre optimizaci贸n y telemetr铆a."""
        # Optimizar una ruta
        satellite_state = {"position": (950.0, 1950.0, 2950.0)}
        optimized_route = self.optimizer.optimize_route(satellite_state, self.debris_data)
        
        # Convertir la ruta a un formato compatible con telemetr铆a
        # (Simulando la conversi贸n que se har铆a en un flujo real)
        predictions = tf.constant([[point[0], point[1], point[2]] for point in optimized_route])
        ground_truth = tf.constant([[point[0]+0.1, point[1]+0.1, point[2]+0.1] for point in optimized_route])
        
        # Actualizar m茅tricas de telemetr铆a
        self.telemetry.update_metrics(predictions, ground_truth)
        
        # Obtener m茅tricas
        metrics = self.telemetry.get_prometheus_metrics()
        
        # Verificar que las m茅tricas est谩n presentes
        self.assertIn('position_error', metrics)
        self.assertIn('velocity_error', metrics)

if __name__ == '__main__':
    unittest.main()