import asyncio
import logging
import time
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Tuple

import httpx

logger = logging.getLogger(__name__)


class KafkaHealthChecker:
    """Probe Kafka reachability through the local Dapr sidecar's HTTP API.

    A check runs three steps against ``http://localhost:<dapr_http_port>/v1.0``:

    1. ``GET /healthz`` -- the sidecar must answer with a 2xx status
       (Dapr answers 204 No Content when healthy, so 200 must not be required).
    2. ``GET /metadata`` -- informational only: a warning is logged when the
       metadata cannot be read or does not list ``pubsub_name``.
    3. ``POST /publish/<pubsub_name>/<topic>`` -- a synthetic ``health_check``
       event is published; only a 2xx answer counts as healthy.

    Attributes:
        dapr_http_port: Port of the Dapr sidecar's HTTP API on localhost.
        pubsub_name: Name of the Dapr pub/sub component used for the probe.
        topic: Topic the synthetic event is published to.
        timeout: Timeout in seconds applied to every HTTP request.
        last_check_time: UTC start time of the most recent check, or None.
        last_status: "healthy"/"unhealthy" from the most recent check, or None.
    """

    def __init__(
        self,
        dapr_http_port: int = 3500,
        pubsub_name: str = "kafka-pubsub",
        topic: str = "task-events",
        timeout: float = 5.0,
    ):
        self.dapr_http_port = dapr_http_port
        self.pubsub_name = pubsub_name
        # NOTE: the default publishes into the live "task-events" topic, so its
        # subscribers receive these synthetic health_check events. Pass a
        # dedicated topic where one exists.
        self.topic = topic
        self.timeout = timeout
        self.last_check_time: Optional[datetime] = None
        self.last_status: Optional[str] = None

    @property
    def _base_url(self) -> str:
        """Root URL of the sidecar's v1.0 HTTP API."""
        return f"http://localhost:{self.dapr_http_port}/v1.0"

    @staticmethod
    def _is_2xx(status_code: int) -> bool:
        """Return True for any HTTP success status (Dapr often answers 204)."""
        return 200 <= status_code < 300

    async def check_kafka_connectivity(self) -> Dict[str, Any]:
        """Check whether Dapr can reach Kafka by publishing a test message.

        Network and HTTP failures are reported in the returned dictionary
        rather than raised. ``last_check_time`` and ``last_status`` are updated
        on every call, whatever the outcome, so they never go stale.

        Returns:
            Dictionary with keys ``status`` ("healthy" or "unhealthy"),
            ``details`` (human-readable explanation), ``timestamp`` (ISO-8601
            UTC start time of the check) and ``response_time_ms`` (duration of
            the whole check in milliseconds).
        """
        start_time = datetime.now(timezone.utc)
        # Monotonic clock for the duration: immune to wall-clock adjustments.
        started = time.monotonic()
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                status, details = await self._run_checks(client, start_time)
        except httpx.TimeoutException:
            # Must precede RequestError: TimeoutException is a subclass of it.
            status = "unhealthy"
            details = "Timeout connecting to Dapr sidecar or Kafka"
            logger.error("Kafka connectivity check: timeout")
        except httpx.RequestError as e:
            status = "unhealthy"
            details = f"Connection error: {e}"
            logger.error("Kafka connectivity check: connection error - %s", e)
        except Exception as e:
            # A health probe reports failures instead of raising; keep the
            # traceback in the log for diagnosis.
            status = "unhealthy"
            details = f"Unexpected error: {e}"
            logger.exception("Kafka connectivity check: unexpected error - %s", e)

        self.last_check_time = start_time
        self.last_status = status
        return {
            "status": status,
            "details": details,
            "timestamp": start_time.isoformat(),
            "response_time_ms": (time.monotonic() - started) * 1000,
        }

    async def _run_checks(
        self, client: httpx.AsyncClient, start_time: datetime
    ) -> Tuple[str, str]:
        """Run the sidecar, metadata and publish steps; return (status, details).

        HTTP-level failures become an "unhealthy" result; transport errors
        (timeouts, refused connections) propagate to the caller.
        """
        health = await client.get(f"{self._base_url}/healthz")
        if not self._is_2xx(health.status_code):
            logger.error("Dapr sidecar health check failed: %s", health.status_code)
            return "unhealthy", f"Dapr sidecar not healthy: {health.status_code}"

        await self._warn_if_pubsub_missing(client)

        publish = await client.post(
            f"{self._base_url}/publish/{self.pubsub_name}/{self.topic}",
            json=self._build_test_event(start_time),
        )
        if not self._is_2xx(publish.status_code):
            logger.error(
                "Kafka connectivity check: publish failed (%s): %s",
                publish.status_code,
                publish.text[:200],
            )
            return (
                "unhealthy",
                f"Dapr could not publish to {self.pubsub_name}/{self.topic}: "
                f"{publish.status_code}",
            )

        logger.info("Kafka connectivity check: healthy")
        return "healthy", "Successfully connected to Kafka via Dapr"

    async def _warn_if_pubsub_missing(self, client: httpx.AsyncClient) -> None:
        """Log a warning if sidecar metadata does not list ``pubsub_name``.

        Informational only: the publish step is the authoritative check, so a
        missing or unreadable component list never changes the outcome.
        """
        response = await client.get(f"{self._base_url}/metadata")
        if not self._is_2xx(response.status_code):
            logger.warning("Could not fetch Dapr components list")
            return
        try:
            payload = response.json()
        except ValueError:
            logger.warning("Could not parse Dapr metadata response")
            return
        components = payload.get("components") if isinstance(payload, dict) else None
        names = {
            component.get("name")
            for component in components or []
            if isinstance(component, dict)
        }
        if self.pubsub_name not in names:
            logger.warning(
                "Dapr pub/sub component '%s' is not listed in sidecar metadata",
                self.pubsub_name,
            )

    @staticmethod
    def _build_test_event(start_time: datetime) -> Dict[str, Any]:
        """Build the synthetic ``health_check`` event sent by the probe.

        ``start_time`` is expected to be UTC-aware, so the trailing "Z" in the
        timestamp is accurate. The random suffix keeps ``event_id`` unique even
        for checks started within the same second.
        """
        return {
            "event_id": (
                f"health-check-{int(start_time.timestamp())}-{uuid.uuid4().hex[:8]}"
            ),
            "event_type": "health_check",
            "timestamp": start_time.isoformat().replace("+00:00", "Z"),
            "user_id": "system",
            "task_id": 0,
            "task_data": {
                "title": "Health Check",
                "description": "System health verification",
                "completed": False,
            },
        }


# Global instance
kafka_health_checker = KafkaHealthChecker()