# swiftops-backend/src/app/services/notification_service.py
# kamau1's picture
# Fix notification filtering by switching list-based fields from == to .in_() to prevent PostgreSQL text=text[] errors
# 15d1bcf
"""
Notification Service - WhatsApp, Email, and In-App notifications with polymorphic support
"""
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode
from uuid import UUID

import httpx
from jinja2 import Environment, FileSystemLoader, select_autoescape
from sqlalchemy import and_, func, or_
from sqlalchemy.orm import Session

from app.models.notification import Notification, NotificationChannel, NotificationStatus
from app.schemas.notification import (
    NotificationCreate, NotificationSourceType, NotificationType,
    NotificationFilters, NotificationListResponse, NotificationSummary,
    NotificationStatsResponse
)
# Module-level logger named after this module's import path; used by every method below.
logger = logging.getLogger(__name__)
class NotificationService:
    """Service for sending notifications via WhatsApp and Email.

    Also creates, lists, summarises, marks-as-read and deletes ``Notification``
    rows, which reference their originating entity through ``source_type`` and
    ``source_id``.
    """

    # Resend endpoint used for every outbound email.
    RESEND_EMAILS_URL = 'https://api.resend.com/emails'
    # Upper bound, in seconds, for a single provider HTTP request.
    HTTP_TIMEOUT_SECONDS = 30.0

    def __init__(self):
        """Read provider settings from the environment and set up templates."""
        # Configuration
        self.app_domain = os.getenv('APP_DOMAIN', 'swiftops.atomio.tech')
        self.app_protocol = os.getenv('APP_PROTOCOL', 'https')

        # Resend configuration
        self.resend_api_key = os.getenv('RESEND_API_KEY')
        self.resend_from_email = os.getenv('RESEND_FROM_EMAIL', 'swiftops@atomio.tech')

        # WaSender configuration
        self.wasender_api_key = os.getenv('WASENDER_API_KEY')
        self.wasender_phone = os.getenv('WASENDER_PHONE_NUMBER')
        self.wasender_api_url = os.getenv('WASENDER_API_URL', 'https://www.wasenderapi.com/api')

        # Setup Jinja2 for templates (the "templates" directory one level above this package)
        template_dir = Path(__file__).parent.parent / 'templates'
        self.jinja_env = Environment(
            loader=FileSystemLoader(template_dir),
            autoescape=select_autoescape(['html', 'xml'])
        )

    # ============================================
    # INTERNAL HELPERS
    # ============================================

    @staticmethod
    def _is_success(status_code: int) -> bool:
        """Return True when ``status_code`` is any 2xx HTTP status."""
        return 200 <= status_code < 300

    def _invitation_url(self, token: str) -> str:
        """Build the accept-invitation link with ``token`` safely query-encoded."""
        return self.construct_url(f"accept-invitation?{urlencode({'token': token})}")

    def _email_template_context(
        self,
        template_data: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Return a new render context: caller data plus the common template keys.

        The caller's dict is never modified. The common keys take precedence
        over same-named keys supplied by the caller.
        """
        return {
            **(template_data or {}),
            'app_domain': self.app_domain,
            'current_year': datetime.now(timezone.utc).year,
        }

    async def _post_json(
        self,
        url: str,
        api_key: str,
        payload: Dict[str, Any]
    ) -> "httpx.Response":
        """POST ``payload`` as JSON to ``url`` using bearer-token authentication."""
        async with httpx.AsyncClient() as client:
            return await client.post(
                url,
                headers={
                    'Authorization': f'Bearer {api_key}',
                    'Content-Type': 'application/json'
                },
                json=payload,
                timeout=self.HTTP_TIMEOUT_SECONDS
            )

    async def _send_resend_email(
        self,
        to_email: str,
        subject: str,
        html_content: str,
        success_log: str
    ) -> Dict[str, Any]:
        """Submit one HTML email to Resend and translate the response into a result dict.

        ``success_log`` is the message written to the log when Resend accepts
        the email. Network errors propagate to the caller.
        """
        response = await self._post_json(
            self.RESEND_EMAILS_URL,
            self.resend_api_key,
            {
                'from': self.resend_from_email,
                'to': [to_email],
                'subject': subject,
                'html': html_content
            }
        )
        if self._is_success(response.status_code):
            logger.info(success_log)
            return {'success': True, 'message': 'Email sent successfully'}

        error_msg = f"Resend API error: {response.status_code}"
        logger.error(f"{error_msg} - {response.text}")
        return {'success': False, 'error': error_msg}

    # ============================================
    # INVITATION / EMAIL DELIVERY
    # ============================================

    def construct_url(self, path: str) -> str:
        """
        Construct full URL from domain and path

        Args:
            path: URL path (e.g., '/accept-invitation')

        Returns:
            Full URL (e.g., 'https://swiftops.atomio.tech/accept-invitation')
        """
        # Drop leading slashes so the join below never yields a double slash.
        path = path.lstrip('/')
        return f"{self.app_protocol}://{self.app_domain}/{path}"

    async def send_whatsapp_invitation(
        self,
        phone: str,
        name: str,
        organization_name: str,
        role: str,
        invitation_url: str,
        expiry_hours: int = 72,
        project_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Send invitation via WhatsApp using WaSender

        Args:
            phone: Recipient phone number (with country code)
            name: Recipient name
            organization_name: Organization name
            role: User role
            invitation_url: Full invitation URL
            expiry_hours: Hours until expiry
            project_name: Project name (optional)

        Returns:
            Dict with ``success`` and either ``message`` or ``error``. Failures
            are reported in the dict; this method does not raise.
        """
        if not self.wasender_api_key:
            logger.warning("WaSender API key not configured")
            return {'success': False, 'error': 'WaSender not configured'}

        try:
            # Load WhatsApp template
            template = self.jinja_env.get_template('whatsapp/invitation.txt')
            message = template.render(
                name=name,
                organization_name=organization_name,
                role=role.replace('_', ' ').title(),
                invitation_url=invitation_url,
                expiry_hours=expiry_hours,
                app_domain=self.app_domain,
                project_name=project_name
            )

            # Send via WaSender API
            response = await self._post_json(
                f"{self.wasender_api_url}/send-message",
                self.wasender_api_key,
                {
                    'to': phone,
                    'text': message
                }
            )

            # Any 2xx counts as delivered, so an accepted message is never
            # reported as failed (which would trigger a duplicate email fallback).
            if self._is_success(response.status_code):
                logger.info(f"WhatsApp invitation sent to {phone}")
                return {'success': True, 'message': 'WhatsApp sent successfully'}

            error_msg = f"WaSender API error: {response.status_code}"
            logger.error(f"{error_msg} - {response.text}")
            return {'success': False, 'error': error_msg}
        except Exception as e:
            # Delivery is best-effort: report the failure instead of raising.
            logger.error(f"WhatsApp send error: {str(e)}")
            return {'success': False, 'error': str(e)}

    async def send_email_invitation(
        self,
        email: str,
        name: str,
        organization_name: str,
        role: str,
        invitation_url: str,
        expiry_hours: int = 72,
        project_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Send invitation via Email using Resend

        Args:
            email: Recipient email
            name: Recipient name
            organization_name: Organization name
            role: User role
            invitation_url: Full invitation URL
            expiry_hours: Hours until expiry
            project_name: Project name (optional)

        Returns:
            Dict with ``success`` and either ``message`` or ``error``. Failures
            are reported in the dict; this method does not raise.
        """
        if not self.resend_api_key:
            logger.warning("Resend API key not configured")
            return {'success': False, 'error': 'Resend not configured'}

        try:
            # Load email template
            template = self.jinja_env.get_template('emails/invitation.html')
            html_content = template.render(
                name=name,
                organization_name=organization_name,
                role=role.replace('_', ' ').title(),
                invitation_url=invitation_url,
                expiry_hours=expiry_hours,
                app_domain=self.app_domain,
                current_year=datetime.now(timezone.utc).year,
                project_name=project_name
            )

            # Send via Resend API
            return await self._send_resend_email(
                to_email=email,
                subject=f'Invitation to join {organization_name} on SwiftOps',
                html_content=html_content,
                success_log=f"Email invitation sent to {email}"
            )
        except Exception as e:
            # Delivery is best-effort: report the failure instead of raising.
            logger.error(f"Email send error: {str(e)}")
            return {'success': False, 'error': str(e)}

    async def send_invitation(
        self,
        email: str,
        phone: Optional[str],
        name: str,
        organization_name: str,
        role: str,
        token: str,
        method: str = 'whatsapp',
        expiry_hours: int = 72,
        project_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Send invitation using specified method with smart fallback

        WhatsApp is attempted when ``method`` is 'whatsapp' or 'both' and a
        phone number is given. Email is sent when ``method`` is 'email' or
        'both', and also as a fallback when ``method`` is 'whatsapp' but no
        WhatsApp message was delivered (including when ``phone`` is missing).

        Args:
            email: Recipient email
            phone: Recipient phone (optional)
            name: Recipient name
            organization_name: Organization name
            role: User role
            token: Invitation token
            method: Delivery method ('whatsapp', 'email', 'both')
            expiry_hours: Hours until expiry
            project_name: Project name (optional, for project invitations)

        Returns:
            Dict with keys ``whatsapp_sent``, ``whatsapp_error``,
            ``email_sent`` and ``email_error``
        """
        # Token is query-encoded so characters such as '+', '/' or '=' survive the link.
        invitation_url = self._invitation_url(token)

        results = {
            'whatsapp_sent': False,
            'whatsapp_error': None,
            'email_sent': False,
            'email_error': None
        }

        # Try WhatsApp first if requested
        if method in ['whatsapp', 'both'] and phone:
            whatsapp_result = await self.send_whatsapp_invitation(
                phone=phone,
                name=name,
                organization_name=organization_name,
                role=role,
                invitation_url=invitation_url,
                expiry_hours=expiry_hours,
                project_name=project_name
            )
            results['whatsapp_sent'] = whatsapp_result['success']
            if not whatsapp_result['success']:
                results['whatsapp_error'] = whatsapp_result.get('error')
                logger.warning(f"WhatsApp failed, will try email fallback: {whatsapp_result.get('error')}")

        # Try Email if requested or as fallback
        if method in ['email', 'both'] or (method == 'whatsapp' and not results['whatsapp_sent']):
            email_result = await self.send_email_invitation(
                email=email,
                name=name,
                organization_name=organization_name,
                role=role,
                invitation_url=invitation_url,
                expiry_hours=expiry_hours,
                project_name=project_name
            )
            results['email_sent'] = email_result['success']
            if not email_result['success']:
                results['email_error'] = email_result.get('error')

        return results

    async def send_email(
        self,
        to_email: str,
        subject: str,
        template_name: str,
        template_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Send email using template

        Args:
            to_email: Recipient email
            subject: Email subject
            template_name: Template file name (without .html extension)
            template_data: Data to render in template. It is not modified;
                ``app_domain`` and ``current_year`` are added to a copy.

        Returns:
            Dict with ``success`` and either ``message`` or ``error``. Failures
            are reported in the dict; this method does not raise.
        """
        if not self.resend_api_key:
            logger.warning("Resend API key not configured")
            return {'success': False, 'error': 'Resend not configured'}

        try:
            # Load email template
            template = self.jinja_env.get_template(f'emails/{template_name}.html')

            # Add common template data on a copy so the caller's dict is left untouched
            html_content = template.render(**self._email_template_context(template_data))

            # Send via Resend API
            return await self._send_resend_email(
                to_email=to_email,
                subject=subject,
                html_content=html_content,
                success_log=f"Email sent to {to_email}: {subject}"
            )
        except Exception as e:
            # Delivery is best-effort: report the failure instead of raising.
            logger.error(f"Email send error: {str(e)}")
            return {'success': False, 'error': str(e)}

    # ============================================
    # POLYMORPHIC NOTIFICATION METHODS
    # ============================================

    async def create_notification(
        self,
        db: Session,
        user_id: UUID,
        title: str,
        message: str,
        source_type: str,
        source_id: Optional[UUID] = None,
        notification_type: Optional[str] = None,
        channel: str = 'in_app',
        project_id: Optional[UUID] = None,
        additional_metadata: Optional[Dict[str, Any]] = None,
        send_now: bool = False
    ) -> Notification:
        """
        Create a notification in the database

        Args:
            db: Database session
            user_id: Recipient user ID
            title: Notification title
            message: Notification message
            source_type: Source entity type ('ticket', 'project', 'expense', etc.)
            source_id: Source entity ID
            notification_type: Type of notification ('assignment', 'status_change', etc.)
            channel: Delivery channel (default: 'in_app')
            project_id: Optional project ID for scoping/filtering
            additional_metadata: Additional data (action URLs, etc.)
            send_now: Whether to send immediately via external channel
                (currently only logged; no delivery is triggered here)

        Returns:
            Created Notification object

        Raises:
            Exception: Any error raised while building or persisting the
                record is re-raised after the session is rolled back.
        """
        try:
            # Extract project_id from metadata if not provided directly
            if not project_id and additional_metadata:
                metadata_project_id = additional_metadata.get('project_id')
                # The literal string 'None' is treated as "no project".
                if metadata_project_id and metadata_project_id != 'None':
                    try:
                        project_id = UUID(str(metadata_project_id))
                    except (ValueError, TypeError):
                        # Best-effort: keep going without a project, but leave a trace.
                        logger.warning(
                            f"Ignoring invalid project_id in notification metadata: {metadata_project_id!r}"
                        )

            # Create notification record
            notification = Notification(
                user_id=user_id,
                title=title,
                message=message,
                source_type=source_type,
                source_id=source_id,
                notification_type=notification_type,
                channel=NotificationChannel(channel),
                status=NotificationStatus.PENDING,
                project_id=project_id,
                additional_metadata=additional_metadata or {}
            )

            db.add(notification)
            db.commit()
            db.refresh(notification)

            logger.info(f"Created notification {notification.id} for user {user_id}")

            # Send via external channel if requested
            if send_now and channel != 'in_app':
                # Queue for background processing (will be implemented in tasks)
                logger.info(f"Queuing notification {notification.id} for {channel} delivery")

            return notification
        except Exception as e:
            db.rollback()
            logger.error(f"Error creating notification: {str(e)}")
            raise

    async def create_bulk_notifications(
        self,
        db: Session,
        user_ids: List[UUID],
        title: str,
        message: str,
        source_type: str,
        source_id: Optional[UUID] = None,
        notification_type: Optional[str] = None,
        channel: str = 'in_app',
        project_id: Optional[UUID] = None,
        additional_metadata: Optional[Dict[str, Any]] = None
    ) -> List[Notification]:
        """
        Create notifications for multiple users

        Each user is handled independently through ``create_notification``;
        a failure for one user is logged and does not stop the others.

        Args:
            db: Database session
            user_ids: List of recipient user IDs
            title: Notification title
            message: Notification message
            source_type: Source entity type
            source_id: Source entity ID
            notification_type: Type of notification
            channel: Delivery channel
            project_id: Optional project ID for scoping
            additional_metadata: Additional metadata

        Returns:
            List of created Notification objects (failed users are omitted)
        """
        notifications = []

        for user_id in user_ids:
            try:
                notification = await self.create_notification(
                    db=db,
                    user_id=user_id,
                    title=title,
                    message=message,
                    source_type=source_type,
                    source_id=source_id,
                    notification_type=notification_type,
                    channel=channel,
                    project_id=project_id,
                    additional_metadata=additional_metadata
                )
            except Exception as e:
                logger.error(f"Error creating notification for user {user_id}: {str(e)}")
                continue
            notifications.append(notification)

        logger.info(f"Created {len(notifications)} notifications for {len(user_ids)} users")
        return notifications

    def get_user_notifications(
        self,
        db: Session,
        user_id: UUID,
        filters: Optional[NotificationFilters] = None
    ) -> NotificationListResponse:
        """
        Get notifications for a user with optional filters

        Results are ordered newest first. Without ``filters`` the first page
        of 20 items is returned.

        Args:
            db: Database session
            user_id: User ID
            filters: Optional filters (status, type, etc.)

        Returns:
            NotificationListResponse with paginated results
        """
        query = db.query(Notification).filter(Notification.user_id == user_id)

        # Apply filters
        if filters:
            # These four filters are passed to .in_(), i.e. treated as collections of values.
            if filters.status:
                query = query.filter(Notification.status.in_(filters.status))
            if filters.notification_type:
                query = query.filter(Notification.notification_type.in_(filters.notification_type))
            if filters.source_type:
                query = query.filter(Notification.source_type.in_(filters.source_type))
            if filters.channel:
                query = query.filter(Notification.channel.in_(filters.channel))

            # is_read is tri-state: None means "do not filter on read state".
            if filters.is_read is not None:
                if filters.is_read:
                    query = query.filter(Notification.read_at.isnot(None))
                else:
                    query = query.filter(Notification.read_at.is_(None))

            if filters.project_id:
                # Filter by project_id column (more efficient than JSONB)
                query = query.filter(Notification.project_id == filters.project_id)
            if filters.from_date:
                query = query.filter(Notification.created_at >= filters.from_date)
            if filters.to_date:
                query = query.filter(Notification.created_at <= filters.to_date)

        # Get total count (before pagination is applied)
        total = query.count()

        # Apply pagination
        page = filters.page if filters else 1
        page_size = filters.page_size if filters else 20
        offset = (page - 1) * page_size

        # Get notifications
        notifications = (
            query.order_by(Notification.created_at.desc())
            .offset(offset)
            .limit(page_size)
            .all()
        )

        # Convert to summaries
        summaries = [
            NotificationSummary(
                id=n.id,
                title=n.title,
                message=n.message,
                notification_type=n.notification_type,
                source_type=n.source_type,
                source_id=n.source_id,
                status=n.status,
                is_read=n.is_read,
                created_at=n.created_at,
                additional_metadata=n.additional_metadata
            )
            for n in notifications
        ]

        return NotificationListResponse(
            notifications=summaries,
            total=total,
            page=page,
            page_size=page_size,
            has_more=total > (page * page_size)
        )

    def get_notification_stats(
        self,
        db: Session,
        user_id: UUID
    ) -> NotificationStatsResponse:
        """
        Get notification statistics for a user

        Args:
            db: Database session
            user_id: User ID

        Returns:
            NotificationStatsResponse with total/unread/read/failed counts and
            per-type and per-channel breakdowns
        """
        base_query = db.query(Notification).filter(Notification.user_id == user_id)

        total = base_query.count()
        unread = base_query.filter(Notification.read_at.is_(None)).count()
        read = base_query.filter(Notification.read_at.isnot(None)).count()
        failed = base_query.filter(Notification.status == NotificationStatus.FAILED).count()

        # Counts by type (rows without a notification_type are left out)
        type_results = (
            db.query(Notification.notification_type, func.count(Notification.id))
            .filter(Notification.user_id == user_id)
            .group_by(Notification.notification_type)
            .all()
        )
        by_type = {
            notif_type: count
            for notif_type, count in type_results
            if notif_type
        }

        # Counts by channel
        channel_results = (
            db.query(Notification.channel, func.count(Notification.id))
            .filter(Notification.user_id == user_id)
            .group_by(Notification.channel)
            .all()
        )
        # Keyed by the channel's ``.value``; assumes every row has a channel set.
        by_channel = {
            channel.value: count
            for channel, count in channel_results
        }

        return NotificationStatsResponse(
            total=total,
            unread=unread,
            read=read,
            failed=failed,
            by_type=by_type,
            by_channel=by_channel
        )

    def mark_as_read(
        self,
        db: Session,
        notification_id: UUID,
        user_id: UUID
    ) -> Optional[Notification]:
        """
        Mark a notification as read

        Args:
            db: Database session
            notification_id: Notification ID
            user_id: User ID (for authorization)

        Returns:
            Updated Notification or None if not found
        """
        # Matching on user_id as well keeps users from touching others' notifications.
        notification = db.query(Notification).filter(
            and_(
                Notification.id == notification_id,
                Notification.user_id == user_id
            )
        ).first()

        if notification:
            notification.mark_as_read()
            db.commit()
            db.refresh(notification)
            logger.info(f"Marked notification {notification_id} as read for user {user_id}")

        return notification

    def mark_all_as_read(
        self,
        db: Session,
        user_id: UUID,
        notification_ids: Optional[List[UUID]] = None
    ) -> int:
        """
        Mark multiple notifications as read

        Only notifications that are currently unread are updated.

        Args:
            db: Database session
            user_id: User ID
            notification_ids: Specific notification IDs (or None for all unread).
                An empty list is treated the same as None.

        Returns:
            Number of notifications marked as read
        """
        query = db.query(Notification).filter(
            and_(
                Notification.user_id == user_id,
                Notification.read_at.is_(None)
            )
        )

        if notification_ids:
            query = query.filter(Notification.id.in_(notification_ids))

        # Bulk UPDATE; in-session objects are not synchronised with the new values.
        count = query.update(
            {
                'read_at': datetime.now(timezone.utc),
                'status': NotificationStatus.READ
            },
            synchronize_session=False
        )
        db.commit()

        logger.info(f"Marked {count} notifications as read for user {user_id}")
        return count

    def delete_notification(
        self,
        db: Session,
        notification_id: UUID,
        user_id: UUID
    ) -> bool:
        """
        Delete a notification

        Args:
            db: Database session
            notification_id: Notification ID
            user_id: User ID (for authorization)

        Returns:
            True if deleted, False if not found
        """
        # Matching on user_id as well keeps users from deleting others' notifications.
        notification = db.query(Notification).filter(
            and_(
                Notification.id == notification_id,
                Notification.user_id == user_id
            )
        ).first()

        if not notification:
            return False

        db.delete(notification)
        db.commit()
        logger.info(f"Deleted notification {notification_id} for user {user_id}")
        return True