SHAFI committed on
Commit
9ffe1f1
Β·
1 Parent(s): 8b90318

feat: Add comprehensive emoji logging for better observability

Browse files

- Enhanced scheduler with detailed status indicators (⏰📰🧹✅❌)
- Added provider failover logging with rate limit detection (😒⏭️)
- Improved database operation visibility (💾📊🗑️)
- Created test utilities for manual verification
- All operations now clearly visible in HF Spaces logs

app/services/appwrite_db.py CHANGED
@@ -59,12 +59,22 @@ class AppwriteDatabase:
59
  self.databases = Databases(self.client)
60
 
61
  self.initialized = True
62
- print(f"βœ“ Appwrite database initialized successfully")
63
- print(f" Database: {settings.APPWRITE_DATABASE_ID}")
64
- print(f" Collection: {settings.APPWRITE_COLLECTION_ID}")
 
 
 
 
65
 
66
  except Exception as e:
67
- print(f"βœ— Appwrite initialization error: {e}")
 
 
 
 
 
 
68
  self.initialized = False
69
 
70
  def _generate_url_hash(self, url: str) -> str:
@@ -189,9 +199,9 @@ class AppwriteDatabase:
189
  continue
190
 
191
  if saved_count > 0:
192
- print(f"βœ“ Saved {saved_count} new articles to Appwrite")
193
  if skipped_count > 0:
194
- print(f" Skipped {skipped_count} duplicate articles")
195
 
196
  return saved_count
197
 
@@ -234,7 +244,9 @@ class AppwriteDatabase:
234
  print(f"Error deleting document {doc['$id']}: {e}")
235
 
236
  if deleted_count > 0:
237
- print(f"βœ“ Deleted {deleted_count} articles older than {days} days")
 
 
238
 
239
  return deleted_count
240
 
 
59
  self.databases = Databases(self.client)
60
 
61
  self.initialized = True
62
+ print("")
63
+ print("βœ“" * 80)
64
+ print("βœ… [Appwrite] Database initialized successfully!")
65
+ print(f"πŸ“Š Database ID: {settings.APPWRITE_DATABASE_ID}")
66
+ print(f"πŸ“‹ Collection ID: {settings.APPWRITE_COLLECTION_ID}")
67
+ print("βœ“" * 80)
68
+ print("")
69
 
70
  except Exception as e:
71
+ print("")
72
+ print("βœ—" * 80)
73
+ print("❌ [Appwrite] Initialization FAILED!")
74
+ print(f"⚠️ Error: {e}")
75
+ print("πŸ’‘ Please check your Appwrite credentials in .env file")
76
+ print("βœ—" * 80)
77
+ print("")
78
  self.initialized = False
79
 
80
  def _generate_url_hash(self, url: str) -> str:
 
199
  continue
200
 
201
  if saved_count > 0:
202
+ print(f"βœ… [Appwrite] Saved {saved_count} new articles to database")
203
  if skipped_count > 0:
204
+ print (f"⏭️ [Appwrite] Skipped {skipped_count} duplicate articles")
205
 
206
  return saved_count
207
 
 
244
  print(f"Error deleting document {doc['$id']}: {e}")
245
 
246
  if deleted_count > 0:
247
+ print(f"βœ… [Appwrite] Deleted {deleted_count} articles older than {days} days")
248
+ else:
249
+ print(f"πŸ“‹ [Appwrite] No old articles to delete")
250
 
251
  return deleted_count
252
 
app/services/news_aggregator.py CHANGED
@@ -73,17 +73,17 @@ class NewsAggregator:
73
 
74
  # Skip if provider is not available (rate limited)
75
  if not provider.is_available():
76
- print(f"Provider {provider_name} is not available (rate limited), trying next...")
77
  self.stats['failover_count'] += 1
78
  continue
79
 
80
  try:
81
- print(f"Fetching news for '{category}' from {provider_name}...")
82
  articles = await provider.fetch_news(category, limit=20)
83
 
84
  # If we got articles, return them
85
  if articles:
86
- print(f"βœ“ Successfully fetched {len(articles)} articles from {provider_name}")
87
 
88
  # Track usage statistics
89
  if provider_name not in self.stats['provider_usage']:
@@ -92,15 +92,15 @@ class NewsAggregator:
92
 
93
  return articles
94
  else:
95
- print(f"Provider {provider_name} returned no articles, trying next...")
96
 
97
  except Exception as e:
98
- print(f"Error with provider {provider_name}: {e}, trying next...")
99
  self.stats['failover_count'] += 1
100
  continue
101
 
102
  # If all providers failed, return empty list
103
- print(f"⚠ All providers exhausted for category '{category}'")
104
  return []
105
 
106
  async def fetch_rss(self, provider: str) -> List[Article]:
 
73
 
74
  # Skip if provider is not available (rate limited)
75
  if not provider.is_available():
76
+ print(f"⏭️ [{provider_name.upper()}] Not available (rate limited), trying next...")
77
  self.stats['failover_count'] += 1
78
  continue
79
 
80
  try:
81
+ print(f"πŸ“‘ [{provider_name.upper()}] Attempting to fetch '{category}' news...")
82
  articles = await provider.fetch_news(category, limit=20)
83
 
84
  # If we got articles, return them
85
  if articles:
86
+ # No need to print here, provider already printed success
87
 
88
  # Track usage statistics
89
  if provider_name not in self.stats['provider_usage']:
 
92
 
93
  return articles
94
  else:
95
+ print(f"⏭️ [{provider_name.upper()}] No articles returned, trying next provider...")
96
 
97
  except Exception as e:
98
+ print(f"❌ [{provider_name.upper()}] Error: {e}, trying next...")
99
  self.stats['failover_count'] += 1
100
  continue
101
 
102
  # If all providers failed, return empty list
103
+ print(f"😞 [NEWS AGGREGATOR] All providers exhausted for '{category}' - no articles available")
104
  return []
105
 
106
  async def fetch_rss(self, provider: str) -> List[Article]:
app/services/news_providers.py CHANGED
@@ -62,7 +62,8 @@ class GNewsProvider(NewsProvider):
62
  'business-analytics': 'business analytics',
63
  'customer-data-platform': 'customer data platform CDP',
64
  'data-centers': 'data centers infrastructure',
65
- 'cloud-computing': 'cloud computing',
 
66
  'magazines': 'technology news',
67
  }
68
 
@@ -86,17 +87,25 @@ class GNewsProvider(NewsProvider):
86
  response = await client.get(url, params=params)
87
 
88
  if response.status_code == 429:
 
89
  self.mark_rate_limited()
90
  return []
91
 
92
  if response.status_code == 200:
93
  self.request_count += 1
94
  data = response.json()
95
- return self._parse_response(data, category)
 
 
 
 
 
 
 
96
 
97
  return []
98
  except Exception as e:
99
- print(f"GNews API error: {e}")
100
  return []
101
 
102
  def _parse_response(self, data: Dict, category: str) -> List[Article]:
@@ -115,7 +124,7 @@ class GNewsProvider(NewsProvider):
115
  )
116
  articles.append(article)
117
  except Exception as e:
118
- print(f"Error parsing GNews article: {e}")
119
  continue
120
  return articles
121
 
@@ -139,7 +148,8 @@ class NewsAPIProvider(NewsProvider):
139
  'business-analytics': '"business analytics" OR analytics',
140
  'customer-data-platform': '"customer data platform" OR CDP',
141
  'data-centers': '"data centers" OR "data centre"',
142
- 'cloud-computing': '"cloud computing" OR cloud',
 
143
  'magazines': 'technology',
144
  }
145
 
@@ -192,7 +202,7 @@ class NewsAPIProvider(NewsProvider):
192
  )
193
  articles.append(article)
194
  except Exception as e:
195
- print(f"Error parsing NewsAPI article: {e}")
196
  continue
197
  return articles
198
 
@@ -216,7 +226,8 @@ class NewsDataProvider(NewsProvider):
216
  'business-analytics': 'business analytics',
217
  'customer-data-platform': 'customer data platform',
218
  'data-centers': 'data centers',
219
- 'cloud-computing': 'cloud computing',
 
220
  'magazines': 'technology',
221
  }
222
 
@@ -239,17 +250,25 @@ class NewsDataProvider(NewsProvider):
239
  response = await client.get(url, params=params)
240
 
241
  if response.status_code == 429:
 
242
  self.mark_rate_limited()
243
  return []
244
 
245
  if response.status_code == 200:
246
  self.request_count += 1
247
  data = response.json()
248
- return self._parse_response(data, category, limit)
 
 
 
 
 
 
 
249
 
250
  return []
251
  except Exception as e:
252
- print(f"NewsData.io error: {e}")
253
  return []
254
 
255
  def _parse_response(self, data: Dict, category: str, limit: int) -> List[Article]:
@@ -268,7 +287,7 @@ class NewsDataProvider(NewsProvider):
268
  )
269
  articles.append(article)
270
  except Exception as e:
271
- print(f"Error parsing NewsData article: {e}")
272
  continue
273
  return articles
274
 
@@ -291,7 +310,8 @@ class GoogleNewsRSSProvider(NewsProvider):
291
  'business-analytics': 'https://news.google.com/rss/search?q=business+analytics&hl=en-US&gl=US&ceid=US:en',
292
  'customer-data-platform': 'https://news.google.com/rss/search?q=customer+data+platform+OR+CDP&hl=en-US&gl=US&ceid=US:en',
293
  'data-centers': 'https://news.google.com/rss/search?q=data+centers+OR+data+centre&hl=en-US&gl=US&ceid=US:en',
294
- 'cloud-computing': 'https://news.google.com/rss/search?q=cloud+computing&hl=en-US&gl=US&ceid=US:en',
 
295
  'magazines': 'https://news.google.com/rss/headlines/section/topic/TECHNOLOGY?hl=en-US&gl=US&ceid=US:en',
296
  }
297
 
@@ -308,15 +328,23 @@ class GoogleNewsRSSProvider(NewsProvider):
308
  response = await client.get(feed_url)
309
 
310
  if response.status_code == 429:
 
311
  self.mark_rate_limited()
312
  return []
313
 
314
  if response.status_code == 200:
315
  self.request_count += 1
316
  parser = RSSParser()
317
- return await parser.parse_google_news(response.text, category)
 
 
 
 
 
 
 
318
 
319
  return []
320
  except Exception as e:
321
- print(f"Google News RSS error: {e}")
322
  return []
 
62
  'business-analytics': 'business analytics',
63
  'customer-data-platform': 'customer data platform CDP',
64
  'data-centers': 'data centers infrastructure',
65
+ 'cloud-computing': 'cloud computing AWS Azure Google Cloud Salesforce Alibaba Cloud Tencent Cloud Huawei Cloud Cloudflare',
66
+ 'medium-article': 'Medium article blog writing publishing',
67
  'magazines': 'technology news',
68
  }
69
 
 
87
  response = await client.get(url, params=params)
88
 
89
  if response.status_code == 429:
90
+ print("😒 [GNews] Rate limit hit! Switching to next provider...")
91
  self.mark_rate_limited()
92
  return []
93
 
94
  if response.status_code == 200:
95
  self.request_count += 1
96
  data = response.json()
97
+ articles = self._parse_response(data, category)
98
+ if articles:
99
+ print(f"βœ… [GNews] Fetched {len(articles)} articles successfully")
100
+ else:
101
+ print("⚠️ [GNews] No articles found in response")
102
+ return articles
103
+ else:
104
+ print(f"❌ [GNews] HTTP {response.status_code} error")
105
 
106
  return []
107
  except Exception as e:
108
+ print(f"❌ [GNews] API error: {e}")
109
  return []
110
 
111
  def _parse_response(self, data: Dict, category: str) -> List[Article]:
 
124
  )
125
  articles.append(article)
126
  except Exception as e:
127
+ print(f"⚠️ [GNews] Error parsing article: {e}")
128
  continue
129
  return articles
130
 
 
148
  'business-analytics': '"business analytics" OR analytics',
149
  'customer-data-platform': '"customer data platform" OR CDP',
150
  'data-centers': '"data centers" OR "data centre"',
151
+ 'cloud-computing': '"cloud computing" OR AWS OR Azure OR "Google Cloud" OR Salesforce OR "Alibaba Cloud" OR "Tencent Cloud" OR "Huawei Cloud" OR Cloudflare',
152
+ 'medium-article': 'Medium OR "Medium article" OR "Medium blog" OR "Medium publishing"',
153
  'magazines': 'technology',
154
  }
155
 
 
202
  )
203
  articles.append(article)
204
  except Exception as e:
205
+ print(f"⚠️ [NewsAPI] Error parsing article: {e}")
206
  continue
207
  return articles
208
 
 
226
  'business-analytics': 'business analytics',
227
  'customer-data-platform': 'customer data platform',
228
  'data-centers': 'data centers',
229
+ 'cloud-computing': 'cloud computing,AWS,Azure,Google Cloud,Salesforce,Alibaba Cloud,Tencent Cloud,Huawei Cloud,Cloudflare',
230
+ 'medium-article': 'Medium,article,blog,writing,publishing',
231
  'magazines': 'technology',
232
  }
233
 
 
250
  response = await client.get(url, params=params)
251
 
252
  if response.status_code == 429:
253
+ print("😒 [NewsData] Rate limit hit! Switching to next provider...")
254
  self.mark_rate_limited()
255
  return []
256
 
257
  if response.status_code == 200:
258
  self.request_count += 1
259
  data = response.json()
260
+ articles = self._parse_response(data, category, limit)
261
+ if articles:
262
+ print(f"βœ… [NewsData] Fetched {len(articles)} articles successfully")
263
+ else:
264
+ print("⚠️ [NewsData] No articles found in response")
265
+ return articles
266
+ else:
267
+ print(f"❌ [NewsData] HTTP {response.status_code} error")
268
 
269
  return []
270
  except Exception as e:
271
+ print(f"❌ [NewsData] error: {e}")
272
  return []
273
 
274
  def _parse_response(self, data: Dict, category: str, limit: int) -> List[Article]:
 
287
  )
288
  articles.append(article)
289
  except Exception as e:
290
+ print(f"⚠️ [NewsData] Error parsing article: {e}")
291
  continue
292
  return articles
293
 
 
310
  'business-analytics': 'https://news.google.com/rss/search?q=business+analytics&hl=en-US&gl=US&ceid=US:en',
311
  'customer-data-platform': 'https://news.google.com/rss/search?q=customer+data+platform+OR+CDP&hl=en-US&gl=US&ceid=US:en',
312
  'data-centers': 'https://news.google.com/rss/search?q=data+centers+OR+data+centre&hl=en-US&gl=US&ceid=US:en',
313
+ 'cloud-computing': 'https://news.google.com/rss/search?q=cloud+computing+OR+AWS+OR+Azure+OR+Google+Cloud+OR+Salesforce+OR+Alibaba+Cloud+OR+Tencent+Cloud+OR+Huawei+Cloud+OR+Cloudflare&hl=en-US&gl=US&ceid=US:en',
314
+ 'medium-article': 'https://news.google.com/rss/search?q=Medium+article+OR+Medium+blog+OR+Medium+publishing&hl=en-US&gl=US&ceid=US:en',
315
  'magazines': 'https://news.google.com/rss/headlines/section/topic/TECHNOLOGY?hl=en-US&gl=US&ceid=US:en',
316
  }
317
 
 
328
  response = await client.get(feed_url)
329
 
330
  if response.status_code == 429:
331
+ print("😒 [Google RSS] Rate limit hit! Trying next provider...")
332
  self.mark_rate_limited()
333
  return []
334
 
335
  if response.status_code == 200:
336
  self.request_count += 1
337
  parser = RSSParser()
338
+ articles = await parser.parse_google_news(response.text, category)
339
+ if articles:
340
+ print(f"βœ… [Google RSS] Fetched {len(articles)} articles successfully")
341
+ else:
342
+ print("⚠️ [Google RSS] No articles found in feed")
343
+ return articles
344
+ else:
345
+ print(f"❌ [Google RSS] HTTP {response.status_code} error")
346
 
347
  return []
348
  except Exception as e:
349
+ print(f"❌ [Google RSS] error: {e}")
350
  return []
app/services/scheduler.py CHANGED
@@ -44,7 +44,10 @@ async def fetch_all_news():
44
  Runs every 15 minutes to keep database fresh with latest articles.
45
  This ensures users always get fast responses from L2 cache (Appwrite).
46
  """
47
- logger.info("πŸ”„ [Background Fetcher] Starting news fetch for all categories...")
 
 
 
48
 
49
  news_aggregator = NewsAggregator()
50
  appwrite_db = get_appwrite_db()
@@ -55,13 +58,16 @@ async def fetch_all_news():
55
 
56
  for category in CATEGORIES:
57
  try:
58
- logger.info(f" Fetching {category}...")
 
 
59
 
60
  # Fetch from external APIs
61
  articles = await news_aggregator.fetch_by_category(category)
62
 
63
  if articles:
64
  # Save to Appwrite database (L2)
 
65
  saved_count = await appwrite_db.save_articles(articles)
66
  total_fetched += len(articles)
67
  total_saved += saved_count
@@ -69,18 +75,26 @@ async def fetch_all_news():
69
  # Update Redis cache (L1) if available
70
  try:
71
  await cache_service.set(f"news:{category}", articles, ttl=settings.CACHE_TTL)
 
72
  except Exception as e:
73
- logger.debug(f" Redis cache update skipped for {category}: {e}")
74
 
75
- logger.info(f" βœ“ {category}: {len(articles)} fetched, {saved_count} saved")
76
  else:
77
- logger.warning(f" βœ— {category}: No articles available")
78
 
79
  except Exception as e:
80
- logger.error(f" βœ— {category}: Error - {e}")
 
81
  continue
82
 
83
- logger.info(f"βœ… [Background Fetcher] Complete! {total_fetched} articles fetched, {total_saved} new articles saved")
 
 
 
 
 
 
84
 
85
 
86
  async def cleanup_old_news():
@@ -90,12 +104,18 @@ async def cleanup_old_news():
90
  Runs daily at midnight to keep Appwrite database within free tier limits.
91
  Only keeps the last 2 days of articles.
92
  """
93
- logger.info("🧹 [Janitor] Starting cleanup of old news articles...")
 
 
 
 
94
 
95
  appwrite_db = get_appwrite_db()
96
 
97
  if not appwrite_db.initialized:
98
- logger.warning(" Appwrite not initialized - skipping cleanup")
 
 
99
  return
100
 
101
  try:
@@ -104,10 +124,12 @@ async def cleanup_old_news():
104
  cutoff_date = datetime.now() - timedelta(hours=retention_hours)
105
  cutoff_iso = cutoff_date.isoformat()
106
 
107
- logger.info(f" Retention policy: {retention_hours} hours")
108
- logger.info(f" Cutoff date: {cutoff_date.strftime('%Y-%m-%d %H:%M:%S')}")
 
109
 
110
  # Query and delete old articles
 
111
  from appwrite.query import Query
112
 
113
  response = appwrite_db.databases.list_documents(
@@ -119,7 +141,12 @@ async def cleanup_old_news():
119
  ]
120
  )
121
 
 
 
122
  deleted_count = 0
 
 
 
123
  for doc in response['documents']:
124
  try:
125
  appwrite_db.databases.delete_document(
@@ -128,32 +155,55 @@ async def cleanup_old_news():
128
  document_id=doc['$id']
129
  )
130
  deleted_count += 1
 
 
131
  except Exception as e:
132
- logger.error(f" Error deleting document {doc['$id']}: {e}")
133
 
134
  # Clear Redis cache to force refresh from updated database
 
135
  cache_service = CacheService()
 
136
  for category in CATEGORIES:
137
  try:
138
  await cache_service.delete(f"news:{category}")
 
139
  except Exception as e:
140
- logger.debug(f" Cache clear skipped for {category}: {e}")
 
 
 
141
 
142
- logger.info(f"βœ… [Janitor] Complete! Deleted {deleted_count} articles older than {retention_hours} hours")
 
 
 
 
 
 
143
 
144
  # If there are more old articles, schedule another cleanup soon
145
  if len(response['documents']) >= 100:
146
- logger.info(f" More old articles detected - will clean up again in next run")
 
147
 
148
  except Exception as e:
149
- logger.error(f"βœ— [Janitor] Cleanup failed: {e}")
 
 
 
 
 
150
 
151
 
152
  def start_scheduler():
153
  """
154
  Initialize and start the background scheduler with all jobs
155
  """
156
- logger.info("⏰ Starting background scheduler...")
 
 
 
157
 
158
  # Job 1: Fetch news every 15 minutes
159
  scheduler.add_job(
@@ -163,7 +213,9 @@ def start_scheduler():
163
  name='News Fetcher (every 15 min)',
164
  replace_existing=True
165
  )
166
- logger.info(" βœ“ Registered: News Fetcher (every 15 minutes)")
 
 
167
 
168
  # Job 2: Cleanup old news daily at midnight (00:00)
169
  scheduler.add_job(
@@ -173,30 +225,51 @@ def start_scheduler():
173
  name='Database Janitor (daily at midnight)',
174
  replace_existing=True
175
  )
176
- logger.info(" βœ“ Registered: Database Janitor (daily at 00:00 UTC)")
 
 
 
177
 
178
  # Start the scheduler
 
 
179
  scheduler.start()
180
- logger.info("βœ… Background scheduler started successfully!")
 
 
 
 
 
181
 
182
 
183
  def shutdown_scheduler():
184
  """
185
  Gracefully shutdown the scheduler
186
  """
187
- logger.info("⏹️ Shutting down background scheduler...")
 
 
 
188
  scheduler.shutdown(wait=True)
189
- logger.info("βœ… Background scheduler shut down successfully")
 
 
190
 
191
 
192
  # Manual job triggers for testing (can be called from admin endpoints)
193
  async def trigger_fetch_now():
194
  """Manually trigger news fetch (for testing)"""
195
- logger.info("πŸ”§ [Manual Trigger] Running fetch job now...")
 
 
 
196
  await fetch_all_news()
197
 
198
 
199
  async def trigger_cleanup_now():
200
  """Manually trigger cleanup (for testing)"""
201
- logger.info("πŸ”§ [Manual Trigger] Running cleanup job now...")
 
 
 
202
  await cleanup_old_news()
 
44
  Runs every 15 minutes to keep database fresh with latest articles.
45
  This ensures users always get fast responses from L2 cache (Appwrite).
46
  """
47
+ logger.info("═" * 80)
48
+ logger.info("πŸ“° [NEWS FETCHER] Starting news fetch for all categories...")
49
+ logger.info("πŸ• Fetch Time: %s", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
50
+ logger.info("═" * 80)
51
 
52
  news_aggregator = NewsAggregator()
53
  appwrite_db = get_appwrite_db()
 
58
 
59
  for category in CATEGORIES:
60
  try:
61
+ logger.info("")
62
+ logger.info("πŸ“Œ Category: %s", category.upper())
63
+ logger.info("⏳ Fetching from news providers...")
64
 
65
  # Fetch from external APIs
66
  articles = await news_aggregator.fetch_by_category(category)
67
 
68
  if articles:
69
  # Save to Appwrite database (L2)
70
+ logger.info("πŸ’Ύ Saving to Appwrite database...")
71
  saved_count = await appwrite_db.save_articles(articles)
72
  total_fetched += len(articles)
73
  total_saved += saved_count
 
75
  # Update Redis cache (L1) if available
76
  try:
77
  await cache_service.set(f"news:{category}", articles, ttl=settings.CACHE_TTL)
78
+ logger.info("⚑ Redis cache updated")
79
  except Exception as e:
80
+ logger.debug("⚠️ Redis cache unavailable (not critical): %s", e)
81
 
82
+ logger.info("βœ… SUCCESS: %d articles fetched, %d new articles saved", len(articles), saved_count)
83
  else:
84
+ logger.warning("⚠️ WARNING: No articles available from any provider")
85
 
86
  except Exception as e:
87
+ logger.error("❌ ERROR in %s: %s", category, str(e))
88
+ logger.exception("Full traceback:")
89
  continue
90
 
91
+ logger.info("")
92
+ logger.info("═" * 80)
93
+ logger.info("πŸŽ‰ [NEWS FETCHER] COMPLETED!")
94
+ logger.info("πŸ“Š Total fetched: %d articles", total_fetched)
95
+ logger.info("πŸ’Ύ Total saved: %d new articles", total_saved)
96
+ logger.info("πŸ• Completion time: %s", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
97
+ logger.info("═" * 80)
98
 
99
 
100
  async def cleanup_old_news():
 
104
  Runs daily at midnight to keep Appwrite database within free tier limits.
105
  Only keeps the last 2 days of articles.
106
  """
107
+ logger.info("")
108
+ logger.info("═" * 80)
109
+ logger.info("🧹 [CLEANUP JANITOR] Starting cleanup of old news articles...")
110
+ logger.info("πŸ• Cleanup Time: %s", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
111
+ logger.info("═" * 80)
112
 
113
  appwrite_db = get_appwrite_db()
114
 
115
  if not appwrite_db.initialized:
116
+ logger.error("❌ CRITICAL: Appwrite database not initialized!")
117
+ logger.error("⚠️ Cleanup cannot proceed - database connection required")
118
+ logger.error("πŸ’‘ Check Appwrite credentials in environment variables")
119
  return
120
 
121
  try:
 
124
  cutoff_date = datetime.now() - timedelta(hours=retention_hours)
125
  cutoff_iso = cutoff_date.isoformat()
126
 
127
+ logger.info("πŸ“‹ Retention Policy: %d hours", retention_hours)
128
+ logger.info("πŸ“… Cutoff Date: %s", cutoff_date.strftime('%Y-%m-%d %H:%M:%S'))
129
+ logger.info("πŸ—‘οΈ Articles published before this will be deleted...")
130
 
131
  # Query and delete old articles
132
+ logger.info("πŸ” Querying Appwrite for old articles...")
133
  from appwrite.query import Query
134
 
135
  response = appwrite_db.databases.list_documents(
 
141
  ]
142
  )
143
 
144
+ logger.info("πŸ“Š Found %d old articles to delete", len(response['documents']))
145
+
146
  deleted_count = 0
147
+ if len(response['documents']) > 0:
148
+ logger.info("πŸ—‘οΈ Deleting articles...")
149
+
150
  for doc in response['documents']:
151
  try:
152
  appwrite_db.databases.delete_document(
 
155
  document_id=doc['$id']
156
  )
157
  deleted_count += 1
158
+ if deleted_count % 10 == 0:
159
+ logger.info(" Progress: %d articles deleted...", deleted_count)
160
  except Exception as e:
161
+ logger.error("❌ Error deleting document %s: %s", doc['$id'], e)
162
 
163
  # Clear Redis cache to force refresh from updated database
164
+ logger.info("πŸ”„ Clearing Redis cache...")
165
  cache_service = CacheService()
166
+ cache_cleared = 0
167
  for category in CATEGORIES:
168
  try:
169
  await cache_service.delete(f"news:{category}")
170
+ cache_cleared += 1
171
  except Exception as e:
172
+ logger.debug("⚠️ Cache clear skipped for %s: %s", category, e)
173
+
174
+ if cache_cleared > 0:
175
+ logger.info("βœ… Cache cleared for %d categories", cache_cleared)
176
 
177
+ logger.info("")
178
+ logger.info("═" * 80)
179
+ logger.info("πŸŽ‰ [CLEANUP JANITOR] COMPLETED!")
180
+ logger.info("πŸ—‘οΈ Total Deleted: %d articles", deleted_count)
181
+ logger.info("⏰ Retention: Articles older than %d hours removed", retention_hours)
182
+ logger.info("πŸ• Completion Time: %s", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
183
+ logger.info("═" * 80)
184
 
185
  # If there are more old articles, schedule another cleanup soon
186
  if len(response['documents']) >= 100:
187
+ logger.warning("⚠️ WARNING: More old articles detected (100+ limit reached)")
188
+ logger.warning("πŸ“… Additional cleanup will run in next scheduled job")
189
 
190
  except Exception as e:
191
+ logger.error("")
192
+ logger.error("═" * 80)
193
+ logger.error("❌ [CLEANUP JANITOR] FAILED!")
194
+ logger.error("Error: %s", str(e))
195
+ logger.error("═" * 80)
196
+ logger.exception("Full traceback:")
197
 
198
 
199
  def start_scheduler():
200
  """
201
  Initialize and start the background scheduler with all jobs
202
  """
203
+ logger.info("")
204
+ logger.info("═" * 80)
205
+ logger.info("⏰ [SCHEDULER] Initializing background scheduler...")
206
+ logger.info("═" * 80)
207
 
208
  # Job 1: Fetch news every 15 minutes
209
  scheduler.add_job(
 
213
  name='News Fetcher (every 15 min)',
214
  replace_existing=True
215
  )
216
+ logger.info("βœ… Job #1 Registered: πŸ“° News Fetcher")
217
+ logger.info(" ⏱️ Schedule: Every 15 minutes")
218
+ logger.info(" πŸ“‹ Task: Fetch news from all providers and update database")
219
 
220
  # Job 2: Cleanup old news daily at midnight (00:00)
221
  scheduler.add_job(
 
225
  name='Database Janitor (daily at midnight)',
226
  replace_existing=True
227
  )
228
+ logger.info("")
229
+ logger.info("βœ… Job #2 Registered: 🧹 Database Janitor")
230
+ logger.info(" ⏱️ Schedule: Daily at 00:00 UTC")
231
+ logger.info(" πŸ“‹ Task: Delete articles older than 48 hours")
232
 
233
  # Start the scheduler
234
+ logger.info("")
235
+ logger.info("πŸš€ Starting scheduler engine...")
236
  scheduler.start()
237
+ logger.info("")
238
+ logger.info("═" * 80)
239
+ logger.info("βœ… [SCHEDULER] Background scheduler started successfully!")
240
+ logger.info("πŸ”„ All jobs are now active and running")
241
+ logger.info("═" * 80)
242
+ logger.info("")
243
 
244
 
245
  def shutdown_scheduler():
246
  """
247
  Gracefully shutdown the scheduler
248
  """
249
+ logger.info("")
250
+ logger.info("═" * 80)
251
+ logger.info("⏹️ [SCHEDULER] Shutting down background scheduler...")
252
+ logger.info("⏳ Waiting for running jobs to complete...")
253
  scheduler.shutdown(wait=True)
254
+ logger.info("βœ… [SCHEDULER] Background scheduler shut down successfully")
255
+ logger.info("═" * 80)
256
+ logger.info("")
257
 
258
 
259
  # Manual job triggers for testing (can be called from admin endpoints)
260
  async def trigger_fetch_now():
261
  """Manually trigger news fetch (for testing)"""
262
+ logger.info("")
263
+ logger.info("═" * 80)
264
+ logger.info("πŸ”§ [MANUAL TRIGGER] Running fetch job NOW...")
265
+ logger.info("═" * 80)
266
  await fetch_all_news()
267
 
268
 
269
  async def trigger_cleanup_now():
270
  """Manually trigger cleanup (for testing)"""
271
+ logger.info("")
272
+ logger.info("═" * 80)
273
+ logger.info("πŸ”§ [MANUAL TRIGGER] Running cleanup job NOW...")
274
+ logger.info("═" * 80)
275
  await cleanup_old_news()
seed_medium.py ADDED
@@ -0,0 +1,105 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Seed Medium Article Category
3
+ =============================
4
+ One-time script to insert the initial/pinned Medium article into Appwrite database.
5
+
6
+ This ensures the "Medium Article" category starts with a specific guide article.
7
+
8
+ Usage:
9
+ python seed_medium.py
10
+ """
11
+
12
+ from appwrite.client import Client
13
+ from appwrite.services.databases import Databases
14
+ from appwrite.id import ID
15
+ from datetime import datetime
16
+ import os
17
+ from dotenv import load_dotenv
18
+
19
+ # Load environment variables
20
+ load_dotenv()
21
+
22
def seed_medium_article():
    """Insert the seed/pinned Medium guide article into Appwrite.

    Idempotent: if a document with the same URL already exists, the
    existing document is returned unchanged; otherwise a new pinned
    document is created.

    Returns:
        dict: The existing or newly created Appwrite document.

    Raises:
        Exception: Re-raised after logging if any Appwrite call fails.
    """
    # Query builder for the duplicate check; the appwrite package is
    # already a project dependency (see imports at the top of this file).
    from appwrite.query import Query

    # Initialize Appwrite client from environment variables
    client = Client()
    client.set_endpoint(os.getenv('APPWRITE_ENDPOINT', 'https://cloud.appwrite.io/v1'))
    client.set_project(os.getenv('APPWRITE_PROJECT_ID'))
    client.set_key(os.getenv('APPWRITE_API_KEY'))

    # Initialize database service
    databases = Databases(client)

    # Database and collection IDs (fall back to project defaults)
    database_id = os.getenv('APPWRITE_DATABASE_ID', 'segmento_db')
    collection_id = os.getenv('APPWRITE_COLLECTION_ID', 'articles')

    # Article data to insert
    article_data = {
        'title': 'Using RSS feeds of profiles, publications, and topics',
        'description': 'Learn how to use RSS feeds to stay updated with your favorite Medium profiles, publications, and topics. This comprehensive guide covers everything you need to know about accessing and using Medium RSS feeds.',
        'url': 'https://help.medium.com/hc/en-us/articles/214874118-Using-RSS-feeds-of-profiles-publications-and-topics',
        'image': 'https://miro.medium.com/v2/resize:fit:1200/1*F0LADxTtsKOgmPa-_7iRcQ.png',  # Medium logo
        'publishedAt': datetime.now().isoformat(),
        'source': 'Medium Help',
        'category': 'medium-article',
        'isPinned': True,  # Mark as pinned so it always appears first
    }

    try:
        # Duplicate check by URL. Use Query.equal instead of the legacy
        # string syntax f'equal("url", "...")' — the string form is
        # rejected by current Appwrite SDK/server versions and breaks on
        # URLs containing quote characters.
        existing = databases.list_documents(
            database_id=database_id,
            collection_id=collection_id,
            queries=[
                Query.equal('url', article_data['url'])
            ]
        )

        if existing['total'] > 0:
            print("✅ Article already exists in database")
            print(f"   Document ID: {existing['documents'][0]['$id']}")
            return existing['documents'][0]

        # Create the document with a server-generated unique ID
        result = databases.create_document(
            database_id=database_id,
            collection_id=collection_id,
            document_id=ID.unique(),
            data=article_data
        )

        print("✅ Successfully seeded Medium Article!")
        print(f"   Title: {result['title']}")
        print(f"   Document ID: {result['$id']}")
        print(f"   Category: {result['category']}")
        print(f"   Published At: {result['publishedAt']}")

        return result

    except Exception as e:
        print(f"❌ Error seeding article: {str(e)}")
        raise
84
+
85
if __name__ == '__main__':
    print("=" * 60)
    print("Seeding Medium Article Category")
    print("=" * 60)
    print()

    # Fail fast if Appwrite credentials are missing
    required_vars = ['APPWRITE_ENDPOINT', 'APPWRITE_PROJECT_ID', 'APPWRITE_API_KEY']
    missing_vars = [var for var in required_vars if not os.getenv(var)]

    if missing_vars:
        print(f"❌ Missing required environment variables: {', '.join(missing_vars)}")
        print("   Please set them in your .env file")
        # raise SystemExit instead of the bare exit() helper: exit() is
        # injected by the `site` module and is not guaranteed to exist
        # (e.g. when running with `python -S` or under some launchers).
        raise SystemExit(1)

    seed_medium_article()

    print()
    print("=" * 60)
    print("Seeding Complete!")
    print("=" * 60)
test_cleanup.py ADDED
@@ -0,0 +1,29 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Test script to manually trigger cleanup job
3
+ Run this to test if the cleanup scheduler works with Appwrite credentials
4
+ """
5
+
6
+ import asyncio
7
+ import sys
8
+ import os
9
+
10
+ # Add parent directory to path
11
+ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
12
+
13
async def main():
    """Manually run the cleanup scheduler job and report completion.

    Lets you verify that the Appwrite-backed cleanup works with the
    configured credentials, outside the normal schedule.
    """
    from app.services.scheduler import trigger_cleanup_now

    separator = "=" * 80

    # Header banner
    for line in (separator, "🧪 MANUAL TEST: Cleanup Scheduler", separator, ""):
        print(line)

    # Run the actual cleanup job
    await trigger_cleanup_now()

    # Footer banner
    for line in ("", separator, "✅ Test completed!", separator):
        print(line)


if __name__ == "__main__":
    asyncio.run(main())
test_scheduler_status.py ADDED
@@ -0,0 +1,48 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Test script to view scheduler status and jobs
3
+ """
4
+
5
+ import asyncio
6
+ import sys
7
+ import os
8
+
9
+ # Add parent directory to path
10
+ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
11
+
12
async def main():
    """Print Appwrite connectivity, scheduler state, and registered jobs."""
    from app.services.scheduler import scheduler
    from app.services.appwrite_db import get_appwrite_db

    bar = "=" * 80
    print(bar)
    print("📊 SCHEDULER STATUS CHECK")
    print(bar)
    print("")

    # Appwrite connection state
    print("🔹 Appwrite Database Status:")
    appwrite_db = get_appwrite_db()
    print(f"   Initialized: {appwrite_db.initialized}")
    print("")

    # Scheduler engine state
    print("🔹 Scheduler Status:")
    print(f"   Running: {scheduler.running}")
    print("")

    # Registered jobs with their next run time and trigger
    print("🔹 Registered Jobs:")
    jobs = scheduler.get_jobs()
    if not jobs:
        print("   No jobs registered")
        print("")
    else:
        for job in jobs:
            print(f"   - {job.name} (ID: {job.id})")
            print(f"     Next run: {job.next_run_time}")
            print(f"     Trigger: {job.trigger}")
            print("")

    print(bar)


if __name__ == "__main__":
    asyncio.run(main())