Spaces:
Configuration error
Configuration error
File size: 6,683 Bytes
3dc2617 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# Ejemplo de uso del sistema de alertas cu谩nticas con algoritmos reales
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
from src.quantum_alerts import QuantumCollisionAlertSystem, QuantumAlertConfig
from src.quantum_api_integrator import QuantumApiIntegrator
from plot_comparison import plot_comparison
# Configurar el sistema de alertas con diferentes algoritmos cu谩nticos
def test_quantum_algorithms():
print("\n=== Comparaci贸n de Algoritmos Cu谩nticos para Detecci贸n de Colisiones ===")
# Crear trayectorias de prueba
# Trayectoria con alto riesgo de colisi贸n (puntos muy cercanos)
high_risk_trajectory = pd.DataFrame({
"x": [950, 951, 952, 953, 954],
"y": [1950, 1951, 1952, 1953, 1954],
"z": [2950, 2951, 2952, 2953, 2954]
})
# Trayectoria con riesgo medio de colisi贸n
medium_risk_trajectory = pd.DataFrame({
"x": np.linspace(950, 1100, 5),
"y": np.linspace(1950, 2100, 5),
"z": np.linspace(2950, 3100, 5)
})
# Trayectoria con bajo riesgo de colisi贸n (puntos muy separados)
low_risk_trajectory = pd.DataFrame({
"x": [950, 1050, 1150, 1250, 1350],
"y": [1950, 2050, 2150, 2250, 2350],
"z": [2950, 3050, 3150, 3250, 3350]
})
# Algoritmos a probar
algorithms = ["vqe", "grover", "qaoa", "basic"]
noise_models = ["none", "low", "high"]
# Almacenar resultados para comparaci贸n
results = {}
# Probar cada algoritmo con diferentes modelos de ruido
for algo in algorithms:
algo_results = []
print(f"\nAlgoritmo: {algo.upper()}")
for noise in noise_models:
# Configurar sistema de alertas con este algoritmo y modelo de ruido
config = QuantumAlertConfig(
simulator_type=algo,
noise_model=noise,
shots=1000
)
alert_system = QuantumCollisionAlertSystem(config)
# Calcular probabilidades para cada trayectoria
high_prob = alert_system.calculate_collision_probability(high_risk_trajectory)
medium_prob = alert_system.calculate_collision_probability(medium_risk_trajectory)
low_prob = alert_system.calculate_collision_probability(low_risk_trajectory)
print(f" Modelo de ruido: {noise}")
print(f" Riesgo alto: {high_prob:.4f}")
print(f" Riesgo medio: {medium_prob:.4f}")
print(f" Riesgo bajo: {low_prob:.4f}")
algo_results.append((noise, high_prob, medium_prob, low_prob))
results[algo] = algo_results
# Visualizar resultados
plot_comparison(results)
# Generar una alerta de colisi贸n completa
def test_collision_alert():
print("\n=== Generaci贸n de Alerta de Colisi贸n con Algoritmo Cu谩ntico ===")
# Configurar sistema de alertas con VQE
config = QuantumAlertConfig(
simulator_type="vqe",
noise_model="low",
shots=1000
)
alert_system = QuantumCollisionAlertSystem(config)
# Crear trayectoria de prueba
trajectory = pd.DataFrame({
"x": np.linspace(950, 1000, 10),
"y": np.linspace(1950, 2000, 10),
"z": np.linspace(2950, 3000, 10),
"timestamp": [datetime.now() + timedelta(minutes=i*30) for i in range(10)]
})
# Generar alerta
alert = alert_system.generate_alert(
satellite_id="ORBIX-SAT-001",
trajectory=trajectory,
other_object_id="DEBRIS-22",
additional_metadata={
"satellite_type": "Observaci贸n Terrestre",
"orbit_type": "LEO",
"altitude_km": 550,
"inclination_deg": 53.0
}
)
# Mostrar informaci贸n de la alerta
print(f"\nAlerta generada:")
for key, value in alert.items():
if key == "generation_metadata" or key == "additional_metadata" or key == "confidence_interval":
print(f" {key}:")
for subkey, subvalue in value.items():
print(f" {subkey}: {subvalue}")
else:
print(f" {key}: {value}")
# Publicar la alerta en Kafka
success = alert_system.publish_alert(alert)
if success:
print("\nAlerta publicada exitosamente en Kafka")
else:
print("\nError al publicar la alerta en Kafka")
# Probar la integraci贸n con APIs externas
def test_api_integration():
print("\n=== Prueba de Integraci贸n con APIs Externas ===")
# Inicializar el integrador de APIs
api_integrator = QuantumApiIntegrator()
# Definir par谩metros de prueba
satellite_id = "25544" # ISS (Estaci贸n Espacial Internacional)
start_time = datetime.now()
end_time = start_time + timedelta(hours=24)
# Obtener datos del sat茅lite
print(f"\nObteniendo datos para el sat茅lite {satellite_id}...")
satellite_data = api_integrator.get_satellite_data(satellite_id, start_time, end_time)
# Verificar si se obtuvieron datos
if "error" in satellite_data:
print(f"Error: {satellite_data['error']}")
else:
print("Datos obtenidos correctamente:")
for source, data in satellite_data.items():
print(f" {source}: {type(data)}")
# Predecir trayectoria
print("\nPrediciendo trayectoria futura...")
trajectory = api_integrator.predict_trajectory(satellite_id, start_time, end_time, prediction_hours=48)
if trajectory.empty:
print("Error: No se pudo predecir la trayectoria")
else:
print(f"Trayectoria predicha con {len(trajectory)} puntos")
print(trajectory.head())
# Generar alerta de colisi贸n
print("\nGenerando alerta de colisi贸n...")
alert = api_integrator.generate_collision_alert(satellite_id, prediction_hours=48)
if "error" in alert:
print(f"Error: {alert['error']}")
else:
print(f"Alerta generada con ID: {alert.get('alert_id')}")
print(f"Probabilidad de colisi贸n: {alert.get('collision_probability', 0):.4f}")
print(f"Nivel de alerta: {alert.get('alert_level', 'N/A')}")
# Funci贸n principal para ejecutar todas las pruebas
def main():
print("=== Sistema de Alertas Cu谩nticas para Colisiones Orbitales ===")
print("Ejecutando pruebas de funcionalidad...\n")
# Ejecutar pruebas
test_quantum_algorithms()
test_collision_alert()
test_api_integration()
print("\n隆Todas las pruebas completadas!")
# Ejecutar si se llama directamente
if __name__ == "__main__":
main() |