agentcache / src /storage /images.py
Yash030's picture
feat: upscale & UX overhaul — blueprint split, debounce, tests, viewer enhancements, DX improvements
4d5727a
Raw
History Blame Contribute Delete
3.03 kB
"""
src/storage/images.py — Image-on-disk storage helpers (A2.3).
Copied from src/functions.py — do NOT delete the originals (backward compat).
"""
import os
import hashlib
from typing import Optional, Tuple
IMAGES_DIR = os.path.join(os.path.expanduser("~"), ".agentmemory", "images")
def get_max_bytes() -> int:
"""Return the configured image-store byte limit (default 500 MB)."""
return int(os.getenv("AGENTMEMORY_IMAGE_STORE_MAX_BYTES", 500 * 1024 * 1024))
def is_managed_image_path(file_path: str) -> bool:
"""Return True iff *file_path* lives inside the managed images directory."""
if not file_path:
return False
resolved = os.path.abspath(file_path)
normalized_images_dir = os.path.abspath(IMAGES_DIR)
return resolved.startswith(normalized_images_dir + os.sep) or resolved == normalized_images_dir
def save_image_to_disk(base64_data: str) -> Tuple[str, int]:
"""Decode *base64_data* and write to the managed images directory.
Returns:
(file_path, bytes_written) — bytes_written is 0 if the file already
existed (content-addressable dedup via SHA-256).
"""
if not base64_data:
return "", 0
if not os.path.exists(IMAGES_DIR):
os.makedirs(IMAGES_DIR, exist_ok=True)
clean_base64 = base64_data
ext = "png"
if base64_data.startswith("data:image/"):
comma_idx = base64_data.find(",")
if comma_idx != -1:
meta = base64_data[:comma_idx]
if "jpeg" in meta or "jpg" in meta:
ext = "jpg"
elif "webp" in meta:
ext = "webp"
elif "gif" in meta:
ext = "gif"
clean_base64 = base64_data[comma_idx + 1:]
elif base64_data.startswith("/9j/"):
ext = "jpg"
h = hashlib.sha256(clean_base64.encode("utf-8")).hexdigest()
file_path = os.path.join(IMAGES_DIR, f"{h}.{ext}")
if os.path.exists(file_path):
return file_path, 0
import base64
buffer = base64.b64decode(clean_base64)
with open(file_path, "wb") as f:
f.write(buffer)
size = os.path.getsize(file_path)
return file_path, size
def delete_image(file_path: Optional[str]) -> int:
"""Delete a managed image file and return the number of bytes freed.
Returns 0 if the path is not managed or does not exist.
"""
if not file_path or not is_managed_image_path(file_path):
return 0
try:
if os.path.exists(file_path):
size = os.path.getsize(file_path)
os.remove(file_path)
return size
except Exception as e:
print(f"[agentmemory] Failed to delete image context: {e}")
return 0
def touch_image(file_path: str) -> None:
"""Update the mtime of a managed image (keeps it alive past LRU eviction)."""
if not file_path or not is_managed_image_path(file_path):
return
try:
if os.path.exists(file_path):
os.utime(file_path, None)
except Exception:
pass