OffGridSchedula / server /threads.py
ParetoOptimal's picture
Initial Commit
0366d65
Raw
History Blame Contribute Delete
1.97 kB
"""Assemble a conversation thread from individual messages.
Used by both the ``/agent`` endpoint (join a posted ``messages[]`` into a thread)
and autonomous mode (build a per-chat rolling window from the ingest feed). Pure —
no Gradio / llama / network — so it's trivially unit-testable in stub mode.
"""
from __future__ import annotations
import os
from dateutil import parser as dtparser
def format_thread(messages: list[dict]) -> str:
"""Render messages as ``"sender: text"`` lines, skipping empty bodies."""
lines = []
for m in messages:
text = (m.get("text") or "").strip()
if not text:
continue
sender = (m.get("sender") or "?").strip()
lines.append(f"{sender}: {text}")
return "\n".join(lines)
def _ts(value) -> float | None:
try:
return dtparser.parse(str(value)).timestamp()
except (ValueError, TypeError, OverflowError):
return None
def rolling_thread(
feed: list[dict],
chat: str,
window: int | None = None,
minutes: int | None = None,
) -> str:
"""Build a thread from the most recent messages of one chat in the feed.
Keeps the last ``window`` messages for ``chat`` that fall within ``minutes`` of
the newest one (env-tunable via AUTO_THREAD_WINDOW / AUTO_THREAD_MINUTES).
"""
window = window or int(os.environ.get("AUTO_THREAD_WINDOW", "20"))
minutes = minutes or int(os.environ.get("AUTO_THREAD_MINUTES", "720"))
msgs = [m for m in feed if (m.get("chat") or "") == chat]
if not msgs:
return ""
msgs = msgs[-window:]
# Drop messages older than `minutes` before the newest (when timestamps parse).
stamps = [(_ts(m.get("timestamp")), m) for m in msgs]
newest = max((s for s, _ in stamps if s is not None), default=None)
if newest is not None:
cutoff = newest - minutes * 60
msgs = [m for s, m in stamps if s is None or s >= cutoff]
return format_thread(msgs)