Spaces:
Runtime error
Runtime error
File size: 1,745 Bytes
6391dfd 0979938 f09390e 6391dfd d197a66 6391dfd f09390e 6391dfd d197a66 6391dfd | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | """
Data models for sync operations.
"""
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
# Entity type constants: the single source of truth for queue names.
# Most names are plural; "uom" is the exception (presumably "unit of
# measure" -- confirm with the queue owners).
ENTITY_TYPE_CATALOGUE = "catalogues"
ENTITY_TYPE_MERCHANT = "merchants"
ENTITY_TYPE_EMPLOYEE = "employees"
ENTITY_TYPE_WAREHOUSE = "warehouses"
ENTITY_TYPE_UOM = "uom"

# Every entity type accepted by sync operations. Built from the constants
# above so that a renamed queue only has to be changed in one place.
VALID_ENTITY_TYPES = {
    ENTITY_TYPE_CATALOGUE,
    ENTITY_TYPE_MERCHANT,
    ENTITY_TYPE_EMPLOYEE,
    ENTITY_TYPE_WAREHOUSE,
    ENTITY_TYPE_UOM,
}
# Allowed values for SyncOperation.operation. Kept at module level so the set
# is built once instead of on every instantiation.
VALID_OPERATIONS = {"create", "update", "activate", "deactivate", "delete"}


def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive datetime.

    Produces the same value ``datetime.utcnow()`` would, without calling that
    method, which is deprecated as of Python 3.12. The result is deliberately
    kept naive (``tzinfo`` is ``None``) so it stays comparable with
    timestamps created by the previous default.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


@dataclass
class SyncOperation:
    """
    Model for tracking sync operations.

    Attributes:
        entity_type: Type of entity being synced; must be a member of
            VALID_ENTITY_TYPES ("catalogues" | "merchants" | "employees" |
            "warehouses" | "uom")
        entity_id: Unique identifier of the entity
        operation: Type of operation ("create" | "update" | "activate" |
            "deactivate" | "delete")
        timestamp: When the sync operation was created; defaults to the
            current time as a naive UTC datetime
        retry_count: Number of retry attempts made
        last_error: Last error message if sync failed

    Raises:
        ValueError: If entity_type or operation is not an allowed value.
    """

    entity_type: str
    entity_id: str
    operation: str
    timestamp: datetime = field(default_factory=_utcnow_naive)
    retry_count: int = 0
    last_error: Optional[str] = None

    def __post_init__(self) -> None:
        """Validate entity_type and operation values."""
        # The allowed values are sorted in the messages so the error text is
        # stable across runs (raw set ordering depends on the hash seed).
        if self.entity_type not in VALID_ENTITY_TYPES:
            raise ValueError(
                f"entity_type must be one of {sorted(VALID_ENTITY_TYPES)}, "
                f"got {self.entity_type}"
            )
        if self.operation not in VALID_OPERATIONS:
            raise ValueError(
                f"operation must be one of {sorted(VALID_OPERATIONS)}, "
                f"got {self.operation}"
            )
|