Spaces:
Runtime error
Runtime error
| """ | |
| Data models for sync operations. | |
| """ | |
| from dataclasses import dataclass, field | |
| from datetime import datetime | |
| from typing import Optional | |
# Canonical entity-type identifiers. Each value is also used as the queue
# name, so these constants are the single source of truth for both.
ENTITY_TYPE_CATALOGUE = "catalogues"
ENTITY_TYPE_MERCHANT = "merchants"
ENTITY_TYPE_EMPLOYEE = "employees"
ENTITY_TYPE_WAREHOUSE = "warehouses"
ENTITY_TYPE_UOM = "uom"

# Every entity type accepted for a sync operation.
VALID_ENTITY_TYPES = {
    ENTITY_TYPE_CATALOGUE, ENTITY_TYPE_MERCHANT, ENTITY_TYPE_EMPLOYEE,
    ENTITY_TYPE_WAREHOUSE, ENTITY_TYPE_UOM,
}
@dataclass
class SyncOperation:
    """
    Model for tracking a single sync operation on one entity.

    The class must be a dataclass: the generated ``__init__`` is what
    assigns the fields below, resolves the ``timestamp`` default factory,
    and triggers ``__post_init__`` validation.

    Attributes:
        entity_type: Type of entity being synced; must be a member of
            VALID_ENTITY_TYPES
            ("catalogues" | "merchants" | "employees" | "warehouses" | "uom")
        entity_id: Unique identifier of the entity
        operation: Type of operation
            ("create" | "update" | "activate" | "deactivate" | "delete")
        timestamp: When the sync operation was created; defaults to the
            value of ``datetime.utcnow()`` at construction time
        retry_count: Number of retry attempts made (defaults to 0)
        last_error: Last error message if sync failed (defaults to None)

    Raises:
        ValueError: If ``entity_type`` or ``operation`` is not one of the
            accepted values.
    """

    entity_type: str
    entity_id: str
    operation: str
    # NOTE(review): datetime.utcnow() returns a naive datetime and is
    # deprecated since Python 3.12. It is kept here because switching to an
    # aware datetime would change what callers receive — confirm before
    # migrating.
    timestamp: datetime = field(default_factory=datetime.utcnow)
    retry_count: int = 0
    last_error: Optional[str] = None

    def __post_init__(self):
        """Validate entity_type and operation values."""
        if self.entity_type not in VALID_ENTITY_TYPES:
            raise ValueError(f"entity_type must be one of {VALID_ENTITY_TYPES}, got {self.entity_type}")

        valid_operations = {"create", "update", "activate", "deactivate", "delete"}
        if self.operation not in valid_operations:
            raise ValueError(f"operation must be one of {valid_operations}, got {self.operation}")