File size: 5,745 Bytes
3dc2617
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import json
import logging
import requests
from typing import Dict, Any, List, Optional
from datetime import datetime
from .config import logger

class SpaceXApi:
    """
    Cliente para la API de SpaceX que maneja solicitudes
    y respuestas en formato JSON para obtener información sobre lanzamientos,
    cohetes, cápsulas y misiones de SpaceX.
    """
    def __init__(self):
        self.logger = logging.getLogger("Orbix.SpaceXApi")
        from .config import SPACEX_API_URL
        self.BASE_URL = SPACEX_API_URL
        self.session = requests.Session()
        self.session.headers.update({
            "Content-Type": "application/json",
            "Accept": "application/json"
        })
    
    def get_launches(self, limit: Optional[int] = None, offset: Optional[int] = None) -> Dict[str, Any]:
        """
        Obtiene información sobre lanzamientos de SpaceX.
        
        Args:
            limit: Número máximo de lanzamientos a obtener
            offset: Número de lanzamientos a saltar
            
        Returns:
            Información sobre lanzamientos de SpaceX
        """
        params = {}
        if limit is not None:
            params['limit'] = limit
        if offset is not None:
            params['offset'] = offset
            
        try:
            response = self.session.get(f"{self.BASE_URL}/launches", params=params)
            response.raise_for_status()
            return self._process_response(response.json())
        except requests.RequestException as e:
            self.logger.error(f"Error al obtener lanzamientos: {str(e)}")
            return {"error": str(e)}
    
    def get_launch(self, flight_number: int) -> Dict[str, Any]:
        """
        Obtiene información sobre un lanzamiento específico de SpaceX.
        
        Args:
            flight_number: Número de vuelo del lanzamiento
            
        Returns:
            Información sobre el lanzamiento
        """
        try:
            response = self.session.get(f"{self.BASE_URL}/launches/{flight_number}")
            response.raise_for_status()
            return self._process_response(response.json())
        except requests.RequestException as e:
            self.logger.error(f"Error al obtener lanzamiento {flight_number}: {str(e)}")
            return {"error": str(e)}
    
    def get_rockets(self) -> Dict[str, Any]:
        """
        Obtiene información sobre cohetes de SpaceX.
        
        Returns:
            Información sobre cohetes de SpaceX
        """
        try:
            response = self.session.get(f"{self.BASE_URL}/rockets")
            response.raise_for_status()
            return self._process_response(response.json())
        except requests.RequestException as e:
            self.logger.error(f"Error al obtener cohetes: {str(e)}")
            return {"error": str(e)}
    
    def get_rocket(self, rocket_id: str) -> Dict[str, Any]:
        """
        Obtiene información sobre un cohete específico de SpaceX.
        
        Args:
            rocket_id: ID del cohete
            
        Returns:
            Información sobre el cohete
        """
        try:
            response = self.session.get(f"{self.BASE_URL}/rockets/{rocket_id}")
            response.raise_for_status()
            return self._process_response(response.json())
        except requests.RequestException as e:
            self.logger.error(f"Error al obtener cohete {rocket_id}: {str(e)}")
            return {"error": str(e)}
    
    def get_capsules(self) -> Dict[str, Any]:
        """
        Obtiene información sobre cápsulas de SpaceX.
        
        Returns:
            Información sobre cápsulas de SpaceX
        """
        try:
            response = self.session.get(f"{self.BASE_URL}/capsules")
            response.raise_for_status()
            return self._process_response(response.json())
        except requests.RequestException as e:
            self.logger.error(f"Error al obtener cápsulas: {str(e)}")
            return {"error": str(e)}
    
    def get_capsule(self, capsule_serial: str) -> Dict[str, Any]:
        """
        Obtiene información sobre una cápsula específica de SpaceX.
        
        Args:
            capsule_serial: Número de serie de la cápsula
            
        Returns:
            Información sobre la cápsula
        """
        try:
            response = self.session.get(f"{self.BASE_URL}/capsules/{capsule_serial}")
            response.raise_for_status()
            return self._process_response(response.json())
        except requests.RequestException as e:
            self.logger.error(f"Error al obtener cápsula {capsule_serial}: {str(e)}")
            return {"error": str(e)}
    
    def _process_response(self, response_data: Any) -> Dict[str, Any]:
        """
        Procesa la respuesta de la API de SpaceX y la convierte a un formato
        utilizable para el sistema Orbix.
        
        Args:
            response_data: Datos de respuesta de la API de SpaceX
            
        Returns:
            Datos procesados en formato compatible con Orbix
        """
        try:
            # La API de SpaceX devuelve datos en un formato más simple que la API SSC,
            # por lo que en la mayoría de los casos solo necesitamos devolver los datos tal como están
            if isinstance(response_data, list):
                return {"data": response_data}
            elif isinstance(response_data, dict):
                return response_data
            else:
                return {"data": response_data}
        except Exception as e:
            self.logger.error(f"Error al procesar la respuesta: {str(e)}")
            return {"error": str(e)}