Spaces:
Configuration error
Configuration error
File size: 1,337 Bytes
3243379 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | from datetime import datetime
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
import json
import uuid
class Message(BaseModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
queue: str
payload: Dict[str, Any]
created_at: datetime = Field(default_factory=datetime.utcnow)
retry_count: int = Field(default=0)
max_retries: int = Field(default=3)
next_retry_at: Optional[datetime] = Field(default=None)
error: Optional[str] = Field(default=None)
def to_json(self) -> str:
return json.dumps({
"id": self.id,
"queue": self.queue,
"payload": self.payload,
"created_at": self.created_at.isoformat(),
"retry_count": self.retry_count,
"max_retries": self.max_retries,
"next_retry_at": self.next_retry_at.isoformat() if self.next_retry_at else None,
"error": self.error
})
@classmethod
def from_json(cls, data: str) -> "Message":
json_data = json.loads(data)
json_data["created_at"] = datetime.fromisoformat(json_data["created_at"])
if json_data["next_retry_at"]:
json_data["next_retry_at"] = datetime.fromisoformat(json_data["next_retry_at"])
return cls(**json_data)
|