| """ |
| agent_memory.py |
| --------------- |
| Agent memory layer for Notiflow. |
| |
| Stores recent business context so skills and future agents can reference |
| what was last discussed (customer names, items, etc.). |
| |
| Storage: JSON file at the path defined in app/config.py (MEMORY_FILE). |
| Structure: |
| { |
| "recent_customers": ["Rahul", "Priya"], # newest last |
| "recent_items": ["kurti", "aata"] |
| } |
| |
| Public API |
| ---------- |
| load_memory() -> dict |
| update_memory(customer=None, item=None) -> None |
| |
| Design notes: |
| - Maximum 10 entries per list (oldest pruned automatically). |
| - Read-modify-write is done in one function call to minimise race window. |
| - None values are silently ignored (no-op). |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| from pathlib import Path |
| from typing import Optional |
|
|
| from app.config import MEMORY_FILE |
|
|
| logger = logging.getLogger(__name__) |
|
|
| _MAX_ENTRIES = 10 |
|
|
| _EMPTY_MEMORY: dict = { |
| "recent_customers": [], |
| "recent_items": [], |
| } |
|
|
|
|
| |
| |
| |
|
|
| def _read_file() -> dict: |
| """Read memory from disk; return empty structure if file missing/corrupt.""" |
| path = Path(MEMORY_FILE) |
| if not path.exists(): |
| return {k: list(v) for k, v in _EMPTY_MEMORY.items()} |
| try: |
| with path.open("r", encoding="utf-8") as f: |
| data = json.load(f) |
| |
| data.setdefault("recent_customers", []) |
| data.setdefault("recent_items", []) |
| return data |
| except (json.JSONDecodeError, OSError) as exc: |
| logger.warning("Could not read memory file (%s) — using empty memory.", exc) |
| return {k: list(v) for k, v in _EMPTY_MEMORY.items()} |
|
|
|
|
| def _write_file(memory: dict) -> None: |
| """Write memory dict to disk atomically (write to temp then rename).""" |
| path = Path(MEMORY_FILE) |
| tmp = path.with_suffix(".tmp") |
| try: |
| path.parent.mkdir(parents=True, exist_ok=True) |
| with tmp.open("w", encoding="utf-8") as f: |
| json.dump(memory, f, indent=2, ensure_ascii=False) |
| tmp.replace(path) |
| except OSError as exc: |
| logger.error("Could not write memory file: %s", exc) |
| if tmp.exists(): |
| tmp.unlink(missing_ok=True) |
|
|
|
|
| def _append_unique(lst: list, value: str, max_size: int = _MAX_ENTRIES) -> list: |
| """ |
| Append value to list, deduplicate, and keep only the most recent entries. |
| Most recent item is always at the end. |
| """ |
| if value in lst: |
| lst.remove(value) |
| lst.append(value) |
| return lst[-max_size:] |
|
|
|
|
| |
| |
| |
|
|
| def load_memory() -> dict: |
| """ |
| Load the current agent memory from disk. |
| |
| Returns: |
| { |
| "recent_customers": [str, ...], |
| "recent_items": [str, ...] |
| } |
| """ |
| memory = _read_file() |
| logger.debug("Memory loaded: %s", memory) |
| return memory |
|
|
|
|
| def update_memory( |
| customer: Optional[str] = None, |
| item: Optional[str] = None, |
| ) -> None: |
| """ |
| Update agent memory with a new customer name and/or item. |
| |
| None values are silently ignored. |
| Duplicates are deduplicated and moved to the end (most recent position). |
| |
| Args: |
| customer: Customer name to remember (e.g. "Rahul"). |
| item: Item name to remember (e.g. "kurti"). |
| |
| Example: |
| >>> update_memory(customer="Rahul", item="kurti") |
| """ |
| if customer is None and item is None: |
| return |
|
|
| memory = _read_file() |
|
|
| if customer: |
| memory["recent_customers"] = _append_unique( |
| memory["recent_customers"], str(customer).strip() |
| ) |
|
|
| if item: |
| memory["recent_items"] = _append_unique( |
| memory["recent_items"], str(item).strip() |
| ) |
|
|
| _write_file(memory) |
| logger.info("Memory updated: customer=%s item=%s", customer, item) |