Spaces:
Configuration error
Configuration error
| from datetime import datetime | |
| from typing import Any, Dict, Optional | |
| from pydantic import BaseModel, Field | |
| import json | |
| import uuid | |
class Message(BaseModel):
    """A message record that can be round-tripped through a JSON string.

    ``to_json`` serializes every field (datetimes as ISO-8601 strings) and
    ``from_json`` rebuilds an instance from such a string.
    """

    # Unique identifier; defaults to a freshly generated UUID4 string.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Presumably the name of the queue this message belongs to -- confirm
    # against the code that produces/consumes messages.
    queue: str
    # Arbitrary message body; must be JSON-serializable because ``to_json``
    # hands it straight to ``json.dumps``.
    payload: Dict[str, Any]
    # Creation timestamp; defaults to ``datetime.utcnow()`` (a *naive*
    # datetime). NOTE(review): ``utcnow`` is deprecated since Python 3.12;
    # switching to an aware datetime would affect any naive/aware
    # comparisons elsewhere, so it is intentionally left unchanged here.
    created_at: datetime = Field(default_factory=datetime.utcnow)
    # Retry bookkeeping. Defaults: 0 attempts so far, limit of 3. The limit
    # is not enforced in this class.
    retry_count: int = Field(default=0)
    max_retries: int = Field(default=3)
    # Optional timestamp of the next retry; ``None`` when not set.
    next_retry_at: Optional[datetime] = Field(default=None)
    # Optional error text; ``None`` when not set.
    error: Optional[str] = Field(default=None)

    def to_json(self) -> str:
        """Serialize this message to a JSON string.

        Datetime fields are written with ``datetime.isoformat()``;
        ``next_retry_at`` is written as ``null`` when it is unset.

        Returns:
            The JSON document as a ``str``.

        Raises:
            TypeError: If ``payload`` contains values ``json.dumps`` cannot
                serialize.
        """
        next_retry = (
            self.next_retry_at.isoformat() if self.next_retry_at else None
        )
        return json.dumps({
            "id": self.id,
            "queue": self.queue,
            "payload": self.payload,
            "created_at": self.created_at.isoformat(),
            "retry_count": self.retry_count,
            "max_retries": self.max_retries,
            "next_retry_at": next_retry,
            "error": self.error,
        })

    @classmethod
    def from_json(cls, data: str) -> "Message":
        """Build a message from a JSON string such as ``to_json`` produces.

        This is an alternate constructor, so it must be a classmethod:
        without the decorator ``Message.from_json(text)`` would bind
        ``text`` to ``cls`` and fail with a missing-argument error.

        Args:
            data: JSON object text. ``created_at`` is required and must be
                an ISO-8601 string; ``next_retry_at`` may be absent, null,
                or an ISO-8601 string.

        Returns:
            A new instance of ``cls`` populated from the decoded object.

        Raises:
            json.JSONDecodeError: If ``data`` is not valid JSON.
            KeyError: If ``created_at`` is missing.
            ValueError: If a timestamp is not valid ISO-8601 (pydantic's
                validation error for bad fields is also a ``ValueError``).
        """
        fields = json.loads(data)
        fields["created_at"] = datetime.fromisoformat(fields["created_at"])
        # ``next_retry_at`` is optional on the model, so tolerate the key
        # being absent instead of raising KeyError.
        next_retry = fields.get("next_retry_at")
        if next_retry:
            fields["next_retry_at"] = datetime.fromisoformat(next_retry)
        return cls(**fields)