AIDA / app /ai /memory /redis_memory.py
destinyebuka's picture
fyp
e471097
# app/ai/memory/redis_memory.py
import json
from typing import List, Dict, Any
from app.ai.config import redis_client
from structlog import get_logger
logger = get_logger(__name__)
async def get_redis_client():
    """Hand back the module's shared Redis client.

    This is the same ``redis_client`` object imported from
    ``app.ai.config`` that every helper in this module uses; no new
    connection is created here.  Per the original note, it is used by the
    lightning tracer/rewards code.
    """
    return redis_client
# Expiry times, in seconds, for the Redis keys written by this module.
_SECONDS_PER_MINUTE = 60
_SECONDS_PER_DAY = 24 * 60 * _SECONDS_PER_MINUTE

HISTORY_TTL = 7 * _SECONDS_PER_DAY  # chat history is kept for one week
RATE_LIMIT_KEY_TTL = _SECONDS_PER_MINUTE  # rate-limit counter lives one minute
STATE_TTL = 1 * _SECONDS_PER_DAY  # conversation state is kept for one day
# ---------- history ----------
async def load_history(user_id: str) -> List[dict]:
    """Fetch the stored message history for ``user_id``.

    Reads the ``aida:history:<user_id>`` key and JSON-decodes its value.

    Returns:
        The decoded JSON value, or a fresh empty list when the key is
        absent (for example after it has expired).
    """
    payload = await redis_client.get(f"aida:history:{user_id}")
    return [] if payload is None else json.loads(payload)
async def save_turn(user_id: str, messages: List[dict]) -> None:
    """Write ``messages`` as the stored history for ``user_id``.

    The list is serialised to JSON (non-ASCII characters are kept as-is
    rather than escaped) and stored under ``aida:history:<user_id>`` with an
    expiry of ``HISTORY_TTL`` seconds.
    """
    history_key = f"aida:history:{user_id}"
    encoded = json.dumps(messages, ensure_ascii=False)
    await redis_client.setex(history_key, HISTORY_TTL, encoded)
# ---------- conversation state ----------
async def load_state(user_id: str) -> Dict[str, Any]:
    """Fetch the stored conversation state for ``user_id``.

    Reads the ``aida:state:<user_id>`` key and JSON-decodes its value.

    Returns:
        The decoded JSON value, or a fresh empty dict when the key is
        absent (for example after it has expired).
    """
    payload = await redis_client.get(f"aida:state:{user_id}")
    return {} if payload is None else json.loads(payload)
async def save_state(user_id: str, state: Dict[str, Any]) -> None:
    """Write ``state`` as the stored conversation state for ``user_id``.

    The dict is serialised to JSON and stored under
    ``aida:state:<user_id>`` with an expiry of ``STATE_TTL`` seconds.
    Non-ASCII characters are kept as-is, and any value the JSON encoder
    cannot handle natively is converted with ``str()`` (so it will come
    back as a string when loaded).
    """
    state_key = f"aida:state:{user_id}"
    encoded = json.dumps(state, ensure_ascii=False, default=str)
    await redis_client.setex(state_key, STATE_TTL, encoded)
# ---------- rate limit ----------
async def is_rate_limited(user_id: str, limit: int = 30) -> bool:
    """Count one request for ``user_id`` and report whether it is over the limit.

    Each call increments the ``aida:rate:<user_id>`` counter.  The first
    increment of a window also starts a ``RATE_LIMIT_KEY_TTL``-second expiry,
    so the counter resets once that window has elapsed.

    Args:
        user_id: Identifier used to build the per-user counter key.
        limit: Maximum number of calls allowed per window.

    Returns:
        ``True`` when this call pushed the counter above ``limit``.
    """
    key = f"aida:rate:{user_id}"
    current = await redis_client.incr(key)
    if current == 1:
        # First hit of a new window: start the countdown.
        await redis_client.expire(key, RATE_LIMIT_KEY_TTL)
    elif current > limit and await redis_client.ttl(key) == -1:
        # INCR and EXPIRE are two separate commands.  If the EXPIRE above was
        # ever skipped (crash, cancellation or dropped connection in between),
        # the counter has no TTL and would keep the user blocked forever.
        # TTL == -1 means "key exists without expiry", so re-arm it here.
        # The check only runs on over-limit calls to keep the common path at
        # a single round trip.
        await redis_client.expire(key, RATE_LIMIT_KEY_TTL)
    return current > limit