File size: 6,683 Bytes
3dc2617
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# Ejemplo de uso del sistema de alertas cu谩nticas con algoritmos reales

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
from src.quantum_alerts import QuantumCollisionAlertSystem, QuantumAlertConfig
from src.quantum_api_integrator import QuantumApiIntegrator
from plot_comparison import plot_comparison

# Configurar el sistema de alertas con diferentes algoritmos cu谩nticos
def test_quantum_algorithms():
    print("\n=== Comparaci贸n de Algoritmos Cu谩nticos para Detecci贸n de Colisiones ===")
    
    # Crear trayectorias de prueba
    # Trayectoria con alto riesgo de colisi贸n (puntos muy cercanos)
    high_risk_trajectory = pd.DataFrame({
        "x": [950, 951, 952, 953, 954],
        "y": [1950, 1951, 1952, 1953, 1954],
        "z": [2950, 2951, 2952, 2953, 2954]
    })
    
    # Trayectoria con riesgo medio de colisi贸n
    medium_risk_trajectory = pd.DataFrame({
        "x": np.linspace(950, 1100, 5),
        "y": np.linspace(1950, 2100, 5),
        "z": np.linspace(2950, 3100, 5)
    })
    
    # Trayectoria con bajo riesgo de colisi贸n (puntos muy separados)
    low_risk_trajectory = pd.DataFrame({
        "x": [950, 1050, 1150, 1250, 1350],
        "y": [1950, 2050, 2150, 2250, 2350],
        "z": [2950, 3050, 3150, 3250, 3350]
    })
    
    # Algoritmos a probar
    algorithms = ["vqe", "grover", "qaoa", "basic"]
    noise_models = ["none", "low", "high"]
    
    # Almacenar resultados para comparaci贸n
    results = {}
    
    # Probar cada algoritmo con diferentes modelos de ruido
    for algo in algorithms:
        algo_results = []
        print(f"\nAlgoritmo: {algo.upper()}")
        
        for noise in noise_models:
            # Configurar sistema de alertas con este algoritmo y modelo de ruido
            config = QuantumAlertConfig(
                simulator_type=algo,
                noise_model=noise,
                shots=1000
            )
            alert_system = QuantumCollisionAlertSystem(config)
            
            # Calcular probabilidades para cada trayectoria
            high_prob = alert_system.calculate_collision_probability(high_risk_trajectory)
            medium_prob = alert_system.calculate_collision_probability(medium_risk_trajectory)
            low_prob = alert_system.calculate_collision_probability(low_risk_trajectory)
            
            print(f"  Modelo de ruido: {noise}")
            print(f"    Riesgo alto:   {high_prob:.4f}")
            print(f"    Riesgo medio:  {medium_prob:.4f}")
            print(f"    Riesgo bajo:   {low_prob:.4f}")
            
            algo_results.append((noise, high_prob, medium_prob, low_prob))
        
        results[algo] = algo_results
    
    # Visualizar resultados
    plot_comparison(results)

# Generar una alerta de colisi贸n completa
def test_collision_alert():
    print("\n=== Generaci贸n de Alerta de Colisi贸n con Algoritmo Cu谩ntico ===")
    
    # Configurar sistema de alertas con VQE
    config = QuantumAlertConfig(
        simulator_type="vqe",
        noise_model="low",
        shots=1000
    )
    alert_system = QuantumCollisionAlertSystem(config)
    
    # Crear trayectoria de prueba
    trajectory = pd.DataFrame({
        "x": np.linspace(950, 1000, 10),
        "y": np.linspace(1950, 2000, 10),
        "z": np.linspace(2950, 3000, 10),
        "timestamp": [datetime.now() + timedelta(minutes=i*30) for i in range(10)]
    })
    
    # Generar alerta
    alert = alert_system.generate_alert(
        satellite_id="ORBIX-SAT-001",
        trajectory=trajectory,
        other_object_id="DEBRIS-22",
        additional_metadata={
            "satellite_type": "Observaci贸n Terrestre",
            "orbit_type": "LEO",
            "altitude_km": 550,
            "inclination_deg": 53.0
        }
    )
    
    # Mostrar informaci贸n de la alerta
    print(f"\nAlerta generada:")
    for key, value in alert.items():
        if key == "generation_metadata" or key == "additional_metadata" or key == "confidence_interval":
            print(f"  {key}:")
            for subkey, subvalue in value.items():
                print(f"    {subkey}: {subvalue}")
        else:
            print(f"  {key}: {value}")
    
    # Publicar la alerta en Kafka
    success = alert_system.publish_alert(alert)
    if success:
        print("\nAlerta publicada exitosamente en Kafka")
    else:
        print("\nError al publicar la alerta en Kafka")

# Probar la integraci贸n con APIs externas
def test_api_integration():
    print("\n=== Prueba de Integraci贸n con APIs Externas ===")
    
    # Inicializar el integrador de APIs
    api_integrator = QuantumApiIntegrator()
    
    # Definir par谩metros de prueba
    satellite_id = "25544"  # ISS (Estaci贸n Espacial Internacional)
    start_time = datetime.now()
    end_time = start_time + timedelta(hours=24)
    
    # Obtener datos del sat茅lite
    print(f"\nObteniendo datos para el sat茅lite {satellite_id}...")
    satellite_data = api_integrator.get_satellite_data(satellite_id, start_time, end_time)
    
    # Verificar si se obtuvieron datos
    if "error" in satellite_data:
        print(f"Error: {satellite_data['error']}")
    else:
        print("Datos obtenidos correctamente:")
        for source, data in satellite_data.items():
            print(f"  {source}: {type(data)}")
    
    # Predecir trayectoria
    print("\nPrediciendo trayectoria futura...")
    trajectory = api_integrator.predict_trajectory(satellite_id, start_time, end_time, prediction_hours=48)
    
    if trajectory.empty:
        print("Error: No se pudo predecir la trayectoria")
    else:
        print(f"Trayectoria predicha con {len(trajectory)} puntos")
        print(trajectory.head())
    
    # Generar alerta de colisi贸n
    print("\nGenerando alerta de colisi贸n...")
    alert = api_integrator.generate_collision_alert(satellite_id, prediction_hours=48)
    
    if "error" in alert:
        print(f"Error: {alert['error']}")
    else:
        print(f"Alerta generada con ID: {alert.get('alert_id')}")
        print(f"Probabilidad de colisi贸n: {alert.get('collision_probability', 0):.4f}")
        print(f"Nivel de alerta: {alert.get('alert_level', 'N/A')}")

# Funci贸n principal para ejecutar todas las pruebas
def main():
    print("=== Sistema de Alertas Cu谩nticas para Colisiones Orbitales ===")
    print("Ejecutando pruebas de funcionalidad...\n")
    
    # Ejecutar pruebas
    test_quantum_algorithms()
    test_collision_alert()
    test_api_integration()
    
    print("\n隆Todas las pruebas completadas!")

# Ejecutar si se llama directamente
if __name__ == "__main__":
    main()