| """ |
| Appwrite Database Service - Phase 2 |
| Provides persistent storage for news articles with fast querying capability. |
| """ |
|
|
| |
| |
| |
| import warnings |
| import warnings |
| warnings.filterwarnings('ignore', category=DeprecationWarning, module='appwrite') |
| warnings.filterwarnings('ignore', message='Call to deprecated function') |
|
|
| try: |
| from appwrite.client import Client |
| from appwrite.services.databases import Databases |
| from appwrite.services.storage import Storage |
| from appwrite.query import Query |
| from appwrite.exception import AppwriteException |
| APPWRITE_AVAILABLE = True |
| except ImportError: |
| APPWRITE_AVAILABLE = False |
| print("Appwrite SDK not available - database features disabled") |
|
|
| from typing import List, Optional, Dict |
| from datetime import datetime, timedelta |
| import hashlib |
| import asyncio |
| from app.models import Article |
| from app.config import settings |
| import logging |
|
|
| |
| |
| |
| from app.utils.custom_logger import get_logger, TAG_DB, TAG_ERROR |
| logger = get_logger(__name__) |
|
|
|
|
class TablesDBWrapper:
    """Adapter exposing the legacy 'documents' API under 'tables' naming.

    Each ``*_row`` coroutine off-loads the corresponding blocking
    ``*_document`` call of the wrapped Databases service to a worker
    thread via ``asyncio.to_thread`` so callers can await it safely.
    """

    def __init__(self, db_service):
        # The wrapped (synchronous) Appwrite Databases service.
        self.db = db_service

    async def _run(self, func, *args, **kwargs):
        # Execute a blocking SDK call without stalling the event loop.
        return await asyncio.to_thread(func, *args, **kwargs)

    async def create_row(self, *args, **kwargs):
        return await self._run(self.db.create_document, *args, **kwargs)

    async def get_row(self, *args, **kwargs):
        return await self._run(self.db.get_document, *args, **kwargs)

    async def list_rows(self, *args, **kwargs):
        return await self._run(self.db.list_documents, *args, **kwargs)

    async def delete_row(self, *args, **kwargs):
        return await self._run(self.db.delete_document, *args, **kwargs)

    async def update_row(self, *args, **kwargs):
        return await self._run(self.db.update_document, *args, **kwargs)
|
|
|
|
| class AppwriteDatabase: |
| """Appwrite Database service for persistent article storage (L2 cache)""" |
| |
| def __init__(self): |
| self.initialized = False |
| self.client = None |
| self.databases = None |
| self.storage = None |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| self._write_semaphore = asyncio.Semaphore(10) |
| |
| if APPWRITE_AVAILABLE and settings.APPWRITE_PROJECT_ID: |
| self._initialize() |
| |
| def _initialize(self): |
| """Initialize Appwrite client and database connection""" |
| if not APPWRITE_AVAILABLE: |
| return |
| |
| try: |
| |
| if not settings.APPWRITE_PROJECT_ID or not settings.APPWRITE_API_KEY: |
| print("Appwrite credentials not configured - database features disabled") |
| self.initialized = False |
| return |
| |
| |
| self.client = Client() |
| self.client.set_endpoint(settings.APPWRITE_ENDPOINT) |
| self.client.set_project(settings.APPWRITE_PROJECT_ID) |
| self.client.set_key(settings.APPWRITE_API_KEY) |
| |
| |
| self.databases = Databases(self.client) |
| |
| |
| self.storage = Storage(self.client) |
| |
| self.initialized = True |
| print("") |
| self.initialized = True |
| print("") |
| print("-" * 80) |
| print("[Appwrite] Database initialized successfully!") |
| print(f"Database ID: {settings.APPWRITE_DATABASE_ID}") |
| print(f"Collection ID: {settings.APPWRITE_COLLECTION_ID}") |
| print("-" * 80) |
| print("-" * 80) |
| print("") |
| print("") |
| |
| self.tablesDB = TablesDBWrapper(self.databases) |
| |
| except Exception as e: |
| print("") |
| except Exception as e: |
| print("") |
| print("!" * 80) |
| print("[Appwrite] Initialization FAILED!") |
| print(f"[ERROR] Error: {e}") |
| print("[INFO] Please check your Appwrite credentials in .env file") |
| print("!" * 80) |
| print("") |
| print("") |
| self.initialized = False |
| |
| def get_collection_id(self, category: str) -> str: |
| """ |
| Phase 4: Strict Routing Algorithm (Vertical Architecture) |
| """ |
| |
| if not category or not category.strip(): |
| logger.warning("[ROUTING] Empty category, defaulting to News Articles") |
| return settings.APPWRITE_COLLECTION_ID |
| |
| cat = category.lower().strip() |
| |
| |
| if cat == 'ai': |
| return settings.APPWRITE_AI_COLLECTION_ID |
| |
| |
| if cat.startswith('cloud-'): |
| return settings.APPWRITE_CLOUD_COLLECTION_ID |
| |
| |
| if cat == 'research' or cat.startswith('research-'): |
| return settings.APPWRITE_RESEARCH_COLLECTION_ID |
| |
| |
| if cat.startswith('data-') or cat.startswith('business-') or cat == 'customer-data-platform': |
| return settings.APPWRITE_DATA_COLLECTION_ID |
| |
| |
| if cat == 'magazines': |
| return settings.APPWRITE_MAGAZINE_COLLECTION_ID |
| |
| |
| if cat == 'medium-article': |
| return settings.APPWRITE_MEDIUM_COLLECTION_ID |
| |
| |
| logger.warning(f"[ROUTING] Unmatched category '{cat}', defaulting to News Articles") |
| return settings.APPWRITE_COLLECTION_ID |
|
|
| |
| def _generate_url_hash(self, url: str) -> str: |
| """ |
| Generate a unique hash for an article URL |
| |
| **INTEGRATION UPDATE**: Matches Schema Size 64 |
| Uses SHA-256 hash of the RAW URL. |
| |
| Returns: |
| 64-character hex hash |
| """ |
| import hashlib |
| |
| hash_bytes = hashlib.sha256(url.encode('utf-8')).hexdigest() |
| |
| return hash_bytes |
| |
    async def get_articles(self, category: str, limit: int = 20, offset: int = 0) -> List[Dict]:
        """
        Get articles by category with pagination and projection (FAANG-Level)

        Args:
            category: Category slug; also selects the target collection.
            limit: Maximum number of documents to return (default 20).
            offset: Pagination offset (default 0).

        Returns:
            List of normalized article dicts, newest first, or [] when the
            service is uninitialized or the Appwrite query fails.
        """
        if not self.initialized:
            return []

        try:
            # Route to the vertical collection for this category.
            target_collection_id = self.get_collection_id(category)

            # NOTE(review): this projection list is currently unused — the
            # query below fetches full documents. Kept for a planned
            # Query.select() integration; confirm before removing.
            select_fields = [
                '$id',
                'title',
                'url',
                'image_url',
                'published_at',
                'source',
                'category',
                'url_hash',
                'authors',
                'pdf_url',
                'summary'
            ]

            # Newest-first ordering with offset pagination.
            queries = [
                Query.order_desc('published_at'),
                Query.limit(limit),
                Query.offset(offset)
            ]

            # The research vertical has its own collection, so no category
            # filter is needed there; all other categories filter in-collection.
            if category != 'research':
                queries.insert(0, Query.equal('category', category))

            response = await self.tablesDB.list_rows(
                database_id=settings.APPWRITE_DATABASE_ID,
                collection_id=target_collection_id,
                queries=queries
            )

            print(f"[DEBUG] Appwrite Raw Response: Total={response.get('total')}, Docs={len(response.get('documents', []))}")

            # Normalize documents to the API article shape; research-style
            # docs fall back to 'summary'/'pdf_url' when the standard
            # 'description'/'url' fields are empty.
            articles = []
            for doc in response['documents']:
                try:
                    description = doc.get('description', '')
                    if not description and doc.get('summary'):
                        description = doc.get('summary')

                    url = doc.get('url', '')
                    if not url and doc.get('pdf_url'):
                        url = doc.get('pdf_url')

                    article = {
                        '$id': doc.get('$id'),
                        'title': doc.get('title'),
                        'description': description,
                        'url': url,
                        'image_url': doc.get('image_url', ''),
                        'publishedAt': doc.get('published_at'),  # camelCase alias for frontend consumers
                        'published_at': doc.get('published_at'),
                        'source': doc.get('source', ''),
                        'category': doc.get('category'),
                        'likes': doc.get('likes', 0),
                        'dislikes': doc.get('dislike', 0),  # stored column is singular 'dislike' (see save_articles)
                        'views': doc.get('views', 0),
                        'author': doc.get('authors')

                    }
                    articles.append(article)
                except Exception as e:
                    # Skip malformed documents rather than failing the whole page.
                    print(f"Error parsing Appwrite document: {e}")
                    continue

            if articles:
                print(f"[SUCCESS] Retrieved {len(articles)} articles for '{category}' (Collection: {target_collection_id})")

            return articles

        except AppwriteException as e:
            print(f"Appwrite query error for category '{category}': {e}")
            return []
| |
    async def get_articles_with_queries(self, queries: List, category: Optional[str] = None) -> List[Dict]:
        """
        Get articles with custom query filters (for cursor pagination)

        Args:
            queries: List of Appwrite Query objects
            category: Optional category for explicit routing (Recommended)

        Returns:
            List of normalized article dicts, or [] on any error.
        """
        if not self.initialized:
            return []

        try:
            # Default to the main collection unless routing says otherwise.
            target_collection_id = settings.APPWRITE_COLLECTION_ID

            if category:
                # Explicit routing path — preferred by callers.
                target_collection_id = self.get_collection_id(category)
                logger.info(f"π [ROUTING] Category='{category}' -> Collection='{target_collection_id}'")
            else:
                # Fallback: sniff the category out of the serialized Query
                # objects so legacy callers still route to the right collection.
                for q in queries:
                    q_str = str(q)
                    if 'category' in q_str:
                        import re

                        # Newer SDKs serialize equals as {"values":["<cat>"]}.
                        match = re.search(r'category.*?"values":\["([^"]+)"\]', q_str)
                        if not match:
                            # Older serialization: first quoted token after 'category'.
                            match = re.search(r'category.*?"([^"]+)"', q_str)

                        if match:
                            category_val = match.group(1)
                            target_collection_id = self.get_collection_id(category_val)
                            logger.info(f"π [ROUTING-FALLBACK] Extracted='{category_val}' -> Collection='{target_collection_id}'")
                            break

            logger.info(f"π [QUERY] Executing query on Collection: {target_collection_id}")

            response = await self.tablesDB.list_rows(
                database_id=settings.APPWRITE_DATABASE_ID,
                collection_id=target_collection_id,
                queries=queries
            )

            # Normalize to the API article shape.
            articles = []
            for doc in response['documents']:
                try:
                    article = {
                        '$id': doc.get('$id'),
                        'title': doc.get('title'),
                        'description': doc.get('description') or doc.get('summary', ''),
                        'url': doc.get('url'),
                        'image_url': doc.get('image_url', ''),
                        'publishedAt': doc.get('published_at'),  # camelCase alias for frontend consumers
                        'published_at': doc.get('published_at'),
                        'source': doc.get('source', ''),
                        'category': doc.get('category'),
                        'likes': doc.get('likes', 0),
                        'dislikes': doc.get('dislike', 0),  # stored column is singular 'dislike' (see save_articles)
                        'views': doc.get('views', 0)
                    }
                    articles.append(article)
                except Exception as e:
                    # Skip malformed documents silently; partial pages are fine.
                    continue

            return articles

        except Exception as e:
            print(f"Query error: {e}")
            return []
| |
| async def save_articles(self, articles: List) -> int: |
| """ |
| Save articles to Appwrite database with TRUE parallel writes |
| """ |
| logger = logging.getLogger(__name__) |
| |
| if not self.initialized: |
| return (0, 0, 0, []) |
| |
| if not articles: |
| return (0, 0, 0, []) |
|
|
| |
| try: |
| from app.services.deduplication import get_url_filter |
| url_filter = get_url_filter() |
| except ImportError: |
| logger.warning("[Appwrite] Deduplication service not found, skipping local bloom filter check") |
| url_filter = None |
| |
| async def save_single_article(article: dict) -> tuple: |
| try: |
| |
| url = str(article.get('url', '')) if isinstance(article, dict) else str(article.url) |
| if not url: |
| return ('error', None) |
| |
| |
| if url_filter and not url_filter.check_and_add(url): |
| |
| |
| return ('duplicate', None) |
|
|
| |
| |
| url_hash_full = self._generate_url_hash(url) |
| |
| doc_id = url_hash_full[:32] |
| |
| |
| def get_field(obj, field, default=''): |
| if isinstance(obj, dict): |
| return obj.get(field, default) |
| return getattr(obj, field, default) |
| |
| |
| category_val = str(get_field(article, 'category', '')) |
| target_collection_id = self.get_collection_id(category_val) |
|
|
| |
| |
| |
| |
| |
| |
| pub_date = get_field(article, 'published_at') or get_field(article, 'publishedAt') |
| if isinstance(pub_date, datetime): |
| pub_date_str = pub_date.isoformat() |
| else: |
| pub_date_str = str(pub_date or datetime.now().isoformat()) |
|
|
| document_data = { |
| 'title': str(get_field(article, 'title', ''))[:500], |
| 'description': str(get_field(article, 'description', ''))[:2000], |
| 'url': url[:2048], |
| 'image_url': str(get_field(article, 'image_url') or get_field(article, 'image', ''))[:2048] or None, |
| 'published_at': pub_date_str, |
| 'source': str(get_field(article, 'source', ''))[:200], |
| 'category': str(get_field(article, 'category', ''))[:100], |
| 'fetched_at': datetime.now().isoformat(), |
| 'url_hash': url_hash_full, |
| 'slug': str(get_field(article, 'slug', ''))[:200] if get_field(article, 'slug', '') else None, |
| 'quality_score': int(get_field(article, 'quality_score', 50)), |
| |
| 'likes': 0, |
| 'dislike': 0, |
| 'views': 0, |
| 'audio_url': get_field(article, 'audio_url', None) |
| } |
| |
| |
| if target_collection_id == settings.APPWRITE_CLOUD_COLLECTION_ID: |
| document_data['provider'] = document_data['source'] |
| document_data['is_official'] = False |
| |
| |
| |
| image_value = document_data.pop('image_url', None) |
| |
| |
| if image_value and isinstance(image_value, str) and image_value.strip(): |
| |
| if image_value.startswith(('http://', 'https://')): |
| document_data['image'] = image_value |
| else: |
| |
| document_data['image'] = None |
| else: |
| |
| document_data['image'] = None |
| |
| |
| |
|
|
| |
| await self.tablesDB.create_row( |
| database_id=settings.APPWRITE_DATABASE_ID, |
| collection_id=target_collection_id, |
| document_id=doc_id, |
| data=document_data |
| ) |
| |
| return ('success', document_data) |
| |
| except AppwriteException as e: |
| |
| if 'document_already_exists' in str(e).lower() or 'unique' in str(e).lower(): |
| return ('duplicate', None) |
| else: |
| logger.error("%s Appwrite write failed: %s | URL: %s...", |
| TAG_ERROR, str(e), url[:60]) |
| return ('error', str(e)) |
| |
| except Exception as e: |
| logger.error("%s Unexpected error during save: %s | URL: %s...", |
| TAG_ERROR, str(e), url[:60]) |
| return ('error', str(e)) |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| async def _safe_save(article): |
| async with self._write_semaphore: |
| return await save_single_article(article) |
|
|
| save_tasks = [_safe_save(article) for article in articles] |
|
|
| |
| |
| results = await asyncio.gather(*save_tasks, return_exceptions=True) |
| |
| |
| saved_count = 0 |
| saved_documents = [] |
| duplicate_count = 0 |
| error_count = 0 |
| |
| for result in results: |
| if isinstance(result, Exception): |
| error_count += 1 |
| continue |
| |
| status, data = result |
| if status == 'success': |
| saved_count += 1 |
| saved_documents.append(data) |
| elif status == 'duplicate': |
| duplicate_count += 1 |
| else: |
| error_count += 1 |
| |
| if saved_count > 0 or duplicate_count > 0 or error_count > 0: |
| logger.info( |
| "%s Saved: %d | Duplicates: %d | Errors: %d", |
| TAG_DB, saved_count, duplicate_count, error_count |
| ) |
| |
| return saved_count, duplicate_count, error_count, saved_documents |
| |
    async def delete_old_articles(self, days: int = 30) -> int:
        """
        Delete articles older than specified days

        Args:
            days: Delete articles older than this many days

        Returns:
            Number of articles deleted
        """
        if not self.initialized:
            return 0

        try:
            # Age is measured against fetched_at (ingest time), not published_at.
            cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()

            # NOTE(review): only scans the main collection and at most 500
            # documents per call; vertical collections are not cleaned here.
            response = await self.tablesDB.list_rows(
                database_id=settings.APPWRITE_DATABASE_ID,
                collection_id=settings.APPWRITE_COLLECTION_ID,
                queries=[
                    Query.less_than('fetched_at', cutoff_date),
                    Query.limit(500)
                ]
            )

            # Delete one-by-one; Appwrite has no bulk delete in this SDK path.
            deleted_count = 0
            for doc in response['documents']:
                try:
                    await self.tablesDB.delete_row(
                        database_id=settings.APPWRITE_DATABASE_ID,
                        collection_id=settings.APPWRITE_COLLECTION_ID,
                        document_id=doc['$id']
                    )
                    deleted_count += 1
                except Exception as e:
                    # Best-effort: one failed delete shouldn't stop the cleanup.
                    print(f"Error deleting document {doc['$id']}: {e}")

            if deleted_count > 0:
                print(f"[CLEANUP] Deleted {deleted_count} articles older than {days} days")
            else:
                print(f"[CLEANUP] No old articles to delete")

            return deleted_count

        except Exception as e:
            print(f"Error deleting old articles: {e}")
            return 0
| |
| |
| |
| |
|
|
| async def create_subscriber(self, email: str, name: str, preferences: Dict[str, bool], token: str) -> bool: |
| """ |
| Create a new subscriber in Appwrite (Dual-Write) |
| Uses Boolean Flags schema: sub_morning, sub_afternoon, etc. |
| """ |
| if not self.initialized: |
| return False |
| |
| try: |
| |
| data = { |
| "email": email, |
| "name": name, |
| "token": token, |
| "isActive": True, |
| |
| "sub_morning": preferences.get("Morning", False), |
| "sub_afternoon": preferences.get("Afternoon", False), |
| "sub_evening": preferences.get("Evening", False), |
| "sub_weekly": preferences.get("Weekly", False), |
| "sub_monthly": preferences.get("Monthly", False) |
| } |
| |
| |
| |
| |
| |
| doc_id = hashlib.md5(email.lower().encode()).hexdigest() |
|
|
| await self.tablesDB.create_row( |
| database_id=settings.APPWRITE_DATABASE_ID, |
| collection_id=settings.APPWRITE_SUBSCRIBERS_COLLECTION_ID, |
| document_id=doc_id, |
| data=data |
| ) |
| logger.info(f"β
[Appwrite] Subscriber created: {email}") |
| return True |
|
|
| except AppwriteException as e: |
| if 'document_already_exists' in str(e).lower() or 'unique' in str(e).lower(): |
| |
| |
| logger.info(f"βΉοΈ [Appwrite] Subscriber exists, updating: {email}") |
| return await self.update_subscriber(email, preferences) |
| |
| logger.error(f"β [Appwrite] Error creating subscriber: {e}") |
| return False |
| except Exception as e: |
| logger.error(f"β [Appwrite] Unexpected error creating subscriber: {e}") |
| return False |
|
|
| async def get_subscriber(self, email: str) -> Optional[Dict]: |
| """Get subscriber by email""" |
| if not self.initialized: |
| return None |
| |
| try: |
| documents = await self.tablesDB.list_rows( |
| database_id=settings.APPWRITE_DATABASE_ID, |
| collection_id=settings.APPWRITE_SUBSCRIBERS_COLLECTION_ID, |
| queries=[Query.equal("email", email)] |
| ) |
| |
| if documents['total'] > 0: |
| return documents['documents'][0] |
| return None |
| |
| except Exception as e: |
| logger.error(f"β [Appwrite] Error getting subscriber: {e}") |
| return None |
|
|
| async def update_subscriber(self, email: str, preferences: Dict[str, bool]) -> bool: |
| """Update subscriber preferences""" |
| if not self.initialized: |
| return False |
| |
| try: |
| |
| subscriber = await self.get_subscriber(email) |
| if not subscriber: |
| return False |
| |
| doc_id = subscriber['$id'] |
| |
| |
| data = {} |
| if "Morning" in preferences: data["sub_morning"] = preferences["Morning"] |
| if "Afternoon" in preferences: data["sub_afternoon"] = preferences["Afternoon"] |
| if "Evening" in preferences: data["sub_evening"] = preferences["Evening"] |
| if "Weekly" in preferences: data["sub_weekly"] = preferences["Weekly"] |
| if "Monthly" in preferences: data["sub_monthly"] = preferences["Monthly"] |
| |
| |
| await self.tablesDB.update_row( |
| database_id=settings.APPWRITE_DATABASE_ID, |
| collection_id=settings.APPWRITE_SUBSCRIBERS_COLLECTION_ID, |
| document_id=doc_id, |
| data=data |
| ) |
| logger.info(f"β
[Appwrite] Subscriber updated: {email}") |
| return True |
| |
| except Exception as e: |
| logger.error(f"β [Appwrite] Error updating subscriber: {e}") |
| return False |
|
|
| async def get_subscriber_by_token(self, token: str) -> Optional[Dict]: |
| """Get subscriber by unsubscribe token""" |
| try: |
| documents = await self.tablesDB.list_rows( |
| database_id=settings.APPWRITE_DATABASE_ID, |
| collection_id=settings.APPWRITE_SUBSCRIBERS_COLLECTION_ID, |
| queries=[Query.equal("token", token)] |
| ) |
| |
| if documents['total'] > 0: |
| return documents['documents'][0] |
| return None |
| |
| except Exception as e: |
| logger.error(f"β [Appwrite] Error finding subscriber by token: {e}") |
| return None |
|
|
    async def update_article_audio(self, collection_id: str, document_id: str, audio_url: str, text_summary: Optional[str] = None) -> bool:
        """Update article with audio URL and optional text summary"""
        # NOTE(review): an exact duplicate of this method is defined again
        # later in this class; Python keeps the later definition, so this
        # copy is effectively dead code — consider removing one of them.
        if not self.initialized:
            return False

        try:
            data = {'audio_url': audio_url}
            # Partial update: only include the summary when provided.
            if text_summary:
                data['text_summary'] = text_summary

            await self.tablesDB.update_row(
                database_id=settings.APPWRITE_DATABASE_ID,
                collection_id=collection_id,
                document_id=document_id,
                data=data
            )
            return True
        except Exception as e:
            logger.error(f"β [Appwrite] Error updating article audio: {e}")
            return False
|
|
| async def update_subscription_status(self, email: str, preference: str, is_active: bool) -> bool: |
| """ |
| Update specific subscription preference (Granular Unsubscribe) |
| """ |
| if not self.initialized: |
| return False |
| |
| try: |
| subscriber = await self.get_subscriber(email) |
| if not subscriber: |
| return False |
| |
| |
| field_map = { |
| "Morning": "sub_morning", |
| "Afternoon": "sub_afternoon", |
| "Evening": "sub_evening", |
| "Weekly": "sub_weekly", |
| "Monthly": "sub_monthly" |
| } |
| |
| field = field_map.get(preference) |
| if not field: |
| logger.error(f"Invalid preference: {preference}") |
| return False |
| |
| data = {field: is_active} |
| |
| await self.tablesDB.update_row( |
| database_id=settings.APPWRITE_DATABASE_ID, |
| collection_id=settings.APPWRITE_SUBSCRIBERS_COLLECTION_ID, |
| document_id=subscriber['$id'], |
| data=data |
| ) |
| logger.info(f"β
[Appwrite] Updated {preference} for {email} to {is_active}") |
| return True |
| |
| except Exception as e: |
| logger.error(f"β [Appwrite] Error updating subscription status: {e}") |
| return False |
|
|
| async def update_subscriber_status(self, email: str, subscribed: bool) -> bool: |
| """ |
| Update global subscription status (Global Unsubscribe) |
| """ |
| if not self.initialized: |
| return False |
| |
| try: |
| subscriber = await self.get_subscriber(email) |
| if not subscriber: |
| return False |
| |
| data = {"isActive": subscribed} |
| |
| await self.tablesDB.update_row( |
| database_id=settings.APPWRITE_DATABASE_ID, |
| collection_id=settings.APPWRITE_SUBSCRIBERS_COLLECTION_ID, |
| document_id=subscriber['$id'], |
| data=data |
| ) |
| logger.info(f"β
[Appwrite] Global status for {email} set to {subscribed}") |
| return True |
| |
| except Exception as e: |
| logger.error(f"β [Appwrite] Error updating global subscriber status: {e}") |
| return False |
|
|
| async def update_last_sent(self, email: str) -> bool: |
| """ |
| Update lastSentAt timestamp for a subscriber |
| """ |
| if not self.initialized: |
| return False |
| |
| try: |
| subscriber = await self.get_subscriber(email) |
| if not subscriber: |
| return False |
| |
| from datetime import datetime |
| import pytz |
| |
| |
| utc_now = datetime.now(pytz.UTC).isoformat() |
| |
| await self.tablesDB.update_row( |
| database_id=settings.APPWRITE_DATABASE_ID, |
| collection_id=settings.APPWRITE_SUBSCRIBERS_COLLECTION_ID, |
| document_id=subscriber['$id'], |
| data={'lastSentAt': utc_now} |
| ) |
| |
| return True |
| |
| except Exception as e: |
| logger.error(f"β [Appwrite] Error updating lastSentAt: {e}") |
| return False |
|
|
| async def get_subscribers_by_preference(self, preference: str) -> List[Dict]: |
| """ |
| Get all subscribers filtered by newsletter preference |
| Directly from Appwrite (Source of Truth) |
| """ |
| if not self.initialized: |
| return [] |
| |
| try: |
| |
| field_map = { |
| "Morning": "sub_morning", |
| "Afternoon": "sub_afternoon", |
| "Evening": "sub_evening", |
| "Weekly": "sub_weekly", |
| "Monthly": "sub_monthly" |
| } |
| |
| field = field_map.get(preference) |
| |
| |
| if not field: |
| logger.warning(f"β οΈ [Appwrite] Unknown preference '{preference}', defaulting to Weekly") |
| field = "sub_weekly" |
| |
| logger.info(f"π [Appwrite] Fetching subscribers for {preference} ({field})...") |
| |
| |
| |
| |
| |
| documents = await self.tablesDB.list_rows( |
| database_id=settings.APPWRITE_DATABASE_ID, |
| collection_id=settings.APPWRITE_SUBSCRIBERS_COLLECTION_ID, |
| queries=[ |
| Query.equal("isActive", True), |
| Query.equal(field, True), |
| Query.limit(1000) |
| ] |
| ) |
| |
| subs = documents['documents'] |
| logger.info(f"β
[Appwrite] Found {len(subs)} subscribers for {preference}") |
| return subs |
| |
| except Exception as e: |
| logger.error(f"β [Appwrite] Error getting subscribers by preference: {e}") |
| return [] |
|
|
    async def update_article_audio(self, collection_id: str, document_id: str, audio_url: str, text_summary: Optional[str] = None) -> bool:
        """Update article with audio URL and optional text summary"""
        # NOTE(review): this is an exact duplicate of update_article_audio
        # defined earlier in this class; this later definition is the one
        # Python keeps — consider deleting the other copy.
        if not self.initialized:
            return False

        try:
            data = {'audio_url': audio_url}
            # Partial update: only include the summary when provided.
            if text_summary:
                data['text_summary'] = text_summary

            await self.tablesDB.update_row(
                database_id=settings.APPWRITE_DATABASE_ID,
                collection_id=collection_id,
                document_id=document_id,
                data=data
            )
            return True
        except Exception as e:
            logger.error(f"β [Appwrite] Error updating article audio: {e}")
            return False
|
|
| async def get_database_stats(self) -> Dict: |
| """ |
| Get database statistics |
| |
| Returns: |
| Dictionary with database stats (total articles, by category, etc.) |
| """ |
| if not self.initialized: |
| return {"error": "Appwrite not initialized"} |
| |
| try: |
| |
| total_response = self.tablesDB.list_rows( |
| database_id=settings.APPWRITE_DATABASE_ID, |
| collection_id=settings.APPWRITE_COLLECTION_ID, |
| queries=[Query.limit(1)] |
| ) |
| total_articles = total_response['total'] |
| |
| |
| categories = [ |
| "ai", "data-security", "data-governance", "data-privacy", |
| "data-engineering", "data-management", "business-intelligence", |
| "business-analytics", "customer-data-platform", "data-centers", |
| "cloud-computing", "magazines" |
| ] |
| |
| articles_by_category = {} |
| for category in categories: |
| response = self.tablesDB.list_rows( |
| database_id=settings.APPWRITE_DATABASE_ID, |
| collection_id=settings.APPWRITE_COLLECTION_ID, |
| queries=[ |
| Query.equal('category', category), |
| Query.limit(1) |
| ] |
| ) |
| articles_by_category[category] = response['total'] |
| |
| return { |
| "total_articles": total_articles, |
| "articles_by_category": articles_by_category, |
| "database_id": settings.APPWRITE_DATABASE_ID, |
| "collection_id": settings.APPWRITE_COLLECTION_ID, |
| "initialized": self.initialized |
| } |
| |
| except Exception as e: |
| print(f"Error getting database stats: {e}") |
| return {"error": str(e)} |
|
|
|
|
| |
| _appwrite_db = None |
|
|
def get_appwrite_db() -> AppwriteDatabase:
    """Get or create Appwrite database singleton instance.

    Lazily constructs the process-wide AppwriteDatabase on first call;
    every subsequent call returns the same instance.
    """
    global _appwrite_db
    if _appwrite_db is None:
        # First caller pays the construction (and connection) cost.
        _appwrite_db = AppwriteDatabase()
    return _appwrite_db
|
|