File size: 14,528 Bytes
3aba6a9
 
 
 
 
fec9e33
 
3aba6a9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fec9e33
 
3aba6a9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fec9e33
3aba6a9
 
 
 
 
 
fec9e33
 
 
 
 
 
 
 
 
3aba6a9
 
 
 
 
 
fec9e33
3aba6a9
 
 
 
 
 
fec9e33
3aba6a9
 
 
 
 
 
 
 
 
 
 
 
fec9e33
 
3aba6a9
 
 
 
 
 
 
 
 
 
 
 
 
 
fec9e33
 
 
3aba6a9
fec9e33
3aba6a9
 
 
 
 
 
 
 
 
fec9e33
3aba6a9
 
fec9e33
 
 
 
 
 
 
 
 
3aba6a9
 
 
fec9e33
 
 
 
3aba6a9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fec9e33
3aba6a9
fec9e33
 
3aba6a9
 
 
 
fec9e33
3aba6a9
fec9e33
 
3aba6a9
 
 
fec9e33
3aba6a9
 
 
fec9e33
3aba6a9
 
fec9e33
 
3aba6a9
 
 
 
 
 
 
 
 
 
 
fec9e33
 
 
 
 
 
3aba6a9
fec9e33
3aba6a9
 
 
fec9e33
3aba6a9
fec9e33
 
3aba6a9
 
 
 
 
 
 
fec9e33
3aba6a9
fec9e33
3aba6a9
 
 
 
 
 
 
 
 
fec9e33
3aba6a9
fec9e33
3aba6a9
 
 
 
 
 
 
fec9e33
3aba6a9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fec9e33
3aba6a9
fec9e33
 
3aba6a9
 
fec9e33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
"""
Base API Client

Provides unified HTTP request handling, rate limiting, and error handling
for all external API clients in the BeatDebate system.

Aligned with Phase 4 agent architecture using dependency injection patterns.
"""

import asyncio
import time
from typing import Dict, List, Optional, Any, Union
from abc import ABC, abstractmethod
import json

import aiohttp
import structlog

logger = structlog.get_logger(__name__)


class BaseAPIClient(ABC):
    """
    Base HTTP client with unified request handling, rate limiting, and error handling.
    
    All API clients (LastFM, Spotify, etc.) should inherit from this class to ensure
    consistent behavior across the system. Designed to work with dependency injection
    patterns used in our agent architecture.
    """
    
    def __init__(
        self, 
        base_url: str, 
        rate_limiter: "UnifiedRateLimiter",
        timeout: int = 10,
        service_name: str = "api"
    ):
        """
        Initialize base API client.
        
        Args:
            base_url: Base URL for the API
            rate_limiter: Rate limiter instance for this client
            timeout: Request timeout in seconds
            service_name: Service name for logging and identification
        """
        self.base_url = base_url.rstrip('/')
        self.rate_limiter = rate_limiter
        self.timeout = timeout
        self.service_name = service_name
        self.session: Optional[aiohttp.ClientSession] = None
        
        # Enhanced logging with service context (matches agent pattern)
        self.logger = logger.bind(
            service=service_name,
            component="BaseAPIClient",
            base_url=base_url
        )
        
        self.logger.debug("Base API client initialized", timeout=timeout)
        
    async def __aenter__(self):
        """Async context manager entry."""
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.timeout)
        )
        self.logger.debug("API client session started")
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        if self.session:
            await self.session.close()
            self.logger.debug("API client session closed")
            
    async def _make_request(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        method: str = "GET",
        headers: Optional[Dict[str, str]] = None,
        retries: int = 3
    ) -> Dict[str, Any]:
        """
        Make rate-limited HTTP request with error handling and retries.
        
        Enhanced with agent-style logging and error handling patterns.
        
        Args:
            endpoint: API endpoint (relative to base_url)
            params: Query parameters
            method: HTTP method (GET, POST, etc.)
            headers: Additional headers
            retries: Number of retry attempts
            
        Returns:
            Parsed JSON response data
            
        Raises:
            Exception: For unrecoverable errors
        """
        if not self.session:
            error_msg = f"{self.service_name} client not initialized. Use async context manager."
            self.logger.error("Client not initialized")
            raise RuntimeError(error_msg)
            
        # Wait for rate limiter (shared pattern with agents)
        await self.rate_limiter.wait_if_needed()
        
        # Build full URL
        url = f"{self.base_url}/{endpoint.lstrip('/')}" if endpoint else self.base_url
        
        # Prepare request parameters
        request_params = params or {}
        request_headers = headers or {}
        
        # Add default headers (service identification pattern)
        request_headers.setdefault('User-Agent', f'BeatDebate-{self.service_name}/1.0')
        
        # Enhanced logging with request context
        request_context = {
            "method": method,
            "endpoint": endpoint,
            "url": url,
            "param_count": len(request_params),
            "has_headers": bool(request_headers)
        }
        
        for attempt in range(retries + 1):
            try:
                self.logger.debug(
                    "Making API request",
                    attempt=attempt + 1,
                    max_attempts=retries + 1,
                    **request_context
                )
                
                async with self.session.request(
                    method=method,
                    url=url,
                    params=request_params if method == "GET" else None,
                    json=request_params if method in ["POST", "PUT", "PATCH"] else None,
                    headers=request_headers
                ) as response:
                    
                    # Handle successful responses
                    if response.status == 200:
                        data = await self._parse_response(response)
                        
                        # Check for API-specific errors in response body
                        error_info = self._extract_api_error(data)
                        if error_info:
                            self.logger.error(
                                "API error in response body",
                                error=error_info,
                                endpoint=endpoint,
                                status=response.status
                            )
                            raise Exception(f"{self.service_name} API error: {error_info}")
                            
                        self.logger.debug(
                            "API request successful",
                            endpoint=endpoint,
                            status=response.status,
                            response_size=len(str(data)) if data else 0
                        )
                        return data
                        
                    # Handle rate limiting (common pattern across services)
                    elif response.status == 429:
                        wait_time = await self._calculate_backoff_time(response, attempt)
                        self.logger.warning(
                            "Rate limited - backing off",
                            attempt=attempt + 1,
                            wait_time=wait_time,
                            endpoint=endpoint,
                            retry_after=response.headers.get('Retry-After')
                        )
                        await asyncio.sleep(wait_time)
                        continue
                        
                    # Handle other HTTP errors
                    else:
                        await self._handle_http_error(response, endpoint, attempt, retries)
                        if attempt < retries:
                            await self._exponential_backoff(attempt)
                            continue
                        else:
                            error_msg = f"{self.service_name} request failed after {retries + 1} attempts"
                            self.logger.error(
                                "Request failed after all retries",
                                final_status=response.status,
                                endpoint=endpoint,
                                total_attempts=retries + 1
                            )
                            raise Exception(error_msg)
                            
            except asyncio.TimeoutError:
                self.logger.warning(
                    "Request timeout",
                    attempt=attempt + 1,
                    endpoint=endpoint,
                    timeout=self.timeout
                )
                if attempt == retries:
                    raise Exception(f"{self.service_name} request timed out after {retries + 1} attempts")
                await self._exponential_backoff(attempt)
                
            except aiohttp.ClientError as e:
                self.logger.error(
                    "HTTP client error",
                    error=str(e),
                    error_type=type(e).__name__,
                    attempt=attempt + 1,
                    endpoint=endpoint
                )
                if attempt == retries:
                    raise Exception(f"{self.service_name} client error: {str(e)}")
                await self._exponential_backoff(attempt)
                
            except Exception as e:
                self.logger.error(
                    "Unexpected error during request",
                    error=str(e),
                    error_type=type(e).__name__,
                    attempt=attempt + 1,
                    endpoint=endpoint
                )
                if attempt == retries:
                    raise
                await self._exponential_backoff(attempt)
                
        # Should never reach here, but safety fallback
        raise Exception(f"{self.service_name} request failed after {retries + 1} attempts")
    
    async def _parse_response(self, response: aiohttp.ClientResponse) -> Dict[str, Any]:
        """
        Parse API response. Can be overridden by subclasses for custom parsing.
        
        Args:
            response: HTTP response object
            
        Returns:
            Parsed response data
        """
        try:
            return await response.json()
        except json.JSONDecodeError as e:
            self.logger.error(f"{self.service_name} invalid JSON response", error=str(e))
            raise Exception(f"{self.service_name} returned invalid JSON")
    
    @abstractmethod
    def _extract_api_error(self, data: Dict[str, Any]) -> Optional[str]:
        """
        Extract API-specific error information from response data.
        Must be implemented by subclasses.
        
        Args:
            data: Parsed response data
            
        Returns:
            Error message if found, None otherwise
        """
        pass
    
    async def _calculate_backoff_time(
        self, 
        response: aiohttp.ClientResponse, 
        attempt: int
    ) -> float:
        """
        Calculate backoff time for rate limiting.
        
        Args:
            response: HTTP response with rate limit information
            attempt: Current attempt number
            
        Returns:
            Wait time in seconds
        """
        # Check for Retry-After header
        retry_after = response.headers.get('Retry-After')
        if retry_after:
            try:
                return float(retry_after)
            except ValueError:
                pass
                
        # Default exponential backoff
        return min(2 ** attempt, 60)  # Max 60 seconds
    
    async def _handle_http_error(
        self, 
        response: aiohttp.ClientResponse, 
        endpoint: str, 
        attempt: int, 
        max_retries: int
    ):
        """
        Handle HTTP error responses.
        
        Args:
            response: HTTP response object
            endpoint: Request endpoint
            attempt: Current attempt number
            max_retries: Maximum retry attempts
        """
        self.logger.warning(
            f"{self.service_name} HTTP error",
            status=response.status,
            endpoint=endpoint,
            attempt=attempt + 1,
            max_retries=max_retries + 1
        )
        
        # For 4xx errors (except 429), don't retry
        if 400 <= response.status < 500 and response.status != 429:
            raise Exception(f"{self.service_name} client error: {response.status}")
    
    async def _exponential_backoff(self, attempt: int, base_delay: float = 1.0):
        """
        Implement exponential backoff with jitter.
        
        Args:
            attempt: Current attempt number (0-based)
            base_delay: Base delay in seconds
        """
        import random
        
        # Exponential backoff with jitter
        delay = base_delay * (2 ** attempt)
        jitter = random.uniform(0.1, 0.3) * delay
        total_delay = min(delay + jitter, 60.0)  # Max 60 seconds
        
        self.logger.debug(
            "Backing off before retry",
            attempt=attempt + 1,
            delay=total_delay,
            base_delay=base_delay
        )
        
        await asyncio.sleep(total_delay)
    
    async def health_check(self) -> Dict[str, Any]:
        """
        Perform health check for this API client.
        
        Supports dependency injection pattern used by agents for service monitoring.
        
        Returns:
            Health status information
        """
        try:
            if not self.session:
                return {
                    "service": self.service_name,
                    "status": "not_initialized",
                    "healthy": False,
                    "message": "Client session not initialized"
                }
            
            # Try a simple request to check connectivity
            # Subclasses should override this for service-specific health checks
            start_time = time.time()
            await self._make_request("", method="HEAD", retries=1)
            response_time = time.time() - start_time
            
            return {
                "service": self.service_name,
                "status": "healthy",
                "healthy": True,
                "response_time_ms": int(response_time * 1000),
                "base_url": self.base_url,
                "rate_limiter": "active"
            }
            
        except Exception as e:
            self.logger.warning("Health check failed", error=str(e))
            return {
                "service": self.service_name,
                "status": "unhealthy",
                "healthy": False,
                "error": str(e),
                "base_url": self.base_url
            }
    
    def get_service_info(self) -> Dict[str, Any]:
        """
        Get service information for dependency injection and monitoring.
        
        Returns:
            Service configuration and status information
        """
        return {
            "service_name": self.service_name,
            "base_url": self.base_url,
            "timeout": self.timeout,
            "session_active": self.session is not None,
            "rate_limiter_type": type(self.rate_limiter).__name__,
            "component_type": "BaseAPIClient"
        }