"""Appwrite service layer for persisting scraped documentation (raw pages,
chunks, and completion-status records)."""
| import os | |
| from dotenv import load_dotenv | |
| from appwrite.client import Client | |
| from appwrite.services.databases import Databases | |
| from appwrite.services.storage import Storage | |
| from appwrite.input_file import InputFile | |
| import json | |
| import logging | |
| from typing import List, Dict, Any, Optional | |
| import tempfile | |
| import time | |
# Load environment variables from a local .env file (no-op if the file is absent)
load_dotenv()
# Configure logging once for the module; all classes below share this logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AppwriteService:
    """Wrapper around Appwrite Databases and Storage for persisting scraped
    documentation: raw JSON files, chunk files, and completion-status records.
    """

    def __init__(self):
        """Initialize Appwrite client and services.

        Reads configuration from environment variables (loaded via dotenv),
        then eagerly creates the database/collections if missing and verifies
        the storage bucket is reachable.

        Raises:
            ValueError: if required environment variables are unset.
            Exception: if database initialization or the bucket check fails.
        """
        # Validate required environment variables before touching the network
        self._validate_environment()
        self.client = Client()
        # Set up client with environment variables (endpoint falls back to cloud)
        self.client.set_endpoint(
            os.getenv("APPWRITE_ENDPOINT", "https://cloud.appwrite.io/v1")
        )
        self.client.set_project(os.getenv("APPWRITE_PROJECT_ID"))
        self.client.set_key(os.getenv("APPWRITE_API_KEY"))
        # Initialize services
        self.databases = Databases(self.client)
        self.storage = Storage(self.client)
        # Database and collection IDs (env-overridable except completion_collection_id)
        self.database_id = os.getenv("APPWRITE_DATABASE_ID", "react_docs_db")
        self.chunks_collection_id = os.getenv(
            "APPWRITE_COLLECTION_ID", "document_chunks"
        )
        self.completion_collection_id = "completion_status"
        self.bucket_id = os.getenv("APPWRITE_BUCKET_ID", "react_docs_bucket")
        # Initialize database and storage if they don't exist.  NOTE: these
        # perform network calls at construction time.
        self._initialize_database()
        self._initialize_storage()
| def _validate_environment(self): | |
| """Validate that required environment variables are set""" | |
| required_vars = ["APPWRITE_PROJECT_ID", "APPWRITE_API_KEY"] | |
| missing_vars = [] | |
| for var in required_vars: | |
| if not os.getenv(var): | |
| missing_vars.append(var) | |
| if missing_vars: | |
| error_msg = ( | |
| f"Missing required environment variables: {', '.join(missing_vars)}" | |
| ) | |
| logger.error(error_msg) | |
| logger.error("Please set these variables in your .env file:") | |
| for var in missing_vars: | |
| logger.error(f" {var}=your_value_here") | |
| raise ValueError(error_msg) | |
    def _initialize_database(self):
        """Ensure the database and both collections exist, creating them if missing.

        Raises:
            Exception: re-raised after logging if any get/create call fails.
        """
        try:
            # Check if database exists
            try:
                self.databases.get(database_id=self.database_id)
                logger.info(f"Database {self.database_id} already exists")
            except Exception:
                # Any get() failure is treated as "database missing" — create it.
                self.databases.create(
                    database_id=self.database_id, name="React Documentation Database"
                )
                logger.info(f"Created database {self.database_id}")
            # Initialize chunks collection
            self._initialize_chunks_collection()
            # Initialize completion status collection
            self._initialize_completion_collection()
        except Exception as e:
            logger.error(f"Error initializing database: {str(e)}")
            raise
    def _initialize_storage(self):
        """Check if storage bucket exists (don't create if it doesn't).

        Unlike the database, the bucket is expected to be provisioned
        externally; a missing or inaccessible bucket is fatal.

        Raises:
            Exception: if the bucket is missing or the API key lacks access.
        """
        try:
            # Check if bucket exists
            try:
                self.storage.get_bucket(bucket_id=self.bucket_id)
                logger.info(f"Storage bucket {self.bucket_id} exists and is accessible")
            except Exception as e:
                logger.error(
                    f"Storage bucket {self.bucket_id} not found or not accessible: {str(e)}"
                )
                logger.error(
                    "Please make sure the bucket exists and your API key has access to it"
                )
                raise
        except Exception as e:
            # NOTE(review): this outer handler re-logs the same failure that the
            # inner handler already logged, then re-raises.
            logger.error(f"Error checking storage bucket: {str(e)}")
            raise
| def _initialize_chunks_collection(self): | |
| """Initialize chunks collection""" | |
| try: | |
| # Check if chunks collection exists | |
| try: | |
| self.databases.get_collection( | |
| database_id=self.database_id, | |
| collection_id=self.chunks_collection_id, | |
| ) | |
| logger.info( | |
| f"Chunks collection {self.chunks_collection_id} already exists" | |
| ) | |
| except Exception: | |
| # Create chunks collection | |
| self.databases.create_collection( | |
| database_id=self.database_id, | |
| collection_id=self.chunks_collection_id, | |
| name="Document Chunks", | |
| ) | |
| # Create attributes for the chunks collection | |
| self.databases.create_string_attribute( | |
| database_id=self.database_id, | |
| collection_id=self.chunks_collection_id, | |
| key="content", | |
| size=65536, # 64KB for content | |
| required=True, | |
| ) | |
| self.databases.create_string_attribute( | |
| database_id=self.database_id, | |
| collection_id=self.chunks_collection_id, | |
| key="title", | |
| size=255, | |
| required=True, | |
| ) | |
| self.databases.create_string_attribute( | |
| database_id=self.database_id, | |
| collection_id=self.chunks_collection_id, | |
| key="url", | |
| size=500, | |
| required=False, | |
| ) | |
| self.databases.create_string_attribute( | |
| database_id=self.database_id, | |
| collection_id=self.chunks_collection_id, | |
| key="chunk_id", | |
| size=100, | |
| required=True, | |
| ) | |
| logger.info( | |
| f"Created chunks collection {self.chunks_collection_id} with attributes" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error initializing chunks collection: {str(e)}") | |
| raise | |
| def _initialize_completion_collection(self): | |
| """Initialize completion status collection""" | |
| try: | |
| # Check if completion collection exists | |
| try: | |
| self.databases.get_collection( | |
| database_id=self.database_id, | |
| collection_id=self.completion_collection_id, | |
| ) | |
| logger.info( | |
| f"Completion collection {self.completion_collection_id} already exists" | |
| ) | |
| except Exception: | |
| # Create completion collection | |
| self.databases.create_collection( | |
| database_id=self.database_id, | |
| collection_id=self.completion_collection_id, | |
| name="Completion Status", | |
| ) | |
| # Create attributes for the completion collection | |
| self.databases.create_string_attribute( | |
| database_id=self.database_id, | |
| collection_id=self.completion_collection_id, | |
| key="url", | |
| size=500, | |
| required=True, | |
| ) | |
| self.databases.create_string_attribute( | |
| database_id=self.database_id, | |
| collection_id=self.completion_collection_id, | |
| key="status", | |
| size=50, | |
| required=True, | |
| ) | |
| self.databases.create_string_attribute( | |
| database_id=self.database_id, | |
| collection_id=self.completion_collection_id, | |
| key="completed_at", | |
| size=100, | |
| required=True, | |
| ) | |
| self.databases.create_integer_attribute( | |
| database_id=self.database_id, | |
| collection_id=self.completion_collection_id, | |
| key="chunks_count", | |
| required=True, | |
| ) | |
| logger.info( | |
| f"Created completion collection {self.completion_collection_id} with attributes" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error initializing completion collection: {str(e)}") | |
| raise | |
| def get_docs_file_id(self, url: str) -> str: | |
| """Generate file ID based on the documentation URL""" | |
| url_lower = url.lower() | |
| # Map URLs to file IDs | |
| if "react.dev" in url_lower or "reactjs.org" in url_lower: | |
| return "react_docs_raw.json" | |
| elif "docs.python.org" in url_lower or "python.org" in url_lower: | |
| return "python_docs_raw.json" | |
| elif "golang.org" in url_lower or "go.dev" in url_lower: | |
| return "golang_docs_raw.json" | |
| elif "developer.mozilla.org" in url_lower or "mdn" in url_lower: | |
| return "mdn_docs_raw.json" | |
| elif "vuejs.org" in url_lower: | |
| return "vue_docs_raw.json" | |
| elif "nodejs.org" in url_lower: | |
| return "nodejs_docs_raw.json" | |
| elif "angular.io" in url_lower: | |
| return "angular_docs_raw.json" | |
| elif "svelte.dev" in url_lower: | |
| return "svelte_docs_raw.json" | |
| elif "nextjs.org" in url_lower: | |
| return "nextjs_docs_raw.json" | |
| elif "nuxt.com" in url_lower: | |
| return "nuxt_docs_raw.json" | |
| elif "djangoproject.com" in url_lower or "django" in url_lower: | |
| return "django_docs_raw.json" | |
| elif "fastapi.tiangolo.com" in url_lower or "fastapi" in url_lower: | |
| return "fastapi_docs_raw.json" | |
| elif "docs.docker.com" in url_lower or "docker.com" in url_lower: | |
| return "docker_docs_raw.json" | |
| elif "kubernetes.io" in url_lower: | |
| return "kubernetes_docs_raw.json" | |
| elif "docs.mongodb.com" in url_lower or "mongodb.com" in url_lower: | |
| return "mongodb_docs_raw.json" | |
| elif "postgresql.org" in url_lower or "postgresql" in url_lower: | |
| return "postgresql_docs_raw.json" | |
| else: | |
| # For unknown URLs, create a generic ID based on domain | |
| from urllib.parse import urlparse | |
| parsed = urlparse(url) | |
| domain = parsed.netloc.replace(".", "_").replace("www_", "") | |
| return f"{domain}_docs_raw.json" | |
| def docs_already_exist(self, url: str) -> bool: | |
| """Check if documentation for this URL already exists in storage""" | |
| try: | |
| file_id = self.get_docs_file_id(url) | |
| # Try to get the file from storage | |
| self.storage.get_file(bucket_id=self.bucket_id, file_id=file_id) | |
| logger.info(f"Documentation already exists for {url} (file: {file_id})") | |
| return True | |
| except Exception as e: | |
| logger.info(f"Documentation does not exist for {url}: {str(e)}") | |
| return False | |
| def save_raw_docs_to_storage( | |
| self, docs: List[Dict[str, Any]], url: str = None | |
| ) -> bool: | |
| """Save raw documents as JSON file to Appwrite storage bucket""" | |
| temp_file_path = None | |
| max_retries = 3 | |
| retry_delay = 2 # seconds | |
| for attempt in range(max_retries): | |
| try: | |
| logger.info( | |
| f"Saving {len(docs)} raw documents to Appwrite storage (attempt {attempt + 1}/{max_retries})" | |
| ) | |
| # Generate file ID based on URL | |
| file_id = self.get_docs_file_id(url) if url else "unknown_docs_raw.json" | |
| logger.info(f"Using file ID: {file_id}") | |
| # Create JSON content | |
| json_content = json.dumps(docs, indent=2, ensure_ascii=False) | |
| # Create temporary file with a unique name | |
| temp_file_path = tempfile.mktemp(suffix=".json") | |
| # Write content to temporary file | |
| with open(temp_file_path, "w", encoding="utf-8") as temp_file: | |
| temp_file.write(json_content) | |
| # Upload file to storage bucket | |
| input_file = InputFile.from_path(temp_file_path) | |
| # Try to delete existing file first, then create new one | |
| try: | |
| # Try to delete existing file | |
| self.storage.delete_file(bucket_id=self.bucket_id, file_id=file_id) | |
| logger.info(f"Deleted existing file: {file_id}") | |
| except Exception as e: | |
| # File doesn't exist or can't be deleted, that's okay | |
| logger.info( | |
| f"Could not delete existing file (may not exist): {str(e)}" | |
| ) | |
| # Upload to storage with retry logic | |
| result = self.storage.create_file( | |
| bucket_id=self.bucket_id, | |
| file_id=file_id, | |
| file=input_file, | |
| ) | |
| logger.info( | |
| f"Successfully saved raw documents to storage: {result['$id']}" | |
| ) | |
| return True | |
| except Exception as e: | |
| logger.error( | |
| f"Error saving raw documents to storage (attempt {attempt + 1}/{max_retries}): {str(e)}" | |
| ) | |
| # Clean up temporary file on error | |
| if temp_file_path and os.path.exists(temp_file_path): | |
| try: | |
| os.unlink(temp_file_path) | |
| temp_file_path = None | |
| except (OSError, PermissionError) as cleanup_error: | |
| logger.warning( | |
| f"Could not delete temporary file {temp_file_path}: {str(cleanup_error)}" | |
| ) | |
| # If this is the last attempt, return False | |
| if attempt == max_retries - 1: | |
| logger.error( | |
| f"Failed to save raw documents after {max_retries} attempts" | |
| ) | |
| return False | |
| # Wait before retrying | |
| logger.info(f"Retrying in {retry_delay} seconds...") | |
| time.sleep(retry_delay) | |
| retry_delay *= 2 # Exponential backoff | |
| return False | |
    def get_raw_docs_from_storage(self, url: str = None) -> List[Dict[str, Any]]:
        """Retrieve raw documents from Appwrite storage bucket.

        Downloads the JSON file whose ID is derived from *url* (defaulting to
        "react_docs_raw.json" when no URL is given) and normalizes the SDK's
        return value into a list of document dicts.  Retries up to 3 times
        with exponential backoff.

        Args:
            url: documentation URL used to derive the file ID, or None.

        Returns:
            List of raw document dicts, or an empty list after all retries fail.
        """
        max_retries = 3
        retry_delay = 2  # seconds
        for attempt in range(max_retries):
            try:
                logger.info(
                    f"Retrieving raw documents from Appwrite storage (attempt {attempt + 1}/{max_retries})"
                )
                # Generate file ID based on URL
                file_id = self.get_docs_file_id(url) if url else "react_docs_raw.json"
                logger.info(f"Looking for file: {file_id}")
                # Download file from storage
                result = self.storage.get_file_download(
                    bucket_id=self.bucket_id, file_id=file_id
                )
                logger.info(f"Download result type: {type(result)}")
                # NOTE(review): the SDK's return type apparently varies, so the
                # payload shape is sniffed below in a fixed, order-sensitive way.
                docs = None
                # Case 1: Result is already a list of dicts (JSON content)
                if isinstance(result, list) and result and isinstance(result[0], dict):
                    docs = result
                    logger.info("Result is already a list of documents")
                # Case 2: Result is bytes
                elif isinstance(result, bytes):
                    json_content = result.decode("utf-8")
                    docs = json.loads(json_content)
                    logger.info("Result is bytes, decoded successfully")
                # Case 3: Result is a list of bytes (streamed body chunks)
                elif (
                    isinstance(result, list) and result and isinstance(result[0], bytes)
                ):
                    json_bytes = b"".join(result)
                    json_content = json_bytes.decode("utf-8")
                    docs = json.loads(json_content)
                    logger.info("Result is list of bytes, joined and decoded")
                # Case 4: Result is a single dict
                elif isinstance(result, dict):
                    docs = [result]
                    logger.info("Result is a single document dict")
                # Case 5: Last resort — stringify and try to parse as JSON.
                # (An empty-list result also lands here: str([]) parses to [].)
                else:
                    try:
                        json_str = str(result)
                        docs = json.loads(json_str)
                        logger.info("Result converted to string and parsed")
                    except Exception as e:
                        logger.error(f"Failed to parse result: {str(e)}")
                        raise ValueError(
                            f"Could not parse downloaded file content: {str(e)}"
                        )
                if docs is None:
                    raise ValueError("Could not parse the downloaded file content")
                logger.info(f"Retrieved {len(docs)} raw documents from storage")
                return docs
            except Exception as e:
                logger.error(
                    f"Error retrieving raw documents from storage (attempt {attempt + 1}/{max_retries}): {str(e)}"
                )
                # If this is the last attempt, return empty list
                if attempt == max_retries - 1:
                    logger.error(
                        f"Failed to retrieve raw documents after {max_retries} attempts"
                    )
                    return []
                # Wait before retrying
                logger.info(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
        return []
| def get_chunks_file_id(self, url: str) -> str: | |
| """Generate chunks file ID based on the documentation URL""" | |
| url_lower = url.lower() | |
| # Map URLs to chunks file IDs | |
| if "react.dev" in url_lower or "reactjs.org" in url_lower: | |
| return "react_docs_chunks.json" | |
| elif "docs.python.org" in url_lower or "python.org" in url_lower: | |
| return "python_docs_chunks.json" | |
| elif "golang.org" in url_lower or "go.dev" in url_lower: | |
| return "golang_docs_chunks.json" | |
| elif "developer.mozilla.org" in url_lower or "mdn" in url_lower: | |
| return "mdn_docs_chunks.json" | |
| elif "vuejs.org" in url_lower: | |
| return "vue_docs_chunks.json" | |
| elif "nodejs.org" in url_lower: | |
| return "nodejs_docs_chunks.json" | |
| elif "angular.io" in url_lower: | |
| return "angular_docs_chunks.json" | |
| elif "svelte.dev" in url_lower: | |
| return "svelte_docs_chunks.json" | |
| elif "nextjs.org" in url_lower: | |
| return "nextjs_docs_chunks.json" | |
| elif "nuxt.com" in url_lower: | |
| return "nuxt_docs_chunks.json" | |
| elif "djangoproject.com" in url_lower or "django" in url_lower: | |
| return "django_docs_chunks.json" | |
| elif "fastapi.tiangolo.com" in url_lower or "fastapi" in url_lower: | |
| return "fastapi_docs_chunks.json" | |
| elif "docs.docker.com" in url_lower or "docker.com" in url_lower: | |
| return "docker_docs_chunks.json" | |
| elif "kubernetes.io" in url_lower: | |
| return "kubernetes_docs_chunks.json" | |
| elif "docs.mongodb.com" in url_lower or "mongodb.com" in url_lower: | |
| return "mongodb_docs_chunks.json" | |
| elif "postgresql.org" in url_lower or "postgresql" in url_lower: | |
| return "postgresql_docs_chunks.json" | |
| else: | |
| # For unknown URLs, create a generic ID based on domain | |
| from urllib.parse import urlparse | |
| parsed = urlparse(url) | |
| domain = parsed.netloc.replace(".", "_").replace("www_", "") | |
| return f"{domain}_docs_chunks.json" | |
| def chunks_already_exist(self, url: str) -> bool: | |
| """Check if chunks for this URL already exist in storage""" | |
| try: | |
| file_id = self.get_chunks_file_id(url) | |
| # Try to get the file from storage | |
| self.storage.get_file(bucket_id=self.bucket_id, file_id=file_id) | |
| logger.info(f"Chunks already exist for {url} (file: {file_id})") | |
| return True | |
| except Exception as e: | |
| logger.info(f"Chunks do not exist for {url}: {str(e)}") | |
| return False | |
| def save_chunks_to_storage( | |
| self, chunks: List[Dict[str, Any]], url: str = None | |
| ) -> bool: | |
| """Save document chunks as JSON file to Appwrite storage bucket (FAST)""" | |
| temp_file_path = None | |
| max_retries = 3 | |
| retry_delay = 2 # seconds | |
| for attempt in range(max_retries): | |
| try: | |
| logger.info( | |
| f"Saving {len(chunks)} chunks to Appwrite storage (attempt {attempt + 1}/{max_retries})" | |
| ) | |
| # Generate file ID based on URL | |
| file_id = ( | |
| self.get_chunks_file_id(url) if url else "unknown_docs_chunks.json" | |
| ) | |
| logger.info(f"Using chunks file ID: {file_id}") | |
| # Create JSON content | |
| json_content = json.dumps(chunks, indent=2, ensure_ascii=False) | |
| # Create temporary file with a unique name | |
| temp_file_path = tempfile.mktemp(suffix=".json") | |
| # Write content to temporary file | |
| with open(temp_file_path, "w", encoding="utf-8") as temp_file: | |
| temp_file.write(json_content) | |
| # Upload file to storage bucket | |
| input_file = InputFile.from_path(temp_file_path) | |
| # Try to delete existing file first, then create new one | |
| try: | |
| # Try to delete existing file | |
| self.storage.delete_file(bucket_id=self.bucket_id, file_id=file_id) | |
| logger.info(f"Deleted existing chunks file: {file_id}") | |
| except Exception as e: | |
| # File doesn't exist or can't be deleted, that's okay | |
| logger.info( | |
| f"Could not delete existing chunks file (may not exist): {str(e)}" | |
| ) | |
| # Upload to storage with retry logic | |
| result = self.storage.create_file( | |
| bucket_id=self.bucket_id, | |
| file_id=file_id, | |
| file=input_file, | |
| ) | |
| logger.info(f"Successfully saved chunks to storage: {result['$id']}") | |
| return True | |
| except Exception as e: | |
| logger.error( | |
| f"Error saving chunks to storage (attempt {attempt + 1}/{max_retries}): {str(e)}" | |
| ) | |
| # Clean up temporary file on error | |
| if temp_file_path and os.path.exists(temp_file_path): | |
| try: | |
| os.unlink(temp_file_path) | |
| temp_file_path = None | |
| except (OSError, PermissionError) as cleanup_error: | |
| logger.warning( | |
| f"Could not delete temporary file {temp_file_path}: {str(cleanup_error)}" | |
| ) | |
| # If this is the last attempt, return False | |
| if attempt == max_retries - 1: | |
| logger.error(f"Failed to save chunks after {max_retries} attempts") | |
| return False | |
| # Wait before retrying | |
| logger.info(f"Retrying in {retry_delay} seconds...") | |
| time.sleep(retry_delay) | |
| retry_delay *= 2 # Exponential backoff | |
| return False | |
    def get_chunks_from_storage(self, url: str = None) -> List[Dict[str, Any]]:
        """Retrieve document chunks from Appwrite storage bucket (FAST).

        Downloads the JSON file whose ID is derived from *url* (defaulting to
        "react_docs_chunks.json" when no URL is given) and normalizes the
        SDK's return value into a list of chunk dicts.  Retries up to 3 times
        with exponential backoff.

        Args:
            url: documentation URL used to derive the file ID, or None.

        Returns:
            List of chunk dicts, or an empty list after all retries fail.
        """
        max_retries = 3
        retry_delay = 2  # seconds
        for attempt in range(max_retries):
            try:
                logger.info(
                    f"Retrieving chunks from Appwrite storage (attempt {attempt + 1}/{max_retries})"
                )
                # Generate file ID based on URL
                file_id = (
                    self.get_chunks_file_id(url) if url else "react_docs_chunks.json"
                )
                logger.info(f"Looking for chunks file: {file_id}")
                # Download file from storage
                result = self.storage.get_file_download(
                    bucket_id=self.bucket_id, file_id=file_id
                )
                logger.info(f"Download result type: {type(result)}")
                # NOTE(review): the SDK's return type apparently varies, so the
                # payload shape is sniffed below in a fixed, order-sensitive way.
                chunks = None
                # Case 1: Result is already a list of dicts (JSON content)
                if isinstance(result, list) and result and isinstance(result[0], dict):
                    chunks = result
                    logger.info("Result is already a list of chunks")
                # Case 2: Result is bytes
                elif isinstance(result, bytes):
                    json_content = result.decode("utf-8")
                    chunks = json.loads(json_content)
                    logger.info("Result is bytes, decoded successfully")
                # Case 3: Result is a list of bytes (streamed body chunks)
                elif (
                    isinstance(result, list) and result and isinstance(result[0], bytes)
                ):
                    json_bytes = b"".join(result)
                    json_content = json_bytes.decode("utf-8")
                    chunks = json.loads(json_content)
                    logger.info("Result is list of bytes, joined and decoded")
                # Case 4: Result is a single dict
                elif isinstance(result, dict):
                    chunks = [result]
                    logger.info("Result is a single chunk dict")
                # Case 5: Last resort — stringify and try to parse as JSON.
                else:
                    try:
                        json_str = str(result)
                        chunks = json.loads(json_str)
                        logger.info("Result converted to string and parsed")
                    except Exception as e:
                        logger.error(f"Failed to parse result: {str(e)}")
                        raise ValueError(
                            f"Could not parse downloaded chunks file content: {str(e)}"
                        )
                if chunks is None:
                    raise ValueError(
                        "Could not parse the downloaded chunks file content"
                    )
                logger.info(f"Retrieved {len(chunks)} chunks from storage")
                return chunks
            except Exception as e:
                logger.error(
                    f"Error retrieving chunks from storage (attempt {attempt + 1}/{max_retries}): {str(e)}"
                )
                # If this is the last attempt, return empty list
                if attempt == max_retries - 1:
                    logger.error(
                        f"Failed to retrieve chunks after {max_retries} attempts"
                    )
                    return []
                # Wait before retrying
                logger.info(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
        return []
| def save_chunks(self, chunks: List[Dict[str, Any]], url: str = None) -> bool: | |
| """Save document chunks - optimized version using storage bucket""" | |
| try: | |
| logger.info(f"Saving {len(chunks)} chunks using optimized method") | |
| # Use the fast storage method instead of database | |
| return self.save_chunks_to_storage(chunks, url) | |
| except Exception as e: | |
| logger.error(f"Error saving chunks: {str(e)}") | |
| return False | |
| def get_all_chunks(self, url: str = None) -> List[Dict[str, Any]]: | |
| """Retrieve all document chunks - optimized version using storage bucket""" | |
| try: | |
| logger.info("Retrieving all chunks using optimized method") | |
| # Use the fast storage method instead of database | |
| return self.get_chunks_from_storage(url) | |
| except Exception as e: | |
| logger.error(f"Error retrieving chunks: {str(e)}") | |
| return [] | |
| def search_chunks(self, query: str, limit: int = 10) -> List[Dict[str, Any]]: | |
| """Search for chunks containing specific text""" | |
| try: | |
| logger.info(f"Searching for chunks with query: {query}") | |
| # Search documents in the collection | |
| response = self.databases.list_documents( | |
| database_id=self.database_id, | |
| collection_id=self.chunks_collection_id, | |
| queries=[], | |
| ) | |
| chunks = [] | |
| for doc in response["documents"]: | |
| # Simple client-side search for now | |
| if ( | |
| query.lower() in doc["content"].lower() | |
| or query.lower() in doc["title"].lower() | |
| ): | |
| chunks.append( | |
| { | |
| "content": doc["content"], | |
| "title": doc["title"], | |
| "url": doc.get("url", ""), | |
| "chunk_id": doc["chunk_id"], | |
| } | |
| ) | |
| logger.info(f"Found {len(chunks)} matching chunks") | |
| return chunks[:limit] | |
| except Exception as e: | |
| logger.error(f"Error searching chunks in Appwrite: {str(e)}") | |
| return [] | |
| def delete_raw_docs_from_storage(self) -> bool: | |
| """Delete raw documents file from storage bucket""" | |
| try: | |
| logger.info("Deleting raw documents from storage") | |
| # Delete file from storage | |
| self.storage.delete_file( | |
| bucket_id=self.bucket_id, file_id="react_docs_raw.json" | |
| ) | |
| logger.info("Successfully deleted raw documents from storage") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error deleting raw documents from storage: {str(e)}") | |
| return False | |
| def delete_all_chunks(self) -> bool: | |
| """Delete all chunks from the database (use with caution)""" | |
| try: | |
| logger.info("Deleting all chunks from Appwrite") | |
| # Get all documents | |
| response = self.databases.list_documents( | |
| database_id=self.database_id, | |
| collection_id=self.chunks_collection_id, | |
| ) | |
| # Delete each document | |
| for doc in response["documents"]: | |
| self.databases.delete_document( | |
| database_id=self.database_id, | |
| collection_id=self.chunks_collection_id, | |
| document_id=doc["$id"], | |
| ) | |
| logger.info("Successfully deleted all chunks") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error deleting chunks from Appwrite: {str(e)}") | |
| return False | |
| def get_raw_docs_count(self) -> int: | |
| """Get the total number of raw documents in storage""" | |
| try: | |
| # Check if raw docs file exists | |
| try: | |
| self.storage.get_file( | |
| bucket_id=self.bucket_id, file_id="react_docs_raw.json" | |
| ) | |
| # If file exists, get the count from the content | |
| docs = self.get_raw_docs_from_storage() | |
| return len(docs) | |
| except Exception: | |
| return 0 | |
| except Exception as e: | |
| logger.error(f"Error getting raw docs count: {str(e)}") | |
| return 0 | |
| def get_chunks_count(self) -> int: | |
| """Get the total number of chunks in the database""" | |
| try: | |
| response = self.databases.list_documents( | |
| database_id=self.database_id, | |
| collection_id=self.chunks_collection_id, | |
| ) | |
| return response["total"] | |
| except Exception as e: | |
| logger.error(f"Error getting chunks count: {str(e)}") | |
| return 0 | |
| def clear_all_data(self) -> bool: | |
| """Clear all data from both storage and database""" | |
| try: | |
| logger.info("Clearing all data from storage and database") | |
| success1 = self.delete_raw_docs_from_storage() | |
| success2 = self.delete_all_chunks() | |
| return success1 and success2 | |
| except Exception as e: | |
| logger.error(f"Error clearing all data: {str(e)}") | |
| return False | |
| def list_storage_files(self) -> List[str]: | |
| """List all files in the storage bucket""" | |
| try: | |
| response = self.storage.list_files(bucket_id=self.bucket_id) | |
| files = [file["$id"] for file in response["files"]] | |
| logger.info(f"Found {len(files)} files in storage") | |
| return files | |
| except Exception as e: | |
| logger.error(f"Error listing storage files: {str(e)}") | |
| return [] | |
| def save_completion_status(self, url: str, chunks_count: int) -> bool: | |
| """Save completion status for a documentation URL""" | |
| try: | |
| import datetime | |
| # Check if completion record already exists | |
| existing_record = self.get_completion_status(url) | |
| if existing_record: | |
| # Update existing record | |
| self.databases.update_document( | |
| database_id=self.database_id, | |
| collection_id=self.completion_collection_id, | |
| document_id=existing_record["$id"], | |
| data={ | |
| "url": url, | |
| "status": "completed", | |
| "completed_at": datetime.datetime.now().isoformat(), | |
| "chunks_count": chunks_count, | |
| }, | |
| ) | |
| logger.info(f"Updated completion status for {url}") | |
| else: | |
| # Create new record | |
| self.databases.create_document( | |
| database_id=self.database_id, | |
| collection_id=self.completion_collection_id, | |
| document_id="unique()", | |
| data={ | |
| "url": url, | |
| "status": "completed", | |
| "completed_at": datetime.datetime.now().isoformat(), | |
| "chunks_count": chunks_count, | |
| }, | |
| ) | |
| logger.info(f"Saved completion status for {url}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error saving completion status: {str(e)}") | |
| return False | |
| def get_completion_status(self, url: str) -> Optional[Dict[str, Any]]: | |
| """Get completion status for a documentation URL""" | |
| try: | |
| from appwrite.query import Query | |
| response = self.databases.list_documents( | |
| database_id=self.database_id, | |
| collection_id=self.completion_collection_id, | |
| queries=[Query.equal("url", url)], | |
| ) | |
| if response["documents"]: | |
| return response["documents"][0] | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error getting completion status: {str(e)}") | |
| return None | |
| def is_fully_processed(self, url: str) -> bool: | |
| """Check if documentation is fully processed (has completion status)""" | |
| try: | |
| completion_status = self.get_completion_status(url) | |
| return ( | |
| completion_status is not None | |
| and completion_status.get("status") == "completed" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error checking if fully processed: {str(e)}") | |
| return False | |
# Global instance
# NOTE: constructed at import time — __init__ performs network calls to
# Appwrite (database/collection creation, bucket check) and raises if the
# required environment variables are missing.
appwrite_service = AppwriteService()