File size: 7,606 Bytes
353cfb6 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
import os
import sys
import time
import warnings
from dotenv import load_dotenv
from appwrite.client import Client
from appwrite.services.databases import Databases
# Suppress every warning category for the rest of the process.
warnings.filterwarnings("ignore")

# Load environment variables (the error message below points the user at .env).
load_dotenv()

APPWRITE_ENDPOINT = os.getenv("APPWRITE_ENDPOINT")
APPWRITE_PROJECT_ID = os.getenv("APPWRITE_PROJECT_ID")
APPWRITE_API_KEY = os.getenv("APPWRITE_API_KEY")
APPWRITE_DATABASE_ID = os.getenv("APPWRITE_DATABASE_ID")

# Abort before building the client if any setting is unset or empty, and name
# the offending variables so the user knows exactly what to fix.
_missing_env = [
    name
    for name, value in (
        ("APPWRITE_ENDPOINT", APPWRITE_ENDPOINT),
        ("APPWRITE_PROJECT_ID", APPWRITE_PROJECT_ID),
        ("APPWRITE_API_KEY", APPWRITE_API_KEY),
        ("APPWRITE_DATABASE_ID", APPWRITE_DATABASE_ID),
    )
    if not value
]
if _missing_env:
    print("❌ Missing environment variables. Check .env file.")
    print(f" Missing: {', '.join(_missing_env)}")
    sys.exit(1)

# Shared Appwrite client and Databases service used by every function below.
client = Client()
client.set_endpoint(APPWRITE_ENDPOINT)
client.set_project(APPWRITE_PROJECT_ID)
client.set_key(APPWRITE_API_KEY)
databases = Databases(client)
def _poll_until_available(kind, key, fetch_items, max_retries, delay):
    """Poll ``fetch_items`` until the entry named ``key`` settles.

    Args:
        kind: Human-readable noun used in the progress message
            ("attribute" or "index").
        key: Value of the ``'key'`` field identifying the entry to watch.
        fetch_items: Zero-argument callable returning the current list of
            entries (dicts with ``'key'`` and ``'status'`` fields).
        max_retries: Maximum number of polling attempts.
        delay: Seconds to sleep between attempts.

    Returns:
        True once the entry's status is ``'available'``; False if it is
        ``'failed'`` or if every attempt is used up first.
    """
    print(f" ⏳ Waiting for {kind} '{key}' to be available...", end="", flush=True)
    for _ in range(max_retries):
        try:
            entry = next((item for item in fetch_items() if item['key'] == key), None)
            status = entry['status'] if entry else None
        except Exception as e:
            # Best-effort polling: report the error and spend one attempt
            # instead of aborting the whole wait.
            print(f" Error: {e}")
            time.sleep(delay)
            continue
        if status == 'available':
            print(" ✅ Ready.")
            return True
        if status == 'failed':
            print(" ❌ Failed.")
            return False
        # Entry not listed yet, or still in some other state: keep waiting.
        time.sleep(delay)
        print(".", end="", flush=True)
    print(" ❌ Timeout.")
    return False


def wait_for_attribute(collection_id, key, max_retries=60, delay=2):
    """Poll Appwrite until the attribute status is 'available'.

    Args:
        collection_id: Collection whose attributes are listed.
        key: Key of the attribute to wait for.
        max_retries: Maximum polling attempts (default 60).
        delay: Seconds between attempts (default 2, so the defaults wait
            about 2 minutes in total).

    Returns:
        True if the attribute became available, False on failure or timeout.
    """
    # Attributes are listed and filtered by key rather than fetched one by one.
    return _poll_until_available(
        "attribute",
        key,
        lambda: databases.list_attributes(
            APPWRITE_DATABASE_ID, collection_id
        ).get('attributes', []),
        max_retries,
        delay,
    )


def wait_for_index(collection_id, key, max_retries=60, delay=2):
    """Poll Appwrite until the index status is 'available'.

    Args:
        collection_id: Collection whose indexes are listed.
        key: Key of the index to wait for.
        max_retries: Maximum polling attempts (default 60).
        delay: Seconds between attempts (default 2).

    Returns:
        True if the index became available, False on failure or timeout.
    """
    return _poll_until_available(
        "index",
        key,
        lambda: databases.list_indexes(
            APPWRITE_DATABASE_ID, collection_id
        ).get('indexes', []),
        max_retries,
        delay,
    )
def _create_ignoring_existing(method_name, collection_id, key, *args):
    """Call ``databases.<method_name>`` for ``key``, tolerating 'already exists'.

    Any other error is printed and swallowed so the caller can still poll for
    the resource (the setup is deliberately best-effort and re-runnable).
    """
    try:
        getattr(databases, method_name)(APPWRITE_DATABASE_ID, collection_id, key, *args)
    except Exception as e:
        if "already exists" not in str(e):
            print(f"Error: {e}")


def setup_v2(collection_id):
    """Create the schema on ``collection_id`` and wait for each piece to be ready.

    Every attribute is created and then polled with ``wait_for_attribute``
    before the next one is started; the indexes are handled the same way with
    ``wait_for_index``. Resources that already exist are reused silently.

    Args:
        collection_id: ID of the Appwrite collection to configure.

    Returns:
        True when every attribute and index was confirmed available, False
        otherwise (including an unexpected error during attribute creation).
    """
    print(f"🚀 Starting Robust Schema Setup for: {collection_id}")

    # (key, Databases method name, positional arguments that follow the key)
    # String attributes take (size, required); datetime and url take
    # (required,); integer takes (required, min, max, default).
    attribute_specs = [
        ("paper_id", "create_string_attribute", (100, True)),
        ("title", "create_string_attribute", (500, True)),
        ("summary", "create_string_attribute", (5000, True)),
        ("authors", "create_string_attribute", (5000, False)),
        ("published_at", "create_datetime_attribute", (True,)),
        ("pdf_url", "create_url_attribute", (True,)),
        ("category", "create_string_attribute", (50, True)),
    ]
    attribute_specs += [
        (stat, "create_integer_attribute", (False, 0, 2147483647, 0))
        for stat in ("likes", "dislikes", "views")
    ]

    # (index key, index type, indexed attributes, sort orders)
    index_specs = [
        ("unique_paper_id", "unique", ["paper_id"], ["ASC"]),
        ("idx_published_at", "key", ["published_at"], ["DESC"]),
        ("idx_category", "key", ["category"], ["ASC"]),
    ]

    # Keys of attributes / indexes that never reached the 'available' state.
    unavailable_attributes = []
    unavailable_indexes = []

    # 1. Attributes
    print("\n📦 Creating Attributes (Synchronous Mode)...")
    try:
        for key, method_name, extra_args in attribute_specs:
            print(f" -> {key}")
            _create_ignoring_existing(method_name, collection_id, key, *extra_args)
            if not wait_for_attribute(collection_id, key):
                unavailable_attributes.append(key)
    except Exception as e:
        print(f"❌ Critical Error during attribute creation: {e}")
        return False

    # 2. Indexes
    print("\n🗂️ Creating Indexes (Now that attributes are ready)...")
    for index_key, index_type, index_attributes, orders in index_specs:
        print(f" -> {index_key}")
        blocked_by = [a for a in index_attributes if a in unavailable_attributes]
        if blocked_by:
            # An index over an attribute that is not available cannot succeed,
            # so do not spend a full polling timeout on it.
            print(f" Skipped: attribute(s) not available: {', '.join(blocked_by)}")
            unavailable_indexes.append(index_key)
            continue
        _create_ignoring_existing(
            "create_index", collection_id, index_key, index_type, index_attributes, orders
        )
        if not wait_for_index(collection_id, index_key):
            unavailable_indexes.append(index_key)

    # Only claim success when every wait actually confirmed availability.
    if unavailable_attributes or unavailable_indexes:
        print("\n❌ Setup incomplete.")
        if unavailable_attributes:
            print(f" Attributes not available: {', '.join(unavailable_attributes)}")
        if unavailable_indexes:
            print(f" Indexes not available: {', '.join(unavailable_indexes)}")
        return False

    print("\n✅ Verification Complete: All attributes and indexes are AVAILABLE.")
    print(" You may now run the ingestion script.")
    return True
if __name__ == "__main__":
    # The script takes exactly one required argument: the target collection ID.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python scripts/setup_research_v2.py <NEW_COLLECTION_ID>")
        sys.exit(1)
    setup_v2(cli_args[0])
|