File size: 4,100 Bytes
34e27fb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import asyncio
import logging
import time
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import httpx

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(name=__name__)


class KafkaHealthChecker:
    """Check whether Kafka is reachable through the local Dapr sidecar.

    Each check calls the sidecar's HTTP API on localhost. It pings the health
    endpoint, fetches sidecar metadata (informational only), and then publishes
    a small probe event through the configured pub/sub component. The start
    time and status of the most recent check are kept on the instance.
    """

    def __init__(
        self,
        dapr_http_port: int = 3500,
        pubsub_name: str = "kafka-pubsub",
        topic: str = "task-events",
        timeout: float = 5.0,
    ):
        """Configure the checker.

        Args:
            dapr_http_port: Port of the Dapr sidecar HTTP API on localhost.
            pubsub_name: Dapr pub/sub component the probe event is sent through.
            topic: Topic the probe event is published to. NOTE(review): the
                default is the application's ``task-events`` topic, so its
                subscribers presumably receive the probe too. Consider a
                dedicated health-check topic.
            timeout: HTTP timeout, in seconds, used for each sidecar request.
        """
        self.dapr_http_port = dapr_http_port
        self.pubsub_name = pubsub_name
        self.topic = topic
        self.timeout = timeout
        # Start time and status of the most recent check; None until one has run.
        self.last_check_time: Optional[datetime] = None
        self.last_status: Optional[str] = None

    def _url(self, path: str) -> str:
        """Return the sidecar's v1.0 HTTP API URL for ``path``."""
        return f"http://localhost:{self.dapr_http_port}/v1.0/{path}"

    @staticmethod
    def _build_test_event(start_time: datetime) -> Dict[str, Any]:
        """Build the probe event published to exercise the pub/sub path."""
        # start_time is naive local time. Convert it to UTC before tagging the
        # string with "Z"; otherwise non-UTC hosts would publish a wrong instant.
        utc_stamp = (
            start_time.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
        )
        return {
            "event_id": "health-check-" + str(int(start_time.timestamp())),
            "event_type": "health_check",
            "timestamp": utc_stamp,
            "user_id": "system",
            "task_id": 0,
            "task_data": {
                "title": "Health Check",
                "description": "System health verification",
                "completed": False,
            },
        }

    def _finish(
        self,
        start_time: datetime,
        started: float,
        status: str,
        details: str,
    ) -> Dict[str, Any]:
        """Record ``status`` as the latest outcome and build the result dict.

        Args:
            start_time: Wall-clock time the check began (reported as ``timestamp``).
            started: ``time.perf_counter()`` reading taken when the check began.
            status: ``"healthy"`` or ``"unhealthy"``.
            details: Human-readable explanation of the outcome.

        Returns:
            The result dictionary returned by ``check_kafka_connectivity``.
        """
        # Updated on every outcome, so a failure never leaves a stale "healthy".
        self.last_check_time = start_time
        self.last_status = status
        return {
            "status": status,
            "details": details,
            "timestamp": start_time.isoformat(),
            # Monotonic clock: unaffected by wall-clock adjustments mid-check.
            "response_time_ms": (time.perf_counter() - started) * 1000,
        }

    async def check_kafka_connectivity(self) -> Dict[str, Any]:
        """Check if Dapr can reach Kafka by publishing a probe event.

        HTTP/transport failures and any other ``Exception`` are not raised.
        They are reported as ``"unhealthy"`` in the result.

        Returns:
            Dictionary with keys ``status`` (``"healthy"`` or ``"unhealthy"``),
            ``details`` (explanation), ``timestamp`` (ISO-8601 naive local
            start time) and ``response_time_ms`` (check duration in ms).
        """
        start_time = datetime.now()
        started = time.perf_counter()

        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                # Dapr's health endpoint answers 204 No Content when healthy,
                # so any 2xx counts as success (not only 200).
                dapr_response = await client.get(self._url("healthz"))
                if not dapr_response.is_success:
                    logger.error(
                        "Dapr sidecar health check failed: %s",
                        dapr_response.status_code,
                    )
                    return self._finish(
                        start_time,
                        started,
                        "unhealthy",
                        f"Dapr sidecar not healthy: {dapr_response.status_code}",
                    )

                # Registered components are listed by the metadata endpoint.
                # This is informational only: a failure here does not by itself
                # indicate a Kafka problem.
                metadata_response = await client.get(self._url("metadata"))
                if not metadata_response.is_success:
                    logger.warning("Could not fetch Dapr components list")

                publish_response = await client.post(
                    self._url(f"publish/{self.pubsub_name}/{self.topic}"),
                    json=self._build_test_event(start_time),
                )

                # Only a 2xx from Dapr's publish endpoint means the event went
                # through. A non-2xx code (e.g. 403, 404, 500) means the Kafka
                # path is broken even though the sidecar itself is up.
                if publish_response.is_success:
                    status = "healthy"
                    details = "Successfully connected to Kafka via Dapr"
                    logger.info("Kafka connectivity check: healthy")
                else:
                    status = "unhealthy"
                    details = (
                        f"Publish via Dapr failed: {publish_response.status_code} "
                        f"{publish_response.text.strip()[:200]}"
                    ).strip()
                    logger.error("Kafka connectivity check: %s", details)

        # TimeoutException is a RequestError subclass, so it must be caught first.
        except httpx.TimeoutException:
            status = "unhealthy"
            details = "Timeout connecting to Dapr sidecar or Kafka"
            logger.error("Kafka connectivity check: timeout")

        except httpx.RequestError as e:
            status = "unhealthy"
            details = f"Connection error: {str(e)}"
            logger.error("Kafka connectivity check: connection error - %s", e)

        except Exception as e:
            status = "unhealthy"
            details = f"Unexpected error: {str(e)}"
            # logger.exception keeps the traceback for unexpected failures.
            logger.exception("Kafka connectivity check: unexpected error - %s", e)

        return self._finish(start_time, started, status, details)


# Shared module-level checker that talks to the sidecar on port 3500.
kafka_health_checker = KafkaHealthChecker(dapr_http_port=3500)