# Guilherme34's picture
# Upload folder using huggingface_hub
# aa15bce verified
from __future__ import annotations
import re
import threading
from html import escape, unescape
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Protocol, Tuple
from ...config import get_settings
from ...logging_config import logger
from ...models import ChatMessage
from ...utils.timezones import now_in_user_timezone
from typing import TYPE_CHECKING
if TYPE_CHECKING: # pragma: no cover - used for type checkers only
from .summarization import WorkingMemoryLog
_DATA_DIR = Path(__file__).resolve().parent.parent.parent / "data"
_CONVERSATION_LOG_PATH = _DATA_DIR / "conversation" / "poke_conversation.log"
class TranscriptFormatter(Protocol):
    """Structural type for callables that render one transcript entry.

    A formatter receives the entry tag, its timestamp string and the raw
    payload, and returns the text to be written to the log.
    """

    def __call__(self, tag: str, timestamp: str, payload: str) -> str:  # pragma: no cover - typing protocol
        """Render a single entry as a string."""
def _encode_payload(payload: str) -> str:
normalized = payload.replace("\r\n", "\n").replace("\r", "\n")
collapsed = normalized.replace("\n", "\\n")
return escape(collapsed, quote=False)
def _decode_payload(payload: str) -> str:
return unescape(payload).replace("\\n", "\n")
def _default_formatter(tag: str, timestamp: str, payload: str) -> str:
    """Render one log entry as ``<tag timestamp="...">payload</tag>`` plus a newline.

    The payload is passed through ``_encode_payload`` before being embedded;
    ``tag`` and ``timestamp`` are inserted verbatim.
    """
    opening = f"<{tag} timestamp=\"{timestamp}\">"
    closing = f"</{tag}>"
    body = _encode_payload(payload)
    return f"{opening}{body}{closing}\n"
def _resolve_working_memory_log() -> "WorkingMemoryLog":
    """Fetch the working-memory log from the ``summarization`` module.

    The import happens at call time rather than at module import time
    (presumably to avoid a circular import - TODO confirm).
    """
    from .summarization import get_working_memory_log as _factory

    return _factory()
# Matches one `name="value"` attribute inside an opening tag: group 1 is the
# attribute name, group 2 is the double-quoted value without its quotes.
_ATTR_PATTERN = re.compile(r"(\w+)\s*=\s*\"([^\"]*)\"")
class ConversationLog:
    """Append-only conversation log persisted to disk for the interaction agent.

    Every entry is written as one line produced by the configured formatter.
    Recorded entries are also mirrored into the working-memory log, and each
    successful append attempts to schedule summarization when it is enabled
    in settings.
    """

    # Tags that surface in the chat history, mapped to the chat role they are
    # shown as. Entries with any other tag are skipped by `to_chat_messages`.
    _CHAT_ROLE_BY_TAG = {"user_message": "user", "poke_reply": "assistant"}

    def __init__(self, path: Path, formatter: TranscriptFormatter = _default_formatter):
        """Bind the log to *path* and make sure its parent directory exists.

        Args:
            path: File the transcript is appended to.
            formatter: Callable turning ``(tag, timestamp, payload)`` into the
                text written for one entry.
        """
        self._path = path
        self._formatter = formatter
        # Serializes file access between threads sharing this instance.
        self._lock = threading.Lock()
        self._ensure_directory()
        self._working_memory_log = _resolve_working_memory_log()

    def _ensure_directory(self) -> None:
        """Create the log file's parent directory; failures are only logged."""
        try:
            self._path.parent.mkdir(parents=True, exist_ok=True)
        except Exception as exc:  # pragma: no cover - defensive
            logger.warning("conversation log directory creation failed", extra={"error": str(exc)})

    def _append(self, tag: str, payload: str) -> str:
        """Write one entry to disk and return the timestamp that was used.

        Args:
            tag: Entry tag, e.g. ``"user_message"``.
            payload: Entry content; converted with ``str()`` before formatting.

        Returns:
            The ``%Y-%m-%d %H:%M:%S`` timestamp stored with the entry.

        Raises:
            Exception: Any error raised while writing is logged and re-raised.
        """
        timestamp = now_in_user_timezone("%Y-%m-%d %H:%M:%S")
        entry = self._formatter(tag, timestamp, str(payload))
        with self._lock:
            try:
                with self._path.open("a", encoding="utf-8") as handle:
                    handle.write(entry)
            except Exception as exc:  # pragma: no cover - defensive
                logger.error(
                    "conversation log append failed",
                    extra={"error": str(exc), "tag": tag, "path": str(self._path)},
                )
                raise
        # Scheduling happens after the lock is released.
        self._notify_summarization()
        return timestamp

    def _parse_line(self, line: str) -> Optional[Tuple[str, str, str]]:
        """Parse one ``<tag attrs>payload</tag>`` line.

        Args:
            line: A single line read from the log file.

        Returns:
            ``(tag, timestamp, decoded_payload)``, where ``timestamp`` is ``""``
            when the opening tag has no ``timestamp`` attribute, or ``None``
            when the line is not a well-formed entry.
        """
        stripped = line.strip()
        if not stripped.startswith("<") or "</" not in stripped:
            return None

        open_end = stripped.find(">")
        if open_end == -1:
            return None

        open_tag_content = stripped[1:open_end]
        tag, _, attr_string = open_tag_content.partition(" ")

        close_start = stripped.rfind("</")
        close_end = stripped.rfind(">")
        if close_start == -1 or close_end == -1:
            return None
        # Reject malformed lines where the closing tag begins inside the
        # opening tag (e.g. `<a x="</a>`); otherwise the payload slice below
        # would silently be empty and the line would pass as a valid entry.
        if close_start <= open_end:
            return None

        closing_tag = stripped[close_start + 2 : close_end]
        if closing_tag != tag:
            return None

        payload = stripped[open_end + 1 : close_start]
        attributes: Dict[str, str] = {
            match.group(1): match.group(2) for match in _ATTR_PATTERN.finditer(attr_string)
        }
        return tag, attributes.get("timestamp", ""), _decode_payload(payload)

    def iter_entries(self) -> Iterator[Tuple[str, str, str]]:
        """Yield ``(tag, timestamp, payload)`` for every parseable log line.

        The file is read in full while holding the lock; parsing happens
        afterwards. A missing file yields nothing. Lines that do not parse are
        skipped.

        Raises:
            Exception: Read errors other than ``FileNotFoundError`` are logged
                and re-raised.
        """
        with self._lock:
            try:
                # Split on "\n" only. `str.splitlines()` also breaks on
                # characters such as U+2028, U+0085 or form feed, which can
                # occur inside a payload and would tear one entry into several
                # unparseable fragments.
                lines = self._path.read_text(encoding="utf-8").split("\n")
            except FileNotFoundError:
                lines = []
            except Exception as exc:  # pragma: no cover - defensive
                logger.error(
                    "conversation log read failed", extra={"error": str(exc), "path": str(self._path)}
                )
                raise
        for line in lines:
            item = self._parse_line(line)
            if item is not None:
                yield item

    def load_transcript(self) -> str:
        """Return all entries re-rendered as tags, joined by newlines.

        Payloads are HTML-escaped (``&``, ``<``, ``>``) but, unlike the on-disk
        form, keep their real newline characters. The ``timestamp`` attribute
        is omitted for entries that have none.
        """
        parts: List[str] = []
        for tag, timestamp, payload in self.iter_entries():
            safe_payload = escape(payload, quote=False)
            if timestamp:
                parts.append(f"<{tag} timestamp=\"{timestamp}\">{safe_payload}</{tag}>")
            else:
                parts.append(f"<{tag}>{safe_payload}</{tag}>")
        return "\n".join(parts)

    def _record(self, tag: str, content: str) -> None:
        """Append an entry to disk, then mirror it into the working-memory log."""
        timestamp = self._append(tag, content)
        self._working_memory_log.append_entry(tag, content, timestamp)

    def record_user_message(self, content: str) -> None:
        """Record *content* under the ``user_message`` tag."""
        self._record("user_message", content)

    def record_agent_message(self, content: str) -> None:
        """Record *content* under the ``agent_message`` tag."""
        self._record("agent_message", content)

    def record_reply(self, content: str) -> None:
        """Record *content* under the ``poke_reply`` tag."""
        self._record("poke_reply", content)

    def record_wait(self, reason: str) -> None:
        """Record a wait marker that should not reach the user-facing chat history."""
        self._record("wait", reason)

    def _notify_summarization(self) -> None:
        """Ask the summarization module to schedule a run, if enabled.

        Best-effort: an unavailable scheduler or a scheduling failure is
        logged and never propagated to the caller.
        """
        settings = get_settings()
        if not settings.summarization_enabled:
            return

        try:
            from .summarization import schedule_summarization  # type: ignore import-not-found
        except Exception as exc:  # pragma: no cover - defensive
            logger.debug(
                "summarization scheduler unavailable",
                extra={"error": str(exc)},
            )
            return

        try:
            schedule_summarization()
        except Exception as exc:  # pragma: no cover - defensive
            logger.warning(
                "failed to schedule summarization",
                extra={"error": str(exc)},
            )

    def to_chat_messages(self) -> List[ChatMessage]:
        """Convert the log into chat messages.

        ``user_message`` entries become ``user`` messages and ``poke_reply``
        entries become ``assistant`` messages. All other tags are skipped;
        wait markers in particular are orchestration metadata and must not
        surface to the user. An empty timestamp is passed on as ``None``.
        """
        messages: List[ChatMessage] = []
        for tag, timestamp, payload in self.iter_entries():
            role = self._CHAT_ROLE_BY_TAG.get(tag)
            if role is None:
                continue
            messages.append(ChatMessage(role=role, content=payload, timestamp=timestamp or None))
        return messages

    def clear(self) -> None:
        """Delete the log file and clear the working-memory log.

        Best-effort: failures are logged instead of raised. The parent
        directory is re-ensured afterwards.
        """
        with self._lock:
            try:
                # `missing_ok=True` avoids the check-then-delete race of
                # calling `exists()` before `unlink()`.
                self._path.unlink(missing_ok=True)
            except Exception as exc:  # pragma: no cover - defensive
                logger.warning(
                    "conversation log clear failed", extra={"error": str(exc), "path": str(self._path)}
                )
            finally:
                self._ensure_directory()
        try:
            self._working_memory_log.clear()
        except Exception as exc:  # pragma: no cover - defensive
            logger.debug(
                "working memory clear skipped",
                extra={"error": str(exc)},
            )
# Shared module-level instance, constructed at import time for the default log path.
_conversation_log = ConversationLog(_CONVERSATION_LOG_PATH)


def get_conversation_log() -> ConversationLog:
    """Return the shared module-level ``ConversationLog`` instance."""
    return _conversation_log


# Public names exported by this module.
__all__ = ["ConversationLog", "get_conversation_log"]