import asyncio
import logging
import time
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import httpx

from .models.task import Task  # Assuming Task model exists
from .utils.circuit_breaker import kafka_circuit_breaker
from .utils.metrics import (
    increment_event_published,
    observe_event_publish_duration,
    increment_event_publish_error,
    increment_rate_limiter_request,
    increment_rate_limiter_rejection,
)
from .utils.rate_limiter import event_publisher_rate_limiter

# Configure logging
# NOTE(review): basicConfig() at import time is a module side effect; kept for
# backward compatibility, but ideally the application configures logging.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Retry configuration
MAX_RETRIES = 3
RETRY_DELAY = 1  # seconds (base delay; doubled on each subsequent attempt)

# Dapr sidecar pub/sub endpoint: publish to the `task-events` topic on the
# `kafka-pubsub` component.
DAPR_PUBLISH_URL = "http://localhost:3500/v1.0/publish/kafka-pubsub/task-events"


async def publish_task_event(event_type: str, task: Task) -> None:
    """
    Publish a task event to Kafka via Dapr with retry mechanism, circuit breaker,
    and rate limiting.

    Implements graceful degradation - operations continue even if event
    publishing fails: every failure path logs and records metrics instead of
    propagating an exception to the caller.

    Args:
        event_type: The type of event ('created', 'updated', 'completed', 'deleted')
        task: The task object that triggered the event
    """
    start_time = time.time()

    # Rate limiting check - use user_id as the rate limiting key
    user_id = getattr(task, 'user_id', 'unknown')
    rate_limit_key = f"event_publisher:{user_id}"

    increment_rate_limiter_request(rate_limit_key)
    if not event_publisher_rate_limiter.is_allowed(rate_limit_key):
        logger.warning(f"Rate limit exceeded for user {user_id}, event type {event_type}")
        increment_rate_limiter_rejection(rate_limit_key)
        # Continue with the main operation but skip event publishing
        logger.info(f"Skipping event publishing due to rate limit for user {user_id}")
        return

    event: Dict[str, Any] = {
        "event_id": str(uuid.uuid4()),
        "event_type": event_type,
        # BUG FIX: the original called datetime.now(datetime.UTC) — `datetime`
        # here is the class, which has no UTC attribute (AttributeError), and
        # then appended "Z" to an isoformat that already carries an offset
        # (would have produced "...+00:00Z"). An aware UTC isoformat is correct.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user_id": str(user_id),  # Convert to string for consistency
        "task_id": getattr(task, 'id', 0),  # Assuming id exists on task
        "task_data": {
            "title": getattr(task, 'title', ''),
            "description": getattr(task, 'description', ''),
            "completed": getattr(task, 'completed', False),
        },
    }

    async def _publish_with_retry() -> None:
        """Publish `event` via Dapr pub/sub, retrying with exponential backoff."""
        for attempt in range(MAX_RETRIES):
            try:
                async with httpx.AsyncClient() as client:
                    response = await client.post(DAPR_PUBLISH_URL, json=event)
                    response.raise_for_status()
                logger.info(f"Event published successfully: {event_type} for task {task.id} on attempt {attempt + 1}")
                return  # Success, exit the function
            # BUG FIX: raise_for_status() raises httpx.HTTPStatusError, which is
            # NOT a RequestError subclass — the original never retried 4xx/5xx
            # responses. httpx.HTTPError covers both transport and status errors.
            except httpx.HTTPError as e:
                logger.warning(f"Attempt {attempt + 1} failed to publish event: {e}")
                if attempt == MAX_RETRIES - 1:  # Last attempt
                    logger.error(f"Failed to publish event after {MAX_RETRIES} attempts: {e}")
                    raise  # Re-raise the exception after all retries are exhausted
                # Wait before retrying (exponential backoff: 1s, 2s, 4s, ...)
                await asyncio.sleep(RETRY_DELAY * (2 ** attempt))
        # NOTE: the loop always returns on success or raises on the last
        # attempt, so no unreachable fallback raise is needed here.

    # Call the publishing function through the circuit breaker.
    # Graceful degradation: if event publishing fails, log the error and record
    # metrics, but don't fail the main operation.
    try:
        await kafka_circuit_breaker.call(_publish_with_retry)
        duration = time.time() - start_time
        logger.info(f"Successfully published {event_type} event for task {task.id}")
        increment_event_published(event_type)
        observe_event_publish_duration(event_type, duration)
    except Exception as e:
        duration = time.time() - start_time
        logger.error(f"Event publishing failed for task {task.id}, but main operation continues: {e}")
        increment_event_publish_error(event_type)
        observe_event_publish_duration(event_type, duration)
        # Don't raise the exception - allow the main operation to continue
        # (graceful degradation)


async def publish_created_event(task: Task) -> None:
    """Publish a 'created' event for a new task."""
    await publish_task_event("created", task)


async def publish_updated_event(task: Task) -> None:
    """Publish an 'updated' event for a modified task."""
    await publish_task_event("updated", task)


async def publish_deleted_event(task: Task) -> None:
    """Publish a 'deleted' event for a deleted task."""
    await publish_task_event("deleted", task)


async def publish_completed_event(task: Task) -> None:
    """Publish a 'completed' event for a completed task."""
    await publish_task_event("completed", task)