| import json |
| from dataclasses import dataclass, asdict |
| from datetime import datetime, timezone |
| from typing import Any |
|
|
@dataclass
class CloudEvent:
    """Envelope that pairs a payload with identifying metadata.

    Instances are plain records; :meth:`wrap` builds one and serializes it
    to JSON bytes in a single step.
    """

    id: str      # event identifier, passed through from the caller
    type: str    # event type name (see ``wrap`` for the fallback rule)
    source: str  # event origin, passed through from the caller
    time: str    # timestamp string; ``wrap`` fills it with UTC ISO 8601
    data: Any    # the wrapped payload

    @staticmethod
    def wrap(obj: Any, *, event_type: str, source: str, id: str) -> bytes:
        """Wrap *obj* in a ``CloudEvent`` and return it as UTF-8 JSON bytes.

        Args:
            obj: Payload stored under the ``data`` key. It must be something
                ``dataclasses.asdict`` and ``json.dumps`` can handle.
            event_type: Value for the ``type`` key. When falsy, the payload's
                class name is used, or ``"NullOrEmpty"`` if *obj* is ``None``.
            source: Value for the ``source`` key.
            id: Value for the ``id`` key.

        Returns:
            The JSON document encoded as UTF-8. Non-ASCII characters are
            written literally rather than as ``\\uXXXX`` escapes.

        Raises:
            TypeError: Propagated from ``json.dumps`` when the payload is not
                JSON-serializable.
        """
        # Resolve the type name first: explicit value wins, then the
        # payload's class name, with a fixed label for a missing payload.
        if event_type:
            resolved_type = event_type
        elif obj is None:
            resolved_type = "NullOrEmpty"
        else:
            resolved_type = obj.__class__.__name__

        # Timezone-aware "now" so the serialized string carries a UTC offset.
        stamped_at = datetime.now(timezone.utc).isoformat()

        envelope = CloudEvent(
            id=id,
            type=resolved_type,
            source=source,
            time=stamped_at,
            data=obj,
        )

        # asdict() recurses into the payload, so dataclass payloads become
        # plain dicts before JSON encoding.
        document = json.dumps(asdict(envelope), ensure_ascii=False)
        return document.encode("utf-8")
|
|