# Source: Taskflow-App/src/utils/health_check.py
# Author: Tahasaif3 -- commit 34e27fb ("code")
import asyncio
import logging
import time
from datetime import datetime, timezone
from typing import Dict, Any

import httpx
# Module-scoped logger, named after this module so log output can be filtered per module.
logger: logging.Logger = logging.getLogger(name=__name__)
class KafkaHealthChecker:
    """Check Kafka reachability through the local Dapr sidecar's HTTP API.

    A check issues three requests against ``http://localhost:<port>/v1.0``:
    the sidecar health endpoint, the metadata endpoint (informational only),
    and a publish of a synthetic ``health_check`` event to the configured
    pub/sub component and topic. The check is healthy only when both the
    sidecar health request and the publish return a 2xx status.
    """

    def __init__(
        self,
        dapr_http_port: int = 3500,
        pubsub_name: str = "kafka-pubsub",
        topic: str = "task-events",
    ):
        """Create a checker.

        Args:
            dapr_http_port: HTTP port of the Dapr sidecar on localhost.
            pubsub_name: Dapr pub/sub component the probe event is published to.
            topic: Topic the probe event is published to.
        """
        self.dapr_http_port = dapr_http_port
        self.pubsub_name = pubsub_name
        # NOTE(review): the default is the live "task-events" topic, so every
        # probe is delivered to real subscribers; consider a dedicated topic.
        self.topic = topic
        # Start time and outcome of the most recent check; None until the first run.
        self.last_check_time = None
        self.last_status = None

    def _url(self, path: str) -> str:
        """Return the Dapr v1.0 HTTP API URL for *path* (e.g. ``"healthz"``)."""
        return f"http://localhost:{self.dapr_http_port}/v1.0/{path}"

    @staticmethod
    def _is_success(status_code: int) -> bool:
        """Return True for any 2xx HTTP status.

        Dapr answers a healthy ``/healthz`` and a successful publish with
        204 No Content, so comparing against 200 alone misreports success.
        """
        return 200 <= status_code < 300

    @staticmethod
    def _build_test_event(start_time: datetime) -> Dict[str, Any]:
        """Build the synthetic task event published by the probe.

        Args:
            start_time: When the check started; a naive value is interpreted
                as local time.
        """
        # Convert to UTC before emitting the "Z" suffix; appending "Z" to a
        # naive local-time string would mislabel local time as UTC.
        utc_time = start_time.astimezone(timezone.utc)
        return {
            "event_id": "health-check-" + str(int(start_time.timestamp())),
            "event_type": "health_check",
            "timestamp": utc_time.isoformat().replace("+00:00", "Z"),
            "user_id": "system",
            "task_id": 0,
            "task_data": {
                "title": "Health Check",
                "description": "System health verification",
                "completed": False,
            },
        }

    @staticmethod
    def _build_result(
        status: str, details: str, start_time: datetime, started: float
    ) -> Dict[str, Any]:
        """Assemble the dict returned by ``check_kafka_connectivity``.

        Args:
            status: ``"healthy"`` or ``"unhealthy"``.
            details: Human-readable explanation of the status.
            start_time: Wall-clock start of the check, reported as ``timestamp``.
            started: ``time.perf_counter()`` reading taken when the check began.
        """
        return {
            "status": status,
            "details": details,
            "timestamp": start_time.isoformat(),
            # Monotonic clock: the duration is unaffected by wall-clock jumps.
            "response_time_ms": (time.perf_counter() - started) * 1000,
        }

    async def _probe(self, start_time: datetime):
        """Run the HTTP requests of one check and return ``(status, details)``.

        Transport errors (timeouts, connection failures) propagate to the caller.
        """
        async with httpx.AsyncClient(timeout=5.0) as client:
            dapr_response = await client.get(self._url("healthz"))
            if not self._is_success(dapr_response.status_code):
                logger.error(
                    "Dapr sidecar health check failed: %s", dapr_response.status_code
                )
                return (
                    "unhealthy",
                    f"Dapr sidecar not healthy: {dapr_response.status_code}",
                )

            # Informational only: failing to list components does not by itself
            # indicate a Kafka problem. Dapr documents loaded components under
            # /v1.0/metadata (the old /v1.0/components path is not a Dapr API).
            metadata_response = await client.get(self._url("metadata"))
            if not self._is_success(metadata_response.status_code):
                logger.warning(
                    "Could not fetch Dapr components list (HTTP %s)",
                    metadata_response.status_code,
                )

            publish_response = await client.post(
                self._url(f"publish/{self.pubsub_name}/{self.topic}"),
                json=self._build_test_event(start_time),
            )
            # Dapr returns 204 when the message was accepted and 4xx/5xx
            # otherwise (e.g. 500 when delivery to the broker fails), so a
            # completed request alone does not prove Kafka is reachable.
            if not self._is_success(publish_response.status_code):
                logger.error(
                    "Kafka connectivity check: publish failed with HTTP %s",
                    publish_response.status_code,
                )
                return (
                    "unhealthy",
                    "Publishing to Kafka via Dapr failed: "
                    f"HTTP {publish_response.status_code}",
                )

        logger.info("Kafka connectivity check: healthy")
        return "healthy", "Successfully connected to Kafka via Dapr"

    async def check_kafka_connectivity(self) -> Dict[str, Any]:
        """Check whether Dapr can reach Kafka by publishing a test message.

        Errors derived from ``Exception`` are caught and reported as
        ``"unhealthy"``. Every outcome (not only successes) is recorded in
        ``last_check_time`` and ``last_status``.

        Returns:
            Dict with ``status`` (``"healthy"``/``"unhealthy"``), ``details``,
            ``timestamp`` (ISO-8601 start time) and ``response_time_ms``.
        """
        start_time = datetime.now()
        started = time.perf_counter()
        try:
            status, details = await self._probe(start_time)
        except httpx.TimeoutException:
            # Must precede RequestError: httpx timeouts are RequestError subclasses.
            status = "unhealthy"
            details = "Timeout connecting to Dapr sidecar or Kafka"
            logger.error("Kafka connectivity check: timeout")
        except httpx.RequestError as e:
            status = "unhealthy"
            details = f"Connection error: {e}"
            logger.error("Kafka connectivity check: connection error - %s", e)
        except Exception as e:
            status = "unhealthy"
            details = f"Unexpected error: {e}"
            logger.exception("Kafka connectivity check: unexpected error - %s", e)

        self.last_check_time = start_time
        self.last_status = status
        return self._build_result(status, details, start_time, started)
# Module-level checker built with the constructor defaults; every importer of
# this module shares this one object.
kafka_health_checker: KafkaHealthChecker = KafkaHealthChecker()