import json
import time
from typing import Any, Dict, Optional, Union

from src.llm.models.schemas import ConversationResponse

from .redis_connection import RedisConnection


class RedisMemoryManager:
    """Store and retrieve per-session conversation data in Redis hashes.

    Key layout used by this class (every key is a Redis hash):

    * ``session:{session_id}:chats`` -- one field per ``chat_id``; each value
      is a JSON object ``{"response": <response dict>, "timestamp": <float>}``.
    * ``session:{session_id}`` -- ``last_chat_id`` and ``last_updated`` fields.
    * ``session:{session_id}:state`` -- an ``emotions`` field holding JSON.
    """

    def __init__(self) -> None:
        self.redis = RedisConnection().client

    @staticmethod
    def _session_key(session_id: str, suffix: str = "") -> str:
        """Return the Redis key for a session, with an optional sub-hash suffix."""
        base = f"session:{session_id}"
        return f"{base}:{suffix}" if suffix else base

    @staticmethod
    def _as_text(value: Union[str, bytes]) -> str:
        """Return *value* as ``str``, decoding UTF-8 ``bytes`` when necessary."""
        return value.decode("utf-8") if isinstance(value, bytes) else value

    @staticmethod
    def _load_response(raw: Union[str, bytes]) -> ConversationResponse:
        """Rebuild a ``ConversationResponse`` from a stored JSON chat entry."""
        return ConversationResponse(**json.loads(raw)['response'])

    def store_conversation(self, session_id: str, chat_id: str, response: ConversationResponse) -> None:
        """Store a conversation response and refresh the session metadata.

        The response is serialised with ``response.dict()`` and written,
        together with the current ``time.time()`` value, as JSON under field
        ``chat_id`` of the session's chats hash. The session hash is then
        updated with ``last_chat_id`` and ``last_updated``.

        Args:
            session_id: Identifier of the session the chat belongs to.
            chat_id: Hash field under which the response is stored; writing
                to the same field again replaces the earlier value.
            response: The response object to persist.
        """
        response_data = response.dict()
        timestamp = time.time()

        entry = json.dumps({
            'response': response_data,
            'timestamp': timestamp,
        })
        self.redis.hset(self._session_key(session_id, "chats"), chat_id, entry)

        self.redis.hset(
            self._session_key(session_id),
            mapping={
                'last_chat_id': chat_id,
                # Stored as text; the same float is embedded in the chat entry.
                'last_updated': str(timestamp),
            },
        )

    def get_conversation(self, session_id: str, chat_id: str) -> Optional[ConversationResponse]:
        """Return the stored response for ``chat_id``, or ``None`` if absent.

        Args:
            session_id: Identifier of the session to look in.
            chat_id: Hash field of the wanted conversation.

        Returns:
            The reconstructed ``ConversationResponse``, or ``None`` when the
            field is missing or holds an empty value.
        """
        raw = self.redis.hget(self._session_key(session_id, "chats"), chat_id)
        if not raw:
            return None
        return self._load_response(raw)

    def get_session_conversations(self, session_id: str) -> Dict[str, ConversationResponse]:
        """Return every stored response of a session, keyed by chat id.

        Args:
            session_id: Identifier of the session to read.

        Returns:
            A dict mapping each chat id (always ``str``) to its reconstructed
            ``ConversationResponse``; empty when nothing is stored.
        """
        entries = self.redis.hgetall(self._session_key(session_id, "chats"))
        # NOTE(review): if the client was created without response decoding,
        # field names presumably come back as bytes -- normalise them so the
        # declared ``Dict[str, ...]`` return type holds either way.
        return {
            self._as_text(chat_id): self._load_response(raw)
            for chat_id, raw in entries.items()
        }

    def update_emotional_state(self, session_id: str, emotions: Dict[str, Any]) -> None:
        """Replace the session's stored emotional state.

        Args:
            session_id: Identifier of the session to update.
            emotions: JSON-serialisable mapping written to the ``emotions``
                field of the session's state hash.
        """
        self.redis.hset(
            self._session_key(session_id, "state"),
            'emotions',
            json.dumps(emotions),
        )

    def get_emotional_state(self, session_id: str) -> Dict[str, Any]:
        """Return the session's stored emotional state.

        Args:
            session_id: Identifier of the session to read.

        Returns:
            The decoded ``emotions`` mapping, or an empty dict when no state
            has been stored.
        """
        raw = self.redis.hget(self._session_key(session_id, "state"), 'emotions')
        return json.loads(raw) if raw else {}