Notification System Refactor - TODO
Current State (MVP)
Problem: Mixed sync/async code causing event loop errors. Services are sync but notification helpers are async.
Temporary Solution: Removed notification calls entirely, just logging "notification queued".
Limitations:
- No actual notifications sent
- Lost on server restart
- No retries on failure
- Runs in web worker (blocks requests)
- Tight coupling (endpoints know about notifications)
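One common way this kind of event loop error arises is a sync function calling `asyncio.run()` while a loop is already running, for example when it is reached from an async endpoint. The sketch below reproduces that failure with the standard library only. `send_notification`, `sync_service`, and `async_endpoint` are hypothetical stand-ins, and the actual trigger in this codebase may differ.

```python
import asyncio


async def send_notification(message: str) -> str:
    """Hypothetical async notification helper."""
    await asyncio.sleep(0)  # stands in for real async I/O
    return f"sent: {message}"


def sync_service() -> str:
    """Hypothetical sync service that tries to drive the async helper itself."""
    coro = send_notification("ticket assigned")
    try:
        # Fine from plain sync code, but raises if a loop is already running.
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "never awaited" warning for the unused coroutine
        raise


async def async_endpoint() -> str:
    """Hypothetical async endpoint that calls the sync service."""
    try:
        return sync_service()
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"


result_outside_loop = sync_service()                # no loop running: works
result_inside_loop = asyncio.run(async_endpoint())  # loop running: fails
print(result_outside_loop)
print(result_inside_loop)
```

The same function succeeds or fails depending on who calls it, which is why mixing sync services with async helpers is fragile.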
Proper Implementation Plan
Phase 1: Sync Notification Wrapper (Quick Fix)
Goal: Get notifications working without full async refactor.
Implementation:
# app/background/notifications.py
from fastapi import BackgroundTasks

def notify_ticket_assigned(db, ticket_id, agent_id, background_tasks: BackgroundTasks):
    """Sync wrapper - creates notification and queues sending"""
    # 1. Create notification record (sync DB insert)
    notification = NotificationService().create_notification_sync(...)
    # 2. Queue async sending
    background_tasks.add_task(send_notification_async, notification.id)
Changes needed:
- Create sync version of create_notification()
- Add BackgroundTasks parameter to all endpoints
- Update services to accept background_tasks parameter
- Call notification wrapper from services
Pros: Works now, minimal changes
Cons: Still not production-ready; queued notifications are lost on crash
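The two-step flow of Phase 1 (write the record synchronously, then queue the send) can be sketched without a web server. This is a minimal illustration, not the project's code: `InMemoryTasks` is a hypothetical stand-in that mimics only the positional `add_task(func, *args)` shape of FastAPI's `BackgroundTasks`, and the `notifications` table schema and helper names are assumptions.

```python
import sqlite3
from typing import Any, Callable, List, Tuple


class InMemoryTasks:
    """Hypothetical stand-in for FastAPI's BackgroundTasks (add_task only)."""

    def __init__(self) -> None:
        self._tasks: List[Tuple[Callable[..., Any], tuple]] = []

    def add_task(self, func: Callable[..., Any], *args: Any) -> None:
        self._tasks.append((func, args))

    def run_all(self) -> None:
        # FastAPI runs queued tasks after the response; here we run them by hand.
        while self._tasks:
            func, args = self._tasks.pop(0)
            func(*args)


def create_notification_sync(db: sqlite3.Connection, ticket_id: int, agent_id: int) -> int:
    """Step 1: plain sync insert, so the record exists before anything is sent."""
    cur = db.execute(
        "INSERT INTO notifications (ticket_id, agent_id, status) VALUES (?, ?, 'pending')",
        (ticket_id, agent_id),
    )
    db.commit()
    return cur.lastrowid


def send_notification(db: sqlite3.Connection, notification_id: int) -> None:
    """Step 2: delivery placeholder that only marks the record as sent."""
    db.execute("UPDATE notifications SET status = 'sent' WHERE id = ?", (notification_id,))
    db.commit()


def notify_ticket_assigned(db, ticket_id, agent_id, background_tasks) -> int:
    notification_id = create_notification_sync(db, ticket_id, agent_id)
    background_tasks.add_task(send_notification, db, notification_id)
    return notification_id


def status_of(db: sqlite3.Connection, notification_id: int) -> str:
    row = db.execute("SELECT status FROM notifications WHERE id = ?", (notification_id,)).fetchone()
    return row[0]


db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE notifications "
    "(id INTEGER PRIMARY KEY, ticket_id INTEGER, agent_id INTEGER, status TEXT)"
)
tasks = InMemoryTasks()
nid = notify_ticket_assigned(db, ticket_id=42, agent_id=7, background_tasks=tasks)
status_before = status_of(db, nid)  # record exists, nothing sent yet
tasks.run_all()
status_after = status_of(db, nid)
print(status_before, status_after)
```

If the process dies between the two steps, the in-memory task is gone while the pending row remains, which is the crash-loss limitation listed in the cons.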
Phase 2: Full Async/Await (Proper Fix)
Goal: Make everything async for proper async notification handling.
Changes:
# All services become async
class TicketAssignmentService:
    async def self_assign_ticket(self, ...):  # Add async
        assignment = await create_assignment()  # Add await
        await NotificationHelper.notify_ticket_assigned(...)  # Works now
        return assignment

# All endpoints become async
@router.post("/tickets/{id}/self-assign")
async def self_assign(...):  # Add async
    assignment = await service.self_assign_ticket(...)  # Add await
    return assignment
Migration steps:
- Convert database operations to async (use asyncpg or SQLAlchemy async)
- Convert all service methods to async def
- Convert all endpoints to async def
- Add await to all async calls
- Test thoroughly (async bugs are subtle)
Pros: Proper async, notifications work correctly
Cons: Large refactor, risky, time-consuming
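A typical example of a subtle async bug is a forgotten `await` during migration: the call returns a coroutine object, no exception is raised, and the notification is silently never sent. The sketch below shows the difference using hypothetical function names and only the standard library.

```python
import asyncio
import inspect
from typing import List

sent: List[int] = []


async def notify_ticket_assigned(ticket_id: int) -> None:
    """Hypothetical async notification helper."""
    await asyncio.sleep(0)  # stands in for real async I/O
    sent.append(ticket_id)


async def self_assign_buggy(ticket_id: int) -> bool:
    pending = notify_ticket_assigned(ticket_id)  # BUG: missing await
    is_unawaited = inspect.iscoroutine(pending)  # only a coroutine object so far
    pending.close()  # tidy up so Python does not warn about it later
    return is_unawaited


async def self_assign_fixed(ticket_id: int) -> None:
    await notify_ticket_assigned(ticket_id)  # the helper actually runs


buggy_returned_coroutine = asyncio.run(self_assign_buggy(1))
sent_after_buggy = list(sent)  # nothing was sent, and nothing failed
asyncio.run(self_assign_fixed(2))
sent_after_fixed = list(sent)
print(buggy_returned_coroutine, sent_after_buggy, sent_after_fixed)
```

Running the test suite with warnings treated as errors is one way to catch unawaited coroutines that are not closed by hand like this.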
Phase 3: Celery for Critical Tasks (Production)
Goal: Reliable, persistent, retriable background tasks.
Setup:
pip install celery redis
Implementation:
# celery_app/celery.py
from celery import Celery

celery_app = Celery(
    'swiftops',
    broker='redis://localhost:6379/0',
    include=['celery_app.tasks.notifications'],  # so the worker registers the tasks
)

# celery_app/tasks/notifications.py
from celery_app.celery import celery_app

@celery_app.task(bind=True, max_retries=3)
def send_notification(self, notification_id):
    try:
        # Send notification
        pass
    except Exception as e:
        # Retry after 60 seconds, up to max_retries times
        raise self.retry(exc=e, countdown=60)

# In service
from celery_app.tasks.notifications import send_notification

def self_assign_ticket(self, ...):
    assignment = create_assignment()
    notification = create_notification_record(...)
    send_notification.delay(notification.id)  # Queue in Celery
    return assignment
Run:
# Terminal 1: Web server
uvicorn app.main:app
# Terminal 2: Celery worker
celery -A celery_app worker --loglevel=info
Pros: Persistent, retriable, scalable, monitoring
Cons: Extra process to manage, more complexity
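To make the retry settings concrete: with `max_retries=3` and `countdown=60`, a task that keeps failing runs up to four times in total (the first attempt plus three retries), with a 60-second wait before each retry. The sketch below simulates that behavior in plain Python. It is not Celery's implementation, and `run_with_retries` and `make_flaky_sender` are hypothetical helpers.

```python
from typing import Callable, List, Tuple


def no_wait(seconds: float) -> None:
    """Default 'sleep' that does nothing, so the simulation runs instantly."""


def run_with_retries(
    task: Callable[[], str],
    max_retries: int = 3,
    countdown: float = 60,
    sleep: Callable[[float], None] = no_wait,
) -> Tuple[str, int]:
    """Run task; on failure wait `countdown` and retry, at most max_retries times."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return task(), attempts
        except Exception:
            if attempts > max_retries:
                raise  # retries exhausted: surface the original error
            sleep(countdown)


def make_flaky_sender(failures: int) -> Callable[[], str]:
    """Build a sender that fails `failures` times, then succeeds."""
    calls = {"n": 0}

    def send() -> str:
        calls["n"] += 1
        if calls["n"] <= failures:
            raise ConnectionError("provider unavailable")
        return "sent"

    return send


waits: List[float] = []
result, attempts = run_with_retries(make_flaky_sender(2), sleep=waits.append)
waits_for_success = list(waits)  # one recorded wait per retry

try:
    run_with_retries(make_flaky_sender(10))
    gave_up = False
except ConnectionError:
    gave_up = True

print(result, attempts, waits_for_success, gave_up)
```

Because a retried task may run more than once, the real send step should be safe to repeat, for example by checking the notification's status before sending.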
Phase 4: Event-Driven Architecture (Scale)
Goal: Decouple services from notifications completely.
Pattern:
# Services emit events
event_bus.emit(TicketAssignedEvent(ticket_id, agent_id))

# Handlers react
@event_bus.on(TicketAssignedEvent)
def on_ticket_assigned(event):
    # send_notification takes a notification ID (see Phase 3), so create the record first
    notification = create_notification_record(...)
    send_notification.delay(notification.id)
    update_analytics(event)
    log_audit_trail(event)
Benefits: Clean separation, easy to add new reactions, testable
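The plan does not name an event bus library, so the following is only a minimal in-process sketch of what `event_bus.on` and `event_bus.emit` could look like. The `EventBus` class, the fields of `TicketAssignedEvent`, and the two handlers are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass(frozen=True)
class TicketAssignedEvent:
    ticket_id: int
    agent_id: int


class EventBus:
    """Hypothetical synchronous, in-process event bus."""

    def __init__(self) -> None:
        self._handlers: Dict[type, List[Callable[[Any], None]]] = {}

    def on(self, event_type: type) -> Callable:
        """Decorator that registers a handler for one event type."""
        def register(handler: Callable[[Any], None]) -> Callable[[Any], None]:
            self._handlers.setdefault(event_type, []).append(handler)
            return handler
        return register

    def emit(self, event: Any) -> None:
        """Call every handler registered for the event's exact type, in order."""
        for handler in list(self._handlers.get(type(event), [])):
            handler(event)


event_bus = EventBus()
log: List[str] = []


@event_bus.on(TicketAssignedEvent)
def queue_notification(event: TicketAssignedEvent) -> None:
    # In the real system this is where the Celery task would be queued.
    log.append(f"notify agent {event.agent_id} about ticket {event.ticket_id}")


@event_bus.on(TicketAssignedEvent)
def log_audit_trail(event: TicketAssignedEvent) -> None:
    log.append(f"audit: ticket {event.ticket_id} assigned")


# The service only emits the event and knows nothing about the handlers.
event_bus.emit(TicketAssignedEvent(ticket_id=42, agent_id=7))
print(log)
```

Adding a new reaction means registering one more handler; the emitting service does not change.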
Recommendation
Now: Skip Phase 1, go straight to Phase 3 (Celery) when you implement notifications properly.
Why skip Phase 1? It's a half-measure that you'll throw away anyway. Better to do it right once.
When to do it: When you implement payroll (which needs Celery anyway), add notifications at the same time.
Files to Update (Phase 3)
- Create celery_app/celery.py - Celery config
- Create celery_app/tasks/notifications.py - Notification tasks
- Update src/app/services/ticket_assignment_service.py - Call Celery tasks
- Update src/app/services/notification_service.py - Add sync create method
- Update requirements.txt - Add celery, redis
- Update deployment - Run celery worker process
Testing Checklist
- Notifications sent on ticket assignment
- Notifications sent on inventory distribution
- Notifications sent on bulk sales order promotion
- Failed notifications retry automatically
- Server restart doesn't lose queued notifications
- Can monitor notification queue status
- Can manually retry failed notifications
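The first checklist item can be covered by a unit test that needs no broker or worker, by replacing the Celery task with a mock and asserting that the service queues it. This is a sketch under assumptions: the service is reduced to a hypothetical free function that receives its collaborators as arguments, which is not necessarily how the real service is wired.

```python
from unittest.mock import MagicMock


def self_assign_ticket(ticket_id, agent_id, create_notification_record, send_notification):
    """Hypothetical, simplified service: create the record, then queue the send."""
    notification = create_notification_record(ticket_id, agent_id)
    send_notification.delay(notification.id)
    return notification.id


# Fake collaborators: the record factory returns an object with id=101,
# and the task mock records calls to .delay() instead of contacting a broker.
create_record = MagicMock()
create_record.return_value.id = 101
fake_task = MagicMock()

returned_id = self_assign_ticket(42, 7, create_record, fake_task)

# The service must create exactly one record and queue exactly one send for it.
create_record.assert_called_once_with(42, 7)
fake_task.delay.assert_called_once_with(101)
print(returned_id)
```

Items such as surviving a server restart or monitoring the queue depend on the broker and worker, so they need an integration test rather than a mock.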