# SwiftOps Backend Architecture
## πŸ—οΈ Architectural Principles
### **1. Clean Architecture**
The application follows clean architecture principles with clear separation of concerns:
```
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚           Presentation Layer           β”‚
β”‚      (FastAPI Routes, WebSockets)      β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                    ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚             Service Layer              β”‚
β”‚    (Business Logic, Orchestration)     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                    ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚            Repository Layer            β”‚
β”‚         (Data Access, Queries)         β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                    ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚             Database Layer             β”‚
β”‚       (PostgreSQL via Supabase)        β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
```
**Benefits**:
- **Testability**: Each layer can be tested independently
- **Maintainability**: Changes in one layer don't affect others
- **Scalability**: Easy to add new features without breaking existing code
- **Flexibility**: Can swap implementations (e.g., change database, payment gateway)
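The flexibility point can be made concrete with dependency injection against a structural interface: a service depends on a protocol, not a concrete gateway, so implementations can be swapped without touching business logic. A minimal sketch (the `PaymentGateway`/`BillingService` names below are illustrative, not the project's actual classes):

```python
from typing import Protocol


class PaymentGateway(Protocol):
    """Any gateway the service layer can charge against."""
    def charge(self, amount: int) -> str: ...


class MpesaGateway:
    def charge(self, amount: int) -> str:
        # A real implementation would call the payment gateway API here
        return f"mpesa:charged:{amount}"


class FakeGateway:
    """Drop-in replacement for unit tests."""
    def charge(self, amount: int) -> str:
        return f"fake:charged:{amount}"


class BillingService:
    def __init__(self, gateway: PaymentGateway):
        self.gateway = gateway  # injected, so any conforming gateway works

    def collect(self, amount: int) -> str:
        return self.gateway.charge(amount)


# Swapping implementations requires no change to BillingService:
print(BillingService(MpesaGateway()).collect(100))  # mpesa:charged:100
print(BillingService(FakeGateway()).collect(100))   # fake:charged:100
```

The same shape applies to swapping databases or notification channels: only the injected implementation changes.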
---
## πŸ“¦ Layer Responsibilities
### **Presentation Layer** (`app/api/`)
**Responsibility**: Handle HTTP requests/responses, input validation, authentication
**What it does**:
- Parse request data
- Validate input using Pydantic schemas
- Call service layer methods
- Format responses
- Handle errors and return appropriate HTTP status codes
**What it DOESN'T do**:
- Business logic
- Direct database access
- Complex calculations
**Example**:
```python
# app/api/v1/tickets.py
@router.post("/tickets/{ticket_id}/assign")
async def assign_ticket(
    ticket_id: UUID,
    assignment: TicketAssignmentCreate,
    current_user: User = Depends(get_current_user),
    ticket_service: TicketService = Depends(get_ticket_service),
):
    """Assign a ticket to a field agent."""
    try:
        result = await ticket_service.assign_ticket(
            ticket_id=ticket_id,
            user_id=assignment.user_id,
            assigned_by=current_user.id,
        )
        return {"success": True, "data": result}
    except BusinessRuleViolation as e:
        raise HTTPException(status_code=400, detail=str(e))
```
---
### **Service Layer** (`app/services/`)
**Responsibility**: Implement business logic, orchestrate operations, enforce business rules
**What it does**:
- Validate business rules
- Coordinate multiple repository calls
- Handle transactions
- Trigger side effects (notifications, webhooks)
- Calculate derived values (payroll, SLA deadlines)
**What it DOESN'T do**:
- Direct SQL queries
- HTTP request handling
- Data formatting for API responses
**Example**:
```python
# app/services/ticket_service.py
class TicketService:
    def __init__(
        self,
        ticket_repo: TicketRepository,
        assignment_repo: AssignmentRepository,
        notification_service: NotificationService,
        sla_service: SLAService,
    ):
        self.ticket_repo = ticket_repo
        self.assignment_repo = assignment_repo
        self.notification_service = notification_service
        self.sla_service = sla_service

    async def assign_ticket(self, ticket_id: UUID, user_id: UUID, assigned_by: UUID):
        """Assign ticket to user with business rule validation."""
        # 1. Get ticket and validate
        ticket = await self.ticket_repo.get_by_id(ticket_id)
        if not ticket:
            raise TicketNotFoundError(ticket_id)
        if ticket.status != 'open':
            raise BusinessRuleViolation("Can only assign open tickets")

        # 2. Validate user can be assigned
        active_assignments = await self.assignment_repo.count_active_assignments(user_id)
        if active_assignments >= 3:
            raise BusinessRuleViolation("User already has 3 active assignments")

        # 3. Check user is in project team
        if not await self._user_in_project_team(user_id, ticket.project_id):
            raise BusinessRuleViolation("User not in project team")

        # 4. Create assignment
        assignment = await self.assignment_repo.create(
            ticket_id=ticket_id,
            user_id=user_id,
            action='assigned',
            assigned_at=datetime.utcnow(),
        )

        # 5. Update ticket status
        await self.ticket_repo.update_status(ticket_id, 'assigned')

        # 6. Calculate SLA deadline
        sla_deadline = await self.sla_service.calculate_deadline(ticket)
        await self.ticket_repo.update_sla(ticket_id, sla_deadline)

        # 7. Send notification
        await self.notification_service.send_assignment_notification(user_id, ticket)

        return assignment
```
---
### **Repository Layer** (`app/repositories/`)
**Responsibility**: Data access, database queries, ORM operations
**What it does**:
- CRUD operations
- Complex queries
- Filtering and pagination
- Soft delete handling
- Optimistic locking
**What it DOESN'T do**:
- Business logic
- Validation (beyond data integrity)
- Side effects (notifications, webhooks)
**Example**:
```python
# app/repositories/ticket_repository.py
class TicketRepository:
    def __init__(self, db: Session):
        self.db = db

    async def get_by_id(self, ticket_id: UUID) -> Optional[Ticket]:
        """Get ticket by ID, excluding soft-deleted."""
        return self.db.query(Ticket).filter(
            Ticket.id == ticket_id,
            Ticket.deleted_at.is_(None)
        ).first()

    async def get_open_tickets(
        self,
        project_id: UUID,
        limit: int = 50,
        offset: int = 0,
    ) -> List[Ticket]:
        """Get open tickets for a project with pagination."""
        return self.db.query(Ticket).filter(
            Ticket.project_id == project_id,
            Ticket.status == 'open',
            Ticket.deleted_at.is_(None)
        ).order_by(Ticket.created_at.desc()).limit(limit).offset(offset).all()

    async def update_status(self, ticket_id: UUID, status: str) -> Ticket:
        """Update ticket status."""
        ticket = await self.get_by_id(ticket_id)
        ticket.status = status
        ticket.updated_at = datetime.utcnow()
        self.db.commit()
        self.db.refresh(ticket)
        return ticket
```
---
## πŸ”„ Data Flow Examples
### **Example 1: Ticket Assignment Flow**
```
1. Frontend sends POST /api/v1/tickets/{id}/assign
       ↓
2. API Route (tickets.py)
   - Validates JWT token
   - Parses request body
   - Calls TicketService.assign_ticket()
       ↓
3. TicketService
   - Validates business rules (max 3 assignments)
   - Calls TicketRepository.get_by_id()
   - Calls AssignmentRepository.count_active_assignments()
   - Calls AssignmentRepository.create()
   - Calls TicketRepository.update_status()
   - Calls SLAService.calculate_deadline()
   - Calls NotificationService.send_notification()
       ↓
4. Repositories
   - Execute SQL queries via SQLAlchemy
   - Return data to service
       ↓
5. Service returns result to API route
       ↓
6. API route formats response and returns to frontend
```
---
### **Example 2: Payroll Generation Flow (Background Task)**
```
1. Celery Beat triggers weekly payroll task (Friday 6 PM)
       ↓
2. PayrollTask (tasks/payroll_tasks.py)
   - Calls PayrollService.generate_weekly_payroll()
       ↓
3. PayrollService
   - Gets all active projects
   - For each project:
     - Gets project team members
     - For each member:
       - Calls PayrollRepository.get_tickets_closed()
       - Calls TimesheetRepository.get_hours_worked()
       - Calculates earnings based on compensation type
       - Calls PayrollRepository.create()
       - Calls FinanceService.create_transaction()
       ↓
4. Repositories
   - Execute queries and insert payroll records
       ↓
5. Service sends notifications to users
       ↓
6. Task completes and logs result
```
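The "calculates earnings based on compensation type" step in the flow above is the core branch of payroll generation. A self-contained sketch of that logic, assuming three compensation types (the type names and rate figures are illustrative, not the production schema):

```python
def calculate_earnings(compensation_type: str, rate: float,
                       tickets_closed: int, hours_worked: float) -> float:
    """Compute one team member's weekly earnings (illustrative rules)."""
    if compensation_type == "flat_rate":
        return rate                      # fixed weekly amount
    if compensation_type == "per_ticket":
        return rate * tickets_closed     # paid per closed ticket
    if compensation_type == "hourly":
        return rate * hours_worked       # paid per timesheet hour
    raise ValueError(f"Unknown compensation type: {compensation_type}")


print(calculate_earnings("flat_rate", 5000, tickets_closed=12, hours_worked=40))   # 5000
print(calculate_earnings("per_ticket", 250, tickets_closed=12, hours_worked=40))   # 3000
```

The real service would pull `tickets_closed` and `hours_worked` from the repositories named in steps 3's sub-calls before applying this branch.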
---
## πŸ” Security Architecture
### **1. Authentication Flow**
```
1. User logs in via Supabase Auth
       ↓
2. Supabase returns JWT token
       ↓
3. Frontend includes token in Authorization header
       ↓
4. FastAPI middleware validates token
       ↓
5. Extracts user_id from token
       ↓
6. Loads User from database
       ↓
7. Checks user role and permissions
       ↓
8. Allows/denies request
```
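A JWT is, at heart, an encoded claims payload plus a signature, so steps 4-5 can be illustrated with a stdlib HMAC-signed token. This is a toy stand-in: the real service would verify Supabase's JWT with a proper JWT library against Supabase's signing key, not this hand-rolled scheme.

```python
import base64
import hashlib
import hmac
import json

SECRET = b"demo-secret"  # illustrative; the real key comes from Supabase config


def sign_token(claims: dict) -> str:
    """Encode claims and append an HMAC signature (toy JWT stand-in)."""
    payload = base64.urlsafe_b64encode(json.dumps(claims).encode())
    sig = hmac.new(SECRET, payload, hashlib.sha256).hexdigest()
    return f"{payload.decode()}.{sig}"


def verify_token(token: str) -> dict:
    """Steps 4-5: validate the signature, then extract claims like user_id."""
    payload, sig = token.rsplit(".", 1)
    expected = hmac.new(SECRET, payload.encode(), hashlib.sha256).hexdigest()
    if not hmac.compare_digest(sig, expected):
        raise PermissionError("Invalid token signature")
    return json.loads(base64.urlsafe_b64decode(payload))


token = sign_token({"user_id": "42", "role": "agent"})
claims = verify_token(token)
print(claims["user_id"])  # 42
```

Steps 6-8 then use the extracted `user_id` to load the User row and check role-based permissions.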
### **2. Row-Level Security (RLS)**
**Database Level** (Supabase RLS Policies):
```sql
-- Example: Users can only see tickets from their projects
CREATE POLICY "Users see own project tickets"
ON Tickets FOR SELECT
USING (
    project_id IN (
        SELECT project_id FROM ProjectTeam WHERE user_id = auth.uid()
    )
);
```
**Application Level** (Service Layer):
```python
# Always filter by user's accessible projects
async def get_tickets(self, user: User):
    project_ids = await self._get_user_project_ids(user.id)
    return await self.ticket_repo.get_by_projects(project_ids)
```
### **3. Multi-Tenancy Isolation**
**Client Isolation**:
```python
# Every query scoped to user's client
async def get_customers(self, user: User):
    if user.client_id:
        return await self.customer_repo.get_by_client(user.client_id)
    elif user.contractor_id:
        # Contractor sees customers from their projects
        project_ids = await self._get_contractor_projects(user.contractor_id)
        return await self.customer_repo.get_by_projects(project_ids)
```
---
## πŸ“Š Caching Strategy
### **What to Cache**
1. **User Sessions** (Redis, TTL: 30 minutes)
- User profile
- User permissions
- User's active projects
2. **Dashboard Metrics** (Redis, TTL: 5 minutes)
- Ticket counts by status
- SLA compliance rates
- Agent workload
3. **Configuration** (Redis, TTL: 1 hour)
- System settings
- Feature flags
- SLA thresholds
4. **Location Data** (Redis, TTL: 1 minute)
- Agent current locations
- Real-time tracking data
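The per-key TTL behavior described above can be shown with a minimal in-memory stand-in; production would use Redis, and the `TTLCache` class here is purely illustrative:

```python
import time


class TTLCache:
    """Tiny in-memory cache with per-entry expiry (Redis stand-in)."""

    def __init__(self):
        self._store = {}  # key -> (value, expires_at)

    def set(self, key, value, ttl: float):
        self._store[key] = (value, time.monotonic() + ttl)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._store[key]  # lazy eviction on read
            return None
        return value


cache = TTLCache()
cache.set("agent:7:location", (-1.2921, 36.8219), ttl=60)  # 1-minute TTL
print(cache.get("agent:7:location"))  # (-1.2921, 36.8219)
```

With Redis the same shape applies: `SET key value EX ttl`, and expiry is handled server-side instead of on read.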
### **Cache Invalidation**
```python
# Example: Invalidate cache on ticket status change
async def update_ticket_status(self, ticket_id: UUID, status: str):
    ticket = await self.ticket_repo.update_status(ticket_id, status)
    # Invalidate related caches
    await cache.delete(f"ticket:{ticket_id}")
    await cache.delete(f"project:{ticket.project_id}:tickets")
    await cache.delete(f"dashboard:metrics:{ticket.project_id}")
    return ticket
```
---
## πŸ”„ Background Tasks Architecture
### **Celery Task Types**
1. **Scheduled Tasks** (Celery Beat)
- Weekly payroll generation (Friday 6 PM)
- SLA monitoring (every hour)
- Daily metrics computation (midnight)
- Invoice generation (end of month)
2. **Async Tasks** (Triggered by API)
- Send email notifications
- Send SMS notifications
- Process payment gateway callbacks
- Generate reports
3. **Retry Tasks** (Failed payment retries)
- Retry failed M-Pesa payments
- Retry failed SMS deliveries
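Retry tasks like these typically use exponential backoff with a cap so a flaky gateway isn't hammered. The schedule computation can be sketched as follows; the base delay and cap are illustrative, and in Celery the result would feed `self.retry(countdown=...)`:

```python
def retry_delay(attempt: int, base: float = 60.0, cap: float = 3600.0) -> float:
    """Seconds to wait before retry `attempt` (0-based), doubling each time."""
    return min(base * (2 ** attempt), cap)


# First few retries: 60s, 120s, 240s, ... capped at one hour
print([retry_delay(n) for n in range(7)])
# [60.0, 120.0, 240.0, 480.0, 960.0, 1920.0, 3600.0]
```

The cap keeps long-failing payments from drifting to multi-hour gaps while still backing off quickly at first.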
### **Task Configuration**
```python
# app/tasks/celery_app.py
from celery import Celery
from celery.schedules import crontab

celery_app = Celery('swiftops')

celery_app.conf.beat_schedule = {
    'generate-weekly-payroll': {
        'task': 'app.tasks.payroll_tasks.generate_weekly_payroll',
        'schedule': crontab(day_of_week=5, hour=18, minute=0),  # Friday 6 PM
    },
    'monitor-sla-violations': {
        'task': 'app.tasks.sla_tasks.monitor_sla_violations',
        'schedule': crontab(minute=0),  # Every hour
    },
    'compute-daily-metrics': {
        'task': 'app.tasks.analytics_tasks.compute_daily_metrics',
        'schedule': crontab(hour=0, minute=0),  # Midnight
    },
}
```
---
## πŸš€ Scalability Patterns
### **1. Horizontal Scaling**
**Stateless API Design**:
- No session state stored in API servers
- All state in database or Redis
- Can run multiple API instances behind load balancer
**Load Balancing**:
```
             β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
             β”‚Load Balancerβ”‚
             β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                    β”‚
     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
     ↓              ↓              ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ API #1  β”‚    β”‚ API #2  β”‚    β”‚ API #3  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
     β”‚              β”‚              β”‚
     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                    ↓
             β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
             β”‚  Database   β”‚
             β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
```
### **2. Database Optimization**
**Read Replicas**:
- Use read replicas for reporting queries
- Master for writes, replicas for reads
**Connection Pooling**:
```python
# app/db/session.py
engine = create_engine(
    DATABASE_URL,
    pool_size=20,        # Max connections in pool
    max_overflow=10,     # Additional connections if pool full
    pool_pre_ping=True,  # Verify connections before use
    pool_recycle=3600,   # Recycle connections after 1 hour
)
```
**Query Optimization**:
- Use indexes effectively (already in schema)
- Avoid N+1 queries (use eager loading)
- Paginate large result sets
- Use database views for complex queries
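The N+1 point can be made concrete even without an ORM: instead of one lookup per ticket, collect the foreign keys and fetch them in a single batch. In the sketch below, `fetch_projects` stands in for one `SELECT ... WHERE id IN (...)` query; with SQLAlchemy, eager loading achieves the same thing:

```python
tickets = [
    {"id": 1, "project_id": "a"},
    {"id": 2, "project_id": "a"},
    {"id": 3, "project_id": "b"},
]
projects_table = {"a": {"name": "Fiber Rollout"}, "b": {"name": "CCTV Install"}}


def fetch_projects(ids):
    """Stand-in for: SELECT * FROM projects WHERE id IN (...) -- ONE query."""
    return {pid: projects_table[pid] for pid in ids}


# N+1 version would call fetch_projects(...) once per ticket.
# Batched version: one query for all distinct project ids.
project_ids = {t["project_id"] for t in tickets}
projects = fetch_projects(project_ids)
for t in tickets:
    t["project"] = projects[t["project_id"]]

print(tickets[0]["project"]["name"])  # Fiber Rollout
```

Three tickets trigger one project query instead of three; the saving grows linearly with result-set size.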
### **3. Caching Strategy**
**Multi-Level Caching**:
```
Request β†’ API β†’ L1 Cache (In-Memory) β†’ L2 Cache (Redis) β†’ Database
```
**Cache-Aside Pattern**:
```python
async def get_ticket(self, ticket_id: UUID):
    # Try cache first
    cached = await cache.get(f"ticket:{ticket_id}")
    if cached:
        return cached
    # Cache miss, query database
    ticket = await self.ticket_repo.get_by_id(ticket_id)
    # Store in cache
    await cache.set(f"ticket:{ticket_id}", ticket, ttl=300)
    return ticket
```
---
## πŸ§ͺ Testing Strategy
### **Test Pyramid**
```
        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”
        β”‚   E2E   β”‚  (Few, slow, expensive)
        β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
     β”‚  Integration  β”‚  (Some, medium speed)
     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚      Unit Tests       β”‚  (Many, fast, cheap)
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
```
### **Unit Tests** (70% of tests)
- Test individual functions
- Mock external dependencies
- Fast execution (< 1 second per test)
```python
# tests/unit/test_services/test_payroll_service.py
def test_calculate_flat_rate_payroll():
    # Arrange
    role = Mock(compensation_type='flat_rate', flat_rate_amount=5000)
    # Act
    earnings = payroll_service._calculate_earnings(role, tickets=[], hours=0)
    # Assert
    assert earnings == 5000
```
### **Integration Tests** (25% of tests)
- Test multiple components together
- Use test database
- Medium speed (1-5 seconds per test)
```python
# tests/integration/test_api/test_tickets.py
def test_assign_ticket_endpoint(client, test_db):
    # Create test data
    ticket = create_test_ticket(test_db)
    user = create_test_user(test_db)
    # Call API
    response = client.post(
        f"/api/v1/tickets/{ticket.id}/assign",
        json={"user_id": str(user.id)}
    )
    # Assert
    assert response.status_code == 200
    assert test_db.query(TicketAssignment).count() == 1
```
### **E2E Tests** (5% of tests)
- Test complete user workflows
- Use real database (or close replica)
- Slow (10+ seconds per test)
```python
# tests/e2e/test_ticket_workflow.py
def test_complete_ticket_workflow(client, test_db):
    # 1. Create sales order
    # 2. Generate ticket from sales order
    # 3. Assign ticket to agent
    # 4. Agent accepts assignment
    # 5. Agent arrives at site
    # 6. Agent completes work
    # 7. Subscription activated
    # 8. Verify all state changes
    ...
```
---
## πŸ“ˆ Monitoring & Observability
### **Logging Strategy**
**Log Levels**:
- **DEBUG**: Detailed information for debugging
- **INFO**: General informational messages
- **WARNING**: Warning messages (non-critical issues)
- **ERROR**: Error messages (handled exceptions)
- **CRITICAL**: Critical errors (system failures)
**Structured Logging**:
```python
import structlog
logger = structlog.get_logger()
logger.info(
"ticket_assigned",
ticket_id=str(ticket_id),
user_id=str(user_id),
project_id=str(project_id),
assigned_by=str(assigned_by)
)
```
### **Metrics to Track**
1. **API Metrics**
- Request rate (requests/second)
- Response time (p50, p95, p99)
- Error rate (4xx, 5xx)
2. **Business Metrics**
- Tickets created/assigned/completed per day
- Average ticket completion time
- SLA compliance rate
- Payroll processing time
3. **System Metrics**
- Database connection pool usage
- Cache hit rate
- Celery queue length
- Memory/CPU usage
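The p50/p95/p99 response times listed above can be computed from a latency sample with the standard library's `statistics.quantiles`; a sketch over a toy sample:

```python
import statistics


def latency_percentiles(samples_ms):
    """Return (p50, p95, p99) from a list of request latencies in ms."""
    qs = statistics.quantiles(samples_ms, n=100)  # cut points q1..q99
    return qs[49], qs[94], qs[98]


samples = list(range(1, 101))  # pretend latencies: 1..100 ms
p50, p95, p99 = latency_percentiles(samples)
print(p50, p95, p99)  # 50.5 95.95 99.99
```

In production these would come from a metrics backend (e.g. Prometheus histograms) rather than in-process computation, but the definitions are the same.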
### **Error Tracking**
Use Sentry for error tracking:
```python
import sentry_sdk

sentry_sdk.init(
    dsn=SENTRY_DSN,
    environment=ENVIRONMENT,
    traces_sample_rate=0.1
)
```
---
## πŸ”§ Development Workflow
### **Local Development**
1. Start services:
```bash
docker-compose up -d postgres redis
```
2. Run migrations:
```bash
alembic upgrade head
```
3. Start API:
```bash
uvicorn app.main:app --reload
```
4. Start Celery:
```bash
celery -A app.tasks.celery_app worker --loglevel=info
```
### **Code Quality**
**Pre-commit Hooks**:
- Black (code formatting)
- isort (import sorting)
- flake8 (linting)
- mypy (type checking)
**CI/CD Pipeline**:
1. Run tests
2. Check code coverage (> 80%)
3. Run linters
4. Build Docker image
5. Deploy to staging
6. Run E2E tests
7. Deploy to production
---
## πŸ“š Additional Resources
- [FastAPI Documentation](https://fastapi.tiangolo.com/)
- [SQLAlchemy Documentation](https://docs.sqlalchemy.org/)
- [Celery Documentation](https://docs.celeryq.dev/)
- [Supabase Documentation](https://supabase.com/docs)
- [Clean Architecture](https://blog.cleancoder.com/uncle-bob/2012/08/13/the-clean-architecture.html)