# SwiftOps Backend Architecture

## 🏗️ Architectural Principles

### **1. Clean Architecture**

The application follows clean architecture principles with clear separation of concerns:

```
┌────────────────────────────────────────┐
│           Presentation Layer           │
│      (FastAPI Routes, WebSockets)      │
└────────────────────────────────────────┘
                    ↓
┌────────────────────────────────────────┐
│             Service Layer              │
│    (Business Logic, Orchestration)     │
└────────────────────────────────────────┘
                    ↓
┌────────────────────────────────────────┐
│            Repository Layer            │
│         (Data Access, Queries)         │
└────────────────────────────────────────┘
                    ↓
┌────────────────────────────────────────┐
│             Database Layer             │
│       (PostgreSQL via Supabase)        │
└────────────────────────────────────────┘
```

**Benefits**:
- **Testability**: Each layer can be tested independently
- **Maintainability**: Changes inside one layer stay local as long as its interface to the neighboring layers is unchanged
- **Scalability**: New features slot into the existing layers without rewriting unrelated code
- **Flexibility**: Implementations can be swapped (e.g., change database, payment gateway)

---

## 📦 Layer Responsibilities

### **Presentation Layer** (`app/api/`)

**Responsibility**: Handle HTTP requests/responses, input validation, authentication

**What it does**:
- Parse request data
- Validate input using Pydantic schemas
- Call service layer methods
- Format responses
- Handle errors and return appropriate HTTP status codes

**What it DOESN'T do**:
- Business logic
- Direct database access
- Complex calculations

**Example**:
```python
# app/api/v1/tickets.py
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException

# Project-specific imports (schemas, services, exceptions) omitted for brevity.

router = APIRouter()


@router.post("/tickets/{ticket_id}/assign")
async def assign_ticket(
    ticket_id: UUID,
    assignment: TicketAssignmentCreate,
    current_user: User = Depends(get_current_user),
    ticket_service: TicketService = Depends(get_ticket_service)
):
    """Assign a ticket to a field agent."""
    try:
        result = await ticket_service.assign_ticket(
            ticket_id=ticket_id,
            user_id=assignment.user_id,
            assigned_by=current_user.id
        )
        return {"success": True, "data": result}
    except TicketNotFoundError as e:
        # Raised by the service when the ticket does not exist
        raise HTTPException(status_code=404, detail=str(e))
    except BusinessRuleViolation as e:
        raise HTTPException(status_code=400, detail=str(e))
```

---

### **Service Layer** (`app/services/`)

**Responsibility**: Implement business logic, orchestrate operations, enforce business rules

**What it does**:
- Validate business rules
- Coordinate multiple repository calls
- Handle transactions
- Trigger side effects (notifications, webhooks)
- Calculate derived values (payroll, SLA deadlines)

**What it DOESN'T do**:
- Direct SQL queries
- HTTP request handling
- Data formatting for API responses

**Example**:
```python
# app/services/ticket_service.py
from datetime import datetime
from uuid import UUID


class TicketService:
    def __init__(
        self,
        ticket_repo: TicketRepository,
        assignment_repo: AssignmentRepository,
        notification_service: NotificationService,
        sla_service: SLAService
    ):
        self.ticket_repo = ticket_repo
        self.assignment_repo = assignment_repo
        self.notification_service = notification_service
        self.sla_service = sla_service

    async def assign_ticket(self, ticket_id: UUID, user_id: UUID, assigned_by: UUID):
        """Assign ticket to user with business rule validation."""
        # 1. Get ticket and validate
        ticket = await self.ticket_repo.get_by_id(ticket_id)
        if not ticket:
            raise TicketNotFoundError(ticket_id)
        if ticket.status != 'open':
            raise BusinessRuleViolation("Can only assign open tickets")

        # 2. Validate user can be assigned
        active_assignments = await self.assignment_repo.count_active_assignments(user_id)
        if active_assignments >= 3:
            raise BusinessRuleViolation("User already has 3 active assignments")

        # 3. Check user is in project team
        if not await self._user_in_project_team(user_id, ticket.project_id):
            raise BusinessRuleViolation("User not in project team")

        # 4. Create assignment
        assignment = await self.assignment_repo.create(
            ticket_id=ticket_id,
            user_id=user_id,
            action='assigned',
            assigned_at=datetime.utcnow()
        )

        # 5. Update ticket status
        await self.ticket_repo.update_status(ticket_id, 'assigned')

        # 6. Calculate SLA deadline
        sla_deadline = await self.sla_service.calculate_deadline(ticket)
        await self.ticket_repo.update_sla(ticket_id, sla_deadline)

        # 7. Send notification
        await self.notification_service.send_assignment_notification(user_id, ticket)

        return assignment
```

---

### **Repository Layer** (`app/repositories/`)

**Responsibility**: Data access, database queries, ORM operations

**What it does**:
- CRUD operations
- Complex queries
- Filtering and pagination
- Soft delete handling
- Optimistic locking

**What it DOESN'T do**:
- Business logic
- Validation (beyond data integrity)
- Side effects (notifications, webhooks)

**Example**:
```python
# app/repositories/ticket_repository.py
from datetime import datetime
from typing import List, Optional
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# Ticket (ORM model) and TicketNotFoundError are project-specific imports.


class TicketRepository:
    def __init__(self, db: AsyncSession):
        # An AsyncSession lets the awaited queries below run without
        # blocking the event loop (a plain Session would block it)
        self.db = db

    async def get_by_id(self, ticket_id: UUID) -> Optional[Ticket]:
        """Get ticket by ID, excluding soft-deleted."""
        result = await self.db.execute(
            select(Ticket).where(
                Ticket.id == ticket_id,
                Ticket.deleted_at.is_(None)
            )
        )
        return result.scalars().first()

    async def get_open_tickets(
        self,
        project_id: UUID,
        limit: int = 50,
        offset: int = 0
    ) -> List[Ticket]:
        """Get open tickets for a project with pagination."""
        result = await self.db.execute(
            select(Ticket)
            .where(
                Ticket.project_id == project_id,
                Ticket.status == 'open',
                Ticket.deleted_at.is_(None)
            )
            .order_by(Ticket.created_at.desc())
            .limit(limit)
            .offset(offset)
        )
        return list(result.scalars().all())

    async def update_status(self, ticket_id: UUID, status: str) -> Ticket:
        """Update ticket status."""
        ticket = await self.get_by_id(ticket_id)
        if ticket is None:
            # Avoid an AttributeError on a missing or soft-deleted ticket
            raise TicketNotFoundError(ticket_id)
        ticket.status = status
        ticket.updated_at = datetime.utcnow()
        await self.db.commit()
        await self.db.refresh(ticket)
        return ticket
```

---

## 🔄 Data Flow Examples

### **Example 1: Ticket Assignment Flow**

```
1. Frontend sends POST /api/v1/tickets/{id}/assign
   ↓
2. API Route (tickets.py)
   - Validates JWT token
   - Parses request body
   - Calls TicketService.assign_ticket()
   ↓
3. TicketService
   - Validates business rules (max 3 assignments)
   - Calls TicketRepository.get_by_id()
   - Calls AssignmentRepository.count_active_assignments()
   - Calls AssignmentRepository.create()
   - Calls TicketRepository.update_status()
   - Calls SLAService.calculate_deadline()
   - Calls TicketRepository.update_sla()
   - Calls NotificationService.send_assignment_notification()
   ↓
4. Repositories
   - Execute SQL queries via SQLAlchemy
   - Return data to service
   ↓
5. Service returns result to API route
   ↓
6. API route formats response and returns to frontend
```

---

### **Example 2: Payroll Generation Flow (Background Task)**

```
1. Celery Beat triggers weekly payroll task (Friday 6 PM)
   ↓
2. PayrollTask (tasks/payroll_tasks.py)
   - Calls PayrollService.generate_weekly_payroll()
   ↓
3. PayrollService
   - Gets all active projects
   - For each project:
     - Gets project team members
     - For each member:
       - Calls PayrollRepository.get_tickets_closed()
       - Calls TimesheetRepository.get_hours_worked()
       - Calculates earnings based on compensation type
       - Calls PayrollRepository.create()
       - Calls FinanceService.create_transaction()
   ↓
4. Repositories
   - Execute queries and insert payroll records
   ↓
5. Service sends notifications to users
   ↓
6. Task completes and logs result
```

---

## 🔐 Security Architecture

### **1. Authentication Flow**

```
1. User logs in via Supabase Auth
   ↓
2. Supabase returns JWT token
   ↓
3. Frontend includes token in Authorization header
   ↓
4. FastAPI middleware validates token
   ↓
5. Extracts user_id from token
   ↓
6. Loads User from database
   ↓
7. Checks user role and permissions
   ↓
8. Allows/denies request
```

### **2. Row-Level Security (RLS)**

**Database Level** (Supabase RLS Policies):
```sql
-- Example: Users can only see tickets from their projects
CREATE POLICY "Users see own project tickets"
ON Tickets FOR SELECT
USING (
    project_id IN (
        SELECT project_id FROM ProjectTeam
        WHERE user_id = auth.uid()
    )
);
```

**Application Level** (Service Layer):
```python
# Always filter by user's accessible projects
async def get_tickets(self, user: User):
    project_ids = await self._get_user_project_ids(user.id)
    return await self.ticket_repo.get_by_projects(project_ids)
```

### **3. Multi-Tenancy Isolation**

**Client Isolation**:
```python
# Every query scoped to user's client
async def get_customers(self, user: User):
    if user.client_id:
        return await self.customer_repo.get_by_client(user.client_id)
    elif user.contractor_id:
        # Contractor sees customers from their projects
        project_ids = await self._get_contractor_projects(user.contractor_id)
        return await self.customer_repo.get_by_projects(project_ids)
    # Neither a client nor a contractor user: expose nothing
    return []
```

---

## 📊 Caching Strategy

### **What to Cache**

1. **User Sessions** (Redis, TTL: 30 minutes)
   - User profile
   - User permissions
   - User's active projects

2. **Dashboard Metrics** (Redis, TTL: 5 minutes)
   - Ticket counts by status
   - SLA compliance rates
   - Agent workload

3. **Configuration** (Redis, TTL: 1 hour)
   - System settings
   - Feature flags
   - SLA thresholds

4. **Location Data** (Redis, TTL: 1 minute)
   - Agent current locations
   - Real-time tracking data

### **Cache Invalidation**

```python
# Example: Invalidate cache on ticket status change
async def update_ticket_status(self, ticket_id: UUID, status: str):
    ticket = await self.ticket_repo.update_status(ticket_id, status)

    # Invalidate related caches
    await cache.delete(f"ticket:{ticket_id}")
    await cache.delete(f"project:{ticket.project_id}:tickets")
    await cache.delete(f"dashboard:metrics:{ticket.project_id}")

    return ticket
```

---

## 🔄 Background Tasks Architecture

### **Celery Task Types**

1. **Scheduled Tasks** (Celery Beat)
   - Weekly payroll generation (Friday 6 PM)
   - SLA monitoring (every hour)
   - Daily metrics computation (midnight)
   - Invoice generation (end of month)

2. **Async Tasks** (Triggered by API)
   - Send email notifications
   - Send SMS notifications
   - Process payment gateway callbacks
   - Generate reports

3. **Retry Tasks** (Failed payments and deliveries)
   - Retry failed M-Pesa payments
   - Retry failed SMS deliveries

### **Task Configuration**

```python
# app/tasks/celery_app.py
from celery import Celery
from celery.schedules import crontab

celery_app = Celery('swiftops')

# Schedule times follow Celery's configured timezone (UTC unless overridden)
celery_app.conf.beat_schedule = {
    'generate-weekly-payroll': {
        'task': 'app.tasks.payroll_tasks.generate_weekly_payroll',
        'schedule': crontab(day_of_week=5, hour=18, minute=0),  # Friday 6 PM
    },
    'monitor-sla-violations': {
        'task': 'app.tasks.sla_tasks.monitor_sla_violations',
        'schedule': crontab(minute=0),  # Every hour, on the hour
    },
    'compute-daily-metrics': {
        'task': 'app.tasks.analytics_tasks.compute_daily_metrics',
        'schedule': crontab(hour=0, minute=0),  # Midnight
    },
}
```

---

## 🚀 Scalability Patterns

### **1. Horizontal Scaling**

**Stateless API Design**:
- No session state stored in API servers
- All state in database or Redis
- Can run multiple API instances behind load balancer

**Load Balancing**:
```
                 ┌─────────────┐
                 │Load Balancer│
                 └─────────────┘
                        │
     ┌──────────────────┼──────────────────┐
     ↓                  ↓                  ↓
┌─────────┐        ┌─────────┐        ┌─────────┐
│ API #1  │        │ API #2  │        │ API #3  │
└─────────┘        └─────────┘        └─────────┘
     │                  │                  │
     └──────────────────┼──────────────────┘
                        ↓
                 ┌─────────────┐
                 │  Database   │
                 └─────────────┘
```

### **2. Database Optimization**

**Read Replicas**:
- Use read replicas for reporting queries
- Primary for writes, replicas for reads

**Connection Pooling**:
```python
# app/db/session.py
from sqlalchemy.ext.asyncio import create_async_engine

# Async engine to match the AsyncSession used by the repositories;
# DATABASE_URL must name an async driver (e.g. postgresql+asyncpg://)
engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,        # Connections kept open in the pool
    max_overflow=10,     # Additional connections if pool full
    pool_pre_ping=True,  # Verify connections before use
    pool_recycle=3600    # Recycle connections after 1 hour
)
```

**Query Optimization**:
- Use indexes effectively (already in schema)
- Avoid N+1 queries (use eager loading)
- Paginate large result sets
- Use database views for complex queries

### **3. Caching Strategy**

**Multi-Level Caching**:
```
Request → API → L1 Cache (In-Memory) → L2 Cache (Redis) → Database
```

**Cache-Aside Pattern**:
```python
async def get_ticket(self, ticket_id: UUID):
    # Try cache first ("is not None" so falsy cached values still count as hits)
    cached = await cache.get(f"ticket:{ticket_id}")
    if cached is not None:
        return cached

    # Cache miss, query database
    ticket = await self.ticket_repo.get_by_id(ticket_id)

    # Store in cache (skip missing tickets so a None is never cached)
    if ticket is not None:
        await cache.set(f"ticket:{ticket_id}", ticket, ttl=300)

    return ticket
```

---

## 🧪 Testing Strategy

### **Test Pyramid**

```
         ┌─────────┐
         │   E2E   │  (Few, slow, expensive)
         └─────────┘
      ┌───────────────┐
      │  Integration  │  (Some, medium speed)
      └───────────────┘
  ┌───────────────────────┐
  │      Unit Tests       │  (Many, fast, cheap)
  └───────────────────────┘
```

### **Unit Tests** (70% of tests)
- Test individual functions
- Mock external dependencies
- Fast execution (< 1 second per test)

```python
# tests/unit/test_services/test_payroll_service.py
from unittest.mock import Mock


def test_calculate_flat_rate_payroll(payroll_service):  # payroll_service: assumed pytest fixture
    # Arrange
    role = Mock(compensation_type='flat_rate', flat_rate_amount=5000)

    # Act
    earnings = payroll_service._calculate_earnings(role, tickets=[], hours=0)

    # Assert
    assert earnings == 5000
```

### **Integration Tests** (25% of tests)
- Test multiple components together
- Use test database
- Medium speed (1-5 seconds per test)

```python
# tests/integration/test_api/test_tickets.py
def test_assign_ticket_endpoint(client, test_db):
    # Create test data
    ticket = create_test_ticket(test_db)
    user = create_test_user(test_db)

    # Call API
    response = client.post(
        f"/api/v1/tickets/{ticket.id}/assign",
        json={"user_id": str(user.id)}
    )

    # Assert
    assert response.status_code == 200
    assert test_db.query(TicketAssignment).count() == 1
```

### **E2E Tests** (5% of tests)
- Test complete user workflows
- Use
real database (or close replica)
- Slow (10+ seconds per test)

```python
# tests/e2e/test_ticket_workflow.py
def test_complete_ticket_workflow(client, test_db):
    # 1. Create sales order
    # 2. Generate ticket from sales order
    # 3. Assign ticket to agent
    # 4. Agent accepts assignment
    # 5. Agent arrives at site
    # 6. Agent completes work
    # 7. Subscription activated
    # 8. Verify all state changes
    ...  # placeholder so the outline parses as valid Python
```

---

## 📈 Monitoring & Observability

### **Logging Strategy**

**Log Levels**:
- **DEBUG**: Detailed information for debugging
- **INFO**: General informational messages
- **WARNING**: Warning messages (non-critical issues)
- **ERROR**: Error messages (handled exceptions)
- **CRITICAL**: Critical errors (system failures)

**Structured Logging**:
```python
import structlog

logger = structlog.get_logger()

logger.info(
    "ticket_assigned",
    ticket_id=str(ticket_id),
    user_id=str(user_id),
    project_id=str(project_id),
    assigned_by=str(assigned_by)
)
```

### **Metrics to Track**

1. **API Metrics**
   - Request rate (requests/second)
   - Response time (p50, p95, p99)
   - Error rate (4xx, 5xx)

2. **Business Metrics**
   - Tickets created/assigned/completed per day
   - Average ticket completion time
   - SLA compliance rate
   - Payroll processing time

3. **System Metrics**
   - Database connection pool usage
   - Cache hit rate
   - Celery queue length
   - Memory/CPU usage

### **Error Tracking**

Use Sentry for error tracking:
```python
import sentry_sdk

sentry_sdk.init(
    dsn=SENTRY_DSN,
    environment=ENVIRONMENT,
    traces_sample_rate=0.1  # Trace 10% of transactions
)
```

---

## 🔧 Development Workflow

### **Local Development**

1. Start services:
   ```bash
   docker-compose up -d postgres redis
   ```

2. Run migrations:
   ```bash
   alembic upgrade head
   ```

3. Start API:
   ```bash
   uvicorn app.main:app --reload
   ```

4. Start Celery:
   ```bash
   celery -A app.tasks.celery_app worker --loglevel=info
   ```

### **Code Quality**

**Pre-commit Hooks**:
- Black (code formatting)
- isort (import sorting)
- flake8 (linting)
- mypy (type checking)

**CI/CD Pipeline**:
1. Run tests
2. Check code coverage (> 80%)
3. Run linters
4. Build Docker image
5. Deploy to staging
6. Run E2E tests
7. Deploy to production

---

## 📚 Additional Resources

- [FastAPI Documentation](https://fastapi.tiangolo.com/)
- [SQLAlchemy Documentation](https://docs.sqlalchemy.org/)
- [Celery Documentation](https://docs.celeryproject.org/)
- [Supabase Documentation](https://supabase.com/docs)
- [Clean Architecture](https://blog.cleancoder.com/uncle-bob/2012/08/13/the-clean-architecture.html)