Spaces:
Configuration error
Configuration error
File size: 8,198 Bytes
aa15bce |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
from __future__ import annotations
import re
import threading
from html import escape, unescape
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Protocol, Tuple
from ...config import get_settings
from ...logging_config import logger
from ...models import ChatMessage
from ...utils.timezones import now_in_user_timezone
from typing import TYPE_CHECKING
if TYPE_CHECKING: # pragma: no cover - used for type checkers only
from .summarization import WorkingMemoryLog
# "data" directory under the third ancestor of this file (resolved to an absolute path).
_DATA_DIR = Path(__file__).resolve().parent.parent.parent / "data"
# Location of the log file used by the module-level ConversationLog instance.
_CONVERSATION_LOG_PATH = _DATA_DIR / "conversation" / "poke_conversation.log"
class TranscriptFormatter(Protocol):
    """Callable that renders one log entry as the text appended to the log file."""

    def __call__(
        self, tag: str, timestamp: str, payload: str
    ) -> str:  # pragma: no cover - typing protocol
        """Return the serialized entry for ``tag``, ``timestamp`` and ``payload``."""
        ...
def _encode_payload(payload: str) -> str:
normalized = payload.replace("\r\n", "\n").replace("\r", "\n")
collapsed = normalized.replace("\n", "\\n")
return escape(collapsed, quote=False)
def _decode_payload(payload: str) -> str:
return unescape(payload).replace("\\n", "\n")
def _default_formatter(tag: str, timestamp: str, payload: str) -> str:
    """Render one entry as a newline-terminated ``<tag timestamp="...">...</tag>`` line.

    The payload is passed through ``_encode_payload``; ``tag`` and
    ``timestamp`` are inserted verbatim.
    """
    opening = f"<{tag} timestamp=\"{timestamp}\">"
    closing = f"</{tag}>\n"
    return opening + _encode_payload(payload) + closing
def _resolve_working_memory_log() -> "WorkingMemoryLog":
    """Return the working-memory log obtained from the ``summarization`` module.

    The import happens at call time rather than at module import time.
    """
    from .summarization import get_working_memory_log as _factory

    return _factory()
# Matches one name="value" attribute inside an opening tag: group 1 is the
# attribute name, group 2 the value (which cannot contain a double quote).
_ATTR_PATTERN = re.compile(r"(\w+)\s*=\s*\"([^\"]*)\"")
class ConversationLog:
    """Append-only conversation log persisted to disk for the interaction agent.

    Entries are written one per line using the configured formatter (by
    default ``<tag timestamp="...">payload</tag>``). Every ``record_*`` call
    also mirrors the entry into the working-memory log, and each append asks
    the summarization scheduler to run when summarization is enabled.
    """

    def __init__(self, path: Path, formatter: TranscriptFormatter = _default_formatter):
        """Bind the log to ``path`` and make sure its parent directory exists.

        Args:
            path: File the entries are appended to.
            formatter: Callable turning ``(tag, timestamp, payload)`` into the
                text written for one entry.
        """
        self._path = path
        self._formatter = formatter
        # Guards file access (append, read, delete) on self._path.
        self._lock = threading.Lock()
        self._ensure_directory()
        self._working_memory_log = _resolve_working_memory_log()

    def _ensure_directory(self) -> None:
        """Create the log file's parent directory; failures are logged, not raised."""
        try:
            self._path.parent.mkdir(parents=True, exist_ok=True)
        except Exception as exc:  # pragma: no cover - defensive
            logger.warning("conversation log directory creation failed", extra={"error": str(exc)})

    def _append(self, tag: str, payload: str) -> str:
        """Format and append one entry, then notify the summarization scheduler.

        Args:
            tag: Entry tag, e.g. ``"user_message"``.
            payload: Entry content; converted with ``str()`` before formatting.

        Returns:
            The timestamp string stored with the entry.

        Raises:
            Exception: Whatever opening or writing the file raised, re-raised
                after being logged.
        """
        timestamp = now_in_user_timezone("%Y-%m-%d %H:%M:%S")
        entry = self._formatter(tag, timestamp, str(payload))
        with self._lock:
            try:
                with self._path.open("a", encoding="utf-8") as handle:
                    handle.write(entry)
            except Exception as exc:  # pragma: no cover - defensive
                logger.error(
                    "conversation log append failed",
                    extra={"error": str(exc), "tag": tag, "path": str(self._path)},
                )
                raise
        # Runs after the lock is released so the scheduler never executes
        # while this log's lock is held.
        self._notify_summarization()
        return timestamp

    def _parse_line(self, line: str) -> Optional[Tuple[str, str, str]]:
        """Parse one log line into ``(tag, timestamp, payload)``.

        Returns ``None`` when the line is not of the form
        ``<tag ...>payload</tag>`` with matching opening and closing tag
        names. The timestamp is ``""`` when the opening tag carries no
        ``timestamp`` attribute; the payload is returned decoded.
        """
        stripped = line.strip()
        if not stripped.startswith("<") or "</" not in stripped:
            return None
        open_end = stripped.find(">")
        if open_end == -1:
            return None

        tag, _, attr_string = stripped[1:open_end].partition(" ")

        # Both searches are guaranteed to succeed: "</" and ">" were found above.
        close_start = stripped.rfind("</")
        close_end = stripped.rfind(">")
        if stripped[close_start + 2 : close_end] != tag:
            return None

        payload = stripped[open_end + 1 : close_start]
        attributes: Dict[str, str] = {
            match.group(1): match.group(2) for match in _ATTR_PATTERN.finditer(attr_string)
        }
        return tag, attributes.get("timestamp", ""), _decode_payload(payload)

    def iter_entries(self) -> Iterator[Tuple[str, str, str]]:
        """Yield ``(tag, timestamp, payload)`` for every parseable line of the log.

        A missing log file yields nothing. Lines that do not parse are
        skipped.

        Raises:
            Exception: Any read error other than ``FileNotFoundError``,
                re-raised after being logged.
        """
        with self._lock:
            try:
                text = self._path.read_text(encoding="utf-8")
            except FileNotFoundError:
                text = ""
            except Exception as exc:  # pragma: no cover - defensive
                logger.error(
                    "conversation log read failed", extra={"error": str(exc), "path": str(self._path)}
                )
                raise
        # Split on "\n" only. str.splitlines() would also break on characters
        # such as U+2028, U+0085 or form feed, which the payload encoding
        # leaves intact, tearing such entries apart so they fail to parse.
        for line in text.split("\n"):
            item = self._parse_line(line)
            if item is not None:
                yield item

    def load_transcript(self) -> str:
        """Return all entries as newline-joined ``<tag ...>payload</tag>`` elements.

        Payloads are HTML-escaped (``&``, ``<``, ``>``) but keep their real
        newlines; the ``timestamp`` attribute is omitted when empty.
        """
        parts: List[str] = []
        for tag, timestamp, payload in self.iter_entries():
            safe_payload = escape(payload, quote=False)
            if timestamp:
                parts.append(f"<{tag} timestamp=\"{timestamp}\">{safe_payload}</{tag}>")
            else:
                parts.append(f"<{tag}>{safe_payload}</{tag}>")
        return "\n".join(parts)

    def _record(self, tag: str, content: str) -> None:
        """Append ``content`` under ``tag`` and mirror it into the working-memory log."""
        timestamp = self._append(tag, content)
        self._working_memory_log.append_entry(tag, content, timestamp)

    def record_user_message(self, content: str) -> None:
        """Record ``content`` as a ``user_message`` entry."""
        self._record("user_message", content)

    def record_agent_message(self, content: str) -> None:
        """Record ``content`` as an ``agent_message`` entry."""
        self._record("agent_message", content)

    def record_reply(self, content: str) -> None:
        """Record ``content`` as a ``poke_reply`` entry."""
        self._record("poke_reply", content)

    def record_wait(self, reason: str) -> None:
        """Record a wait marker that should not reach the user-facing chat history."""
        self._record("wait", reason)

    def _notify_summarization(self) -> None:
        """Ask the summarization scheduler to run, if summarization is enabled.

        Import and scheduling failures are logged and swallowed so that they
        never break an append.
        """
        settings = get_settings()
        if not settings.summarization_enabled:
            return
        try:
            from .summarization import schedule_summarization  # type: ignore import-not-found
        except Exception as exc:  # pragma: no cover - defensive
            logger.debug(
                "summarization scheduler unavailable",
                extra={"error": str(exc)},
            )
            return
        try:
            schedule_summarization()
        except Exception as exc:  # pragma: no cover - defensive
            logger.warning(
                "failed to schedule summarization",
                extra={"error": str(exc)},
            )

    def to_chat_messages(self) -> List[ChatMessage]:
        """Convert the log into chat messages.

        ``user_message`` entries become ``user`` messages and ``poke_reply``
        entries become ``assistant`` messages. An empty timestamp is passed
        as ``None``.
        """
        roles = {"user_message": "user", "poke_reply": "assistant"}
        messages: List[ChatMessage] = []
        for tag, timestamp, payload in self.iter_entries():
            role = roles.get(tag)
            if role is None:
                # Wait markers are orchestration metadata and must not surface
                # to the user; every other unmapped tag is skipped as well.
                continue
            messages.append(
                ChatMessage(role=role, content=payload, timestamp=timestamp or None)
            )
        return messages

    def clear(self) -> None:
        """Delete the log file and clear the working-memory log (best effort).

        Failures are logged and swallowed. The parent directory is re-created
        afterwards regardless of whether the deletion succeeded.
        """
        with self._lock:
            try:
                # missing_ok avoids a separate exists() check racing with the delete.
                self._path.unlink(missing_ok=True)
            except Exception as exc:  # pragma: no cover - defensive
                logger.warning(
                    "conversation log clear failed", extra={"error": str(exc), "path": str(self._path)}
                )
            finally:
                self._ensure_directory()
                try:
                    self._working_memory_log.clear()
                except Exception as exc:  # pragma: no cover - defensive
                    logger.debug(
                        "working memory clear skipped",
                        extra={"error": str(exc)},
                    )
# Shared module-level instance, built at import time; construction creates the
# log directory if needed and resolves the working-memory log.
_conversation_log = ConversationLog(_CONVERSATION_LOG_PATH)
def get_conversation_log() -> ConversationLog:
    """Return the module-level ``ConversationLog`` instance."""
    shared_log = _conversation_log
    return shared_log
# Names exported by `from <module> import *`.
__all__ = ["ConversationLog", "get_conversation_log"]
|