Taskflow-App / src /events.py
Tahasaif3's picture
Update src/events.py
c71cf2c verified
import httpx
import uuid
from datetime import datetime
from typing import Any, Dict, Optional
import asyncio
import logging
import time
from .models.task import Task # Assuming Task model exists
from .utils.circuit_breaker import kafka_circuit_breaker
from .utils.metrics import (
increment_event_published,
observe_event_publish_duration,
increment_event_publish_error,
increment_rate_limiter_request,
increment_rate_limiter_rejection
)
from .utils.rate_limiter import event_publisher_rate_limiter
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Retry configuration
MAX_RETRIES = 3
RETRY_DELAY = 1 # seconds
async def publish_task_event(event_type: str, task: Task):
"""
Publish a task event to Kafka via Dapr with retry mechanism, circuit breaker, and rate limiting.
Implements graceful degradation - operations continue even if event publishing fails.
Args:
event_type: The type of event ('created', 'updated', 'completed', 'deleted')
task: The task object that triggered the event
"""
start_time = time.time()
# Rate limiting check - use user_id as the rate limiting key
user_id = getattr(task, 'user_id', 'unknown')
rate_limit_key = f"event_publisher:{user_id}"
increment_rate_limiter_request(rate_limit_key)
if not event_publisher_rate_limiter.is_allowed(rate_limit_key):
logger.warning(f"Rate limit exceeded for user {user_id}, event type {event_type}")
increment_rate_limiter_rejection(rate_limit_key)
# Continue with the main operation but skip event publishing
logger.info(f"Skipping event publishing due to rate limit for user {user_id}")
return
event = {
"event_id": str(uuid.uuid4()),
"event_type": event_type,
"timestamp": datetime.now(datetime.UTC).isoformat() + "Z",
"user_id": str(user_id), # Convert to string for consistency
"task_id": getattr(task, 'id', 0), # Assuming id exists on task
"task_data": {
"title": getattr(task, 'title', ''),
"description": getattr(task, 'description', ''),
"completed": getattr(task, 'completed', False)
}
}
# Use circuit breaker to wrap the publishing operation
async def _publish_with_retry():
# Publish via Dapr Pub/Sub with retry mechanism
for attempt in range(MAX_RETRIES):
try:
async with httpx.AsyncClient() as client:
response = await client.post(
"http://localhost:3500/v1.0/publish/kafka-pubsub/task-events",
json=event
)
response.raise_for_status()
logger.info(f"Event published successfully: {event_type} for task {task.id} on attempt {attempt + 1}")
return # Success, exit the function
except httpx.RequestError as e:
logger.warning(f"Attempt {attempt + 1} failed to publish event: {e}")
if attempt == MAX_RETRIES - 1: # Last attempt
logger.error(f"Failed to publish event after {MAX_RETRIES} attempts: {e}")
raise # Re-raise the exception after all retries are exhausted
# Wait before retrying (exponential backoff)
await asyncio.sleep(RETRY_DELAY * (2 ** attempt))
logger.error(f"All {MAX_RETRIES} attempts failed to publish event for task {task.id}")
raise Exception(f"Failed to publish event after {MAX_RETRIES} attempts")
# Call the publishing function through the circuit breaker
# Use graceful degradation: if event publishing fails, log the error but don't fail the main operation
try:
await kafka_circuit_breaker.call(_publish_with_retry)
duration = time.time() - start_time
logger.info(f"Successfully published {event_type} event for task {task.id}")
increment_event_published(event_type)
observe_event_publish_duration(event_type, duration)
except Exception as e:
duration = time.time() - start_time
logger.error(f"Event publishing failed for task {task.id}, but main operation continues: {e}")
increment_event_publish_error(event_type)
observe_event_publish_duration(event_type, duration)
# Don't raise the exception - allow the main operation to continue (graceful degradation)
async def publish_created_event(task: Task):
"""Publish a 'created' event for a new task."""
await publish_task_event("created", task)
async def publish_updated_event(task: Task):
"""Publish an 'updated' event for a modified task."""
await publish_task_event("updated", task)
async def publish_deleted_event(task: Task):
"""Publish a 'deleted' event for a deleted task."""
await publish_task_event("deleted", task)
async def publish_completed_event(task: Task):
"""Publish a 'completed' event for a completed task."""
await publish_task_event("completed", task)