| """ |
| Migration Script: Backfill Slug and Quality Score |
| Adds missing fields to existing articles in Appwrite |
| |
| Run this once to update all existing articles with: |
| - slug: SEO-friendly URL slug |
| - quality_score: Article quality ranking (0-100) |
| """ |
|
|
| import asyncio |
| from appwrite.client import Client |
| from appwrite.services.databases import Databases |
| from appwrite.query import Query |
| import os |
| from dotenv import load_dotenv |
|
|
| |
| import sys |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|
|
| from app.utils.data_validation import generate_slug, calculate_quality_score |
|
|
| |
| load_dotenv() |
|
|
| |
| APPWRITE_ENDPOINT = os.getenv('APPWRITE_ENDPOINT') |
| APPWRITE_PROJECT_ID = os.getenv('APPWRITE_PROJECT_ID') |
| APPWRITE_API_KEY = os.getenv('APPWRITE_API_KEY' ) |
| APPWRITE_DATABASE_ID = os.getenv('APPWRITE_DATABASE_ID') |
| APPWRITE_COLLECTION_ID = os.getenv('APPWRITE_COLLECTION_ID') |
|
|
|
|
| async def migrate_articles(): |
| """ |
| Migrate existing articles to add slug and quality_score |
| """ |
| print("=" * 60) |
| print("π Appwrite Article Migration Script") |
| print("=" * 60) |
| print(f"Database: {APPWRITE_DATABASE_ID}") |
| print(f"Collection: {APPWRITE_COLLECTION_ID}") |
| print() |
| |
| |
| client = Client() |
| client.set_endpoint(APPWRITE_ENDPOINT) |
| client.set_project(APPWRITE_PROJECT_ID) |
| client.set_key(APPWRITE_API_KEY) |
| |
| databases = Databases(client) |
| |
| |
| offset = 0 |
| limit = 100 |
| total_updated = 0 |
| total_skipped = 0 |
| total_errors = 0 |
| |
| while True: |
| try: |
| print(f"π₯ Fetching articles {offset + 1} to {offset + limit}...") |
| |
| |
| response = databases.list_documents( |
| database_id=APPWRITE_DATABASE_ID, |
| collection_id=APPWRITE_COLLECTION_ID, |
| queries=[ |
| Query.limit(limit), |
| Query.offset(offset) |
| ] |
| ) |
| |
| documents = response['documents'] |
| |
| if not documents: |
| print("β
No more articles to process") |
| break |
| |
| print(f"π Processing {len(documents)} articles...") |
| |
| |
| for doc in documents: |
| try: |
| doc_id = doc['$id'] |
| title = doc.get('title', '') |
| |
| |
| has_slug = doc.get('slug') |
| has_quality = doc.get('quality_score') is not None |
| |
| if has_slug and has_quality: |
| total_skipped += 1 |
| continue |
| |
| |
| updates = {} |
| |
| if not has_slug: |
| updates['slug'] = generate_slug(title) |
| |
| if not has_quality: |
| updates['quality_score'] = calculate_quality_score({ |
| 'title': title, |
| 'description': doc.get('description', ''), |
| 'image': doc.get('image_url'), |
| 'source': doc.get('source', '') |
| }) |
| |
| |
| if updates: |
| databases.update_document( |
| database_id=APPWRITE_DATABASE_ID, |
| collection_id=APPWRITE_COLLECTION_ID, |
| document_id=doc_id, |
| data=updates |
| ) |
| total_updated += 1 |
| print(f" β Updated: {title[:50]}... (score: {updates.get('quality_score', 'N/A')})") |
| |
| except Exception as e: |
| total_errors += 1 |
| print(f" β Error updating {doc.get('title', 'unknown')[:30]}: {e}") |
| continue |
| |
| |
| offset += limit |
| |
| |
| await asyncio.sleep(0.5) |
| |
| except Exception as e: |
| print(f"β Error fetching batch at offset {offset}: {e}") |
| break |
| |
| |
| print() |
| print("=" * 60) |
| print("π MIGRATION SUMMARY") |
| print("=" * 60) |
| print(f"β
Updated: {total_updated} articles") |
| print(f"βοΈ Skipped: {total_skipped} articles (already have fields)") |
| print(f"β Errors: {total_errors} articles") |
| print(f"π Total Processed: {total_updated + total_skipped + total_errors}") |
| print("=" * 60) |
|
|
|
|
| if __name__ == "__main__": |
| print("Starting migration...") |
| asyncio.run(migrate_articles()) |
| print("Migration complete!") |
|
|