Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Helpers for encoding/decoding server_message_data values. | |
| These are Base64URL-encoded proto3 messages with shape: | |
| - field 1: string UUID (36 chars) | |
| - field 3: google.protobuf.Timestamp (1=seconds, 2=nanos) | |
| Supports UUID_ONLY, TIMESTAMP_ONLY, and UUID_AND_TIMESTAMP. | |
| """ | |
| from typing import Dict, Optional, Tuple | |
| import base64 | |
| from datetime import datetime, timezone | |
| try: | |
| from zoneinfo import ZoneInfo # Python 3.9+ | |
| except Exception: | |
| ZoneInfo = None # type: ignore | |
| def _b64url_decode_padded(s: str) -> bytes: | |
| t = s.replace("-", "+").replace("_", "/") | |
| pad = (-len(t)) % 4 | |
| if pad: | |
| t += "=" * pad | |
| return base64.b64decode(t) | |
| def _b64url_encode_nopad(b: bytes) -> str: | |
| return base64.urlsafe_b64encode(b).decode("ascii").rstrip("=") | |
| def _read_varint(buf: bytes, i: int) -> Tuple[int, int]: | |
| shift = 0 | |
| val = 0 | |
| while i < len(buf): | |
| b = buf[i] | |
| i += 1 | |
| val |= (b & 0x7F) << shift | |
| if not (b & 0x80): | |
| return val, i | |
| shift += 7 | |
| if shift > 63: | |
| break | |
| raise ValueError("invalid varint") | |
| def _write_varint(v: int) -> bytes: | |
| out = bytearray() | |
| vv = int(v) | |
| while True: | |
| to_write = vv & 0x7F | |
| vv >>= 7 | |
| if vv: | |
| out.append(to_write | 0x80) | |
| else: | |
| out.append(to_write) | |
| break | |
| return bytes(out) | |
def _make_key(field_no: int, wire_type: int) -> bytes:
    """Return the varint-encoded field key for *field_no* and *wire_type*.

    The key packs the wire type into the low three bits and the field number
    into the bits above them.
    """
    tag = wire_type | (field_no << 3)
    return _write_varint(tag)
def _decode_timestamp(buf: bytes) -> Tuple[Optional[int], Optional[int]]:
    """Decode the body of a Timestamp message into ``(seconds, nanos)``.

    Field 1 is read as a signed 64-bit integer and field 2 as a signed
    32-bit integer; either element is ``None`` when its field is absent.
    Fields with other numbers are skipped according to their wire type, and
    parsing stops at the first unrecognised wire type.

    Raises:
        ValueError: propagated from ``_read_varint`` when a varint is
            truncated or over-long.
    """
    i = 0
    seconds: Optional[int] = None
    nanos: Optional[int] = None
    while i < len(buf):
        key, i = _read_varint(buf, i)
        field_no = key >> 3
        wt = key & 0x07
        if wt == 0:
            val, i = _read_varint(buf, i)
            if field_no == 1:
                # Varints carry the unsigned bit pattern; reinterpret it as
                # two's-complement int64 so negative seconds survive.
                seconds = val - (1 << 64) if val >= (1 << 63) else val
            elif field_no == 2:
                # int32 fields keep only the low 32 bits, then sign-extend.
                low32 = val & 0xFFFFFFFF
                nanos = low32 - (1 << 32) if low32 >= (1 << 31) else low32
        elif wt == 2:
            # Length-delimited: skip the payload.
            ln, payload_start = _read_varint(buf, i)
            i = payload_start + ln
        elif wt == 1:
            i += 8  # fixed64
        elif wt == 5:
            i += 4  # fixed32
        else:
            # Wire types 3, 4 (groups) and 6, 7 are not handled; stop here.
            break
    return seconds, nanos
def _encode_timestamp(seconds: Optional[int], nanos: Optional[int]) -> bytes:
    """Serialise a Timestamp body from optional *seconds* and *nanos*.

    Each value that is not ``None`` is written as a varint field (seconds as
    field 1, nanos as field 2); ``None`` values are omitted entirely.
    """
    encoded = bytearray()
    for field_no, value in ((1, seconds), (2, nanos)):
        if value is None:
            continue
        encoded += _make_key(field_no, 0)
        encoded += _write_varint(int(value))
    return bytes(encoded)
def decode_server_message_data(b64url: str) -> Dict:
    """Decode a Base64URL-encoded server_message_data value.

    Returns a dict with the keys ``uuid``, ``seconds``, ``nanos``,
    ``iso_utc``, ``iso_ny`` and ``type``. ``type`` is one of
    ``UUID_AND_TIMESTAMP``, ``UUID_ONLY``, ``TIMESTAMP_ONLY`` or ``UNKNOWN``.

    If the input cannot be Base64URL-decoded, or the decoded bytes contain a
    malformed varint, a dict with a single ``error`` key is returned instead.

    ``iso_utc`` and ``iso_ny`` are ``None`` when no seconds value is present
    or when the seconds/nanos pair cannot be represented as a ``datetime``.
    ``iso_ny`` is also ``None`` when ``ZoneInfo`` is unavailable or the
    conversion to America/New_York fails.
    """
    try:
        raw = _b64url_decode_padded(b64url)
    except Exception as e:
        return {"error": f"base64url decode failed: {e}"}

    i = 0
    uuid: Optional[str] = None
    seconds: Optional[int] = None
    nanos: Optional[int] = None
    try:
        while i < len(raw):
            key, i = _read_varint(raw, i)
            field_no = key >> 3
            wt = key & 0x07
            if wt == 2:
                ln, i = _read_varint(raw, i)
                data = raw[i:i + ln]
                i += ln
                if field_no == 1:
                    try:
                        uuid = data.decode("utf-8")
                    except UnicodeDecodeError:
                        uuid = None
                elif field_no == 3:
                    s, n = _decode_timestamp(data)
                    if s is not None:
                        seconds = s
                    if n is not None:
                        nanos = n
            elif wt == 0:
                _, i = _read_varint(raw, i)
            elif wt == 1:
                i += 8  # fixed64
            elif wt == 5:
                i += 4  # fixed32
            else:
                # Unhandled wire type: stop parsing and report what we have.
                break
    except ValueError as e:
        # Report malformed protobuf the same way as malformed Base64 rather
        # than letting the parser's ValueError escape to the caller.
        return {"error": f"protobuf decode failed: {e}"}

    iso_utc: Optional[str] = None
    iso_ny: Optional[str] = None
    if seconds is not None:
        try:
            dt = datetime.fromtimestamp(int(seconds), tz=timezone.utc).replace(
                microsecond=(nanos or 0) // 1000
            )
        except (OverflowError, OSError, ValueError):
            # Seconds outside the platform's supported range, or nanos that
            # do not map to a valid microsecond value: keep the raw numbers
            # but do not produce ISO strings.
            dt = None
        if dt is not None:
            iso_utc = dt.isoformat().replace("+00:00", "Z")
            if ZoneInfo is not None:
                try:
                    iso_ny = dt.astimezone(ZoneInfo("America/New_York")).isoformat()
                except Exception:
                    iso_ny = None

    has_timestamp = seconds is not None or nanos is not None
    if uuid and has_timestamp:
        t = "UUID_AND_TIMESTAMP"
    elif uuid:
        t = "UUID_ONLY"
    elif has_timestamp:
        t = "TIMESTAMP_ONLY"
    else:
        t = "UNKNOWN"

    return {
        "uuid": uuid,
        "seconds": seconds,
        "nanos": nanos,
        "iso_utc": iso_utc,
        "iso_ny": iso_ny,
        "type": t,
    }
def encode_server_message_data(uuid: Optional[str] = None,
                               seconds: Optional[int] = None,
                               nanos: Optional[int] = None) -> str:
    """Build a Base64URL (unpadded) server_message_data value.

    A non-empty *uuid* is written as length-delimited field 1. When either
    *seconds* or *nanos* is given, a Timestamp body is written as
    length-delimited field 3. With no arguments the result is an empty
    string.
    """
    payload = bytearray()

    if uuid:
        uuid_bytes = uuid.encode("utf-8")
        payload += _make_key(1, 2) + _write_varint(len(uuid_bytes)) + uuid_bytes

    has_timestamp = not (seconds is None and nanos is None)
    if has_timestamp:
        ts_bytes = _encode_timestamp(seconds, nanos)
        payload += _make_key(3, 2) + _write_varint(len(ts_bytes)) + ts_bytes

    return _b64url_encode_nopad(bytes(payload))