SwiftOps Backend Architecture

πŸ—οΈ Architectural Principles

1. Clean Architecture

The application follows clean architecture principles with clear separation of concerns:

┌────────────────────────────────────────┐
│           Presentation Layer           │
│      (FastAPI Routes, WebSockets)      │
└────────────────────────────────────────┘
                    ↓
┌────────────────────────────────────────┐
│             Service Layer              │
│    (Business Logic, Orchestration)     │
└────────────────────────────────────────┘
                    ↓
┌────────────────────────────────────────┐
│            Repository Layer            │
│         (Data Access, Queries)         │
└────────────────────────────────────────┘
                    ↓
┌────────────────────────────────────────┐
│             Database Layer             │
│       (PostgreSQL via Supabase)        │
└────────────────────────────────────────┘

Benefits:

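  • Testability: Each layer can be tested independently, with the layer below replaced by a fake or mock
  • Maintainability: Changes stay mostly within one layer (e.g., rewriting a query touches only the repository)
  • Scalability: New features slot into the existing layers without rewriting unrelated code
  • Flexibility: Implementations behind a layer boundary can be swapped (e.g., a different database or payment gateway)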
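The testability and flexibility benefits depend on a service relying on an interface rather than on a concrete repository. Below is a minimal sketch of that idea. The `TicketReader` protocol, the `InMemoryTicketRepo` fake and the `TicketQueryService` class are all hypothetical names used only for illustration:

```python
import asyncio
from dataclasses import dataclass
from typing import Optional, Protocol
from uuid import UUID, uuid4


@dataclass
class Ticket:
    id: UUID
    status: str


class TicketReader(Protocol):
    """What the service needs from the data layer; any object with this method fits."""

    async def get_by_id(self, ticket_id: UUID) -> Optional[Ticket]: ...


class InMemoryTicketRepo:
    """Test double: satisfies TicketReader without touching a database."""

    def __init__(self, tickets: list) -> None:
        self._tickets = {t.id: t for t in tickets}

    async def get_by_id(self, ticket_id: UUID) -> Optional[Ticket]:
        return self._tickets.get(ticket_id)


class TicketQueryService:
    def __init__(self, repo: TicketReader) -> None:
        self.repo = repo  # injected, so tests and production can pass different repositories

    async def is_assignable(self, ticket_id: UUID) -> bool:
        ticket = await self.repo.get_by_id(ticket_id)
        return ticket is not None and ticket.status == "open"


open_ticket = Ticket(id=uuid4(), status="open")
service = TicketQueryService(InMemoryTicketRepo([open_ticket]))
print(asyncio.run(service.is_assignable(open_ticket.id)))  # True
```

In production, the same service would receive the SQLAlchemy-backed repository. Neither the service nor its unit tests change.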

📦 Layer Responsibilities

Presentation Layer (app/api/)

Responsibility: Handle HTTP requests/responses, input validation, authentication

What it does:

  • Parse request data
  • Validate input using Pydantic schemas
  • Call service layer methods
  • Format responses
  • Handle errors and return appropriate HTTP status codes

What it DOESN'T do:

  • Business logic
  • Direct database access
  • Complex calculations

Example:

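# app/api/v1/tickets.py
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException

# TicketAssignmentCreate, User, TicketService, get_current_user, get_ticket_service,
# BusinessRuleViolation and TicketNotFoundError are imported from the app's own modules.

router = APIRouter()


@router.post("/tickets/{ticket_id}/assign")
async def assign_ticket(
    ticket_id: UUID,
    assignment: TicketAssignmentCreate,
    current_user: User = Depends(get_current_user),
    ticket_service: TicketService = Depends(get_ticket_service)
):
    """Assign a ticket to a field agent."""
    try:
        result = await ticket_service.assign_ticket(
            ticket_id=ticket_id,
            user_id=assignment.user_id,
            assigned_by=current_user.id
        )
        return {"success": True, "data": result}
    except TicketNotFoundError as e:
        # Raised by the service when the ticket doesn't exist
        raise HTTPException(status_code=404, detail=str(e)) from e
    except BusinessRuleViolation as e:
        raise HTTPException(status_code=400, detail=str(e)) from e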
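Each route could repeat `try`/`except`. Alternatively, the presentation layer could translate domain exceptions to status codes in one place. Below is a stdlib-only sketch of that mapping. It assumes a hypothetical `DomainError` base class. In FastAPI, a table like this would typically back a handler registered with `app.exception_handler(...)`:

```python
class DomainError(Exception):
    """Hypothetical base class for errors raised by the service layer."""


class BusinessRuleViolation(DomainError):
    pass


class TicketNotFoundError(DomainError):
    pass


# Most specific classes first: the first isinstance() match wins.
STATUS_BY_ERROR = [
    (TicketNotFoundError, 404),
    (BusinessRuleViolation, 400),
    (DomainError, 400),
]


def status_for(exc: Exception) -> int:
    """Return the HTTP status for a domain error, or 500 for anything unexpected."""
    for error_type, status in STATUS_BY_ERROR:
        if isinstance(exc, error_type):
            return status
    return 500


print(status_for(TicketNotFoundError("missing")))  # 404
print(status_for(ValueError("bug")))  # 500
```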

Service Layer (app/services/)

Responsibility: Implement business logic, orchestrate operations, enforce business rules

What it does:

  • Validate business rules
  • Coordinate multiple repository calls
  • Handle transactions
  • Trigger side effects (notifications, webhooks)
  • Calculate derived values (payroll, SLA deadlines)

What it DOESN'T do:

  • Direct SQL queries
  • HTTP request handling
  • Data formatting for API responses

Example:

# app/services/ticket_service.py
from datetime import datetime, timezone
from uuid import UUID

# Repository, service and exception classes are imported from the app's own modules.

MAX_ACTIVE_ASSIGNMENTS = 3  # per field agent


class TicketService:
    def __init__(
        self,
        ticket_repo: TicketRepository,
        assignment_repo: AssignmentRepository,
        notification_service: NotificationService,
        sla_service: SLAService
    ):
        self.ticket_repo = ticket_repo
        self.assignment_repo = assignment_repo
        self.notification_service = notification_service
        self.sla_service = sla_service

    async def assign_ticket(self, ticket_id: UUID, user_id: UUID, assigned_by: UUID):
        """Assign ticket to user with business rule validation."""

        # 1. Get ticket and validate
        ticket = await self.ticket_repo.get_by_id(ticket_id)
        if not ticket:
            raise TicketNotFoundError(ticket_id)

        if ticket.status != 'open':
            raise BusinessRuleViolation("Can only assign open tickets")

        # 2. Validate user can be assigned
        active_assignments = await self.assignment_repo.count_active_assignments(user_id)
        if active_assignments >= MAX_ACTIVE_ASSIGNMENTS:
            raise BusinessRuleViolation(
                f"User already has {MAX_ACTIVE_ASSIGNMENTS} active assignments"
            )

        # 3. Check user is in project team
        if not await self._user_in_project_team(user_id, ticket.project_id):
            raise BusinessRuleViolation("User not in project team")

        # 4. Create assignment, recording who made it
        assignment = await self.assignment_repo.create(
            ticket_id=ticket_id,
            user_id=user_id,
            assigned_by=assigned_by,
            action='assigned',
            assigned_at=datetime.now(timezone.utc)  # timezone-aware; utcnow() is deprecated in 3.12
        )

        # 5. Update ticket status
        await self.ticket_repo.update_status(ticket_id, 'assigned')

        # 6. Calculate SLA deadline
        sla_deadline = await self.sla_service.calculate_deadline(ticket)
        await self.ticket_repo.update_sla(ticket_id, sla_deadline)

        # 7. Send notification
        await self.notification_service.send_assignment_notification(user_id, ticket)

        return assignment
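Steps 1 to 3 are checks on data the service has already loaded. They could therefore be factored into a pure function that needs no repositories and is trivial to unit-test. Below is a sketch of that factoring. `check_assignable` is an illustrative name and is not part of the codebase:

```python
MAX_ACTIVE_ASSIGNMENTS = 3  # the per-agent limit enforced in the service above


class BusinessRuleViolation(Exception):
    pass


def check_assignable(ticket_status: str, active_assignments: int, in_project_team: bool) -> None:
    """Raise BusinessRuleViolation on the first failed rule; return None if all pass."""
    if ticket_status != "open":
        raise BusinessRuleViolation("Can only assign open tickets")
    if active_assignments >= MAX_ACTIVE_ASSIGNMENTS:
        raise BusinessRuleViolation(
            f"User already has {MAX_ACTIVE_ASSIGNMENTS} active assignments"
        )
    if not in_project_team:
        raise BusinessRuleViolation("User not in project team")


check_assignable("open", active_assignments=2, in_project_team=True)  # passes silently
try:
    check_assignable("open", active_assignments=3, in_project_team=True)
except BusinessRuleViolation as err:
    print(err)  # User already has 3 active assignments
```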

Repository Layer (app/repositories/)

Responsibility: Data access, database queries, ORM operations

What it does:

  • CRUD operations
  • Complex queries
  • Filtering and pagination
  • Soft delete handling
  • Optimistic locking

What it DOESN'T do:

  • Business logic
  • Validation (beyond data integrity)
  • Side effects (notifications, webhooks)

Example:

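# app/repositories/ticket_repository.py
from datetime import datetime, timezone
from typing import List, Optional
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# Ticket is the app's SQLAlchemy model.


class TicketRepository:
    def __init__(self, db: AsyncSession):
        # AsyncSession, so these async methods don't block the event loop on database I/O
        self.db = db

    async def get_by_id(self, ticket_id: UUID) -> Optional[Ticket]:
        """Get ticket by ID, excluding soft-deleted."""
        result = await self.db.execute(
            select(Ticket).where(
                Ticket.id == ticket_id,
                Ticket.deleted_at.is_(None)
            )
        )
        return result.scalar_one_or_none()

    async def get_open_tickets(
        self,
        project_id: UUID,
        limit: int = 50,
        offset: int = 0
    ) -> List[Ticket]:
        """Get open tickets for a project with pagination."""
        result = await self.db.execute(
            select(Ticket)
            .where(
                Ticket.project_id == project_id,
                Ticket.status == 'open',
                Ticket.deleted_at.is_(None)
            )
            .order_by(Ticket.created_at.desc())
            .limit(limit)
            .offset(offset)
        )
        return list(result.scalars().all())

    async def update_status(self, ticket_id: UUID, status: str) -> Optional[Ticket]:
        """Update ticket status; returns None if the ticket doesn't exist."""
        ticket = await self.get_by_id(ticket_id)
        if ticket is None:
            return None
        ticket.status = status
        ticket.updated_at = datetime.now(timezone.utc)
        await self.db.commit()
        await self.db.refresh(ticket)
        return ticket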
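Optimistic locking is listed among the repository's duties but is not shown above. Below is one common form of it, sketched against an in-memory SQLite table. The integer `version` column is an assumption, and the real `tickets` schema may differ. The `UPDATE` matches only if the row still has the version the caller read, so a concurrent writer makes it affect zero rows. SQLAlchemy also supports this pattern natively through the mapper's `version_id_col` option:

```python
import sqlite3


class StaleDataError(Exception):
    """Raised when the row changed after it was read."""


def update_status(conn: sqlite3.Connection, ticket_id: int, status: str, expected_version: int) -> int:
    """Set the status only if the version is unchanged; return the new version."""
    cur = conn.execute(
        "UPDATE tickets SET status = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (status, ticket_id, expected_version),
    )
    if cur.rowcount == 0:  # another writer bumped the version first (or the row is gone)
        raise StaleDataError(f"ticket {ticket_id} was modified concurrently")
    conn.commit()
    return expected_version + 1


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tickets (id INTEGER PRIMARY KEY, status TEXT, version INTEGER NOT NULL)")
conn.execute("INSERT INTO tickets VALUES (1, 'open', 1)")

new_version = update_status(conn, 1, "assigned", expected_version=1)  # succeeds, version is now 2
try:
    update_status(conn, 1, "closed", expected_version=1)  # based on a stale read of version 1
except StaleDataError as err:
    print(err)
```

The caller typically re-reads the row and retries, or reports the conflict (e.g. HTTP 409).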

🔄 Data Flow Examples

Example 1: Ticket Assignment Flow

1. Frontend sends POST /api/v1/tickets/{id}/assign
                    ↓
2. API Route (tickets.py)
   - Validates the JWT (via the get_current_user dependency)
   - Parses and validates the request body
   - Calls TicketService.assign_ticket()
                    ↓
3. TicketService
   - Calls TicketRepository.get_by_id() and checks the ticket is open
   - Calls AssignmentRepository.count_active_assignments() (max 3 active per agent)
   - Checks the agent is on the ticket's project team
   - Calls AssignmentRepository.create()
   - Calls TicketRepository.update_status()
   - Calls SLAService.calculate_deadline() and TicketRepository.update_sla()
   - Calls NotificationService.send_assignment_notification()
                    ↓
4. Repositories
   - Execute SQL queries via SQLAlchemy
   - Return data to service
                    ↓
5. Service returns result to API route
                    ↓
6. API route formats response and returns to frontend

Example 2: Payroll Generation Flow (Background Task)

1. Celery Beat triggers weekly payroll task (Friday 6 PM)
                    ↓
2. PayrollTask (app/tasks/payroll_tasks.py)
   - Calls PayrollService.generate_weekly_payroll()
                    ↓
3. PayrollService
   - Gets all active projects
   - For each project:
     - Gets project team members
     - For each member:
       - Calls PayrollRepository.get_tickets_closed()
       - Calls TimesheetRepository.get_hours_worked()
       - Calculates earnings based on compensation type
       - Calls PayrollRepository.create()
       - Calls FinanceService.create_transaction()
                    ↓
4. Repositories
   - Execute queries and insert payroll records
                    ↓
5. Service sends notifications to users
                    ↓
6. Task completes and logs result
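The "calculates earnings based on compensation type" step can be sketched as a dispatch on the role's `compensation_type`. Only `flat_rate` and `flat_rate_amount` appear elsewhere in this document, in the unit-test example. The `per_ticket` and `hourly` types and their rate fields are hypothetical and included only for illustration. The sketch uses `Decimal` to avoid float rounding in money amounts:

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass
class Role:
    compensation_type: str
    flat_rate_amount: Decimal = Decimal("0")
    per_ticket_rate: Decimal = Decimal("0")  # hypothetical field
    hourly_rate: Decimal = Decimal("0")      # hypothetical field


def calculate_earnings(role: Role, tickets_closed: int, hours_worked: Decimal) -> Decimal:
    """Weekly earnings for one team member, by compensation type."""
    if role.compensation_type == "flat_rate":
        return role.flat_rate_amount
    if role.compensation_type == "per_ticket":
        return role.per_ticket_rate * tickets_closed
    if role.compensation_type == "hourly":
        return role.hourly_rate * hours_worked
    raise ValueError(f"unknown compensation type: {role.compensation_type!r}")


print(calculate_earnings(Role("per_ticket", per_ticket_rate=Decimal("500")), 7, Decimal("0")))  # 3500
```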

πŸ” Security Architecture

1. Authentication Flow

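1. User logs in via Supabase Auth
                    ↓
2. Supabase returns a JWT access token
                    ↓
3. Frontend sends the token in the Authorization header (Bearer scheme)
                    ↓
4. FastAPI validates the token's signature and expiry (e.g., in the get_current_user dependency)
                    ↓
5. Extracts user_id from the token's sub claim
                    ↓
6. Loads User from database
                    ↓
7. Checks user role and permissions
                    ↓
8. Allows or denies the request (401 for an invalid token, 403 for insufficient permissions)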
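Steps 4 and 5 amount to verifying the token's signature and expiry, then reading the `sub` claim, which Supabase sets to the user's UUID. Below is a stdlib-only sketch of those checks. It assumes an HS256 token signed with a shared secret. Newer Supabase projects may use asymmetric signing keys instead. Production code should rely on a maintained library such as PyJWT rather than hand-rolled parsing. `verify_hs256` and `InvalidToken` are illustrative names:

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


class InvalidToken(Exception):
    pass


def _b64url_decode(segment: str) -> bytes:
    # JWT segments are base64url without padding; restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def verify_hs256(token: str, secret: bytes, now: Optional[float] = None) -> str:
    """Check the signature and expiry of an HS256 JWT and return its `sub` claim."""
    parts = token.split(".")
    if len(parts) != 3:
        raise InvalidToken("malformed token")
    header_b64, payload_b64, signature_b64 = parts

    header = json.loads(_b64url_decode(header_b64))
    if header.get("alg") != "HS256":  # the server, not the token, decides the algorithm
        raise InvalidToken("unexpected algorithm")

    signing_input = f"{header_b64}.{payload_b64}".encode()
    expected = hmac.new(secret, signing_input, hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):  # constant-time compare
        raise InvalidToken("bad signature")

    claims = json.loads(_b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if claims.get("exp", 0) <= current:  # a missing exp is treated as expired
        raise InvalidToken("token expired")
    if not claims.get("sub"):
        raise InvalidToken("missing sub claim")
    return claims["sub"]
```

In the dependency, an `InvalidToken` would map to a 401 response, and the returned `sub` is the ID used to load the User in step 6.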

2. Row-Level Security (RLS)

Database Level (Supabase RLS Policies):

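-- Example: Users can only see tickets from their projects
-- Policies have no effect until row-level security is enabled on the table
ALTER TABLE Tickets ENABLE ROW LEVEL SECURITY;

CREATE POLICY "Users see own project tickets"
ON Tickets FOR SELECT
USING (
    project_id IN (
        SELECT project_id FROM ProjectTeam WHERE user_id = auth.uid()
    )
);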

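Application Level (Service Layer), still required if the backend connects with a role that bypasses RLS (such as the table owner or Supabase's service_role):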

# Always filter by user's accessible projects
async def get_tickets(self, user: User):
    project_ids = await self._get_user_project_ids(user.id)
    return await self.ticket_repo.get_by_projects(project_ids)

3. Multi-Tenancy Isolation

Client Isolation:

# Every query is scoped to the user's tenant (client or contractor)
async def get_customers(self, user: User):
    if user.client_id:
        return await self.customer_repo.get_by_client(user.client_id)
    elif user.contractor_id:
        # Contractor sees customers from their projects
        project_ids = await self._get_contractor_projects(user.contractor_id)
        return await self.customer_repo.get_by_projects(project_ids)
    # No client or contractor affiliation: deny by default (empty result)
    return []

📊 Caching Strategy

What to Cache

  1. User Sessions (Redis, TTL: 30 minutes)

    • User profile
    • User permissions
    • User's active projects
  2. Dashboard Metrics (Redis, TTL: 5 minutes)

    • Ticket counts by status
    • SLA compliance rates
    • Agent workload
  3. Configuration (Redis, TTL: 1 hour)

    • System settings
    • Feature flags
    • SLA thresholds
  4. Location Data (Redis, TTL: 1 minute)

    • Agent current locations
    • Real-time tracking data
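The TTLs above could live in one table so that every cache write uses the same policy. Below is a minimal in-memory sketch with an injectable clock. It stands in for Redis, where the equivalent write is `SET key value EX seconds`. The category names and the `TTLCache` class are illustrative:

```python
import time
from typing import Any, Callable, Optional

# Seconds, mirroring the list above.
TTL_SECONDS = {
    "session": 30 * 60,
    "dashboard": 5 * 60,
    "config": 60 * 60,
    "location": 60,
}


class TTLCache:
    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock  # injectable so tests can move time forward
        self._data = {}      # key -> (expires_at, value)

    def set(self, category: str, key: str, value: Any) -> None:
        """Store a value with the TTL of its category."""
        self._data[key] = (self._clock() + TTL_SECONDS[category], value)

    def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:  # expired: drop lazily on read
            del self._data[key]
            return None
        return value
```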

Cache Invalidation

# Example: Invalidate cache on ticket status change
async def update_ticket_status(self, ticket_id: UUID, status: str):
    ticket = await self.ticket_repo.update_status(ticket_id, status)
    
    # Invalidate related caches
    await cache.delete(f"ticket:{ticket_id}")
    await cache.delete(f"project:{ticket.project_id}:tickets")
    await cache.delete(f"dashboard:metrics:{ticket.project_id}")
    
    return ticket
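Invalidation works only if writers and readers build exactly the same key strings. One way to keep them in sync is a small set of key builders shared by both paths. This sketch assumes the key formats above are the canonical ones:

```python
from uuid import UUID


def ticket_key(ticket_id: UUID) -> str:
    return f"ticket:{ticket_id}"


def project_tickets_key(project_id: UUID) -> str:
    return f"project:{project_id}:tickets"


def dashboard_metrics_key(project_id: UUID) -> str:
    return f"dashboard:metrics:{project_id}"


def keys_to_invalidate(ticket_id: UUID, project_id: UUID) -> list:
    """Every cache entry that a ticket status change makes stale."""
    return [
        ticket_key(ticket_id),
        project_tickets_key(project_id),
        dashboard_metrics_key(project_id),
    ]
```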

🔄 Background Tasks Architecture

Celery Task Types

  1. Scheduled Tasks (Celery Beat)

    • Weekly payroll generation (Friday 6 PM)
    • SLA monitoring (hourly)
    • Daily metrics computation (midnight)
    • Invoice generation (end of month)
  2. Async Tasks (Triggered by API)

    • Send email notifications
    • Send SMS notifications
    • Process payment gateway callbacks
    • Generate reports
  3. Retry Tasks (failed external calls)

    • Retry failed M-Pesa payments
    • Retry failed SMS deliveries
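An immediate retry of a failed M-Pesa payment or SMS tends to hit the same outage. Retries are therefore usually spaced out with capped exponential backoff plus jitter. Celery can apply this per task through the `autoretry_for`, `retry_backoff`, `retry_backoff_max` and `retry_jitter` options. The sketch below only shows the delay arithmetic, and its base and cap values are chosen for illustration:

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base: float = 60.0, cap: float = 3600.0,
                  rng: Optional[random.Random] = None) -> float:
    """Seconds to wait before retry number `attempt` (0-based).

    "Full jitter": a uniform draw between 0 and min(cap, base * 2**attempt),
    so many tasks that failed together do not all retry at the same moment.
    """
    ceiling = min(cap, base * (2 ** attempt))
    return (rng or random).uniform(0, ceiling)


rng = random.Random(0)
print([round(backoff_delay(n, rng=rng)) for n in range(5)])  # each value is at most 60, 120, 240, 480, 960
```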

Task Configuration

# app/tasks/celery_app.py
from celery import Celery
from celery.schedules import crontab

celery_app = Celery('swiftops')
# crontab() times below are evaluated in celery_app.conf.timezone (UTC unless configured)

celery_app.conf.beat_schedule = {
    'generate-weekly-payroll': {
        'task': 'app.tasks.payroll_tasks.generate_weekly_payroll',
        'schedule': crontab(day_of_week=5, hour=18, minute=0),  # Friday 6 PM
    },
    'monitor-sla-violations': {
        'task': 'app.tasks.sla_tasks.monitor_sla_violations',
        'schedule': crontab(minute=0),  # Every hour
    },
    'compute-daily-metrics': {
        'task': 'app.tasks.analytics_tasks.compute_daily_metrics',
        'schedule': crontab(hour=0, minute=0),  # Midnight
    },
}
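Celery's `crontab` numbers `day_of_week` with Sunday as 0, so `day_of_week=5` is Friday. To sanity-check a schedule, the next Friday 18:00 run can be computed with the standard library. The `next_weekly_run` helper below is illustrative only and is not how Celery computes schedules internally:

```python
from datetime import datetime, timedelta


def next_weekly_run(now: datetime, cron_day_of_week: int, hour: int, minute: int = 0) -> datetime:
    """Next datetime strictly after `now` that matches a weekly cron slot.

    cron_day_of_week uses cron numbering (Sunday=0 ... Saturday=6);
    Python's weekday() uses Monday=0 ... Sunday=6, hence the shift.
    """
    target_weekday = (cron_day_of_week - 1) % 7
    days_ahead = (target_weekday - now.weekday()) % 7
    candidate = (now + timedelta(days=days_ahead)).replace(
        hour=hour, minute=minute, second=0, microsecond=0
    )
    if candidate <= now:  # this week's slot has already passed
        candidate += timedelta(days=7)
    return candidate


print(next_weekly_run(datetime(2024, 1, 3, 9, 0), cron_day_of_week=5, hour=18))
# 2024-01-05 18:00:00 (a Friday)
```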

🚀 Scalability Patterns

1. Horizontal Scaling

Stateless API Design:

  • No session state stored in API servers
  • All state in database or Redis
  • Can run multiple API instances behind load balancer

Load Balancing:

                    ┌─────────────┐
                    │Load Balancer│
                    └─────────────┘
                           │
        ┌──────────────────┼──────────────────┐
        ↓                  ↓                  ↓
   ┌─────────┐       ┌─────────┐       ┌─────────┐
   │ API #1  │       │ API #2  │       │ API #3  │
   └─────────┘       └─────────┘       └─────────┘
        │                  │                  │
        └──────────────────┼──────────────────┘
                           ↓
                    ┌─────────────┐
                    │  Database   │
                    └─────────────┘

2. Database Optimization

Read Replicas:

  • Use read replicas for reporting queries
  • Primary for writes, replicas for reads; replicas lag slightly, so reads that must see a just-committed write go to the primary
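Replicas are updated asynchronously, so a read issued right after a write may not see that write yet. Below is a minimal routing sketch that sends writes and read-your-own-write queries to the primary and round-robins everything else across replicas. The `EngineRouter` class and the target names are hypothetical. A real version would return SQLAlchemy engines rather than strings:

```python
import itertools


class EngineRouter:
    """Pick a database target per query; targets are plain names in this sketch."""

    def __init__(self, primary: str, replicas: list) -> None:
        self.primary = primary
        # Round-robin over replicas; fall back to the primary if none are configured.
        self._replicas = itertools.cycle(replicas or [primary])

    def route(self, is_write: bool, needs_fresh_data: bool = False) -> str:
        if is_write or needs_fresh_data:
            return self.primary
        return next(self._replicas)


router = EngineRouter("primary", ["replica-1", "replica-2"])
print(router.route(is_write=True))   # primary
print(router.route(is_write=False))  # replica-1
print(router.route(is_write=False))  # replica-2
```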

Connection Pooling:

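# app/db/session.py
import os

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

DATABASE_URL = os.environ["DATABASE_URL"]  # async driver URL, e.g. postgresql+asyncpg://...

engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,          # Connections kept open in the pool (per process)
    max_overflow=10,       # Extra connections allowed when the pool is exhausted (max 30 total)
    pool_pre_ping=True,    # Verify connections before use
    pool_recycle=3600      # Recycle connections after 1 hour
)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)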
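`pool_size` plus `max_overflow` is a ceiling per process. The total number of connections the database must accept therefore multiplies with the number of API processes and any workers that share these settings. Below is a quick check of that arithmetic. The process counts are placeholders. The limit to compare against is PostgreSQL's `max_connections`, or the pooler's limit if one sits in front of the database:

```python
def peak_connections(processes: int, pool_size: int = 20, max_overflow: int = 10) -> int:
    """Worst case: every process has used its whole pool plus its overflow."""
    return processes * (pool_size + max_overflow)


# e.g. 3 API instances (as in the load-balancer diagram) plus 2 worker processes,
# assuming the workers use the same pool settings
total = peak_connections(processes=3 + 2)
print(total)  # 150
```

The result should leave headroom below the server limit for migrations and admin sessions.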

Query Optimization:

  • Use indexes effectively (already in schema)
  • Avoid N+1 queries (use eager loading)
  • Paginate large result sets
  • Use database views for complex queries

3. Caching Strategy

Multi-Level Caching:

Request → API → L1 Cache (In-Memory) → L2 Cache (Redis) → Database
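In the lookup order above, a hit in the per-process L1 cache skips the Redis round trip, and an L2 hit repopulates L1. Below is a sketch that uses two plain dicts to stand in for the in-memory cache and Redis. `TwoLevelCache` and `load_from_db` are illustrative names. A real L1 would also need a short TTL, because each API instance holds its own copy and could serve stale data after another instance writes:

```python
from typing import Any, Callable, Optional


class TwoLevelCache:
    def __init__(self, load_from_db: Callable[[str], Optional[Any]]) -> None:
        self.l1 = {}  # per-process memory
        self.l2 = {}  # stand-in for Redis, shared across instances in a real deployment
        self.load_from_db = load_from_db

    def get(self, key: str) -> Optional[Any]:
        if key in self.l1:
            return self.l1[key]
        if key in self.l2:
            self.l1[key] = self.l2[key]  # promote an L2 hit into L1
            return self.l1[key]
        value = self.load_from_db(key)
        if value is not None:  # don't cache misses
            self.l2[key] = value
            self.l1[key] = value
        return value
```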

Cache-Aside Pattern:

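async def get_ticket(self, ticket_id: UUID):
    key = f"ticket:{ticket_id}"

    # Try cache first ("is not None" so falsy cached values still count as hits)
    cached = await cache.get(key)
    if cached is not None:
        return cached

    # Cache miss, query database
    ticket = await self.ticket_repo.get_by_id(ticket_id)

    # Store in cache for 5 minutes, skipping not-found results
    # (the cache client is assumed to serialize the ticket, e.g. to JSON)
    if ticket is not None:
        await cache.set(key, ticket, ttl=300)

    return ticket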

🧪 Testing Strategy

Test Pyramid

                    ┌─────────┐
                    │   E2E   │  (Few, slow, expensive)
                    └─────────┘
                ┌───────────────┐
                │  Integration  │  (Some, medium speed)
                └───────────────┘
            ┌───────────────────────┐
            │      Unit Tests       │  (Many, fast, cheap)
            └───────────────────────┘

Unit Tests (70% of tests)

  • Test individual functions
  • Mock external dependencies
  • Fast execution (< 1 second per test)

# tests/unit/test_services/test_payroll_service.py
from unittest.mock import Mock


def test_calculate_flat_rate_payroll(payroll_service):  # payroll_service: pytest fixture
    # Arrange
    role = Mock(compensation_type='flat_rate', flat_rate_amount=5000)

    # Act
    earnings = payroll_service._calculate_earnings(role, tickets=[], hours=0)

    # Assert
    assert earnings == 5000

Integration Tests (25% of tests)

  • Test multiple components together
  • Use test database
  • Medium speed (1-5 seconds per test)
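
# tests/integration/test_api/test_tickets.py
def test_assign_ticket_endpoint(client, test_db, auth_headers):
    # auth_headers: fixture returning an Authorization header for a test user,
    # needed because the route depends on get_current_user
    # Create test data (the ticket must be open and the user on its project team,
    # otherwise the service rejects the assignment with a 400)
    ticket = create_test_ticket(test_db)
    user = create_test_user(test_db)

    # Call API
    response = client.post(
        f"/api/v1/tickets/{ticket.id}/assign",
        json={"user_id": str(user.id)},
        headers=auth_headers,
    )

    # Assert
    assert response.status_code == 200
    assert test_db.query(TicketAssignment).count() == 1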

E2E Tests (5% of tests)

  • Test complete user workflows
  • Use real database (or close replica)
  • Slow (10+ seconds per test)

# tests/e2e/test_ticket_workflow.py
def test_complete_ticket_workflow(client, test_db):
    # Outline of the workflow under test:
    # 1. Create sales order
    # 2. Generate ticket from sales order
    # 3. Assign ticket to agent
    # 4. Agent accepts assignment
    # 5. Agent arrives at site
    # 6. Agent completes work
    # 7. Subscription activated
    # 8. Verify all state changes
    ...  # placeholder body so the outline is valid Python

📈 Monitoring & Observability

Logging Strategy

Log Levels:

  • DEBUG: Detailed information for debugging
  • INFO: General informational messages
  • WARNING: Warning messages (non-critical issues)
  • ERROR: Error messages (handled exceptions)
  • CRITICAL: Critical errors (system failures)

Structured Logging:

import structlog

logger = structlog.get_logger()

logger.info(
    "ticket_assigned",
    ticket_id=str(ticket_id),
    user_id=str(user_id),
    project_id=str(project_id),
    assigned_by=str(assigned_by)
)
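Where structlog is not installed (for example in a small one-off script), a similar key-value style can be approximated with the standard `logging` module and a JSON formatter. This is a substitute stdlib technique and not structlog's API. Passing structured fields under a `fields` key in `extra` is a convention assumed here:

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "event": record.getMessage(),
            "level": record.levelname.lower(),
            "logger": record.name,
        }
        payload.update(getattr(record, "fields", {}))  # structured key-value pairs
        return json.dumps(payload, sort_keys=True)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
log = logging.getLogger("swiftops")
log.addHandler(handler)
log.setLevel(logging.INFO)

log.info("ticket_assigned", extra={"fields": {"ticket_id": "t-1", "user_id": "u-9"}})
```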

Metrics to Track

  1. API Metrics

    • Request rate (requests/second)
    • Response time (p50, p95, p99)
    • Error rate (4xx, 5xx)
  2. Business Metrics

    • Tickets created/assigned/completed per day
    • Average ticket completion time
    • SLA compliance rate
    • Payroll processing time
  3. System Metrics

    • Database connection pool usage
    • Cache hit rate
    • Celery queue length
    • Memory/CPU usage
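p50, p95 and p99 are percentiles of the response-time distribution. For example, p95 is the latency that 95% of requests stay at or below. Below is a nearest-rank sketch over a window of samples. Monitoring systems usually estimate percentiles from histograms instead, so their values can differ slightly. The latency numbers are made up:

```python
import math


def percentile(samples: list, p: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least p% of samples <= it."""
    if not samples or not 0 < p <= 100:
        raise ValueError("need samples and 0 < p <= 100")
    ordered = sorted(samples)
    rank = math.ceil(p / 100 * len(ordered))  # 1-based rank
    return ordered[rank - 1]


latencies_ms = [12, 15, 11, 240, 14, 13, 18, 16, 900, 17]
print({p: percentile(latencies_ms, p) for p in (50, 95, 99)})  # {50: 15, 95: 900, 99: 900}
```

With only ten samples, a single slow request determines p95 and p99. This is why tail percentiles are usually computed over larger windows.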

Error Tracking

Use Sentry for error tracking:

import os

import sentry_sdk

sentry_sdk.init(
    dsn=os.environ["SENTRY_DSN"],
    environment=os.environ["ENVIRONMENT"],
    traces_sample_rate=0.1,  # send performance traces for about 10% of transactions
)

🔧 Development Workflow

Local Development

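  1. Start services:
     docker-compose up -d postgres redis
  2. Run migrations:
     alembic upgrade head
  3. Start API:
     uvicorn app.main:app --reload
  4. Start Celery worker:
     celery -A app.tasks.celery_app worker --loglevel=info
  5. Start Celery Beat (only needed to run the scheduled tasks):
     celery -A app.tasks.celery_app beat --loglevel=info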

Code Quality

Pre-commit Hooks:

  • Black (code formatting)
  • isort (import sorting)
  • flake8 (linting)
  • mypy (type checking)

CI/CD Pipeline:

  1. Run tests
  2. Check code coverage (> 80%)
  3. Run linters
  4. Build Docker image
  5. Deploy to staging
  6. Run E2E tests
  7. Deploy to production

📚 Additional Resources