Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| """ | |
| User State Manager for Residential Architecture Assistant | |
| Handles saving, loading, and managing user conversation states with timestamps. | |
| Provides persistent storage and history functionality. | |
| """ | |
| import json | |
| import os | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Any | |
| from state import ConversationState | |
| import uuid | |
| import hashlib | |
class UserStateManager:
    """Manage user conversation states with persistent JSON storage.

    Every user is stored in one file, ``<storage_dir>/user_<user_id>.json``,
    shaped as ``{"user_id", "created", "conversations": [...]}``. Each
    conversation record holds ``user_id``, ``session_id``, ``timestamp``
    (``datetime.now().isoformat()``), the serialized ``state`` and a compact
    ``summary``. Conversations are kept sorted newest-first.
    """

    # Sentinel reported as "last_activity" for users without conversations.
    _NEVER = "Never"

    # (phase label, top-level state key, nested key) — a phase counts as
    # completed when state[top-level key][nested key] is truthy.
    _PROJECT_PHASES = (
        ("Architectural Design", "detailed_floorplan", "detailed_rooms"),
        ("Budget Analysis", "budget_breakdown", "total_construction_cost"),
        ("Structural Analysis", "agent_memory", "structural_analysis"),
        ("Sustainability Review", "agent_memory", "sustainability"),
        ("Permit Planning", "agent_memory", "permits"),
        ("Interior Design", "agent_memory", "interior_design"),
    )

    def __init__(self, storage_dir: str = "user_conversations"):
        """Create the manager and make sure ``storage_dir`` exists.

        Args:
            storage_dir: Directory that holds the per-user JSON files.
        """
        self.storage_dir = storage_dir
        self.ensure_storage_directory()
        self.current_session_id = None
        self.current_user_id = None

    def ensure_storage_directory(self):
        """Create the storage directory if it doesn't exist."""
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(self.storage_dir, exist_ok=True)

    def generate_user_id(self, ip_address: Optional[str] = None) -> str:
        """Generate a 12-character user ID.

        Args:
            ip_address: When given (and non-empty), the ID is derived from it
                so the same address always maps to the same ID.

        Returns:
            A 12-character hexadecimal string.
        """
        if ip_address:
            # MD5 is only used to derive a stable, filename-safe identifier;
            # it is not relied on for security.
            return hashlib.md5(ip_address.encode()).hexdigest()[:12]
        # .hex has no hyphens, so all 12 characters are random hex digits
        # (str(uuid4())[:12] would contain a "-" at position 8).
        return uuid.uuid4().hex[:12]

    def start_new_session(self, user_id: Optional[str] = None, ip_address: Optional[str] = None) -> str:
        """Start a new conversation session.

        Args:
            user_id: Existing user ID; generated when omitted.
            ip_address: Passed to ``generate_user_id`` when ``user_id`` is
                omitted.

        Returns:
            The new session ID (a UUID4 string).
        """
        if not user_id:
            user_id = self.generate_user_id(ip_address)
        self.current_user_id = user_id
        self.current_session_id = str(uuid.uuid4())
        return self.current_session_id

    def save_user_state(self, state: "ConversationState", user_id: Optional[str] = None,
                        session_id: Optional[str] = None) -> str:
        """Save a conversation state with a timestamp.

        An existing record with the same ``session_id`` is replaced,
        otherwise a new record is appended. The manager's current user and
        session are updated to the ones used for the save.

        Args:
            state: Conversation state mapping to persist.
            user_id: Target user; defaults to the current user, or a freshly
                generated ID.
            session_id: Target session; defaults to the current session, or
                a fresh UUID4.

        Returns:
            The session ID the state was saved under.

        Raises:
            json.JSONDecodeError: If the user's existing file is not valid
                JSON (the file is left untouched).
            OSError: If the file cannot be read or written.
        """
        user_id = user_id or self.current_user_id or self.generate_user_id()
        session_id = session_id or self.current_session_id or str(uuid.uuid4())
        timestamp = datetime.now().isoformat()

        save_record = {
            "user_id": user_id,
            "session_id": session_id,
            "timestamp": timestamp,
            "state": self._serialize_state(state),
            "summary": self._create_state_summary(state),
        }

        user_data = self._read_user_data(user_id)
        if user_data is None:
            user_data = {
                "user_id": user_id,
                "created": timestamp,
                "conversations": [],
            }
        conversations = user_data.setdefault("conversations", [])

        # Replace the record of this session, or append when it is new.
        for index, conv in enumerate(conversations):
            if conv.get("session_id") == session_id:
                conversations[index] = save_record
                break
        else:
            conversations.append(save_record)

        # Newest first: load_user_state() and get_all_users() rely on
        # conversations[0] being the most recent record.
        conversations.sort(key=lambda conv: conv.get("timestamp", ""), reverse=True)

        self._write_user_data(self._user_file(user_id), user_data)

        self.current_user_id = user_id
        self.current_session_id = session_id
        return session_id

    def load_user_state(self, user_id: str, session_id: Optional[str] = None) -> Optional["ConversationState"]:
        """Load a saved state.

        Args:
            user_id: User whose state is loaded.
            session_id: Specific session to load; when omitted the most
                recent conversation is used.

        Returns:
            The restored state, or ``None`` when the user, the session or
            any conversation does not exist.
        """
        user_data = self._read_user_data(user_id)
        if user_data is None:
            return None
        conversations = user_data.get("conversations", [])

        if session_id:
            for conv in conversations:
                if conv.get("session_id") == session_id:
                    return self._deserialize_state(conv["state"])
            return None

        if conversations:
            return self._deserialize_state(conversations[0]["state"])
        return None

    def get_user_history(self, user_id: str) -> List[Dict[str, Any]]:
        """Return all stored conversation records of a user (newest first).

        Returns an empty list when the user has no file.
        """
        user_data = self._read_user_data(user_id)
        if user_data is None:
            return []
        return user_data.get("conversations", [])

    def get_all_users(self) -> List[Dict[str, Any]]:
        """Return a summary of every stored user.

        Each entry has ``user_id``, ``created``, ``total_conversations``,
        ``last_activity`` and ``latest_summary``. Entries are sorted by last
        activity, most recent first; users without any conversation
        (``last_activity == "Never"``) come last. Unreadable files are
        reported on stdout and skipped.
        """
        users = []
        for filename in self._list_user_filenames():
            user_id = filename[len("user_"):-len(".json")]
            try:
                with open(os.path.join(self.storage_dir, filename), 'r', encoding='utf-8') as f:
                    user_data = json.load(f)
                conversations = user_data.get("conversations", [])
                latest = conversations[0] if conversations else None
                users.append({
                    "user_id": user_id,
                    "created": user_data.get("created", "Unknown"),
                    "total_conversations": len(conversations),
                    "last_activity": latest["timestamp"] if latest else self._NEVER,
                    "latest_summary": latest.get("summary", {}) if latest else {},
                })
            except Exception as e:
                # Best effort: one broken file must not hide the other users.
                print(f"Error reading user file {filename}: {e}")
                continue

        # A plain string sort would rank "Never" above every ISO timestamp
        # ("N" > "2"), so inactive users are explicitly ordered last.
        users.sort(
            key=lambda user: (user["last_activity"] != self._NEVER, str(user["last_activity"])),
            reverse=True,
        )
        return users

    def search_conversations_by_time(self, start_time: str, end_time: Optional[str] = None) -> List[Dict[str, Any]]:
        """Search conversations within a time range.

        Timestamps are compared as strings, so ``start_time``/``end_time``
        should use the same ISO-8601 form as the stored timestamps.

        Args:
            start_time: Inclusive lower bound.
            end_time: Inclusive upper bound; no upper bound when omitted or
                empty.

        Returns:
            Matching records (``user_id``, ``session_id``, ``timestamp``,
            ``summary``), newest first. Unreadable files are reported on
            stdout and skipped.
        """
        results = []
        for filename in self._list_user_filenames():
            try:
                with open(os.path.join(self.storage_dir, filename), 'r', encoding='utf-8') as f:
                    user_data = json.load(f)
                for conv in user_data.get("conversations", []):
                    conv_time = conv["timestamp"]
                    if conv_time < start_time:
                        continue
                    if end_time and conv_time > end_time:
                        continue
                    results.append({
                        "user_id": conv["user_id"],
                        "session_id": conv["session_id"],
                        "timestamp": conv_time,
                        "summary": conv.get("summary", {}),
                    })
            except Exception as e:
                # Best effort: keep what was collected and move on.
                print(f"Error searching in file {filename}: {e}")
                continue

        results.sort(key=lambda record: record["timestamp"], reverse=True)
        return results

    def delete_user_data(self, user_id: str) -> bool:
        """Delete all data for a specific user.

        Returns:
            ``True`` when the user's file was removed, ``False`` when it did
            not exist or could not be removed.
        """
        try:
            os.remove(self._user_file(user_id))
        except FileNotFoundError:
            return False
        except Exception as e:
            print(f"Error deleting user data: {e}")
            return False
        return True

    def _user_file(self, user_id: str) -> str:
        """Return the path of the JSON file that stores ``user_id``."""
        return os.path.join(self.storage_dir, f"user_{user_id}.json")

    def _list_user_filenames(self) -> List[str]:
        """Return the names of all ``user_*.json`` files in the storage directory."""
        try:
            filenames = sorted(os.listdir(self.storage_dir))
        except FileNotFoundError:
            # Storage directory was removed after construction: nothing stored.
            return []
        return [
            name for name in filenames
            if name.startswith("user_") and name.endswith(".json")
        ]

    def _read_user_data(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Load a user's JSON document, or return ``None`` if there is no file."""
        try:
            with open(self._user_file(user_id), 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    def _write_user_data(self, user_file: str, user_data: Dict[str, Any]) -> None:
        """Atomically write ``user_data`` as JSON to ``user_file``."""
        # Encode before touching the disk: an encoding error must not leave a
        # truncated file behind.
        payload = json.dumps(user_data, indent=2, ensure_ascii=False)
        self.ensure_storage_directory()
        # The ".tmp" suffix keeps the temp file out of the "user_*.json" scans.
        tmp_file = f"{user_file}.{uuid.uuid4().hex}.tmp"
        try:
            with open(tmp_file, 'w', encoding='utf-8') as f:
                f.write(payload)
            # os.replace swaps the file in one step, so readers never see a
            # half-written document.
            os.replace(tmp_file, user_file)
        finally:
            # Only still present when the write or the replace failed.
            if os.path.exists(tmp_file):
                os.remove(tmp_file)

    @staticmethod
    def _to_jsonable(value: Any) -> Any:
        """Recursively convert ``value`` into JSON-serializable data.

        Dicts and lists/tuples are rebuilt (tuples become lists, as the
        ``json`` module would encode them); anything that is not a JSON
        primitive is replaced by its ``str()`` representation.
        """
        if value is None or isinstance(value, (str, int, float, bool)):
            return value
        if isinstance(value, dict):
            return {
                (key if key is None or isinstance(key, (str, int, float, bool)) else str(key)):
                    UserStateManager._to_jsonable(item)
                for key, item in value.items()
            }
        if isinstance(value, (list, tuple)):
            return [UserStateManager._to_jsonable(item) for item in value]
        return str(value)

    def _serialize_state(self, state: "ConversationState") -> Dict[str, Any]:
        """Convert a conversation state to a JSON-serializable dict.

        Top-level dicts, lists and primitives are deep-copied with any
        non-serializable nested value replaced by ``str(value)``; every
        other top-level object is stored as ``str(value)``.
        """
        serialized = {}
        for key, value in state.items():
            if isinstance(value, (dict, list, str, int, float, bool)) or value is None:
                serialized[key] = self._to_jsonable(value)
            else:
                # Convert complex objects to string representation
                serialized[key] = str(value)
        return serialized

    def _deserialize_state(self, serialized_state: Dict[str, Any]) -> "ConversationState":
        """Convert JSON data back to a conversation state.

        Starts from ``graph.create_initial_state()`` and copies over only
        the saved keys that exist in that initial state.
        """
        # Imported lazily — presumably to avoid a circular import with
        # ``graph``; TODO confirm.
        from graph import create_initial_state

        state = create_initial_state()
        for key, value in serialized_state.items():
            if key in state:
                state[key] = value
        return state

    @staticmethod
    def _as_dict(value: Any) -> Dict[str, Any]:
        """Return ``value`` if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _format_budget(budget: Any) -> str:
        """Format a budget as ``$1,234``; fall back to ``str()`` for non-numbers."""
        try:
            return f"${budget:,.0f}"
        except (TypeError, ValueError):
            return str(budget)

    def _create_state_summary(self, state: "ConversationState") -> Dict[str, Any]:
        """Create a summary of the conversation state for quick reference.

        The summary is informational only, so missing, ``None`` or
        malformed sections are skipped instead of raising (which would
        otherwise abort the whole save).

        Returns:
            A dict with ``total_messages``, ``current_topic``,
            ``user_requirements``, ``floorplan_status`` and
            ``project_progress`` (``completed_phases`` and
            ``completion_percentage``).
        """
        summary = {
            "total_messages": len(state.get("messages") or []),
            "current_topic": state.get("current_topic"),
            "user_requirements": {},
            "floorplan_status": {},
            "project_progress": {},
        }

        # User requirements summary
        user_reqs = self._as_dict(state.get("user_requirements"))
        if user_reqs.get("budget"):
            summary["user_requirements"]["budget"] = self._format_budget(user_reqs["budget"])
        if user_reqs.get("location"):
            summary["user_requirements"]["location"] = user_reqs["location"]
        if user_reqs.get("family_size"):
            summary["user_requirements"]["family_size"] = user_reqs["family_size"]

        # Floorplan status
        floorplan_reqs = self._as_dict(state.get("floorplan_requirements"))
        if floorplan_reqs.get("total_sqft"):
            summary["floorplan_status"]["size"] = f"{floorplan_reqs['total_sqft']} sq ft"
        if floorplan_reqs.get("num_floors"):
            summary["floorplan_status"]["floors"] = floorplan_reqs["num_floors"]
        if floorplan_reqs.get("rooms"):
            summary["floorplan_status"]["rooms"] = ", ".join(
                f"{room.get('count', '?')}x {room.get('type', '?')}"
                if isinstance(room, dict) else str(room)
                for room in floorplan_reqs["rooms"]
            )

        # Project progress
        completed_phases = [
            label
            for label, section, field in self._PROJECT_PHASES
            if self._as_dict(state.get(section)).get(field)
        ]
        summary["project_progress"]["completed_phases"] = completed_phases
        summary["project_progress"]["completion_percentage"] = int(
            (len(completed_phases) / len(self._PROJECT_PHASES)) * 100
        )
        return summary
| # Global instance for easy access. It is created at import time with the default | |
| # storage directory ("user_conversations"), which the constructor creates if missing. | |
| user_state_manager = UserStateManager() | |