0rbix / examples /quantum_alerts_example.py
nicolasleiva's picture
Initial commit: Add complete Orbix project
3dc2617
# Ejemplo de uso del sistema de alertas cu谩nticas con algoritmos reales
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
from src.quantum_alerts import QuantumCollisionAlertSystem, QuantumAlertConfig
from src.quantum_api_integrator import QuantumApiIntegrator
from plot_comparison import plot_comparison
# Configurar el sistema de alertas con diferentes algoritmos cu谩nticos
def test_quantum_algorithms():
print("\n=== Comparaci贸n de Algoritmos Cu谩nticos para Detecci贸n de Colisiones ===")
# Crear trayectorias de prueba
# Trayectoria con alto riesgo de colisi贸n (puntos muy cercanos)
high_risk_trajectory = pd.DataFrame({
"x": [950, 951, 952, 953, 954],
"y": [1950, 1951, 1952, 1953, 1954],
"z": [2950, 2951, 2952, 2953, 2954]
})
# Trayectoria con riesgo medio de colisi贸n
medium_risk_trajectory = pd.DataFrame({
"x": np.linspace(950, 1100, 5),
"y": np.linspace(1950, 2100, 5),
"z": np.linspace(2950, 3100, 5)
})
# Trayectoria con bajo riesgo de colisi贸n (puntos muy separados)
low_risk_trajectory = pd.DataFrame({
"x": [950, 1050, 1150, 1250, 1350],
"y": [1950, 2050, 2150, 2250, 2350],
"z": [2950, 3050, 3150, 3250, 3350]
})
# Algoritmos a probar
algorithms = ["vqe", "grover", "qaoa", "basic"]
noise_models = ["none", "low", "high"]
# Almacenar resultados para comparaci贸n
results = {}
# Probar cada algoritmo con diferentes modelos de ruido
for algo in algorithms:
algo_results = []
print(f"\nAlgoritmo: {algo.upper()}")
for noise in noise_models:
# Configurar sistema de alertas con este algoritmo y modelo de ruido
config = QuantumAlertConfig(
simulator_type=algo,
noise_model=noise,
shots=1000
)
alert_system = QuantumCollisionAlertSystem(config)
# Calcular probabilidades para cada trayectoria
high_prob = alert_system.calculate_collision_probability(high_risk_trajectory)
medium_prob = alert_system.calculate_collision_probability(medium_risk_trajectory)
low_prob = alert_system.calculate_collision_probability(low_risk_trajectory)
print(f" Modelo de ruido: {noise}")
print(f" Riesgo alto: {high_prob:.4f}")
print(f" Riesgo medio: {medium_prob:.4f}")
print(f" Riesgo bajo: {low_prob:.4f}")
algo_results.append((noise, high_prob, medium_prob, low_prob))
results[algo] = algo_results
# Visualizar resultados
plot_comparison(results)
# Generar una alerta de colisi贸n completa
def test_collision_alert():
print("\n=== Generaci贸n de Alerta de Colisi贸n con Algoritmo Cu谩ntico ===")
# Configurar sistema de alertas con VQE
config = QuantumAlertConfig(
simulator_type="vqe",
noise_model="low",
shots=1000
)
alert_system = QuantumCollisionAlertSystem(config)
# Crear trayectoria de prueba
trajectory = pd.DataFrame({
"x": np.linspace(950, 1000, 10),
"y": np.linspace(1950, 2000, 10),
"z": np.linspace(2950, 3000, 10),
"timestamp": [datetime.now() + timedelta(minutes=i*30) for i in range(10)]
})
# Generar alerta
alert = alert_system.generate_alert(
satellite_id="ORBIX-SAT-001",
trajectory=trajectory,
other_object_id="DEBRIS-22",
additional_metadata={
"satellite_type": "Observaci贸n Terrestre",
"orbit_type": "LEO",
"altitude_km": 550,
"inclination_deg": 53.0
}
)
# Mostrar informaci贸n de la alerta
print(f"\nAlerta generada:")
for key, value in alert.items():
if key == "generation_metadata" or key == "additional_metadata" or key == "confidence_interval":
print(f" {key}:")
for subkey, subvalue in value.items():
print(f" {subkey}: {subvalue}")
else:
print(f" {key}: {value}")
# Publicar la alerta en Kafka
success = alert_system.publish_alert(alert)
if success:
print("\nAlerta publicada exitosamente en Kafka")
else:
print("\nError al publicar la alerta en Kafka")
# Probar la integraci贸n con APIs externas
def test_api_integration():
print("\n=== Prueba de Integraci贸n con APIs Externas ===")
# Inicializar el integrador de APIs
api_integrator = QuantumApiIntegrator()
# Definir par谩metros de prueba
satellite_id = "25544" # ISS (Estaci贸n Espacial Internacional)
start_time = datetime.now()
end_time = start_time + timedelta(hours=24)
# Obtener datos del sat茅lite
print(f"\nObteniendo datos para el sat茅lite {satellite_id}...")
satellite_data = api_integrator.get_satellite_data(satellite_id, start_time, end_time)
# Verificar si se obtuvieron datos
if "error" in satellite_data:
print(f"Error: {satellite_data['error']}")
else:
print("Datos obtenidos correctamente:")
for source, data in satellite_data.items():
print(f" {source}: {type(data)}")
# Predecir trayectoria
print("\nPrediciendo trayectoria futura...")
trajectory = api_integrator.predict_trajectory(satellite_id, start_time, end_time, prediction_hours=48)
if trajectory.empty:
print("Error: No se pudo predecir la trayectoria")
else:
print(f"Trayectoria predicha con {len(trajectory)} puntos")
print(trajectory.head())
# Generar alerta de colisi贸n
print("\nGenerando alerta de colisi贸n...")
alert = api_integrator.generate_collision_alert(satellite_id, prediction_hours=48)
if "error" in alert:
print(f"Error: {alert['error']}")
else:
print(f"Alerta generada con ID: {alert.get('alert_id')}")
print(f"Probabilidad de colisi贸n: {alert.get('collision_probability', 0):.4f}")
print(f"Nivel de alerta: {alert.get('alert_level', 'N/A')}")
# Funci贸n principal para ejecutar todas las pruebas
def main():
print("=== Sistema de Alertas Cu谩nticas para Colisiones Orbitales ===")
print("Ejecutando pruebas de funcionalidad...\n")
# Ejecutar pruebas
test_quantum_algorithms()
test_collision_alert()
test_api_integration()
print("\n隆Todas las pruebas completadas!")
# Ejecutar si se llama directamente
if __name__ == "__main__":
main()