Prabha-AIMLOPS's picture
initial commit
3243379 verified
from datetime import datetime
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
import json
import uuid
class Message(BaseModel):
    """A queue message together with its retry bookkeeping fields.

    Instances can be converted to and from a JSON string with
    :meth:`to_json` and :meth:`from_json`.
    """

    # Unique identifier; defaults to a freshly generated UUID4 string.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Name of the queue this message belongs to (required).
    queue: str
    # Arbitrary message body; must be JSON-serializable for to_json() to work.
    payload: Dict[str, Any]
    # Creation timestamp; defaults to datetime.utcnow(), i.e. a naive datetime.
    created_at: datetime = Field(default_factory=datetime.utcnow)
    # Retry counter, starting at 0.
    retry_count: int = Field(default=0)
    # Retry limit, 3 by default. NOTE(review): enforcement is not in this class.
    max_retries: int = Field(default=3)
    # Optional timestamp associated with the next retry; None when unset.
    next_retry_at: Optional[datetime] = Field(default=None)
    # Optional error text; None when unset.
    error: Optional[str] = Field(default=None)

    def to_json(self) -> str:
        """Serialize this message to a JSON string.

        Datetime fields are written in ISO 8601 form via ``isoformat()``;
        an unset ``next_retry_at`` is written as ``null``.

        Raises:
            TypeError: If ``payload`` contains values ``json.dumps`` cannot
                serialize.
        """
        next_retry = self.next_retry_at
        return json.dumps({
            "id": self.id,
            "queue": self.queue,
            "payload": self.payload,
            "created_at": self.created_at.isoformat(),
            "retry_count": self.retry_count,
            "max_retries": self.max_retries,
            "next_retry_at": next_retry.isoformat() if next_retry else None,
            "error": self.error,
        })

    @classmethod
    def from_json(cls, data: str) -> "Message":
        """Build a message from a JSON string such as :meth:`to_json` emits.

        ``created_at`` must be present as an ISO 8601 string.
        ``next_retry_at`` may be an ISO 8601 string, ``null``, or absent;
        the latter two both leave the field at ``None``.

        Raises:
            KeyError: If ``created_at`` is missing from the payload.
            ValueError: If the JSON is malformed or a timestamp string is
                not valid ISO 8601.
        """
        fields = json.loads(data)
        fields["created_at"] = datetime.fromisoformat(fields["created_at"])

        # Use .get() so payloads that omit the optional key are accepted
        # instead of failing with KeyError; the model default (None) applies.
        next_retry = fields.get("next_retry_at")
        if next_retry:
            fields["next_retry_at"] = datetime.fromisoformat(next_retry)

        return cls(**fields)