# Notification System Refactor - TODO

## Current State (MVP)

**Problem:** Mixed sync/async code causes event loop errors. Services are sync, but the notification helpers are async.

**Temporary Solution:** Notification calls were removed entirely; the code only logs "notification queued".

**Limitations:**
- No actual notifications sent
- Lost on server restart
- No retries on failure
- Runs in web worker (blocks requests)
- Tight coupling (endpoints know about notifications)
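
One common way this kind of event loop error appears is a sync function calling `asyncio.run()` while a loop is already running, which is the situation when sync services are called during async request handling. The sketch below reproduces that failure with the standard library only. The names `send_notification`, `sync_service`, and `async_endpoint` are hypothetical stand-ins, not the project's code, and the actual error in the app may differ.

```python
import asyncio


async def send_notification(message: str) -> str:
    """Hypothetical async notification helper."""
    await asyncio.sleep(0)
    return f"sent: {message}"


def sync_service() -> str:
    """Sync service that tries to drive the async helper itself."""
    coro = send_notification("ticket assigned")
    try:
        # asyncio.run() starts a new event loop; it refuses to do so
        # when the current thread already has one running.
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def async_endpoint() -> str:
    """Stands in for a request handler already running inside a loop."""
    try:
        return sync_service()
    except RuntimeError as exc:
        return f"error: {exc}"


def demo_event_loop_error() -> tuple[str, str]:
    outside_loop = sync_service()                # no loop running: works
    inside_loop = asyncio.run(async_endpoint())  # loop running: fails
    return outside_loop, inside_loop
```

The same `sync_service()` call succeeds from plain sync code and fails once a loop is running, which is why the phases below either keep the send out of the sync path or make the whole call chain async.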
## Proper Implementation Plan

### Phase 1: Sync Notification Wrapper (Quick Fix)

**Goal:** Get notifications working without full async refactor.

**Implementation:**
```python
# app/background/notifications.py
from fastapi import BackgroundTasks

# NotificationService and send_notification_async are assumed to be
# importable from the app's own modules.


def notify_ticket_assigned(db, ticket_id, agent_id, background_tasks: BackgroundTasks):
    """Sync wrapper - creates notification and queues sending"""
    # 1. Create notification record (sync DB insert)
    notification = NotificationService().create_notification_sync(...)
    # 2. Queue async sending (runs after the response has been sent)
    background_tasks.add_task(send_notification_async, notification.id)
```
**Changes needed:**
- Create sync version of `create_notification()`
- Add `BackgroundTasks` parameter to all endpoints
- Update services to accept `background_tasks` parameter
- Call notification wrapper from services

**Pros:** Works now, minimal changes

**Cons:** Still not production-ready, lost on crash
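
To make the parameter threading concrete, here is a minimal sketch that runs without FastAPI. `InMemoryTasks` is a hypothetical stand-in that only mirrors the `add_task(func, *args, **kwargs)` shape of `BackgroundTasks`; the ID arithmetic is a placeholder for the sync DB insert, and `handle_request` plays the role of an endpoint.

```python
import asyncio
from typing import Any, Callable


class InMemoryTasks:
    """Stand-in mirroring BackgroundTasks.add_task(func, *args, **kwargs)."""

    def __init__(self) -> None:
        self.pending: list[tuple[Callable[..., Any], tuple, dict]] = []

    def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        self.pending.append((func, args, kwargs))

    async def run_all(self) -> None:
        # The framework would do this after the response has been sent.
        while self.pending:
            func, args, kwargs = self.pending.pop(0)
            result = func(*args, **kwargs)
            if asyncio.iscoroutine(result):
                await result


SENT: list[int] = []


async def send_notification_async(notification_id: int) -> None:
    SENT.append(notification_id)


def self_assign_ticket(ticket_id: int, agent_id: int, background_tasks: InMemoryTasks) -> dict:
    """Sync service: does its work, then queues the async send."""
    notification_id = ticket_id * 1000 + agent_id  # placeholder for a DB insert
    background_tasks.add_task(send_notification_async, notification_id)
    return {"ticket_id": ticket_id, "agent_id": agent_id}


def handle_request() -> tuple[dict, list[int], list[int]]:
    tasks = InMemoryTasks()
    response = self_assign_ticket(7, 42, tasks)  # endpoint passes tasks down
    sent_before = list(SENT)                     # nothing sent yet
    asyncio.run(tasks.run_all())                 # runs after the response
    return response, sent_before, list(SENT)
```

The pending list lives only in process memory, which is the "lost on crash" limitation: anything queued but not yet run disappears if the worker dies.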
### Phase 2: Full Async/Await (Proper Fix)

**Goal:** Make everything async for proper async notification handling.

**Changes:**
```python
# All services become async
class TicketAssignmentService:
    async def self_assign_ticket(self, ...):  # Add async
        assignment = await create_assignment()  # Add await
        await NotificationHelper.notify_ticket_assigned(...)  # Works now
        return assignment

# All endpoints become async
@router.post("/tickets/{id}/self-assign")
async def self_assign(...):  # Add async
    assignment = await service.self_assign_ticket(...)  # Add await
    return assignment
```
**Migration steps:**
1. Convert database operations to async (use `asyncpg` or SQLAlchemy async)
2. Convert all service methods to `async def`
3. Convert all endpoints to `async def`
4. Add `await` to all async calls
5. Test thoroughly (async bugs are subtle)

**Pros:** Proper async, notifications work correctly

**Cons:** Large refactor, risky, time-consuming
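
One example of the subtle bugs mentioned in step 5 is a forgotten `await`: the call returns a coroutine object and the notification never runs, yet nothing raises. The sketch below shows the correct and buggy variants side by side using only `asyncio`; the function names are hypothetical and the log list stands in for a real notification channel.

```python
import asyncio
import inspect


async def notify_ticket_assigned(ticket_id: int, log: list[str]) -> None:
    await asyncio.sleep(0)
    log.append(f"notified:{ticket_id}")


async def self_assign_ticket(ticket_id: int, log: list[str]) -> int:
    """Correct: the notification is awaited before returning."""
    await notify_ticket_assigned(ticket_id, log)
    return ticket_id


async def self_assign_ticket_buggy(ticket_id: int, log: list[str]) -> int:
    """Bug: the coroutine is created but never awaited, so nothing runs."""
    coro = notify_ticket_assigned(ticket_id, log)  # missing `await`
    assert inspect.iscoroutine(coro)
    coro.close()  # closed here only to silence the RuntimeWarning
    return ticket_id


def demo_missing_await() -> tuple[list[str], list[str]]:
    good_log: list[str] = []
    bad_log: list[str] = []
    asyncio.run(self_assign_ticket(1, good_log))
    asyncio.run(self_assign_ticket_buggy(2, bad_log))
    return good_log, bad_log
```

Both variants return the ticket ID, so a test that only checks the return value would pass in either case; asserting on the side effect is what catches the bug.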
### Phase 3: Celery for Critical Tasks (Production)

**Goal:** Reliable, persistent, retriable background tasks.

**Setup:**
```bash
pip install celery redis
```

**Implementation:**
```python
# celery_app/celery.py
from celery import Celery

celery_app = Celery(
    'swiftops',
    broker='redis://localhost:6379/0',
    include=['celery_app.tasks.notifications'],  # so the worker registers the task
)

# celery_app/tasks/notifications.py
from celery_app.celery import celery_app


@celery_app.task(bind=True, max_retries=3)
def send_notification(self, notification_id):
    try:
        # Send notification
        pass
    except Exception as e:
        # Retry in 60 seconds, up to max_retries times
        raise self.retry(exc=e, countdown=60)

# In service
from celery_app.tasks.notifications import send_notification


def self_assign_ticket(self, ...):
    assignment = create_assignment()
    notification = create_notification_record(...)
    send_notification.delay(notification.id)  # Queue in Celery
    return assignment
```
**Run:**
```bash
# Terminal 1: Web server
uvicorn app.main:app
# Terminal 2: Celery worker
celery -A celery_app worker --loglevel=info
```

**Pros:** Persistent, retriable, scalable, monitoring

**Cons:** Extra process to manage, more complexity
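
With `max_retries=3` and `countdown=60`, a task gets one initial attempt plus up to three retries, each scheduled 60 seconds after the failure. The sketch below simulates that policy in plain Python so the attempt counting is easy to check. It does not use Celery; `run_with_retries` and `make_flaky_sender` are hypothetical helpers, and delays are recorded instead of slept.

```python
from typing import Callable


def run_with_retries(
    task: Callable[[], str],
    max_retries: int = 3,
    countdown: int = 60,
) -> tuple[str, int, list[int]]:
    """Run `task`, retrying on failure; return (result, attempts, delays).

    Delays are recorded rather than slept so the sketch runs instantly.
    """
    delays: list[int] = []
    attempts = 0
    while True:
        attempts += 1
        try:
            return task(), attempts, delays
        except Exception:
            if len(delays) >= max_retries:
                raise  # retries exhausted: surface the last error
            delays.append(countdown)


def make_flaky_sender(failures: int) -> Callable[[], str]:
    """Build a sender that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    def send() -> str:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("provider unavailable")
        return "sent"

    return send
```

A sender that fails three times still succeeds on the fourth attempt; one that fails four times exhausts the retries and the error propagates, which is the point where a "manually retry failed notifications" path would be needed.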
### Phase 4: Event-Driven Architecture (Scale)

**Goal:** Decouple services from notifications completely.

**Pattern:**
```python
# Services emit events
event_bus.emit(TicketAssignedEvent(ticket_id, agent_id))

# Handlers react
@event_bus.on(TicketAssignedEvent)
def on_ticket_assigned(event):
    # send_notification takes a notification ID (see Phase 3)
    notification = create_notification_record(...)
    send_notification.delay(notification.id)
    update_analytics(event)
    log_audit_trail(event)
```
**Benefits:** Clean separation, easy to add new reactions, testable
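
The pattern above assumes an `event_bus` object with `emit` and `on`. A minimal in-process version could look like the following sketch. The `EventBus` class and its synchronous dispatch are assumptions for illustration, and the handlers append to a list instead of calling Celery or a real audit log.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class TicketAssignedEvent:
    ticket_id: int
    agent_id: int


class EventBus:
    """Minimal synchronous, in-process event bus."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable[[Any], None]]] = defaultdict(list)

    def on(self, event_type: type) -> Callable:
        """Decorator that registers a handler for `event_type`."""
        def register(handler):
            self._handlers[event_type].append(handler)
            return handler
        return register

    def emit(self, event: Any) -> None:
        """Call every handler registered for the event's exact type."""
        for handler in self._handlers[type(event)]:
            handler(event)


event_bus = EventBus()
calls: list[str] = []


@event_bus.on(TicketAssignedEvent)
def queue_notification(event: TicketAssignedEvent) -> None:
    # In the real app this is where the Celery task would be queued.
    calls.append(f"notify:{event.ticket_id}:{event.agent_id}")


@event_bus.on(TicketAssignedEvent)
def log_audit_trail(event: TicketAssignedEvent) -> None:
    calls.append(f"audit:{event.ticket_id}")


event_bus.emit(TicketAssignedEvent(ticket_id=7, agent_id=42))
```

Handlers run in registration order and the emitting service never imports them, so a new reaction is just one more decorated function. Because dispatch here is synchronous, slow work should still be handed to a queue inside the handler.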
## Recommendation

**Now:** Skip Phase 1 and go straight to Phase 3 (Celery) when you implement notifications properly.

**Why skip Phase 1?** It's a half-measure that you'll throw away anyway. Better to do it right once.

**When to do it:** When you implement payroll (which needs Celery anyway), add notifications at the same time.
## Files to Update (Phase 3)

1. Create `celery_app/celery.py` - Celery config
2. Create `celery_app/tasks/notifications.py` - Notification tasks
3. Update `src/app/services/ticket_assignment_service.py` - Call Celery tasks
4. Update `src/app/services/notification_service.py` - Add sync create method
5. Update `requirements.txt` - Add celery, redis
6. Update deployment - Run celery worker process
## Testing Checklist

- [ ] Notifications sent on ticket assignment
- [ ] Notifications sent on inventory distribution
- [ ] Notifications sent on bulk sales order promotion
- [ ] Failed notifications retry automatically
- [ ] Server restart doesn't lose queued notifications
- [ ] Can monitor notification queue status
- [ ] Can manually retry failed notifications