# swiftops-backend/src/app/api/v1/payroll.py
# Last commit (bb7143c, by kamau1): Fix payroll stats 422 by removing duplicate /stats endpoint and correcting route ordering.
"""
Payroll API Endpoints - Weekly payroll generation and management
"""
from fastapi import APIRouter, Depends, HTTPException, status, Query, Request, BackgroundTasks
from sqlalchemy.orm import Session
from typing import Optional, List
from uuid import UUID
from datetime import date
import logging
from app.api.deps import get_db, get_current_active_user
from app.models.user import User
from app.models.user_payroll import UserPayroll
from app.models.enums import AppRole
from app.schemas.payroll import (
PayrollGenerateRequest,
PayrollBatchGenerateRequest,
PayrollUpdateRequest,
PayrollMarkPaidRequest,
PayrollResponse,
PayrollListResponse,
PayrollGenerateResponse,
PayrollBatchGenerateResponse,
PayrollFilterParams
)
from app.services.payroll_service import PayrollService
from app.services.audit_service import AuditService
from app.services.notification_creator import NotificationCreator
from app.services.notification_delivery import NotificationDelivery
from app.core.permissions import require_permission
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router that groups every endpoint declared in this file under the
# "/payroll" URL prefix and the "Payroll" OpenAPI tag.
router = APIRouter(prefix="/payroll", tags=["Payroll"])
# ============================================
# PAYROLL GENERATION
# ============================================
@router.post("/generate", response_model=PayrollGenerateResponse, status_code=status.HTTP_200_OK)
@require_permission("manage_payroll")
async def generate_payroll(
    data: PayrollGenerateRequest,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Generate payroll for a specific user, project and pay period.

    The work is delegated to `PayrollService.generate_payroll_for_period`.
    When the service returns a payroll record, the action is written to the
    audit trail; the service result is then returned unchanged.

    **Authorization:**
    - Requires the `manage_payroll` permission
      (documented roles: platform admins, project managers, dispatchers)

    **Required Fields:**
    - user_id: Worker to generate payroll for
    - project_id: Project context
    - period_start_date: Pay period start (must be Monday)
    - period_end_date: Pay period end (must be Sunday)

    **Business Rules (applied by the payroll service, not by this handler):**
    - Pay period must be exactly 7 days (Monday-Sunday)
    - Cannot regenerate if already paid; this endpoint always calls the
      service with `force_regenerate=False`
    - Aggregates data from timesheets
    - Uses compensation rates from project_roles

    **Calculation:**
    - base_earnings: Calculated from compensation_type:
      * FIXED_RATE: days_worked × base_rate (or hours × base_rate for hourly)
      * PER_UNIT: tickets_closed × per_unit_rate
      * COMMISSION: ticket_value × commission_percentage
      * FIXED_PLUS_COMMISSION: (days × base_rate) + (ticket_value × commission%)
    - days_worked: Count of PRESENT days from timesheets
    - tickets_closed: Count from timesheet ticket metrics
    - total_amount: base_earnings + bonus - deductions

    **Response:**
    - success: True if generated, False if skipped
    - payroll: Complete payroll record
    - skipped_reason: "already_paid", "already_exists", etc.

    **Errors:**
    - HTTPException raised by the service is propagated as-is
    - Any other exception is logged and reported as HTTP 500
    """
    try:
        outcome = PayrollService.generate_payroll_for_period(
            db=db,
            user_id=data.user_id,
            project_id=data.project_id,
            period_start_date=data.period_start_date,
            period_end_date=data.period_end_date,
            current_user=current_user,
            force_regenerate=False,
        )

        # Without a payroll record there is no entity to attach an audit entry to.
        if not outcome.payroll:
            return outcome

        period_label = f"{data.period_start_date} to {data.period_end_date}"
        audit_metadata = {
            "user_id": str(data.user_id),
            "project_id": str(data.project_id),
            "period": period_label,
            "success": outcome.success,
            "skipped_reason": outcome.skipped_reason,
        }
        AuditService.log_action(
            db=db,
            action="generate_payroll",
            entity_type="payroll",
            description=f"Generated payroll for user {data.user_id} project {data.project_id} period {period_label}",
            user=current_user,
            entity_id=str(outcome.payroll.id),
            request=request,
            additional_metadata=audit_metadata,
        )
        return outcome
    except HTTPException:
        # Deliberate HTTP errors keep their own status code and detail.
        raise
    except Exception as exc:
        logger.error(f"Error generating payroll: {exc}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to generate payroll: {str(exc)}"
        )
@router.post("/generate-batch", response_model=PayrollBatchGenerateResponse, status_code=status.HTTP_200_OK)
@require_permission("manage_payroll")
async def generate_payroll_batch(
    data: PayrollBatchGenerateRequest,
    request: Request,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Generate payroll for all active workers in a specific week.

    Delegates to `PayrollService.generate_weekly_payroll_batch`, records one
    audit entry for the run, then creates in-app notifications (one summary
    for the caller, one per generated payroll whose user can be found) and
    queues them for delivery. Notification problems are logged and never
    fail the request.

    **Authorization:**
    - Requires the `manage_payroll` permission
      (documented roles: platform admins, project managers)

    **Required Fields:**
    - target_date: Any date in the target week (will calculate Monday-Sunday)

    **Optional Fields:**
    - project_id: Filter for specific project only

    **Business Rules (applied by the payroll service):**
    - Processes all active project_team members
    - Skips already-paid payrolls
    - Continues on error (collects errors for review)
    - Idempotent (safe to run multiple times)

    **Use Cases:**
    - Manual weekly payroll run (Monday morning)
    - Catch-up for missed weeks
    - Project-specific payroll generation

    **Response:**
    - total_processed: Number of workers processed
    - total_generated: Successfully generated
    - total_skipped: Skipped (already exists, paid, etc.)
    - payrolls: List of generated payrolls
    - errors: List of errors (user_id, error message)

    **Errors:**
    - HTTPException raised while generating or auditing is propagated as-is
    - Any other exception from those steps is logged and reported as HTTP 500
    """
    try:
        result = PayrollService.generate_weekly_payroll_batch(
            db=db,
            target_date=data.target_date,
            project_id=data.project_id,
            current_user=current_user
        )

        # Log audit trail
        AuditService.log_action(
            db=db,
            action="generate_batch_payroll",
            entity_type="payroll",
            description=f"Generated batch payroll for target date {data.target_date} project {data.project_id if data.project_id else 'all'}",
            user=current_user,
            entity_id=None,
            request=request,
            additional_metadata={
                "target_date": str(data.target_date),
                "project_id": str(data.project_id) if data.project_id else None,
                "total_processed": result.total_processed,
                "total_generated": result.total_generated,
                "total_skipped": result.total_skipped,
                "errors_count": len(result.errors)
            },
            ip_address=request.client.host if request.client else None
        )

        # Create notifications for payroll generation (Tier 1 - Synchronous)
        try:
            if result.total_generated > 0 and result.payrolls:
                # Calculate totals (records without an amount contribute nothing)
                total_amount = sum(float(p.total_amount) for p in result.payrolls if p.total_amount)
                total_days = sum(p.days_worked or 0 for p in result.payrolls)
                total_tickets = sum(p.tickets_closed or 0 for p in result.payrolls)

                # Get period info from first payroll
                first_payroll = result.payrolls[0]
                period_start = first_payroll.period_start_date
                period_end = first_payroll.period_end_date

                # Create manager notification
                manager_notification = NotificationCreator.create(
                    db=db,
                    user_id=current_user.id,
                    title=f"✅ Payroll Generated: {result.total_generated} workers",
                    message=(
                        f"Successfully generated payroll for period {period_start} to {period_end}.\n\n"
                        f"📊 Summary:\n"
                        f"• Workers: {result.total_generated}\n"
                        f"• Total Amount: {total_amount:,.2f} KES\n"
                        f"• Total Days Worked: {total_days}\n"
                        f"• Total Tickets Closed: {total_tickets}\n"
                        f"• Skipped: {result.total_skipped}\n"
                        f"• Errors: {len(result.errors)}"
                    ),
                    source_type="payroll",
                    source_id=None,
                    notification_type="payroll_generated",
                    channel="in_app",
                    project_id=data.project_id,
                    metadata={
                        "payroll_count": result.total_generated,
                        "total_amount": float(total_amount),
                        "total_days_worked": total_days,
                        "total_tickets_closed": total_tickets,
                        "period_start": str(period_start),
                        "period_end": str(period_end),
                        "skipped_count": result.total_skipped,
                        "error_count": len(result.errors),
                        "payroll_ids": [str(p.id) for p in result.payrolls],
                        "action_url": f"/payroll?period_start_date={period_start}&is_paid=false"
                    }
                )

                # Create worker notifications
                notifications_created = [manager_notification]
                for payroll in result.payrolls:
                    worker = db.query(User).filter(User.id == payroll.user_id).first()
                    if not worker:
                        continue

                    # The totals above already tolerate a missing amount; do the
                    # same here so one incomplete record cannot raise while
                    # formatting and abort every notification of the batch.
                    worker_total = payroll.total_amount or 0
                    worker_base = payroll.base_earnings or 0

                    worker_notification = NotificationCreator.create(
                        db=db,
                        user_id=worker.id,
                        title=f"📋 Payroll Generated: {worker_total:,.2f} KES",
                        message=(
                            f"Your payroll for {period_start} to {period_end} has been generated.\n\n"
                            f"💼 Work Summary:\n"
                            f"• Days Worked: {payroll.days_worked or 0}\n"
                            f"• Tickets Closed: {payroll.tickets_closed or 0}\n"
                            f"• Base Earnings: {worker_base:,.2f} KES\n"
                            f"• Total Amount: {worker_total:,.2f} KES\n\n"
                            f"Payment will be processed soon."
                        ),
                        source_type="payroll",
                        source_id=payroll.id,
                        notification_type="payroll_generated",
                        channel="in_app",
                        project_id=data.project_id,
                        metadata={
                            "payroll_id": str(payroll.id),
                            "period_start": str(period_start),
                            "period_end": str(period_end),
                            "total_amount": float(worker_total),
                            "days_worked": payroll.days_worked or 0,
                            "tickets_closed": payroll.tickets_closed or 0,
                            "action_url": f"/payroll/{payroll.id}"
                        }
                    )
                    notifications_created.append(worker_notification)

                # Commit all notifications
                db.commit()
                logger.info(
                    f"Created {len(notifications_created)} payroll generation notifications "
                    f"({result.total_generated} workers + 1 manager)"
                )

                # Queue delivery (Tier 2 - Asynchronous)
                NotificationDelivery.queue_bulk_delivery(
                    background_tasks=background_tasks,
                    notification_ids=[n.id for n in notifications_created]
                )
        except Exception as e:
            # Don't fail the request if notification fails
            logger.warning(f"Failed to create payroll generation notifications: {str(e)}", exc_info=True)

        return result
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error generating batch payroll: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to generate batch payroll: {str(e)}"
        )
# ============================================
# PAYROLL CRUD
# ============================================
# NOTE: Specific routes MUST come before parameterized routes
# /stats and /users/{user_id}/payroll must be before /{payroll_id}
@router.get("/stats", status_code=status.HTTP_200_OK)
async def get_payroll_stats(
    project_id: Optional[UUID] = Query(None, description="Filter by project"),
    period_start_date: Optional[date] = Query(None, description="Filter by period start"),
    period_end_date: Optional[date] = Query(None, description="Filter by period end"),
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get aggregated payroll statistics for dashboard.

    Loads every payroll record matching the filters and aggregates them in
    Python. Records without a `total_amount` are counted but contribute 0 to
    the monetary totals.

    **Authorization:**
    - Platform admins (view all)
    - Project managers (view all)
    - Dispatchers (view all)
    - Any other role receives HTTP 403

    **Query Parameters:**
    - project_id: Filter by specific project (optional)
    - period_start_date: Keep records whose period starts on/after this date (optional)
    - period_end_date: Keep records whose period ends on/before this date (optional)

    **Returns:**
    - total_payrolls: Total number of payroll records
    - total_paid: Number of paid payrolls
    - total_unpaid: Number of unpaid payrolls
    - total_amount: Sum of all payroll amounts
    - total_paid_amount: Sum of paid amounts
    - total_unpaid_amount: Sum of unpaid amounts
    - total_workers: Number of unique workers
    - average_per_worker: Average payment per worker (0 when there are no workers)
    - total_days_worked: Sum of all days worked
    - total_tickets_closed: Sum of all tickets closed
    - period_start_date / period_end_date / project_id: Echo of the applied filters

    **Errors:**
    - 403 when the caller's role is not allowed
    - 500 for any unexpected failure (logged)
    """
    try:
        # Authorization
        if current_user.role not in [AppRole.PLATFORM_ADMIN, AppRole.PROJECT_MANAGER, AppRole.DISPATCHER]:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not authorized to view payroll statistics"
            )

        # Build query
        query = db.query(UserPayroll)

        # Apply filters
        if project_id:
            query = query.filter(UserPayroll.project_id == project_id)
        if period_start_date:
            query = query.filter(UserPayroll.period_start_date >= period_start_date)
        if period_end_date:
            query = query.filter(UserPayroll.period_end_date <= period_end_date)

        # Get all payrolls
        payrolls = query.all()

        def _amount(payroll) -> float:
            # A record with no amount must not turn the whole dashboard call
            # into a 500; treat it as zero, as the batch handler does.
            return float(payroll.total_amount or 0)

        # Calculate statistics
        paid_payrolls = [p for p in payrolls if p.is_paid]
        unpaid_payrolls = [p for p in payrolls if not p.is_paid]

        total_amount = sum(_amount(p) for p in payrolls)
        total_paid_amount = sum(_amount(p) for p in paid_payrolls)
        total_unpaid_amount = sum(_amount(p) for p in unpaid_payrolls)

        unique_workers = len({p.user_id for p in payrolls})
        average_per_worker = total_amount / unique_workers if unique_workers > 0 else 0

        total_days_worked = sum(p.days_worked or 0 for p in payrolls)
        total_tickets_closed = sum(p.tickets_closed or 0 for p in payrolls)

        return {
            "total_payrolls": len(payrolls),
            "total_paid": len(paid_payrolls),
            "total_unpaid": len(unpaid_payrolls),
            "total_amount": round(total_amount, 2),
            "total_paid_amount": round(total_paid_amount, 2),
            "total_unpaid_amount": round(total_unpaid_amount, 2),
            "total_workers": unique_workers,
            "average_per_worker": round(average_per_worker, 2),
            "total_days_worked": total_days_worked,
            "total_tickets_closed": total_tickets_closed,
            "period_start_date": period_start_date,
            "period_end_date": period_end_date,
            "project_id": str(project_id) if project_id else None
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting payroll stats: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get payroll stats: {str(e)}"
        )
@router.get("/users/{user_id}/payroll", response_model=PayrollListResponse, status_code=status.HTTP_200_OK)
async def get_user_payroll_history(
    user_id: UUID,
    project_id: Optional[UUID] = Query(None, description="Filter by project"),
    period_start_date: Optional[date] = Query(None, description="Filter by period start"),
    period_end_date: Optional[date] = Query(None, description="Filter by period end"),
    is_paid: Optional[bool] = Query(None, description="Filter by payment status"),
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(20, ge=1, le=100, description="Items per page"),
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Return one user's payroll records, newest pay period first, paginated.

    **Authorization:**
    - Platform admins (view any user)
    - Project managers (view any user)
    - Dispatchers (view any user)
    - Everyone else may only request their own `user_id` (403 otherwise)

    **Use Cases:**
    - Manager reviews worker's payment history
    - Worker views their own payroll history
    - Performance review with payment data
    - Payroll verification

    **Query Parameters:**
    - project_id: Filter by specific project (optional)
    - period_start_date: Keep records whose period starts on/after this date (optional)
    - period_end_date: Keep records whose period ends on/before this date (optional)
    - is_paid: Filter by payment status (optional)
    - page: Page number (default: 1)
    - page_size: Items per page (default: 20, max: 100)

    **Response:**
    - items: List of payroll records for the user
    - total: Total count
    - page: Current page
    - page_size: Page size
    - has_more: Whether more pages exist

    **Errors:**
    - 403 when a non-manager asks for another user's history
    - 500 for any unexpected failure (logged)
    """
    try:
        # Looking at someone else's payroll is reserved for the manager roles.
        viewing_other_user = current_user.id != user_id
        if viewing_other_user and current_user.role not in [
            AppRole.PLATFORM_ADMIN,
            AppRole.PROJECT_MANAGER,
            AppRole.DISPATCHER,
        ]:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You can only view your own payroll history"
            )

        # Collect the criteria first, then hand them to the query in one go.
        criteria = [UserPayroll.user_id == user_id]
        if project_id:
            criteria.append(UserPayroll.project_id == project_id)
        if is_paid is not None:
            criteria.append(UserPayroll.is_paid == is_paid)
        if period_start_date:
            criteria.append(UserPayroll.period_start_date >= period_start_date)
        if period_end_date:
            criteria.append(UserPayroll.period_end_date <= period_end_date)
        history = db.query(UserPayroll).filter(*criteria)

        # Count before ordering/paging so `total` covers every page.
        total = history.count()

        offset = (page - 1) * page_size
        page_rows = (
            history.order_by(UserPayroll.period_start_date.desc())
            .offset(offset)
            .limit(page_size)
            .all()
        )

        items = [PayrollService._build_payroll_response(db, row) for row in page_rows]
        return PayrollListResponse(
            items=items,
            total=total,
            page=page,
            page_size=page_size,
            has_more=offset + len(items) < total
        )
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Error getting user payroll history: {exc}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get user payroll history: {str(exc)}"
        )
@router.get("", response_model=PayrollListResponse, status_code=status.HTTP_200_OK)
async def list_payrolls(
    user_id: Optional[UUID] = Query(None, description="Filter by user"),
    project_id: Optional[UUID] = Query(None, description="Filter by project"),
    is_paid: Optional[bool] = Query(None, description="Filter by payment status"),
    period_start_date: Optional[date] = Query(None, description="Filter by period start"),
    period_end_date: Optional[date] = Query(None, description="Filter by period end"),
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(20, ge=1, le=100, description="Items per page"),
    sort_by: str = Query("period_start_date", description="Sort field"),
    sort_order: str = Query("desc", description="Sort order (asc/desc)"),
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    List payroll records with filtering and pagination.

    **Authorization:**
    - Platform admins (view all)
    - Project managers (view all)
    - Dispatchers (view all)
    - Workers (view their own only - auto-filtered; `user_id` is ignored)

    **Query Parameters:**
    - user_id: Filter by specific worker
    - project_id: Filter by specific project
    - is_paid: Filter by payment status (true/false)
    - period_start_date: Filter by period start (>=)
    - period_end_date: Filter by period end (<=)
    - page: Page number (default: 1)
    - page_size: Items per page (default: 20, max: 100)
    - sort_by: Sort field (period_start_date, total_amount, created_at, paid_at, user_id);
      any other value falls back to period_start_date
    - sort_order: "desc" (case-insensitive) sorts descending; anything else sorts ascending

    **Response:**
    - items: List of payroll records
    - total: Total count (all pages)
    - page: Current page
    - page_size: Page size
    - has_more: Whether more pages exist

    **Use Cases:**
    - View all unpaid payrolls (is_paid=false)
    - View payrolls for specific week (period_start_date=2024-12-09)
    - View worker's payroll history (user_id=<uuid>)
    - View project payrolls (project_id=<uuid>)

    **Errors:**
    - 500 for any unexpected failure (logged)
    """
    # `sort_by` comes straight from the query string. Resolving it with a bare
    # getattr() would also match methods, relationships or dunder attributes
    # of the model and fail later in order_by(), so only the documented
    # fields are accepted.
    sortable_fields = ("period_start_date", "total_amount", "created_at", "paid_at", "user_id")

    try:
        # Build query
        query = db.query(UserPayroll)

        # Authorization: Workers can only see their own
        if current_user.role not in [AppRole.PLATFORM_ADMIN, AppRole.PROJECT_MANAGER, AppRole.DISPATCHER]:
            query = query.filter(UserPayroll.user_id == current_user.id)
        elif user_id:
            query = query.filter(UserPayroll.user_id == user_id)

        # Apply filters
        if project_id:
            query = query.filter(UserPayroll.project_id == project_id)
        if is_paid is not None:
            query = query.filter(UserPayroll.is_paid == is_paid)
        if period_start_date:
            query = query.filter(UserPayroll.period_start_date >= period_start_date)
        if period_end_date:
            query = query.filter(UserPayroll.period_end_date <= period_end_date)

        # Get total count
        total = query.count()

        # Apply sorting (unknown fields keep the historical fallback)
        if sort_by in sortable_fields:
            sort_field = getattr(UserPayroll, sort_by, UserPayroll.period_start_date)
        else:
            sort_field = UserPayroll.period_start_date
        if sort_order.lower() == "desc":
            query = query.order_by(sort_field.desc())
        else:
            query = query.order_by(sort_field.asc())

        # Apply pagination
        offset = (page - 1) * page_size
        payrolls = query.offset(offset).limit(page_size).all()

        # Build responses
        items = [PayrollService._build_payroll_response(db, p) for p in payrolls]
        return PayrollListResponse(
            items=items,
            total=total,
            page=page,
            page_size=page_size,
            has_more=offset + len(items) < total
        )
    except HTTPException:
        # Same convention as the other handlers in this module: deliberate
        # HTTP errors are not rewrapped as 500s.
        raise
    except Exception as e:
        logger.error(f"Error listing payrolls: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to list payrolls: {str(e)}"
        )
# ============================================
# PAYROLL ACTIONS
# ============================================
@router.post("/{payroll_id}/recalculate", response_model=PayrollGenerateResponse, status_code=status.HTTP_200_OK)
@require_permission("manage_payroll")
async def recalculate_payroll(
    payroll_id: UUID,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Recalculate a payroll record from fresh timesheet and ticket data.

    Delegates to `PayrollService.recalculate_payroll`, writes an audit entry
    carrying the service's `success` flag, and returns the service result.

    **Authorization:**
    - Requires the `manage_payroll` permission
      (documented roles: platform admins, project managers)

    **Business Rules (applied by the payroll service):**
    - Can only recalculate unpaid payrolls
    - Preserves manually added bonuses and deductions
    - Increments version number (optimistic locking)
    - Updates calculation_notes with timestamp

    **Use Cases:**
    - Timesheet was corrected after payroll generation
    - Ticket completion dates were adjusted
    - Manager wants to refresh calculations before payment

    **Response:**
    - success: True if recalculated
    - payroll: Updated payroll record
    - message: Success/error message

    **Errors:**
    - HTTPException raised by the service is propagated as-is
    - Any other exception is logged and reported as HTTP 500
    """
    try:
        outcome = PayrollService.recalculate_payroll(db, payroll_id, current_user)

        payroll_ref = str(payroll_id)
        audit_metadata = {
            "payroll_id": payroll_ref,
            "success": outcome.success,
        }
        AuditService.log_action(
            db=db,
            action="recalculate_payroll",
            entity_type="payroll",
            description=f"Recalculated payroll {payroll_id}",
            user=current_user,
            entity_id=payroll_ref,
            request=request,
            additional_metadata=audit_metadata,
        )
        return outcome
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Error recalculating payroll: {exc}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to recalculate payroll: {str(exc)}"
        )
@router.post("/{payroll_id}/mark-paid", response_model=PayrollResponse, status_code=status.HTTP_200_OK)
@require_permission("manage_payroll")
async def mark_payroll_as_paid(
    payroll_id: UUID,
    data: PayrollMarkPaidRequest,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Mark a payroll record as paid.

    Delegates to `PayrollService.mark_as_paid`, writes an audit entry with
    the payment method, reference and amount, and returns the record the
    service produced.

    **Authorization:**
    - Requires the `manage_payroll` permission
      (documented roles: platform admins, project managers)

    **Required Fields:**
    - payment_method: mobile_money, bank_transfer, cash, cheque
    - payment_reference: M-Pesa transaction ID, bank reference, etc. (optional)

    **Business Rules (applied by the payroll service):**
    - Can only mark unpaid payrolls as paid
    - Once marked paid, cannot be recalculated
    - Records who paid and when
    - Permanent audit trail

    **Use Cases:**
    - After disbursing payment to worker
    - After bank transfer is confirmed
    - After M-Pesa payment succeeds

    **Response:**
    - Updated payroll record with payment details
    - is_paid: true
    - paid_at: timestamp
    - paid_by_user_id: current user

    **Errors:**
    - HTTPException raised by the service is propagated as-is
    - Any other exception is logged and reported as HTTP 500
    """
    try:
        paid_record = PayrollService.mark_as_paid(
            db=db,
            payroll_id=payroll_id,
            payment_method=data.payment_method,
            payment_reference=data.payment_reference,
            current_user=current_user,
        )

        payroll_ref = str(payroll_id)
        audit_metadata = {
            "payroll_id": payroll_ref,
            "payment_method": data.payment_method,
            "payment_reference": data.payment_reference,
            # Stored as text so the amount is recorded exactly as the model holds it.
            "total_amount": str(paid_record.total_amount),
        }
        AuditService.log_action(
            db=db,
            action="mark_payroll_paid",
            entity_type="payroll",
            description=f"Marked payroll {payroll_id} as paid via {data.payment_method}",
            user=current_user,
            entity_id=payroll_ref,
            request=request,
            additional_metadata=audit_metadata,
        )
        return paid_record
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Error marking payroll as paid: {exc}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to mark payroll as paid: {str(exc)}"
        )
# ============================================
# PAYROLL EXPORT FOR PAYMENT
# ============================================
@router.post("/export", status_code=status.HTTP_200_OK)
@require_permission("manage_payroll")
async def export_payroll_for_payment(
    background_tasks: BackgroundTasks,
    from_date: date = Query(..., description="Period start date (inclusive)"),
    to_date: date = Query(..., description="Period end date (inclusive)"),
    project_id: Optional[UUID] = Query(None, description="Filter by project"),
    user_id: Optional[UUID] = Query(None, description="Filter by user"),
    request: Request = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Export unpaid payroll to Tende Pay CSV format and mark as paid.

    The export itself is performed by `PayrollService.export_for_payment`.
    This handler renders the returned rows as CSV, records an audit entry,
    creates notifications (best-effort: a notification failure is logged and
    does not prevent the CSV from being returned) and streams the file back.

    **How Payroll Works:**
    1. Payroll is generated/updated in REAL-TIME when timesheets are created/updated
    2. When timesheet is created, system checks if payroll exists for that user+project+period
    3. If exists → UPDATE existing payroll | If not → CREATE new payroll
    4. On payment day, manager exports payroll for the period
    5. System retrieves MOST RECENT payout details at export time (not stored in payroll)
    6. Manager reviews CSV + warnings, then exports
    7. System marks exported payroll records as paid

    **Authorization:**
    - Requires the `manage_payroll` permission
      (documented roles: platform admins, project managers)

    **Query Parameters:**
    - from_date: Period start date (filters by period_start_date >= from_date)
    - to_date: Period end date (filters by period_end_date <= to_date)
    - project_id: Optional - filter by specific project
    - user_id: Optional - filter by specific user

    **Business Rules (applied by the payroll service):**
    - Only exports unpaid payroll (is_paid = false)
    - Retrieves payment details from user's primary financial account AT EXPORT TIME
    - Missing fields generate WARNINGS but do NOT fail the export
    - ALL payroll records are exported (even with missing payment details)
    - Marks all exported payroll as paid (irreversible)
    - Sets payment_reference to PAYROLL_EXPORT_{timestamp}_{user_id}

    **Payment Details (Retrieved at Export Time):**
    - Fetched from user_financial_accounts (is_primary = true, is_active = true)
    - Supports: mobile_money (M-Pesa), bank_transfer
    - Missing/invalid details → WARNING (not failure)
    - Manager can manually fix CSV before uploading to Tende Pay

    **Narration Format:**
    - "Payroll {period}: {days} days worked, {tickets} tickets closed, {amount} KES"
    - Example: "Payroll 2024-12-02 to 2024-12-08: 5 days worked, 12 tickets closed, 5000 KES"

    **Response:**
    - CSV file with Tende Pay bulk payment format
    - Warnings included as comments at end of file
    - X-Warning-Count header with number of warnings
    - X-Exported-Count header with number of payroll records exported

    **Warnings (Non-Blocking):**
    - User has no financial account → exported with empty payment details
    - Invalid payment details → exported with warning
    - Missing ID number → exported with empty ID field
    - Unsupported payment method → exported with empty payment details

    **Example:**
    ```
    POST /api/v1/payroll/export?from_date=2024-12-02&to_date=2024-12-08&project_id=xxx
    ```

    **Errors:**
    - HTTPException raised by the export or audit step is propagated as-is
    - Any other exception from those steps is logged and reported as HTTP 500
    """
    import csv
    import io
    from datetime import datetime, timezone
    from fastapi.responses import StreamingResponse

    try:
        # Export payroll
        csv_rows, warnings, exported_payroll_records = PayrollService.export_for_payment(
            db=db,
            from_date=from_date,
            to_date=to_date,
            current_user=current_user,
            project_id=project_id,
            user_id=user_id
        )

        # Generate Tende Pay CSV
        output = io.StringIO()
        if csv_rows:
            # Tende Pay column order (must match template exactly)
            fieldnames = [
                "NAME",
                "ID NUMBER",
                "PHONE NUMBER",
                "AMOUNT",
                "PAYMENT MODE",
                "BANK (Optional)",
                "BANK ACCOUNT NO (Optional)",
                "PAYBILL BUSINESS NO (Optional)",
                "PAYBILL ACCOUNT NO (Optional)",
                "BUY GOODS TILL NO (Optional)",
                "BILL PAYMENT BILLER CODE (Optional)",
                "BILL PAYMENT ACCOUNT NO (Optional)",
                "NARRATION (OPTIONAL)"
            ]
            writer = csv.DictWriter(output, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(csv_rows)

        # Add warnings as comments at the end
        if warnings:
            output.write("\n# WARNINGS:\n")
            for warning in warnings:
                output.write(f"# {warning}\n")
        output.seek(0)

        # datetime.utcnow() is deprecated; an aware UTC "now" formats identically.
        export_stamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
        filename = f"tende_pay_payroll_{from_date.isoformat()}_{to_date.isoformat()}_{export_stamp}.csv"

        # Log audit trail
        AuditService.log_action(
            db=db,
            action="export_payroll_for_payment",
            entity_type="payroll",
            description=f"Exported {len(csv_rows)} payroll records for payment (period: {from_date} to {to_date})",
            user=current_user,
            entity_id=None,
            request=request,
            additional_metadata={
                "from_date": from_date.isoformat(),
                "to_date": to_date.isoformat(),
                "project_id": str(project_id) if project_id else None,
                "user_id": str(user_id) if user_id else None,
                "exported_count": len(csv_rows),
                "warning_count": len(warnings)
            }
        )
        logger.info(
            f"Exported {len(csv_rows)} payroll records for payment "
            f"(period: {from_date} to {to_date}, warnings: {len(warnings)})"
        )

        # Create notifications (Tier 1 - Synchronous).
        # Best-effort, like the batch-generation endpoint: the export is
        # documented as irreversible, so a notification problem must not turn
        # the response into a 500 and lose the CSV for records that can no
        # longer be re-exported.
        # NOTE(review): assumes PayrollService.export_for_payment has already
        # persisted the paid status before returning - confirm.
        try:
            notifications_created = []
            if exported_payroll_records:
                # Calculate totals for manager notification
                total_amount = sum(p.total_amount for p in exported_payroll_records)
                total_days_worked = sum(p.days_worked or 0 for p in exported_payroll_records)
                total_tickets_closed = sum(p.tickets_closed or 0 for p in exported_payroll_records)

                # Create manager notification
                manager_notification = NotificationCreator.create(
                    db=db,
                    user_id=current_user.id,
                    title=f"Payroll Export Complete: {len(exported_payroll_records)} records",
                    message=(
                        f"Successfully exported payroll for period {from_date.isoformat()} to {to_date.isoformat()}.\n\n"
                        f"📊 Summary:\n"
                        f"• Workers: {len(exported_payroll_records)}\n"
                        f"• Total Amount: {total_amount:,.2f} KES\n"
                        f"• Total Days Worked: {total_days_worked}\n"
                        f"• Total Tickets Closed: {total_tickets_closed}\n"
                        f"• Warnings: {len(warnings)}"
                    ),
                    source_type="payroll",
                    source_id=None,  # Bulk export, no single ID
                    notification_type="payroll_exported",
                    channel="in_app",
                    project_id=project_id,
                    metadata={
                        "payroll_count": len(exported_payroll_records),
                        "total_amount": float(total_amount),
                        "total_days_worked": total_days_worked,
                        "total_tickets_closed": total_tickets_closed,
                        "period_start": from_date.isoformat(),
                        "period_end": to_date.isoformat(),
                        "warning_count": len(warnings),
                        "warnings": warnings[:10] if warnings else []  # First 10 warnings
                    }
                )
                notifications_created.append(manager_notification)

                # Create worker notifications
                for payroll in exported_payroll_records:
                    worker = db.query(User).filter(User.id == payroll.user_id).first()
                    if not worker:
                        continue

                    # Build work breakdown message
                    work_details = []
                    if payroll.days_worked and payroll.days_worked > 0:
                        work_details.append(f"{payroll.days_worked} days worked")
                    if payroll.tickets_closed and payroll.tickets_closed > 0:
                        work_details.append(f"{payroll.tickets_closed} tickets closed")
                    work_summary = ", ".join(work_details) if work_details else "work completed"

                    # Build earnings breakdown
                    earnings_parts = []
                    if payroll.base_earnings and payroll.base_earnings > 0:
                        earnings_parts.append(f"Base: {payroll.base_earnings:,.2f} KES")
                    if payroll.bonus_amount and payroll.bonus_amount > 0:
                        earnings_parts.append(f"Bonus: {payroll.bonus_amount:,.2f} KES")
                    if payroll.deductions and payroll.deductions > 0:
                        earnings_parts.append(f"Deductions: -{payroll.deductions:,.2f} KES")
                    earnings_breakdown = "\n".join([f"• {part}" for part in earnings_parts])

                    worker_notification = NotificationCreator.create(
                        db=db,
                        user_id=worker.id,
                        title=f"💰 Payment Processed: {payroll.total_amount:,.2f} KES",
                        message=(
                            f"Your payment for {from_date.isoformat()} to {to_date.isoformat()} has been processed.\n\n"
                            f"📋 Work Summary:\n"
                            f"• {work_summary}\n\n"
                            f"💵 Earnings Breakdown:\n"
                            f"{earnings_breakdown}\n\n"
                            f"Total Payment: {payroll.total_amount:,.2f} KES"
                        ),
                        source_type="payroll",
                        source_id=payroll.id,
                        notification_type="payment",
                        channel="whatsapp",  # Workers get WhatsApp notifications
                        project_id=project_id,
                        metadata={
                            "payroll_id": str(payroll.id),
                            "period_start": from_date.isoformat(),
                            "period_end": to_date.isoformat(),
                            "total_amount": float(payroll.total_amount),
                            "base_earnings": float(payroll.base_earnings) if payroll.base_earnings else 0,
                            "bonus_amount": float(payroll.bonus_amount) if payroll.bonus_amount else 0,
                            "deductions": float(payroll.deductions) if payroll.deductions else 0,
                            "days_worked": payroll.days_worked or 0,
                            "tickets_closed": payroll.tickets_closed or 0
                        }
                    )
                    notifications_created.append(worker_notification)

                # Commit all notifications
                db.commit()
                logger.info(
                    f"Created {len(notifications_created)} notifications "
                    f"({len(exported_payroll_records)} workers + 1 manager)"
                )

                # Queue delivery for external channels (Tier 2 - Asynchronous)
                NotificationDelivery.queue_bulk_delivery(
                    background_tasks=background_tasks,
                    notification_ids=[n.id for n in notifications_created]
                )
        except Exception as e:
            # Don't fail the export if notification fails
            logger.warning(f"Failed to create payroll export notifications: {str(e)}", exc_info=True)

        # Return CSV file
        return StreamingResponse(
            iter([output.getvalue()]),
            media_type="text/csv",
            headers={
                "Content-Disposition": f"attachment; filename={filename}",
                "X-Warning-Count": str(len(warnings)),
                "X-Exported-Count": str(len(csv_rows))
            }
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error exporting payroll: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to export payroll: {str(e)}"
        )