agentcache / src /cache /observe.py
Yash030's picture
feat: migrate agentmemory to agentcache namespace, endpoints, and tools
12a6c9a
Raw
History Blame Contribute Delete
1.13 kB
"""
src/memory/observe.py β€” Observation ingestion pipeline.
Public API:
folder_observe(kv, payload) β€” ingest a folder-scoped observation
observe(kv, payload) β€” legacy session-scoped observation
build_synthetic_compression(raw) β€” build compressed observation dict
strip_private_data(text) β€” redact secrets and private tags
"""
from __future__ import annotations
import os
import re
import json
import datetime
from typing import Any, Dict, List, Optional, Tuple
from db import StateKV
import functions as _fn # access module-level globals (_bm25_index, etc.)
# Re-export for backward compatibility
strip_private_data = _fn.strip_private_data
build_synthetic_compression = _fn.build_synthetic_compression
def folder_observe(kv: StateKV, payload: Dict[str, Any]) -> Dict[str, Any]:
"""Ingest a new observation scoped to a (folder_path, agent_id) pair."""
return _fn.folder_observe(kv, payload)
def observe(kv: StateKV, payload: Dict[str, Any]) -> Dict[str, Any]:
"""Legacy session-scoped observation ingestion."""
return _fn.observe(kv, payload)