# swiftops-backend/src/app/services/notification_creator.py
# kamau1's picture
# Fix notification role handling to support both enum and string role values
# bb7aae2
"""
Notification Creator - Tier 1: Synchronous Notification Creation

This is the ONLY way to create notifications in the system.
All services must use this to ensure consistency and reliability.

Architecture:
- Tier 1 (This file): Create notification records synchronously in database
- Tier 2 (notification_delivery.py): Deliver notifications via external channels (WhatsApp, Email, SMS)

Design Principles:
1. Synchronous - Notifications are created immediately in the caller's session (flushed, then saved when the caller commits)
2. Transaction-safe - Notifications roll back with parent operation
3. Simple - No async complexity, easy to test and debug
4. Consistent - One pattern used everywhere in the codebase
5. Scalable - Designed to handle 10,000+ notifications/day

Usage:
    from app.services.notification_creator import NotificationCreator

    # Single notification
    notification = NotificationCreator.create(
        db=db,
        user_id=user.id,
        title="Ticket Assigned",
        message="You have been assigned to ticket #123",
        source_type="ticket",
        source_id=ticket.id,
        notification_type="assignment",
        channel="in_app"
    )
    db.commit()

    # Bulk notifications
    notifications = NotificationCreator.create_bulk(
        db=db,
        user_ids=[user1.id, user2.id, user3.id],
        title="Project Update",
        message="Project status changed",
        source_type="project",
        source_id=project.id,
        notification_type="status_change"
    )
    db.commit()
"""
import logging
from sqlalchemy.orm import Session
from uuid import UUID
from typing import Optional, List, Dict, Any, Union
from datetime import datetime
from app.models.notification import Notification, NotificationChannel, NotificationStatus
from app.models.user import User
from app.models.enums import AppRole
logger = logging.getLogger(__name__)
class NotificationCreator:
    """
    Synchronous notification creator.

    Creates notification records in database - does NOT send them.
    Delivery is handled separately by NotificationDelivery service.

    Every public method adds records to the caller's session and flushes,
    but never commits: the caller owns the transaction and decides when to
    commit or roll back.
    """

    @staticmethod
    def _build(
        *,
        user_id: UUID,
        title: str,
        message: str,
        source_type: str,
        source_id: Optional[UUID],
        notification_type: str,
        channel: str,
        metadata: Optional[Dict[str, Any]],
        project_id: Optional[UUID]
    ) -> Notification:
        """Build (but do not add or flush) one pending Notification record."""
        return Notification(
            user_id=user_id,
            project_id=project_id,
            source_type=source_type,
            source_id=source_id,
            title=title,
            message=message,
            notification_type=notification_type,
            channel=NotificationChannel(channel),
            status=NotificationStatus.PENDING,
            # Shallow-copy so every record owns its own top-level dict instead
            # of aliasing the caller's object (or, in bulk creation, each other).
            additional_metadata=dict(metadata) if metadata else {},
            # NOTE(review): utcnow() returns a naive datetime and is deprecated
            # since Python 3.12. Switching to an aware value presumably depends
            # on the column being timezone-aware - confirm before changing.
            created_at=datetime.utcnow()
        )

    @staticmethod
    def _role_values(roles: List[Union[str, AppRole]]) -> List[Any]:
        """Convert a mixed list of role strings / enum members into filter values."""
        values = []
        for role in roles:
            if isinstance(role, str):
                # Strings (including any str subclass) are used unchanged.
                values.append(role)
            elif hasattr(role, 'value'):
                # Enum-like objects contribute their underlying value.
                values.append(role.value)
            else:
                values.append(str(role))
        return values

    @staticmethod
    def create(
        db: Session,
        user_id: UUID,
        title: str,
        message: str,
        source_type: str,
        source_id: Optional[UUID],
        notification_type: str,
        channel: str = "in_app",
        metadata: Optional[Dict[str, Any]] = None,
        project_id: Optional[UUID] = None
    ) -> Notification:
        """
        Create a single notification record.

        The record is added to the session and flushed so its ID is available,
        but it is not committed.

        Args:
            db: Database session
            user_id: User to notify
            title: Notification title (short, for display)
            message: Notification message (detailed)
            source_type: Type of entity (ticket, project, expense, payroll, etc.)
            source_id: ID of the source entity
            notification_type: Type of notification (assignment, payment, alert, etc.)
            channel: Delivery channel (in_app, whatsapp, email, sms, push)
            metadata: Additional data (action URLs, context, etc.); stored as a
                shallow copy, or as an empty dict when omitted
            project_id: Optional project ID for filtering

        Returns:
            Notification: Created notification record

        Raises:
            Exception: Any error raised while building or flushing the record
                is logged and re-raised unchanged.

        Example:
            notification = NotificationCreator.create(
                db=db,
                user_id=agent.id,
                title="New Ticket Assigned",
                message="You have been assigned to install fiber at Customer A",
                source_type="ticket",
                source_id=ticket.id,
                notification_type="assignment",
                channel="whatsapp",
                metadata={
                    "ticket_number": "TKT-001",
                    "priority": "high",
                    "action_url": f"/tickets/{ticket.id}"
                },
                project_id=ticket.project_id
            )
            db.commit()
        """
        try:
            notification = NotificationCreator._build(
                user_id=user_id,
                title=title,
                message=message,
                source_type=source_type,
                source_id=source_id,
                notification_type=notification_type,
                channel=channel,
                metadata=metadata,
                project_id=project_id
            )
            db.add(notification)
            db.flush()  # Get ID without committing (caller controls commit)
            logger.debug(
                "Created notification %s: %s for user %s via %s",
                notification.id, notification_type, user_id, channel
            )
            return notification
        except Exception as e:
            logger.exception("Failed to create notification: %s", e)
            raise

    @staticmethod
    def create_bulk(
        db: Session,
        user_ids: List[UUID],
        title: str,
        message: str,
        source_type: str,
        source_id: Optional[UUID],
        notification_type: str,
        channel: str = "in_app",
        metadata: Optional[Dict[str, Any]] = None,
        project_id: Optional[UUID] = None
    ) -> List[Notification]:
        """
        Create multiple notifications for different users with the same content.

        All records are built first, then added and flushed together, so the
        session is flushed once per call rather than once per recipient.
        Nothing is committed.

        Args:
            db: Database session
            user_ids: List of user IDs to notify
            title: Notification title (same for all)
            message: Notification message (same for all)
            source_type: Type of entity
            source_id: ID of the source entity
            notification_type: Type of notification
            channel: Delivery channel
            metadata: Additional data (same content for all; each record gets
                its own shallow copy)
            project_id: Optional project ID

        Returns:
            List[Notification]: List of created notification records, in the
            same order as user_ids (empty when user_ids is empty)

        Raises:
            Exception: Any error raised while building or flushing the records
                is logged and re-raised unchanged.

        Example:
            # Notify all managers about ticket drop
            manager_ids = [pm1.id, pm2.id, dispatcher.id]
            notifications = NotificationCreator.create_bulk(
                db=db,
                user_ids=manager_ids,
                title="⚠️ Ticket Dropped - Action Required",
                message=f"{agent.name} dropped ticket: {ticket.name}",
                source_type="ticket",
                source_id=ticket.id,
                notification_type="ticket_dropped",
                channel="in_app",
                project_id=ticket.project_id
            )
            db.commit()
        """
        try:
            notifications = [
                NotificationCreator._build(
                    user_id=user_id,
                    title=title,
                    message=message,
                    source_type=source_type,
                    source_id=source_id,
                    notification_type=notification_type,
                    channel=channel,
                    metadata=metadata,
                    project_id=project_id
                )
                for user_id in user_ids
            ]

            # Leave the session untouched when there is nobody to notify.
            if notifications:
                db.add_all(notifications)
                db.flush()  # Single flush for the whole batch; caller controls commit

            # Per-record trace is only worth the loop when DEBUG is on.
            if logger.isEnabledFor(logging.DEBUG):
                for user_id, notification in zip(user_ids, notifications):
                    logger.debug(
                        "Created notification %s: %s for user %s via %s",
                        notification.id, notification_type, user_id, channel
                    )

            logger.info(
                "Created %d bulk notifications: %s via %s",
                len(notifications), notification_type, channel
            )
            return notifications
        except Exception as e:
            logger.exception("Failed to create bulk notifications: %s", e)
            raise

    @staticmethod
    def notify_project_team(
        db: Session,
        project_id: UUID,
        title: str,
        message: str,
        source_type: str,
        source_id: Optional[UUID],
        notification_type: str,
        roles: Optional[List[Union[str, AppRole]]] = None,
        channel: str = "in_app",
        metadata: Optional[Dict[str, Any]] = None,
        exclude_user_ids: Optional[List[UUID]] = None
    ) -> List[Notification]:
        """
        Notify all team members in a project (optionally filtered by role).

        Only users that are active and not soft-deleted are considered. The
        matching users are then passed to create_bulk, so records are flushed
        but not committed.

        Args:
            db: Database session
            project_id: Project ID
            title: Notification title
            message: Notification message
            source_type: Type of entity
            source_id: ID of the source entity
            notification_type: Type of notification
            roles: Optional list of roles to notify, as strings or enum members
                (e.g., [AppRole.PROJECT_MANAGER, AppRole.DISPATCHER])
            channel: Delivery channel
            metadata: Additional data
            exclude_user_ids: Optional list of user IDs to exclude

        Returns:
            List[Notification]: List of created notification records; an empty
            list (with a warning logged) when no team member matches

        Raises:
            Exception: Any error raised while querying or creating records is
                logged and re-raised unchanged.

        Example:
            # Notify all managers and dispatchers in project
            notifications = NotificationCreator.notify_project_team(
                db=db,
                project_id=ticket.project_id,
                title="Ticket Dropped",
                message=f"{agent.name} dropped ticket",
                source_type="ticket",
                source_id=ticket.id,
                notification_type="ticket_dropped",
                roles=[AppRole.PROJECT_MANAGER, AppRole.DISPATCHER],
                exclude_user_ids=[agent.id]  # Don't notify the agent who dropped
            )
            db.commit()
        """
        # Imported at call time rather than module level - presumably to avoid
        # a circular import; confirm before hoisting.
        from app.models.project_team import ProjectTeam

        try:
            # Build query for project team members
            query = db.query(User).join(ProjectTeam).filter(
                ProjectTeam.project_id == project_id,
                User.is_active == True,  # noqa: E712 - builds a SQL expression
                User.deleted_at.is_(None)
            )

            # Filter by roles if specified (strings and enum members both accepted)
            if roles:
                query = query.filter(
                    User.role.in_(NotificationCreator._role_values(roles))
                )

            # Exclude specific users if specified
            if exclude_user_ids:
                query = query.filter(~User.id.in_(exclude_user_ids))

            team_members = query.all()
            if not team_members:
                logger.warning(
                    "No team members found for project %s with roles %s",
                    project_id, roles
                )
                return []

            notifications = NotificationCreator.create_bulk(
                db=db,
                user_ids=[member.id for member in team_members],
                title=title,
                message=message,
                source_type=source_type,
                source_id=source_id,
                notification_type=notification_type,
                channel=channel,
                metadata=metadata,
                project_id=project_id
            )
            logger.info(
                "Notified %d team members in project %s",
                len(notifications), project_id
            )
            return notifications
        except Exception as e:
            logger.exception(
                "Failed to notify project team %s: %s", project_id, e
            )
            raise