Spaces:
Sleeping
Sleeping
| """ | |
| Payroll API Endpoints - Weekly payroll generation and management | |
| """ | |
| from fastapi import APIRouter, Depends, HTTPException, status, Query, Request, BackgroundTasks | |
| from sqlalchemy.orm import Session | |
| from typing import Optional, List | |
| from uuid import UUID | |
| from datetime import date | |
| import logging | |
| from app.api.deps import get_db, get_current_active_user | |
| from app.models.user import User | |
| from app.models.user_payroll import UserPayroll | |
| from app.models.enums import AppRole | |
| from app.schemas.payroll import ( | |
| PayrollGenerateRequest, | |
| PayrollBatchGenerateRequest, | |
| PayrollUpdateRequest, | |
| PayrollMarkPaidRequest, | |
| PayrollResponse, | |
| PayrollListResponse, | |
| PayrollGenerateResponse, | |
| PayrollBatchGenerateResponse, | |
| PayrollFilterParams | |
| ) | |
| from app.services.payroll_service import PayrollService | |
| from app.services.audit_service import AuditService | |
| from app.services.notification_creator import NotificationCreator | |
| from app.services.notification_delivery import NotificationDelivery | |
| from app.core.permissions import require_permission | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter(prefix="/payroll", tags=["Payroll"]) | |
| # ============================================ | |
| # PAYROLL GENERATION | |
| # ============================================ | |
async def generate_payroll(
    data: PayrollGenerateRequest,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Generate payroll for a single worker over one pay period

    **Authorization:**
    - Platform admins
    - Project managers
    - Dispatchers

    **Required Fields:**
    - user_id: Worker to generate payroll for
    - project_id: Project context
    - period_start_date: Pay period start (must be Monday)
    - period_end_date: Pay period end (must be Sunday)

    **Business Rules:**
    - Pay period must be exactly 7 days (Monday-Sunday)
    - Cannot regenerate if already paid (unless force flag)
    - Aggregates data from timesheets
    - Uses compensation rates from project_roles

    **Calculation:**
    - base_earnings: Calculated from compensation_type:
      * FIXED_RATE: days_worked × base_rate (or hours × base_rate for hourly)
      * PER_UNIT: tickets_closed × per_unit_rate
      * COMMISSION: ticket_value × commission_percentage
      * FIXED_PLUS_COMMISSION: (days × base_rate) + (ticket_value × commission%)
    - days_worked: Count of PRESENT days from timesheets
    - tickets_closed: Count from timesheet ticket metrics
    - total_amount: base_earnings + bonus - deductions

    **Response:**
    - success: True if generated, False if skipped
    - payroll: Complete payroll record
    - skipped_reason: "already_paid", "already_exists", etc.
    """
    # NOTE(review): this function body performs no role check of its own; the
    # authorization and business rules described above are presumably enforced
    # by PayrollService (current_user is forwarded) or by a route decorator
    # that is not visible here -- confirm.
    try:
        # This endpoint always requests a non-forced generation.
        outcome = PayrollService.generate_payroll_for_period(
            db=db,
            user_id=data.user_id,
            project_id=data.project_id,
            period_start_date=data.period_start_date,
            period_end_date=data.period_end_date,
            current_user=current_user,
            force_regenerate=False,
        )

        # Without a payroll record there is no entity to attach an audit entry to.
        if not outcome.payroll:
            return outcome

        period_label = f"{data.period_start_date} to {data.period_end_date}"
        audit_metadata = {
            "user_id": str(data.user_id),
            "project_id": str(data.project_id),
            "period": period_label,
            "success": outcome.success,
            "skipped_reason": outcome.skipped_reason,
        }
        AuditService.log_action(
            db=db,
            action="generate_payroll",
            entity_type="payroll",
            description=(
                f"Generated payroll for user {data.user_id} "
                f"project {data.project_id} period {period_label}"
            ),
            user=current_user,
            entity_id=str(outcome.payroll.id),
            request=request,
            additional_metadata=audit_metadata,
        )
        return outcome
    except HTTPException:
        # Deliberate HTTP errors pass through with their own status code.
        raise
    except Exception as exc:
        # Anything unexpected is logged with its traceback and reported as a 500.
        logger.exception(f"Error generating payroll: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to generate payroll: {str(exc)}"
        )
async def generate_payroll_batch(
    data: PayrollBatchGenerateRequest,
    request: Request,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Generate payroll for all active workers in a specific week

    **Authorization:**
    - Platform admins
    - Project managers

    **Required Fields:**
    - target_date: Any date in the target week (will calculate Monday-Sunday)

    **Optional Fields:**
    - project_id: Filter for specific project only

    **Business Rules:**
    - Processes all active project_team members
    - Skips already-paid payrolls
    - Continues on error (collects errors for review)
    - Idempotent (safe to run multiple times)

    **Use Cases:**
    - Manual weekly payroll run (Monday morning)
    - Catch-up for missed weeks
    - Project-specific payroll generation

    **Response:**
    - total_processed: Number of workers processed
    - total_generated: Successfully generated
    - total_skipped: Skipped (already exists, paid, etc.)
    - payrolls: List of generated payrolls
    - errors: List of errors (user_id, error message)
    """
    # NOTE(review): no role check happens in this function body; the
    # authorization and business rules above are presumably enforced inside
    # PayrollService or by a route decorator not visible here -- confirm.
    try:
        result = PayrollService.generate_weekly_payroll_batch(
            db=db,
            target_date=data.target_date,
            project_id=data.project_id,
            current_user=current_user
        )

        # Log audit trail
        AuditService.log_action(
            db=db,
            action="generate_batch_payroll",
            entity_type="payroll",
            description=f"Generated batch payroll for target date {data.target_date} project {data.project_id if data.project_id else 'all'}",
            user=current_user,
            entity_id=None,
            request=request,
            additional_metadata={
                "target_date": str(data.target_date),
                "project_id": str(data.project_id) if data.project_id else None,
                "total_processed": result.total_processed,
                "total_generated": result.total_generated,
                "total_skipped": result.total_skipped,
                "errors_count": len(result.errors)
            },
            ip_address=request.client.host if request.client else None
        )

        # Create notifications for payroll generation (Tier 1 - Synchronous).
        # This is best-effort: a notification failure must not fail the request.
        try:
            if result.total_generated > 0 and result.payrolls:
                # Calculate totals (records without an amount contribute nothing)
                total_amount = sum(float(p.total_amount) for p in result.payrolls if p.total_amount)
                total_days = sum(p.days_worked or 0 for p in result.payrolls)
                total_tickets = sum(p.tickets_closed or 0 for p in result.payrolls)

                # The period shown in every message is taken from the first payroll
                first_payroll = result.payrolls[0]
                period_start = first_payroll.period_start_date
                period_end = first_payroll.period_end_date

                # Create manager notification (summary for the user who ran the batch)
                manager_notification = NotificationCreator.create(
                    db=db,
                    user_id=current_user.id,
                    title=f"✅ Payroll Generated: {result.total_generated} workers",
                    message=(
                        f"Successfully generated payroll for period {period_start} to {period_end}.\n\n"
                        f"📊 Summary:\n"
                        f"• Workers: {result.total_generated}\n"
                        f"• Total Amount: {total_amount:,.2f} KES\n"
                        f"• Total Days Worked: {total_days}\n"
                        f"• Total Tickets Closed: {total_tickets}\n"
                        f"• Skipped: {result.total_skipped}\n"
                        f"• Errors: {len(result.errors)}"
                    ),
                    source_type="payroll",
                    source_id=None,
                    notification_type="payroll_generated",
                    channel="in_app",
                    project_id=data.project_id,
                    metadata={
                        "payroll_count": result.total_generated,
                        "total_amount": float(total_amount),
                        "total_days_worked": total_days,
                        "total_tickets_closed": total_tickets,
                        "period_start": str(period_start),
                        "period_end": str(period_end),
                        "skipped_count": result.total_skipped,
                        "error_count": len(result.errors),
                        "payroll_ids": [str(p.id) for p in result.payrolls],
                        "action_url": f"/payroll?period_start_date={period_start}&is_paid=false"
                    }
                )

                # Create worker notifications
                notifications_created = [manager_notification]
                for payroll in result.payrolls:
                    worker = db.query(User).filter(User.id == payroll.user_id).first()
                    if not worker:
                        # No matching user row: nobody to notify for this payroll.
                        continue

                    # The totals above already treat total_amount as possibly
                    # unset; do the same here so a single incomplete record does
                    # not raise while formatting and abort every notification.
                    worker_total = payroll.total_amount or 0
                    worker_base = payroll.base_earnings or 0

                    worker_notification = NotificationCreator.create(
                        db=db,
                        user_id=worker.id,
                        title=f"📋 Payroll Generated: {worker_total:,.2f} KES",
                        message=(
                            f"Your payroll for {period_start} to {period_end} has been generated.\n\n"
                            f"💼 Work Summary:\n"
                            f"• Days Worked: {payroll.days_worked or 0}\n"
                            f"• Tickets Closed: {payroll.tickets_closed or 0}\n"
                            f"• Base Earnings: {worker_base:,.2f} KES\n"
                            f"• Total Amount: {worker_total:,.2f} KES\n\n"
                            f"Payment will be processed soon."
                        ),
                        source_type="payroll",
                        source_id=payroll.id,
                        notification_type="payroll_generated",
                        channel="in_app",
                        project_id=data.project_id,
                        metadata={
                            "payroll_id": str(payroll.id),
                            "period_start": str(period_start),
                            "period_end": str(period_end),
                            "total_amount": float(worker_total),
                            "days_worked": payroll.days_worked or 0,
                            "tickets_closed": payroll.tickets_closed or 0,
                            "action_url": f"/payroll/{payroll.id}"
                        }
                    )
                    notifications_created.append(worker_notification)

                # Commit all notifications
                db.commit()

                # Report the number of workers actually notified; it can be lower
                # than total_generated when a worker row was not found.
                logger.info(
                    f"Created {len(notifications_created)} payroll generation notifications "
                    f"({len(notifications_created) - 1} workers + 1 manager)"
                )

                # Queue delivery (Tier 2 - Asynchronous)
                NotificationDelivery.queue_bulk_delivery(
                    background_tasks=background_tasks,
                    notification_ids=[n.id for n in notifications_created]
                )
        except Exception as e:
            # Don't fail the request if notification fails
            logger.warning(f"Failed to create payroll generation notifications: {str(e)}", exc_info=True)

        return result
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Error generating batch payroll: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to generate batch payroll: {str(e)}"
        ) from e
| # ============================================ | |
| # PAYROLL CRUD | |
| # ============================================ | |
| # NOTE: Specific routes MUST come before parameterized routes | |
| # /stats and /users/{user_id}/payroll must be before /{payroll_id} | |
async def get_payroll_stats(
    project_id: Optional[UUID] = Query(None, description="Filter by project"),
    period_start_date: Optional[date] = Query(None, description="Filter by period start"),
    period_end_date: Optional[date] = Query(None, description="Filter by period end"),
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get aggregated payroll statistics for dashboard

    **Authorization:**
    - Platform admins (view all)
    - Project managers (view all)
    - Dispatchers (view all)

    **Query Parameters:**
    - project_id: Filter by specific project (optional)
    - period_start_date: Filter by period start (optional)
    - period_end_date: Filter by period end (optional)

    **Returns:**
    - total_payrolls: Total number of payroll records
    - total_paid: Number of paid payrolls
    - total_unpaid: Number of unpaid payrolls
    - total_amount: Sum of all payroll amounts
    - total_paid_amount: Sum of paid amounts
    - total_unpaid_amount: Sum of unpaid amounts
    - total_workers: Number of unique workers
    - average_per_worker: Average payment per worker
    - total_days_worked: Sum of all days worked
    - total_tickets_closed: Sum of all tickets closed

    The response also echoes back the period_start_date, period_end_date and
    project_id filters that were applied.

    **Errors:**
    - 403 if the caller's role is not one of the roles listed above
    - 500 on any unexpected failure
    """

    def _amount(record) -> float:
        # A payroll without an amount counts as 0, mirroring the `or 0`
        # handling of days_worked / tickets_closed below, so one incomplete
        # row cannot turn the whole dashboard call into a 500.
        return float(record.total_amount or 0)

    try:
        # Authorization
        if current_user.role not in [AppRole.PLATFORM_ADMIN, AppRole.PROJECT_MANAGER, AppRole.DISPATCHER]:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not authorized to view payroll statistics"
            )

        # Build query
        query = db.query(UserPayroll)

        # Apply filters: a payroll matches when its period lies inside the
        # requested [period_start_date, period_end_date] window.
        if project_id:
            query = query.filter(UserPayroll.project_id == project_id)
        if period_start_date:
            query = query.filter(UserPayroll.period_start_date >= period_start_date)
        if period_end_date:
            query = query.filter(UserPayroll.period_end_date <= period_end_date)

        # All matching rows are loaded and aggregated in Python.
        payrolls = query.all()

        # Calculate statistics
        paid_payrolls = [p for p in payrolls if p.is_paid]
        unpaid_payrolls = [p for p in payrolls if not p.is_paid]

        total_amount = sum(_amount(p) for p in payrolls)
        total_paid_amount = sum(_amount(p) for p in paid_payrolls)
        total_unpaid_amount = sum(_amount(p) for p in unpaid_payrolls)

        unique_workers = len({p.user_id for p in payrolls})
        # Guard against division by zero when nothing matched the filters.
        average_per_worker = total_amount / unique_workers if unique_workers > 0 else 0

        total_days_worked = sum(p.days_worked or 0 for p in payrolls)
        total_tickets_closed = sum(p.tickets_closed or 0 for p in payrolls)

        return {
            "total_payrolls": len(payrolls),
            "total_paid": len(paid_payrolls),
            "total_unpaid": len(unpaid_payrolls),
            "total_amount": round(total_amount, 2),
            "total_paid_amount": round(total_paid_amount, 2),
            "total_unpaid_amount": round(total_unpaid_amount, 2),
            "total_workers": unique_workers,
            "average_per_worker": round(average_per_worker, 2),
            "total_days_worked": total_days_worked,
            "total_tickets_closed": total_tickets_closed,
            "period_start_date": period_start_date,
            "period_end_date": period_end_date,
            "project_id": str(project_id) if project_id else None
        }
    except HTTPException:
        # Keep the 403 above (and any other deliberate HTTP error) intact.
        raise
    except Exception as e:
        logger.error(f"Error getting payroll stats: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get payroll stats: {str(e)}"
        ) from e
async def get_user_payroll_history(
    user_id: UUID,
    project_id: Optional[UUID] = Query(None, description="Filter by project"),
    period_start_date: Optional[date] = Query(None, description="Filter by period start"),
    period_end_date: Optional[date] = Query(None, description="Filter by period end"),
    is_paid: Optional[bool] = Query(None, description="Filter by payment status"),
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(20, ge=1, le=100, description="Items per page"),
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get the paginated payroll history of one user, most recent period first

    **Authorization:**
    - Platform admins (view any user)
    - Project managers (view any user)
    - Dispatchers (view any user)
    - Workers (view their own only)

    **Use Cases:**
    - Manager reviews worker's payment history
    - Worker views their own payroll history
    - Performance review with payment data
    - Payroll verification

    **Query Parameters:**
    - project_id: Filter by specific project (optional)
    - period_start_date: Filter by period start (optional)
    - period_end_date: Filter by period end (optional)
    - is_paid: Filter by payment status (optional)
    - page: Page number (default: 1)
    - page_size: Items per page (default: 20, max: 100)

    **Response:**
    - items: List of payroll records for the user
    - total: Total count
    - page: Current page
    - page_size: Page size
    - has_more: Whether more pages exist
    """
    try:
        # Looking at someone else's payroll requires a manager-level role;
        # everyone may look at their own.
        viewing_other_user = current_user.id != user_id
        if viewing_other_user and current_user.role not in [
            AppRole.PLATFORM_ADMIN,
            AppRole.PROJECT_MANAGER,
            AppRole.DISPATCHER,
        ]:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You can only view your own payroll history"
            )

        # Collect the filter criteria; the user filter is always present and
        # the optional ones are added only when the caller supplied them.
        criteria = [UserPayroll.user_id == user_id]
        if project_id:
            criteria.append(UserPayroll.project_id == project_id)
        if is_paid is not None:
            criteria.append(UserPayroll.is_paid == is_paid)
        if period_start_date:
            criteria.append(UserPayroll.period_start_date >= period_start_date)
        if period_end_date:
            criteria.append(UserPayroll.period_end_date <= period_end_date)

        filtered = db.query(UserPayroll).filter(*criteria)

        # Count before ordering/paging so `total` covers every page.
        total = filtered.count()

        skip = (page - 1) * page_size
        page_rows = (
            filtered
            .order_by(UserPayroll.period_start_date.desc())
            .offset(skip)
            .limit(page_size)
            .all()
        )

        items = [PayrollService._build_payroll_response(db, row) for row in page_rows]

        # More pages exist while the rows served so far fall short of the total.
        return PayrollListResponse(
            items=items,
            total=total,
            page=page,
            page_size=page_size,
            has_more=skip + len(items) < total
        )
    except HTTPException:
        # Keep the 403 above (and any other deliberate HTTP error) intact.
        raise
    except Exception as exc:
        logger.exception(f"Error getting user payroll history: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get user payroll history: {str(exc)}"
        )
async def list_payrolls(
    user_id: Optional[UUID] = Query(None, description="Filter by user"),
    project_id: Optional[UUID] = Query(None, description="Filter by project"),
    is_paid: Optional[bool] = Query(None, description="Filter by payment status"),
    period_start_date: Optional[date] = Query(None, description="Filter by period start"),
    period_end_date: Optional[date] = Query(None, description="Filter by period end"),
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(20, ge=1, le=100, description="Items per page"),
    sort_by: str = Query("period_start_date", description="Sort field"),
    sort_order: str = Query("desc", description="Sort order (asc/desc)"),
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    List payroll records with filtering and pagination

    **Authorization:**
    - Platform admins (view all)
    - Project managers (view all)
    - Dispatchers (view all)
    - Workers (view their own only - auto-filtered)

    **Query Parameters:**
    - user_id: Filter by specific worker
    - project_id: Filter by specific project
    - is_paid: Filter by payment status (true/false)
    - period_start_date: Filter by period start (>=)
    - period_end_date: Filter by period end (<=)
    - page: Page number (default: 1)
    - page_size: Items per page (default: 20, max: 100)
    - sort_by: Sort field (period_start_date, total_amount, created_at, paid_at, user_id);
      any other value falls back to period_start_date
    - sort_order: Sort order (asc/desc); anything other than "desc" sorts ascending

    **Response:**
    - items: List of payroll records
    - total: Total count (all pages)
    - page: Current page
    - page_size: Page size
    - has_more: Whether more pages exist

    **Use Cases:**
    - View all unpaid payrolls (is_paid=false)
    - View payrolls for specific week (period_start_date=2024-12-09)
    - View worker's payroll history (user_id=<uuid>)
    - View project payrolls (project_id=<uuid>)
    """
    # Only the documented sort fields are honoured. Resolving an arbitrary
    # client-supplied name with getattr() could return a relationship, a
    # method or other non-column attribute of the model.
    sortable_fields = (
        "period_start_date",
        "total_amount",
        "created_at",
        "paid_at",
        "user_id",
    )

    try:
        # Build query
        query = db.query(UserPayroll)

        # Authorization: Workers can only see their own (any user_id they pass
        # is ignored); manager-level roles may optionally narrow to one user.
        if current_user.role not in [AppRole.PLATFORM_ADMIN, AppRole.PROJECT_MANAGER, AppRole.DISPATCHER]:
            query = query.filter(UserPayroll.user_id == current_user.id)
        elif user_id:
            query = query.filter(UserPayroll.user_id == user_id)

        # Apply filters
        if project_id:
            query = query.filter(UserPayroll.project_id == project_id)
        if is_paid is not None:
            query = query.filter(UserPayroll.is_paid == is_paid)
        if period_start_date:
            query = query.filter(UserPayroll.period_start_date >= period_start_date)
        if period_end_date:
            query = query.filter(UserPayroll.period_end_date <= period_end_date)

        # Get total count (before ordering/paging so it covers all pages)
        total = query.count()

        # Apply sorting; unknown or disallowed names use the default field,
        # which is also what an unknown attribute name resolved to before.
        sort_name = sort_by if sort_by in sortable_fields else "period_start_date"
        sort_field = getattr(UserPayroll, sort_name, UserPayroll.period_start_date)
        if sort_order.lower() == "desc":
            query = query.order_by(sort_field.desc())
        else:
            query = query.order_by(sort_field.asc())

        # Apply pagination
        offset = (page - 1) * page_size
        payrolls = query.offset(offset).limit(page_size).all()

        # Build responses
        items = [PayrollService._build_payroll_response(db, p) for p in payrolls]

        return PayrollListResponse(
            items=items,
            total=total,
            page=page,
            page_size=page_size,
            has_more=offset + len(items) < total
        )
    except HTTPException:
        # Consistent with the other handlers in this module: a deliberate HTTP
        # error keeps its own status code instead of being rewrapped as a 500.
        raise
    except Exception as e:
        logger.error(f"Error listing payrolls: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to list payrolls: {str(e)}"
        ) from e
| # ============================================ | |
| # PAYROLL ACTIONS | |
| # ============================================ | |
async def recalculate_payroll(
    payroll_id: UUID,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Recalculate a payroll record from fresh timesheet and ticket data

    **Authorization:**
    - Platform admins
    - Project managers

    **Business Rules:**
    - Can only recalculate unpaid payrolls
    - Preserves manually added bonuses and deductions
    - Increments version number (optimistic locking)
    - Updates calculation_notes with timestamp

    **Use Cases:**
    - Timesheet was corrected after payroll generation
    - Ticket completion dates were adjusted
    - Manager wants to refresh calculations before payment

    **Response:**
    - success: True if recalculated
    - payroll: Updated payroll record
    - message: Success/error message
    """
    # NOTE(review): the authorization and business rules listed above are not
    # implemented in this function body; presumably PayrollService (or a route
    # decorator not visible here) enforces them -- confirm.
    try:
        outcome = PayrollService.recalculate_payroll(db, payroll_id, current_user)

        # Every recalculation attempt is audited together with its success flag.
        payroll_ref = str(payroll_id)
        audit_metadata = {
            "payroll_id": payroll_ref,
            "success": outcome.success,
        }
        AuditService.log_action(
            db=db,
            action="recalculate_payroll",
            entity_type="payroll",
            description=f"Recalculated payroll {payroll_id}",
            user=current_user,
            entity_id=payroll_ref,
            request=request,
            additional_metadata=audit_metadata,
        )
        return outcome
    except HTTPException:
        # Deliberate HTTP errors pass through with their own status code.
        raise
    except Exception as exc:
        # Anything unexpected is logged with its traceback and reported as a 500.
        logger.exception(f"Error recalculating payroll: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to recalculate payroll: {str(exc)}"
        )
async def mark_payroll_as_paid(
    payroll_id: UUID,
    data: PayrollMarkPaidRequest,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Mark a payroll record as paid

    **Authorization:**
    - Platform admins
    - Project managers

    **Required Fields:**
    - payment_method: mobile_money, bank_transfer, cash, cheque
    - payment_reference: M-Pesa transaction ID, bank reference, etc. (optional)

    **Business Rules:**
    - Can only mark unpaid payrolls as paid
    - Once marked paid, cannot be recalculated
    - Records who paid and when
    - Permanent audit trail

    **Use Cases:**
    - After disbursing payment to worker
    - After bank transfer is confirmed
    - After M-Pesa payment succeeds

    **Response:**
    - Updated payroll record with payment details
    - is_paid: true
    - paid_at: timestamp
    - paid_by_user_id: current user
    """
    # NOTE(review): the authorization and business rules listed above are not
    # implemented in this function body; presumably PayrollService (or a route
    # decorator not visible here) enforces them -- confirm.
    try:
        paid_payroll = PayrollService.mark_as_paid(
            db=db,
            payroll_id=payroll_id,
            payment_method=data.payment_method,
            payment_reference=data.payment_reference,
            current_user=current_user,
        )

        # Record how the payment was made and the amount that was settled.
        payroll_ref = str(payroll_id)
        audit_metadata = {
            "payroll_id": payroll_ref,
            "payment_method": data.payment_method,
            "payment_reference": data.payment_reference,
            "total_amount": str(paid_payroll.total_amount),
        }
        AuditService.log_action(
            db=db,
            action="mark_payroll_paid",
            entity_type="payroll",
            description=f"Marked payroll {payroll_id} as paid via {data.payment_method}",
            user=current_user,
            entity_id=payroll_ref,
            request=request,
            additional_metadata=audit_metadata,
        )
        return paid_payroll
    except HTTPException:
        # Deliberate HTTP errors pass through with their own status code.
        raise
    except Exception as exc:
        # Anything unexpected is logged with its traceback and reported as a 500.
        logger.exception(f"Error marking payroll as paid: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to mark payroll as paid: {str(exc)}"
        )
| # ============================================ | |
| # PAYROLL EXPORT FOR PAYMENT | |
| # ============================================ | |
| async def export_payroll_for_payment( | |
| background_tasks: BackgroundTasks, | |
| from_date: date = Query(..., description="Period start date (inclusive)"), | |
| to_date: date = Query(..., description="Period end date (inclusive)"), | |
| project_id: Optional[UUID] = Query(None, description="Filter by project"), | |
| user_id: Optional[UUID] = Query(None, description="Filter by user"), | |
| request: Request = None, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Export unpaid payroll to Tende Pay CSV format and mark as paid. | |
| **How Payroll Works:** | |
| 1. Payroll is generated/updated in REAL-TIME when timesheets are created/updated | |
| 2. When timesheet is created, system checks if payroll exists for that user+project+period | |
| 3. If exists → UPDATE existing payroll | If not → CREATE new payroll | |
| 4. On payment day, manager exports payroll for the period | |
| 5. System retrieves MOST RECENT payout details at export time (not stored in payroll) | |
| 6. Manager reviews CSV + warnings, then exports | |
| 7. System marks exported payroll records as paid | |
| **Authorization:** | |
| - Platform admins | |
| - Project managers | |
| **Query Parameters:** | |
| - from_date: Period start date (filters by period_start_date >= from_date) | |
| - to_date: Period end date (filters by period_end_date <= to_date) | |
| - project_id: Optional - filter by specific project | |
| - user_id: Optional - filter by specific user | |
| **Business Rules:** | |
| - Only exports unpaid payroll (is_paid = false) | |
| - Retrieves payment details from user's primary financial account AT EXPORT TIME | |
| - Missing fields generate WARNINGS but do NOT fail the export | |
| - ALL payroll records are exported (even with missing payment details) | |
| - Marks all exported payroll as paid (irreversible) | |
| - Sets payment_reference to PAYROLL_EXPORT_{timestamp}_{user_id} | |
| **Payment Details (Retrieved at Export Time):** | |
| - Fetched from user_financial_accounts (is_primary = true, is_active = true) | |
| - Supports: mobile_money (M-Pesa), bank_transfer | |
| - Missing/invalid details → WARNING (not failure) | |
| - Manager can manually fix CSV before uploading to Tende Pay | |
| **Narration Format:** | |
| - "Payroll {period}: {days} days worked, {tickets} tickets closed, {amount} KES" | |
| - Example: "Payroll 2024-12-02 to 2024-12-08: 5 days worked, 12 tickets closed, 5000 KES" | |
| **Response:** | |
| - CSV file with Tende Pay bulk payment format | |
| - Warnings included as comments at end of file | |
| - X-Warning-Count header with number of warnings | |
| - X-Exported-Count header with number of payroll records exported | |
| **Warnings (Non-Blocking):** | |
| - User has no financial account → exported with empty payment details | |
| - Invalid payment details → exported with warning | |
| - Missing ID number → exported with empty ID field | |
| - Unsupported payment method → exported with empty payment details | |
| **Example:** | |
| ``` | |
| POST /api/v1/payroll/export?from_date=2024-12-02&to_date=2024-12-08&project_id=xxx | |
| ``` | |
| """ | |
| import csv | |
| import io | |
| from datetime import datetime | |
| from fastapi.responses import StreamingResponse | |
| try: | |
| # Export payroll | |
| csv_rows, warnings, exported_payroll_records = PayrollService.export_for_payment( | |
| db=db, | |
| from_date=from_date, | |
| to_date=to_date, | |
| current_user=current_user, | |
| project_id=project_id, | |
| user_id=user_id | |
| ) | |
| # Generate Tende Pay CSV | |
| output = io.StringIO() | |
| if csv_rows: | |
| # Tende Pay column order (must match template exactly) | |
| fieldnames = [ | |
| "NAME", | |
| "ID NUMBER", | |
| "PHONE NUMBER", | |
| "AMOUNT", | |
| "PAYMENT MODE", | |
| "BANK (Optional)", | |
| "BANK ACCOUNT NO (Optional)", | |
| "PAYBILL BUSINESS NO (Optional)", | |
| "PAYBILL ACCOUNT NO (Optional)", | |
| "BUY GOODS TILL NO (Optional)", | |
| "BILL PAYMENT BILLER CODE (Optional)", | |
| "BILL PAYMENT ACCOUNT NO (Optional)", | |
| "NARRATION (OPTIONAL)" | |
| ] | |
| writer = csv.DictWriter(output, fieldnames=fieldnames) | |
| writer.writeheader() | |
| writer.writerows(csv_rows) | |
| # Add warnings as comments at the end | |
| if warnings: | |
| output.write("\n# WARNINGS:\n") | |
| for warning in warnings: | |
| output.write(f"# {warning}\n") | |
| output.seek(0) | |
| filename = f"tende_pay_payroll_{from_date.isoformat()}_{to_date.isoformat()}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.csv" | |
| # Log audit trail | |
| AuditService.log_action( | |
| db=db, | |
| action="export_payroll_for_payment", | |
| entity_type="payroll", | |
| description=f"Exported {len(csv_rows)} payroll records for payment (period: {from_date} to {to_date})", | |
| user=current_user, | |
| entity_id=None, | |
| request=request, | |
| additional_metadata={ | |
| "from_date": from_date.isoformat(), | |
| "to_date": to_date.isoformat(), | |
| "project_id": str(project_id) if project_id else None, | |
| "user_id": str(user_id) if user_id else None, | |
| "exported_count": len(csv_rows), | |
| "warning_count": len(warnings) | |
| } | |
| ) | |
| logger.info( | |
| f"Exported {len(csv_rows)} payroll records for payment " | |
| f"(period: {from_date} to {to_date}, warnings: {len(warnings)})" | |
| ) | |
| # Create notifications (Tier 1 - Synchronous) | |
| notifications_created = [] | |
| if exported_payroll_records: | |
| # Calculate totals for manager notification | |
| total_amount = sum(p.total_amount for p in exported_payroll_records) | |
| total_days_worked = sum(p.days_worked or 0 for p in exported_payroll_records) | |
| total_tickets_closed = sum(p.tickets_closed or 0 for p in exported_payroll_records) | |
| # Create manager notification | |
| manager_notification = NotificationCreator.create( | |
| db=db, | |
| user_id=current_user.id, | |
| title=f"Payroll Export Complete: {len(exported_payroll_records)} records", | |
| message=( | |
| f"Successfully exported payroll for period {from_date.isoformat()} to {to_date.isoformat()}.\n\n" | |
| f"📊 Summary:\n" | |
| f"• Workers: {len(exported_payroll_records)}\n" | |
| f"• Total Amount: {total_amount:,.2f} KES\n" | |
| f"• Total Days Worked: {total_days_worked}\n" | |
| f"• Total Tickets Closed: {total_tickets_closed}\n" | |
| f"• Warnings: {len(warnings)}" | |
| ), | |
| source_type="payroll", | |
| source_id=None, # Bulk export, no single ID | |
| notification_type="payroll_exported", | |
| channel="in_app", | |
| project_id=project_id, | |
| metadata={ | |
| "payroll_count": len(exported_payroll_records), | |
| "total_amount": float(total_amount), | |
| "total_days_worked": total_days_worked, | |
| "total_tickets_closed": total_tickets_closed, | |
| "period_start": from_date.isoformat(), | |
| "period_end": to_date.isoformat(), | |
| "warning_count": len(warnings), | |
| "warnings": warnings[:10] if warnings else [] # First 10 warnings | |
| } | |
| ) | |
| notifications_created.append(manager_notification) | |
| # Create worker notifications | |
| for payroll in exported_payroll_records: | |
| worker = db.query(User).filter(User.id == payroll.user_id).first() | |
| if worker: | |
| # Build work breakdown message | |
| work_details = [] | |
| if payroll.days_worked and payroll.days_worked > 0: | |
| work_details.append(f"{payroll.days_worked} days worked") | |
| if payroll.tickets_closed and payroll.tickets_closed > 0: | |
| work_details.append(f"{payroll.tickets_closed} tickets closed") | |
| work_summary = ", ".join(work_details) if work_details else "work completed" | |
| # Build earnings breakdown | |
| earnings_parts = [] | |
| if payroll.base_earnings and payroll.base_earnings > 0: | |
| earnings_parts.append(f"Base: {payroll.base_earnings:,.2f} KES") | |
| if payroll.bonus_amount and payroll.bonus_amount > 0: | |
| earnings_parts.append(f"Bonus: {payroll.bonus_amount:,.2f} KES") | |
| if payroll.deductions and payroll.deductions > 0: | |
| earnings_parts.append(f"Deductions: -{payroll.deductions:,.2f} KES") | |
| earnings_breakdown = "\n".join([f"• {part}" for part in earnings_parts]) | |
| worker_notification = NotificationCreator.create( | |
| db=db, | |
| user_id=worker.id, | |
| title=f"💰 Payment Processed: {payroll.total_amount:,.2f} KES", | |
| message=( | |
| f"Your payment for {from_date.isoformat()} to {to_date.isoformat()} has been processed.\n\n" | |
| f"📋 Work Summary:\n" | |
| f"• {work_summary}\n\n" | |
| f"💵 Earnings Breakdown:\n" | |
| f"{earnings_breakdown}\n\n" | |
| f"Total Payment: {payroll.total_amount:,.2f} KES" | |
| ), | |
| source_type="payroll", | |
| source_id=payroll.id, | |
| notification_type="payment", | |
| channel="whatsapp", # Workers get WhatsApp notifications | |
| project_id=project_id, | |
| metadata={ | |
| "payroll_id": str(payroll.id), | |
| "period_start": from_date.isoformat(), | |
| "period_end": to_date.isoformat(), | |
| "total_amount": float(payroll.total_amount), | |
| "base_earnings": float(payroll.base_earnings) if payroll.base_earnings else 0, | |
| "bonus_amount": float(payroll.bonus_amount) if payroll.bonus_amount else 0, | |
| "deductions": float(payroll.deductions) if payroll.deductions else 0, | |
| "days_worked": payroll.days_worked or 0, | |
| "tickets_closed": payroll.tickets_closed or 0 | |
| } | |
| ) | |
| notifications_created.append(worker_notification) | |
| # Commit all notifications | |
| db.commit() | |
| logger.info( | |
| f"Created {len(notifications_created)} notifications " | |
| f"({len(exported_payroll_records)} workers + 1 manager)" | |
| ) | |
| # Queue delivery for external channels (Tier 2 - Asynchronous) | |
| NotificationDelivery.queue_bulk_delivery( | |
| background_tasks=background_tasks, | |
| notification_ids=[n.id for n in notifications_created] | |
| ) | |
| # Return CSV file | |
| return StreamingResponse( | |
| iter([output.getvalue()]), | |
| media_type="text/csv", | |
| headers={ | |
| "Content-Disposition": f"attachment; filename={filename}", | |
| "X-Warning-Count": str(len(warnings)), | |
| "X-Exported-Count": str(len(csv_rows)) | |
| } | |
| ) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error exporting payroll: {e}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=f"Failed to export payroll: {str(e)}" | |
| ) | |