from __future__ import annotations

from datetime import datetime, timedelta

from app.web import local_now
|
|
|
|
# Presence timing knobs; every value is in seconds.

# Base heartbeat period. NOTE(review): presumably how often clients ping the
# server -- the consumer is not visible in this module, confirm.
HEARTBEAT_INTERVAL_SECONDS = 5

# A last-seen timestamp no older than this still counts as online
# (see online_cutoff / is_online).
ONLINE_WINDOW_SECONDS = 15

# Minimum spacing between presence writes for one session
# (see should_write_presence); deliberately tied to the heartbeat period.
PRESENCE_WRITE_INTERVAL_SECONDS = HEARTBEAT_INTERVAL_SECONDS

# NOTE(review): presumably the polling period of the presence overview --
# it is not referenced in this module, confirm against its consumer.
PRESENCE_OVERVIEW_PULL_SECONDS = 5
|
|
|
|
| def online_cutoff(reference_time=None): |
| now = reference_time or local_now() |
| return now - timedelta(seconds=ONLINE_WINDOW_SECONDS) |
|
|
|
|
| def is_online(last_seen_at, reference_time=None) -> bool: |
| if not last_seen_at: |
| return False |
| return last_seen_at >= online_cutoff(reference_time) |
|
|
|
|
| def unix_seconds(value) -> int | None: |
| if not value: |
| return None |
| return int(value.timestamp()) |
|
|
|
|
| def should_write_presence(session: dict, reference_time=None, force: bool = False) -> bool: |
| now = reference_time or local_now() |
| now_seconds = int(now.timestamp()) |
| if force: |
| session["presence_last_written_at"] = now_seconds |
| return True |
|
|
| last_written = int(session.get("presence_last_written_at", 0) or 0) |
| if now_seconds - last_written < PRESENCE_WRITE_INTERVAL_SECONDS: |
| return False |
|
|
| session["presence_last_written_at"] = now_seconds |
| return True
|
|
|
|
|