Spaces:
Sleeping
Sleeping
| """ | |
| Notification Helper - Convenience functions for creating notifications across the system | |
| This module provides high-level functions for triggering notifications from business logic. | |
| Each function handles the notification creation and queuing for delivery. | |
| """ | |
| import logging | |
| from typing import Optional, List | |
| from uuid import UUID | |
| from sqlalchemy.orm import Session | |
| from datetime import datetime | |
| from app.services.notification_service import NotificationService | |
| from app.models.user import User | |
| from app.models.ticket import Ticket | |
| from app.models.sales_order import SalesOrder | |
| from app.models.enums import AppRole | |
| logger = logging.getLogger(__name__) | |
| class NotificationHelper: | |
| """Helper class for creating notifications throughout the application""" | |
async def notify_ticket_assigned(
    db: Session,
    ticket: Ticket,
    agent: User,
    assigned_by: User,
    execution_order: Optional[int] = None
):
    """
    Tell an agent that a ticket has been assigned to them.

    Tickets whose priority is 'urgent' or 'high' are sent on the 'whatsapp'
    channel; every other priority is delivered 'in_app'.

    Args:
        db: Database session
        ticket: Ticket that was assigned
        agent: User who was assigned the ticket (the notification recipient)
        assigned_by: User who made the assignment
        execution_order: Position in agent's queue; only mentioned in the
            message when truthy
    """
    notifier = NotificationService()

    # The ticket priority selects the delivery channel.
    if ticket.priority in ['urgent', 'high']:
        delivery_channel = 'whatsapp'
    else:
        delivery_channel = 'in_app'

    heading = f"New Ticket Assigned: {ticket.ticket_name or ticket.ticket_type}"

    # Build the body from a base sentence plus optional fragments.
    fragments = [f"You have been assigned a {ticket.ticket_type} ticket"]
    if ticket.scheduled_date:
        fragments.append(f" scheduled for {ticket.scheduled_date}")
    if execution_order:
        fragments.append(f". Position in queue: #{execution_order}")
    body = "".join(fragments)

    details = {
        'project_id': str(ticket.project_id),
        'ticket_id': str(ticket.id),
        'ticket_type': ticket.ticket_type,
        'priority': ticket.priority,
        'action_url': f'/projects/{ticket.project_id}/tickets/{ticket.id}',
        'assigned_by': assigned_by.name,
    }

    await notifier.create_notification(
        db=db,
        user_id=agent.id,
        title=heading,
        message=body,
        source_type='ticket',
        source_id=ticket.id,
        notification_type='assignment',
        channel=delivery_channel,
        additional_metadata=details,
    )

    logger.info(f"Created ticket assignment notification for user {agent.id}, ticket {ticket.id}")
async def notify_ticket_status_changed(
    db: Session,
    ticket: Ticket,
    old_status: str,
    new_status: str,
    changed_by: User,
    notify_users: List[User]
):
    """
    Notify relevant users when ticket status changes.

    Sends one 'in_app' notification of type 'status_change' to every user in
    ``notify_users``. When there is nobody to notify the function returns
    without calling the notification service.

    Args:
        db: Database session
        ticket: Ticket that changed
        old_status: Previous status
        new_status: New status
        changed_by: User who changed the status
        notify_users: List of users to notify (PM, dispatcher, etc.);
            an empty list or None results in no notification
    """
    user_ids = [user.id for user in (notify_users or [])]
    if not user_ids:
        # Nothing to deliver: avoid a bulk call with no recipients and a
        # "Created ..." log line for notifications that were never created.
        # The task notification methods of this class guard the same way.
        return

    service = NotificationService()

    title = f"Ticket Status Updated: {ticket.ticket_name or ticket.ticket_type}"
    message = f"Ticket status changed from {old_status} to {new_status}"

    metadata = {
        'project_id': str(ticket.project_id),
        'ticket_id': str(ticket.id),
        'old_status': old_status,
        'new_status': new_status,
        'changed_by': changed_by.name,
        'action_url': f'/projects/{ticket.project_id}/tickets/{ticket.id}'
    }

    await service.create_bulk_notifications(
        db=db,
        user_ids=user_ids,
        title=title,
        message=message,
        source_type='ticket',
        source_id=ticket.id,
        notification_type='status_change',
        channel='in_app',
        additional_metadata=metadata
    )

    logger.info(f"Created status change notifications for ticket {ticket.id} to {len(user_ids)} users")
async def notify_ticket_completed(
    db: Session,
    ticket: Ticket,
    completed_by: User,
    notify_users: List[User]
):
    """
    Notify PM/Dispatcher when ticket is completed.

    Sends one 'in_app' notification of type 'completion' to every user in
    ``notify_users``. When there is nobody to notify the function returns
    without calling the notification service.

    Args:
        db: Database session
        ticket: Completed ticket
        completed_by: User who completed the ticket
        notify_users: List of users to notify; an empty list or None
            results in no notification
    """
    user_ids = [user.id for user in (notify_users or [])]
    if not user_ids:
        # Nothing to deliver: avoid a bulk call with no recipients and a
        # "Created ..." log line for notifications that were never created.
        return

    service = NotificationService()

    title = f"Ticket Completed: {ticket.ticket_name or ticket.ticket_type}"
    message = f"{completed_by.name} completed {ticket.ticket_type} ticket"

    metadata = {
        'project_id': str(ticket.project_id),
        'ticket_id': str(ticket.id),
        'completed_by': completed_by.name,
        # completed_at may be unset on the ticket; store None in that case.
        'completed_at': ticket.completed_at.isoformat() if ticket.completed_at else None,
        'action_url': f'/projects/{ticket.project_id}/tickets/{ticket.id}'
    }

    await service.create_bulk_notifications(
        db=db,
        user_ids=user_ids,
        title=title,
        message=message,
        source_type='ticket',
        source_id=ticket.id,
        notification_type='completion',
        channel='in_app',
        additional_metadata=metadata
    )

    logger.info(f"Created completion notifications for ticket {ticket.id}")
async def notify_assignment_rejected(
    db: Session,
    ticket: Ticket,
    agent: User,
    reason: str,
    notify_users: List[User]
):
    """
    Notify dispatcher/PM when agent rejects assignment.

    Sends one 'in_app' notification of type 'rejection' (flagged with
    ``requires_action``) to every user in ``notify_users``. When there is
    nobody to notify the function returns without calling the notification
    service.

    Args:
        db: Database session
        ticket: Ticket that was rejected
        agent: Agent who rejected
        reason: Rejection reason
        notify_users: Users to notify (dispatcher, PM); an empty list or
            None results in no notification
    """
    user_ids = [user.id for user in (notify_users or [])]
    if not user_ids:
        # Nothing to deliver: avoid a bulk call with no recipients and a
        # "Created ..." log line for notifications that were never created.
        return

    service = NotificationService()

    title = f"Assignment Rejected: {ticket.ticket_name or ticket.ticket_type}"
    message = f"{agent.name} rejected ticket assignment. Reason: {reason}"

    metadata = {
        'project_id': str(ticket.project_id),
        'ticket_id': str(ticket.id),
        'rejected_by': agent.name,
        'reason': reason,
        'action_url': f'/projects/{ticket.project_id}/tickets/{ticket.id}',
        'requires_action': True
    }

    await service.create_bulk_notifications(
        db=db,
        user_ids=user_ids,
        title=title,
        message=message,
        source_type='ticket',
        source_id=ticket.id,
        notification_type='rejection',
        channel='in_app',
        additional_metadata=metadata
    )

    logger.info(f"Created rejection notifications for ticket {ticket.id}")
async def notify_bulk_import_complete(
    db: Session,
    user_id: UUID,
    entity_type: str,
    total: int,
    successful: int,
    failed: int,
    project_id: Optional[UUID] = None,
    errors: Optional[List[str]] = None
):
    """
    Report the outcome of a bulk import to the user who started it.

    A run with no failures produces a 'success' notification; any failure
    produces a 'warning' that includes the success/total ratio.

    Args:
        db: Database session
        user_id: User who initiated the import
        entity_type: Type of entity imported (sales_orders, customers, etc.)
        total: Total records processed
        successful: Successfully imported
        failed: Failed imports
        project_id: Optional project ID for scoping
        errors: List of error messages (only the first 5 are stored)
    """
    notifier = NotificationService()

    heading = f"Bulk Import Complete: {entity_type.replace('_', ' ').title()}"

    all_ok = failed == 0
    outcome = 'success' if all_ok else 'warning'
    if all_ok:
        body = f"Successfully imported all {successful} records"
    else:
        body = f"Imported {successful}/{total} records. {failed} failed."

    scoped_project = str(project_id) if project_id else None
    error_sample = errors[:5] if errors else []  # First 5 errors only

    details = {
        'project_id': scoped_project,
        'entity_type': entity_type,
        'total': total,
        'successful': successful,
        'failed': failed,
        'status_type': outcome,  # Frontend can show icon based on this
        'errors': error_sample,
        'action_url': f'/{entity_type}',
    }

    await notifier.create_notification(
        db=db,
        user_id=user_id,
        title=heading,
        message=body,
        source_type='system',
        notification_type='bulk_operation',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created bulk import notification for user {user_id}: {successful}/{total} successful")
async def notify_bulk_promote_complete(
    db: Session,
    user_id: UUID,
    total: int,
    successful: int,
    failed: int,
    created_ticket_ids: List[UUID],
    project_id: Optional[UUID] = None,
    errors: Optional[List[str]] = None
):
    """
    Report the outcome of a bulk sales-order-to-ticket promotion.

    A run with no failures produces a 'success' notification; any failure
    produces a 'warning' that includes the success/total ratio.

    Args:
        db: Database session
        user_id: User who initiated the promotion
        total: Total sales orders processed
        successful: Successfully created tickets
        failed: Failed promotions
        created_ticket_ids: IDs of created tickets (only the first 10 are stored)
        project_id: Optional project ID for scoping
        errors: List of error messages (only the first 5 are stored)
    """
    notifier = NotificationService()

    heading = "Bulk Ticket Creation Complete"

    all_ok = failed == 0
    outcome = 'success' if all_ok else 'warning'
    if all_ok:
        body = f"Successfully created {successful} tickets from sales orders"
    else:
        body = f"Created {successful}/{total} tickets. {failed} failed."

    # Link to the project's ticket list when scoped, otherwise the global list.
    if project_id:
        target_url = f'/projects/{project_id}/tickets'
    else:
        target_url = '/tickets'

    details = {
        'project_id': str(project_id) if project_id else None,
        'total': total,
        'successful': successful,
        'failed': failed,
        'status_type': outcome,  # Frontend can show icon based on this
        'created_ticket_ids': [str(ticket_id) for ticket_id in created_ticket_ids[:10]],  # First 10 only
        'errors': errors[:5] if errors else [],
        'action_url': target_url,
    }

    await notifier.create_notification(
        db=db,
        user_id=user_id,
        title=heading,
        message=body,
        source_type='system',
        notification_type='bulk_operation',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created bulk promote notification for user {user_id}: {successful}/{total} successful")
async def notify_expense_submitted(
    db: Session,
    expense,
    submitted_by: User,
    notify_users: List[User]
):
    """
    Notify PM/Dispatcher when expense is submitted for approval.

    Sends one 'in_app' notification of type 'approval_required' (flagged
    with ``requires_action``) to every user in ``notify_users``. The project
    is resolved through the expense's ticket so the action URL can be
    project-scoped. When there is nobody to notify the function returns
    without touching the database or the notification service.

    Args:
        db: Database session
        expense: Expense that was submitted
        submitted_by: User who submitted the expense
        notify_users: Users who can approve (PM, dispatcher); an empty list
            or None results in no notification
    """
    user_ids = [user.id for user in (notify_users or [])]
    if not user_ids:
        # Nothing to deliver: skip the ticket lookup, the bulk call and the
        # "Created ..." log line.
        return

    service = NotificationService()

    # additional_metadata may be None on the expense; fall back to the
    # default currency instead of raising AttributeError.
    currency = (expense.additional_metadata or {}).get('currency', 'KES')

    title = "Expense Approval Required"
    message = f"{submitted_by.name} submitted {expense.category} expense for {expense.total_cost} {currency}"

    # Get project_id from ticket (Ticket is already imported at module level).
    ticket = db.query(Ticket).filter(Ticket.id == expense.ticket_id).first()
    project_id = str(ticket.project_id) if ticket else None

    metadata = {
        'project_id': project_id,
        'expense_id': str(expense.id),
        'ticket_id': str(expense.ticket_id),
        'category': expense.category,
        'amount': float(expense.total_cost),
        'submitted_by': submitted_by.name,
        # Fall back to the unscoped expense URL when the ticket is missing.
        'action_url': f'/projects/{project_id}/expenses/{expense.id}' if project_id else f'/expenses/{expense.id}',
        'requires_action': True
    }

    await service.create_bulk_notifications(
        db=db,
        user_ids=user_ids,
        title=title,
        message=message,
        source_type='ticket_expense',
        source_id=expense.id,
        notification_type='approval_required',
        channel='in_app',
        additional_metadata=metadata
    )

    logger.info(f"Created expense approval notifications for expense {expense.id}")
async def notify_expense_approved(
    db: Session,
    expense,
    approved_by: User,
    agent: User
):
    """
    Let an agent know that their expense has been approved.

    The project is resolved through the expense's ticket so the action URL
    can be project-scoped; if the ticket cannot be found the unscoped
    expense URL is used.

    Args:
        db: Database session
        expense: Approved expense
        approved_by: User who approved
        agent: Agent who submitted the expense (the notification recipient)
    """
    notifier = NotificationService()

    heading = "Expense Approved"
    body = f"Your {expense.category} expense for {expense.total_cost} has been approved by {approved_by.name}"

    # Resolve the owning project via the ticket.
    from app.models.ticket import Ticket
    parent_ticket = db.query(Ticket).filter(Ticket.id == expense.ticket_id).first()
    project_id = str(parent_ticket.project_id) if parent_ticket else None

    if project_id:
        target_url = f'/projects/{project_id}/expenses/{expense.id}'
    else:
        target_url = f'/expenses/{expense.id}'

    details = {
        'project_id': project_id,
        'expense_id': str(expense.id),
        'ticket_id': str(expense.ticket_id),
        'category': expense.category,
        'amount': float(expense.total_cost),
        'approved_by': approved_by.name,
        'action_url': target_url,
    }

    await notifier.create_notification(
        db=db,
        user_id=agent.id,
        title=heading,
        message=body,
        source_type='ticket_expense',
        source_id=expense.id,
        notification_type='approval',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created expense approval notification for user {agent.id}")
async def notify_expense_rejected(
    db: Session,
    expense,
    rejected_by: User,
    agent: User,
    reason: str
):
    """
    Let an agent know that their expense has been rejected, and why.

    The project is resolved through the expense's ticket so the action URL
    can be project-scoped; if the ticket cannot be found the unscoped
    expense URL is used.

    Args:
        db: Database session
        expense: Rejected expense
        rejected_by: User who rejected
        agent: Agent who submitted the expense (the notification recipient)
        reason: Rejection reason
    """
    notifier = NotificationService()

    heading = "Expense Rejected"
    body = f"Your {expense.category} expense was rejected by {rejected_by.name}. Reason: {reason}"

    # Resolve the owning project via the ticket.
    from app.models.ticket import Ticket
    parent_ticket = db.query(Ticket).filter(Ticket.id == expense.ticket_id).first()
    project_id = str(parent_ticket.project_id) if parent_ticket else None

    if project_id:
        target_url = f'/projects/{project_id}/expenses/{expense.id}'
    else:
        target_url = f'/expenses/{expense.id}'

    details = {
        'project_id': project_id,
        'expense_id': str(expense.id),
        'ticket_id': str(expense.ticket_id),
        'category': expense.category,
        'amount': float(expense.total_cost),
        'rejected_by': rejected_by.name,
        'reason': reason,
        'action_url': target_url,
    }

    await notifier.create_notification(
        db=db,
        user_id=agent.id,
        title=heading,
        message=body,
        source_type='ticket_expense',
        source_id=expense.id,
        notification_type='rejection',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created expense rejection notification for user {agent.id}")
async def notify_expense_paid(
    db: Session,
    expense,
    paid_by: User,
    agent: User
):
    """
    Let an agent know that their expense has been paid.

    The payment method and reference stored on the expense are copied into
    the notification metadata. The project is resolved through the
    expense's ticket so the action URL can be project-scoped; if the ticket
    cannot be found the unscoped expense URL is used.

    Args:
        db: Database session
        expense: Paid expense
        paid_by: User who marked as paid
        agent: Agent who submitted the expense (the notification recipient)
    """
    notifier = NotificationService()

    heading = "Expense Paid"
    body = f"Your {expense.category} expense for {expense.total_cost} has been paid"

    # Resolve the owning project via the ticket.
    from app.models.ticket import Ticket
    parent_ticket = db.query(Ticket).filter(Ticket.id == expense.ticket_id).first()
    project_id = str(parent_ticket.project_id) if parent_ticket else None

    if project_id:
        target_url = f'/projects/{project_id}/expenses/{expense.id}'
    else:
        target_url = f'/expenses/{expense.id}'

    details = {
        'project_id': project_id,
        'expense_id': str(expense.id),
        'ticket_id': str(expense.ticket_id),
        'category': expense.category,
        'amount': float(expense.total_cost),
        'payment_method': expense.payment_method,
        'payment_reference': expense.payment_reference,
        'paid_by': paid_by.name,
        'action_url': target_url,
    }

    await notifier.create_notification(
        db=db,
        user_id=agent.id,
        title=heading,
        message=body,
        source_type='ticket_expense',
        source_id=expense.id,
        notification_type='payment',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created expense payment notification for user {agent.id}")
async def notify_bulk_expense_approval_complete(
    db: Session,
    user_id: UUID,
    total: int,
    successful: int,
    failed: int,
    is_approved: bool,
    project_id: Optional[UUID] = None,
    errors: Optional[List[str]] = None
):
    """
    Report the outcome of a bulk expense approval or rejection.

    A run with no failures produces a 'success' notification; any failure
    produces a 'warning' that includes the success/total ratio.

    Args:
        db: Database session
        user_id: User who initiated the bulk action
        total: Total expenses processed
        successful: Successfully updated
        failed: Failed updates
        is_approved: True if approved, False if rejected
        project_id: Optional project ID for scoping
        errors: List of error messages (only the first 5 are stored)
    """
    notifier = NotificationService()

    if is_approved:
        action = "Approval"
    else:
        action = "Rejection"
    heading = f"Bulk Expense {action} Complete"

    if failed == 0:
        verb = 'approved' if is_approved else 'rejected'
        body = f"Successfully {verb} all {successful} expenses"
        outcome = 'success'
    else:
        verb = 'Approved' if is_approved else 'Rejected'
        body = f"{verb} {successful}/{total} expenses. {failed} failed."
        outcome = 'warning'

    # Link to the project's expense list when scoped, otherwise the global list.
    if project_id:
        target_url = f'/projects/{project_id}/expenses'
    else:
        target_url = '/expenses'

    details = {
        'project_id': str(project_id) if project_id else None,
        'total': total,
        'successful': successful,
        'failed': failed,
        'is_approved': is_approved,
        'status_type': outcome,
        'errors': errors[:5] if errors else [],
        'action_url': target_url,
    }

    await notifier.create_notification(
        db=db,
        user_id=user_id,
        title=heading,
        message=body,
        source_type='system',
        notification_type='bulk_operation',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created bulk expense {action.lower()} notification for user {user_id}: {successful}/{total} successful")
async def notify_expense_export_complete(
    db: Session,
    user_id: UUID,
    total_expenses: int,
    total_amount: float,
    payment_groups: int,
    project_id: Optional[UUID] = None,
    expense_ids: Optional[List[UUID]] = None
):
    """
    Notify PM when expense CSV export completes (marks as paid).

    Sends a single 'in_app' notification of type 'bulk_operation' to the
    user who ran the export. Amounts are reported in KES in both the
    notification message and the log line.

    Args:
        db: Database session
        user_id: User who initiated the export
        total_expenses: Total expenses exported
        total_amount: Total amount to be paid
        payment_groups: Number of payment groups (user+date combinations)
        project_id: Optional project ID for scoping
        expense_ids: List of expense IDs exported (only the first 20 are stored)
    """
    service = NotificationService()

    title = "Tende Pay Export Complete"
    message = f"Exported {total_expenses} expenses (KES {total_amount:,.2f}) in {payment_groups} payment groups. CSV ready for Tende Pay upload. All expenses marked as paid."

    metadata = {
        'project_id': str(project_id) if project_id else None,
        'total_expenses': total_expenses,
        'total_amount': total_amount,
        'payment_groups': payment_groups,
        'status_type': 'success',
        'expense_ids': [str(eid) for eid in (expense_ids[:20] if expense_ids else [])],  # First 20 only
        'action_url': f'/projects/{project_id}/expenses?is_paid=true' if project_id else '/expenses?is_paid=true'
    }

    await service.create_notification(
        db=db,
        user_id=user_id,
        title=title,
        message=message,
        source_type='system',
        notification_type='bulk_operation',
        channel='in_app',
        additional_metadata=metadata
    )

    # Log the amount in KES, the currency the user-facing message uses,
    # rather than with a dollar sign.
    logger.info(f"Created expense export notification for user {user_id}: {total_expenses} expenses, KES {total_amount:,.2f}")
async def notify_user_invited_to_project(
    db: Session,
    user_id: UUID,
    project_id: UUID,
    project_name: str,
    invited_by: User,
    role_name: str
):
    """
    Tell a user that they have been added to a project team.

    Args:
        db: Database session
        user_id: User who was added (the notification recipient)
        project_id: Project ID
        project_name: Project name
        invited_by: User who added them
        role_name: Their role in the project
    """
    notifier = NotificationService()
    inviter_name = invited_by.name

    details = {
        'project_id': str(project_id),
        'project_name': project_name,
        'role': role_name,
        'invited_by': inviter_name,
        'action_url': f'/projects/{project_id}',
    }

    await notifier.create_notification(
        db=db,
        user_id=user_id,
        title=f"Added to Project: {project_name}",
        message=f"{inviter_name} added you to {project_name} as {role_name}",
        source_type='project',
        notification_type='team_addition',
        channel='in_app',
        additional_metadata=details,
    )

    logger.info(f"Created project invitation notification for user {user_id}")
async def notify_ticket_overdue(
    db: Session,
    ticket: Ticket,
    notify_users: List[User]
):
    """
    Notify PM/Dispatcher about overdue ticket.

    Sends one 'email' notification of type 'overdue' (flagged with
    ``requires_action``) to every user in ``notify_users``. When there is
    nobody to notify the function returns without calling the notification
    service.

    Args:
        db: Database session
        ticket: Overdue ticket
        notify_users: Users to notify; an empty list or None results in no
            notification
    """
    user_ids = [user.id for user in (notify_users or [])]
    if not user_ids:
        # Nothing to deliver: avoid a bulk call with no recipients and a
        # "Created ..." log line for notifications that were never created.
        return

    service = NotificationService()

    title = f"Ticket Overdue: {ticket.ticket_name or ticket.ticket_type}"
    # due_date may be unset; show 'N/A' in the message in that case.
    message = f"Ticket is overdue. Due date was {ticket.due_date.date() if ticket.due_date else 'N/A'}"

    metadata = {
        'project_id': str(ticket.project_id),
        'ticket_id': str(ticket.id),
        'due_date': ticket.due_date.isoformat() if ticket.due_date else None,
        'priority': ticket.priority,
        'status_type': 'error',  # Frontend can show red/warning icon
        'action_url': f'/projects/{ticket.project_id}/tickets/{ticket.id}',
        'requires_action': True
    }

    await service.create_bulk_notifications(
        db=db,
        user_ids=user_ids,
        title=title,
        message=message,
        source_type='ticket',
        source_id=ticket.id,
        notification_type='overdue',
        channel='email',  # Send email for overdue tickets
        additional_metadata=metadata
    )

    logger.info(f"Created overdue notifications for ticket {ticket.id}")
| # ============================================ | |
| # TASK NOTIFICATIONS | |
| # ============================================ | |
async def notify_task_created(
    db: Session,
    task: 'Task',
    created_by: User,
    notify_users: List[User]
):
    """
    Tell PMs/managers that a new task exists.

    Tasks whose metadata carries a truthy 'auto_created' flag get a
    "Delivery Task Created" wording; all others are reported as created by
    ``created_by``. The creator is never notified about their own task.

    Args:
        db: Database session
        task: Task that was created
        created_by: User who created the task
        notify_users: Users to notify (PMs, managers)
    """
    notifier = NotificationService()

    # Check if auto-created from distribution (metadata may be empty/None).
    task_meta = task.additional_metadata
    auto_flag = task_meta.get('auto_created', False) if task_meta else False

    if auto_flag:
        heading = f"Delivery Task Created: {task.task_title}"
        body = "Auto-created from inventory distribution"
    else:
        heading = f"New Task: {task.task_title}"
        body = f"Task created by {created_by.name}"

    # Priority may be an enum-like object exposing .value or a plain value.
    if hasattr(task.priority, 'value'):
        priority_label = task.priority.value
    else:
        priority_label = str(task.priority)

    details = {
        'project_id': str(task.project_id),
        'task_id': str(task.id),
        'task_type': task.task_type,
        'priority': priority_label,
        'scheduled_date': task.scheduled_date.isoformat() if task.scheduled_date else None,
        'created_by': created_by.name,
        'auto_created': auto_flag,
        'action_url': f'/projects/{task.project_id}/tasks/{task.id}',
    }

    # Don't notify creator
    recipient_ids = [candidate.id for candidate in notify_users if candidate.id != created_by.id]

    # NOTE(review): indentation was lost in the copy this was reviewed from;
    # the log line is assumed to live inside the recipient guard - confirm.
    if not recipient_ids:
        return

    await notifier.create_bulk_notifications(
        db=db,
        user_ids=recipient_ids,
        title=heading,
        message=body,
        source_type='task',
        source_id=task.id,
        notification_type='task_created',
        channel='in_app',
        additional_metadata=details,
    )
    logger.info(f"Created task creation notifications for task {task.id} to {len(recipient_ids)} users")
async def notify_task_status_changed(
    db: Session,
    task: 'Task',
    old_status: str,
    new_status: str,
    changed_by: User,
    notify_users: List[User]
):
    """
    Tell relevant users that a task moved from one status to another.

    The user who made the change is never notified about it.

    Args:
        db: Database session
        task: Task that changed
        old_status: Previous status
        new_status: New status
        changed_by: User who changed the status
        notify_users: List of users to notify
    """
    notifier = NotificationService()

    heading = f"Task Status Updated: {task.task_title}"
    body = f"Status changed from {old_status} to {new_status}"

    details = {
        'project_id': str(task.project_id),
        'task_id': str(task.id),
        'old_status': old_status,
        'new_status': new_status,
        'changed_by': changed_by.name,
        'action_url': f'/projects/{task.project_id}/tasks/{task.id}',
    }

    # Exclude the actor from the recipients.
    recipient_ids = [candidate.id for candidate in notify_users if candidate.id != changed_by.id]

    # NOTE(review): indentation was lost in the copy this was reviewed from;
    # the log line is assumed to live inside the recipient guard - confirm.
    if not recipient_ids:
        return

    await notifier.create_bulk_notifications(
        db=db,
        user_ids=recipient_ids,
        title=heading,
        message=body,
        source_type='task',
        source_id=task.id,
        notification_type='status_change',
        channel='in_app',
        additional_metadata=details,
    )
    logger.info(f"Created status change notifications for task {task.id} to {len(recipient_ids)} users")
async def notify_task_completed(
    db: Session,
    task: 'Task',
    completed_by: User,
    notify_users: List[User]
):
    """
    Tell PMs/managers that a task has been completed.

    The user who completed the task is never notified about it.

    Args:
        db: Database session
        task: Task that was completed
        completed_by: User who completed the task
        notify_users: Users to notify
    """
    notifier = NotificationService()

    heading = f"Task Completed: {task.task_title}"
    body = f"Task completed by {completed_by.name}"

    # completed_at may be unset on the task; store None in that case.
    finished_at = task.completed_at.isoformat() if task.completed_at else None

    details = {
        'project_id': str(task.project_id),
        'task_id': str(task.id),
        'task_type': task.task_type,
        'completed_by': completed_by.name,
        'completed_at': finished_at,
        'status_type': 'success',
        'action_url': f'/projects/{task.project_id}/tasks/{task.id}',
    }

    # Exclude the actor from the recipients.
    recipient_ids = [candidate.id for candidate in notify_users if candidate.id != completed_by.id]

    # NOTE(review): indentation was lost in the copy this was reviewed from;
    # the log line is assumed to live inside the recipient guard - confirm.
    if not recipient_ids:
        return

    await notifier.create_bulk_notifications(
        db=db,
        user_ids=recipient_ids,
        title=heading,
        message=body,
        source_type='task',
        source_id=task.id,
        notification_type='task_completed',
        channel='in_app',
        additional_metadata=details,
    )
    logger.info(f"Created completion notifications for task {task.id} to {len(recipient_ids)} users")
async def notify_task_cancelled(
    db: Session,
    task: 'Task',
    cancelled_by: User,
    reason: str,
    notify_users: List[User]
):
    """
    Tell PMs/managers that a task has been cancelled, and why.

    The user who cancelled the task is never notified about it.

    Args:
        db: Database session
        task: Task that was cancelled
        cancelled_by: User who cancelled the task
        reason: Cancellation reason
        notify_users: Users to notify
    """
    notifier = NotificationService()

    heading = f"Task Cancelled: {task.task_title}"
    body = f"Cancelled by {cancelled_by.name}: {reason}"

    details = {
        'project_id': str(task.project_id),
        'task_id': str(task.id),
        'cancelled_by': cancelled_by.name,
        'cancellation_reason': reason,
        'status_type': 'warning',
        'action_url': f'/projects/{task.project_id}/tasks/{task.id}',
    }

    # Exclude the actor from the recipients.
    recipient_ids = [candidate.id for candidate in notify_users if candidate.id != cancelled_by.id]

    # NOTE(review): indentation was lost in the copy this was reviewed from;
    # the log line is assumed to live inside the recipient guard - confirm.
    if not recipient_ids:
        return

    await notifier.create_bulk_notifications(
        db=db,
        user_ids=recipient_ids,
        title=heading,
        message=body,
        source_type='task',
        source_id=task.id,
        notification_type='task_cancelled',
        channel='in_app',
        additional_metadata=details,
    )
    logger.info(f"Created cancellation notifications for task {task.id} to {len(recipient_ids)} users")
def get_project_managers_and_dispatchers(db: Session, project_id: UUID) -> List[User]:
    """
    Get the primary manager and the contractor's dispatchers for a project.

    Only active, non-deleted users are returned, and each user appears at
    most once even when the primary manager is also one of the contractor's
    dispatchers (otherwise that user would be notified twice).

    Args:
        db: Database session
        project_id: Project ID

    Returns:
        List of users who should be notified about project events; empty
        when the project does not exist or has no matching users
    """
    from app.models.project import Project
    # NOTE(review): ProjectTeam is not referenced below; the import is kept
    # in case loading the module matters for model registration - confirm.
    from app.models.project_team import ProjectTeam

    # Get project
    project = db.query(Project).filter(Project.id == project_id).first()
    if not project:
        return []

    candidates = []

    # Add primary manager, applying the same active / not-deleted filter
    # that is used for dispatchers below.
    if project.primary_manager_id:
        manager = db.query(User).filter(
            User.id == project.primary_manager_id,
            User.is_active == True,
            User.deleted_at.is_(None)
        ).first()
        if manager:
            candidates.append(manager)

    # Add dispatchers from contractor
    if project.contractor_id:
        dispatchers = db.query(User).filter(
            User.contractor_id == project.contractor_id,
            User.role == AppRole.DISPATCHER.value,
            User.is_active == True,
            User.deleted_at.is_(None)
        ).all()
        candidates.extend(dispatchers)

    # Deduplicate by user id while keeping first-seen order.
    seen_ids = set()
    notify_users = []
    for user in candidates:
        if user.id not in seen_ids:
            seen_ids.add(user.id)
            notify_users.append(user)

    return notify_users
def get_users_by_role_in_project(
    db: Session,
    project_id: UUID,
    roles: List[AppRole]
) -> List[User]:
    """
    Collect every active user holding one of ``roles`` in a project.

    Users are gathered from three places, in this order: the project's
    primary manager (only when PROJECT_MANAGER is among ``roles``), the
    project's contractor, and the project team. Each user is returned once,
    at the position where they were first found.

    Args:
        db: Database session
        project_id: Project ID
        roles: List of roles to filter by

    Returns:
        List of users with specified roles in the project; empty when the
        project does not exist
    """
    from app.models.project import Project
    from app.models.project_team import ProjectTeam

    project = db.query(Project).filter(Project.id == project_id).first()
    if not project:
        return []

    wanted_roles = [role.value for role in roles]
    found = []

    # Primary manager, only if the PM role was requested.
    if AppRole.PROJECT_MANAGER in roles and project.primary_manager_id:
        primary_manager = db.query(User).filter(
            User.id == project.primary_manager_id,
            User.is_active == True,
            User.deleted_at.is_(None)
        ).first()
        if primary_manager:
            found.append(primary_manager)

    # Contractor users holding one of the requested roles.
    if project.contractor_id:
        found.extend(
            db.query(User).filter(
                User.contractor_id == project.contractor_id,
                User.role.in_(wanted_roles),
                User.is_active == True,
                User.deleted_at.is_(None)
            ).all()
        )

    # Current (not removed, not deleted) project team members holding one
    # of the requested roles.
    found.extend(
        db.query(User).join(ProjectTeam).filter(
            ProjectTeam.project_id == project_id,
            ProjectTeam.removed_at.is_(None),
            ProjectTeam.deleted_at.is_(None),
            User.role.in_(wanted_roles),
            User.is_active == True,
            User.deleted_at.is_(None)
        ).all()
    )

    # Deduplicate users (in case they're in both contractor and team);
    # a dict keeps the first occurrence and preserves insertion order.
    by_id = {}
    for member in found:
        if member.id not in by_id:
            by_id[member.id] = member
    return list(by_id.values())
async def notify_users_by_role(
    db: Session,
    project_id: UUID,
    roles: List[AppRole],
    title: str,
    message: str,
    source_type: str,
    source_id: Optional[UUID] = None,
    notification_type: Optional[str] = None,
    channel: str = 'in_app',
    additional_metadata: Optional[dict] = None
):
    """
    Notify all users with specific roles in a project

    Recipients are resolved with NotificationHelper.get_users_by_role_in_project
    and passed to NotificationService.create_bulk_notifications in one call.
    If no recipient is found, a warning is logged and nothing is created.

    The metadata sent with the notifications always carries a 'project_id'
    key: a caller-supplied value is kept, otherwise str(project_id) is used.
    The caller's additional_metadata dict is never modified.

    Args:
        db: Database session
        project_id: Project ID
        roles: List of roles to notify
            (e.g., [AppRole.PROJECT_MANAGER, AppRole.DISPATCHER])
        title: Notification title
        message: Notification message
        source_type: Source type (ticket, expense, etc.)
        source_id: Optional source ID
        notification_type: Optional notification type
        channel: Delivery channel (default: in_app)
        additional_metadata: Optional additional metadata

    Returns:
        None

    Example:
        await NotificationHelper.notify_users_by_role(
            db=db,
            project_id=ticket.project_id,
            roles=[AppRole.PROJECT_MANAGER, AppRole.DISPATCHER],
            title="Ticket Completed",
            message=f"Ticket {ticket.id} was completed",
            source_type='ticket',
            source_id=ticket.id,
            notification_type='completion',
            additional_metadata={'ticket_id': str(ticket.id)}
        )
    """
    service = NotificationService()

    # Find all users with specified roles in project
    users = NotificationHelper.get_users_by_role_in_project(db, project_id, roles)
    if not users:
        logger.warning(f"No users found with roles {roles} in project {project_id}")
        return

    # Work on a shallow copy: the original code inserted 'project_id' straight
    # into the caller's dict, leaking the change back to callers that reuse it.
    metadata = dict(additional_metadata or {})
    # Ensure project_id is in metadata without overriding a caller-provided value
    metadata.setdefault('project_id', str(project_id))

    user_ids = [user.id for user in users]
    await service.create_bulk_notifications(
        db=db,
        user_ids=user_ids,
        title=title,
        message=message,
        source_type=source_type,
        source_id=source_id,
        notification_type=notification_type,
        channel=channel,
        additional_metadata=metadata
    )
    logger.info(f"Created notifications for {len(user_ids)} users with roles {roles} in project {project_id}")
| # ============================================ | |
| # PAYROLL NOTIFICATIONS | |
| # ============================================ | |
async def notify_payroll_exported(
    db: Session,
    exported_by: User,
    payroll_records: List,
    total_amount: float,
    total_days_worked: int,
    total_tickets_closed: int,
    period_start: str,
    period_end: str,
    project_id: Optional[UUID] = None,
    warnings: Optional[List[str]] = None
):
    """
    Send the exporting manager/admin an in-app summary of a payroll export.

    Args:
        db: Database session
        exported_by: User who initiated the export (the notification recipient)
        payroll_records: Exported payroll objects; each must expose ``id``
        total_amount: Total payroll amount exported
        total_days_worked: Total days worked across all payroll
        total_tickets_closed: Total tickets closed across all payroll
        period_start: Period start date (ISO format)
        period_end: Period end date (ISO format)
        project_id: Optional project ID
        warnings: Optional list of warnings from export
    """
    notifier = NotificationService()

    record_total = len(payroll_records)
    exported_ids = [str(record.id) for record in payroll_records]
    warning_total = len(warnings) if warnings else 0

    # Project-scoped exports link to the project's payroll page,
    # everything else to the global payroll page.
    if project_id:
        project_ref = str(project_id)
        target_url = f'/projects/{project_id}/payroll'
    else:
        project_ref = None
        target_url = '/payroll'

    # The message is assembled line by line; an empty entry yields a blank line.
    summary_lines = [
        f"Successfully exported payroll for period {period_start} to {period_end}.",
        "",
        "📊 Summary:",
        f"• {record_total} workers paid",
        f"• {total_days_worked} total days worked",
        f"• {total_tickets_closed} total tickets closed",
        f"• {total_amount:,.2f} KES total payout",
    ]
    if warnings:
        summary_lines.append("")
        summary_lines.append(
            f"⚠️ {warning_total} warnings - review CSV before uploading to Tende Pay"
        )
    summary_text = "\n".join(summary_lines)

    export_details = dict(
        project_id=project_ref,
        payroll_ids=exported_ids,
        payroll_count=record_total,
        total_amount=total_amount,
        total_days_worked=total_days_worked,
        total_tickets_closed=total_tickets_closed,
        period_start=period_start,
        period_end=period_end,
        warning_count=warning_total,
        action_url=target_url,
        export_timestamp=datetime.utcnow().isoformat(),
    )

    await notifier.create_notification(
        db=db,
        user_id=exported_by.id,
        title=f"Payroll Export Complete: {record_total} records",
        message=summary_text,
        source_type='payroll',
        source_id=None,  # Bulk operation, no single source
        notification_type='payroll_exported',
        channel='in_app',
        project_id=project_id,
        additional_metadata=export_details
    )
    logger.info(f"Created payroll export notification for user {exported_by.id}")
async def notify_payroll_payment(
    db: Session,
    payroll,
    user: User,
    project_id: Optional[UUID] = None
):
    """
    Tell a worker that their payroll has been exported for payment.

    The notification is created on the WhatsApp channel with send_now=True.

    Args:
        db: Database session
        payroll: UserPayroll object
        user: User receiving payment
        project_id: Optional project ID
    """
    notifier = NotificationService()

    headline = f"💰 Payment Processed: {payroll.total_amount:,.2f} KES"

    # Only non-zero work counters are mentioned in the breakdown line
    work_items = []
    for count, unit in (
        (payroll.days_worked, "days worked"),
        (payroll.tickets_closed, "tickets closed"),
    ):
        if count:
            work_items.append(f"{count} {unit}")
    work_summary = ", ".join(work_items) or "work completed"

    period_label = (
        f"{payroll.period_start_date.strftime('%b %d')} to "
        f"{payroll.period_end_date.strftime('%b %d, %Y')}"
    )

    bonus = payroll.bonus_amount
    deductions = payroll.deductions

    # Each chunk carries its own line breaks; they are concatenated as-is
    chunks = [
        f"Your payment for {period_label} has been processed.\n\n",
        "📋 Work Summary:\n",
        f"• {work_summary}\n",
        f"• Base earnings: {payroll.base_earnings:,.2f} KES\n",
    ]
    # Bonus and deduction lines appear only for strictly positive amounts
    if bonus and bonus > 0:
        chunks.append(f"• Bonus: {bonus:,.2f} KES\n")
    if deductions and deductions > 0:
        chunks.append(f"• Deductions: -{deductions:,.2f} KES\n")
    chunks.append(f"\n💵 Total Payment: {payroll.total_amount:,.2f} KES")
    chunks.append("\n\nPayment will be sent to your registered account shortly.")
    body = "".join(chunks)

    payment_details = {
        'project_id': str(project_id) if project_id else None,
        'payroll_id': str(payroll.id),
        'period_start': payroll.period_start_date.isoformat(),
        'period_end': payroll.period_end_date.isoformat(),
        'days_worked': payroll.days_worked,
        'tickets_closed': payroll.tickets_closed,
        'base_earnings': float(payroll.base_earnings),
        'bonus_amount': float(bonus) if bonus else 0,
        'deductions': float(deductions) if deductions else 0,
        'total_amount': float(payroll.total_amount),
        'action_url': f'/payroll/{payroll.id}',
        'payment_method': payroll.payment_method,
    }

    await notifier.create_notification(
        db=db,
        user_id=user.id,
        title=headline,
        message=body,
        source_type='payroll',
        source_id=payroll.id,
        notification_type='payment',
        channel='whatsapp',  # Use WhatsApp for payment notifications
        project_id=project_id,
        additional_metadata=payment_details,
        send_now=True  # Send immediately
    )
    logger.info(f"Created payment notification for user {user.id}, payroll {payroll.id}")
| # ============================================ | |
| # TICKET DROP NOTIFICATIONS | |
| # ============================================ | |
async def notify_ticket_dropped(
    db: Session,
    ticket,
    assignment,
    dropped_by: User,
    drop_type: str,
    reason: str
):
    """
    Notify managers when agent drops a ticket (goes to PENDING_REVIEW).

    Critical notification - ticket needs immediate manager attention.
    Delivery is delegated to NotificationHelper.notify_users_by_role, which
    targets the project managers and dispatchers of the ticket's project on
    the in_app channel.

    Args:
        db: Database session
        ticket: Ticket that was dropped
        assignment: TicketAssignment that was dropped
        dropped_by: User who dropped the ticket
        drop_type: Type of drop (reschedule, equipment_issue, customer_issue, etc.)
        reason: Reason for dropping

    Returns:
        None
    """
    # No NotificationService is created here: the previous local instance was
    # never used, since notify_users_by_role builds its own.
    title = "⚠️ Ticket Dropped - Action Required"

    # Format drop type for display, e.g. 'equipment_issue' -> 'Equipment Issue'
    drop_type_display = drop_type.replace('_', ' ').title()

    message = (
        f"{dropped_by.name} dropped ticket: {ticket.ticket_name or ticket.ticket_type}\n\n"
        f"🔴 Reason: {drop_type_display}\n"
        f"💬 Details: {reason}\n\n"
        f"⚡ This ticket is now in PENDING REVIEW and needs your immediate attention."
    )

    metadata = {
        'project_id': str(ticket.project_id),
        'ticket_id': str(ticket.id),
        'assignment_id': str(assignment.id),
        'dropped_by_user_id': str(dropped_by.id),
        'dropped_by_name': dropped_by.name,
        'drop_type': drop_type,  # raw value; the display form is only used in the message
        'reason': reason,
        'ticket_name': ticket.ticket_name,
        'ticket_type': ticket.ticket_type,
        'ticket_status': ticket.status,
        'action_url': f'/projects/{ticket.project_id}/tickets/{ticket.id}',
        'requires_action': True,
        'priority': 'high'
    }

    # Notify all managers and dispatchers in the project
    await NotificationHelper.notify_users_by_role(
        db=db,
        project_id=ticket.project_id,
        roles=[AppRole.PROJECT_MANAGER, AppRole.DISPATCHER],
        title=title,
        message=message,
        source_type='ticket',
        source_id=ticket.id,
        notification_type='ticket_dropped',
        channel='in_app',  # Could also use WhatsApp for urgent drops
        additional_metadata=metadata
    )
    logger.info(f"Created ticket drop notification for ticket {ticket.id}, dropped by {dropped_by.id}")