File size: 5,076 Bytes
34e27fb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c71cf2c
34e27fb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import asyncio
import logging
import time
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import httpx

from .models.task import Task  # Assuming Task model exists
from .utils.circuit_breaker import kafka_circuit_breaker
from .utils.metrics import (
    increment_event_published,
    observe_event_publish_duration,
    increment_event_publish_error,
    increment_rate_limiter_request,
    increment_rate_limiter_rejection
)
from .utils.rate_limiter import event_publisher_rate_limiter

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Retry configuration
MAX_RETRIES = 3
RETRY_DELAY = 1  # seconds


async def publish_task_event(event_type: str, task: Task) -> None:
    """
    Publish a task event to Kafka via Dapr with retry mechanism, circuit breaker, and rate limiting.
    Implements graceful degradation - operations continue even if event publishing fails.

    Args:
        event_type: The type of event ('created', 'updated', 'completed', 'deleted')
        task: The task object that triggered the event

    Returns:
        None. Failures are logged and counted in metrics but never propagated,
        so the caller's primary operation is unaffected (graceful degradation).
    """
    start_time = time.time()

    # Rate limiting check - use user_id as the rate limiting key
    user_id = getattr(task, 'user_id', 'unknown')
    rate_limit_key = f"event_publisher:{user_id}"

    increment_rate_limiter_request(rate_limit_key)

    if not event_publisher_rate_limiter.is_allowed(rate_limit_key):
        logger.warning(f"Rate limit exceeded for user {user_id}, event type {event_type}")
        increment_rate_limiter_rejection(rate_limit_key)
        # Continue with the main operation but skip event publishing
        logger.info(f"Skipping event publishing due to rate limit for user {user_id}")
        return

    event = {
        "event_id": str(uuid.uuid4()),
        "event_type": event_type,
        # BUG FIX: the original called datetime.now(datetime.UTC), but `datetime`
        # here is the *class* (from `from datetime import datetime`) and has no
        # UTC attribute, so this line raised AttributeError on every call.
        # It also appended "Z" after isoformat(), which for an aware datetime
        # would have produced a malformed "+00:00Z" suffix. An aware isoformat()
        # already carries the offset, so no manual suffix is needed.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user_id": str(user_id),  # Convert to string for consistency
        "task_id": getattr(task, 'id', 0),  # Assuming id exists on task
        "task_data": {
            "title": getattr(task, 'title', ''),
            "description": getattr(task, 'description', ''),
            "completed": getattr(task, 'completed', False)
        }
    }

    # Use circuit breaker to wrap the publishing operation
    async def _publish_with_retry():
        # Publish via Dapr Pub/Sub with retry on transport errors.
        # NOTE(review): httpx.HTTPStatusError (raised by raise_for_status for
        # non-2xx responses) is deliberately NOT retried here and propagates on
        # the first attempt — confirm this matches the intended retry policy.
        for attempt in range(MAX_RETRIES):
            try:
                async with httpx.AsyncClient() as client:
                    response = await client.post(
                        "http://localhost:3500/v1.0/publish/kafka-pubsub/task-events",
                        json=event
                    )
                    response.raise_for_status()
                    logger.info(f"Event published successfully: {event_type} for task {task.id} on attempt {attempt + 1}")
                    return  # Success, exit the function
            except httpx.RequestError as e:
                logger.warning(f"Attempt {attempt + 1} failed to publish event: {e}")
                if attempt == MAX_RETRIES - 1:  # Last attempt
                    logger.error(f"Failed to publish event after {MAX_RETRIES} attempts: {e}")
                    raise  # Re-raise the exception after all retries are exhausted

                # Wait before retrying (exponential backoff: 1s, 2s, 4s, ...)
                await asyncio.sleep(RETRY_DELAY * (2 ** attempt))

        # Defensive: unreachable in practice (each iteration either returns or
        # re-raises on the final attempt), kept as a safety net.
        raise RuntimeError(f"Failed to publish event after {MAX_RETRIES} attempts")

    # Call the publishing function through the circuit breaker.
    # Graceful degradation: if event publishing fails, log the error and record
    # metrics but do not fail the caller's main operation.
    try:
        await kafka_circuit_breaker.call(_publish_with_retry)
        duration = time.time() - start_time
        logger.info(f"Successfully published {event_type} event for task {task.id}")
        increment_event_published(event_type)
        observe_event_publish_duration(event_type, duration)
    except Exception as e:
        duration = time.time() - start_time
        logger.error(f"Event publishing failed for task {task.id}, but main operation continues: {e}")
        increment_event_publish_error(event_type)
        observe_event_publish_duration(event_type, duration)
        # Don't raise the exception - allow the main operation to continue (graceful degradation)

async def publish_created_event(task: Task) -> None:
    """Emit a 'created' event for a newly persisted task."""
    return await publish_task_event("created", task)


async def publish_updated_event(task: Task) -> None:
    """Emit an 'updated' event after a task is modified."""
    return await publish_task_event("updated", task)


async def publish_deleted_event(task: Task) -> None:
    """Emit a 'deleted' event after a task is removed."""
    return await publish_task_event("deleted", task)


async def publish_completed_event(task: Task) -> None:
    """Emit a 'completed' event when a task is marked done."""
    return await publish_task_event("completed", task)