Spaces:
Sleeping
Sleeping
| import asyncio | |
| import httpx | |
| import logging | |
| from typing import Dict, Any | |
| from datetime import datetime | |
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class KafkaHealthChecker:
    """Check Kafka reachability through the local Dapr sidecar.

    The checker talks to the Dapr HTTP API on ``localhost`` and remembers
    the outcome of the most recent check in ``last_check_time`` and
    ``last_status``.
    """

    def __init__(self, dapr_http_port: int = 3500):
        """Store the Dapr HTTP port and initialise the last-check state.

        Args:
            dapr_http_port: Port of the Dapr sidecar HTTP API on localhost.
        """
        self.dapr_http_port = dapr_http_port
        # Start time and status of the most recent check; None until the
        # first check has run.
        self.last_check_time = None
        self.last_status = None

    @staticmethod
    def _is_success(status_code: int) -> bool:
        """Return True for any 2xx HTTP status code."""
        return 200 <= status_code < 300

    def _finish(self, status: str, details: str, start_time: datetime) -> Dict[str, Any]:
        """Record the outcome of a check and build the result dictionary."""
        # Record every outcome, not only healthy ones, so the attributes
        # always describe the latest check.
        self.last_check_time = start_time
        self.last_status = status
        return {
            "status": status,
            "details": details,
            "timestamp": start_time.isoformat(),
            "response_time_ms": (datetime.now() - start_time).total_seconds() * 1000,
        }

    async def check_kafka_connectivity(self) -> Dict[str, Any]:
        """Check if Dapr can reach Kafka by attempting to publish a test message.

        The check first queries the sidecar health endpoint, then publishes a
        small health-check event to the ``task-events`` topic of the
        ``kafka-pubsub`` component. Any non-2xx answer, timeout or connection
        error is reported as unhealthy; errors are never raised to the caller.

        Returns:
            Dictionary with the keys ``status`` ("healthy" or "unhealthy"),
            ``details``, ``timestamp`` (ISO format of the check start time)
            and ``response_time_ms``.
        """
        # Local import: only ``datetime`` is imported at file level.
        from datetime import timezone

        start_time = datetime.now()
        base_url = f"http://localhost:{self.dapr_http_port}/v1.0"

        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                # Check if the Dapr sidecar is running. The Dapr health API
                # documents 204 (not 200) as the healthy answer, so accept
                # any 2xx status.
                dapr_response = await client.get(f"{base_url}/healthz")
                if not self._is_success(dapr_response.status_code):
                    logger.error(
                        "Dapr sidecar health check failed: %s",
                        dapr_response.status_code,
                    )
                    return self._finish(
                        "unhealthy",
                        f"Dapr sidecar not healthy: {dapr_response.status_code}",
                        start_time,
                    )

                # Try to get the list of pubsub components. A failure here is
                # only logged, as it might not indicate a Kafka problem.
                # NOTE(review): Dapr documents /v1.0/metadata for listing
                # loaded components - confirm this endpoint exists.
                components_response = await client.get(f"{base_url}/components")
                if components_response.status_code != 200:
                    logger.warning("Could not fetch Dapr components list")

                # start_time is naive local time; convert it to UTC before
                # appending "Z" so the suffix is truthful.
                utc_start = start_time.astimezone(timezone.utc).replace(tzinfo=None)
                test_event = {
                    "event_id": "health-check-" + str(int(start_time.timestamp())),
                    "event_type": "health_check",
                    "timestamp": utc_start.isoformat() + "Z",
                    "user_id": "system",
                    "task_id": 0,
                    "task_data": {
                        "title": "Health Check",
                        "description": "System health verification",
                        "completed": False,
                    },
                }

                # Publish to Kafka via Dapr. In a real scenario a dedicated
                # test topic might be preferable to the live topic.
                response = await client.post(
                    f"{base_url}/publish/kafka-pubsub/task-events",
                    json=test_event,
                )

            # Dapr answers a failed publish with a non-2xx status, so the
            # response must be inspected rather than assumed successful.
            if self._is_success(response.status_code):
                status = "healthy"
                details = "Successfully connected to Kafka via Dapr"
                logger.info("Kafka connectivity check: healthy")
            else:
                status = "unhealthy"
                details = f"Dapr publish to Kafka failed: {response.status_code}"
                logger.error(
                    "Kafka connectivity check: publish failed - %s",
                    response.status_code,
                )
        except httpx.TimeoutException:
            status = "unhealthy"
            details = "Timeout connecting to Dapr sidecar or Kafka"
            logger.error("Kafka connectivity check: timeout")
        except httpx.RequestError as e:
            status = "unhealthy"
            details = f"Connection error: {str(e)}"
            logger.error("Kafka connectivity check: connection error - %s", e)
        except Exception as e:
            # Health checks must never raise; report anything else as unhealthy.
            status = "unhealthy"
            details = f"Unexpected error: {str(e)}"
            logger.error("Kafka connectivity check: unexpected error - %s", e)

        return self._finish(status, details, start_time)
# Global instance, created at import time with the constructor defaults.
kafka_health_checker = KafkaHealthChecker()