# zenith-backend: app/services/integration/api_integration_hub.py
# Uploaded by teoat ("Upload folder using huggingface_hub"), commit 4ae946d (verified)
"""
API Integration Hub
Centralized management of third-party API integrations with webhook support,
rate limiting, and partner ecosystem management.
"""
import hashlib
import hmac
import json
import logging
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any
import aiohttp
from app.services.infrastructure.circuit_breaker import (
CircuitBreakerConfig,
circuit_breaker,
)
# Logger named after this module so its records slot into the standard
# logging hierarchy and can be filtered or levelled per module.
logger: logging.Logger = logging.getLogger(name=__name__)
class IntegrationStatus(Enum):
    """Lifecycle state of a registered integration.

    The lowercase string values are what gets persisted (``status.value``),
    so they must stay stable.
    """

    ACTIVE = "active"  # in normal use
    INACTIVE = "inactive"  # registered but switched off
    ERROR = "error"  # flagged as failing
    MAINTENANCE = "maintenance"  # temporarily unavailable by intent
class IntegrationType(Enum):
    """Transport/protocol family of an integration.

    Values are persisted as strings. Only REST_API and GRAPHQL are dispatched
    by the hub's ``call_integration``; the others are accepted at registration.
    """

    WEBHOOK = "webhook"
    REST_API = "rest_api"
    GRAPHQL = "graphql"
    SOAP = "soap"
    DATABASE = "database"
    FILE_UPLOAD = "file_upload"
class AuthenticationType(Enum):
    """How outgoing requests for an integration are authenticated.

    Values are persisted as strings. JWT and CERTIFICATE have required-field
    validation in the hub, but no request-signing logic is applied for them.
    """

    NONE = "none"
    API_KEY = "api_key"  # sent as a Bearer token
    OAUTH2 = "oauth2"  # client-credentials grant
    JWT = "jwt"
    BASIC_AUTH = "basic_auth"
    CERTIFICATE = "certificate"
@dataclass
class IntegrationConfig:
    """Configuration for a third-party integration.

    ``auth_config`` and ``webhook_secret`` hold credentials (API keys,
    passwords, OAuth client secrets, HMAC secrets). They are therefore
    excluded from the generated ``__repr__`` so that logging or printing a
    config cannot leak them. They remain ordinary constructor arguments and
    still take part in ``__eq__``.
    """

    integration_id: str
    name: str
    type: IntegrationType
    status: IntegrationStatus
    endpoint_url: str  # base URL; per-call endpoint paths are appended to it
    authentication: AuthenticationType
    # Credential fields for ``authentication`` (e.g. "api_key", or
    # "username"/"password"); hidden from repr because they are secrets.
    auth_config: dict[str, Any] = field(default_factory=dict, repr=False)
    rate_limit: int = 100  # requests per minute
    timeout_seconds: int = 30  # total timeout applied to the HTTP session
    # NOTE(review): not read anywhere in this file's visible code; confirm
    # whether retries are implemented elsewhere.
    retry_attempts: int = 3
    # Sent with every request. These may also carry credentials; take care
    # when logging them.
    headers: dict[str, str] = field(default_factory=dict)
    query_params: dict[str, str] = field(default_factory=dict)  # added to every URL
    # HMAC secret used to verify inbound webhooks; hidden from repr.
    webhook_secret: str | None = field(default=None, repr=False)
    created_at: datetime = field(default_factory=datetime.now)
    last_used: datetime | None = None
    error_count: int = 0
    success_count: int = 0
@dataclass
class IntegrationCall:
"""Record of an integration API call"""
call_id: str
integration_id: str
method: str
endpoint: str
request_data: dict[str, Any] | None = None
response_data: dict[str, Any] | None = None
status_code: int | None = None
duration_ms: int | None = None
error_message: str | None = None
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class WebhookEvent:
"""Incoming webhook event"""
event_id: str
integration_id: str
event_type: str
payload: dict[str, Any]
headers: dict[str, str]
signature: str | None = None
verified: bool = False
processed_at: datetime | None = None
timestamp: datetime = field(default_factory=datetime.now)
class APIIntegrationHub:
    """Centralized API integration management system.

    State held by the hub:

    * ``integrations``: registered ``IntegrationConfig`` objects by id. When a
      DB session is attached via ``initialize``, these are mirrored to it.
    * ``session_pool``: one ``aiohttp.ClientSession`` per REST/GraphQL
      integration. Release them with ``close()``.
    * ``rate_limiters``: per-integration sliding one-minute window of call
      timestamps.
    * ``webhook_handlers``: handlers keyed by ``"<integration_id>:<event_type>"``.
    * ``call_history``: the newest ``MAX_CALL_HISTORY`` outbound call records.
    """

    # Upper bound on retained IntegrationCall records (successes and failures).
    MAX_CALL_HISTORY = 1000

    def __init__(self):
        self.integrations: dict[str, IntegrationConfig] = {}
        self.call_history: list[IntegrationCall] = []
        self.webhook_handlers: dict[str, Callable] = {}
        # Per integration: {"calls": [call timestamps], "last_reset": datetime}.
        self.rate_limiters: dict[str, dict[str, Any]] = {}
        self.session_pool: dict[str, aiohttp.ClientSession] = {}
        # DB session set by initialize(); None keeps the hub purely in-memory.
        self.db = None

    async def initialize(self, db_session):
        """Attach a DB session and load the integrations persisted in it.

        Args:
            db_session: Session used for all later persistence. It must
                support ``query``, ``add`` and ``commit``; ``rollback`` is
                attempted after failed writes. Presumably this is a SQLAlchemy
                ``Session`` (TODO confirm with the caller).

        If the table cannot be queried, the error is logged and nothing is
        loaded. A row that cannot be converted (e.g. an unknown enum value) is
        logged and skipped, so the remaining rows still load.
        """
        self.db = db_session
        try:
            from core.database import IntegrationConfigModel

            db_integrations = self.db.query(IntegrationConfigModel).all()
        except Exception as e:
            logger.error(f"Failed to load integrations from DB: {e}")
            return

        loaded = 0
        for db_int in db_integrations:
            try:
                config = IntegrationConfig(
                    integration_id=db_int.id,
                    name=db_int.name,
                    type=IntegrationType(db_int.type),
                    status=IntegrationStatus(db_int.status),
                    endpoint_url=db_int.endpoint_url,
                    authentication=AuthenticationType(db_int.auth_type),
                    auth_config=db_int.auth_config or {},
                    rate_limit=db_int.rate_limit,
                    created_at=db_int.created_at,
                    last_used=db_int.last_used,
                )
                await self._setup_runtime_state(config)
            except Exception as e:
                logger.error(
                    f"Failed to load integration {getattr(db_int, 'id', '?')} "
                    f"from DB: {e}"
                )
                continue
            self.integrations[config.integration_id] = config
            loaded += 1
        logger.info(f"Loaded {loaded} integrations from DB")

    async def close(self) -> None:
        """Close every pooled HTTP session. Safe to call more than once."""
        sessions = list(self.session_pool.values())
        self.session_pool.clear()
        for session in sessions:
            if not session.closed:
                await session.close()

    async def register_integration(self, config: IntegrationConfig) -> bool:
        """Validate, persist (when a DB session is attached) and activate an integration.

        Re-registering an existing ``integration_id`` does the following:
        updates its DB row, recreates its HTTP session (closing the old one),
        resets its rate-limit window and replaces the in-memory config.

        Returns:
            True on success. False if validation, persistence or session setup
            failed; the error is logged, not raised.
        """
        try:
            await self._validate_integration_config(config)
            if self.db:
                self._persist_integration(config)
            await self._setup_runtime_state(config)
            self.integrations[config.integration_id] = config
            logger.info(
                f"Registered integration: {config.name} ({config.integration_id})"
            )
            return True
        except Exception as e:
            logger.error(f"Failed to register integration {config.integration_id}: {e}")
            return False

    def _persist_integration(self, config: IntegrationConfig) -> None:
        """Insert or update the DB row for ``config`` and commit.

        On failure, the session is rolled back before the error is re-raised,
        so a failed commit cannot leave the shared session unusable.
        """
        from core.database import IntegrationConfigModel

        try:
            db_int = (
                self.db.query(IntegrationConfigModel)
                .filter(IntegrationConfigModel.id == config.integration_id)
                .first()
            )
            if not db_int:
                db_int = IntegrationConfigModel(
                    id=config.integration_id,
                    name=config.name,
                    type=config.type.value,
                    status=config.status.value,
                    endpoint_url=config.endpoint_url,
                    auth_type=config.authentication.value,
                    auth_config=config.auth_config,
                    rate_limit=config.rate_limit,
                    created_at=config.created_at,
                )
                self.db.add(db_int)
            else:
                db_int.name = config.name
                db_int.type = config.type.value
                db_int.status = config.status.value
                db_int.endpoint_url = config.endpoint_url
                db_int.auth_type = config.authentication.value
                db_int.auth_config = config.auth_config
                db_int.rate_limit = config.rate_limit
            self.db.commit()
        except Exception:
            self._rollback_db()
            raise

    def _rollback_db(self) -> None:
        """Best-effort rollback of the attached DB session after a failed write."""
        if self.db is None:
            return
        try:
            self.db.rollback()
        except Exception as e:
            logger.warning(f"DB rollback failed: {e}")

    async def _setup_runtime_state(self, config: IntegrationConfig) -> None:
        """Create the HTTP session (REST/GraphQL only) and a fresh rate-limit window."""
        if config.type in (IntegrationType.REST_API, IntegrationType.GRAPHQL):
            await self._create_session(config)
        self.rate_limiters[config.integration_id] = {
            "calls": [],
            "last_reset": datetime.now(),
        }

    async def call_integration(
        self,
        integration_id: str,
        method: str = "GET",
        endpoint: str = "",
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """
        Make an API call to a registered integration.

        Args:
            integration_id: Integration identifier
            method: HTTP method (ignored for GraphQL, which always POSTs)
            endpoint: API endpoint path, appended to the integration base URL
            data: Request data (JSON body for REST; ``query``/``variables``/
                ``operationName`` for GraphQL)
            headers: Additional headers; they override the integration defaults

        Returns:
            ``{"status_code": int, "headers": dict, "data": dict}``

        Raises:
            ValueError: Unknown integration, or unsupported integration type.
            Exception: Rate limit exceeded, or any error raised while making
                the call. Call errors are recorded in ``call_history`` and
                counted in ``error_count`` before being re-raised.
        """
        if integration_id not in self.integrations:
            raise ValueError(f"Integration not found: {integration_id}")
        config = self.integrations[integration_id]

        if not await self._check_rate_limit(config):
            raise Exception(f"Rate limit exceeded for integration: {integration_id}")

        start_time = datetime.now()
        call_id = f"call_{integration_id}_{int(start_time.timestamp() * 1000)}"
        try:
            full_url = self._build_url(config, endpoint)
            request_headers = dict(config.headers)
            if headers:
                request_headers.update(headers)
            await self._add_authentication(
                config, request_headers, method, full_url, data
            )
            if config.type == IntegrationType.REST_API:
                response = await self._make_rest_call(
                    config, method, full_url, data, request_headers
                )
            elif config.type == IntegrationType.GRAPHQL:
                response = await self._make_graphql_call(
                    config, full_url, data, request_headers
                )
            else:
                raise ValueError(f"Unsupported integration type: {config.type}")
        except Exception as e:
            self._record_call(
                IntegrationCall(
                    call_id=call_id,
                    integration_id=integration_id,
                    method=method,
                    endpoint=endpoint,
                    request_data=data,
                    error_message=str(e),
                    duration_ms=self._elapsed_ms(start_time),
                )
            )
            config.error_count += 1
            logger.error(f"Integration call failed: {integration_id} - {e}")
            raise

        self._record_call(
            IntegrationCall(
                call_id=call_id,
                integration_id=integration_id,
                method=method,
                endpoint=endpoint,
                request_data=data,
                response_data=response.get("data"),
                status_code=response.get("status_code"),
                duration_ms=self._elapsed_ms(start_time),
            )
        )
        # NOTE: any HTTP response, including 4xx/5xx, counts as a success here;
        # only raised exceptions count as errors.
        config.success_count += 1
        config.last_used = datetime.now()
        self._update_db_last_used(integration_id, config.last_used)
        return response

    @staticmethod
    def _build_url(config: IntegrationConfig, endpoint: str) -> str:
        """Join the base URL, the endpoint path and the integration's static query params."""
        from urllib.parse import urlencode

        base_url = config.endpoint_url.rstrip("/")
        full_url = f"{base_url}/{endpoint.lstrip('/')}" if endpoint else base_url
        if config.query_params:
            # If the endpoint already carries a query string, a second "?"
            # would corrupt it, so extend it with "&" instead.
            separator = "&" if "?" in full_url else "?"
            full_url += f"{separator}{urlencode(config.query_params)}"
        return full_url

    @staticmethod
    def _elapsed_ms(start_time: datetime) -> int:
        """Whole milliseconds elapsed since ``start_time``."""
        return int((datetime.now() - start_time).total_seconds() * 1000)

    def _record_call(self, record: IntegrationCall) -> None:
        """Append a call record, keeping only the newest ``MAX_CALL_HISTORY`` entries."""
        self.call_history.append(record)
        overflow = len(self.call_history) - self.MAX_CALL_HISTORY
        if overflow > 0:
            del self.call_history[:overflow]

    def _update_db_last_used(self, integration_id: str, last_used: datetime) -> None:
        """Best-effort persistence of ``last_used``; failures are logged, never raised."""
        if not self.db:
            return
        try:
            from core.database import IntegrationConfigModel

            db_int = (
                self.db.query(IntegrationConfigModel)
                .filter(IntegrationConfigModel.id == integration_id)
                .first()
            )
            if db_int:
                db_int.last_used = last_used
                self.db.commit()
        except Exception as e:
            self._rollback_db()
            logger.warning(f"Failed to persist last_used for {integration_id}: {e}")

    async def register_webhook_handler(
        self, integration_id: str, event_type: str, handler: Callable
    ) -> bool:
        """
        Register a webhook event handler.

        Args:
            integration_id: Integration identifier
            event_type: Webhook event type
            handler: Async function called with the ``WebhookEvent``. It
                replaces any handler already registered for the same pair.

        Returns:
            Registration success (always True)
        """
        handler_key = f"{integration_id}:{event_type}"
        self.webhook_handlers[handler_key] = handler
        logger.info(f"Registered webhook handler: {handler_key}")
        return True

    async def process_webhook_event(
        self,
        integration_id: str,
        event_type: str,
        payload: dict[str, Any],
        headers: dict[str, str],
        raw_body: bytes,
    ) -> dict[str, Any]:
        """
        Process an incoming webhook event.

        When the integration has a ``webhook_secret``, the delivery must carry
        a valid HMAC signature. The signature is read, case-insensitively,
        from the first of ``X-Signature``, ``X-Hub-Signature-256`` or
        ``X-Hub-Signature`` that is present. Deliveries with a missing or
        invalid signature are rejected without invoking the handler.

        Args:
            integration_id: Integration identifier
            event_type: Event type
            payload: Event payload
            headers: Request headers
            raw_body: Raw request body for signature verification

        Returns:
            ``{"success", "event_id", "verified"}`` plus ``"result"`` on
            success or ``"error"`` on failure.

        Raises:
            ValueError: Unknown integration.
        """
        if integration_id not in self.integrations:
            raise ValueError(f"Integration not found: {integration_id}")
        config = self.integrations[integration_id]

        event_id = f"webhook_{integration_id}_{int(datetime.now().timestamp() * 1000)}"
        webhook_event = WebhookEvent(
            event_id=event_id,
            integration_id=integration_id,
            event_type=event_type,
            payload=payload,
            headers=headers,
        )

        if config.webhook_secret:
            signature = self._find_signature_header(headers)
            webhook_event.signature = signature
            if signature:
                webhook_event.verified = self._verify_webhook_signature(
                    raw_body, signature, config.webhook_secret
                )
            else:
                logger.warning(f"No signature provided for webhook: {event_id}")
            if not webhook_event.verified:
                # Never hand unauthenticated input to a handler when the
                # integration is configured to require signatures.
                logger.warning(f"Rejected unverified webhook: {event_id}")
                return {
                    "success": False,
                    "event_id": event_id,
                    "error": "Webhook signature verification failed",
                    "verified": False,
                }

        handler_key = f"{integration_id}:{event_type}"
        handler = self.webhook_handlers.get(handler_key)
        if handler is None:
            logger.warning(f"No handler found for webhook: {handler_key}")
            return {
                "success": False,
                "event_id": event_id,
                "error": "No handler registered for event type",
                "verified": webhook_event.verified,
            }

        try:
            result = await handler(webhook_event)
        except Exception as e:
            logger.error(f"Webhook handler failed: {handler_key} - {e}")
            return {
                "success": False,
                "event_id": event_id,
                "error": str(e),
                "verified": webhook_event.verified,
            }
        webhook_event.processed_at = datetime.now()
        return {
            "success": True,
            "event_id": event_id,
            "result": result,
            "verified": webhook_event.verified,
        }

    @staticmethod
    def _find_signature_header(headers: dict[str, str]) -> str | None:
        """Return the first non-empty signature header, matching names case-insensitively."""
        lowered = {key.lower(): value for key, value in headers.items()}
        for name in ("x-signature", "x-hub-signature-256", "x-hub-signature"):
            value = lowered.get(name)
            if value:
                return value
        return None

    async def get_integration_status(
        self, integration_id: str
    ) -> dict[str, Any] | None:
        """Return a status snapshot for one integration, or None if it is unknown.

        ``current_usage`` counts rate-limiter entries from the last minute.
        ``uptime_percentage`` is ``success_count`` over all counted calls.
        """
        if integration_id not in self.integrations:
            return None
        config = self.integrations[integration_id]
        rate_limiter = self.rate_limiters.get(integration_id, {})

        now = datetime.now()
        window = timedelta(minutes=1)
        calls_last_minute = sum(
            1 for call_time in rate_limiter.get("calls", []) if now - call_time < window
        )

        return {
            "integration_id": config.integration_id,
            "name": config.name,
            "type": config.type.value,
            "status": config.status.value,
            "endpoint_url": config.endpoint_url,
            "rate_limit": config.rate_limit,
            "current_usage": calls_last_minute,
            "success_count": config.success_count,
            "error_count": config.error_count,
            "last_used": config.last_used.isoformat() if config.last_used else None,
            "uptime_percentage": self._calculate_uptime_percentage(config),
        }

    def get_integration_metrics(self) -> dict[str, Any]:
        """Aggregate metrics over all integrations and the retained call history.

        ``successful_calls`` counts history records with a 2xx status code.
        ``average_response_time_ms`` averages every record with a measured
        duration, including 0 ms ones.
        """
        total_integrations = len(self.integrations)
        active_integrations = sum(
            1
            for config in self.integrations.values()
            if config.status == IntegrationStatus.ACTIVE
        )

        total_calls = len(self.call_history)
        successful_calls = sum(
            1
            for call in self.call_history
            if call.status_code and 200 <= call.status_code < 300
        )

        avg_response_time = 0
        response_times = [
            call.duration_ms
            for call in self.call_history
            if call.duration_ms is not None
        ]
        if response_times:
            avg_response_time = sum(response_times) / len(response_times)

        return {
            "total_integrations": total_integrations,
            "active_integrations": active_integrations,
            "total_api_calls": total_calls,
            "successful_calls": successful_calls,
            "success_rate": (
                (successful_calls / total_calls * 100) if total_calls > 0 else 0
            ),
            "average_response_time_ms": avg_response_time,
            "integrations_by_type": self._count_integrations_by_type(),
            "integrations_by_status": self._count_integrations_by_status(),
        }

    async def _validate_integration_config(self, config: IntegrationConfig) -> None:
        """Validate integration configuration.

        Raises:
            ValueError: Missing id/name, missing endpoint URL for an API-type
                integration, or missing credential fields for the auth type.
        """
        if not config.integration_id or not config.name:
            raise ValueError("Integration ID and name are required")

        if (
            config.type
            in (
                IntegrationType.REST_API,
                IntegrationType.GRAPHQL,
                IntegrationType.SOAP,
            )
            and not config.endpoint_url
        ):
            raise ValueError("Endpoint URL is required for API integrations")

        if config.authentication != AuthenticationType.NONE:
            required_fields = self._get_auth_required_fields(config.authentication)
            missing_fields = [
                name for name in required_fields if name not in config.auth_config
            ]
            if missing_fields:
                raise ValueError(f"Missing authentication fields: {missing_fields}")

    def _get_auth_required_fields(self, auth_type: AuthenticationType) -> list[str]:
        """Get the ``auth_config`` keys that must be present for an auth type."""
        auth_fields = {
            AuthenticationType.API_KEY: ["api_key"],
            AuthenticationType.OAUTH2: ["client_id", "client_secret", "token_url"],
            AuthenticationType.JWT: ["secret_key"],
            AuthenticationType.BASIC_AUTH: ["username", "password"],
            AuthenticationType.CERTIFICATE: ["cert_path", "key_path"],
        }
        return auth_fields.get(auth_type, [])

    async def _create_session(self, config: IntegrationConfig) -> None:
        """Create (or replace) the pooled HTTP session for an integration.

        Any previous session for the same integration is closed first, so
        that re-registration or re-initialization does not leak connectors.
        """
        previous = self.session_pool.pop(config.integration_id, None)
        if previous is not None and not previous.closed:
            await previous.close()

        connector = aiohttp.TCPConnector(limit=10, ttl_dns_cache=300)
        timeout = aiohttp.ClientTimeout(total=config.timeout_seconds)
        self.session_pool[config.integration_id] = aiohttp.ClientSession(
            connector=connector, timeout=timeout, headers=config.headers
        )

    async def _add_authentication(
        self,
        config: IntegrationConfig,
        headers: dict[str, str],
        method: str,
        url: str,
        data: dict[str, Any] | None,
    ) -> None:
        """Add an ``Authorization`` header for the integration's auth type (mutates ``headers``).

        Only API_KEY (Bearer), BASIC_AUTH and OAUTH2 are applied. For JWT and
        CERTIFICATE, no credentials are added here.
        """
        if config.authentication == AuthenticationType.API_KEY:
            api_key = config.auth_config.get("api_key")
            if api_key:
                headers["Authorization"] = f"Bearer {api_key}"
        elif config.authentication == AuthenticationType.BASIC_AUTH:
            import base64

            username = config.auth_config.get("username", "")
            password = config.auth_config.get("password", "")
            credentials = base64.b64encode(f"{username}:{password}".encode()).decode()
            headers["Authorization"] = f"Basic {credentials}"
        elif config.authentication == AuthenticationType.OAUTH2:
            token = await self._get_oauth_token(config)
            if token:
                headers["Authorization"] = f"Bearer {token}"

    async def _get_oauth_token(self, config: IntegrationConfig) -> str | None:
        """Fetch an OAuth2 access token via the client-credentials grant.

        A new token is requested on every call; nothing is cached. The request
        uses the integration's ``timeout_seconds``. Returns None when
        ``token_url`` is missing, the endpoint answers non-200, or the request
        fails (failures are logged).
        """
        auth_config = config.auth_config
        token_url = auth_config.get("token_url")
        if not token_url:
            return None

        payload = {
            "grant_type": "client_credentials",
            "client_id": auth_config.get("client_id"),
            "client_secret": auth_config.get("client_secret"),
        }
        timeout = aiohttp.ClientTimeout(total=config.timeout_seconds)
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(token_url, data=payload) as response:
                    if response.status == 200:
                        data = await response.json()
                        return data.get("access_token")
                    logger.error(
                        f"OAuth2 token request failed with HTTP {response.status}"
                    )
        except Exception as e:
            logger.error(f"OAuth2 token request failed: {e}")
        return None

    async def _check_rate_limit(self, config: IntegrationConfig) -> bool:
        """Consume one slot in the integration's sliding one-minute window.

        Returns False, without recording the call, when ``rate_limit`` calls
        already happened in the last minute. Integrations without a rate
        limiter are always allowed.
        """
        rate_limiter = self.rate_limiters.get(config.integration_id)
        if not rate_limiter:
            return True

        now = datetime.now()
        window = timedelta(minutes=1)
        calls = rate_limiter["calls"]
        calls[:] = [call_time for call_time in calls if now - call_time < window]

        if len(calls) >= config.rate_limit:
            return False
        calls.append(now)
        return True

    # NOTE(review): the breaker is keyed by one fixed name, so presumably a
    # single failing integration can open it for every integration. Confirm
    # against circuit_breaker's implementation.
    @circuit_breaker(
        "external_api_integration",
        CircuitBreakerConfig(
            failure_threshold=3,
            recovery_timeout=45.0,
            expected_exception=(aiohttp.ClientError, Exception),
        ),
    )
    async def _make_rest_call(
        self,
        config: IntegrationConfig,
        method: str,
        url: str,
        data: dict[str, Any] | None,
        headers: dict[str, str],
    ) -> dict[str, Any]:
        """Send one HTTP request through the integration's pooled session.

        A non-empty ``data`` is passed via aiohttp's ``json=`` argument, which
        serializes it and labels it ``application/json`` unless ``headers``
        already sets a Content-Type. (``data=json.dumps(...)`` would label it
        ``text/plain``.) ``application/json`` and ``+json`` responses are
        decoded as JSON; anything else is read as text.

        Returns:
            ``{"status_code", "headers", "data"}``. Non-dict bodies are
            wrapped as ``{"text": body}``.
        """
        session = self.session_pool.get(config.integration_id)
        if session is None or session.closed:
            raise Exception(
                f"No session available for integration: {config.integration_id}"
            )

        async with session.request(
            method, url, json=data if data else None, headers=headers
        ) as response:
            content_type = response.content_type
            if content_type == "application/json" or content_type.endswith("+json"):
                # The type was already checked above, so skip aiohttp's strict
                # application/json-only check to allow +json variants.
                response_data = await response.json(content_type=None)
            else:
                response_data = await response.text()
            return {
                "status_code": response.status,
                "headers": dict(response.headers),
                "data": (
                    response_data
                    if isinstance(response_data, dict)
                    else {"text": response_data}
                ),
            }

    async def _make_graphql_call(
        self,
        config: IntegrationConfig,
        url: str,
        data: dict[str, Any] | None,
        headers: dict[str, str],
    ) -> dict[str, Any]:
        """POST a GraphQL request built from ``data``.

        ``data`` must contain ``query``. It may contain ``variables`` and
        ``operationName``; the legacy ``operation_name`` key is also accepted.
        The wire key is ``operationName``, as GraphQL servers expect.

        Raises:
            ValueError: ``data`` is empty or has no ``query``.
        """
        if not data or "query" not in data:
            raise ValueError("GraphQL request must include 'query' field")

        graphql_payload = {
            "query": data["query"],
            "variables": data.get("variables", {}),
            "operationName": data.get("operationName", data.get("operation_name")),
        }
        return await self._make_rest_call(config, "POST", url, graphql_payload, headers)

    def _verify_webhook_signature(
        self, body: bytes, signature: str, secret: str
    ) -> bool:
        """Verify an HMAC webhook signature with a constant-time comparison.

        Accepted forms are ``sha256=<hex>`` (GitHub ``X-Hub-Signature-256``
        style), ``sha1=<hex>`` (legacy ``X-Hub-Signature``) and a bare hex
        HMAC-SHA256 digest. Any error during verification yields False.
        """
        try:
            if signature.startswith("sha1="):
                digestmod, provided = hashlib.sha1, signature[len("sha1="):]
            else:
                digestmod, provided = hashlib.sha256, signature.removeprefix("sha256=")
            expected = hmac.new(secret.encode(), body, digestmod).hexdigest()
            return hmac.compare_digest(expected, provided)
        except Exception as e:
            logger.error(f"Webhook signature verification failed: {e}")
            return False

    def _calculate_uptime_percentage(self, config: IntegrationConfig) -> float:
        """Share of counted calls that succeeded, as a percentage (100.0 when none)."""
        total_calls = config.success_count + config.error_count
        if total_calls == 0:
            return 100.0
        return (config.success_count / total_calls) * 100

    def _count_integrations_by_type(self) -> dict[str, int]:
        """Count integrations by type value."""
        type_counts: dict[str, int] = {}
        for config in self.integrations.values():
            type_name = config.type.value
            type_counts[type_name] = type_counts.get(type_name, 0) + 1
        return type_counts

    def _count_integrations_by_status(self) -> dict[str, int]:
        """Count integrations by status value."""
        status_counts: dict[str, int] = {}
        for config in self.integrations.values():
            status_name = config.status.value
            status_counts[status_name] = status_counts.get(status_name, 0) + 1
        return status_counts
# Process-wide shared hub. It starts with no DB session; callers attach one
# (and load persisted integrations) by awaiting `api_integration_hub.initialize(db)`.
api_integration_hub = APIIntegrationHub()