"""
agent_memory.py
---------------
Agent memory layer for Notiflow.
Stores recent business context so skills and future agents can reference
what was last discussed (customer names, items, etc.).
Storage: JSON file at the path defined in app/config.py (MEMORY_FILE).
Structure:
{
"recent_customers": ["Rahul", "Priya"], # newest last
"recent_items": ["kurti", "aata"]
}
Public API
----------
load_memory() -> dict
update_memory(customer=None, item=None) -> None
Design notes:
- Maximum 10 entries per list (oldest pruned automatically).
- Read-modify-write is done in one function call to minimise race window.
- None values are silently ignored (no-op).
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Optional
from app.config import MEMORY_FILE
# Logger named after this module's import path.
logger = logging.getLogger(__name__)

# Cap on how many entries each memory list may hold.
_MAX_ENTRIES = 10

# Shape of a blank memory. _read_file() copies these lists instead of
# returning this shared object.
_EMPTY_MEMORY: dict = {"recent_customers": [], "recent_items": []}
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _read_file() -> dict:
    """
    Read memory from disk, falling back to an empty structure on any problem.

    Returns:
        A dict that always has list values under "recent_customers" and
        "recent_items". Any other keys found in the file are passed
        through unchanged.

    A missing file is returned as empty memory without logging. A file that
    cannot be read, decoded or parsed, or whose top-level JSON value is not
    an object, is logged as a warning and treated as empty. A known key whose
    value is not a list is logged and reset to an empty list.
    """

    def _blank() -> dict:
        # Fresh lists every time so callers never share the template's lists.
        return {key: list(default) for key, default in _EMPTY_MEMORY.items()}

    path = Path(MEMORY_FILE)
    try:
        with path.open("r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # EAFP: no separate exists() check, so there is no gap between the
        # check and the open in which the file could disappear.
        return _blank()
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Could not read memory file (%s) — using empty memory.", exc)
        return _blank()

    if not isinstance(data, dict):
        # Valid JSON, but not an object (e.g. a list or a bare string).
        logger.warning(
            "Could not read memory file (%s) — using empty memory.",
            f"expected a JSON object, got {type(data).__name__}",
        )
        return _blank()

    # Ensure both keys are present, and are lists, even if the file is
    # partial or was edited by hand.
    for key in _EMPTY_MEMORY:
        if key not in data:
            data[key] = []
        elif not isinstance(data[key], list):
            logger.warning("Memory key %r is not a list — resetting it.", key)
            data[key] = []
    return data
def _write_file(memory: dict) -> None:
    """
    Persist the memory dict to MEMORY_FILE via a temp file and a rename.

    The JSON is first written to a sibling file with a ".tmp" suffix and
    then moved over the real path with Path.replace, so the target is
    swapped in one rename instead of being rewritten in place.

    The write is best-effort: filesystem errors (OSError) are logged and
    swallowed, including errors raised while cleaning up the temp file.

    Args:
        memory: JSON-serialisable dict to store.
    """
    path = Path(MEMORY_FILE)
    tmp = path.with_suffix(".tmp")
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        with tmp.open("w", encoding="utf-8") as f:
            json.dump(memory, f, indent=2, ensure_ascii=False)
        tmp.replace(path)
    except OSError as exc:
        logger.error("Could not write memory file: %s", exc)
        # missing_ok=True already covers "temp file was never created", so no
        # exists() pre-check is needed. Guard the cleanup itself so a second
        # filesystem error cannot escape from this handler.
        try:
            tmp.unlink(missing_ok=True)
        except OSError as cleanup_exc:
            logger.debug("Could not remove temp memory file: %s", cleanup_exc)
def _append_unique(lst: list, value: str, max_size: int = _MAX_ENTRIES) -> list:
    """
    Return a new list with value at the end, deduplicated and size-capped.

    Every existing occurrence of value is dropped before it is appended, so
    the most recent item is always last and appears exactly once. The input
    list is not modified.

    Args:
        lst: Existing entries, oldest first.
        value: Entry to record as the most recent one.
        max_size: Maximum number of entries to keep. Zero or a negative
            number yields an empty list.

    Returns:
        A new list holding at most max_size entries, newest last.
    """
    if max_size <= 0:
        # lst[-0:] would return the whole list, and a negative max_size would
        # slice from the wrong end, so handle "keep nothing" explicitly.
        return []
    kept = [entry for entry in lst if entry != value]
    kept.append(value)
    return kept[-max_size:]  # keep newest max_size entries
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def load_memory() -> dict:
    """
    Return the agent memory currently stored on disk.

    Returns:
        The dict produced by _read_file(), shaped like:
        {
            "recent_customers": [str, ...],
            "recent_items": [str, ...]
        }
    """
    snapshot = _read_file()
    logger.debug("Memory loaded: %s", snapshot)
    return snapshot
def update_memory(
    customer: Optional[str] = None,
    item: Optional[str] = None,
) -> None:
    """
    Update agent memory with a new customer name and/or item.

    None, empty and whitespace-only values are silently ignored. If neither
    argument carries a usable value, the memory file is not read or written.
    Duplicates are deduplicated and moved to the end (most recent position).

    Args:
        customer: Customer name to remember (e.g. "Rahul").
        item: Item name to remember (e.g. "kurti").

    Example:
        >>> update_memory(customer="Rahul", item="kurti")
    """
    # Normalise first so that values which are blank after stripping are
    # treated the same as None instead of being stored as "".
    customer_name = str(customer).strip() if customer else ""
    item_name = str(item).strip() if item else ""
    if not customer_name and not item_name:
        return

    # Read-modify-write in one call to keep the race window small.
    memory = _read_file()
    if customer_name:
        memory["recent_customers"] = _append_unique(
            memory["recent_customers"], customer_name
        )
    if item_name:
        memory["recent_items"] = _append_unique(
            memory["recent_items"], item_name
        )
    _write_file(memory)
    # Log what was actually stored; None marks an argument that was ignored.
    logger.info(
        "Memory updated: customer=%s item=%s",
        customer_name or None,
        item_name or None,
    )