|
|
|
|
|
|
|
|
"""
|
|
|
Helpers for encoding/decoding server_message_data values.
|
|
|
|
|
|
These are Base64URL-encoded proto3 messages with shape:
|
|
|
- field 1: string UUID (36 chars)
|
|
|
- field 3: google.protobuf.Timestamp (1=seconds, 2=nanos)
|
|
|
|
|
|
Supports UUID_ONLY, TIMESTAMP_ONLY, and UUID_AND_TIMESTAMP.
|
|
|
"""
|
|
|
from typing import Dict, Optional, Tuple
|
|
|
import base64
|
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
try:
|
|
|
from zoneinfo import ZoneInfo
|
|
|
except Exception:
|
|
|
ZoneInfo = None
|
|
|
|
|
|
|
|
|
def _b64url_decode_padded(s: str) -> bytes:
|
|
|
t = s.replace("-", "+").replace("_", "/")
|
|
|
pad = (-len(t)) % 4
|
|
|
if pad:
|
|
|
t += "=" * pad
|
|
|
return base64.b64decode(t)
|
|
|
|
|
|
|
|
|
def _b64url_encode_nopad(b: bytes) -> str:
|
|
|
return base64.urlsafe_b64encode(b).decode("ascii").rstrip("=")
|
|
|
|
|
|
|
|
|
def _read_varint(buf: bytes, i: int) -> Tuple[int, int]:
|
|
|
shift = 0
|
|
|
val = 0
|
|
|
while i < len(buf):
|
|
|
b = buf[i]
|
|
|
i += 1
|
|
|
val |= (b & 0x7F) << shift
|
|
|
if not (b & 0x80):
|
|
|
return val, i
|
|
|
shift += 7
|
|
|
if shift > 63:
|
|
|
break
|
|
|
raise ValueError("invalid varint")
|
|
|
|
|
|
|
|
|
def _write_varint(v: int) -> bytes:
|
|
|
out = bytearray()
|
|
|
vv = int(v)
|
|
|
while True:
|
|
|
to_write = vv & 0x7F
|
|
|
vv >>= 7
|
|
|
if vv:
|
|
|
out.append(to_write | 0x80)
|
|
|
else:
|
|
|
out.append(to_write)
|
|
|
break
|
|
|
return bytes(out)
|
|
|
|
|
|
|
|
|
def _make_key(field_no: int, wire_type: int) -> bytes:
    """Build the varint-encoded field key for a protobuf field.

    Args:
        field_no: Field number; stored in the bits above the low three.
        wire_type: Wire type; stored in the low three bits.

    Returns:
        The key encoded by ``_write_varint``.
    """
    tag = (field_no << 3) | wire_type
    return _write_varint(tag)
|
|
|
|
|
|
|
|
|
def _decode_timestamp(buf: bytes) -> Tuple[Optional[int], Optional[int]]:
    """Parse the body of an embedded Timestamp message.

    Varint field 1 is taken as ``seconds`` and varint field 2 as ``nanos``.
    Both are interpreted as signed 64-bit two's-complement values, so a
    negative number (sent on the wire as a ten-byte varint) is returned as a
    negative ``int`` rather than a huge positive one. Other fields are
    skipped, and parsing stops quietly at the first unsupported wire type.

    Args:
        buf: Serialized bytes of the Timestamp message (without its length
            prefix).

    Returns:
        A ``(seconds, nanos)`` tuple; each element is ``None`` when the
        corresponding field is absent.

    Raises:
        ValueError: Propagated from ``_read_varint`` when a varint is
            truncated or over-long.
    """
    i = 0
    seconds: Optional[int] = None
    nanos: Optional[int] = None
    while i < len(buf):
        key, i = _read_varint(buf, i)
        field_no = key >> 3
        wt = key & 0x07
        if wt == 0:
            val, i = _read_varint(buf, i)
            # Keep the low 64 bits, then reinterpret them as a signed value.
            val &= (1 << 64) - 1
            if val >= (1 << 63):
                val -= 1 << 64
            if field_no == 1:
                seconds = val
            elif field_no == 2:
                nanos = val
        elif wt == 2:
            # Length-delimited: skip the payload.
            ln, i = _read_varint(buf, i)
            i += ln
        elif wt == 1:
            i += 8  # fixed 64-bit
        elif wt == 5:
            i += 4  # fixed 32-bit
        else:
            # Unsupported wire type: return whatever was parsed so far.
            break
    return seconds, nanos
|
|
|
|
|
|
|
|
|
def _encode_timestamp(seconds: Optional[int], nanos: Optional[int]) -> bytes:
    """Serialize the body of a Timestamp message.

    Args:
        seconds: Value for varint field 1, or ``None`` to omit the field.
        nanos: Value for varint field 2, or ``None`` to omit the field.

    Returns:
        The concatenated field encodings (empty when both are ``None``).
    """
    encoded = bytearray()
    for field_no, value in ((1, seconds), (2, nanos)):
        if value is None:
            continue
        encoded += _make_key(field_no, 0)
        encoded += _write_varint(int(value))
    return bytes(encoded)
|
|
|
|
|
|
|
|
|
def decode_server_message_data(b64url: str) -> Dict:
    """Decode a Base64URL ``server_message_data`` value.

    The decoded bytes are walked as a protobuf message: length-delimited
    field 1 is read as a UTF-8 UUID string and length-delimited field 3 as an
    embedded Timestamp. Other fields are skipped, and parsing stops quietly
    at the first unsupported wire type.

    Args:
        b64url: Base64URL text, with or without ``=`` padding.

    Returns:
        On Base64 failure, ``{"error": "<message>"}``. Otherwise a dict with:

        - ``uuid``: decoded string, or ``None`` if absent or not valid UTF-8.
        - ``seconds`` / ``nanos``: timestamp components, or ``None``.
        - ``iso_utc``: ISO-8601 UTC rendering (``Z`` suffix), or ``None`` when
          ``seconds`` is absent or the timestamp cannot be represented as a
          ``datetime``.
        - ``iso_ny``: the same instant in America/New_York, or ``None`` when
          unavailable.
        - ``type``: ``UUID_AND_TIMESTAMP``, ``UUID_ONLY``, ``TIMESTAMP_ONLY``
          or ``UNKNOWN``.

    Raises:
        ValueError: Propagated from ``_read_varint`` when the protobuf
            payload contains a truncated or over-long varint.
    """
    try:
        raw = _b64url_decode_padded(b64url)
    except Exception as e:
        # Deliberately broad: any decode failure (including a non-string
        # argument) is reported through the error dict.
        return {"error": f"base64url decode failed: {e}"}

    i = 0
    uuid: Optional[str] = None
    seconds: Optional[int] = None
    nanos: Optional[int] = None

    while i < len(raw):
        key, i = _read_varint(raw, i)
        field_no = key >> 3
        wt = key & 0x07
        if wt == 2:
            ln, i = _read_varint(raw, i)
            data = raw[i:i + ln]
            i += ln
            if field_no == 1:
                try:
                    uuid = data.decode("utf-8")
                except UnicodeDecodeError:
                    uuid = None
            elif field_no == 3:
                s, n = _decode_timestamp(data)
                if s is not None:
                    seconds = s
                if n is not None:
                    nanos = n
        elif wt == 0:
            _, i = _read_varint(raw, i)  # skip varint field
        elif wt == 1:
            i += 8  # skip fixed 64-bit field
        elif wt == 5:
            i += 4  # skip fixed 32-bit field
        else:
            break

    iso_utc: Optional[str] = None
    iso_ny: Optional[str] = None
    if seconds is not None:
        try:
            dt = datetime.fromtimestamp(int(seconds), tz=timezone.utc).replace(
                microsecond=(nanos or 0) // 1000
            )
        except (OverflowError, OSError, ValueError):
            # seconds outside the datetime/platform range, or nanos outside
            # 0..999_999_999: keep the raw numbers but skip ISO rendering.
            dt = None
        if dt is not None:
            iso_utc = dt.isoformat().replace("+00:00", "Z")
            if ZoneInfo is not None:
                try:
                    iso_ny = dt.astimezone(ZoneInfo("America/New_York")).isoformat()
                except Exception:
                    # Best-effort: missing tz database or out-of-range
                    # conversion leaves the local rendering unset.
                    iso_ny = None

    has_timestamp = seconds is not None or nanos is not None
    if uuid and has_timestamp:
        t = "UUID_AND_TIMESTAMP"
    elif uuid:
        t = "UUID_ONLY"
    elif has_timestamp:
        t = "TIMESTAMP_ONLY"
    else:
        t = "UNKNOWN"

    return {
        "uuid": uuid,
        "seconds": seconds,
        "nanos": nanos,
        "iso_utc": iso_utc,
        "iso_ny": iso_ny,
        "type": t,
    }
|
|
|
|
|
|
|
|
|
def encode_server_message_data(uuid: Optional[str] = None,
                               seconds: Optional[int] = None,
                               nanos: Optional[int] = None) -> str:
    """Build a Base64URL ``server_message_data`` value.

    Args:
        uuid: UUID string written as length-delimited field 1; omitted when
            ``None`` or empty.
        seconds: Timestamp seconds, or ``None``.
        nanos: Timestamp nanoseconds, or ``None``.

    Returns:
        The unpadded Base64URL encoding of the message. The embedded
        timestamp (field 3) is written only when ``seconds`` or ``nanos`` is
        not ``None``.
    """
    def _length_delimited(field_no: int, payload: bytes) -> bytes:
        # key (wire type 2) + payload length + payload
        return _make_key(field_no, 2) + _write_varint(len(payload)) + payload

    message = bytearray()
    if uuid:
        message += _length_delimited(1, uuid.encode("utf-8"))
    if not (seconds is None and nanos is None):
        message += _length_delimited(3, _encode_timestamp(seconds, nanos))
    return _b64url_encode_nopad(bytes(message))