# swiftops-backend/src/app/services/notification_helper.py
# kamau1's picture
# feat: unified sync notification system, realtime timesheets and payroll, simplified project role compensation
# 95005e1
"""
Notification Helper - Convenience functions for creating notifications across the system
This module provides high-level functions for triggering notifications from business logic.
Each function handles the notification creation and queuing for delivery.
"""
import logging
from typing import Optional, List
from uuid import UUID
from sqlalchemy.orm import Session
from datetime import datetime
from app.services.notification_service import NotificationService
from app.models.user import User
from app.models.ticket import Ticket
from app.models.sales_order import SalesOrder
from app.models.enums import AppRole
logger = logging.getLogger(__name__)
class NotificationHelper:
"""Helper class for creating notifications throughout the application"""
@staticmethod
async def notify_ticket_assigned(
    db: Session,
    ticket: Ticket,
    agent: User,
    assigned_by: User,
    execution_order: Optional[int] = None
):
    """
    Tell an agent that a ticket has been assigned to them.

    Tickets whose priority is 'urgent' or 'high' are sent on the
    'whatsapp' channel; every other priority uses 'in_app'.

    Args:
        db: Database session
        ticket: Ticket that was assigned
        agent: User who was assigned the ticket (the recipient)
        assigned_by: User who made the assignment
        execution_order: Position in the agent's queue; mentioned in the
            message only when truthy
    """
    notifier = NotificationService()

    # The ticket priority selects the delivery channel.
    if ticket.priority in ['urgent', 'high']:
        channel = 'whatsapp'
    else:
        channel = 'in_app'

    title = f"New Ticket Assigned: {ticket.ticket_name or ticket.ticket_type}"

    # Assemble the message from its mandatory lead and optional fragments.
    fragments = [f"You have been assigned a {ticket.ticket_type} ticket"]
    if ticket.scheduled_date:
        fragments.append(f" scheduled for {ticket.scheduled_date}")
    if execution_order:
        fragments.append(f". Position in queue: #{execution_order}")
    message = "".join(fragments)

    details = {
        'project_id': str(ticket.project_id),
        'ticket_id': str(ticket.id),
        'ticket_type': ticket.ticket_type,
        'priority': ticket.priority,
        'action_url': f'/projects/{ticket.project_id}/tickets/{ticket.id}',
        'assigned_by': assigned_by.name,
    }

    await notifier.create_notification(
        db=db,
        user_id=agent.id,
        title=title,
        message=message,
        source_type='ticket',
        source_id=ticket.id,
        notification_type='assignment',
        channel=channel,
        additional_metadata=details,
    )

    logger.info(f"Created ticket assignment notification for user {agent.id}, ticket {ticket.id}")
@staticmethod
async def notify_ticket_status_changed(
    db: Session,
    ticket: Ticket,
    old_status: str,
    new_status: str,
    changed_by: User,
    notify_users: List[User]
):
    """
    Broadcast a ticket status transition to a list of users.

    A single bulk request is issued on the 'in_app' channel for every
    user in ``notify_users``.

    Args:
        db: Database session
        ticket: Ticket whose status changed
        old_status: Status before the change
        new_status: Status after the change
        changed_by: User who changed the status
        notify_users: Users to notify (PM, dispatcher, etc.)
    """
    notifier = NotificationService()

    title = f"Ticket Status Updated: {ticket.ticket_name or ticket.ticket_type}"
    message = f"Ticket status changed from {old_status} to {new_status}"

    details = {
        'project_id': str(ticket.project_id),
        'ticket_id': str(ticket.id),
        'old_status': old_status,
        'new_status': new_status,
        'changed_by': changed_by.name,
        'action_url': f'/projects/{ticket.project_id}/tickets/{ticket.id}',
    }

    recipient_ids = []
    for recipient in notify_users:
        recipient_ids.append(recipient.id)

    await notifier.create_bulk_notifications(
        db=db,
        user_ids=recipient_ids,
        title=title,
        message=message,
        source_type='ticket',
        source_id=ticket.id,
        notification_type='status_change',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created status change notifications for ticket {ticket.id} to {len(recipient_ids)} users")
@staticmethod
async def notify_ticket_completed(
    db: Session,
    ticket: Ticket,
    completed_by: User,
    notify_users: List[User]
):
    """
    Announce a completed ticket to the given users (PM/dispatcher).

    The metadata carries the completion timestamp in ISO format, or None
    when ``ticket.completed_at`` is not set.

    Args:
        db: Database session
        ticket: Completed ticket
        completed_by: User who completed the ticket
        notify_users: Users to notify
    """
    notifier = NotificationService()

    title = f"Ticket Completed: {ticket.ticket_name or ticket.ticket_type}"
    message = f"{completed_by.name} completed {ticket.ticket_type} ticket"

    if ticket.completed_at:
        completed_at = ticket.completed_at.isoformat()
    else:
        completed_at = None

    details = {
        'project_id': str(ticket.project_id),
        'ticket_id': str(ticket.id),
        'completed_by': completed_by.name,
        'completed_at': completed_at,
        'action_url': f'/projects/{ticket.project_id}/tickets/{ticket.id}',
    }

    recipient_ids = []
    for recipient in notify_users:
        recipient_ids.append(recipient.id)

    await notifier.create_bulk_notifications(
        db=db,
        user_ids=recipient_ids,
        title=title,
        message=message,
        source_type='ticket',
        source_id=ticket.id,
        notification_type='completion',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created completion notifications for ticket {ticket.id}")
@staticmethod
async def notify_assignment_rejected(
    db: Session,
    ticket: Ticket,
    agent: User,
    reason: str,
    notify_users: List[User]
):
    """
    Inform dispatcher/PM users that an agent turned down an assignment.

    The metadata is flagged with ``requires_action=True``.

    Args:
        db: Database session
        ticket: Ticket whose assignment was rejected
        agent: Agent who rejected the assignment
        reason: Rejection reason, included in the message and metadata
        notify_users: Users to notify (dispatcher, PM)
    """
    notifier = NotificationService()

    title = f"Assignment Rejected: {ticket.ticket_name or ticket.ticket_type}"
    message = f"{agent.name} rejected ticket assignment. Reason: {reason}"

    details = {
        'project_id': str(ticket.project_id),
        'ticket_id': str(ticket.id),
        'rejected_by': agent.name,
        'reason': reason,
        'action_url': f'/projects/{ticket.project_id}/tickets/{ticket.id}',
        'requires_action': True,
    }

    recipient_ids = []
    for recipient in notify_users:
        recipient_ids.append(recipient.id)

    await notifier.create_bulk_notifications(
        db=db,
        user_ids=recipient_ids,
        title=title,
        message=message,
        source_type='ticket',
        source_id=ticket.id,
        notification_type='rejection',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created rejection notifications for ticket {ticket.id}")
@staticmethod
async def notify_bulk_import_complete(
    db: Session,
    user_id: UUID,
    entity_type: str,
    total: int,
    successful: int,
    failed: int,
    project_id: Optional[UUID] = None,
    errors: Optional[List[str]] = None
):
    """
    Report the outcome of a bulk import to the user who started it.

    The notification is marked 'success' when ``failed`` is 0 and
    'warning' otherwise. At most the first five error messages are kept
    in the metadata.

    Args:
        db: Database session
        user_id: User who initiated the import
        entity_type: Type of entity imported (sales_orders, customers, etc.)
        total: Total records processed
        successful: Number of records imported successfully
        failed: Number of records that failed to import
        project_id: Optional project ID for scoping
        errors: Optional list of error messages
    """
    notifier = NotificationService()

    title = f"Bulk Import Complete: {entity_type.replace('_', ' ').title()}"

    if failed != 0:
        message = f"Imported {successful}/{total} records. {failed} failed."
        status_type = 'warning'
    else:
        message = f"Successfully imported all {successful} records"
        status_type = 'success'

    project_ref = None
    if project_id:
        project_ref = str(project_id)

    details = {
        'project_id': project_ref,
        'entity_type': entity_type,
        'total': total,
        'successful': successful,
        'failed': failed,
        'status_type': status_type,  # lets the frontend pick an icon
        'errors': errors[:5] if errors else [],  # cap at five entries
        'action_url': f'/{entity_type}',
    }

    await notifier.create_notification(
        db=db,
        user_id=user_id,
        title=title,
        message=message,
        source_type='system',
        notification_type='bulk_operation',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created bulk import notification for user {user_id}: {successful}/{total} successful")
@staticmethod
async def notify_bulk_promote_complete(
    db: Session,
    user_id: UUID,
    total: int,
    successful: int,
    failed: int,
    created_ticket_ids: List[UUID],
    project_id: Optional[UUID] = None,
    errors: Optional[List[str]] = None
):
    """
    Report the outcome of a bulk sales-order-to-ticket promotion.

    The metadata keeps at most the first ten created ticket IDs and the
    first five error messages. The action URL points at the project's
    ticket list when ``project_id`` is given, else at '/tickets'.

    Args:
        db: Database session
        user_id: User who initiated the promotion
        total: Total sales orders processed
        successful: Number of tickets created
        failed: Number of failed promotions
        created_ticket_ids: IDs of the created tickets
        project_id: Optional project ID for scoping
        errors: Optional list of error messages
    """
    notifier = NotificationService()

    title = "Bulk Ticket Creation Complete"

    if failed != 0:
        message = f"Created {successful}/{total} tickets. {failed} failed."
        status_type = 'warning'
    else:
        message = f"Successfully created {successful} tickets from sales orders"
        status_type = 'success'

    project_ref = None
    if project_id:
        project_ref = str(project_id)

    ticket_refs = []
    for ticket_id in created_ticket_ids[:10]:  # first ten only
        ticket_refs.append(str(ticket_id))

    if project_id:
        action_url = f'/projects/{project_id}/tickets'
    else:
        action_url = '/tickets'

    details = {
        'project_id': project_ref,
        'total': total,
        'successful': successful,
        'failed': failed,
        'status_type': status_type,  # lets the frontend pick an icon
        'created_ticket_ids': ticket_refs,
        'errors': errors[:5] if errors else [],
        'action_url': action_url,
    }

    await notifier.create_notification(
        db=db,
        user_id=user_id,
        title=title,
        message=message,
        source_type='system',
        notification_type='bulk_operation',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created bulk promote notification for user {user_id}: {successful}/{total} successful")
@staticmethod
async def notify_expense_submitted(
    db: Session,
    expense,
    submitted_by: User,
    notify_users: List[User]
):
    """
    Notify PM/Dispatcher when an expense is submitted for approval.

    The currency shown in the message is read from the expense's
    ``additional_metadata['currency']`` and defaults to 'KES'. The project
    is resolved by looking up the expense's ticket.

    Args:
        db: Database session
        expense: Expense that was submitted
        submitted_by: User who submitted the expense
        notify_users: Users who can approve (PM, dispatcher)
    """
    service = NotificationService()

    # Guard against a missing additional_metadata so the currency lookup
    # falls back to 'KES' instead of raising AttributeError on None.
    currency = (expense.additional_metadata or {}).get('currency', 'KES')

    title = "Expense Approval Required"
    message = f"{submitted_by.name} submitted {expense.category} expense for {expense.total_cost} {currency}"

    # Resolve project_id through the ticket (Ticket is imported at module level).
    ticket = db.query(Ticket).filter(Ticket.id == expense.ticket_id).first()
    project_id = str(ticket.project_id) if ticket else None

    metadata = {
        'project_id': project_id,
        'expense_id': str(expense.id),
        'ticket_id': str(expense.ticket_id),
        'category': expense.category,
        'amount': float(expense.total_cost),
        'submitted_by': submitted_by.name,
        'action_url': f'/projects/{project_id}/expenses/{expense.id}' if project_id else f'/expenses/{expense.id}',
        'requires_action': True
    }

    user_ids = [user.id for user in notify_users]

    await service.create_bulk_notifications(
        db=db,
        user_ids=user_ids,
        title=title,
        message=message,
        source_type='ticket_expense',
        source_id=expense.id,
        notification_type='approval_required',
        channel='in_app',
        additional_metadata=metadata
    )

    logger.info(f"Created expense approval notifications for expense {expense.id}")
@staticmethod
async def notify_expense_approved(
    db: Session,
    expense,
    approved_by: User,
    agent: User
):
    """
    Notify an agent that their expense was approved.

    The project is resolved by looking up the expense's ticket; when no
    ticket is found the action URL falls back to '/expenses/<id>'.

    Args:
        db: Database session
        expense: Approved expense
        approved_by: User who approved
        agent: Agent who submitted the expense (the recipient)
    """
    service = NotificationService()

    title = "Expense Approved"
    message = f"Your {expense.category} expense for {expense.total_cost} has been approved by {approved_by.name}"

    # Resolve project_id through the ticket. Ticket is already imported at
    # module level, so no function-local import is needed.
    ticket = db.query(Ticket).filter(Ticket.id == expense.ticket_id).first()
    project_id = str(ticket.project_id) if ticket else None

    metadata = {
        'project_id': project_id,
        'expense_id': str(expense.id),
        'ticket_id': str(expense.ticket_id),
        'category': expense.category,
        'amount': float(expense.total_cost),
        'approved_by': approved_by.name,
        'action_url': f'/projects/{project_id}/expenses/{expense.id}' if project_id else f'/expenses/{expense.id}'
    }

    await service.create_notification(
        db=db,
        user_id=agent.id,
        title=title,
        message=message,
        source_type='ticket_expense',
        source_id=expense.id,
        notification_type='approval',
        channel='in_app',
        additional_metadata=metadata
    )

    logger.info(f"Created expense approval notification for user {agent.id}")
@staticmethod
async def notify_expense_rejected(
    db: Session,
    expense,
    rejected_by: User,
    agent: User,
    reason: str
):
    """
    Notify an agent that their expense was rejected.

    The project is resolved by looking up the expense's ticket; when no
    ticket is found the action URL falls back to '/expenses/<id>'.

    Args:
        db: Database session
        expense: Rejected expense
        rejected_by: User who rejected
        agent: Agent who submitted the expense (the recipient)
        reason: Rejection reason, included in the message and metadata
    """
    service = NotificationService()

    title = "Expense Rejected"
    message = f"Your {expense.category} expense was rejected by {rejected_by.name}. Reason: {reason}"

    # Resolve project_id through the ticket. Ticket is already imported at
    # module level, so no function-local import is needed.
    ticket = db.query(Ticket).filter(Ticket.id == expense.ticket_id).first()
    project_id = str(ticket.project_id) if ticket else None

    metadata = {
        'project_id': project_id,
        'expense_id': str(expense.id),
        'ticket_id': str(expense.ticket_id),
        'category': expense.category,
        'amount': float(expense.total_cost),
        'rejected_by': rejected_by.name,
        'reason': reason,
        'action_url': f'/projects/{project_id}/expenses/{expense.id}' if project_id else f'/expenses/{expense.id}'
    }

    await service.create_notification(
        db=db,
        user_id=agent.id,
        title=title,
        message=message,
        source_type='ticket_expense',
        source_id=expense.id,
        notification_type='rejection',
        channel='in_app',
        additional_metadata=metadata
    )

    logger.info(f"Created expense rejection notification for user {agent.id}")
@staticmethod
async def notify_expense_paid(
    db: Session,
    expense,
    paid_by: User,
    agent: User
):
    """
    Notify an agent that their expense was paid.

    The metadata includes the expense's payment method and payment
    reference. The project is resolved by looking up the expense's ticket;
    when no ticket is found the action URL falls back to '/expenses/<id>'.

    Args:
        db: Database session
        expense: Paid expense
        paid_by: User who marked the expense as paid
        agent: Agent who submitted the expense (the recipient)
    """
    service = NotificationService()

    title = "Expense Paid"
    message = f"Your {expense.category} expense for {expense.total_cost} has been paid"

    # Resolve project_id through the ticket. Ticket is already imported at
    # module level, so no function-local import is needed.
    ticket = db.query(Ticket).filter(Ticket.id == expense.ticket_id).first()
    project_id = str(ticket.project_id) if ticket else None

    metadata = {
        'project_id': project_id,
        'expense_id': str(expense.id),
        'ticket_id': str(expense.ticket_id),
        'category': expense.category,
        'amount': float(expense.total_cost),
        'payment_method': expense.payment_method,
        'payment_reference': expense.payment_reference,
        'paid_by': paid_by.name,
        'action_url': f'/projects/{project_id}/expenses/{expense.id}' if project_id else f'/expenses/{expense.id}'
    }

    await service.create_notification(
        db=db,
        user_id=agent.id,
        title=title,
        message=message,
        source_type='ticket_expense',
        source_id=expense.id,
        notification_type='payment',
        channel='in_app',
        additional_metadata=metadata
    )

    logger.info(f"Created expense payment notification for user {agent.id}")
@staticmethod
async def notify_bulk_expense_approval_complete(
    db: Session,
    user_id: UUID,
    total: int,
    successful: int,
    failed: int,
    is_approved: bool,
    project_id: Optional[UUID] = None,
    errors: Optional[List[str]] = None
):
    """
    Report the outcome of a bulk expense approval or rejection.

    Wording switches between "Approval"/"approved" and
    "Rejection"/"rejected" according to ``is_approved``. The notification
    is marked 'success' when ``failed`` is 0 and 'warning' otherwise.

    Args:
        db: Database session
        user_id: User who initiated the bulk action
        total: Total expenses processed
        successful: Number of expenses updated successfully
        failed: Number of failed updates
        is_approved: True for an approval run, False for a rejection run
        project_id: Optional project ID for scoping
        errors: Optional list of error messages (first five are kept)
    """
    notifier = NotificationService()

    if is_approved:
        action, verb = "Approval", "approved"
    else:
        action, verb = "Rejection", "rejected"

    title = f"Bulk Expense {action} Complete"

    if failed != 0:
        message = f"{verb.capitalize()} {successful}/{total} expenses. {failed} failed."
        status_type = 'warning'
    else:
        message = f"Successfully {verb} all {successful} expenses"
        status_type = 'success'

    if project_id:
        project_ref = str(project_id)
        action_url = f'/projects/{project_id}/expenses'
    else:
        project_ref = None
        action_url = '/expenses'

    details = {
        'project_id': project_ref,
        'total': total,
        'successful': successful,
        'failed': failed,
        'is_approved': is_approved,
        'status_type': status_type,
        'errors': errors[:5] if errors else [],
        'action_url': action_url,
    }

    await notifier.create_notification(
        db=db,
        user_id=user_id,
        title=title,
        message=message,
        source_type='system',
        notification_type='bulk_operation',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created bulk expense {action.lower()} notification for user {user_id}: {successful}/{total} successful")
@staticmethod
async def notify_expense_export_complete(
    db: Session,
    user_id: UUID,
    total_expenses: int,
    total_amount: float,
    payment_groups: int,
    project_id: Optional[UUID] = None,
    expense_ids: Optional[List[UUID]] = None
):
    """
    Notify PM when an expense CSV export completes (marks as paid).

    The metadata keeps at most the first twenty exported expense IDs.
    Amounts are reported in KES, both in the message and in the log line.

    Args:
        db: Database session
        user_id: User who initiated the export
        total_expenses: Total expenses exported
        total_amount: Total amount to be paid
        payment_groups: Number of payment groups (user+date combinations)
        project_id: Optional project ID for scoping
        expense_ids: Optional list of exported expense IDs
    """
    service = NotificationService()

    title = "Tende Pay Export Complete"
    message = f"Exported {total_expenses} expenses (KES {total_amount:,.2f}) in {payment_groups} payment groups. CSV ready for Tende Pay upload. All expenses marked as paid."

    metadata = {
        'project_id': str(project_id) if project_id else None,
        'total_expenses': total_expenses,
        'total_amount': total_amount,
        'payment_groups': payment_groups,
        'status_type': 'success',
        'expense_ids': [str(eid) for eid in (expense_ids[:20] if expense_ids else [])],  # First 20 only
        'action_url': f'/projects/{project_id}/expenses?is_paid=true' if project_id else '/expenses?is_paid=true'
    }

    await service.create_notification(
        db=db,
        user_id=user_id,
        title=title,
        message=message,
        source_type='system',
        notification_type='bulk_operation',
        channel='in_app',
        additional_metadata=metadata
    )

    # Log the amount in KES (previously prefixed with "$"), matching the
    # currency used in the user-facing message above.
    logger.info(f"Created expense export notification for user {user_id}: {total_expenses} expenses, KES {total_amount:,.2f}")
@staticmethod
async def notify_user_invited_to_project(
    db: Session,
    user_id: UUID,
    project_id: UUID,
    project_name: str,
    invited_by: User,
    role_name: str
):
    """
    Tell a user that they were added to a project team.

    Args:
        db: Database session
        user_id: User who was added (the recipient)
        project_id: Project ID
        project_name: Project name, used in the title and message
        invited_by: User who added them
        role_name: Their role in the project
    """
    notifier = NotificationService()

    inviter_name = invited_by.name
    details = {
        'project_id': str(project_id),
        'project_name': project_name,
        'role': role_name,
        'invited_by': inviter_name,
        'action_url': f'/projects/{project_id}',
    }

    await notifier.create_notification(
        db=db,
        user_id=user_id,
        title=f"Added to Project: {project_name}",
        message=f"{inviter_name} added you to {project_name} as {role_name}",
        source_type='project',
        notification_type='team_addition',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created project invitation notification for user {user_id}")
@staticmethod
async def notify_ticket_overdue(
    db: Session,
    ticket: Ticket,
    notify_users: List[User]
):
    """
    Alert PM/dispatcher users that a ticket is past its due date.

    Unlike the other ticket notifications in this helper, this one is
    requested on the 'email' channel. When the ticket has no due date the
    message shows 'N/A' and the metadata stores None.

    Args:
        db: Database session
        ticket: Overdue ticket
        notify_users: Users to notify
    """
    notifier = NotificationService()

    title = f"Ticket Overdue: {ticket.ticket_name or ticket.ticket_type}"

    if ticket.due_date:
        due_display = ticket.due_date.date()
    else:
        due_display = 'N/A'
    message = f"Ticket is overdue. Due date was {due_display}"

    details = {
        'project_id': str(ticket.project_id),
        'ticket_id': str(ticket.id),
        'due_date': ticket.due_date.isoformat() if ticket.due_date else None,
        'priority': ticket.priority,
        'status_type': 'error',  # frontend can render a red/warning icon
        'action_url': f'/projects/{ticket.project_id}/tickets/{ticket.id}',
        'requires_action': True,
    }

    recipient_ids = []
    for recipient in notify_users:
        recipient_ids.append(recipient.id)

    await notifier.create_bulk_notifications(
        db=db,
        user_ids=recipient_ids,
        title=title,
        message=message,
        source_type='ticket',
        source_id=ticket.id,
        notification_type='overdue',
        channel='email',  # overdue tickets go out by email
        additional_metadata=details,
    )

    logger.info(f"Created overdue notifications for ticket {ticket.id}")
# ============================================
# TASK NOTIFICATIONS
# ============================================
@staticmethod
async def notify_task_created(
    db: Session,
    task: 'Task',
    created_by: User,
    notify_users: List[User]
):
    """
    Announce a newly created task to PMs/managers.

    The creator is removed from the recipient list; when nobody is left
    no notification is requested. Tasks whose ``additional_metadata``
    has a truthy 'auto_created' entry get a "Delivery Task Created"
    title instead of "New Task".

    Args:
        db: Database session
        task: Task that was created
        created_by: User who created the task
        notify_users: Users to notify (PMs, managers)
    """
    notifier = NotificationService()

    # Was the task auto-created from an inventory distribution?
    task_meta = task.additional_metadata
    is_auto_created = task_meta.get('auto_created', False) if task_meta else False

    if not is_auto_created:
        title = f"New Task: {task.task_title}"
        message = f"Task created by {created_by.name}"
    else:
        title = f"Delivery Task Created: {task.task_title}"
        message = "Auto-created from inventory distribution"

    # priority may be an enum-like object (has .value) or a plain value
    priority = task.priority
    priority_text = priority.value if hasattr(priority, 'value') else str(priority)

    details = {
        'project_id': str(task.project_id),
        'task_id': str(task.id),
        'task_type': task.task_type,
        'priority': priority_text,
        'scheduled_date': task.scheduled_date.isoformat() if task.scheduled_date else None,
        'created_by': created_by.name,
        'auto_created': is_auto_created,
        'action_url': f'/projects/{task.project_id}/tasks/{task.id}',
    }

    # The creator does not need to hear about their own action.
    recipient_ids = []
    for recipient in notify_users:
        if recipient.id != created_by.id:
            recipient_ids.append(recipient.id)

    if not recipient_ids:
        return

    await notifier.create_bulk_notifications(
        db=db,
        user_ids=recipient_ids,
        title=title,
        message=message,
        source_type='task',
        source_id=task.id,
        notification_type='task_created',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created task creation notifications for task {task.id} to {len(recipient_ids)} users")
@staticmethod
async def notify_task_status_changed(
    db: Session,
    task: 'Task',
    old_status: str,
    new_status: str,
    changed_by: User,
    notify_users: List[User]
):
    """
    Broadcast a task status transition to the given users.

    The user who made the change is removed from the recipient list;
    when nobody is left no notification is requested.

    Args:
        db: Database session
        task: Task whose status changed
        old_status: Status before the change
        new_status: Status after the change
        changed_by: User who changed the status
        notify_users: Users to notify
    """
    notifier = NotificationService()

    title = f"Task Status Updated: {task.task_title}"
    message = f"Status changed from {old_status} to {new_status}"

    details = {
        'project_id': str(task.project_id),
        'task_id': str(task.id),
        'old_status': old_status,
        'new_status': new_status,
        'changed_by': changed_by.name,
        'action_url': f'/projects/{task.project_id}/tasks/{task.id}',
    }

    # Skip the actor themselves.
    recipient_ids = []
    for recipient in notify_users:
        if recipient.id != changed_by.id:
            recipient_ids.append(recipient.id)

    if not recipient_ids:
        return

    await notifier.create_bulk_notifications(
        db=db,
        user_ids=recipient_ids,
        title=title,
        message=message,
        source_type='task',
        source_id=task.id,
        notification_type='status_change',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created status change notifications for task {task.id} to {len(recipient_ids)} users")
@staticmethod
async def notify_task_completed(
    db: Session,
    task: 'Task',
    completed_by: User,
    notify_users: List[User]
):
    """
    Announce a completed task to PMs/managers.

    The user who completed the task is removed from the recipient list;
    when nobody is left no notification is requested.

    Args:
        db: Database session
        task: Task that was completed
        completed_by: User who completed the task
        notify_users: Users to notify
    """
    notifier = NotificationService()

    title = f"Task Completed: {task.task_title}"
    message = f"Task completed by {completed_by.name}"

    if task.completed_at:
        completed_at = task.completed_at.isoformat()
    else:
        completed_at = None

    details = {
        'project_id': str(task.project_id),
        'task_id': str(task.id),
        'task_type': task.task_type,
        'completed_by': completed_by.name,
        'completed_at': completed_at,
        'status_type': 'success',
        'action_url': f'/projects/{task.project_id}/tasks/{task.id}',
    }

    # Skip the actor themselves.
    recipient_ids = []
    for recipient in notify_users:
        if recipient.id != completed_by.id:
            recipient_ids.append(recipient.id)

    if not recipient_ids:
        return

    await notifier.create_bulk_notifications(
        db=db,
        user_ids=recipient_ids,
        title=title,
        message=message,
        source_type='task',
        source_id=task.id,
        notification_type='task_completed',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created completion notifications for task {task.id} to {len(recipient_ids)} users")
@staticmethod
async def notify_task_cancelled(
    db: Session,
    task: 'Task',
    cancelled_by: User,
    reason: str,
    notify_users: List[User]
):
    """
    Announce a cancelled task to PMs/managers.

    The user who cancelled the task is removed from the recipient list;
    when nobody is left no notification is requested.

    Args:
        db: Database session
        task: Task that was cancelled
        cancelled_by: User who cancelled the task
        reason: Cancellation reason, included in the message and metadata
        notify_users: Users to notify
    """
    notifier = NotificationService()

    title = f"Task Cancelled: {task.task_title}"
    message = f"Cancelled by {cancelled_by.name}: {reason}"

    details = {
        'project_id': str(task.project_id),
        'task_id': str(task.id),
        'cancelled_by': cancelled_by.name,
        'cancellation_reason': reason,
        'status_type': 'warning',
        'action_url': f'/projects/{task.project_id}/tasks/{task.id}',
    }

    # Skip the actor themselves.
    recipient_ids = []
    for recipient in notify_users:
        if recipient.id != cancelled_by.id:
            recipient_ids.append(recipient.id)

    if not recipient_ids:
        return

    await notifier.create_bulk_notifications(
        db=db,
        user_ids=recipient_ids,
        title=title,
        message=message,
        source_type='task',
        source_id=task.id,
        notification_type='task_cancelled',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created cancellation notifications for task {task.id} to {len(recipient_ids)} users")
@staticmethod
def get_project_managers_and_dispatchers(db: Session, project_id: UUID) -> List[User]:
    """
    Get the primary manager and contractor dispatchers for a project.

    Only active, non-deleted users are returned, and each user appears at
    most once even if matched by both lookups.

    Args:
        db: Database session
        project_id: Project ID

    Returns:
        List of users who should be notified about project events; empty
        when the project does not exist.
    """
    from app.models.project import Project

    project = db.query(Project).filter(Project.id == project_id).first()
    if not project:
        return []

    candidates: List[User] = []

    # Primary manager - filtered the same way as the dispatcher query below
    # (and as get_users_by_role_in_project) so deactivated or soft-deleted
    # managers are not notified.
    if project.primary_manager_id:
        manager = db.query(User).filter(
            User.id == project.primary_manager_id,
            User.is_active == True,
            User.deleted_at.is_(None)
        ).first()
        if manager:
            candidates.append(manager)

    # Dispatchers belonging to the project's contractor
    if project.contractor_id:
        dispatchers = db.query(User).filter(
            User.contractor_id == project.contractor_id,
            User.role == AppRole.DISPATCHER.value,
            User.is_active == True,
            User.deleted_at.is_(None)
        ).all()
        candidates.extend(dispatchers)

    # De-duplicate by user id, keeping first-seen order, so nobody receives
    # the same notification twice.
    unique_by_id = {}
    for user in candidates:
        unique_by_id.setdefault(user.id, user)
    return list(unique_by_id.values())
@staticmethod
def get_users_by_role_in_project(
    db: Session,
    project_id: UUID,
    roles: List[AppRole]
) -> List[User]:
    """
    Collect the active users holding any of ``roles`` for a project.

    Three sources are combined, in this order: the project's primary
    manager (only when AppRole.PROJECT_MANAGER is requested), users of the
    project's contractor, and users joined through ProjectTeam rows that
    are neither removed nor deleted. Duplicates are dropped by user id,
    keeping the first occurrence.

    Args:
        db: Database session
        project_id: Project ID
        roles: Roles to filter by

    Returns:
        De-duplicated list of matching users; empty when the project
        does not exist.
    """
    from app.models.project import Project
    from app.models.project_team import ProjectTeam

    project = db.query(Project).filter(Project.id == project_id).first()
    if not project:
        return []

    role_values = [role.value for role in roles]
    collected = []

    # 1) primary manager, when PMs were asked for
    wants_pm = AppRole.PROJECT_MANAGER in roles
    if wants_pm and project.primary_manager_id:
        primary = db.query(User).filter(
            User.id == project.primary_manager_id,
            User.is_active == True,
            User.deleted_at.is_(None)
        ).first()
        if primary:
            collected.append(primary)

    # 2) contractor users carrying one of the requested roles
    if project.contractor_id:
        collected += db.query(User).filter(
            User.contractor_id == project.contractor_id,
            User.role.in_(role_values),
            User.is_active == True,
            User.deleted_at.is_(None)
        ).all()

    # 3) project team members carrying one of the requested roles
    collected += db.query(User).join(ProjectTeam).filter(
        ProjectTeam.project_id == project_id,
        ProjectTeam.removed_at.is_(None),
        ProjectTeam.deleted_at.is_(None),
        User.role.in_(role_values),
        User.is_active == True,
        User.deleted_at.is_(None)
    ).all()

    # A user can match more than one source; keep the first hit per id.
    first_by_id = {}
    for member in collected:
        if member.id not in first_by_id:
            first_by_id[member.id] = member
    return list(first_by_id.values())
@staticmethod
async def notify_users_by_role(
    db: Session,
    project_id: UUID,
    roles: List[AppRole],
    title: str,
    message: str,
    source_type: str,
    source_id: Optional[UUID] = None,
    notification_type: Optional[str] = None,
    channel: str = 'in_app',
    additional_metadata: Optional[dict] = None
):
    """
    Notify all users with specific roles in a project.

    Recipients are resolved with
    NotificationHelper.get_users_by_role_in_project. When none are found a
    warning is logged and nothing is sent. A 'project_id' entry is added to
    the metadata when absent; the caller's dict is never modified.

    Args:
        db: Database session
        project_id: Project ID
        roles: List of roles to notify (e.g., [AppRole.PROJECT_MANAGER, AppRole.DISPATCHER])
        title: Notification title
        message: Notification message
        source_type: Source type (ticket, expense, etc.)
        source_id: Optional source ID
        notification_type: Optional notification type
        channel: Delivery channel (default: in_app)
        additional_metadata: Optional additional metadata

    Example:
        await NotificationHelper.notify_users_by_role(
            db=db,
            project_id=ticket.project_id,
            roles=[AppRole.PROJECT_MANAGER, AppRole.DISPATCHER],
            title="Ticket Completed",
            message=f"Ticket {ticket.id} was completed",
            source_type='ticket',
            source_id=ticket.id,
            notification_type='completion',
            additional_metadata={'ticket_id': str(ticket.id)}
        )
    """
    service = NotificationService()

    # Find all users with specified roles in project
    users = NotificationHelper.get_users_by_role_in_project(db, project_id, roles)

    if not users:
        logger.warning(f"No users found with roles {roles} in project {project_id}")
        return

    # Work on a shallow copy so the dict passed in by the caller is left
    # untouched, then make sure project_id is present.
    metadata = dict(additional_metadata) if additional_metadata else {}
    metadata.setdefault('project_id', str(project_id))

    user_ids = [user.id for user in users]

    await service.create_bulk_notifications(
        db=db,
        user_ids=user_ids,
        title=title,
        message=message,
        source_type=source_type,
        source_id=source_id,
        notification_type=notification_type,
        channel=channel,
        additional_metadata=metadata
    )

    logger.info(f"Created notifications for {len(user_ids)} users with roles {roles} in project {project_id}")
# ============================================
# PAYROLL NOTIFICATIONS
# ============================================
@staticmethod
async def notify_payroll_exported(
    db: Session,
    exported_by: User,
    payroll_records: List,
    total_amount: float,
    total_days_worked: int,
    total_tickets_closed: int,
    period_start: str,
    period_end: str,
    project_id: Optional[UUID] = None,
    warnings: Optional[List[str]] = None
):
    """
    Notify manager/admin who exported payroll with summary.

    The metadata records the IDs of all exported payroll records, the
    totals, the number of warnings, and a UTC export timestamp stored as a
    naive ISO-8601 string.

    Args:
        db: Database session
        exported_by: User who initiated the export (the recipient)
        payroll_records: List of UserPayroll objects that were exported
        total_amount: Total payroll amount exported
        total_days_worked: Total days worked across all payroll
        total_tickets_closed: Total tickets closed across all payroll
        period_start: Period start date (ISO format)
        period_end: Period end date (ISO format)
        project_id: Optional project ID
        warnings: Optional list of warnings from export
    """
    from datetime import timezone

    service = NotificationService()

    # Build summary message
    payroll_count = len(payroll_records)
    payroll_ids = [str(p.id) for p in payroll_records]

    title = f"Payroll Export Complete: {payroll_count} records"
    message = (
        f"Successfully exported payroll for period {period_start} to {period_end}.\n\n"
        f"📊 Summary:\n"
        f"• {payroll_count} workers paid\n"
        f"• {total_days_worked} total days worked\n"
        f"• {total_tickets_closed} total tickets closed\n"
        f"• {total_amount:,.2f} KES total payout"
    )

    if warnings:
        message += f"\n\n⚠️ {len(warnings)} warnings - review CSV before uploading to Tende Pay"

    # datetime.utcnow() is deprecated; read an aware UTC clock instead and
    # strip tzinfo so the stored string keeps its previous (naive) format.
    export_timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    metadata = {
        'project_id': str(project_id) if project_id else None,
        'payroll_ids': payroll_ids,
        'payroll_count': payroll_count,
        'total_amount': total_amount,
        'total_days_worked': total_days_worked,
        'total_tickets_closed': total_tickets_closed,
        'period_start': period_start,
        'period_end': period_end,
        'warning_count': len(warnings) if warnings else 0,
        'action_url': f'/projects/{project_id}/payroll' if project_id else '/payroll',
        'export_timestamp': export_timestamp
    }

    await service.create_notification(
        db=db,
        user_id=exported_by.id,
        title=title,
        message=message,
        source_type='payroll',
        source_id=None,  # Bulk operation, no single source
        notification_type='payroll_exported',
        channel='in_app',
        project_id=project_id,
        additional_metadata=metadata
    )

    logger.info(f"Created payroll export notification for user {exported_by.id}")
@staticmethod
async def notify_payroll_payment(
    db: Session,
    payroll,
    user: User,
    project_id: Optional[UUID] = None
):
    """
    Tell a worker that their payroll payment has been processed.

    The message lists a work summary, base earnings, and - only when
    positive - bonus and deductions, followed by the total. The
    notification is requested on the 'whatsapp' channel with
    ``send_now=True``.

    Args:
        db: Database session
        payroll: UserPayroll object
        user: User receiving payment (the recipient)
        project_id: Optional project ID
    """
    notifier = NotificationService()

    title = f"💰 Payment Processed: {payroll.total_amount:,.2f} KES"

    # One-line work summary; falls back to a generic phrase when empty.
    summary_bits = []
    if payroll.days_worked:
        summary_bits.append(f"{payroll.days_worked} days worked")
    if payroll.tickets_closed:
        summary_bits.append(f"{payroll.tickets_closed} tickets closed")
    breakdown = ", ".join(summary_bits) if summary_bits else "work completed"

    # Collect message segments in order, then join them once.
    segments = [
        f"Your payment for {payroll.period_start_date.strftime('%b %d')} to ",
        f"{payroll.period_end_date.strftime('%b %d, %Y')} has been processed.\n\n",
        "📋 Work Summary:\n",
        f"• {breakdown}\n",
        f"• Base earnings: {payroll.base_earnings:,.2f} KES\n",
    ]
    if payroll.bonus_amount and payroll.bonus_amount > 0:
        segments.append(f"• Bonus: {payroll.bonus_amount:,.2f} KES\n")
    if payroll.deductions and payroll.deductions > 0:
        segments.append(f"• Deductions: -{payroll.deductions:,.2f} KES\n")
    segments.append(f"\n💵 Total Payment: {payroll.total_amount:,.2f} KES")
    segments.append("\n\nPayment will be sent to your registered account shortly.")
    message = "".join(segments)

    details = {
        'project_id': str(project_id) if project_id else None,
        'payroll_id': str(payroll.id),
        'period_start': payroll.period_start_date.isoformat(),
        'period_end': payroll.period_end_date.isoformat(),
        'days_worked': payroll.days_worked,
        'tickets_closed': payroll.tickets_closed,
        'base_earnings': float(payroll.base_earnings),
        'bonus_amount': float(payroll.bonus_amount) if payroll.bonus_amount else 0,
        'deductions': float(payroll.deductions) if payroll.deductions else 0,
        'total_amount': float(payroll.total_amount),
        'action_url': f'/payroll/{payroll.id}',
        'payment_method': payroll.payment_method,
    }

    # Payment notices go out over WhatsApp and are sent immediately.
    await notifier.create_notification(
        db=db,
        user_id=user.id,
        title=title,
        message=message,
        source_type='payroll',
        source_id=payroll.id,
        notification_type='payment',
        channel='whatsapp',
        project_id=project_id,
        additional_metadata=details,
        send_now=True,
    )

    logger.info(f"Created payment notification for user {user.id}, payroll {payroll.id}")
# ============================================
# TICKET DROP NOTIFICATIONS
# ============================================
@staticmethod
async def notify_ticket_dropped(
    db: Session,
    ticket,
    assignment,
    dropped_by: User,
    drop_type: str,
    reason: str
):
    """
    Alert project managers and dispatchers that an agent dropped a ticket.

    Delivery is delegated to NotificationHelper.notify_users_by_role for
    the PROJECT_MANAGER and DISPATCHER roles of the ticket's project, on
    the 'in_app' channel. The metadata is flagged ``requires_action=True``
    with priority 'high'.

    Args:
        db: Database session
        ticket: Ticket that was dropped
        assignment: TicketAssignment that was dropped
        dropped_by: User who dropped the ticket
        drop_type: Type of drop (reschedule, equipment_issue, customer_issue, etc.)
        reason: Reason for dropping
    """
    # Instantiated as in the original flow; delivery itself goes through
    # notify_users_by_role below.
    notifier = NotificationService()

    title = "⚠️ Ticket Dropped - Action Required"

    # Human-readable drop type, e.g. "equipment_issue" -> "Equipment Issue"
    drop_type_display = drop_type.replace('_', ' ').title()

    message = "".join([
        f"{dropped_by.name} dropped ticket: {ticket.ticket_name or ticket.ticket_type}\n\n",
        f"🔴 Reason: {drop_type_display}\n",
        f"💬 Details: {reason}\n\n",
        "⚡ This ticket is now in PENDING REVIEW and needs your immediate attention.",
    ])

    details = {
        'project_id': str(ticket.project_id),
        'ticket_id': str(ticket.id),
        'assignment_id': str(assignment.id),
        'dropped_by_user_id': str(dropped_by.id),
        'dropped_by_name': dropped_by.name,
        'drop_type': drop_type,
        'reason': reason,
        'ticket_name': ticket.ticket_name,
        'ticket_type': ticket.ticket_type,
        'ticket_status': ticket.status,
        'action_url': f'/projects/{ticket.project_id}/tickets/{ticket.id}',
        'requires_action': True,
        'priority': 'high',
    }

    # Fan out to every manager and dispatcher in the project.
    target_roles = [AppRole.PROJECT_MANAGER, AppRole.DISPATCHER]
    await NotificationHelper.notify_users_by_role(
        db=db,
        project_id=ticket.project_id,
        roles=target_roles,
        title=title,
        message=message,
        source_type='ticket',
        source_id=ticket.id,
        notification_type='ticket_dropped',
        channel='in_app',  # WhatsApp could also be used for urgent drops
        additional_metadata=details,
    )

    logger.info(f"Created ticket drop notification for ticket {ticket.id}, dropped by {dropped_by.id}")