todo-api / phase-5 /backend /src /api /recurring_subscription.py
Nanny7's picture
feat: Phase 5 Complete - Production-Ready AI Todo Application ๐ŸŽ‰
edcd2ef
"""
Recurring Task Subscription Endpoint - Phase 5
Dapr subscription handler for task.completed events
"""
from typing import Dict, Any
from fastapi import APIRouter, BackgroundTasks, HTTPException, Request
from pydantic import BaseModel
from sqlalchemy.orm import Session
from src.db.session import get_db
from src.services.recurring_task_service import get_recurring_task_service
from src.utils.logger import get_logger
router = APIRouter()
logger = get_logger(__name__)
class TaskCompletedEvent(BaseModel):
"""Schema for task.completed event from Kafka"""
event_id: str
event_type: str
correlation_id: str
timestamp: str
source_service: str
payload: Dict[str, Any]
@router.post("/task-completed")
async def handle_task_completed_event(
request: Request,
background_tasks: BackgroundTasks
):
"""
Dapr subscription endpoint for task.completed events.
When a task is marked complete, this endpoint is automatically invoked
by Dapr to generate the next occurrence for recurring tasks.
Flow:
1. Dapr delivers task.completed event from Kafka
2. Extract task_id and user_id from event payload
3. Check if task is recurring (has recurrence_rule)
4. Calculate next due date based on pattern
5. Create new task instance
6. Publish task.created event
"""
try:
# Parse event data
event_data = await request.json()
logger.info("Task completed event received", event_data=event_data)
# Validate event structure
if "payload" not in event_data or "task_id" not in event_data["payload"]:
logger.error("Invalid event payload", event_data=event_data)
raise HTTPException(status_code=400, detail="Invalid event payload")
task_id = event_data["payload"]["task_id"]
user_id = event_data["payload"].get("user_id")
if not task_id or not user_id:
logger.error("Missing task_id or user_id in event", event_data=event_data)
raise HTTPException(status_code=400, detail="Missing task_id or user_id")
logger.info(
"Processing task completed for recurring generation",
task_id=task_id,
user_id=user_id
)
# Handle recurring task generation in background
async def process_recurring_task():
db: Session = next(get_db())
try:
service = get_recurring_task_service()
result = await service.handle_task_completed(
task_id=task_id,
user_id=user_id,
db=db
)
if result:
logger.info(
"Recurring task generated successfully",
task_id=task_id,
result=result
)
else:
logger.debug(
"Task is not recurring or no more occurrences to generate",
task_id=task_id
)
except Exception as e:
logger.error(
"Failed to process recurring task generation",
task_id=task_id,
error=str(e),
exc_info=True
)
finally:
db.close()
background_tasks.add_task(process_recurring_task)
return {
"status": "accepted",
"message": "Task completed event received, processing in background"
}
except Exception as e:
logger.error(
"Failed to handle task completed event",
error=str(e),
exc_info=True
)
raise HTTPException(status_code=500, detail=f"Failed to process event: {str(e)}")
@router.get("/health")
async def health_check():
"""Health check endpoint for the recurring task subscription"""
return {
"status": "healthy",
"service": "recurring-task-subscription",
"subscription": "task-completed"
}
@router.get("/ready")
async def readiness_check():
"""Readiness check endpoint"""
return {
"status": "ready",
"service": "recurring-task-subscription"
}