cam / app /services /presence.py
cacode's picture
Upload 67 files
3d6b7f2 verified
from __future__ import annotations
from datetime import timedelta
from app.web import local_now
HEARTBEAT_INTERVAL_SECONDS = 5
ONLINE_WINDOW_SECONDS = 15
PRESENCE_WRITE_INTERVAL_SECONDS = HEARTBEAT_INTERVAL_SECONDS
PRESENCE_OVERVIEW_PULL_SECONDS = 5
def online_cutoff(reference_time=None):
now = reference_time or local_now()
return now - timedelta(seconds=ONLINE_WINDOW_SECONDS)
def is_online(last_seen_at, reference_time=None) -> bool:
if not last_seen_at:
return False
return last_seen_at >= online_cutoff(reference_time)
def unix_seconds(value) -> int | None:
if not value:
return None
return int(value.timestamp())
def should_write_presence(session: dict, reference_time=None, force: bool = False) -> bool:
now = reference_time or local_now()
now_seconds = int(now.timestamp())
if force:
session["presence_last_written_at"] = now_seconds
return True
last_written = int(session.get("presence_last_written_at", 0) or 0)
if now_seconds - last_written < PRESENCE_WRITE_INTERVAL_SECONDS:
return False
session["presence_last_written_at"] = now_seconds
return True