Spaces:
Sleeping
Sleeping
| """ | |
| Notification Service - WhatsApp, Email, and In-App notifications with polymorphic support | |
| """ | |
| import os | |
| import logging | |
| from typing import Optional, Dict, Any, List | |
| from datetime import datetime, timezone | |
| from uuid import UUID | |
| import httpx | |
| from jinja2 import Environment, FileSystemLoader, select_autoescape | |
| from pathlib import Path | |
| from sqlalchemy.orm import Session | |
| from sqlalchemy import and_, or_ | |
| from app.models.notification import Notification, NotificationChannel, NotificationStatus | |
| from app.schemas.notification import ( | |
| NotificationCreate, NotificationSourceType, NotificationType, | |
| NotificationFilters, NotificationListResponse, NotificationSummary, | |
| NotificationStatsResponse | |
| ) | |
| logger = logging.getLogger(__name__) | |
| class NotificationService: | |
| """Service for sending notifications via WhatsApp and Email""" | |
| def __init__(self): | |
| # Configuration | |
| self.app_domain = os.getenv('APP_DOMAIN', 'swiftops.atomio.tech') | |
| self.app_protocol = os.getenv('APP_PROTOCOL', 'https') | |
| # Resend configuration | |
| self.resend_api_key = os.getenv('RESEND_API_KEY') | |
| self.resend_from_email = os.getenv('RESEND_FROM_EMAIL', 'swiftops@atomio.tech') | |
| # WaSender configuration | |
| self.wasender_api_key = os.getenv('WASENDER_API_KEY') | |
| self.wasender_phone = os.getenv('WASENDER_PHONE_NUMBER') | |
| self.wasender_api_url = os.getenv('WASENDER_API_URL', 'https://www.wasenderapi.com/api') | |
| # Setup Jinja2 for templates | |
| template_dir = Path(__file__).parent.parent / 'templates' | |
| self.jinja_env = Environment( | |
| loader=FileSystemLoader(template_dir), | |
| autoescape=select_autoescape(['html', 'xml']) | |
| ) | |
| def construct_url(self, path: str) -> str: | |
| """ | |
| Construct full URL from domain and path | |
| Args: | |
| path: URL path (e.g., '/accept-invitation') | |
| Returns: | |
| Full URL (e.g., 'https://swiftops.atomio.tech/accept-invitation') | |
| """ | |
| path = path.lstrip('/') | |
| return f"{self.app_protocol}://{self.app_domain}/{path}" | |
| async def send_whatsapp_invitation( | |
| self, | |
| phone: str, | |
| name: str, | |
| organization_name: str, | |
| role: str, | |
| invitation_url: str, | |
| expiry_hours: int = 72, | |
| project_name: Optional[str] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Send invitation via WhatsApp using WaSender | |
| Args: | |
| phone: Recipient phone number (with country code) | |
| name: Recipient name | |
| organization_name: Organization name | |
| role: User role | |
| invitation_url: Full invitation URL | |
| expiry_hours: Hours until expiry | |
| project_name: Project name (optional) | |
| Returns: | |
| Dict with success status and message | |
| """ | |
| if not self.wasender_api_key: | |
| logger.warning("WaSender API key not configured") | |
| return {'success': False, 'error': 'WaSender not configured'} | |
| try: | |
| # Load WhatsApp template | |
| template = self.jinja_env.get_template('whatsapp/invitation.txt') | |
| message = template.render( | |
| name=name, | |
| organization_name=organization_name, | |
| role=role.replace('_', ' ').title(), | |
| invitation_url=invitation_url, | |
| expiry_hours=expiry_hours, | |
| app_domain=self.app_domain, | |
| project_name=project_name | |
| ) | |
| # Send via WaSender API | |
| async with httpx.AsyncClient() as client: | |
| response = await client.post( | |
| f"{self.wasender_api_url}/send-message", | |
| headers={ | |
| 'Authorization': f'Bearer {self.wasender_api_key}', | |
| 'Content-Type': 'application/json' | |
| }, | |
| json={ | |
| 'to': phone, | |
| 'text': message | |
| }, | |
| timeout=30.0 | |
| ) | |
| if response.status_code == 200: | |
| logger.info(f"WhatsApp invitation sent to {phone}") | |
| return {'success': True, 'message': 'WhatsApp sent successfully'} | |
| else: | |
| error_msg = f"WaSender API error: {response.status_code}" | |
| logger.error(f"{error_msg} - {response.text}") | |
| return {'success': False, 'error': error_msg} | |
| except Exception as e: | |
| logger.error(f"WhatsApp send error: {str(e)}") | |
| return {'success': False, 'error': str(e)} | |
| async def send_email_invitation( | |
| self, | |
| email: str, | |
| name: str, | |
| organization_name: str, | |
| role: str, | |
| invitation_url: str, | |
| expiry_hours: int = 72, | |
| project_name: Optional[str] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Send invitation via Email using Resend | |
| Args: | |
| email: Recipient email | |
| name: Recipient name | |
| organization_name: Organization name | |
| role: User role | |
| invitation_url: Full invitation URL | |
| expiry_hours: Hours until expiry | |
| project_name: Project name (optional) | |
| Returns: | |
| Dict with success status and message | |
| """ | |
| if not self.resend_api_key: | |
| logger.warning("Resend API key not configured") | |
| return {'success': False, 'error': 'Resend not configured'} | |
| try: | |
| # Load email template | |
| template = self.jinja_env.get_template('emails/invitation.html') | |
| html_content = template.render( | |
| name=name, | |
| organization_name=organization_name, | |
| role=role.replace('_', ' ').title(), | |
| invitation_url=invitation_url, | |
| expiry_hours=expiry_hours, | |
| app_domain=self.app_domain, | |
| current_year=datetime.now().year, | |
| project_name=project_name | |
| ) | |
| # Send via Resend API | |
| async with httpx.AsyncClient() as client: | |
| response = await client.post( | |
| 'https://api.resend.com/emails', | |
| headers={ | |
| 'Authorization': f'Bearer {self.resend_api_key}', | |
| 'Content-Type': 'application/json' | |
| }, | |
| json={ | |
| 'from': self.resend_from_email, | |
| 'to': [email], | |
| 'subject': f'Invitation to join {organization_name} on SwiftOps', | |
| 'html': html_content | |
| }, | |
| timeout=30.0 | |
| ) | |
| if response.status_code == 200: | |
| logger.info(f"Email invitation sent to {email}") | |
| return {'success': True, 'message': 'Email sent successfully'} | |
| else: | |
| error_msg = f"Resend API error: {response.status_code}" | |
| logger.error(f"{error_msg} - {response.text}") | |
| return {'success': False, 'error': error_msg} | |
| except Exception as e: | |
| logger.error(f"Email send error: {str(e)}") | |
| return {'success': False, 'error': str(e)} | |
| async def send_invitation( | |
| self, | |
| email: str, | |
| phone: Optional[str], | |
| name: str, | |
| organization_name: str, | |
| role: str, | |
| token: str, | |
| method: str = 'whatsapp', | |
| expiry_hours: int = 72, | |
| project_name: Optional[str] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Send invitation using specified method with smart fallback | |
| Args: | |
| email: Recipient email | |
| phone: Recipient phone (optional) | |
| name: Recipient name | |
| organization_name: Organization name | |
| role: User role | |
| token: Invitation token | |
| method: Delivery method ('whatsapp', 'email', 'both') | |
| expiry_hours: Hours until expiry | |
| project_name: Project name (optional, for project invitations) | |
| Returns: | |
| Dict with delivery results | |
| """ | |
| invitation_url = self.construct_url(f'accept-invitation?token={token}') | |
| results = { | |
| 'whatsapp_sent': False, | |
| 'whatsapp_error': None, | |
| 'email_sent': False, | |
| 'email_error': None | |
| } | |
| # Try WhatsApp first if requested | |
| if method in ['whatsapp', 'both'] and phone: | |
| whatsapp_result = await self.send_whatsapp_invitation( | |
| phone=phone, | |
| name=name, | |
| organization_name=organization_name, | |
| role=role, | |
| invitation_url=invitation_url, | |
| expiry_hours=expiry_hours, | |
| project_name=project_name | |
| ) | |
| results['whatsapp_sent'] = whatsapp_result['success'] | |
| if not whatsapp_result['success']: | |
| results['whatsapp_error'] = whatsapp_result.get('error') | |
| logger.warning(f"WhatsApp failed, will try email fallback: {whatsapp_result.get('error')}") | |
| # Try Email if requested or as fallback | |
| if method in ['email', 'both'] or (method == 'whatsapp' and not results['whatsapp_sent']): | |
| email_result = await self.send_email_invitation( | |
| email=email, | |
| name=name, | |
| organization_name=organization_name, | |
| role=role, | |
| invitation_url=invitation_url, | |
| expiry_hours=expiry_hours, | |
| project_name=project_name | |
| ) | |
| results['email_sent'] = email_result['success'] | |
| if not email_result['success']: | |
| results['email_error'] = email_result.get('error') | |
| return results | |
| async def send_email( | |
| self, | |
| to_email: str, | |
| subject: str, | |
| template_name: str, | |
| template_data: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Send email using template | |
| Args: | |
| to_email: Recipient email | |
| subject: Email subject | |
| template_name: Template file name (without .html extension) | |
| template_data: Data to render in template | |
| Returns: | |
| Dict with success status and message | |
| """ | |
| if not self.resend_api_key: | |
| logger.warning("Resend API key not configured") | |
| return {'success': False, 'error': 'Resend not configured'} | |
| try: | |
| # Load email template | |
| template = self.jinja_env.get_template(f'emails/{template_name}.html') | |
| # Add common template data | |
| template_data['app_domain'] = self.app_domain | |
| template_data['current_year'] = datetime.now().year | |
| html_content = template.render(**template_data) | |
| # Send via Resend API | |
| async with httpx.AsyncClient() as client: | |
| response = await client.post( | |
| 'https://api.resend.com/emails', | |
| headers={ | |
| 'Authorization': f'Bearer {self.resend_api_key}', | |
| 'Content-Type': 'application/json' | |
| }, | |
| json={ | |
| 'from': self.resend_from_email, | |
| 'to': [to_email], | |
| 'subject': subject, | |
| 'html': html_content | |
| }, | |
| timeout=30.0 | |
| ) | |
| if response.status_code == 200: | |
| logger.info(f"Email sent to {to_email}: {subject}") | |
| return {'success': True, 'message': 'Email sent successfully'} | |
| else: | |
| error_msg = f"Resend API error: {response.status_code}" | |
| logger.error(f"{error_msg} - {response.text}") | |
| return {'success': False, 'error': error_msg} | |
| except Exception as e: | |
| logger.error(f"Email send error: {str(e)}") | |
| return {'success': False, 'error': str(e)} | |
| # ============================================ | |
| # POLYMORPHIC NOTIFICATION METHODS | |
| # ============================================ | |
| async def create_notification( | |
| self, | |
| db: Session, | |
| user_id: UUID, | |
| title: str, | |
| message: str, | |
| source_type: str, | |
| source_id: Optional[UUID] = None, | |
| notification_type: Optional[str] = None, | |
| channel: str = 'in_app', | |
| project_id: Optional[UUID] = None, | |
| additional_metadata: Optional[Dict[str, Any]] = None, | |
| send_now: bool = False | |
| ) -> Notification: | |
| """ | |
| Create a notification in the database | |
| Args: | |
| db: Database session | |
| user_id: Recipient user ID | |
| title: Notification title | |
| message: Notification message | |
| source_type: Source entity type ('ticket', 'project', 'expense', etc.) | |
| source_id: Source entity ID | |
| notification_type: Type of notification ('assignment', 'status_change', etc.) | |
| channel: Delivery channel (default: 'in_app') | |
| project_id: Optional project ID for scoping/filtering | |
| additional_metadata: Additional data (action URLs, etc.) | |
| send_now: Whether to send immediately via external channel | |
| Returns: | |
| Created Notification object | |
| """ | |
| try: | |
| # Extract project_id from metadata if not provided directly | |
| if not project_id and additional_metadata: | |
| metadata_project_id = additional_metadata.get('project_id') | |
| if metadata_project_id and metadata_project_id != 'None': | |
| try: | |
| project_id = UUID(str(metadata_project_id)) | |
| except (ValueError, TypeError): | |
| pass | |
| # Create notification record | |
| notification = Notification( | |
| user_id=user_id, | |
| title=title, | |
| message=message, | |
| source_type=source_type, | |
| source_id=source_id, | |
| notification_type=notification_type, | |
| channel=NotificationChannel(channel), | |
| status=NotificationStatus.PENDING, | |
| project_id=project_id, | |
| additional_metadata=additional_metadata or {} | |
| ) | |
| db.add(notification) | |
| db.commit() | |
| db.refresh(notification) | |
| logger.info(f"Created notification {notification.id} for user {user_id}") | |
| # Send via external channel if requested | |
| if send_now and channel != 'in_app': | |
| # Queue for background processing (will be implemented in tasks) | |
| logger.info(f"Queuing notification {notification.id} for {channel} delivery") | |
| return notification | |
| except Exception as e: | |
| db.rollback() | |
| logger.error(f"Error creating notification: {str(e)}") | |
| raise | |
| async def create_bulk_notifications( | |
| self, | |
| db: Session, | |
| user_ids: List[UUID], | |
| title: str, | |
| message: str, | |
| source_type: str, | |
| source_id: Optional[UUID] = None, | |
| notification_type: Optional[str] = None, | |
| channel: str = 'in_app', | |
| project_id: Optional[UUID] = None, | |
| additional_metadata: Optional[Dict[str, Any]] = None | |
| ) -> List[Notification]: | |
| """ | |
| Create notifications for multiple users | |
| Args: | |
| db: Database session | |
| user_ids: List of recipient user IDs | |
| title: Notification title | |
| message: Notification message | |
| source_type: Source entity type | |
| source_id: Source entity ID | |
| notification_type: Type of notification | |
| channel: Delivery channel | |
| project_id: Optional project ID for scoping | |
| additional_metadata: Additional metadata | |
| Returns: | |
| List of created Notification objects | |
| """ | |
| notifications = [] | |
| for user_id in user_ids: | |
| try: | |
| notification = await self.create_notification( | |
| db=db, | |
| user_id=user_id, | |
| title=title, | |
| message=message, | |
| source_type=source_type, | |
| source_id=source_id, | |
| notification_type=notification_type, | |
| channel=channel, | |
| project_id=project_id, | |
| additional_metadata=additional_metadata | |
| ) | |
| notifications.append(notification) | |
| except Exception as e: | |
| logger.error(f"Error creating notification for user {user_id}: {str(e)}") | |
| continue | |
| logger.info(f"Created {len(notifications)} notifications for {len(user_ids)} users") | |
| return notifications | |
| def get_user_notifications( | |
| self, | |
| db: Session, | |
| user_id: UUID, | |
| filters: Optional[NotificationFilters] = None | |
| ) -> NotificationListResponse: | |
| """ | |
| Get notifications for a user with optional filters | |
| Args: | |
| db: Database session | |
| user_id: User ID | |
| filters: Optional filters (status, type, etc.) | |
| Returns: | |
| NotificationListResponse with paginated results | |
| """ | |
| query = db.query(Notification).filter(Notification.user_id == user_id) | |
| # Apply filters | |
| if filters: | |
| if filters.status: | |
| query = query.filter(Notification.status.in_(filters.status)) | |
| if filters.notification_type: | |
| query = query.filter(Notification.notification_type.in_(filters.notification_type)) | |
| if filters.source_type: | |
| query = query.filter(Notification.source_type.in_(filters.source_type)) | |
| if filters.channel: | |
| query = query.filter(Notification.channel.in_(filters.channel)) | |
| if filters.is_read is not None: | |
| if filters.is_read: | |
| query = query.filter(Notification.read_at.isnot(None)) | |
| else: | |
| query = query.filter(Notification.read_at.is_(None)) | |
| if filters.project_id: | |
| # Filter by project_id column (more efficient than JSONB) | |
| query = query.filter(Notification.project_id == filters.project_id) | |
| if filters.from_date: | |
| query = query.filter(Notification.created_at >= filters.from_date) | |
| if filters.to_date: | |
| query = query.filter(Notification.created_at <= filters.to_date) | |
| # Get total count | |
| total = query.count() | |
| # Apply pagination | |
| page = filters.page if filters else 1 | |
| page_size = filters.page_size if filters else 20 | |
| offset = (page - 1) * page_size | |
| # Get notifications | |
| notifications = query.order_by(Notification.created_at.desc())\ | |
| .offset(offset)\ | |
| .limit(page_size)\ | |
| .all() | |
| # Convert to summaries | |
| summaries = [ | |
| NotificationSummary( | |
| id=n.id, | |
| title=n.title, | |
| message=n.message, | |
| notification_type=n.notification_type, | |
| source_type=n.source_type, | |
| source_id=n.source_id, | |
| status=n.status, | |
| is_read=n.is_read, | |
| created_at=n.created_at, | |
| additional_metadata=n.additional_metadata | |
| ) | |
| for n in notifications | |
| ] | |
| return NotificationListResponse( | |
| notifications=summaries, | |
| total=total, | |
| page=page, | |
| page_size=page_size, | |
| has_more=total > (page * page_size) | |
| ) | |
| def get_notification_stats( | |
| self, | |
| db: Session, | |
| user_id: UUID | |
| ) -> NotificationStatsResponse: | |
| """ | |
| Get notification statistics for a user | |
| Args: | |
| db: Database session | |
| user_id: User ID | |
| Returns: | |
| NotificationStatsResponse with counts | |
| """ | |
| base_query = db.query(Notification).filter(Notification.user_id == user_id) | |
| total = base_query.count() | |
| unread = base_query.filter(Notification.read_at.is_(None)).count() | |
| read = base_query.filter(Notification.read_at.isnot(None)).count() | |
| failed = base_query.filter(Notification.status == NotificationStatus.FAILED).count() | |
| # Counts by type | |
| by_type = {} | |
| from sqlalchemy import func | |
| type_results = db.query( | |
| Notification.notification_type, | |
| func.count(Notification.id) | |
| ).filter(Notification.user_id == user_id)\ | |
| .group_by(Notification.notification_type)\ | |
| .all() | |
| for notif_type, count in type_results: | |
| if notif_type: | |
| by_type[notif_type] = count | |
| # Counts by channel | |
| by_channel = {} | |
| channel_results = db.query( | |
| Notification.channel, | |
| func.count(Notification.id) | |
| ).filter(Notification.user_id == user_id)\ | |
| .group_by(Notification.channel)\ | |
| .all() | |
| for channel, count in channel_results: | |
| by_channel[channel.value] = count | |
| return NotificationStatsResponse( | |
| total=total, | |
| unread=unread, | |
| read=read, | |
| failed=failed, | |
| by_type=by_type, | |
| by_channel=by_channel | |
| ) | |
| def mark_as_read( | |
| self, | |
| db: Session, | |
| notification_id: UUID, | |
| user_id: UUID | |
| ) -> Optional[Notification]: | |
| """ | |
| Mark a notification as read | |
| Args: | |
| db: Database session | |
| notification_id: Notification ID | |
| user_id: User ID (for authorization) | |
| Returns: | |
| Updated Notification or None if not found | |
| """ | |
| notification = db.query(Notification).filter( | |
| and_( | |
| Notification.id == notification_id, | |
| Notification.user_id == user_id | |
| ) | |
| ).first() | |
| if notification: | |
| notification.mark_as_read() | |
| db.commit() | |
| db.refresh(notification) | |
| logger.info(f"Marked notification {notification_id} as read for user {user_id}") | |
| return notification | |
| def mark_all_as_read( | |
| self, | |
| db: Session, | |
| user_id: UUID, | |
| notification_ids: Optional[List[UUID]] = None | |
| ) -> int: | |
| """ | |
| Mark multiple notifications as read | |
| Args: | |
| db: Database session | |
| user_id: User ID | |
| notification_ids: Specific notification IDs (or None for all unread) | |
| Returns: | |
| Number of notifications marked as read | |
| """ | |
| query = db.query(Notification).filter( | |
| and_( | |
| Notification.user_id == user_id, | |
| Notification.read_at.is_(None) | |
| ) | |
| ) | |
| if notification_ids: | |
| query = query.filter(Notification.id.in_(notification_ids)) | |
| count = query.update( | |
| { | |
| 'read_at': datetime.now(timezone.utc), | |
| 'status': NotificationStatus.READ | |
| }, | |
| synchronize_session=False | |
| ) | |
| db.commit() | |
| logger.info(f"Marked {count} notifications as read for user {user_id}") | |
| return count | |
| def delete_notification( | |
| self, | |
| db: Session, | |
| notification_id: UUID, | |
| user_id: UUID | |
| ) -> bool: | |
| """ | |
| Delete a notification | |
| Args: | |
| db: Database session | |
| notification_id: Notification ID | |
| user_id: User ID (for authorization) | |
| Returns: | |
| True if deleted, False if not found | |
| """ | |
| notification = db.query(Notification).filter( | |
| and_( | |
| Notification.id == notification_id, | |
| Notification.user_id == user_id | |
| ) | |
| ).first() | |
| if notification: | |
| db.delete(notification) | |
| db.commit() | |
| logger.info(f"Deleted notification {notification_id} for user {user_id}") | |
| return True | |
| return False | |