Spaces:
Sleeping
Sleeping
| """ | |
Notification Creator - Tier 1: Synchronous Notification Creation

This is the ONLY way to create notifications in the system.
All services must use this to ensure consistency and reliability.

Architecture:
- Tier 1 (This file): Create notification records synchronously in the database
- Tier 2 (notification_delivery.py): Deliver notifications via external channels (WhatsApp, Email, SMS)

Design Principles:
1. Synchronous - Notifications are created immediately, guaranteed to be saved
2. Transaction-safe - Notifications roll back with the parent operation
3. Simple - No async complexity, easy to test and debug
4. Consistent - One pattern used everywhere in the codebase
5. Scalable - Designed to handle 10,000+ notifications/day

Usage:
    from app.services.notification_creator import NotificationCreator

    # Single notification
    notification = NotificationCreator.create(
        db=db,
        user_id=user.id,
        title="Ticket Assigned",
        message="You have been assigned to ticket #123",
        source_type="ticket",
        source_id=ticket.id,
        notification_type="assignment",
        channel="in_app"
    )
    db.commit()

    # Bulk notifications
    notifications = NotificationCreator.create_bulk(
        db=db,
        user_ids=[user1.id, user2.id, user3.id],
        title="Project Update",
        message="Project status changed",
        source_type="project",
        source_id=project.id,
        notification_type="status_change"
    )
    db.commit()
| """ | |
| import logging | |
| from sqlalchemy.orm import Session | |
| from uuid import UUID | |
| from typing import Optional, List, Dict, Any, Union | |
| from datetime import datetime | |
| from app.models.notification import Notification, NotificationChannel, NotificationStatus | |
| from app.models.user import User | |
| from app.models.enums import AppRole | |
| logger = logging.getLogger(__name__) | |
class NotificationCreator:
    """
    Synchronous notification creator.

    Creates notification records in the database - it does NOT send them.
    Delivery is handled separately by the NotificationDelivery service.

    Every public method adds rows to the caller's session and flushes, but
    never commits: the caller owns the transaction and decides when to
    commit or roll back.

    All methods are static; call them on the class, e.g.
    ``NotificationCreator.create(db=db, ...)``.
    """

    @staticmethod
    def _utc_now_naive() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Equivalent to the deprecated ``datetime.utcnow()``: the value is UTC
        but carries no ``tzinfo``, so stored timestamps keep their shape.
        """
        # Local import: the module header only imports ``datetime`` itself.
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _role_value(role: Union[str, AppRole]) -> Any:
        """Return the plain value used to match ``User.role`` for *role*.

        Enum-like objects (anything exposing ``.value``) are unwrapped first,
        so a ``str``-mixin enum member is reduced to its value rather than
        being passed through as an enum object. Plain strings are returned
        unchanged; anything else is stringified.
        """
        if hasattr(role, "value"):
            return role.value
        return role if isinstance(role, str) else str(role)

    @staticmethod
    def _build(
        user_id: UUID,
        title: str,
        message: str,
        source_type: str,
        source_id: Optional[UUID],
        notification_type: str,
        channel: str,
        metadata: Optional[Dict[str, Any]],
        project_id: Optional[UUID],
    ) -> Notification:
        """Construct a PENDING ``Notification`` without touching the session.

        Raises:
            ValueError: If *channel* is not a valid ``NotificationChannel``.
        """
        return Notification(
            user_id=user_id,
            project_id=project_id,
            source_type=source_type,
            source_id=source_id,
            title=title,
            message=message,
            notification_type=notification_type,
            channel=NotificationChannel(channel),
            status=NotificationStatus.PENDING,
            # Shallow copy: each record owns its dict, so changing one
            # notification's metadata (or the caller's dict afterwards)
            # cannot leak into other records built from the same argument.
            additional_metadata=dict(metadata) if metadata else {},
            created_at=NotificationCreator._utc_now_naive(),
        )

    @staticmethod
    def create(
        db: Session,
        user_id: UUID,
        title: str,
        message: str,
        source_type: str,
        source_id: Optional[UUID],
        notification_type: str,
        channel: str = "in_app",
        metadata: Optional[Dict[str, Any]] = None,
        project_id: Optional[UUID] = None
    ) -> Notification:
        """
        Create a single notification record.

        The record is added to *db* and flushed so that its ID is available,
        but it is NOT committed - the caller controls the commit.

        Args:
            db: Database session
            user_id: User to notify
            title: Notification title (short, for display)
            message: Notification message (detailed)
            source_type: Type of entity (ticket, project, expense, payroll, etc.)
            source_id: ID of the source entity
            notification_type: Type of notification (assignment, payment, alert, etc.)
            channel: Delivery channel (in_app, whatsapp, email, sms, push)
            metadata: Additional data (action URLs, context, etc.)
            project_id: Optional project ID for filtering

        Returns:
            Notification: Created notification record (status PENDING)

        Raises:
            Exception: Any error raised while building or flushing the record
                (e.g. ``ValueError`` for an unknown channel) is logged and
                re-raised unchanged.

        Example:
            notification = NotificationCreator.create(
                db=db,
                user_id=agent.id,
                title="New Ticket Assigned",
                message="You have been assigned to install fiber at Customer A",
                source_type="ticket",
                source_id=ticket.id,
                notification_type="assignment",
                channel="whatsapp",
                metadata={
                    "ticket_number": "TKT-001",
                    "priority": "high",
                    "action_url": f"/tickets/{ticket.id}"
                },
                project_id=ticket.project_id
            )
            db.commit()
        """
        try:
            notification = NotificationCreator._build(
                user_id=user_id,
                title=title,
                message=message,
                source_type=source_type,
                source_id=source_id,
                notification_type=notification_type,
                channel=channel,
                metadata=metadata,
                project_id=project_id,
            )
            db.add(notification)
            db.flush()  # Get ID without committing (caller controls commit)
        except Exception as e:
            logger.error("Failed to create notification: %s", e, exc_info=True)
            raise

        logger.debug(
            "Created notification %s: %s for user %s via %s",
            notification.id, notification_type, user_id, channel,
        )
        return notification

    @staticmethod
    def create_bulk(
        db: Session,
        user_ids: List[UUID],
        title: str,
        message: str,
        source_type: str,
        source_id: Optional[UUID],
        notification_type: str,
        channel: str = "in_app",
        metadata: Optional[Dict[str, Any]] = None,
        project_id: Optional[UUID] = None
    ) -> List[Notification]:
        """
        Create multiple notifications for different users with the same content.

        All records are added to the session together and flushed once, rather
        than once per recipient. Nothing is committed - the caller controls
        the commit. An empty *user_ids* yields an empty list.

        Args:
            db: Database session
            user_ids: List of user IDs to notify
            title: Notification title (same for all)
            message: Notification message (same for all)
            source_type: Type of entity
            source_id: ID of the source entity
            notification_type: Type of notification
            channel: Delivery channel
            metadata: Additional data (same content for all; each record
                receives its own shallow copy)
            project_id: Optional project ID

        Returns:
            List[Notification]: Created notification records, in the order
                of *user_ids*

        Raises:
            Exception: Any error raised while building or flushing the
                records is logged and re-raised unchanged.

        Example:
            # Notify all managers about ticket drop
            manager_ids = [pm1.id, pm2.id, dispatcher.id]
            notifications = NotificationCreator.create_bulk(
                db=db,
                user_ids=manager_ids,
                title="⚠️ Ticket Dropped - Action Required",
                message=f"{agent.name} dropped ticket: {ticket.name}",
                source_type="ticket",
                source_id=ticket.id,
                notification_type="ticket_dropped",
                channel="in_app",
                project_id=ticket.project_id
            )
            db.commit()
        """
        try:
            notifications = [
                NotificationCreator._build(
                    user_id=user_id,
                    title=title,
                    message=message,
                    source_type=source_type,
                    source_id=source_id,
                    notification_type=notification_type,
                    channel=channel,
                    metadata=metadata,
                    project_id=project_id,
                )
                for user_id in user_ids
            ]
            if notifications:
                db.add_all(notifications)
                db.flush()  # One flush for the whole batch; caller commits
        except Exception as e:
            logger.error("Failed to create bulk notifications: %s", e, exc_info=True)
            raise

        logger.info(
            "Created %d bulk notifications: %s via %s",
            len(notifications), notification_type, channel,
        )
        return notifications

    @staticmethod
    def notify_project_team(
        db: Session,
        project_id: UUID,
        title: str,
        message: str,
        source_type: str,
        source_id: Optional[UUID],
        notification_type: str,
        roles: Optional[List[Union[str, AppRole]]] = None,
        channel: str = "in_app",
        metadata: Optional[Dict[str, Any]] = None,
        exclude_user_ids: Optional[List[UUID]] = None
    ) -> List[Notification]:
        """
        Notify all team members in a project (optionally filtered by role).

        Recipients are users joined to the project through ``ProjectTeam``
        that are active and not soft-deleted (``deleted_at`` is NULL).
        Nothing is committed - the caller controls the commit.

        Args:
            db: Database session
            project_id: Project ID
            title: Notification title
            message: Notification message
            source_type: Type of entity
            source_id: ID of the source entity
            notification_type: Type of notification
            roles: Optional list of roles to notify (e.g., [AppRole.PROJECT_MANAGER, AppRole.DISPATCHER]);
                strings and enum members may be mixed
            channel: Delivery channel
            metadata: Additional data
            exclude_user_ids: Optional list of user IDs to exclude

        Returns:
            List[Notification]: Created notification records; an empty list
                (with a warning logged) when no team member matches

        Raises:
            Exception: Any error raised while querying or creating the
                notifications is logged and re-raised unchanged.

        Example:
            # Notify all managers and dispatchers in project
            notifications = NotificationCreator.notify_project_team(
                db=db,
                project_id=ticket.project_id,
                title="Ticket Dropped",
                message=f"{agent.name} dropped ticket",
                source_type="ticket",
                source_id=ticket.id,
                notification_type="ticket_dropped",
                roles=[AppRole.PROJECT_MANAGER, AppRole.DISPATCHER],
                exclude_user_ids=[agent.id]  # Don't notify the agent who dropped
            )
            db.commit()
        """
        # Imported at call time rather than module level.
        # NOTE(review): presumably to avoid a circular import - confirm.
        from app.models.project_team import ProjectTeam

        try:
            # Active, non-deleted users that belong to the project's team
            query = db.query(User).join(ProjectTeam).filter(
                ProjectTeam.project_id == project_id,
                User.is_active == True,  # noqa: E712 - SQL column comparison, not a Python truth test
                User.deleted_at.is_(None),
            )

            # Filter by roles if specified (strings and enum members accepted)
            if roles:
                role_values = [NotificationCreator._role_value(role) for role in roles]
                query = query.filter(User.role.in_(role_values))

            # Exclude specific users if specified
            if exclude_user_ids:
                query = query.filter(~User.id.in_(exclude_user_ids))

            team_members = query.all()
            if not team_members:
                logger.warning(
                    "No team members found for project %s with roles %s",
                    project_id, roles,
                )
                return []

            notifications = NotificationCreator.create_bulk(
                db=db,
                user_ids=[member.id for member in team_members],
                title=title,
                message=message,
                source_type=source_type,
                source_id=source_id,
                notification_type=notification_type,
                channel=channel,
                metadata=metadata,
                project_id=project_id,
            )
            logger.info(
                "Notified %d team members in project %s",
                len(notifications), project_id,
            )
            return notifications
        except Exception as e:
            logger.error(
                "Failed to notify project team %s: %s",
                project_id, e,
                exc_info=True,
            )
            raise