| # SwiftOps Backend Architecture | |
| ## Architectural Principles | |
| ### **1. Clean Architecture** | |
| The application follows clean architecture principles with clear separation of concerns: | |
| ``` | |
| ┌─────────────────────────────────────────────────────────────┐ | |
| │                     Presentation Layer                      │ | |
| │                (FastAPI Routes, WebSockets)                 │ | |
| └─────────────────────────────────────────────────────────────┘ | |
|                                ↓ | |
| ┌─────────────────────────────────────────────────────────────┐ | |
| │                        Service Layer                        │ | |
| │               (Business Logic, Orchestration)               │ | |
| └─────────────────────────────────────────────────────────────┘ | |
|                                ↓ | |
| ┌─────────────────────────────────────────────────────────────┐ | |
| │                      Repository Layer                       │ | |
| │                   (Data Access, Queries)                    │ | |
| └─────────────────────────────────────────────────────────────┘ | |
|                                ↓ | |
| ┌─────────────────────────────────────────────────────────────┐ | |
| │                       Database Layer                        │ | |
| │                  (PostgreSQL via Supabase)                  │ | |
| └─────────────────────────────────────────────────────────────┘ | |
| ``` | |
| **Benefits**: | |
| - **Testability**: Each layer can be tested independently | |
| - **Maintainability**: Changes in one layer don't affect others | |
| - **Scalability**: Easy to add new features without breaking existing code | |
| - **Flexibility**: Can swap implementations (e.g., change database, payment gateway) | |
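The testability and flexibility claims can be illustrated with a small sketch. It assumes the service layer depends on an interface rather than a concrete repository; `TicketReader`, `InMemoryTicketRepository`, and `is_assignable` are hypothetical names used only for illustration, not part of the SwiftOps codebase.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional, Protocol
from uuid import UUID, uuid4


@dataclass
class Ticket:
    id: UUID
    status: str


class TicketReader(Protocol):
    """What the service layer needs from a ticket store (hypothetical interface)."""

    async def get_by_id(self, ticket_id: UUID) -> Optional[Ticket]: ...


class InMemoryTicketRepository:
    """Test double that satisfies TicketReader without a database."""

    def __init__(self) -> None:
        self._tickets: dict[UUID, Ticket] = {}

    def add(self, ticket: Ticket) -> None:
        self._tickets[ticket.id] = ticket

    async def get_by_id(self, ticket_id: UUID) -> Optional[Ticket]:
        return self._tickets.get(ticket_id)


async def is_assignable(repo: TicketReader, ticket_id: UUID) -> bool:
    """Service-level rule: only existing, open tickets can be assigned."""
    ticket = await repo.get_by_id(ticket_id)
    return ticket is not None and ticket.status == "open"


repo = InMemoryTicketRepository()
open_ticket = Ticket(id=uuid4(), status="open")
repo.add(open_ticket)
print(asyncio.run(is_assignable(repo, open_ticket.id)))  # True
```

Because `is_assignable` only sees the interface, the same rule can run against a SQLAlchemy-backed repository in production and the in-memory one in unit tests.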
| --- | |
| ## Layer Responsibilities | |
| ### **Presentation Layer** (`app/api/`) | |
| **Responsibility**: Handle HTTP requests/responses, input validation, authentication | |
| **What it does**: | |
| - Parse request data | |
| - Validate input using Pydantic schemas | |
| - Call service layer methods | |
| - Format responses | |
| - Handle errors and return appropriate HTTP status codes | |
| **What it DOESN'T do**: | |
| - Business logic | |
| - Direct database access | |
| - Complex calculations | |
| **Example**: | |
| ```python | |
| # app/api/v1/tickets.py | |
| from uuid import UUID | |
| from fastapi import Depends, HTTPException | |
| @router.post("/tickets/{ticket_id}/assign") | |
| async def assign_ticket( | |
|     ticket_id: UUID, | |
|     assignment: TicketAssignmentCreate, | |
|     current_user: User = Depends(get_current_user), | |
|     ticket_service: TicketService = Depends(get_ticket_service), | |
| ): | |
|     """Assign a ticket to a field agent.""" | |
|     try: | |
|         result = await ticket_service.assign_ticket( | |
|             ticket_id=ticket_id, | |
|             user_id=assignment.user_id, | |
|             assigned_by=current_user.id, | |
|         ) | |
|         return {"success": True, "data": result} | |
|     except TicketNotFoundError as e: | |
|         # A missing ticket maps to 404 instead of surfacing as an unhandled 500 | |
|         raise HTTPException(status_code=404, detail=str(e)) | |
|     except BusinessRuleViolation as e: | |
|         # A broken business rule is a client error | |
|         raise HTTPException(status_code=400, detail=str(e)) | |
| ``` | |
| --- | |
| ### **Service Layer** (`app/services/`) | |
| **Responsibility**: Implement business logic, orchestrate operations, enforce business rules | |
| **What it does**: | |
| - Validate business rules | |
| - Coordinate multiple repository calls | |
| - Handle transactions | |
| - Trigger side effects (notifications, webhooks) | |
| - Calculate derived values (payroll, SLA deadlines) | |
| **What it DOESN'T do**: | |
| - Direct SQL queries | |
| - HTTP request handling | |
| - Data formatting for API responses | |
| **Example**: | |
| ```python | |
| # app/services/ticket_service.py | |
| from datetime import datetime, timezone | |
| from uuid import UUID | |
| class TicketService: | |
|     def __init__( | |
|         self, | |
|         ticket_repo: TicketRepository, | |
|         assignment_repo: AssignmentRepository, | |
|         notification_service: NotificationService, | |
|         sla_service: SLAService, | |
|     ): | |
|         self.ticket_repo = ticket_repo | |
|         self.assignment_repo = assignment_repo | |
|         self.notification_service = notification_service | |
|         self.sla_service = sla_service | |
|     async def assign_ticket(self, ticket_id: UUID, user_id: UUID, assigned_by: UUID): | |
|         """Assign ticket to user with business rule validation.""" | |
|         # 1. Get ticket and validate | |
|         ticket = await self.ticket_repo.get_by_id(ticket_id) | |
|         if not ticket: | |
|             raise TicketNotFoundError(ticket_id) | |
|         if ticket.status != 'open': | |
|             raise BusinessRuleViolation("Can only assign open tickets") | |
|         # 2. Validate user can be assigned | |
|         active_assignments = await self.assignment_repo.count_active_assignments(user_id) | |
|         if active_assignments >= 3: | |
|             raise BusinessRuleViolation("User already has 3 active assignments") | |
|         # 3. Check user is in project team | |
|         if not await self._user_in_project_team(user_id, ticket.project_id): | |
|             raise BusinessRuleViolation("User not in project team") | |
|         # 4. Create assignment (timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12) | |
|         assignment = await self.assignment_repo.create( | |
|             ticket_id=ticket_id, | |
|             user_id=user_id, | |
|             action='assigned', | |
|             assigned_at=datetime.now(timezone.utc), | |
|         ) | |
|         # 5. Update ticket status | |
|         await self.ticket_repo.update_status(ticket_id, 'assigned') | |
|         # 6. Calculate SLA deadline | |
|         sla_deadline = await self.sla_service.calculate_deadline(ticket) | |
|         await self.ticket_repo.update_sla(ticket_id, sla_deadline) | |
|         # 7. Send notification | |
|         await self.notification_service.send_assignment_notification(user_id, ticket) | |
|         return assignment | |
| ``` | |
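The service example raises `TicketNotFoundError` and `BusinessRuleViolation` without showing their definitions. A minimal sketch of how such exceptions might be declared follows; the `DomainError` base class and the `status_code_for` helper are assumptions added for illustration.

```python
from uuid import UUID, uuid4


class DomainError(Exception):
    """Base class for errors raised by the service layer (hypothetical)."""


class BusinessRuleViolation(DomainError):
    """The request is well-formed but breaks a business rule."""


class TicketNotFoundError(DomainError):
    """The requested ticket does not exist or is soft-deleted."""

    def __init__(self, ticket_id: UUID) -> None:
        super().__init__(f"Ticket {ticket_id} not found")
        self.ticket_id = ticket_id


def status_code_for(error: DomainError) -> int:
    """Map a domain error to an HTTP status code for the presentation layer."""
    if isinstance(error, TicketNotFoundError):
        return 404
    if isinstance(error, BusinessRuleViolation):
        return 400
    return 500


print(status_code_for(BusinessRuleViolation("Can only assign open tickets")))  # 400
```

Keeping the HTTP mapping in one helper means the service layer stays free of HTTP concerns while routes translate errors consistently.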
| --- | |
| ### **Repository Layer** (`app/repositories/`) | |
| **Responsibility**: Data access, database queries, ORM operations | |
| **What it does**: | |
| - CRUD operations | |
| - Complex queries | |
| - Filtering and pagination | |
| - Soft delete handling | |
| - Optimistic locking | |
| **What it DOESN'T do**: | |
| - Business logic | |
| - Validation (beyond data integrity) | |
| - Side effects (notifications, webhooks) | |
| **Example**: | |
| ```python | |
| # app/repositories/ticket_repository.py | |
| from datetime import datetime, timezone | |
| from typing import List, Optional | |
| from uuid import UUID | |
| from sqlalchemy.orm import Session | |
| class TicketRepository: | |
|     # NOTE: Session is synchronous, so these queries block the event loop even | |
|     # though the methods are declared async. An AsyncSession would avoid that. | |
|     def __init__(self, db: Session): | |
|         self.db = db | |
|     async def get_by_id(self, ticket_id: UUID) -> Optional[Ticket]: | |
|         """Get ticket by ID, excluding soft-deleted.""" | |
|         return self.db.query(Ticket).filter( | |
|             Ticket.id == ticket_id, | |
|             Ticket.deleted_at.is_(None), | |
|         ).first() | |
|     async def get_open_tickets( | |
|         self, | |
|         project_id: UUID, | |
|         limit: int = 50, | |
|         offset: int = 0, | |
|     ) -> List[Ticket]: | |
|         """Get open tickets for a project with pagination.""" | |
|         return self.db.query(Ticket).filter( | |
|             Ticket.project_id == project_id, | |
|             Ticket.status == 'open', | |
|             Ticket.deleted_at.is_(None), | |
|         ).order_by(Ticket.created_at.desc()).limit(limit).offset(offset).all() | |
|     async def update_status(self, ticket_id: UUID, status: str) -> Ticket: | |
|         """Update ticket status.""" | |
|         ticket = await self.get_by_id(ticket_id) | |
|         if ticket is None: | |
|             # Guard against AttributeError on a missing or soft-deleted ticket | |
|             raise TicketNotFoundError(ticket_id) | |
|         ticket.status = status | |
|         ticket.updated_at = datetime.now(timezone.utc) | |
|         self.db.commit() | |
|         self.db.refresh(ticket) | |
|         return ticket | |
| ``` | |
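The repository responsibilities list optimistic locking, which the example above does not show. The sketch below illustrates the idea with an in-memory store and an assumed `version` column; `VersionedTicketStore` and `StaleDataError` are hypothetical names. With SQLAlchemy, the mapper option `version_id_col` provides comparable behaviour at the ORM level.

```python
from dataclasses import dataclass


class StaleDataError(Exception):
    """Raised when a row changed since the caller read it."""


@dataclass
class TicketRow:
    id: int
    status: str
    version: int = 1


class VersionedTicketStore:
    """In-memory stand-in for a table with a `version` column (assumed)."""

    def __init__(self) -> None:
        self._rows: dict[int, TicketRow] = {}

    def insert(self, row: TicketRow) -> None:
        self._rows[row.id] = row

    def read(self, ticket_id: int) -> TicketRow:
        row = self._rows[ticket_id]
        # Return a copy so callers hold a snapshot, like a detached ORM object.
        return TicketRow(row.id, row.status, row.version)

    def update_status(self, ticket_id: int, status: str, expected_version: int) -> TicketRow:
        # Mirrors: UPDATE tickets SET status = ?, version = version + 1
        #          WHERE id = ? AND version = ?
        row = self._rows[ticket_id]
        if row.version != expected_version:
            raise StaleDataError(f"ticket {ticket_id} was modified concurrently")
        row.status = status
        row.version += 1
        return self.read(ticket_id)


store = VersionedTicketStore()
store.insert(TicketRow(id=1, status="open"))
first = store.read(1)   # two callers read the same version
second = store.read(1)
store.update_status(1, "assigned", expected_version=first.version)  # first writer wins
```

A second write that still carries `second.version` would raise `StaleDataError`, which lets the service layer reload the ticket and re-check its rules instead of silently overwriting the first update.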
| --- | |
| ## Data Flow Examples | |
| ### **Example 1: Ticket Assignment Flow** | |
| ``` | |
| 1. Frontend sends POST /api/v1/tickets/{id}/assign | |
|    ↓ | |
| 2. API Route (tickets.py) | |
|    - Validates JWT token | |
|    - Parses request body | |
|    - Calls TicketService.assign_ticket() | |
|    ↓ | |
| 3. TicketService | |
|    - Validates business rules (open ticket, max 3 active assignments, project team membership) | |
|    - Calls TicketRepository.get_by_id() | |
|    - Calls AssignmentRepository.count_active_assignments() | |
|    - Calls AssignmentRepository.create() | |
|    - Calls TicketRepository.update_status() | |
|    - Calls SLAService.calculate_deadline() | |
|    - Calls TicketRepository.update_sla() | |
|    - Calls NotificationService.send_assignment_notification() | |
|    ↓ | |
| 4. Repositories | |
|    - Execute SQL queries via SQLAlchemy | |
|    - Return data to service | |
|    ↓ | |
| 5. Service returns result to API route | |
|    ↓ | |
| 6. API route formats response and returns to frontend | |
| ``` | |
| --- | |
| ### **Example 2: Payroll Generation Flow (Background Task)** | |
| ``` | |
| 1. Celery Beat triggers weekly payroll task (Friday 6 PM) | |
|    ↓ | |
| 2. PayrollTask (tasks/payroll_tasks.py) | |
|    - Calls PayrollService.generate_weekly_payroll() | |
|    ↓ | |
| 3. PayrollService | |
|    - Gets all active projects | |
|    - For each project: | |
|      - Gets project team members | |
|      - For each member: | |
|        - Calls PayrollRepository.get_tickets_closed() | |
|        - Calls TimesheetRepository.get_hours_worked() | |
|        - Calculates earnings based on compensation type | |
|        - Calls PayrollRepository.create() | |
|        - Calls FinanceService.create_transaction() | |
|    ↓ | |
| 4. Repositories | |
|    - Execute queries and insert payroll records | |
|    ↓ | |
| 5. Service sends notifications to users | |
|    ↓ | |
| 6. Task completes and logs result | |
| ``` | |
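The step "Calculates earnings based on compensation type" could look roughly like the sketch below. Only the `flat_rate` type and the `flat_rate_amount` field appear elsewhere in this document; the `per_ticket` and `hourly` types and their rate fields are assumptions made for illustration.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class Role:
    compensation_type: str
    flat_rate_amount: Decimal = Decimal("0")
    per_ticket_rate: Decimal = Decimal("0")  # hypothetical field
    hourly_rate: Decimal = Decimal("0")      # hypothetical field


def calculate_earnings(role: Role, tickets_closed: int, hours_worked: Decimal) -> Decimal:
    """Return the period's earnings for one team member."""
    if role.compensation_type == "flat_rate":
        # Fixed amount regardless of output
        return role.flat_rate_amount
    if role.compensation_type == "per_ticket":
        return role.per_ticket_rate * tickets_closed
    if role.compensation_type == "hourly":
        return role.hourly_rate * hours_worked
    raise ValueError(f"Unknown compensation type: {role.compensation_type!r}")


flat = Role(compensation_type="flat_rate", flat_rate_amount=Decimal("5000"))
print(calculate_earnings(flat, tickets_closed=0, hours_worked=Decimal("0")))  # 5000
```

`Decimal` is used instead of `float` so that monetary amounts do not pick up binary rounding errors; raising on an unknown type keeps a misconfigured role from being paid zero silently.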
| --- | |
| ## Security Architecture | |
| ### **1. Authentication Flow** | |
| ``` | |
| 1. User logs in via Supabase Auth | |
|    ↓ | |
| 2. Supabase returns JWT token | |
|    ↓ | |
| 3. Frontend includes token in Authorization header | |
|    ↓ | |
| 4. FastAPI middleware validates token | |
|    ↓ | |
| 5. Extracts user_id from token | |
|    ↓ | |
| 6. Loads User from database | |
|    ↓ | |
| 7. Checks user role and permissions | |
|    ↓ | |
| 8. Allows/denies request | |
| ``` | |
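Steps 4 and 5 of the flow above (validate the token, extract the user ID) can be sketched with the standard library alone. The sketch assumes the project signs tokens with HS256 and a shared secret and that the user ID lives in the `sub` claim; `sign_hs256` and `verify_hs256` are hypothetical helpers, and a production service would normally rely on a maintained JWT library such as PyJWT instead.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Optional


def _b64url_decode(segment: str) -> bytes:
    padding = "=" * (-len(segment) % 4)  # JWT segments omit base64 padding
    return base64.urlsafe_b64decode(segment + padding)


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def sign_hs256(payload: dict[str, Any], secret: str) -> str:
    """Build a signed token; used here only to produce input for the verifier."""
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url_encode(json.dumps(payload).encode())
    signature = hmac.new(secret.encode(), f"{header}.{body}".encode(), hashlib.sha256).digest()
    return f"{header}.{body}.{_b64url_encode(signature)}"


def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> dict[str, Any]:
    """Return the claims if signature and expiry check out, else raise ValueError."""
    try:
        header_b64, body_b64, signature_b64 = token.split(".")
        header = json.loads(_b64url_decode(header_b64))
        claims = json.loads(_b64url_decode(body_b64))
        signature = _b64url_decode(signature_b64)
    except ValueError as exc:  # covers bad base64, bad JSON, wrong segment count
        raise ValueError("malformed token") from exc
    if not isinstance(header, dict) or not isinstance(claims, dict):
        raise ValueError("malformed token")
    if header.get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    expected = hmac.new(secret.encode(), f"{header_b64}.{body_b64}".encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(signature, expected):  # constant-time comparison
        raise ValueError("bad signature")
    current = time.time() if now is None else now
    if claims.get("exp", 0) <= current:
        raise ValueError("token expired")
    return claims


token = sign_hs256({"sub": "user-123", "exp": 2000}, secret="s3cret")
claims = verify_hs256(token, secret="s3cret", now=1000)
print(claims["sub"])  # user-123
```

The algorithm is pinned to HS256 rather than read from the token, so a token cannot talk the verifier into skipping or weakening the signature check.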
| ### **2. Row-Level Security (RLS)** | |
| **Database Level** (Supabase RLS Policies): | |
| ```sql | |
| -- Example: Users can only see tickets from their projects | |
| CREATE POLICY "Users see own project tickets" | |
|     ON Tickets FOR SELECT | |
|     USING ( | |
|         project_id IN ( | |
|             SELECT project_id FROM ProjectTeam WHERE user_id = auth.uid() | |
|         ) | |
|     ); | |
| ``` | |
| **Application Level** (Service Layer): | |
| ```python | |
| # Always filter by user's accessible projects | |
| async def get_tickets(self, user: User): | |
|     project_ids = await self._get_user_project_ids(user.id) | |
|     return await self.ticket_repo.get_by_projects(project_ids) | |
| ``` | |
| ### **3. Multi-Tenancy Isolation** | |
| **Client Isolation**: | |
| ```python | |
| # Every query scoped to user's client | |
| async def get_customers(self, user: User): | |
|     if user.client_id: | |
|         return await self.customer_repo.get_by_client(user.client_id) | |
|     elif user.contractor_id: | |
|         # Contractor sees customers from their projects | |
|         project_ids = await self._get_contractor_projects(user.contractor_id) | |
|         return await self.customer_repo.get_by_projects(project_ids) | |
|     # No tenant association: return nothing rather than an implicit None | |
|     return [] | |
| ``` | |
| --- | |
| ## Caching Strategy | |
| ### **What to Cache** | |
| 1. **User Sessions** (Redis, TTL: 30 minutes) | |
|    - User profile | |
|    - User permissions | |
|    - User's active projects | |
| 2. **Dashboard Metrics** (Redis, TTL: 5 minutes) | |
|    - Ticket counts by status | |
|    - SLA compliance rates | |
|    - Agent workload | |
| 3. **Configuration** (Redis, TTL: 1 hour) | |
|    - System settings | |
|    - Feature flags | |
|    - SLA thresholds | |
| 4. **Location Data** (Redis, TTL: 1 minute) | |
|    - Agent current locations | |
|    - Real-time tracking data | |
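The TTLs listed above can be kept in one place so that callers never hard-code expiry values. The sketch below uses an in-process stand-in for Redis to show the expiry behaviour; `CACHE_TTLS`, `TTLCache`, and the `category:key` naming scheme are assumptions for illustration.

```python
import time
from typing import Any, Callable, Optional

# TTLs in seconds, taken from the categories listed above.
CACHE_TTLS = {
    "session": 30 * 60,
    "dashboard": 5 * 60,
    "config": 60 * 60,
    "location": 60,
}


class TTLCache:
    """Tiny in-process stand-in for a Redis SET-with-expiry / GET pair."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock  # injectable so tests can control time
        self._entries: dict[str, tuple[float, Any]] = {}

    def set(self, category: str, key: str, value: Any) -> None:
        expires_at = self._clock() + CACHE_TTLS[category]
        self._entries[f"{category}:{key}"] = (expires_at, value)

    def get(self, category: str, key: str) -> Optional[Any]:
        full_key = f"{category}:{key}"
        entry = self._entries.get(full_key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[full_key]  # lazily evict expired entries
            return None
        return value


fake_now = [0.0]
cache = TTLCache(clock=lambda: fake_now[0])
cache.set("location", "agent-1", (-1.29, 36.82))
fresh = cache.get("location", "agent-1")   # still within the 1-minute TTL
fake_now[0] = 61.0
expired = cache.get("location", "agent-1")  # past the TTL, treated as a miss
```

Short TTLs for location data trade extra database reads for fresher positions, while configuration can tolerate an hour of staleness.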
| ### **Cache Invalidation** | |
| ```python | |
| # Example: Invalidate cache on ticket status change | |
| async def update_ticket_status(self, ticket_id: UUID, status: str): | |
|     ticket = await self.ticket_repo.update_status(ticket_id, status) | |
|     # Invalidate related caches | |
|     await cache.delete(f"ticket:{ticket_id}") | |
|     await cache.delete(f"project:{ticket.project_id}:tickets") | |
|     await cache.delete(f"dashboard:metrics:{ticket.project_id}") | |
|     return ticket | |
| ``` | |
| --- | |
| ## Background Tasks Architecture | |
| ### **Celery Task Types** | |
| 1. **Scheduled Tasks** (Celery Beat) | |
|    - Weekly payroll generation (Friday 6 PM) | |
|    - Hourly SLA monitoring (at the top of every hour) | |
|    - Daily metrics computation (midnight) | |
|    - Invoice generation (end of month) | |
| 2. **Async Tasks** (Triggered by API) | |
|    - Send email notifications | |
|    - Send SMS notifications | |
|    - Process payment gateway callbacks | |
|    - Generate reports | |
| 3. **Retry Tasks** (Retries of failed operations) | |
|    - Retry failed M-Pesa payments | |
|    - Retry failed SMS deliveries | |
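Retry tasks usually space their attempts out so that a struggling payment or SMS gateway is not hammered. The sketch below shows capped exponential backoff in plain Python; `backoff_delay`, `retry`, and the default values are illustrative assumptions. In Celery itself, task options such as `autoretry_for`, `retry_backoff`, and `max_retries` cover the same ground.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float = 2.0, cap: float = 300.0) -> float:
    """Seconds to wait after failed attempt number `attempt` (0-based), capped at `cap`."""
    return min(cap, base * (2 ** attempt))


def retry(
    operation: Callable[[], T],
    max_attempts: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `operation`, retrying with exponential backoff until it succeeds."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(backoff_delay(attempt))
    raise RuntimeError("max_attempts must be at least 1")


# Usage: a gateway call that fails twice before succeeding.
outcomes = iter([ConnectionError("timeout"), ConnectionError("timeout"), "sent"])
waits: list[float] = []


def send_sms() -> str:
    outcome = next(outcomes)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


result = retry(send_sms, sleep=waits.append)  # record delays instead of sleeping
```

Injecting `sleep` keeps the example fast and testable; real deployments typically also add random jitter so that many failed tasks do not retry at the same instant.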
| ### **Task Configuration** | |
| ```python | |
| # app/tasks/celery_app.py | |
| from celery import Celery | |
| from celery.schedules import crontab | |
| celery_app = Celery('swiftops') | |
| # Schedule times are interpreted in UTC unless celery_app.conf.timezone is set | |
| celery_app.conf.beat_schedule = { | |
|     'generate-weekly-payroll': { | |
|         'task': 'app.tasks.payroll_tasks.generate_weekly_payroll', | |
|         'schedule': crontab(day_of_week=5, hour=18, minute=0),  # Friday 6 PM (0 = Sunday) | |
|     }, | |
|     'monitor-sla-violations': { | |
|         'task': 'app.tasks.sla_tasks.monitor_sla_violations', | |
|         'schedule': crontab(minute=0),  # Every hour, on the hour | |
|     }, | |
|     'compute-daily-metrics': { | |
|         'task': 'app.tasks.analytics_tasks.compute_daily_metrics', | |
|         'schedule': crontab(hour=0, minute=0),  # Midnight | |
|     }, | |
| } | |
| ``` | |
| --- | |
| ## Scalability Patterns | |
| ### **1. Horizontal Scaling** | |
| **Stateless API Design**: | |
| - No session state stored in API servers | |
| - All state in database or Redis | |
| - Can run multiple API instances behind load balancer | |
| **Load Balancing**: | |
| ``` | |
|                  ┌─────────────┐ | |
|                  │Load Balancer│ | |
|                  └─────────────┘ | |
|                         │ | |
|      ┌──────────────────┼──────────────────┐ | |
|      │                  │                  │ | |
| ┌─────────┐        ┌─────────┐        ┌─────────┐ | |
| │ API #1  │        │ API #2  │        │ API #3  │ | |
| └─────────┘        └─────────┘        └─────────┘ | |
|      │                  │                  │ | |
|      └──────────────────┼──────────────────┘ | |
|                         │ | |
|                  ┌─────────────┐ | |
|                  │  Database   │ | |
|                  └─────────────┘ | |
| ``` | |
| ### **2. Database Optimization** | |
| **Read Replicas**: | |
| - Use read replicas for reporting queries | |
| - Primary for writes, replicas for reads | |
| **Connection Pooling**: | |
| ```python | |
| # app/db/session.py | |
| from sqlalchemy import create_engine | |
| engine = create_engine( | |
|     DATABASE_URL, | |
|     pool_size=20,        # Connections kept open in the pool | |
|     max_overflow=10,     # Extra connections allowed beyond pool_size (30 total) | |
|     pool_pre_ping=True,  # Verify connections before use | |
|     pool_recycle=3600,   # Recycle connections after 1 hour | |
| ) | |
| ``` | |
| **Query Optimization**: | |
| - Use indexes effectively (already in schema) | |
| - Avoid N+1 queries (use eager loading) | |
| - Paginate large result sets | |
| - Use database views for complex queries | |
| ### **3. Caching Strategy** | |
| **Multi-Level Caching**: | |
| ``` | |
| Request → API → L1 Cache (In-Memory) → L2 Cache (Redis) → Database | |
| ``` | |
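The multi-level lookup order can be sketched as follows. `FakeRedis` stands in for a real Redis client and `TwoLevelCache` is a hypothetical helper; the L1 dictionary here has no TTL or size limit, both of which a real in-memory cache would need.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class FakeRedis:
    """Async dict standing in for the shared L2 cache (assumption for the sketch)."""

    def __init__(self) -> None:
        self.data: dict[str, Any] = {}

    async def get(self, key: str) -> Optional[Any]:
        return self.data.get(key)

    async def set(self, key: str, value: Any) -> None:
        self.data[key] = value


class TwoLevelCache:
    """Check process-local memory first, then the shared cache, then the loader."""

    def __init__(self, l2: FakeRedis) -> None:
        self._l1: dict[str, Any] = {}
        self._l2 = l2

    async def get_or_load(self, key: str, loader: Callable[[], Awaitable[Any]]) -> Any:
        if key in self._l1:                # L1 hit: no network round trip
            return self._l1[key]
        value = await self._l2.get(key)    # L2 hit: shared across API instances
        if value is None:
            value = await loader()         # miss on both levels: query the database
            if value is not None:
                await self._l2.set(key, value)
        if value is not None:
            self._l1[key] = value
        return value


db_calls: list[str] = []


async def load_ticket() -> dict:
    db_calls.append("ticket:1")  # count how often the database is hit
    return {"id": 1, "status": "open"}


async def demo() -> tuple:
    shared = FakeRedis()
    api_1 = TwoLevelCache(shared)
    api_2 = TwoLevelCache(shared)  # a second API instance with its own L1
    first = await api_1.get_or_load("ticket:1", load_ticket)
    second = await api_1.get_or_load("ticket:1", load_ticket)  # served from L1
    third = await api_2.get_or_load("ticket:1", load_ticket)   # served from L2
    return first, second, third


results = asyncio.run(demo())
```

Because each API instance holds its own L1, invalidation has to reach every instance (or L1 TTLs must be short), which is the main cost of adding the in-memory level.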
| **Cache-Aside Pattern**: | |
| ```python | |
| async def get_ticket(self, ticket_id: UUID): | |
|     # Try cache first; compare against None so a falsy cached value still counts as a hit | |
|     cached = await cache.get(f"ticket:{ticket_id}") | |
|     if cached is not None: | |
|         return cached | |
|     # Cache miss, query database | |
|     ticket = await self.ticket_repo.get_by_id(ticket_id) | |
|     # Store in cache only when the ticket exists, so a missing ticket is not cached as None | |
|     if ticket is not None: | |
|         await cache.set(f"ticket:{ticket_id}", ticket, ttl=300) | |
|     return ticket | |
| ``` | |
| --- | |
| ## Testing Strategy | |
| ### **Test Pyramid** | |
| ``` | |
|        ┌─────────┐ | |
|        │   E2E   │  (Few, slow, expensive) | |
|        └─────────┘ | |
|     ┌───────────────┐ | |
|     │  Integration  │  (Some, medium speed) | |
|     └───────────────┘ | |
| ┌───────────────────────┐ | |
| │      Unit Tests       │  (Many, fast, cheap) | |
| └───────────────────────┘ | |
| ``` | |
| ### **Unit Tests** (70% of tests) | |
| - Test individual functions | |
| - Mock external dependencies | |
| - Fast execution (< 1 second per test) | |
| ```python | |
| # tests/unit/test_services/test_payroll_service.py | |
| from unittest.mock import Mock | |
| def test_calculate_flat_rate_payroll(payroll_service):  # payroll_service: assumed pytest fixture | |
|     # Arrange | |
|     role = Mock(compensation_type='flat_rate', flat_rate_amount=5000) | |
|     # Act | |
|     earnings = payroll_service._calculate_earnings(role, tickets=[], hours=0) | |
|     # Assert | |
|     assert earnings == 5000 | |
| ``` | |
| ### **Integration Tests** (25% of tests) | |
| - Test multiple components together | |
| - Use test database | |
| - Medium speed (1-5 seconds per test) | |
| ```python | |
| # tests/integration/test_api/test_tickets.py | |
| def test_assign_ticket_endpoint(client, test_db): | |
|     # Create test data | |
|     ticket = create_test_ticket(test_db) | |
|     user = create_test_user(test_db) | |
|     # Call API | |
|     response = client.post( | |
|         f"/api/v1/tickets/{ticket.id}/assign", | |
|         json={"user_id": str(user.id)}, | |
|     ) | |
|     # Assert | |
|     assert response.status_code == 200 | |
|     assert test_db.query(TicketAssignment).count() == 1 | |
| ``` | |
| ### **E2E Tests** (5% of tests) | |
| - Test complete user workflows | |
| - Use real database (or close replica) | |
| - Slow (10+ seconds per test) | |
| ```python | |
| # tests/e2e/test_ticket_workflow.py | |
| def test_complete_ticket_workflow(client, test_db): | |
|     # 1. Create sales order | |
|     # 2. Generate ticket from sales order | |
|     # 3. Assign ticket to agent | |
|     # 4. Agent accepts assignment | |
|     # 5. Agent arrives at site | |
|     # 6. Agent completes work | |
|     # 7. Subscription activated | |
|     # 8. Verify all state changes | |
|     ...  # Placeholder body so the outline is valid Python | |
| ``` | |
| --- | |
| ## Monitoring & Observability | |
| ### **Logging Strategy** | |
| **Log Levels**: | |
| - **DEBUG**: Detailed information for debugging | |
| - **INFO**: General informational messages | |
| - **WARNING**: Warning messages (non-critical issues) | |
| - **ERROR**: Error messages (handled exceptions) | |
| - **CRITICAL**: Critical errors (system failures) | |
| **Structured Logging**: | |
| ```python | |
| import structlog | |
| logger = structlog.get_logger() | |
| logger.info( | |
|     "ticket_assigned", | |
|     ticket_id=str(ticket_id), | |
|     user_id=str(user_id), | |
|     project_id=str(project_id), | |
|     assigned_by=str(assigned_by), | |
| ) | |
| ``` | |
| ### **Metrics to Track** | |
| 1. **API Metrics** | |
|    - Request rate (requests/second) | |
|    - Response time (p50, p95, p99) | |
|    - Error rate (4xx, 5xx) | |
| 2. **Business Metrics** | |
|    - Tickets created/assigned/completed per day | |
|    - Average ticket completion time | |
|    - SLA compliance rate | |
|    - Payroll processing time | |
| 3. **System Metrics** | |
|    - Database connection pool usage | |
|    - Cache hit rate | |
|    - Celery queue length | |
|    - Memory/CPU usage | |
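To make the response-time metric concrete, the sketch below derives p50, p95, and p99 from raw latency samples using the standard library. `latency_percentiles` is a hypothetical helper; in practice a metrics backend usually computes these from histograms rather than from raw samples.

```python
import statistics


def latency_percentiles(samples_ms: list[float]) -> dict[str, float]:
    """Return p50/p95/p99 for a batch of response times in milliseconds."""
    if len(samples_ms) < 2:
        raise ValueError("need at least two samples")
    # n=100 yields 99 cut points; cut point i (1-based) is the i-th percentile.
    cuts = statistics.quantiles(samples_ms, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}


samples = [float(ms) for ms in range(1, 102)]  # 1 ms .. 101 ms
summary = latency_percentiles(samples)
```

Tail percentiles matter more than the average here: a healthy mean can hide the slow 1% of requests that p99 exposes.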
| ### **Error Tracking** | |
| Use Sentry for error tracking: | |
| ```python | |
| import sentry_sdk | |
| sentry_sdk.init( | |
|     dsn=SENTRY_DSN, | |
|     environment=ENVIRONMENT, | |
|     traces_sample_rate=0.1, | |
| ) | |
| ``` | |
| --- | |
| ## Development Workflow | |
| ### **Local Development** | |
| 1. Start services: | |
| ```bash | |
| docker-compose up -d postgres redis | |
| ``` | |
| 2. Run migrations: | |
| ```bash | |
| alembic upgrade head | |
| ``` | |
| 3. Start API: | |
| ```bash | |
| uvicorn app.main:app --reload | |
| ``` | |
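| 4. Start the Celery worker, plus Celery Beat if scheduled tasks should run locally: | |
| ```bash | |
| celery -A app.tasks.celery_app worker --loglevel=info | |
| # In a second terminal, start the scheduler for the beat_schedule entries | |
| celery -A app.tasks.celery_app beat --loglevel=info | |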
| ``` | |
| ### **Code Quality** | |
| **Pre-commit Hooks**: | |
| - Black (code formatting) | |
| - isort (import sorting) | |
| - flake8 (linting) | |
| - mypy (type checking) | |
| **CI/CD Pipeline**: | |
| 1. Run tests | |
| 2. Check code coverage (> 80%) | |
| 3. Run linters | |
| 4. Build Docker image | |
| 5. Deploy to staging | |
| 6. Run E2E tests | |
| 7. Deploy to production | |
| --- | |
| ## Additional Resources | |
| - [FastAPI Documentation](https://fastapi.tiangolo.com/) | |
| - [SQLAlchemy Documentation](https://docs.sqlalchemy.org/) | |
| - [Celery Documentation](https://docs.celeryproject.org/) | |
| - [Supabase Documentation](https://supabase.com/docs) | |
| - [Clean Architecture](https://blog.cleancoder.com/uncle-bob/2012/08/13/the-clean-architecture.html) | |