Spaces:
Sleeping
Sleeping
import asyncio
import logging
import time
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import httpx

from .models.task import Task  # Assuming Task model exists
from .utils.circuit_breaker import kafka_circuit_breaker
from .utils.metrics import (
    increment_event_published,
    observe_event_publish_duration,
    increment_event_publish_error,
    increment_rate_limiter_request,
    increment_rate_limiter_rejection
)
from .utils.rate_limiter import event_publisher_rate_limiter
# Configure logging: module-level logger so callers can filter by module name.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Retry configuration for event publishing.
MAX_RETRIES = 3  # total attempts before giving up
RETRY_DELAY = 1  # seconds; base delay, doubled each retry (exponential backoff)
async def publish_task_event(event_type: str, task: Task) -> None:
    """
    Publish a task event to Kafka via Dapr with retry mechanism, circuit breaker, and rate limiting.

    Implements graceful degradation - operations continue even if event publishing fails:
    no exception escapes this function.

    Args:
        event_type: The type of event ('created', 'updated', 'completed', 'deleted')
        task: The task object that triggered the event
    """
    start_time = time.time()

    # Rate limiting check - use user_id as the rate limiting key
    user_id = getattr(task, 'user_id', 'unknown')
    rate_limit_key = f"event_publisher:{user_id}"
    increment_rate_limiter_request(rate_limit_key)
    if not event_publisher_rate_limiter.is_allowed(rate_limit_key):
        logger.warning(f"Rate limit exceeded for user {user_id}, event type {event_type}")
        increment_rate_limiter_rejection(rate_limit_key)
        # Continue with the main operation but skip event publishing
        logger.info(f"Skipping event publishing due to rate limit for user {user_id}")
        return

    event = {
        "event_id": str(uuid.uuid4()),
        "event_type": event_type,
        # BUG FIX: original used datetime.now(datetime.UTC), which raises
        # AttributeError (UTC lives on the datetime *module*, not the class
        # imported here), and then appended "Z" to an offset-aware isoformat
        # string, which would have produced the malformed "...+00:00Z".
        # Emit a single canonical UTC "Z"-suffixed timestamp instead.
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "user_id": str(user_id),  # Convert to string for consistency
        "task_id": getattr(task, 'id', 0),  # Assuming id exists on task
        "task_data": {
            "title": getattr(task, 'title', ''),
            "description": getattr(task, 'description', ''),
            "completed": getattr(task, 'completed', False)
        }
    }

    # Use circuit breaker to wrap the publishing operation
    async def _publish_with_retry():
        """Publish via Dapr pub/sub, retrying with exponential backoff."""
        for attempt in range(MAX_RETRIES):
            try:
                async with httpx.AsyncClient() as client:
                    response = await client.post(
                        "http://localhost:3500/v1.0/publish/kafka-pubsub/task-events",
                        json=event
                    )
                    response.raise_for_status()
                logger.info(f"Event published successfully: {event_type} for task {task.id} on attempt {attempt + 1}")
                return  # Success, exit the function
            # BUG FIX: raise_for_status() raises httpx.HTTPStatusError, which is
            # NOT a subclass of httpx.RequestError, so HTTP 4xx/5xx responses
            # previously skipped the retry loop entirely. httpx.HTTPError is the
            # common base of both transport errors and status errors.
            except httpx.HTTPError as e:
                logger.warning(f"Attempt {attempt + 1} failed to publish event: {e}")
                if attempt == MAX_RETRIES - 1:  # Last attempt
                    logger.error(f"Failed to publish event after {MAX_RETRIES} attempts: {e}")
                    raise  # Re-raise the exception after all retries are exhausted
                # Wait before retrying (exponential backoff: 1s, 2s, 4s, ...)
                await asyncio.sleep(RETRY_DELAY * (2 ** attempt))

    # Call the publishing function through the circuit breaker.
    # Graceful degradation: if event publishing fails, log the error but
    # don't fail the main operation.
    try:
        await kafka_circuit_breaker.call(_publish_with_retry)
        duration = time.time() - start_time
        logger.info(f"Successfully published {event_type} event for task {task.id}")
        increment_event_published(event_type)
        observe_event_publish_duration(event_type, duration)
    except Exception as e:
        duration = time.time() - start_time
        logger.error(f"Event publishing failed for task {task.id}, but main operation continues: {e}")
        increment_event_publish_error(event_type)
        observe_event_publish_duration(event_type, duration)
        # Don't raise the exception - allow the main operation to continue (graceful degradation)
async def publish_created_event(task: Task):
    """Emit a 'created' event announcing that *task* was newly added."""
    return await publish_task_event("created", task)
async def publish_updated_event(task: Task):
    """Emit an 'updated' event announcing that *task* was modified."""
    return await publish_task_event("updated", task)
async def publish_deleted_event(task: Task):
    """Emit a 'deleted' event announcing that *task* was removed."""
    return await publish_task_event("deleted", task)
async def publish_completed_event(task: Task):
    """Emit a 'completed' event announcing that *task* was finished."""
    return await publish_task_event("completed", task)