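"""Appwrite persistence layer for the dev-docs-rag-hf project.

Wraps the Appwrite Databases and Storage services to cache scraped
documentation as JSON files, store processed document chunks, and track
per-URL completion status.
"""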
import os
from dotenv import load_dotenv
from appwrite.client import Client
from appwrite.services.databases import Databases
from appwrite.services.storage import Storage
from appwrite.input_file import InputFile
import json
import logging
from typing import List, Dict, Any, Optional
import tempfile
import time
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
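
# Environment variables read by AppwriteService (from the process env or .env):
#   APPWRITE_PROJECT_ID    (required)
#   APPWRITE_API_KEY       (required)
#   APPWRITE_ENDPOINT      (optional, defaults to https://cloud.appwrite.io/v1)
#   APPWRITE_DATABASE_ID   (optional, defaults to "react_docs_db")
#   APPWRITE_COLLECTION_ID (optional, defaults to "document_chunks")
#   APPWRITE_BUCKET_ID     (optional, defaults to "react_docs_bucket")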
class AppwriteService:
def __init__(self):
"""Initialize Appwrite client and services"""
# Validate required environment variables
self._validate_environment()
self.client = Client()
# Set up client with environment variables
self.client.set_endpoint(
os.getenv("APPWRITE_ENDPOINT", "https://cloud.appwrite.io/v1")
)
self.client.set_project(os.getenv("APPWRITE_PROJECT_ID"))
self.client.set_key(os.getenv("APPWRITE_API_KEY"))
# Initialize services
self.databases = Databases(self.client)
self.storage = Storage(self.client)
# Database and collection IDs
self.database_id = os.getenv("APPWRITE_DATABASE_ID", "react_docs_db")
self.chunks_collection_id = os.getenv(
"APPWRITE_COLLECTION_ID", "document_chunks"
)
self.completion_collection_id = "completion_status"
self.bucket_id = os.getenv("APPWRITE_BUCKET_ID", "react_docs_bucket")
# Initialize database and storage if they don't exist
self._initialize_database()
self._initialize_storage()
def _validate_environment(self):
"""Validate that required environment variables are set"""
required_vars = ["APPWRITE_PROJECT_ID", "APPWRITE_API_KEY"]
        missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
error_msg = (
f"Missing required environment variables: {', '.join(missing_vars)}"
)
logger.error(error_msg)
logger.error("Please set these variables in your .env file:")
for var in missing_vars:
logger.error(f" {var}=your_value_here")
raise ValueError(error_msg)
def _initialize_database(self):
"""Initialize database and chunks collection if they don't exist"""
try:
# Check if database exists
try:
self.databases.get(database_id=self.database_id)
logger.info(f"Database {self.database_id} already exists")
except Exception:
# Create database
self.databases.create(
database_id=self.database_id, name="React Documentation Database"
)
logger.info(f"Created database {self.database_id}")
# Initialize chunks collection
self._initialize_chunks_collection()
# Initialize completion status collection
self._initialize_completion_collection()
except Exception as e:
logger.error(f"Error initializing database: {str(e)}")
raise
def _initialize_storage(self):
"""Check if storage bucket exists (don't create if it doesn't)"""
try:
# Check if bucket exists
try:
self.storage.get_bucket(bucket_id=self.bucket_id)
logger.info(f"Storage bucket {self.bucket_id} exists and is accessible")
except Exception as e:
logger.error(
f"Storage bucket {self.bucket_id} not found or not accessible: {str(e)}"
)
logger.error(
"Please make sure the bucket exists and your API key has access to it"
)
raise
except Exception as e:
logger.error(f"Error checking storage bucket: {str(e)}")
raise
def _initialize_chunks_collection(self):
"""Initialize chunks collection"""
try:
# Check if chunks collection exists
try:
self.databases.get_collection(
database_id=self.database_id,
collection_id=self.chunks_collection_id,
)
logger.info(
f"Chunks collection {self.chunks_collection_id} already exists"
)
except Exception:
# Create chunks collection
self.databases.create_collection(
database_id=self.database_id,
collection_id=self.chunks_collection_id,
name="Document Chunks",
)
# Create attributes for the chunks collection
self.databases.create_string_attribute(
database_id=self.database_id,
collection_id=self.chunks_collection_id,
key="content",
size=65536, # 64KB for content
required=True,
)
self.databases.create_string_attribute(
database_id=self.database_id,
collection_id=self.chunks_collection_id,
key="title",
size=255,
required=True,
)
self.databases.create_string_attribute(
database_id=self.database_id,
collection_id=self.chunks_collection_id,
key="url",
size=500,
required=False,
)
self.databases.create_string_attribute(
database_id=self.database_id,
collection_id=self.chunks_collection_id,
key="chunk_id",
size=100,
required=True,
)
logger.info(
f"Created chunks collection {self.chunks_collection_id} with attributes"
)
except Exception as e:
logger.error(f"Error initializing chunks collection: {str(e)}")
raise
def _initialize_completion_collection(self):
"""Initialize completion status collection"""
try:
# Check if completion collection exists
try:
self.databases.get_collection(
database_id=self.database_id,
collection_id=self.completion_collection_id,
)
logger.info(
f"Completion collection {self.completion_collection_id} already exists"
)
except Exception:
# Create completion collection
self.databases.create_collection(
database_id=self.database_id,
collection_id=self.completion_collection_id,
name="Completion Status",
)
# Create attributes for the completion collection
self.databases.create_string_attribute(
database_id=self.database_id,
collection_id=self.completion_collection_id,
key="url",
size=500,
required=True,
)
self.databases.create_string_attribute(
database_id=self.database_id,
collection_id=self.completion_collection_id,
key="status",
size=50,
required=True,
)
self.databases.create_string_attribute(
database_id=self.database_id,
collection_id=self.completion_collection_id,
key="completed_at",
size=100,
required=True,
)
self.databases.create_integer_attribute(
database_id=self.database_id,
collection_id=self.completion_collection_id,
key="chunks_count",
required=True,
)
logger.info(
f"Created completion collection {self.completion_collection_id} with attributes"
)
except Exception as e:
logger.error(f"Error initializing completion collection: {str(e)}")
raise
    # Ordered (substrings, prefix) pairs mapping documentation URLs to storage
    # file-ID prefixes. Order matters: more specific domains are checked first.
    _URL_FILE_PREFIXES = [
        (("react.dev", "reactjs.org"), "react"),
        (("docs.python.org", "python.org"), "python"),
        (("golang.org", "go.dev"), "golang"),
        (("developer.mozilla.org", "mdn"), "mdn"),
        (("vuejs.org",), "vue"),
        (("nodejs.org",), "nodejs"),
        (("angular.io",), "angular"),
        (("svelte.dev",), "svelte"),
        (("nextjs.org",), "nextjs"),
        (("nuxt.com",), "nuxt"),
        (("djangoproject.com", "django"), "django"),
        (("fastapi.tiangolo.com", "fastapi"), "fastapi"),
        (("docs.docker.com", "docker.com"), "docker"),
        (("kubernetes.io",), "kubernetes"),
        (("docs.mongodb.com", "mongodb.com"), "mongodb"),
        (("postgresql.org", "postgresql"), "postgresql"),
    ]

    def get_docs_file_id(self, url: str) -> str:
        """Generate file ID based on the documentation URL"""
        url_lower = url.lower()
        for substrings, prefix in self._URL_FILE_PREFIXES:
            if any(s in url_lower for s in substrings):
                return f"{prefix}_docs_raw.json"
        # For unknown URLs, create a generic ID based on the domain
        from urllib.parse import urlparse

        parsed = urlparse(url)
        domain = parsed.netloc.replace(".", "_").replace("www_", "")
        return f"{domain}_docs_raw.json"
def docs_already_exist(self, url: str) -> bool:
"""Check if documentation for this URL already exists in storage"""
try:
file_id = self.get_docs_file_id(url)
# Try to get the file from storage
self.storage.get_file(bucket_id=self.bucket_id, file_id=file_id)
logger.info(f"Documentation already exists for {url} (file: {file_id})")
return True
except Exception as e:
logger.info(f"Documentation does not exist for {url}: {str(e)}")
return False
def save_raw_docs_to_storage(
self, docs: List[Dict[str, Any]], url: str = None
) -> bool:
"""Save raw documents as JSON file to Appwrite storage bucket"""
temp_file_path = None
max_retries = 3
retry_delay = 2 # seconds
for attempt in range(max_retries):
try:
logger.info(
f"Saving {len(docs)} raw documents to Appwrite storage (attempt {attempt + 1}/{max_retries})"
)
# Generate file ID based on URL
file_id = self.get_docs_file_id(url) if url else "unknown_docs_raw.json"
logger.info(f"Using file ID: {file_id}")
# Create JSON content
json_content = json.dumps(docs, indent=2, ensure_ascii=False)
                # Create a temporary file securely (tempfile.mktemp is deprecated and racy)
                fd, temp_file_path = tempfile.mkstemp(suffix=".json")
                # Write content to the temporary file
                with os.fdopen(fd, "w", encoding="utf-8") as temp_file:
                    temp_file.write(json_content)
# Upload file to storage bucket
input_file = InputFile.from_path(temp_file_path)
# Try to delete existing file first, then create new one
try:
# Try to delete existing file
self.storage.delete_file(bucket_id=self.bucket_id, file_id=file_id)
logger.info(f"Deleted existing file: {file_id}")
except Exception as e:
# File doesn't exist or can't be deleted, that's okay
logger.info(
f"Could not delete existing file (may not exist): {str(e)}"
)
                # Upload to the storage bucket (retries are handled by the enclosing loop)
                result = self.storage.create_file(
                    bucket_id=self.bucket_id,
                    file_id=file_id,
                    file=input_file,
                )
                # Remove the temporary file now that the upload has succeeded
                if temp_file_path and os.path.exists(temp_file_path):
                    os.unlink(temp_file_path)
                    temp_file_path = None
                logger.info(
                    f"Successfully saved raw documents to storage: {result['$id']}"
                )
                return True
except Exception as e:
logger.error(
f"Error saving raw documents to storage (attempt {attempt + 1}/{max_retries}): {str(e)}"
)
# Clean up temporary file on error
if temp_file_path and os.path.exists(temp_file_path):
try:
os.unlink(temp_file_path)
temp_file_path = None
except (OSError, PermissionError) as cleanup_error:
logger.warning(
f"Could not delete temporary file {temp_file_path}: {str(cleanup_error)}"
)
# If this is the last attempt, return False
if attempt == max_retries - 1:
logger.error(
f"Failed to save raw documents after {max_retries} attempts"
)
return False
# Wait before retrying
logger.info(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
return False
def get_raw_docs_from_storage(self, url: str = None) -> List[Dict[str, Any]]:
"""Retrieve raw documents from Appwrite storage bucket"""
max_retries = 3
retry_delay = 2 # seconds
for attempt in range(max_retries):
try:
logger.info(
f"Retrieving raw documents from Appwrite storage (attempt {attempt + 1}/{max_retries})"
)
# Generate file ID based on URL
file_id = self.get_docs_file_id(url) if url else "react_docs_raw.json"
logger.info(f"Looking for file: {file_id}")
# Download file from storage
result = self.storage.get_file_download(
bucket_id=self.bucket_id, file_id=file_id
)
logger.info(f"Download result type: {type(result)}")
# Handle different possible return types
docs = None
# Case 1: Result is already a list of dicts (JSON content)
if isinstance(result, list) and result and isinstance(result[0], dict):
docs = result
logger.info("Result is already a list of documents")
# Case 2: Result is bytes
elif isinstance(result, bytes):
json_content = result.decode("utf-8")
docs = json.loads(json_content)
logger.info("Result is bytes, decoded successfully")
# Case 3: Result is a list of bytes
elif (
isinstance(result, list) and result and isinstance(result[0], bytes)
):
json_bytes = b"".join(result)
json_content = json_bytes.decode("utf-8")
docs = json.loads(json_content)
logger.info("Result is list of bytes, joined and decoded")
# Case 4: Result is a single dict
elif isinstance(result, dict):
docs = [result]
logger.info("Result is a single document dict")
# Case 5: Try to convert to string and parse
else:
try:
json_str = str(result)
docs = json.loads(json_str)
logger.info("Result converted to string and parsed")
except Exception as e:
logger.error(f"Failed to parse result: {str(e)}")
raise ValueError(
f"Could not parse downloaded file content: {str(e)}"
)
if docs is None:
raise ValueError("Could not parse the downloaded file content")
logger.info(f"Retrieved {len(docs)} raw documents from storage")
return docs
except Exception as e:
logger.error(
f"Error retrieving raw documents from storage (attempt {attempt + 1}/{max_retries}): {str(e)}"
)
# If this is the last attempt, return empty list
if attempt == max_retries - 1:
logger.error(
f"Failed to retrieve raw documents after {max_retries} attempts"
)
return []
# Wait before retrying
logger.info(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
return []
    def get_chunks_file_id(self, url: str) -> str:
        """Generate chunks file ID based on the documentation URL"""
        # Same URL-to-ID mapping as get_docs_file_id, with a "_chunks" suffix
        # in place of "_raw" (the domain-based fallback behaves the same way).
        return self.get_docs_file_id(url).replace("_raw.json", "_chunks.json")
def chunks_already_exist(self, url: str) -> bool:
"""Check if chunks for this URL already exist in storage"""
try:
file_id = self.get_chunks_file_id(url)
# Try to get the file from storage
self.storage.get_file(bucket_id=self.bucket_id, file_id=file_id)
logger.info(f"Chunks already exist for {url} (file: {file_id})")
return True
except Exception as e:
logger.info(f"Chunks do not exist for {url}: {str(e)}")
return False
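    # Note: chunk dicts are expected to carry at least the keys defined as
    # collection attributes above ("content", "title", "url", "chunk_id");
    # the storage-based save/load methods below serialize them to JSON as-is.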
def save_chunks_to_storage(
self, chunks: List[Dict[str, Any]], url: str = None
) -> bool:
"""Save document chunks as JSON file to Appwrite storage bucket (FAST)"""
temp_file_path = None
max_retries = 3
retry_delay = 2 # seconds
for attempt in range(max_retries):
try:
logger.info(
f"Saving {len(chunks)} chunks to Appwrite storage (attempt {attempt + 1}/{max_retries})"
)
# Generate file ID based on URL
file_id = (
self.get_chunks_file_id(url) if url else "unknown_docs_chunks.json"
)
logger.info(f"Using chunks file ID: {file_id}")
# Create JSON content
json_content = json.dumps(chunks, indent=2, ensure_ascii=False)
                # Create a temporary file securely (tempfile.mktemp is deprecated and racy)
                fd, temp_file_path = tempfile.mkstemp(suffix=".json")
                # Write content to the temporary file
                with os.fdopen(fd, "w", encoding="utf-8") as temp_file:
                    temp_file.write(json_content)
# Upload file to storage bucket
input_file = InputFile.from_path(temp_file_path)
# Try to delete existing file first, then create new one
try:
# Try to delete existing file
self.storage.delete_file(bucket_id=self.bucket_id, file_id=file_id)
logger.info(f"Deleted existing chunks file: {file_id}")
except Exception as e:
# File doesn't exist or can't be deleted, that's okay
logger.info(
f"Could not delete existing chunks file (may not exist): {str(e)}"
)
                # Upload to the storage bucket (retries are handled by the enclosing loop)
                result = self.storage.create_file(
                    bucket_id=self.bucket_id,
                    file_id=file_id,
                    file=input_file,
                )
                # Remove the temporary file now that the upload has succeeded
                if temp_file_path and os.path.exists(temp_file_path):
                    os.unlink(temp_file_path)
                    temp_file_path = None
                logger.info(f"Successfully saved chunks to storage: {result['$id']}")
                return True
except Exception as e:
logger.error(
f"Error saving chunks to storage (attempt {attempt + 1}/{max_retries}): {str(e)}"
)
# Clean up temporary file on error
if temp_file_path and os.path.exists(temp_file_path):
try:
os.unlink(temp_file_path)
temp_file_path = None
except (OSError, PermissionError) as cleanup_error:
logger.warning(
f"Could not delete temporary file {temp_file_path}: {str(cleanup_error)}"
)
# If this is the last attempt, return False
if attempt == max_retries - 1:
logger.error(f"Failed to save chunks after {max_retries} attempts")
return False
# Wait before retrying
logger.info(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
return False
def get_chunks_from_storage(self, url: str = None) -> List[Dict[str, Any]]:
"""Retrieve document chunks from Appwrite storage bucket (FAST)"""
max_retries = 3
retry_delay = 2 # seconds
for attempt in range(max_retries):
try:
logger.info(
f"Retrieving chunks from Appwrite storage (attempt {attempt + 1}/{max_retries})"
)
# Generate file ID based on URL
file_id = (
self.get_chunks_file_id(url) if url else "react_docs_chunks.json"
)
logger.info(f"Looking for chunks file: {file_id}")
# Download file from storage
result = self.storage.get_file_download(
bucket_id=self.bucket_id, file_id=file_id
)
logger.info(f"Download result type: {type(result)}")
# Handle different possible return types
chunks = None
# Case 1: Result is already a list of dicts (JSON content)
if isinstance(result, list) and result and isinstance(result[0], dict):
chunks = result
logger.info("Result is already a list of chunks")
# Case 2: Result is bytes
elif isinstance(result, bytes):
json_content = result.decode("utf-8")
chunks = json.loads(json_content)
logger.info("Result is bytes, decoded successfully")
# Case 3: Result is a list of bytes
elif (
isinstance(result, list) and result and isinstance(result[0], bytes)
):
json_bytes = b"".join(result)
json_content = json_bytes.decode("utf-8")
chunks = json.loads(json_content)
logger.info("Result is list of bytes, joined and decoded")
# Case 4: Result is a single dict
elif isinstance(result, dict):
chunks = [result]
logger.info("Result is a single chunk dict")
# Case 5: Try to convert to string and parse
else:
try:
json_str = str(result)
chunks = json.loads(json_str)
logger.info("Result converted to string and parsed")
except Exception as e:
logger.error(f"Failed to parse result: {str(e)}")
raise ValueError(
f"Could not parse downloaded chunks file content: {str(e)}"
)
if chunks is None:
raise ValueError(
"Could not parse the downloaded chunks file content"
)
logger.info(f"Retrieved {len(chunks)} chunks from storage")
return chunks
except Exception as e:
logger.error(
f"Error retrieving chunks from storage (attempt {attempt + 1}/{max_retries}): {str(e)}"
)
# If this is the last attempt, return empty list
if attempt == max_retries - 1:
logger.error(
f"Failed to retrieve chunks after {max_retries} attempts"
)
return []
# Wait before retrying
logger.info(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
return []
def save_chunks(self, chunks: List[Dict[str, Any]], url: str = None) -> bool:
"""Save document chunks - optimized version using storage bucket"""
try:
logger.info(f"Saving {len(chunks)} chunks using optimized method")
# Use the fast storage method instead of database
return self.save_chunks_to_storage(chunks, url)
except Exception as e:
logger.error(f"Error saving chunks: {str(e)}")
return False
def get_all_chunks(self, url: str = None) -> List[Dict[str, Any]]:
"""Retrieve all document chunks - optimized version using storage bucket"""
try:
logger.info("Retrieving all chunks using optimized method")
# Use the fast storage method instead of database
return self.get_chunks_from_storage(url)
except Exception as e:
logger.error(f"Error retrieving chunks: {str(e)}")
return []
def search_chunks(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Search for chunks containing specific text"""
try:
logger.info(f"Searching for chunks with query: {query}")
# Search documents in the collection
response = self.databases.list_documents(
database_id=self.database_id,
collection_id=self.chunks_collection_id,
queries=[],
)
chunks = []
for doc in response["documents"]:
                # Simple client-side substring match for now; note that
                # list_documents returns only the first page of results
                # (25 documents by default), so this does not scan the
                # entire collection
if (
query.lower() in doc["content"].lower()
or query.lower() in doc["title"].lower()
):
chunks.append(
{
"content": doc["content"],
"title": doc["title"],
"url": doc.get("url", ""),
"chunk_id": doc["chunk_id"],
}
)
logger.info(f"Found {len(chunks)} matching chunks")
return chunks[:limit]
except Exception as e:
logger.error(f"Error searching chunks in Appwrite: {str(e)}")
return []
    def delete_raw_docs_from_storage(self, url: str = None) -> bool:
        """Delete a raw documents file from the storage bucket"""
        try:
            # Fall back to the React docs file when no URL is given
            file_id = self.get_docs_file_id(url) if url else "react_docs_raw.json"
            logger.info(f"Deleting raw documents file {file_id} from storage")
            # Delete the file from storage
            self.storage.delete_file(bucket_id=self.bucket_id, file_id=file_id)
logger.info("Successfully deleted raw documents from storage")
return True
except Exception as e:
logger.error(f"Error deleting raw documents from storage: {str(e)}")
return False
def delete_all_chunks(self) -> bool:
"""Delete all chunks from the database (use with caution)"""
try:
logger.info("Deleting all chunks from Appwrite")
# Get all documents
response = self.databases.list_documents(
database_id=self.database_id,
collection_id=self.chunks_collection_id,
)
# Delete each document
for doc in response["documents"]:
self.databases.delete_document(
database_id=self.database_id,
collection_id=self.chunks_collection_id,
document_id=doc["$id"],
)
logger.info("Successfully deleted all chunks")
return True
except Exception as e:
logger.error(f"Error deleting chunks from Appwrite: {str(e)}")
return False
    def get_raw_docs_count(self, url: str = None) -> int:
        """Get the total number of raw documents in storage"""
        try:
            # Fall back to the React docs file when no URL is given
            file_id = self.get_docs_file_id(url) if url else "react_docs_raw.json"
            # Check if the raw docs file exists
            try:
                self.storage.get_file(bucket_id=self.bucket_id, file_id=file_id)
                # If the file exists, count the documents in its content
                docs = self.get_raw_docs_from_storage(url)
                return len(docs)
            except Exception:
                return 0
except Exception as e:
logger.error(f"Error getting raw docs count: {str(e)}")
return 0
def get_chunks_count(self) -> int:
"""Get the total number of chunks in the database"""
try:
response = self.databases.list_documents(
database_id=self.database_id,
collection_id=self.chunks_collection_id,
)
return response["total"]
except Exception as e:
logger.error(f"Error getting chunks count: {str(e)}")
return 0
def clear_all_data(self) -> bool:
"""Clear all data from both storage and database"""
try:
logger.info("Clearing all data from storage and database")
success1 = self.delete_raw_docs_from_storage()
success2 = self.delete_all_chunks()
return success1 and success2
except Exception as e:
logger.error(f"Error clearing all data: {str(e)}")
return False
def list_storage_files(self) -> List[str]:
"""List all files in the storage bucket"""
try:
response = self.storage.list_files(bucket_id=self.bucket_id)
files = [file["$id"] for file in response["files"]]
logger.info(f"Found {len(files)} files in storage")
return files
except Exception as e:
logger.error(f"Error listing storage files: {str(e)}")
return []
def save_completion_status(self, url: str, chunks_count: int) -> bool:
"""Save completion status for a documentation URL"""
try:
import datetime
# Check if completion record already exists
existing_record = self.get_completion_status(url)
if existing_record:
# Update existing record
self.databases.update_document(
database_id=self.database_id,
collection_id=self.completion_collection_id,
document_id=existing_record["$id"],
data={
"url": url,
"status": "completed",
"completed_at": datetime.datetime.now().isoformat(),
"chunks_count": chunks_count,
},
)
logger.info(f"Updated completion status for {url}")
else:
# Create new record
self.databases.create_document(
database_id=self.database_id,
collection_id=self.completion_collection_id,
document_id="unique()",
data={
"url": url,
"status": "completed",
"completed_at": datetime.datetime.now().isoformat(),
"chunks_count": chunks_count,
},
)
logger.info(f"Saved completion status for {url}")
return True
except Exception as e:
logger.error(f"Error saving completion status: {str(e)}")
return False
def get_completion_status(self, url: str) -> Optional[Dict[str, Any]]:
"""Get completion status for a documentation URL"""
try:
from appwrite.query import Query
response = self.databases.list_documents(
database_id=self.database_id,
collection_id=self.completion_collection_id,
queries=[Query.equal("url", url)],
)
if response["documents"]:
return response["documents"][0]
return None
except Exception as e:
logger.error(f"Error getting completion status: {str(e)}")
return None
def is_fully_processed(self, url: str) -> bool:
"""Check if documentation is fully processed (has completion status)"""
try:
completion_status = self.get_completion_status(url)
return (
completion_status is not None
and completion_status.get("status") == "completed"
)
except Exception as e:
logger.error(f"Error checking if fully processed: {str(e)}")
return False
# Global instance; constructing it at import time validates the environment,
# so importing this module raises if the required Appwrite variables are unset
appwrite_service = AppwriteService()
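

# --- Usage sketch -----------------------------------------------------------
# A minimal, hypothetical example of driving the service end to end. It assumes
# a populated .env (project ID, API key, existing bucket); the URL and chunk
# payload below are illustrative and not taken from this repository.
if __name__ == "__main__":
    demo_url = "https://react.dev"
    if not appwrite_service.chunks_already_exist(demo_url):
        demo_chunks = [
            {
                "content": "React lets you build user interfaces from components.",
                "title": "Quick Start",
                "url": demo_url,
                "chunk_id": "quick-start-0",
            }
        ]
        appwrite_service.save_chunks(demo_chunks, demo_url)
        appwrite_service.save_completion_status(demo_url, len(demo_chunks))
    print(f"Fully processed: {appwrite_service.is_fully_processed(demo_url)}")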