Spaces:
Paused
Paused
| """ | |
| API Integration Hub | |
| Centralized management of third-party API integrations with webhook support, | |
| rate limiting, and partner ecosystem management. | |
| """ | |
| import hashlib | |
| import hmac | |
| import json | |
| import logging | |
| from collections.abc import Callable | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timedelta | |
| from enum import Enum | |
| from typing import Any | |
| import aiohttp | |
| from app.services.infrastructure.circuit_breaker import ( | |
| CircuitBreakerConfig, | |
| circuit_breaker, | |
| ) | |
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class IntegrationStatus(Enum):
    """Status values an integration can carry; each value is its stored string."""

    ACTIVE = "active"
    INACTIVE = "inactive"
    ERROR = "error"
    MAINTENANCE = "maintenance"
class IntegrationType(Enum):
    """Kinds of integration the hub can register; each value is its stored string."""

    WEBHOOK = "webhook"
    REST_API = "rest_api"
    GRAPHQL = "graphql"
    SOAP = "soap"
    DATABASE = "database"
    FILE_UPLOAD = "file_upload"
class AuthenticationType(Enum):
    """Authentication schemes an integration can be configured with."""

    NONE = "none"
    API_KEY = "api_key"
    OAUTH2 = "oauth2"
    JWT = "jwt"
    BASIC_AUTH = "basic_auth"
    CERTIFICATE = "certificate"
@dataclass
class IntegrationConfig:
    """Configuration and running counters for a third-party integration.

    This must be a dataclass: the fields use ``field(default_factory=...)``
    and instances are built with keyword arguments. Without the decorator
    the class has no generated ``__init__`` and the mutable defaults would be
    raw ``Field`` objects shared on the class.
    """

    integration_id: str
    name: str
    type: IntegrationType
    status: IntegrationStatus
    endpoint_url: str
    authentication: AuthenticationType
    # Credentials/settings for the chosen authentication scheme.
    auth_config: dict[str, Any] = field(default_factory=dict)
    rate_limit: int = 100  # requests per minute
    timeout_seconds: int = 30
    retry_attempts: int = 3
    # Default headers and query parameters applied to outgoing requests.
    headers: dict[str, str] = field(default_factory=dict)
    query_params: dict[str, str] = field(default_factory=dict)
    # Shared secret for verifying incoming webhook signatures, if any.
    webhook_secret: str | None = None
    created_at: datetime = field(default_factory=datetime.now)
    last_used: datetime | None = None
    # Outcome counters.
    error_count: int = 0
    success_count: int = 0
| class IntegrationCall: | |
| """Record of an integration API call""" | |
| call_id: str | |
| integration_id: str | |
| method: str | |
| endpoint: str | |
| request_data: dict[str, Any] | None = None | |
| response_data: dict[str, Any] | None = None | |
| status_code: int | None = None | |
| duration_ms: int | None = None | |
| error_message: str | None = None | |
| timestamp: datetime = field(default_factory=datetime.now) | |
| class WebhookEvent: | |
| """Incoming webhook event""" | |
| event_id: str | |
| integration_id: str | |
| event_type: str | |
| payload: dict[str, Any] | |
| headers: dict[str, str] | |
| signature: str | None = None | |
| verified: bool = False | |
| processed_at: datetime | None = None | |
| timestamp: datetime = field(default_factory=datetime.now) | |
class APIIntegrationHub:
    """Centralized API integration management system"""

    def __init__(self):
        """Create an empty hub: no integrations, no sessions, no database."""
        # Database session; stays None until initialize() is given one.
        self.db = None
        # integration_id -> configuration
        self.integrations: dict[str, IntegrationConfig] = {}
        # integration_id -> HTTP session used for outgoing calls
        self.session_pool: dict[str, aiohttp.ClientSession] = {}
        # integration_id -> rate-limiter state ("calls" timestamps, "last_reset")
        self.rate_limiters: dict[str, dict[str, list[datetime]]] = {}
        # "<integration_id>:<event_type>" -> handler callable
        self.webhook_handlers: dict[str, Callable] = {}
        # Records of outgoing calls, oldest first.
        self.call_history: list[IntegrationCall] = []
async def initialize(self, db_session):
    """Attach a database session and load the persisted integrations.

    Every stored row becomes an ``IntegrationConfig`` registered in memory.
    REST and GraphQL integrations also get an HTTP session, and every
    integration gets a fresh rate-limiter entry. Any error is logged and
    swallowed; integrations loaded before the error stay registered.

    Args:
        db_session: Database session; stored on ``self.db`` and queried here.
    """
    self.db = db_session
    try:
        from core.database import IntegrationConfigModel

        rows = self.db.query(IntegrationConfigModel).all()
        for row in rows:
            loaded = IntegrationConfig(
                integration_id=row.id,
                name=row.name,
                type=IntegrationType(row.type),
                status=IntegrationStatus(row.status),
                endpoint_url=row.endpoint_url,
                authentication=AuthenticationType(row.auth_type),
                auth_config=row.auth_config or {},
                rate_limit=row.rate_limit,
                created_at=row.created_at,
                last_used=row.last_used,
            )
            self.integrations[loaded.integration_id] = loaded

            # Only HTTP-based integration types need a pooled session.
            uses_http = loaded.type in (
                IntegrationType.REST_API,
                IntegrationType.GRAPHQL,
            )
            if uses_http:
                await self._create_session(loaded)

            fresh_limiter = {"calls": [], "last_reset": datetime.now()}
            self.rate_limiters[loaded.integration_id] = fresh_limiter

        logger.info(f"Loaded {len(rows)} integrations from DB")
    except Exception as exc:
        logger.error(f"Failed to load integrations from DB: {exc}")
async def register_integration(self, config: IntegrationConfig) -> bool:
    """Validate an integration, persist it, and register it in memory.

    When a database session is attached, the integration row is inserted or
    updated and committed before the in-memory registry changes. If that
    step fails, the session is rolled back so later operations on the shared
    session are not blocked by the failed transaction.

    Args:
        config: Integration to register; an existing entry with the same
            ``integration_id`` is replaced and its rate limiter is reset.

    Returns:
        True on success. False if validation, persistence, or session
        creation raised; the error is logged, not propagated.
    """
    try:
        # Validate configuration
        await self._validate_integration_config(config)

        # Store in DB if session available
        if self.db:
            try:
                from core.database import IntegrationConfigModel

                db_int = (
                    self.db.query(IntegrationConfigModel)
                    .filter(IntegrationConfigModel.id == config.integration_id)
                    .first()
                )
                if not db_int:
                    db_int = IntegrationConfigModel(
                        id=config.integration_id,
                        name=config.name,
                        type=config.type.value,
                        status=config.status.value,
                        endpoint_url=config.endpoint_url,
                        auth_type=config.authentication.value,
                        auth_config=config.auth_config,
                        rate_limit=config.rate_limit,
                        created_at=config.created_at,
                    )
                    self.db.add(db_int)
                else:
                    db_int.name = config.name
                    db_int.type = config.type.value
                    db_int.status = config.status.value
                    db_int.endpoint_url = config.endpoint_url
                    db_int.auth_type = config.authentication.value
                    db_int.auth_config = config.auth_config
                    db_int.rate_limit = config.rate_limit
                self.db.commit()
            except Exception:
                # NOTE(review): assumes self.db is a SQLAlchemy-style session,
                # which needs rollback() after a failed flush/commit.
                self.db.rollback()
                raise

        # Store in memory
        self.integrations[config.integration_id] = config

        # Initialize session pool for REST APIs
        if config.type in [IntegrationType.REST_API, IntegrationType.GRAPHQL]:
            await self._create_session(config)

        # Initialize rate limiter
        self.rate_limiters[config.integration_id] = {
            "calls": [],
            "last_reset": datetime.now(),
        }

        logger.info(
            f"Registered integration: {config.name} ({config.integration_id})"
        )
        return True
    except Exception as e:
        logger.error(f"Failed to register integration {config.integration_id}: {e}")
        return False
| async def call_integration( | |
| self, | |
| integration_id: str, | |
| method: str = "GET", | |
| endpoint: str = "", | |
| data: dict[str, Any] | None = None, | |
| headers: dict[str, str] | None = None, | |
| ) -> dict[str, Any]: | |
| """ | |
| Make an API call to a registered integration | |
| Args: | |
| integration_id: Integration identifier | |
| method: HTTP method | |
| endpoint: API endpoint path | |
| data: Request data | |
| headers: Additional headers | |
| Returns: | |
| API response | |
| """ | |
| if integration_id not in self.integrations: | |
| raise ValueError(f"Integration not found: {integration_id}") | |
| config = self.integrations[integration_id] | |
| # Check rate limits | |
| if not await self._check_rate_limit(config): | |
| raise Exception(f"Rate limit exceeded for integration: {integration_id}") | |
| # Prepare request | |
| start_time = datetime.now() | |
| call_id = f"call_{integration_id}_{int(start_time.timestamp() * 1000)}" | |
| try: | |
| # Build full URL | |
| base_url = config.endpoint_url.rstrip("/") | |
| full_url = f"{base_url}/{endpoint.lstrip('/')}" if endpoint else base_url | |
| # Add query parameters | |
| if config.query_params: | |
| from urllib.parse import urlencode | |
| query_string = urlencode(config.query_params) | |
| full_url += f"?{query_string}" | |
| # Prepare headers | |
| request_headers = dict(config.headers) | |
| if headers: | |
| request_headers.update(headers) | |
| # Add authentication | |
| await self._add_authentication( | |
| config, request_headers, method, full_url, data | |
| ) | |
| # Make request based on integration type | |
| if config.type == IntegrationType.REST_API: | |
| response = await self._make_rest_call( | |
| config, method, full_url, data, request_headers | |
| ) | |
| elif config.type == IntegrationType.GRAPHQL: | |
| response = await self._make_graphql_call( | |
| config, full_url, data, request_headers | |
| ) | |
| else: | |
| raise ValueError(f"Unsupported integration type: {config.type}") | |
| # Record successful call | |
| duration = int((datetime.now() - start_time).total_seconds() * 1000) | |
| call_record = IntegrationCall( | |
| call_id=call_id, | |
| integration_id=integration_id, | |
| method=method, | |
| endpoint=endpoint, | |
| request_data=data, | |
| response_data=response.get("data"), | |
| status_code=response.get("status_code"), | |
| duration_ms=duration, | |
| ) | |
| self.call_history.append(call_record) | |
| config.success_count += 1 | |
| config.last_used = datetime.now() | |
| # Update DB last_used | |
| if self.db: | |
| try: | |
| from core.database import IntegrationConfigModel | |
| db_int = ( | |
| self.db.query(IntegrationConfigModel) | |
| .filter(IntegrationConfigModel.id == integration_id) | |
| .first() | |
| ) | |
| if db_int: | |
| db_int.last_used = config.last_used | |
| self.db.commit() | |
| except Exception: | |
| pass | |
| # Keep only recent history | |
| self.call_history = self.call_history[-1000:] | |
| return response | |
| except Exception as e: | |
| # Record failed call | |
| duration = int((datetime.now() - start_time).total_seconds() * 1000) | |
| call_record = IntegrationCall( | |
| call_id=call_id, | |
| integration_id=integration_id, | |
| method=method, | |
| endpoint=endpoint, | |
| request_data=data, | |
| error_message=str(e), | |
| duration_ms=duration, | |
| ) | |
| self.call_history.append(call_record) | |
| config.error_count += 1 | |
| logger.error(f"Integration call failed: {integration_id} - {e}") | |
| raise | |
async def register_webhook_handler(
    self, integration_id: str, event_type: str, handler: Callable
) -> bool:
    """Bind a handler to one event type of one integration.

    A handler already registered for the same pair is replaced.

    Args:
        integration_id: Integration identifier.
        event_type: Webhook event type.
        handler: Async function to handle the event.

    Returns:
        Always True.
    """
    route = f"{integration_id}:{event_type}"
    self.webhook_handlers.update({route: handler})
    logger.info(f"Registered webhook handler: {route}")
    return True
async def process_webhook_event(
    self,
    integration_id: str,
    event_type: str,
    payload: dict[str, Any],
    headers: dict[str, str],
    raw_body: bytes,
) -> dict[str, Any]:
    """Verify and dispatch an incoming webhook event.

    When the integration has a ``webhook_secret``, the event must carry a
    valid signature; unsigned or wrongly signed events are rejected without
    invoking the handler. Integrations without a secret are dispatched
    unverified.

    Args:
        integration_id: Integration identifier.
        event_type: Event type.
        payload: Event payload.
        headers: Request headers; signature headers are matched
            case-insensitively.
        raw_body: Raw request body for signature verification.

    Returns:
        A dict with ``success``, ``event_id`` and ``verified``, plus
        ``result`` on success or ``error`` on failure.

    Raises:
        ValueError: The integration is not registered.
    """
    if integration_id not in self.integrations:
        raise ValueError(f"Integration not found: {integration_id}")
    config = self.integrations[integration_id]

    # Create webhook event record
    event_id = f"webhook_{integration_id}_{int(datetime.now().timestamp() * 1000)}"
    webhook_event = WebhookEvent(
        event_id=event_id,
        integration_id=integration_id,
        event_type=event_type,
        payload=payload,
        headers=headers,
    )

    # Verify webhook signature if configured
    if config.webhook_secret:
        # HTTP header names are case-insensitive, and many frameworks hand
        # them over lower-cased, so normalise before looking them up.
        lowered = {str(name).lower(): value for name, value in headers.items()}
        signature = next(
            (
                lowered[name]
                for name in ("x-signature", "x-hub-signature-256", "x-hub-signature")
                if lowered.get(name)
            ),
            None,
        )
        if signature:
            webhook_event.signature = signature
            webhook_event.verified = self._verify_webhook_signature(
                raw_body, signature, config.webhook_secret
            )
        else:
            logger.warning(f"No signature provided for webhook: {event_id}")

        # A configured secret means unauthenticated events are not trusted:
        # refuse them before any handler side effects happen.
        if not webhook_event.verified:
            logger.warning(f"Rejected unverified webhook: {event_id}")
            return {
                "success": False,
                "event_id": event_id,
                "error": "Webhook signature verification failed",
                "verified": False,
            }

    # Find and execute handler
    handler_key = f"{integration_id}:{event_type}"
    handler = self.webhook_handlers.get(handler_key)
    if handler is None:
        logger.warning(f"No handler found for webhook: {handler_key}")
        return {
            "success": False,
            "event_id": event_id,
            "error": "No handler registered for event type",
            "verified": webhook_event.verified,
        }

    try:
        result = await handler(webhook_event)
        webhook_event.processed_at = datetime.now()
        return {
            "success": True,
            "event_id": event_id,
            "result": result,
            "verified": webhook_event.verified,
        }
    except Exception as e:
        logger.error(f"Webhook handler failed: {handler_key} - {e}")
        return {
            "success": False,
            "event_id": event_id,
            "error": str(e),
            "verified": webhook_event.verified,
        }
| async def get_integration_status( | |
| self, integration_id: str | |
| ) -> dict[str, Any] | None: | |
| """Get detailed status of an integration""" | |
| if integration_id not in self.integrations: | |
| return None | |
| config = self.integrations[integration_id] | |
| rate_limiter = self.rate_limiters.get(integration_id, {}) | |
| # Calculate current rate limit status | |
| calls_last_minute = len( | |
| [ | |
| call_time | |
| for call_time in rate_limiter.get("calls", []) | |
| if datetime.now() - call_time < timedelta(minutes=1) | |
| ] | |
| ) | |
| return { | |
| "integration_id": config.integration_id, | |
| "name": config.name, | |
| "type": config.type.value, | |
| "status": config.status.value, | |
| "endpoint_url": config.endpoint_url, | |
| "rate_limit": config.rate_limit, | |
| "current_usage": calls_last_minute, | |
| "success_count": config.success_count, | |
| "error_count": config.error_count, | |
| "last_used": config.last_used.isoformat() if config.last_used else None, | |
| "uptime_percentage": self._calculate_uptime_percentage(config), | |
| } | |
def get_integration_metrics(self) -> dict[str, Any]:
    """Aggregate hub-wide metrics from the registry and the call history.

    Returns:
        A dict with integration counts, call counts, ``success_rate`` in
        percent (0 when there are no calls), ``average_response_time_ms``
        (0 when no call has a duration) and per-type/per-status breakdowns.
    """
    total_integrations = len(self.integrations)
    active_integrations = sum(
        1
        for config in self.integrations.values()
        if config.status == IntegrationStatus.ACTIVE
    )

    total_calls = len(self.call_history)
    # A call counts as successful only with a recorded 2xx status code.
    successful_calls = sum(
        1
        for call in self.call_history
        if call.status_code is not None and 200 <= call.status_code < 300
    )

    # Compare against None rather than truthiness: a call that took 0 ms is
    # a real measurement and must be part of the average.
    response_times = [
        call.duration_ms
        for call in self.call_history
        if call.duration_ms is not None
    ]
    avg_response_time = (
        sum(response_times) / len(response_times) if response_times else 0
    )

    return {
        "total_integrations": total_integrations,
        "active_integrations": active_integrations,
        "total_api_calls": total_calls,
        "successful_calls": successful_calls,
        "success_rate": (
            (successful_calls / total_calls * 100) if total_calls > 0 else 0
        ),
        "average_response_time_ms": avg_response_time,
        "integrations_by_type": self._count_integrations_by_type(),
        "integrations_by_status": self._count_integrations_by_status(),
    }
async def _validate_integration_config(self, config: IntegrationConfig) -> None:
    """Reject configurations that are missing mandatory information.

    Raises:
        ValueError: ID or name is empty, a REST/GraphQL/SOAP integration has
            no endpoint URL, or the authentication settings lack a required
            field.
    """
    if not (config.integration_id and config.name):
        raise ValueError("Integration ID and name are required")

    url_based_types = [
        IntegrationType.REST_API,
        IntegrationType.GRAPHQL,
        IntegrationType.SOAP,
    ]
    if config.type in url_based_types and not config.endpoint_url:
        raise ValueError("Endpoint URL is required for API integrations")

    # Nothing further to check when no authentication is configured.
    if config.authentication == AuthenticationType.NONE:
        return

    missing_fields = []
    for name in self._get_auth_required_fields(config.authentication):
        if name not in config.auth_config:
            missing_fields.append(name)
    if missing_fields:
        raise ValueError(f"Missing authentication fields: {missing_fields}")
| def _get_auth_required_fields(self, auth_type: AuthenticationType) -> list[str]: | |
| """Get required fields for authentication type""" | |
| auth_fields = { | |
| AuthenticationType.API_KEY: ["api_key"], | |
| AuthenticationType.OAUTH2: ["client_id", "client_secret", "token_url"], | |
| AuthenticationType.JWT: ["secret_key"], | |
| AuthenticationType.BASIC_AUTH: ["username", "password"], | |
| AuthenticationType.CERTIFICATE: ["cert_path", "key_path"], | |
| } | |
| return auth_fields.get(auth_type, []) | |
async def _create_session(self, config: IntegrationConfig) -> None:
    """Create the HTTP session for an integration and store it in the pool.

    If the integration already has a session, the new one replaces it and
    the old one is closed so its connector is not leaked.

    Args:
        config: Integration whose timeout and default headers configure the
            session.
    """
    connector = aiohttp.TCPConnector(limit=10, ttl_dns_cache=300)
    timeout = aiohttp.ClientTimeout(total=config.timeout_seconds)
    session = aiohttp.ClientSession(
        connector=connector, timeout=timeout, headers=config.headers
    )

    # Swap first, then close: the pool never points at a closed session.
    previous = self.session_pool.get(config.integration_id)
    self.session_pool[config.integration_id] = session
    if previous is not None and not previous.closed:
        await previous.close()
async def _add_authentication(
    self,
    config: IntegrationConfig,
    headers: dict[str, str],
    method: str,
    url: str,
    data: dict[str, Any] | None,
) -> None:
    """Add an ``Authorization`` header for the integration's auth scheme.

    API keys and OAuth2 tokens are sent as ``Bearer`` credentials, basic
    auth as a base64 ``Basic`` credential. When credentials cannot be
    applied (missing key, no OAuth2 token, or an unimplemented scheme such
    as JWT or CERTIFICATE) the request is left as is and a warning is
    logged, so an unauthenticated call is visible in the logs.

    Args:
        config: Integration whose ``authentication`` and ``auth_config`` are
            used.
        headers: Request headers; modified in place.
        method: HTTP method (currently unused).
        url: Request URL (currently unused).
        data: Request data (currently unused).
    """
    auth_type = config.authentication
    if auth_type == AuthenticationType.NONE:
        return

    if auth_type == AuthenticationType.API_KEY:
        api_key = config.auth_config.get("api_key")
        if api_key:
            headers["Authorization"] = f"Bearer {api_key}"
        else:
            logger.warning(
                f"No API key configured for integration: {config.integration_id}"
            )
    elif auth_type == AuthenticationType.BASIC_AUTH:
        import base64

        username = config.auth_config.get("username", "")
        password = config.auth_config.get("password", "")
        credentials = base64.b64encode(f"{username}:{password}".encode()).decode()
        headers["Authorization"] = f"Basic {credentials}"
    elif auth_type == AuthenticationType.OAUTH2:
        token = await self._get_oauth_token(config)
        if token:
            headers["Authorization"] = f"Bearer {token}"
        else:
            logger.warning(
                f"No OAuth2 token obtained for integration: {config.integration_id}"
            )
    else:
        logger.warning(
            f"Authentication type {auth_type} is not implemented; "
            f"request for {config.integration_id} is sent without credentials"
        )
async def _get_oauth_token(self, config: IntegrationConfig) -> str | None:
    """Request an OAuth2 access token with the client-credentials grant.

    A new token is requested on every call; nothing is cached or refreshed.

    Returns:
        The ``access_token`` from a 200 response, or None when no token URL
        is configured, the response status is not 200, or the request raised
        (the error is logged).
    """
    settings = config.auth_config
    token_url = settings.get("token_url")
    if not token_url:
        return None

    form = {
        "grant_type": "client_credentials",
        "client_id": settings.get("client_id"),
        "client_secret": settings.get("client_secret"),
    }
    try:
        async with (
            aiohttp.ClientSession() as session,
            session.post(token_url, data=form) as response,
        ):
            if response.status == 200:
                body = await response.json()
                return body.get("access_token")
    except Exception as exc:
        logger.error(f"OAuth2 token request failed: {exc}")
    return None
async def _check_rate_limit(self, config: IntegrationConfig) -> bool:
    """Admit or refuse one call under the integration's per-minute limit.

    Timestamps older than one minute are dropped from the limiter's list.
    An admitted call is recorded; a refused call is not.

    Returns:
        True if the call may proceed (always, when the integration has no
        rate limiter), False once ``config.rate_limit`` calls fall within
        the last minute.
    """
    limiter = self.rate_limiters.get(config.integration_id)
    if not limiter:
        return True

    moment = datetime.now()
    window = timedelta(minutes=1)
    recent = limiter["calls"]
    # Slice-assign so the list stored in the limiter is updated in place.
    recent[:] = [stamp for stamp in recent if moment - stamp < window]

    if len(recent) < config.rate_limit:
        recent.append(moment)
        return True
    return False
async def _make_rest_call(
    self,
    config: IntegrationConfig,
    method: str,
    url: str,
    data: dict[str, Any] | None,
    headers: dict[str, str],
) -> dict[str, Any]:
    """Send an HTTP request through the integration's pooled session.

    NOTE(review): the previous summary said "with circuit breaker
    protection", but no breaker is applied in this function as shown;
    confirm whether a decorator is expected here.

    Args:
        config: Integration whose session is used.
        method: HTTP method.
        url: Fully built request URL.
        data: Request body, sent as JSON; falsy values send no body.
        headers: Request headers.

    Returns:
        A dict with ``status_code``, ``headers`` and ``data``. ``data`` is
        the parsed body when it is a JSON object; anything else is wrapped
        as ``{"text": ...}``.

    Raises:
        Exception: No session exists for the integration.
    """
    session = self.session_pool.get(config.integration_id)
    if not session:
        raise Exception(
            f"No session available for integration: {config.integration_id}"
        )

    # json= makes the client serialise the body and send it with a JSON
    # content type; a pre-serialised string passed via data= would not be
    # labelled as JSON.
    json_body = data if data else None
    async with session.request(
        method, url, json=json_body, headers=headers
    ) as response:
        if response.content_type == "application/json":
            try:
                response_data = await response.json()
            except ValueError:
                # Declared JSON but the body does not parse: return the raw
                # text instead of failing the whole call.
                response_data = await response.text()
        else:
            response_data = await response.text()
        return {
            "status_code": response.status,
            "headers": dict(response.headers),
            "data": (
                response_data
                if isinstance(response_data, dict)
                else {"text": response_data}
            ),
        }
async def _make_graphql_call(
    self,
    config: IntegrationConfig,
    url: str,
    data: dict[str, Any] | None,
    headers: dict[str, str],
) -> dict[str, Any]:
    """POST a GraphQL request through the REST call helper.

    Args:
        config: Integration to call.
        url: GraphQL endpoint URL.
        data: Must contain ``query``; may contain ``variables`` and an
            operation name under ``operationName`` or ``operation_name``.
        headers: Request headers.

    Returns:
        The response dict from ``_make_rest_call``.

    Raises:
        ValueError: ``data`` is empty or has no ``query`` field.
    """
    if not data or "query" not in data:
        raise ValueError("GraphQL request must include 'query' field")

    graphql_payload: dict[str, Any] = {
        "query": data["query"],
        "variables": data.get("variables", {}),
    }
    # GraphQL-over-HTTP names this field "operationName"; a snake_case key
    # is not recognised by servers. Send it only when one was given.
    operation_name = data.get("operationName", data.get("operation_name"))
    if operation_name is not None:
        graphql_payload["operationName"] = operation_name

    return await self._make_rest_call(config, "POST", url, graphql_payload, headers)
def _verify_webhook_signature(
    self, body: bytes, signature: str, secret: str
) -> bool:
    """Check a webhook signature against an HMAC-SHA256 of the body.

    Accepts either a bare hex digest or one prefixed with ``sha256=``. The
    comparison uses ``hmac.compare_digest``.

    Returns:
        True when the digests match; False on mismatch or if verification
        raised (the error is logged).
    """
    try:
        prefixed = signature.startswith("sha256=")
        digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
        # Drop the 7-character 'sha256=' prefix when present.
        candidate = signature[7:] if prefixed else signature
        return hmac.compare_digest(digest, candidate)
    except Exception as exc:
        logger.error(f"Webhook signature verification failed: {exc}")
        return False
def _calculate_uptime_percentage(self, config: IntegrationConfig) -> float:
    """Return the share of successful calls as a percentage.

    An integration with no recorded calls reports 100.0.
    """
    successes = config.success_count
    attempts = successes + config.error_count
    if attempts:
        return (successes / attempts) * 100
    return 100.0
def _count_integrations_by_type(self) -> dict[str, int]:
    """Tally registered integrations per type value, in first-seen order."""
    tally: dict[str, int] = {}
    for label in (config.type.value for config in self.integrations.values()):
        tally.setdefault(label, 0)
        tally[label] += 1
    return tally
def _count_integrations_by_status(self) -> dict[str, int]:
    """Tally registered integrations per status value, in first-seen order."""
    tally: dict[str, int] = {}
    for label in (config.status.value for config in self.integrations.values()):
        tally.setdefault(label, 0)
        tally[label] += 1
    return tally
# Global instance, created when this module is imported. It starts empty and
# without a database session (see APIIntegrationHub.__init__).
api_integration_hub = APIIntegrationHub()