"""Assemble a conversation thread from individual messages. Used by both the ``/agent`` endpoint (join a posted ``messages[]`` into a thread) and autonomous mode (build a per-chat rolling window from the ingest feed). Pure — no Gradio / llama / network — so it's trivially unit-testable in stub mode. """ from __future__ import annotations import os from dateutil import parser as dtparser def format_thread(messages: list[dict]) -> str: """Render messages as ``"sender: text"`` lines, skipping empty bodies.""" lines = [] for m in messages: text = (m.get("text") or "").strip() if not text: continue sender = (m.get("sender") or "?").strip() lines.append(f"{sender}: {text}") return "\n".join(lines) def _ts(value) -> float | None: try: return dtparser.parse(str(value)).timestamp() except (ValueError, TypeError, OverflowError): return None def rolling_thread( feed: list[dict], chat: str, window: int | None = None, minutes: int | None = None, ) -> str: """Build a thread from the most recent messages of one chat in the feed. Keeps the last ``window`` messages for ``chat`` that fall within ``minutes`` of the newest one (env-tunable via AUTO_THREAD_WINDOW / AUTO_THREAD_MINUTES). """ window = window or int(os.environ.get("AUTO_THREAD_WINDOW", "20")) minutes = minutes or int(os.environ.get("AUTO_THREAD_MINUTES", "720")) msgs = [m for m in feed if (m.get("chat") or "") == chat] if not msgs: return "" msgs = msgs[-window:] # Drop messages older than `minutes` before the newest (when timestamps parse). stamps = [(_ts(m.get("timestamp")), m) for m in msgs] newest = max((s for s, _ in stamps if s is not None), default=None) if newest is not None: cutoff = newest - minutes * 60 msgs = [m for s, m in stamps if s is None or s >= cutoff] return format_thread(msgs)