File size: 4,267 Bytes
edcd2ef | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 | """
Recurring Task Subscription Endpoint - Phase 5
Dapr subscription handler for task.completed events
"""
from typing import Dict, Any
from fastapi import APIRouter, BackgroundTasks, HTTPException, Request
from pydantic import BaseModel
from sqlalchemy.orm import Session
from src.db.session import get_db
from src.services.recurring_task_service import get_recurring_task_service
from src.utils.logger import get_logger
router = APIRouter()
logger = get_logger(__name__)
class TaskCompletedEvent(BaseModel):
    """
    Envelope schema for a task.completed event originating from Kafka.

    All envelope fields are required strings; the event-specific data is
    carried in ``payload`` as a free-form mapping.

    NOTE(review): the /task-completed handler in this file reads the raw
    JSON body and does not validate it against this model - confirm whether
    this schema is meant to be enforced there.
    """

    # --- envelope metadata (all plain strings) ---
    event_id: str
    event_type: str
    correlation_id: str
    # Kept as a string; no datetime parsing is applied by this model.
    timestamp: str
    source_service: str

    # --- event body ---
    # Arbitrary JSON object; presumably holds task_id / user_id for
    # task.completed events - verify against the publisher.
    payload: Dict[str, Any]
@router.post("/task-completed")
async def handle_task_completed_event(
    request: Request,
    background_tasks: BackgroundTasks
):
    """
    Dapr subscription endpoint for task.completed events.

    Validates the incoming event body and schedules recurring-task
    generation as a background task, so the HTTP response is produced
    without waiting for the generation work.

    Flow:
    1. Parse the JSON body of the request
    2. Extract task_id and user_id from the event's "payload" object
    3. Schedule ``service.handle_task_completed(task_id, user_id, db)`` in
       the background. The recurrence check, next-due-date calculation,
       task creation and task.created publishing are delegated to that
       service and are not performed in this handler.

    Args:
        request: Incoming HTTP request whose JSON body is the event.
        background_tasks: FastAPI background task queue used to defer work.

    Returns:
        A dict with ``"status": "accepted"`` and a human-readable ``"message"``.

    Raises:
        HTTPException: 400 when the body is not valid JSON, when "payload"
            is missing or not a JSON object, or when task_id / user_id is
            missing or empty; 500 for any other unexpected failure while
            handling the request.

    NOTE(review): Dapr is documented to redeliver messages that receive a
    non-2xx response - confirm that retrying malformed events (400) is the
    intended behaviour.
    """
    try:
        # Parse event data. Malformed JSON is a client error, not a server fault.
        try:
            event_data = await request.json()
        except ValueError as e:
            logger.error("Invalid event payload", error=str(e))
            raise HTTPException(status_code=400, detail="Invalid event payload") from e

        logger.info("Task completed event received", event_data=event_data)

        # Validate event structure. The body and its "payload" must both be
        # JSON objects; anything else (list, string, null, ...) is rejected
        # here instead of failing later with a TypeError.
        payload = event_data.get("payload") if isinstance(event_data, dict) else None
        if not isinstance(payload, dict) or "task_id" not in payload:
            logger.error("Invalid event payload", event_data=event_data)
            raise HTTPException(status_code=400, detail="Invalid event payload")

        task_id = payload["task_id"]
        user_id = payload.get("user_id")

        if not task_id or not user_id:
            logger.error("Missing task_id or user_id in event", event_data=event_data)
            raise HTTPException(status_code=400, detail="Missing task_id or user_id")

        logger.info(
            "Processing task completed for recurring generation",
            task_id=task_id,
            user_id=user_id
        )

        # Handle recurring task generation in background
        async def process_recurring_task():
            db = None
            try:
                # Acquire the session inside the try so that a failure to
                # obtain it is logged like any other processing error.
                db = next(get_db())
                service = get_recurring_task_service()
                result = await service.handle_task_completed(
                    task_id=task_id,
                    user_id=user_id,
                    db=db
                )

                if result:
                    logger.info(
                        "Recurring task generated successfully",
                        task_id=task_id,
                        result=result
                    )
                else:
                    logger.debug(
                        "Task is not recurring or no more occurrences to generate",
                        task_id=task_id
                    )
            except Exception as e:
                # Best effort: the response has already been sent, so the
                # failure is only logged.
                logger.error(
                    "Failed to process recurring task generation",
                    task_id=task_id,
                    error=str(e),
                    exc_info=True
                )
            finally:
                if db is not None:
                    db.close()

        background_tasks.add_task(process_recurring_task)

        return {
            "status": "accepted",
            "message": "Task completed event received, processing in background"
        }
    except HTTPException:
        # Deliberate 4xx responses raised above must reach the client
        # unchanged; without this clause the generic handler below would
        # re-wrap them as 500 errors.
        raise
    except Exception as e:
        logger.error(
            "Failed to handle task completed event",
            error=str(e),
            exc_info=True
        )
        raise HTTPException(status_code=500, detail=f"Failed to process event: {str(e)}") from e
@router.get("/health")
async def health_check():
    """
    Liveness probe for the recurring task subscription.

    Returns a static dict naming the service and the subscription it
    serves; no dependencies are checked.
    """
    health_report = dict(
        status="healthy",
        service="recurring-task-subscription",
        subscription="task-completed",
    )
    return health_report
@router.get("/ready")
async def readiness_check():
    """
    Readiness probe for the recurring task subscription.

    Always answers with a static "ready" dict; no dependencies are checked.
    """
    readiness_report = dict(
        status="ready",
        service="recurring-task-subscription",
    )
    return readiness_report
|