"""
Migration Script: Backfill Slug and Quality Score

Adds missing fields to existing articles in Appwrite.

Run this once to update all existing articles with:
- slug: SEO-friendly URL slug
- quality_score: Article quality ranking (0-100)
"""
import asyncio
from appwrite.client import Client
from appwrite.services.databases import Databases
from appwrite.query import Query
import os
from dotenv import load_dotenv
# Add parent directory to path
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from app.utils.data_validation import generate_slug, calculate_quality_score
# Load environment variables from a local .env file, if one exists.
load_dotenv()

# Appwrite connection settings, all sourced from the environment.
# NOTE(review): os.getenv returns None for unset variables — a missing
# key fails later at client setup, not here; verify the .env file
# defines all five values before running.
APPWRITE_ENDPOINT = os.getenv('APPWRITE_ENDPOINT')
APPWRITE_PROJECT_ID = os.getenv('APPWRITE_PROJECT_ID')
APPWRITE_API_KEY = os.getenv('APPWRITE_API_KEY')
APPWRITE_DATABASE_ID = os.getenv('APPWRITE_DATABASE_ID')
APPWRITE_COLLECTION_ID = os.getenv('APPWRITE_COLLECTION_ID')
async def migrate_articles():
    """
    Backfill missing ``slug`` and ``quality_score`` on existing articles.

    Pages through the Appwrite collection 100 documents at a time. For
    each document missing a slug or quality score, the missing value is
    generated (``generate_slug`` / ``calculate_quality_score``) and
    written back via ``update_document``. Documents that already carry
    both fields are skipped.

    Side effects: mutates documents in Appwrite and prints progress and
    a final summary to stdout. Per-document failures are counted and
    logged but do not abort the run; a batch-level fetch failure stops
    the migration.

    NOTE(review): the original status strings contained mojibake
    (garbled emoji, two of them splitting f-strings across lines);
    they are replaced with plain ASCII markers here.
    """
    print("=" * 60)
    print("Appwrite Article Migration Script")
    print("=" * 60)
    print(f"Database: {APPWRITE_DATABASE_ID}")
    print(f"Collection: {APPWRITE_COLLECTION_ID}")
    print()

    # Initialize the Appwrite client with server-side API-key auth.
    client = Client()
    client.set_endpoint(APPWRITE_ENDPOINT)
    client.set_project(APPWRITE_PROJECT_ID)
    client.set_key(APPWRITE_API_KEY)
    databases = Databases(client)

    # Pagination state and running counters for the summary.
    offset = 0
    limit = 100
    total_updated = 0
    total_skipped = 0
    total_errors = 0

    while True:
        try:
            print(f"Fetching articles {offset + 1} to {offset + limit}...")
            response = databases.list_documents(
                database_id=APPWRITE_DATABASE_ID,
                collection_id=APPWRITE_COLLECTION_ID,
                queries=[
                    Query.limit(limit),
                    Query.offset(offset),
                ],
            )
            documents = response['documents']
            if not documents:
                # Empty page means we walked past the last document.
                print("No more articles to process")
                break

            print(f"Processing {len(documents)} articles...")
            for doc in documents:
                try:
                    doc_id = doc['$id']
                    title = doc.get('title', '')

                    # Skip documents that already carry both fields.
                    has_slug = doc.get('slug')
                    has_quality = doc.get('quality_score') is not None
                    if has_slug and has_quality:
                        total_skipped += 1
                        continue

                    # Build a patch containing only the missing fields.
                    updates = {}
                    if not has_slug:
                        updates['slug'] = generate_slug(title)
                    if not has_quality:
                        updates['quality_score'] = calculate_quality_score({
                            'title': title,
                            'description': doc.get('description', ''),
                            'image': doc.get('image_url'),
                            'source': doc.get('source', ''),
                        })

                    if updates:
                        databases.update_document(
                            database_id=APPWRITE_DATABASE_ID,
                            collection_id=APPWRITE_COLLECTION_ID,
                            document_id=doc_id,
                            data=updates,
                        )
                        total_updated += 1
                        print(f"  Updated: {title[:50]}... (score: {updates.get('quality_score', 'N/A')})")
                except Exception as e:
                    # Best effort: log the failure and keep going with
                    # the remaining documents in the batch.
                    total_errors += 1
                    print(f"  Error updating {doc.get('title', 'unknown')[:30]}: {e}")
                    continue

            # Move to the next page.
            offset += limit

            # Small delay between batches to avoid rate limiting.
            await asyncio.sleep(0.5)
        except Exception as e:
            # A failed fetch is fatal for the run; report and stop.
            print(f"Error fetching batch at offset {offset}: {e}")
            break

    # Final summary.
    print()
    print("=" * 60)
    print("MIGRATION SUMMARY")
    print("=" * 60)
    print(f"Updated: {total_updated} articles")
    print(f"Skipped: {total_skipped} articles (already have fields)")
    print(f"Errors: {total_errors} articles")
    print(f"Total Processed: {total_updated + total_skipped + total_errors}")
    print("=" * 60)
if __name__ == "__main__":
    # One-off script entry point: drive the async migration to
    # completion on a fresh event loop.
    print("Starting migration...")
    asyncio.run(migrate_articles())
    print("Migration complete!")