| """ |
| Vector Store Manager for OpenAI Vector Store API integration. |
| """ |
| import os |
| import json |
| import time |
| from typing import Dict, List, Optional, Any, Union |
| from dotenv import load_dotenv |
|
|
| |
| |
| |
|
|
class VectorStoreManager:
    """
    File-backed user store standing in for OpenAI Vector Stores.

    The vector store IDs handed out by ``initialize_stores`` are hard-coded
    placeholders; all data handled by this class lives in JSON files under
    ``data_dir``:

    * ``indexes/user_email_index.json`` maps a lower-cased email to the path
      of that user's profile file.
    * ``users/profiles/<email>.json`` holds one profile per user.
    """

    # File name of the email -> profile-path index inside ``indexes_dir``.
    _EMAIL_INDEX_FILENAME = "user_email_index.json"

    # Characters that must never reach a file name built from an email:
    # path separators allow escaping ``profiles_dir`` and NUL is rejected
    # by the OS.
    _UNSAFE_FILENAME_CHARS = ("/", "\\", "\x00")

    def __init__(self):
        """Load environment configuration and set up store IDs and data paths."""
        load_dotenv()
        self.api_key = os.getenv("OPENAI_API_KEY")

        # Populated by initialize_stores().
        self.auth_index_store_id = None
        self.user_profiles_store_id = None
        self.user_content_store_id = None

        # Kept for backward compatibility; no method of this class uses it.
        self.email_path_cache = {}

        # Data directory is "<parent of this file's directory>/data".
        self.data_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data")
        self.indexes_dir = os.path.join(self.data_dir, "indexes")
        self.users_dir = os.path.join(self.data_dir, "users")
        self.profiles_dir = os.path.join(self.users_dir, "profiles")
        self.history_dir = os.path.join(self.users_dir, "history")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _email_index_path(self) -> str:
        """Return the path of the email index file."""
        return os.path.join(self.indexes_dir, self._EMAIL_INDEX_FILENAME)

    @staticmethod
    def _normalize_email(email: Any) -> Optional[str]:
        """Return the lower-cased email, or None when *email* is not a string."""
        if not isinstance(email, str):
            return None
        return email.lower()

    def _default_profile_path(self, normalized_email: str) -> Optional[str]:
        """
        Return ``profiles_dir/<email>.json``, or None for an unsafe email.

        An email that is empty or contains a path separator / NUL byte is
        refused so that caller-supplied input can never address a file
        outside ``profiles_dir``.
        """
        if not normalized_email:
            return None
        if any(ch in normalized_email for ch in self._UNSAFE_FILENAME_CHARS):
            return None
        return os.path.join(self.profiles_dir, f"{normalized_email}.json")

    def _load_email_index(self) -> Dict[str, Any]:
        """
        Load the email index.

        A missing file, invalid JSON, undecodable bytes, or a top-level value
        that is not a JSON object all yield an empty index. Other OS errors
        (e.g. permission problems) propagate to the caller.
        """
        try:
            with open(self._email_index_path(), "r", encoding="utf-8") as handle:
                loaded = json.load(handle)
        except (FileNotFoundError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return {}
        return loaded if isinstance(loaded, dict) else {}

    def _resolve_profile_path(
        self, normalized_email: str, email_index: Dict[str, Any]
    ) -> Optional[str]:
        """
        Return the profile path for an email.

        The path recorded in the index wins; when the index has no usable
        entry the default ``profiles_dir/<email>.json`` location is used
        (None if the email is unsafe as a file name).
        """
        indexed_path = email_index.get(normalized_email)
        if isinstance(indexed_path, str) and indexed_path:
            return indexed_path
        return self._default_profile_path(normalized_email)

    @staticmethod
    def _write_json(path: str, payload: Any) -> None:
        """
        Write *payload* to *path* as indented JSON.

        The payload is serialized before the file is opened, so data that
        cannot be serialized raises TypeError/ValueError without truncating
        an existing file.
        """
        serialized = json.dumps(payload, indent=2)
        with open(path, "w", encoding="utf-8") as handle:
            handle.write(serialized)

    def _create_email_index(self):
        """Create an empty email index file; an existing index is left untouched."""
        os.makedirs(self.indexes_dir, exist_ok=True)
        try:
            # Mode "x" fails instead of overwriting, so registered users are
            # never wiped by a repeated call.
            with open(self._email_index_path(), "x", encoding="utf-8") as handle:
                json.dump({}, handle, indent=2)
        except FileExistsError:
            pass

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def initialize_stores(self):
        """
        Assign the store IDs and make sure the email index file exists.

        The IDs are fixed placeholder strings; no OpenAI Vector Store is
        created here.

        Returns:
            Dict with the keys ``auth_index_store_id``,
            ``user_profiles_store_id`` and ``user_content_store_id``.
        """
        self.auth_index_store_id = "vs_auth_index_123"
        self.user_profiles_store_id = "vs_user_profiles_456"
        self.user_content_store_id = "vs_user_content_789"

        # No-op when the index already exists.
        self._create_email_index()

        return {
            "auth_index_store_id": self.auth_index_store_id,
            "user_profiles_store_id": self.user_profiles_store_id,
            "user_content_store_id": self.user_content_store_id,
        }

    async def verify_email(self, email: str) -> bool:
        """
        Verify if an email exists in the authentication index.

        Args:
            email: The email to verify (compared case-insensitively)

        Returns:
            True if email is registered, False otherwise (including when the
            index is missing or unreadable as JSON, or *email* is not a string)
        """
        normalized_email = self._normalize_email(email)
        if normalized_email is None:
            return False
        return normalized_email in self._load_email_index()

    async def get_user_profile(self, email: str) -> Optional[Dict[str, Any]]:
        """
        Get a user's profile data.

        The profile is read from the path recorded in the email index, or
        from ``profiles_dir/<email>.json`` when the index has no entry.

        Args:
            email: The user's email

        Returns:
            User profile data, or None if the profile file is missing, is not
            valid JSON, or the email cannot be mapped to a safe file name
        """
        normalized_email = self._normalize_email(email)
        if normalized_email is None:
            return None

        profile_path = self._resolve_profile_path(
            normalized_email, self._load_email_index()
        )
        if profile_path is None:
            return None

        try:
            with open(profile_path, "r", encoding="utf-8") as handle:
                return json.load(handle)
        except (FileNotFoundError, ValueError):
            return None

    async def add_new_user(self, email: str, profile_data: Dict[str, Any]) -> bool:
        """
        Add a new user to the system.

        Writes the profile to ``profiles_dir/<email>.json`` and records that
        path in the email index. An existing profile or index entry for the
        same email is overwritten.

        Args:
            email: The user's email
            profile_data: The user's profile data (must be JSON-serializable)

        Returns:
            True if successful, False otherwise (unsafe or non-string email,
            unserializable profile data, or a file-system error)
        """
        normalized_email = self._normalize_email(email)
        if normalized_email is None:
            return False

        profile_path = self._default_profile_path(normalized_email)
        if profile_path is None:
            return False

        try:
            # Both directories are needed; the index directory may not exist
            # yet when this is called before initialize_stores().
            os.makedirs(self.profiles_dir, exist_ok=True)
            os.makedirs(self.indexes_dir, exist_ok=True)

            self._write_json(profile_path, profile_data)

            email_index = self._load_email_index()
            email_index[normalized_email] = profile_path
            self._write_json(self._email_index_path(), email_index)
        except (OSError, TypeError, ValueError):
            return False

        return True

    async def update_user_profile(self, email: str, profile_data: Dict[str, Any]) -> bool:
        """
        Update a user's profile data.

        The profile is written to the same path ``get_user_profile`` reads
        from, i.e. the one recorded in the email index.

        Args:
            email: The user's email
            profile_data: The updated profile data (must be JSON-serializable)

        Returns:
            True if successful, False otherwise (email not registered,
            unserializable profile data, or a file-system error)
        """
        normalized_email = self._normalize_email(email)
        if normalized_email is None:
            return False

        email_index = self._load_email_index()
        if normalized_email not in email_index:
            return False

        profile_path = self._resolve_profile_path(normalized_email, email_index)
        if profile_path is None:
            return False

        try:
            self._write_json(profile_path, profile_data)
        except (OSError, TypeError, ValueError):
            return False

        return True
|
|
| |
| |