SHAFI commited on
Commit
82bd507
·
1 Parent(s): 1c599fe

the full ingestion pipline of the pulse is upgraded, worked on flaws and rectified them, solved over 10 major flaws

Browse files
app/routes/monitoring.py CHANGED
@@ -137,8 +137,8 @@ async def cache_health_check():
137
 
138
  # Test connectivity with a simple PING
139
  test_key = "_health_check_test"
140
- cache.set(test_key, "ok", ttl=10)
141
- result = cache.get(test_key)
142
 
143
  if result == "ok":
144
  return {
 
137
 
138
  # Test connectivity with a simple PING
139
  test_key = "_health_check_test"
140
+ await cache.set(test_key, "ok", ttl=10)
141
+ result = await cache.get(test_key)
142
 
143
  if result == "ok":
144
  return {
app/routes/news.py CHANGED
@@ -59,7 +59,7 @@ async def get_news_by_category(
59
 
60
  # Try Upstash cache first (5 min TTL)
61
  if upstash_cache.enabled:
62
- cached_data = upstash_cache.get(cache_key)
63
  if cached_data:
64
  return NewsResponse(
65
  success=True,
@@ -134,7 +134,7 @@ async def get_news_by_category(
134
 
135
  # Cache the result (5 min TTL)
136
  if upstash_cache.enabled:
137
- upstash_cache.set(
138
  cache_key,
139
  {"articles": articles, "has_more": has_more, "next_cursor": next_cursor},
140
  ttl=300 # 5 minutes
@@ -160,7 +160,7 @@ async def get_rss_feed(provider: str):
160
  # Check Upstash cache
161
  cache_key = f"rss:{provider}"
162
  if upstash_cache.enabled:
163
- cached_data = upstash_cache.get(cache_key)
164
  if cached_data:
165
  return NewsResponse(
166
  success=True,
@@ -176,7 +176,7 @@ async def get_rss_feed(provider: str):
176
 
177
  # Cache in Upstash (10 min TTL for RSS feeds)
178
  if upstash_cache.enabled:
179
- upstash_cache.set(cache_key, articles, ttl=600)
180
 
181
  return NewsResponse(
182
  success=True,
 
59
 
60
  # Try Upstash cache first (5 min TTL)
61
  if upstash_cache.enabled:
62
+ cached_data = await upstash_cache.get(cache_key)
63
  if cached_data:
64
  return NewsResponse(
65
  success=True,
 
134
 
135
  # Cache the result (5 min TTL)
136
  if upstash_cache.enabled:
137
+ await upstash_cache.set(
138
  cache_key,
139
  {"articles": articles, "has_more": has_more, "next_cursor": next_cursor},
140
  ttl=300 # 5 minutes
 
160
  # Check Upstash cache
161
  cache_key = f"rss:{provider}"
162
  if upstash_cache.enabled:
163
+ cached_data = await upstash_cache.get(cache_key)
164
  if cached_data:
165
  return NewsResponse(
166
  success=True,
 
176
 
177
  # Cache in Upstash (10 min TTL for RSS feeds)
178
  if upstash_cache.enabled:
179
+ await upstash_cache.set(cache_key, articles, ttl=600)
180
 
181
  return NewsResponse(
182
  success=True,
app/services/adaptive_scheduler.py CHANGED
@@ -18,6 +18,7 @@ from datetime import datetime
18
  from typing import Dict, List
19
  import json
20
  import os
 
21
 
22
 
23
  class AdaptiveScheduler:
@@ -48,29 +49,76 @@ class AdaptiveScheduler:
48
  'total_articles': 0
49
  }
50
 
 
 
 
 
 
 
 
 
 
 
 
 
51
  def _load_velocity_data(self) -> Dict:
52
- """Load velocity data from disk (persists across restarts)"""
53
- data_file = 'data/velocity_tracking.json'
54
-
55
- if os.path.exists(data_file):
56
- try:
57
- with open(data_file, 'r') as f:
58
- return json.load(f)
59
- except Exception as e:
60
- print(f"Warning: Failed to load velocity data: {e}")
61
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
62
  return {}
63
-
64
  def _save_velocity_data(self):
65
- """Save velocity data to disk"""
66
- data_file = 'data/velocity_tracking.json'
67
- os.makedirs('data', exist_ok=True)
68
-
 
 
 
 
 
 
 
69
  try:
70
- with open(data_file, 'w') as f:
71
- json.dump(self.velocity_data, f, indent=2)
 
 
 
 
 
 
 
 
 
 
72
  except Exception as e:
73
- print(f"Warning: Failed to save velocity data: {e}")
74
 
75
  def update_category_velocity(self, category: str, article_count: int):
76
  """
@@ -115,11 +163,55 @@ class AdaptiveScheduler:
115
  print(f"📊 {category.upper()}: Moderate velocity ({avg_count:.1f} avg) → 15min interval")
116
 
117
  data['interval'] = new_interval
118
-
119
- # Persist to disk
120
- self._save_velocity_data()
121
-
 
 
 
 
 
122
  return new_interval
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
123
 
124
  def get_interval(self, category: str) -> int:
125
  """Get current interval for a category"""
 
18
  from typing import Dict, List
19
  import json
20
  import os
21
+ import httpx
22
 
23
 
24
  class AdaptiveScheduler:
 
49
  'total_articles': 0
50
  }
51
 
52
+ def _redis_key(self) -> str:
53
+ """Redis key where velocity data is stored permanently."""
54
+ return "segmento:adaptive_velocity_state"
55
+
56
+ def _redis_headers(self):
57
+ """Auth headers for the Upstash Redis REST API."""
58
+ return {"Authorization": f"Bearer {os.getenv('UPSTASH_REDIS_REST_TOKEN', '')}"}
59
+
60
+ def _redis_url(self) -> str:
61
+ """Base URL for the Upstash Redis REST API."""
62
+ return os.getenv("UPSTASH_REDIS_REST_URL", "")
63
+
64
  def _load_velocity_data(self) -> Dict:
65
+ """
66
+ Load velocity tracking data from Redis.
67
+
68
+ Fix #4 (Phase 7): The old version wrote to a local JSON file
69
+ (data/velocity_tracking.json). On cloud platforms (Render, Railway,
70
+ Heroku), local disks are wiped on every deploy, so the system kept
71
+ forgetting its trained intervals after restarts.
72
+
73
+ Redis is permanent the key lives forever (no TTL) and the adaptive
74
+ scheduler's memory now survives deploys and server restarts.
75
+ """
76
+ redis_url = self._redis_url()
77
+ if not redis_url:
78
+ # Redis not configured — start with empty data (same as before).
79
+ return {}
80
+
81
+ try:
82
+ url = f"{redis_url}/get/{self._redis_key()}"
83
+ with httpx.Client(timeout=5.0) as client:
84
+ response = client.get(url, headers=self._redis_headers())
85
+ data = response.json()
86
+ # Upstash returns {"result": "<json string>"} or {"result": null}
87
+ raw = data.get("result")
88
+ if raw:
89
+ return json.loads(raw)
90
+ except Exception as e:
91
+ print(f"[ADAPTIVE] Could not load velocity data from Redis ({e}) — starting fresh.")
92
+
93
  return {}
94
+
95
  def _save_velocity_data(self):
96
+ """
97
+ Save velocity tracking data to Redis (no expiry — keep forever).
98
+
99
+ Uses the Upstash REST API's SET command. No TTL is set so the data
100
+ persists indefinitely and we never lose our trained intervals.
101
+ """
102
+ redis_url = self._redis_url()
103
+ if not redis_url:
104
+ # Redis not configured — silently skip, same as before.
105
+ return
106
+
107
  try:
108
+ # Serialize the velocity dict to a JSON string.
109
+ payload = json.dumps(self.velocity_data)
110
+
111
+ # Upstash REST: POST /set/<key> with body = value
112
+ # No EX or PX param = key never expires.
113
+ url = f"{redis_url}/set/{self._redis_key()}"
114
+ with httpx.Client(timeout=5.0) as client:
115
+ client.post(
116
+ url,
117
+ headers=self._redis_headers(),
118
+ content=payload.encode("utf-8")
119
+ )
120
  except Exception as e:
121
+ print(f"[ADAPTIVE] Could not save velocity data to Redis ({e}) — data may be lost on restart.")
122
 
123
  def update_category_velocity(self, category: str, article_count: int):
124
  """
 
163
  print(f"📊 {category.upper()}: Moderate velocity ({avg_count:.1f} avg) → 15min interval")
164
 
165
  data['interval'] = new_interval
166
+
167
+ # NOTE: We no longer call _save_velocity_data() here.
168
+ # Reason: this method is sync, but it is called from an async job.
169
+ # Calling a blocking httpx.Client inside an async function freezes the
170
+ # entire event loop for up to 5 seconds on every category run.
171
+ # The caller (fetch_single_category_job) is responsible for awaiting
172
+ # async_persist() AFTER this method returns. That way the save
173
+ # happens asynchronously without blocking anything.
174
+
175
  return new_interval
176
+
177
+ async def async_persist(self):
178
+ """
179
+ Save velocity data to Redis using a non-blocking async HTTP call.
180
+
181
+ Why a separate method?
182
+ -----------------------
183
+ update_category_velocity() is a regular (sync) function because it is
184
+ called from many places, including some that are not async.
185
+ Putting an async HTTP call directly inside a sync function would block
186
+ the entire event loop — freezing FastAPI's ability to serve user
187
+ requests for up to 5 seconds.
188
+
189
+ The fix:
190
+ update_category_velocity() updates memory only (instant, no I/O).
191
+ async_persist() does the actual Redis write asynchronously.
192
+ The caller (fetch_single_category_job) awaits this after the update.
193
+ """
194
+ redis_url = self._redis_url()
195
+ if not redis_url:
196
+ return
197
+
198
+ try:
199
+ payload = json.dumps(self.velocity_data)
200
+ url = f"{redis_url}/set/{self._redis_key()}"
201
+
202
+ # httpx.AsyncClient never blocks the event loop.
203
+ # Even if the Upstash call takes 200ms, FastAPI keeps serving users.
204
+ async with httpx.AsyncClient(timeout=5.0) as client:
205
+ await client.post(
206
+ url,
207
+ headers=self._redis_headers(),
208
+ content=payload.encode("utf-8")
209
+ )
210
+ except Exception as e:
211
+ print(
212
+ f"[ADAPTIVE] Could not persist velocity data to Redis ({e}) "
213
+ "\u2014 data is safe in memory for this session."
214
+ )
215
 
216
  def get_interval(self, category: str) -> int:
217
  """Get current interval for a category"""
app/services/api_quota.py CHANGED
@@ -4,7 +4,7 @@ Monitors API usage and prevents hitting rate limits
4
  """
5
 
6
  from typing import Dict, Optional
7
- from datetime import datetime, timedelta
8
  import logging
9
 
10
  logger = logging.getLogger(__name__)
@@ -140,6 +140,101 @@ class APIQuotaTracker:
140
 
141
  return stats
142
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
143
 
144
  # Global singleton
145
  _quota_tracker: Optional[APIQuotaTracker] = None
 
4
  """
5
 
6
  from typing import Dict, Optional
7
+ from datetime import datetime, timedelta, date
8
  import logging
9
 
10
  logger = logging.getLogger(__name__)
 
140
 
141
  return stats
142
 
143
+ # --------------------------------------------------------------------------
144
+ # REDIS-BACKED ASYNC METHODS (Phase 3 additions)
145
+ # --------------------------------------------------------------------------
146
+ # These two methods do the same job as can_make_call() and record_call(),
147
+ # but they also read and write from Upstash Redis.
148
+ #
149
+ # Why two sets of methods? Because the old sync methods are called from
150
+ # places we do not want to change right now. The new async ones are called
151
+ # only from news_aggregator.py, which is already async.
152
+ #
153
+ # Redis key format: quota:{provider}:{YYYY-MM-DD}
154
+ # e.g. quota:gnews:2026-02-26
155
+ # TTL: 86400 seconds (24 hours) — the key naturally disappears at the end
156
+ # of the day, which is the same as resetting the counter to zero at midnight.
157
+ # --------------------------------------------------------------------------
158
+
159
+ async def async_can_make_call(self, provider: str, calls: int = 1) -> bool:
160
+ """
161
+ Check if we can still call this paid provider today.
162
+
163
+ Reads the current call count from Redis first (so the answer survives
164
+ server restarts). Falls back to the in-memory count if Redis is down.
165
+ """
166
+ if provider not in self.quotas or "calls_per_day" not in self.quotas[provider]:
167
+ # Unknown or non-daily provider — allow the call.
168
+ return True
169
+
170
+ limit = self.quotas[provider]["calls_per_day"]
171
+
172
+ try:
173
+ from app.services.upstash_cache import get_upstash_cache
174
+ cache = get_upstash_cache()
175
+ redis_key = f"quota:{provider}:{date.today().isoformat()}"
176
+
177
+ # Ask Redis: how many calls have been made today so far?
178
+ raw = await cache._execute_command(["GET", redis_key])
179
+ used_today = int(raw) if raw is not None else 0
180
+
181
+ # Also sync in-memory so the sync path stays accurate.
182
+ self.quotas[provider]["calls_made"] = used_today
183
+
184
+ can_call = (used_today + calls) <= limit
185
+ if not can_call:
186
+ logger.warning(
187
+ "[QUOTA] %s daily limit reached: %d/%d (Redis source)",
188
+ provider.upper(), used_today, limit
189
+ )
190
+ return can_call
191
+
192
+ except Exception as e:
193
+ # Redis unavailable — fall back to the in-memory counter.
194
+ logger.debug("[QUOTA] Redis unavailable (%s) — using in-memory fallback.", e)
195
+ return self.can_make_call(provider, calls)
196
+
197
+ async def async_record_call(self, provider: str, calls: int = 1):
198
+ """
199
+ Record that we just used one API credit for this provider.
200
+
201
+ Writes to BOTH in-memory AND Redis so the count is correct
202
+ whether the server restarts or not.
203
+ """
204
+ if provider not in self.quotas or "calls_per_day" not in self.quotas[provider]:
205
+ return
206
+
207
+ # Always update in-memory immediately (zero latency fast path).
208
+ self.record_call(provider, calls)
209
+
210
+ # Then persist to Redis in the background so a restart does not lose the count.
211
+ try:
212
+ from app.services.upstash_cache import get_upstash_cache
213
+ cache = get_upstash_cache()
214
+ redis_key = f"quota:{provider}:{date.today().isoformat()}"
215
+
216
+ # INCR atomically adds 1 to the counter.
217
+ # If the key does not exist yet, Redis creates it and starts at 0.
218
+ await cache._execute_command(["INCR", redis_key])
219
+
220
+ # Make sure the key expires at the end of today (24-hour TTL).
221
+ # EXPIRE only sets it if not already set, so we do not keep
222
+ # resetting the TTL on every call.
223
+ await cache._execute_command(["EXPIRE", redis_key, 86400])
224
+
225
+ logger.debug(
226
+ "[QUOTA] Recorded call for %s in Redis (key: %s).",
227
+ provider.upper(), redis_key
228
+ )
229
+
230
+ except Exception as e:
231
+ # Redis write failed — in-memory was already updated, so we are still
232
+ # protected within this session. Log and move on.
233
+ logger.debug(
234
+ "[QUOTA] Redis write failed for %s (%s) — in-memory count still correct.",
235
+ provider.upper(), e
236
+ )
237
+
238
 
239
  # Global singleton
240
  _quota_tracker: Optional[APIQuotaTracker] = None
app/services/news_aggregator.py CHANGED
@@ -1,3 +1,4 @@
 
1
  import httpx
2
  from typing import List, Dict, Optional
3
  from datetime import datetime
@@ -14,6 +15,8 @@ from app.services.news_providers import (
14
  OfficialCloudProvider
15
  )
16
  from app.config import settings
 
 
17
 
18
  class NewsAggregator:
19
  """Service for aggregating news from multiple sources with automatic failover"""
@@ -46,8 +49,27 @@ class NewsAggregator:
46
  # Official Cloud Provider (Strict Isolation)
47
  self.providers['official_cloud'] = OfficialCloudProvider()
48
 
49
- # Provider priority order
50
- self.provider_priority = settings.NEWS_PROVIDER_PRIORITY
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51
 
52
  # Cloud provider RSS feeds
53
  self.cloud_rss_urls = {
@@ -65,53 +87,173 @@ class NewsAggregator:
65
  'provider_usage': {},
66
  'failover_count': 0
67
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68
 
69
  async def fetch_by_category(self, category: str) -> List[Article]:
70
  """
71
- Fetch news by category using hybrid approach with automatic failover
72
- Tries providers in priority order until successful
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  """
74
- self.stats['total_requests'] += 1
75
-
76
- # Try each provider in priority order
77
- for provider_name in self.provider_priority:
 
 
 
 
 
 
78
  provider = self.providers.get(provider_name)
79
-
80
- # Skip if provider not configured
81
  if not provider:
82
  continue
83
-
84
- # Skip if provider is not available (rate limited)
 
 
 
 
 
 
 
 
 
 
 
 
85
  if not provider.is_available():
86
- print(f"[SKIP] [{provider_name.upper()}] Not available (rate limited), trying next...")
87
- self.stats['failover_count'] += 1
 
 
88
  continue
89
-
90
  try:
91
- print(f"[FETCH] [{provider_name.upper()}] Attempting to fetch '{category}' news...")
92
  articles = await provider.fetch_news(category, limit=20)
93
-
94
- # If we got articles, return them
95
  if articles:
96
- # No need to print here, provider already printed success
97
-
98
- # Track usage statistics
99
- if provider_name not in self.stats['provider_usage']:
100
- self.stats['provider_usage'][provider_name] = 0
101
- self.stats['provider_usage'][provider_name] += 1
102
-
103
- return articles
 
104
  else:
105
- print(f"[SKIP] [{provider_name.upper()}] No articles returned, trying next provider...")
106
-
107
  except Exception as e:
108
- print(f"[ERROR] [{provider_name.upper()}] Error: {e}, trying next...")
109
- self.stats['failover_count'] += 1
110
- continue
111
-
112
- # If all providers failed, return empty list
113
- print(f"[WARN] [NEWS AGGREGATOR] All providers exhausted for '{category}' - no articles available")
114
- return []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
115
 
116
  async def fetch_from_provider(self, provider_name: str, category: str) -> List[Article]:
117
  """Fetch news specifically from a named provider (bypassing priority/failover)"""
 
1
+ import asyncio
2
  import httpx
3
  from typing import List, Dict, Optional
4
  from datetime import datetime
 
15
  OfficialCloudProvider
16
  )
17
  from app.config import settings
18
+ from app.services.api_quota import get_quota_tracker
19
+ from app.services.circuit_breaker import get_circuit_breaker
20
 
21
  class NewsAggregator:
22
  """Service for aggregating news from multiple sources with automatic failover"""
 
49
  # Official Cloud Provider (Strict Isolation)
50
  self.providers['official_cloud'] = OfficialCloudProvider()
51
 
52
+ # ── Provider role lists ──────────────────────────────────────────────
53
+ # PAID_CHAIN: tried in order, stop after the first success (save credits)
54
+ # FREE_SOURCES: always tried, always in parallel (no cost, no limits)
55
+ self.PAID_CHAIN = ['gnews', 'newsapi', 'newsdata']
56
+ self.FREE_SOURCES = ['google_rss', 'medium', 'official_cloud']
57
+
58
+ # Medium only publishes articles for a small set of topics.
59
+ # Calling it for 'data-centers' or 'cloud-oracle' would return nothing.
60
+ self.MEDIUM_SUPPORTED_CATEGORIES = {
61
+ 'ai', 'data-science', 'cloud-computing', 'programming',
62
+ 'technology', 'data-laws'
63
+ }
64
+
65
+ # Official Cloud RSS only makes sense for cloud-related categories.
66
+ self.CLOUD_CATEGORIES = {
67
+ c for c in [
68
+ 'cloud-computing', 'cloud-aws', 'cloud-azure', 'cloud-gcp',
69
+ 'cloud-oracle', 'cloud-ibm', 'cloud-alibaba', 'cloud-digitalocean',
70
+ 'cloud-huawei', 'cloud-cloudflare'
71
+ ]
72
+ }
73
 
74
  # Cloud provider RSS feeds
75
  self.cloud_rss_urls = {
 
87
  'provider_usage': {},
88
  'failover_count': 0
89
  }
90
+
91
+ # Async lock — keeps stats correct when 22 category tasks share this one aggregator.
92
+ # Without this, two tasks updating the same counter at the same time could miss a count.
93
+ self._lock = asyncio.Lock()
94
+
95
+ # --- Phase 2 additions: infrastructure guards ---
96
+
97
+ # Which providers cost real API credits.
98
+ # Kept as a set for O(1) lookup inside the waterfall loop.
99
+ self.paid_providers = set(self.PAID_CHAIN)
100
+
101
+ # The Quota Tracker counts how many API calls we have made today.
102
+ # It is a module-level singleton — once created it lives in memory for the
103
+ # entire lifetime of the server process, surviving every hourly scheduler
104
+ # run without resetting. (It DOES reset if the server itself restarts;
105
+ # that is acceptable for now and noted as a future improvement.)
106
+ self.quota = get_quota_tracker()
107
+
108
+ # The Circuit Breaker watches each provider for repeated failures.
109
+ # If a provider fails 3 times in 5 minutes, we stop calling it for 1 hour
110
+ # (like hanging up on a broken phone line and trying it again later).
111
+ # It is also a module-level singleton — same lifetime as the quota tracker.
112
+ self.circuit = get_circuit_breaker()
113
 
114
  async def fetch_by_category(self, category: str) -> List[Article]:
115
  """
116
+ Fetch news from ALL available sources for a category.
117
+
118
+ Strategy (Phase 5 — True Multi-Source Aggregation):
119
+
120
+ STEP A ─ Paid Waterfall:
121
+ Try GNews → NewsAPI → NewsData in order.
122
+ Stop as soon as one returns articles.
123
+ We only want ONE paid call per category to stay inside our daily budget.
124
+ Think of it like: only knock on the first open door, don't ring every bell.
125
+
126
+ STEP B ─ Free Parallel Run (always runs, even if Step A succeeded):
127
+ Simultaneously fetch from Google RSS, Medium, and Official Cloud.
128
+ These are free and have no rate-limit cost, so we always want them.
129
+ Think of it like: sending postcards to all your free newspaper subscriptions.
130
+
131
+ STEP C ─ Combine:
132
+ Merge paid + free results into one big list.
133
+ Duplicates are fine here — the in-batch deduplication in scheduler.py
134
+ will clean them up right after this function returns.
135
  """
136
+ async with self._lock:
137
+ self.stats['total_requests'] += 1
138
+
139
+ combined_articles: List[Article] = []
140
+
141
+ # ======================================================================
142
+ # STEP A: PAID WATERFALL — one successful call is all we need
143
+ # ======================================================================
144
+ paid_success = False
145
+ for provider_name in self.PAID_CHAIN:
146
  provider = self.providers.get(provider_name)
147
+
148
+ # Skip if this paid provider was not configured (no API key set).
149
  if not provider:
150
  continue
151
+
152
+ # Guard 1 Circuit Breaker
153
+ if self.circuit.should_skip(provider_name):
154
+ print(f"[CIRCUIT] [{provider_name.upper()}] Circuit OPEN — skipping this run.")
155
+ async with self._lock:
156
+ self.stats['failover_count'] += 1
157
+ continue
158
+
159
+ # Guard 2 ─ Quota Check (paid only)
160
+ if not await self.quota.async_can_make_call(provider_name):
161
+ print(f"[QUOTA] [{provider_name.upper()}] Daily limit reached — skipping.")
162
+ continue
163
+
164
+ # Guard 3 ─ Provider's own 429 flag
165
  if not provider.is_available():
166
+ print(f"[SKIP] [{provider_name.upper()}] Provider reported 429 recording and skipping.")
167
+ self.circuit.record_failure(provider_name, error_type="rate_limit", status_code=429)
168
+ async with self._lock:
169
+ self.stats['failover_count'] += 1
170
  continue
171
+
172
  try:
173
+ print(f"[PAID] [{provider_name.upper()}] Fetching '{category}'...")
174
  articles = await provider.fetch_news(category, limit=20)
175
+
 
176
  if articles:
177
+ self.circuit.record_success(provider_name)
178
+ await self.quota.async_record_call(provider_name)
179
+ async with self._lock:
180
+ self.stats['provider_usage'][provider_name] = \
181
+ self.stats['provider_usage'].get(provider_name, 0) + 1
182
+ combined_articles.extend(articles)
183
+ paid_success = True
184
+ print(f"[PAID] [{provider_name.upper()}] Got {len(articles)} articles — stopping paid chain.")
185
+ break # ← KEY: one success is enough, protect our credits
186
  else:
187
+ print(f"[PAID] [{provider_name.upper()}] No articles trying next paid provider.")
188
+
189
  except Exception as e:
190
+ print(f"[ERROR] [{provider_name.upper()}] Fetch failed: {e} recording failure.")
191
+ self.circuit.record_failure(provider_name, error_type="exception")
192
+ async with self._lock:
193
+ self.stats['failover_count'] += 1
194
+ continue # try next paid provider
195
+
196
+ if not paid_success:
197
+ print(f"[PAID] No paid provider delivered articles for '{category}'.")
198
+
199
+ # ======================================================================
200
+ # STEP B: FREE PARALLEL RUN — always fires, no cost
201
+ # ======================================================================
202
+ # We build a list of coroutines for free sources, but only include a
203
+ # provider if it actually supports this category (avoid pointless calls).
204
+ free_tasks: list = []
205
+ free_names: list = [] # track which name maps to which task result
206
+
207
+ # Google RSS supports ALL categories.
208
+ google_rss = self.providers.get('google_rss')
209
+ if google_rss and not self.circuit.should_skip('google_rss'):
210
+ if google_rss.is_available():
211
+ free_tasks.append(google_rss.fetch_news(category, limit=20))
212
+ free_names.append('google_rss')
213
+
214
+ # Medium only supports a small set of topics.
215
+ if category in self.MEDIUM_SUPPORTED_CATEGORIES:
216
+ medium = self.providers.get('medium')
217
+ if medium and not self.circuit.should_skip('medium'):
218
+ if medium.is_available():
219
+ free_tasks.append(medium.fetch_news(category, limit=10))
220
+ free_names.append('medium')
221
+
222
+ # Official Cloud RSS only makes sense for cloud-* categories.
223
+ if category in self.CLOUD_CATEGORIES:
224
+ official = self.providers.get('official_cloud')
225
+ if official and not self.circuit.should_skip('official_cloud'):
226
+ if official.is_available():
227
+ free_tasks.append(official.fetch_news(category, limit=10))
228
+ free_names.append('official_cloud')
229
+
230
+ if free_tasks:
231
+ print(f"[FREE] Launching {len(free_tasks)} free source(s) in parallel for '{category}'...")
232
+ free_results = await asyncio.gather(*free_tasks, return_exceptions=True)
233
+
234
+ for name, result in zip(free_names, free_results):
235
+ if isinstance(result, Exception):
236
+ print(f"[ERROR] [{name.upper()}] Free fetch error: {result}")
237
+ self.circuit.record_failure(name, error_type="exception")
238
+ elif isinstance(result, list) and result:
239
+ self.circuit.record_success(name)
240
+ combined_articles.extend(result)
241
+ print(f"[FREE] [{name.upper()}] Got {len(result)} articles.")
242
+ async with self._lock:
243
+ self.stats['provider_usage'][name] = \
244
+ self.stats['provider_usage'].get(name, 0) + 1
245
+
246
+ # ======================================================================
247
+ # STEP C: RETURN COMBINED LIST
248
+ # ======================================================================
249
+ # Return everything we collected. Duplicates are expected and welcome —
250
+ # the in-batch dedup in scheduler.py (Phase 1) will strip them cleanly.
251
+ if combined_articles:
252
+ print(f"[DONE] '{category}': {len(combined_articles)} total articles from all sources.")
253
+ else:
254
+ print(f"[WARN] '{category}': No articles from any source this run.")
255
+
256
+ return combined_articles
257
 
258
  async def fetch_from_provider(self, provider_name: str, category: str) -> List[Article]:
259
  """Fetch news specifically from a named provider (bypassing priority/failover)"""
app/services/news_providers.py CHANGED
@@ -58,11 +58,21 @@ class GNewsProvider(NewsProvider):
58
  'data-governance': 'data governance compliance',
59
  'data-privacy': 'data privacy GDPR',
60
  'data-engineering': 'data engineering pipeline',
 
61
  'business-intelligence': 'business intelligence BI',
62
  'business-analytics': 'business analytics',
63
  'customer-data-platform': 'customer data platform CDP',
64
  'data-centers': 'data centers infrastructure',
65
  'cloud-computing': 'cloud computing AWS Azure Google Cloud Salesforce Alibaba Cloud Tencent Cloud Huawei Cloud Cloudflare',
 
 
 
 
 
 
 
 
 
66
  'medium-article': 'Medium article blog writing publishing',
67
  'magazines': 'technology news',
68
  'data-laws': 'data privacy law GDPR CCPA AI regulation compliance',
@@ -146,11 +156,21 @@ class NewsAPIProvider(NewsProvider):
146
  'data-governance': '"data governance" OR "data management" OR compliance',
147
  'data-privacy': '"data privacy" OR GDPR OR "privacy regulation"',
148
  'data-engineering': '"data engineering" OR "data pipeline" OR "big data"',
 
149
  'business-intelligence': '"business intelligence" OR "BI tools"',
150
  'business-analytics': '"business analytics" OR analytics',
151
  'customer-data-platform': '"customer data platform" OR CDP',
152
  'data-centers': '"data centers" OR "data centre"',
153
  'cloud-computing': '"cloud computing" OR AWS OR Azure OR "Google Cloud" OR Salesforce OR "Alibaba Cloud" OR "Tencent Cloud" OR "Huawei Cloud" OR Cloudflare',
 
 
 
 
 
 
 
 
 
154
  'medium-article': 'Medium OR "Medium article" OR "Medium blog" OR "Medium publishing"',
155
  'magazines': 'technology',
156
  'data-laws': '"data privacy law" OR GDPR OR CCPA OR "EU AI Act" OR "data protection act"',
@@ -225,11 +245,21 @@ class NewsDataProvider(NewsProvider):
225
  'data-governance': 'data governance,compliance',
226
  'data-privacy': 'data privacy,GDPR',
227
  'data-engineering': 'data engineering,big data',
 
228
  'business-intelligence': 'business intelligence',
229
  'business-analytics': 'business analytics',
230
  'customer-data-platform': 'customer data platform',
231
  'data-centers': 'data centers',
232
  'cloud-computing': 'cloud computing,AWS,Azure,Google Cloud,Salesforce,Alibaba Cloud,Tencent Cloud,Huawei Cloud,Cloudflare',
 
 
 
 
 
 
 
 
 
233
  'medium-article': 'Medium,article,blog,writing,publishing',
234
  'magazines': 'technology',
235
  'data-laws': 'data privacy law,GDPR,CCPA,AI regulation,compliance',
@@ -310,11 +340,21 @@ class GoogleNewsRSSProvider(NewsProvider):
310
  'data-governance': 'https://news.google.com/rss/search?q=data+governance+OR+data+management&hl=en-US&gl=US&ceid=US:en',
311
  'data-privacy': 'https://news.google.com/rss/search?q=data+privacy+OR+GDPR+OR+privacy+regulation&hl=en-US&gl=US&ceid=US:en',
312
  'data-engineering': 'https://news.google.com/rss/search?q=data+engineering+OR+data+pipeline+OR+big+data&hl=en-US&gl=US&ceid=US:en',
 
313
  'business-intelligence': 'https://news.google.com/rss/search?q=business+intelligence+OR+BI+tools&hl=en-US&gl=US&ceid=US:en',
314
  'business-analytics': 'https://news.google.com/rss/search?q=business+analytics&hl=en-US&gl=US&ceid=US:en',
315
  'customer-data-platform': 'https://news.google.com/rss/search?q=customer+data+platform+OR+CDP&hl=en-US&gl=US&ceid=US:en',
316
  'data-centers': 'https://news.google.com/rss/search?q=data+centers+OR+data+centre&hl=en-US&gl=US&ceid=US:en',
317
  'cloud-computing': 'https://news.google.com/rss/search?q=cloud+computing+OR+AWS+OR+Azure+OR+Google+Cloud+OR+Salesforce+OR+Alibaba+Cloud+OR+Tencent+Cloud+OR+Huawei+Cloud+OR+Cloudflare&hl=en-US&gl=US&ceid=US:en',
 
 
 
 
 
 
 
 
 
318
  'medium-article': 'https://news.google.com/rss/search?q=Medium+article+OR+Medium+blog+OR+Medium+publishing&hl=en-US&gl=US&ceid=US:en',
319
  'magazines': 'https://news.google.com/rss/headlines/section/topic/TECHNOLOGY?hl=en-US&gl=US&ceid=US:en',
320
  'data-laws': 'https://news.google.com/rss/search?q=data+privacy+law+OR+GDPR+OR+CCPA+OR+AI+Regulation&hl=en-US&gl=US&ceid=US:en',
 
58
  'data-governance': 'data governance compliance',
59
  'data-privacy': 'data privacy GDPR',
60
  'data-engineering': 'data engineering pipeline',
61
+ 'data-management': 'data management master data MDM data catalog data quality',
62
  'business-intelligence': 'business intelligence BI',
63
  'business-analytics': 'business analytics',
64
  'customer-data-platform': 'customer data platform CDP',
65
  'data-centers': 'data centers infrastructure',
66
  'cloud-computing': 'cloud computing AWS Azure Google Cloud Salesforce Alibaba Cloud Tencent Cloud Huawei Cloud Cloudflare',
67
+ 'cloud-aws': 'AWS Amazon Web Services S3 EC2 Lambda CloudFront SageMaker',
68
+ 'cloud-azure': 'Microsoft Azure Azure DevOps Azure ML Azure OpenAI',
69
+ 'cloud-gcp': 'Google Cloud Platform GCP BigQuery Vertex AI Cloud Run Dataflow',
70
+ 'cloud-oracle': 'Oracle Cloud OCI Oracle Database Oracle Fusion',
71
+ 'cloud-ibm': 'IBM Cloud IBM Watson Red Hat OpenShift IBM Z',
72
+ 'cloud-alibaba': 'Alibaba Cloud Aliyun AliCloud',
73
+ 'cloud-digitalocean': 'DigitalOcean Droplet App Platform',
74
+ 'cloud-huawei': 'Huawei Cloud HuaweiCloud',
75
+ 'cloud-cloudflare': 'Cloudflare Workers R2 Cloudflare Pages Zero Trust',
76
  'medium-article': 'Medium article blog writing publishing',
77
  'magazines': 'technology news',
78
  'data-laws': 'data privacy law GDPR CCPA AI regulation compliance',
 
156
  'data-governance': '"data governance" OR "data management" OR compliance',
157
  'data-privacy': '"data privacy" OR GDPR OR "privacy regulation"',
158
  'data-engineering': '"data engineering" OR "data pipeline" OR "big data"',
159
+ 'data-management': '"data management" OR "master data" OR MDM OR "data catalog" OR "data quality" OR "data lineage"',
160
  'business-intelligence': '"business intelligence" OR "BI tools"',
161
  'business-analytics': '"business analytics" OR analytics',
162
  'customer-data-platform': '"customer data platform" OR CDP',
163
  'data-centers': '"data centers" OR "data centre"',
164
  'cloud-computing': '"cloud computing" OR AWS OR Azure OR "Google Cloud" OR Salesforce OR "Alibaba Cloud" OR "Tencent Cloud" OR "Huawei Cloud" OR Cloudflare',
165
+ 'cloud-aws': 'AWS OR "Amazon Web Services" OR "Amazon S3" OR EC2 OR Lambda OR CloudFront OR SageMaker',
166
+ 'cloud-azure': 'Azure OR "Microsoft Azure" OR "Azure DevOps" OR "Azure ML" OR "Azure OpenAI"',
167
+ 'cloud-gcp': 'GCP OR "Google Cloud" OR BigQuery OR "Vertex AI" OR "Cloud Run" OR Dataflow',
168
+ 'cloud-oracle': '"Oracle Cloud" OR OCI OR "Oracle Database" OR "Oracle Fusion"',
169
+ 'cloud-ibm': '"IBM Cloud" OR "IBM Watson" OR "Red Hat" OR OpenShift OR "IBM Z"',
170
+ 'cloud-alibaba': '"Alibaba Cloud" OR Aliyun OR AliCloud',
171
+ 'cloud-digitalocean': 'DigitalOcean OR Droplet OR "App Platform"',
172
+ 'cloud-huawei': '"Huawei Cloud" OR HuaweiCloud',
173
+ 'cloud-cloudflare': 'Cloudflare OR "Cloudflare Workers" OR "Cloudflare R2" OR "Zero Trust"',
174
  'medium-article': 'Medium OR "Medium article" OR "Medium blog" OR "Medium publishing"',
175
  'magazines': 'technology',
176
  'data-laws': '"data privacy law" OR GDPR OR CCPA OR "EU AI Act" OR "data protection act"',
 
245
  'data-governance': 'data governance,compliance',
246
  'data-privacy': 'data privacy,GDPR',
247
  'data-engineering': 'data engineering,big data',
248
+ 'data-management': 'data management,master data,MDM,data catalog,data quality,data lineage',
249
  'business-intelligence': 'business intelligence',
250
  'business-analytics': 'business analytics',
251
  'customer-data-platform': 'customer data platform',
252
  'data-centers': 'data centers',
253
  'cloud-computing': 'cloud computing,AWS,Azure,Google Cloud,Salesforce,Alibaba Cloud,Tencent Cloud,Huawei Cloud,Cloudflare',
254
+ 'cloud-aws': 'AWS,Amazon Web Services,Amazon S3,EC2,Lambda,CloudFront,SageMaker',
255
+ 'cloud-azure': 'Azure,Microsoft Azure,Azure DevOps,Azure ML,Azure OpenAI',
256
+ 'cloud-gcp': 'GCP,Google Cloud Platform,BigQuery,Vertex AI,Cloud Run,Dataflow',
257
+ 'cloud-oracle': 'Oracle Cloud,OCI,Oracle Database,Oracle Fusion',
258
+ 'cloud-ibm': 'IBM Cloud,IBM Watson,Red Hat,OpenShift,IBM Z',
259
+ 'cloud-alibaba': 'Alibaba Cloud,Aliyun,AliCloud',
260
+ 'cloud-digitalocean': 'DigitalOcean,Droplet,App Platform',
261
+ 'cloud-huawei': 'Huawei Cloud,HuaweiCloud',
262
+ 'cloud-cloudflare': 'Cloudflare,Cloudflare Workers,Cloudflare R2,Zero Trust',
263
  'medium-article': 'Medium,article,blog,writing,publishing',
264
  'magazines': 'technology',
265
  'data-laws': 'data privacy law,GDPR,CCPA,AI regulation,compliance',
 
340
  'data-governance': 'https://news.google.com/rss/search?q=data+governance+OR+data+management&hl=en-US&gl=US&ceid=US:en',
341
  'data-privacy': 'https://news.google.com/rss/search?q=data+privacy+OR+GDPR+OR+privacy+regulation&hl=en-US&gl=US&ceid=US:en',
342
  'data-engineering': 'https://news.google.com/rss/search?q=data+engineering+OR+data+pipeline+OR+big+data&hl=en-US&gl=US&ceid=US:en',
343
+ 'data-management': 'https://news.google.com/rss/search?q=%22data+management%22+OR+%22master+data%22+OR+MDM+OR+%22data+catalog%22&hl=en-US&gl=US&ceid=US:en',
344
  'business-intelligence': 'https://news.google.com/rss/search?q=business+intelligence+OR+BI+tools&hl=en-US&gl=US&ceid=US:en',
345
  'business-analytics': 'https://news.google.com/rss/search?q=business+analytics&hl=en-US&gl=US&ceid=US:en',
346
  'customer-data-platform': 'https://news.google.com/rss/search?q=customer+data+platform+OR+CDP&hl=en-US&gl=US&ceid=US:en',
347
  'data-centers': 'https://news.google.com/rss/search?q=data+centers+OR+data+centre&hl=en-US&gl=US&ceid=US:en',
348
  'cloud-computing': 'https://news.google.com/rss/search?q=cloud+computing+OR+AWS+OR+Azure+OR+Google+Cloud+OR+Salesforce+OR+Alibaba+Cloud+OR+Tencent+Cloud+OR+Huawei+Cloud+OR+Cloudflare&hl=en-US&gl=US&ceid=US:en',
349
+ 'cloud-aws': 'https://news.google.com/rss/search?q=AWS+OR+%22Amazon+Web+Services%22+OR+%22Amazon+S3%22+OR+EC2+OR+Lambda&hl=en-US&gl=US&ceid=US:en',
350
+ 'cloud-azure': 'https://news.google.com/rss/search?q=Azure+OR+%22Microsoft+Azure%22+OR+%22Azure+DevOps%22&hl=en-US&gl=US&ceid=US:en',
351
+ 'cloud-gcp': 'https://news.google.com/rss/search?q=GCP+OR+%22Google+Cloud%22+OR+BigQuery+OR+%22Vertex+AI%22&hl=en-US&gl=US&ceid=US:en',
352
+ 'cloud-oracle': 'https://news.google.com/rss/search?q=%22Oracle+Cloud%22+OR+OCI+OR+%22Oracle+Database%22&hl=en-US&gl=US&ceid=US:en',
353
+ 'cloud-ibm': 'https://news.google.com/rss/search?q=%22IBM+Cloud%22+OR+%22IBM+Watson%22+OR+OpenShift&hl=en-US&gl=US&ceid=US:en',
354
+ 'cloud-alibaba': 'https://news.google.com/rss/search?q=%22Alibaba+Cloud%22+OR+Aliyun&hl=en-US&gl=US&ceid=US:en',
355
+ 'cloud-digitalocean': 'https://news.google.com/rss/search?q=DigitalOcean+OR+Droplet&hl=en-US&gl=US&ceid=US:en',
356
+ 'cloud-huawei': 'https://news.google.com/rss/search?q=%22Huawei+Cloud%22+OR+HuaweiCloud&hl=en-US&gl=US&ceid=US:en',
357
+ 'cloud-cloudflare': 'https://news.google.com/rss/search?q=Cloudflare+OR+%22Cloudflare+Workers%22+OR+%22Zero+Trust%22&hl=en-US&gl=US&ceid=US:en',
358
  'medium-article': 'https://news.google.com/rss/search?q=Medium+article+OR+Medium+blog+OR+Medium+publishing&hl=en-US&gl=US&ceid=US:en',
359
  'magazines': 'https://news.google.com/rss/headlines/section/topic/TECHNOLOGY?hl=en-US&gl=US&ceid=US:en',
360
  'data-laws': 'https://news.google.com/rss/search?q=data+privacy+law+OR+GDPR+OR+CCPA+OR+AI+Regulation&hl=en-US&gl=US&ceid=US:en',
app/services/scheduler.py CHANGED
@@ -51,6 +51,43 @@ CATEGORIES = [
51
  "cloud-cloudflare"
52
  ]
53
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
54
 
55
  async def fetch_all_news():
56
  """
@@ -78,10 +115,17 @@ async def fetch_all_news():
78
  total_irrelevant = 0
79
  category_stats = {}
80
 
81
- # Parallel fetch all categories at once
 
 
 
 
 
 
 
82
  fetch_tasks = []
83
  for category in CATEGORIES:
84
- task = fetch_and_validate_category(category)
85
  fetch_tasks.append(task)
86
 
87
  # Execute all fetches concurrently with error isolation
@@ -99,7 +143,9 @@ async def fetch_all_news():
99
  total_errors += 1
100
  continue
101
 
102
- category, articles, invalid_count, irrelevant_count = result
 
 
103
 
104
  if not articles:
105
  logger.warning("⚠️ No valid articles for category: %s", category)
@@ -195,22 +241,119 @@ async def fetch_all_news():
195
  )
196
 
197
  # Update adaptive scheduler intervals
198
- from app.services.adaptive_scheduler import get_adaptive_scheduler
199
-
200
- adaptive = get_adaptive_scheduler(CATEGORIES)
201
  if adaptive:
202
- # Update intervals based on this run's statistics
203
- for category, stats in category_stats.items():
204
  if 'fetched' in stats:
205
- new_interval = adaptive.update_category_velocity(
206
- category,
207
- stats['fetched']
208
- )
209
-
210
- # Print adaptive scheduler summary
211
  adaptive.print_summary()
212
 
213
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
214
  async def fetch_daily_research():
215
  """
216
  Background Job: Fetch Research Papers from ArXiv
@@ -232,76 +375,115 @@ async def fetch_daily_research():
232
  logger.info("═" * 80)
233
 
234
 
235
- async def fetch_and_validate_category(category: str) -> tuple:
236
  """
237
- Fetch and validate articles for a single category
238
-
 
 
 
 
 
 
239
  Returns: (category, valid_articles, invalid_count, irrelevant_count)
240
  """
241
  from app.utils.data_validation import is_valid_article, sanitize_article, is_relevant_to_category
242
  from app.utils.date_parser import normalize_article_date
 
 
243
 
244
  try:
245
  logger.info("📌 Fetching %s...", category.upper())
246
 
247
- # Fetch from external APIs
248
- news_aggregator = NewsAggregator()
249
-
250
- # Concurrent fetch from Main Chain + Medium + Official Cloud
251
- main_task = news_aggregator.fetch_by_category(category)
252
- medium_task = news_aggregator.fetch_from_provider('medium', category)
253
- official_task = news_aggregator.fetch_from_provider('official_cloud', category)
254
-
255
- results = await asyncio.gather(main_task, medium_task, official_task, return_exceptions=True)
256
-
257
- # Combine results
258
- raw_articles = []
259
-
260
- # Result 0: Main Provider Chain
261
- if isinstance(results[0], list):
262
- raw_articles.extend(results[0])
263
-
264
- # Result 1: Medium RSS
265
- if isinstance(results[1], list):
266
- if results[1]:
267
- logger.info(" + Found %d Medium articles for %s", len(results[1]), category)
268
- raw_articles.extend(results[1])
269
-
270
- # Result 2: Official Cloud
271
- if isinstance(results[2], list):
272
- if results[2]:
273
- logger.info(" + Found %d Official Cloud articles for %s", len(results[2]), category)
274
- raw_articles.extend(results[2])
275
 
276
  if not raw_articles:
277
  return (category, [], 0, 0)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
278
 
279
  # Validate, filter, and sanitize
280
  valid_articles = []
281
  invalid_count = 0
282
  irrelevant_count = 0
 
283
 
284
  for article in raw_articles:
285
- # Step 1: Basic validation
286
  if not is_valid_article(article):
287
  invalid_count += 1
288
  continue
289
-
290
- # Step 2: Category relevance check
291
  if not is_relevant_to_category(article, category):
292
  irrelevant_count += 1
293
  continue
294
-
295
- # Step 3: Normalize date to UTC ISO-8601
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
296
  article = normalize_article_date(article)
297
-
298
- # Step 4: Sanitize and clean
299
  clean_article = sanitize_article(article)
300
  valid_articles.append(clean_article)
301
 
302
- logger.info("✓ %s: %d valid, %d invalid, %d irrelevant",
303
  category.upper(), len(valid_articles), invalid_count, irrelevant_count)
304
- return (category, valid_articles, invalid_count, irrelevant_count)
305
 
306
  except asyncio.TimeoutError:
307
  logger.error("⏱️ Timeout fetching %s (>30s)", category)
@@ -473,18 +655,41 @@ def start_scheduler():
473
  logger.info("⏰ [SCHEDULER] Initializing background scheduler...")
474
  logger.info("═" * 80)
475
 
476
- # News Fetcher Job (Frequency: Every 1 hour)
477
- scheduler.add_job(
478
- fetch_all_news,
479
- trigger=IntervalTrigger(hours=1),
480
- id='fetch_all_news',
481
- name='News Fetcher (every 1 hour)',
482
- replace_existing=True
483
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
484
  logger.info("")
485
- logger.info("✅ Job #1 Registered: 📰 News Fetcher")
486
- logger.info(" ⏱️ Schedule: Every 1 hour")
487
- logger.info(" 📋 Task: Direct Fetch -> Deduplicate -> Store (Appwrite)")
488
 
489
  # Cleanup Job (Frequency: Every 30 minutes)
490
  scheduler.add_job(
 
51
  "cloud-cloudflare"
52
  ]
53
 
54
+ # --------------------------------------------------------------------------
55
+ # MODULE-LEVEL SINGLETONS (Phase 6)
56
+ # --------------------------------------------------------------------------
57
+ # These two objects are created ONCE when the server starts and are shared
58
+ # by all 22 per-category jobs for the entire lifetime of the process.
59
+ #
60
+ # _shared_aggregator — one NewsAggregator for all categories (Phase 1 fix).
61
+ # It holds provider state (quota counts, circuit-breaker) that must
62
+ # survive across job runs. Creating a new one for every job would reset
63
+ # all that carefully maintained state.
64
+ #
65
+ # _adaptive — the AdaptiveScheduler that tracks how many articles each
66
+ # category produces and adjusts its fetch interval accordingly.
67
+ # Also persists to disk (data/velocity_tracking.json) so intervals
68
+ # survive server restarts.
69
+ # --------------------------------------------------------------------------
70
+ _shared_aggregator = None
71
+ _adaptive = None
72
+
73
+
74
+ def _get_shared_aggregator():
75
+ """Return (creating if needed) the one shared NewsAggregator instance."""
76
+ global _shared_aggregator
77
+ if _shared_aggregator is None:
78
+ _shared_aggregator = NewsAggregator()
79
+ logger.info("[AGGREGATOR] Shared NewsAggregator created (singleton).")
80
+ return _shared_aggregator
81
+
82
+
83
+ def _get_adaptive():
84
+ """Return (creating if needed) the one shared AdaptiveScheduler instance."""
85
+ global _adaptive
86
+ if _adaptive is None:
87
+ _adaptive = get_adaptive_scheduler(CATEGORIES)
88
+ logger.info("[ADAPTIVE] AdaptiveScheduler created for %d categories.", len(CATEGORIES))
89
+ return _adaptive
90
+
91
 
92
  async def fetch_all_news():
93
  """
 
115
  total_irrelevant = 0
116
  category_stats = {}
117
 
118
+ # Parallel fetch all categories at once.
119
+ # We create ONE shared aggregator here so all 22 category tasks share
120
+ # the same provider state (quota counts, circuit states, etc.).
121
+ # Fix #3 (Phase 7): Use the permanent module-level singleton instead of
122
+ # creating a fresh instance here. This ensures that even manual triggers
123
+ # respect the live quota counts and circuit-breaker state from the
124
+ # adaptive jobs that may already be running.
125
+ shared_aggregator = _get_shared_aggregator()
126
  fetch_tasks = []
127
  for category in CATEGORIES:
128
+ task = fetch_and_validate_category(category, shared_aggregator)
129
  fetch_tasks.append(task)
130
 
131
  # Execute all fetches concurrently with error isolation
 
143
  total_errors += 1
144
  continue
145
 
146
+ # Unpack 5-tuple relevant_count (5th item) is not needed here,
147
+ # it is only used by fetch_single_category_job for adaptive velocity.
148
+ category, articles, invalid_count, irrelevant_count, _ = result
149
 
150
  if not articles:
151
  logger.warning("⚠️ No valid articles for category: %s", category)
 
241
  )
242
 
243
  # Update adaptive scheduler intervals
244
+ # (kept for backward compat — manual trigger may still call this)
245
+ adaptive = _get_adaptive()
 
246
  if adaptive:
247
+ for cat, stats in category_stats.items():
 
248
  if 'fetched' in stats:
249
+ adaptive.update_category_velocity(cat, stats['fetched'])
 
 
 
 
 
250
  adaptive.print_summary()
251
 
252
 
253
+ async def fetch_single_category_job(category: str):
254
+ """
255
+ Per-category background job (Phase 6).
256
+
257
+ This is what each of the 22 adaptive jobs calls every N minutes.
258
+ It is a self-contained unit: fetch → validate → save → report → reschedule.
259
+
260
+ In plain English:
261
+ Think of this like a delivery driver who has a single route (one category).
262
+ After every delivery run, the dispatcher (adaptive scheduler) checks how
263
+ many packages were delivered. If the route is always busy (lots of news),
264
+ the driver gets sent out more often. If the route is quiet, the driver
265
+ waits longer before going out again.
266
+ """
267
+ aggregator = _get_shared_aggregator()
268
+ adaptive = _get_adaptive()
269
+
270
+ logger.info("[ADAPTIVE JOB] Starting fetch for category: %s", category.upper())
271
+
272
+ try:
273
+ # Step 1: Fetch + validate (calls the full Phase 1-4 pipeline).
274
+ result = await fetch_and_validate_category(category, aggregator)
275
+
276
+ if isinstance(result, Exception):
277
+ logger.error("[ADAPTIVE JOB] %s fetch failed: %s", category, result)
278
+ return
279
+
280
+ # Unpack the 5-tuple returned by fetch_and_validate_category.
281
+ # relevant_count = articles that passed Steps 1+2 (valid + on-topic)
282
+ # but before Step 3 (Redis 48h dedup) filtered them.
283
+ # This is the true measure of how active a category's news feed is.
284
+ cat, articles, invalid_count, irrelevant_count, relevant_count = result
285
+
286
+ if not articles:
287
+ logger.info("[ADAPTIVE JOB] %s: No valid articles this run.", category.upper())
288
+ saved_count = 0
289
+ else:
290
+ # Step 2: Save to Appwrite.
291
+ appwrite_db = get_appwrite_db()
292
+ cache_service = CacheService()
293
+
294
+ logger.info("[ADAPTIVE JOB] %s: Saving %d articles...", category.upper(), len(articles))
295
+ saved_count, duplicate_count, error_count, _ = await appwrite_db.save_articles(articles)
296
+
297
+ logger.info(
298
+ "[ADAPTIVE JOB] %s: %d saved, %d duplicates, %d errors, "
299
+ "%d invalid, %d irrelevant.",
300
+ category.upper(), saved_count, duplicate_count, error_count,
301
+ invalid_count, irrelevant_count
302
+ )
303
+
304
+ # Step 3: Update Redis article cache so the API serves fresh results.
305
+ try:
306
+ await cache_service.set(f"news:{category}", articles, ttl=settings.CACHE_TTL)
307
+ except Exception as cache_err:
308
+ logger.debug("[ADAPTIVE JOB] Redis cache update skipped: %s", cache_err)
309
+
310
+ # Step 4: Feed result count back to the adaptive scheduler.
311
+ # We use relevant_count (articles that passed validation + keyword relevance)
312
+ # rather than saved_count (articles actually new to Appwrite).
313
+ #
314
+ # Why? A busy category with a slow-updating RSS feed will have high
315
+ # relevant_count but low saved_count (we already have the articles).
316
+ # Using saved_count would incorrectly mark it as "quiet" and slow it down.
317
+ # relevant_count correctly reflects: "how much real news is out there?"
318
+ if adaptive:
319
+ # Fix #1 (Phase 7): Read old_interval NOW, before update_category_velocity
320
+ # overwrites data['interval'] inside the AdaptiveScheduler.
321
+ # The comparison new_interval != old_interval was always False before
322
+ # because we were reading the interval AFTER it was already updated.
323
+ old_interval = adaptive.get_interval(category)
324
+
325
+ # Now update velocity with the correct metric (in-memory only — instant).
326
+ new_interval = adaptive.update_category_velocity(category, relevant_count)
327
+
328
+ # Persist the updated velocity to Redis asynchronously.
329
+ # async_persist() uses httpx.AsyncClient so it never blocks the event loop.
330
+ # Think of it like dropping a letter in a post box — we do not stand
331
+ # and wait for the postman to deliver it. We just drop it and walk on.
332
+ await adaptive.async_persist()
333
+
334
+ # Step 5: If the interval genuinely changed, tell APScheduler
335
+ # to reschedule this specific job live — no server restart needed.
336
+ if new_interval != old_interval:
337
+ job_id = f"fetch_{category}"
338
+ try:
339
+ scheduler.reschedule_job(
340
+ job_id,
341
+ trigger=IntervalTrigger(minutes=new_interval)
342
+ )
343
+ logger.info(
344
+ "[ADAPTIVE] %s interval changed: %dmin → %dmin. Job rescheduled live.",
345
+ category.upper(), old_interval, new_interval
346
+ )
347
+ except Exception as reschedule_err:
348
+ logger.warning(
349
+ "[ADAPTIVE] Could not reschedule %s job: %s",
350
+ job_id, reschedule_err
351
+ )
352
+
353
+ except Exception as e:
354
+ logger.exception("[ADAPTIVE JOB] Unhandled error for category %s: %s", category, e)
355
+
356
+
357
  async def fetch_daily_research():
358
  """
359
  Background Job: Fetch Research Papers from ArXiv
 
375
  logger.info("═" * 80)
376
 
377
 
378
+ async def fetch_and_validate_category(category: str, aggregator) -> tuple:
379
  """
380
+ Fetch and validate articles for a single category.
381
+
382
+ Args:
383
+ category: The news category (e.g. 'ai', 'cloud-aws').
384
+ aggregator: The shared NewsAggregator instance for this run.
385
+ Using a shared instance means all 22 parallel tasks
386
+ share the same quota counters and circuit-breaker state.
387
+
388
  Returns: (category, valid_articles, invalid_count, irrelevant_count)
389
  """
390
  from app.utils.data_validation import is_valid_article, sanitize_article, is_relevant_to_category
391
  from app.utils.date_parser import normalize_article_date
392
+ from app.utils.url_canonicalization import canonicalize_url
393
+ from app.utils.redis_dedup import is_url_seen_or_mark
394
 
395
  try:
396
  logger.info("📌 Fetching %s...", category.upper())
397
 
398
+ # Ask the aggregator for all articles from all sources for this category.
399
+ # fetch_by_category (Phase 5) internally runs:
400
+ # 1. Paid waterfall — GNews → NewsAPI → NewsData (stops on first success)
401
+ # 2. Free parallel — Google RSS + Medium + Official Cloud, all at once
402
+ # 3. Returns the merged list
403
+ # We no longer need to call fetch_from_provider for medium/official_cloud
404
+ # separately here. That would duplicate the work Phase 5 already does.
405
+ raw_articles = await aggregator.fetch_by_category(category)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
406
 
407
  if not raw_articles:
408
  return (category, [], 0, 0)
409
+
410
+ # ------------------------------------------------------------------
411
+ # IN-BATCH DEDUPLICATION
412
+ # ------------------------------------------------------------------
413
+ # When 3 providers run at the same time for the same category, they
414
+ # sometimes return the exact same article (e.g. a TechCrunch AI story
415
+ # can come from both GNews AND Google RSS in the same fetch cycle).
416
+ # We catch and remove these same-batch duplicates RIGHT HERE, before
417
+ # the expensive validation loop even starts.
418
+ # This is like a quick ID-card check at the entrance before people
419
+ # join the full security screening queue.
420
+ _seen_in_batch: set = set()
421
+ _deduplicated_raw = []
422
+ for _art in raw_articles:
423
+ _raw_url = str(_art.url) if _art.url else ''
424
+ _canonical = canonicalize_url(_raw_url) if _raw_url else ''
425
+ # If we have a valid canonical URL and we've already seen it → skip
426
+ if _canonical and _canonical in _seen_in_batch:
427
+ continue
428
+ if _canonical:
429
+ _seen_in_batch.add(_canonical)
430
+ _deduplicated_raw.append(_art)
431
+
432
+ _batch_dupes_removed = len(raw_articles) - len(_deduplicated_raw)
433
+ if _batch_dupes_removed > 0:
434
+ logger.info(
435
+ " 🔄 [BATCH DEDUP] %s: Removed %d within-batch duplicates before validation",
436
+ category.upper(), _batch_dupes_removed
437
+ )
438
+ raw_articles = _deduplicated_raw
439
+ # ------------------------------------------------------------------
440
 
441
  # Validate, filter, and sanitize
442
  valid_articles = []
443
  invalid_count = 0
444
  irrelevant_count = 0
445
+ relevant_count = 0 # articles that are valid + relevant, before Redis dedup
446
 
447
  for article in raw_articles:
448
+ # Step 1: Basic validation — must have a title, URL, and publication date.
449
  if not is_valid_article(article):
450
  invalid_count += 1
451
  continue
452
+
453
+ # Step 2: Category relevance check — title+description must match category keywords.
454
  if not is_relevant_to_category(article, category):
455
  irrelevant_count += 1
456
  continue
457
+
458
+ # Checkpoint: count articles that are valid AND relevant, but before
459
+ # the Redis 48-hour check strips out the ones we have already stored.
460
+ # This is the true "how much real news is in this category?" signal.
461
+ # The adaptive scheduler uses this number to decide fetch frequency.
462
+ # (Fix #2 - Phase 7: was using saved_count, which confused "quiet feed"
463
+ # with "feed we already have fully stored" — two very different things.)
464
+ relevant_count += 1
465
+
466
+ # Step 3: Redis 48-hour dedup check — THE MAIN BOUNCER.
467
+ # Check if we have already stored this exact article URL in the last 48 hours.
468
+ # If yes, skip silently — it's a repeat. If no, mark it as seen and continue.
469
+ # This stops the same article being saved every hour from a slow-updating RSS feed.
470
+ if await is_url_seen_or_mark(str(article.url) if article.url else ''):
471
+ logger.debug(
472
+ " [REDIS DEDUP] Skipped article already seen in last 48 hours: %s",
473
+ str(article.url)[:80]
474
+ )
475
+ continue
476
+
477
+ # Step 4: Normalize date to UTC ISO-8601.
478
  article = normalize_article_date(article)
479
+
480
+ # Step 5: Sanitize and clean the article fields.
481
  clean_article = sanitize_article(article)
482
  valid_articles.append(clean_article)
483
 
484
+ logger.info("✓ %s: %d valid, %d invalid, %d irrelevant",
485
  category.upper(), len(valid_articles), invalid_count, irrelevant_count)
486
+ return (category, valid_articles, invalid_count, irrelevant_count, relevant_count)
487
 
488
  except asyncio.TimeoutError:
489
  logger.error("⏱️ Timeout fetching %s (>30s)", category)
 
655
  logger.info("⏰ [SCHEDULER] Initializing background scheduler...")
656
  logger.info("═" * 80)
657
 
658
+ # ── Job #1: PER-CATEGORY ADAPTIVE NEWS FETCHERS (Phase 6) ───────────
659
+ # Instead of one giant job that fetches all 22 categories every hour,
660
+ # we register 22 individual jobs, each on its own timer.
661
+ #
662
+ # The timer for each category is read from the adaptive scheduler,
663
+ # which remembers how "active" each category was in past runs:
664
+ # - 'ai' category gets lots of articles → runs every 5 minutes
665
+ # - 'cloud-alibaba' is quiet → runs every 60 minutes
666
+ # - Most categories start at 15 minutes (the default)
667
+ #
668
+ # After every run, the job updates its own timer if the velocity changed.
669
+ # No server restart needed.
670
+ # -------------------------------------------------------------------------
671
+ adaptive = _get_adaptive() # initializes singleton + loads saved intervals
672
+
673
+ for idx, category in enumerate(CATEGORIES, start=1):
674
+ initial_interval = adaptive.get_interval(category) # minutes
675
+ job_id = f"fetch_{category}"
676
+
677
+ scheduler.add_job(
678
+ fetch_single_category_job,
679
+ trigger=IntervalTrigger(minutes=initial_interval),
680
+ args=[category],
681
+ id=job_id,
682
+ name=f"News Fetcher: {category} (every {initial_interval}min)",
683
+ replace_existing=True
684
+ )
685
+ logger.info(
686
+ " ✓ [%02d/%02d] %-30s → every %d min",
687
+ idx, len(CATEGORIES), category, initial_interval
688
+ )
689
+
690
  logger.info("")
691
+ logger.info("✅ Job #1 Group Registered: 📰 %d Adaptive News Fetchers", len(CATEGORIES))
692
+ logger.info(" Intervals range from 5 min (high-velocity) to 60 min (quiet)")
 
693
 
694
  # Cleanup Job (Frequency: Every 30 minutes)
695
  scheduler.add_job(
app/services/upstash_cache.py CHANGED
@@ -61,14 +61,25 @@ class UpstashCache:
61
  self.enabled = enabled
62
  self.default_ttl = default_ttl
63
 
64
- # HTTP client with timeout
65
- self.client = httpx.Client(
66
- timeout=5.0, # 5 second timeout
67
- headers={
68
- "Authorization": f"Bearer {rest_token}",
69
- "Content-Type": "application/json"
70
- }
71
- )
 
 
 
 
 
 
 
 
 
 
 
72
 
73
  # Stats tracking
74
  self.stats = {
@@ -88,7 +99,7 @@ class UpstashCache:
88
  logger.info(f" Free Tier: 256 MB data, 50 GB/month bandwidth")
89
  logger.info("=" * 70)
90
 
91
- def _execute_command(self, command: list) -> Optional[Any]:
92
  """
93
  Execute Redis command via REST API
94
 
@@ -102,7 +113,8 @@ class UpstashCache:
102
  return None
103
 
104
  try:
105
- response = self.client.post(
 
106
  f"{self.rest_url}",
107
  json=command
108
  )
@@ -120,7 +132,7 @@ class UpstashCache:
120
  self.stats['errors'] += 1
121
  return None
122
 
123
- def get(self, key: str) -> Optional[Any]:
124
  """
125
  Get value from cache
126
 
@@ -134,7 +146,7 @@ class UpstashCache:
134
  return None
135
 
136
  try:
137
- result = self._execute_command(["GET", key])
138
 
139
  if result is None:
140
  self.stats['misses'] += 1
@@ -152,7 +164,7 @@ class UpstashCache:
152
  self.stats['errors'] += 1
153
  return None
154
 
155
- def set(
156
  self,
157
  key: str,
158
  value: Any,
@@ -185,7 +197,7 @@ class UpstashCache:
185
  ttl_seconds = ttl if ttl is not None else self.default_ttl
186
 
187
  # SETEX command (set with expiration)
188
- result = self._execute_command(["SETEX", key, ttl_seconds, serialized])
189
 
190
  if result == "OK" or result is not None:
191
  self.stats['sets'] += 1
@@ -199,7 +211,7 @@ class UpstashCache:
199
  self.stats['errors'] += 1
200
  return False
201
 
202
- def delete(self, key: str) -> bool:
203
  """
204
  Delete key from cache
205
 
@@ -213,7 +225,7 @@ class UpstashCache:
213
  return False
214
 
215
  try:
216
- result = self._execute_command(["DEL", key])
217
  deleted = result == 1
218
 
219
  if deleted:
@@ -225,7 +237,7 @@ class UpstashCache:
225
  logger.error(f"❌ Cache delete error for {key}: {e}")
226
  return False
227
 
228
- def invalidate_pattern(self, pattern: str) -> int:
229
  """
230
  Invalidate all keys matching pattern
231
 
@@ -240,14 +252,14 @@ class UpstashCache:
240
 
241
  try:
242
  # Get all matching keys
243
- keys = self._execute_command(["KEYS", pattern])
244
 
245
  if not keys:
246
  return 0
247
 
248
  # Delete all keys
249
  for key in keys:
250
- self._execute_command(["DEL", key])
251
 
252
  logger.info(f"🗑️ Invalidated {len(keys)} keys matching '{pattern}'")
253
  return len(keys)
@@ -288,7 +300,7 @@ class UpstashCache:
288
  logger.info("=" * 70)
289
  logger.info("")
290
 
291
- def health_check(self) -> bool:
292
  """
293
  Check if Upstash is reachable
294
 
@@ -296,7 +308,7 @@ class UpstashCache:
296
  True if healthy, False otherwise
297
  """
298
  try:
299
- result = self._execute_command(["PING"])
300
  healthy = result == "PONG"
301
 
302
  if healthy:
@@ -310,10 +322,10 @@ class UpstashCache:
310
  logger.error(f"❌ Upstash health check error: {e}")
311
  return False
312
 
313
- def close(self):
314
  """Close HTTP client"""
315
- if hasattr(self, 'client'):
316
- self.client.close()
317
 
318
 
319
  # Global singleton instance
 
61
  self.enabled = enabled
62
  self.default_ttl = default_ttl
63
 
64
+ # Stats tracking
65
+ self.stats = {
66
+ 'hits': 0,
67
+ 'misses': 0,
68
+ 'sets': 0,
69
+ 'errors': 0
70
+ }
71
+
72
+ def _get_client(self) -> httpx.AsyncClient:
73
+ """Lazy initialization of httpx client to avoid asyncio loop issues on Windows"""
74
+ if not hasattr(self, '_client') or self._client is None:
75
+ self._client = httpx.AsyncClient(
76
+ timeout=5.0, # 5 second timeout
77
+ headers={
78
+ "Authorization": f"Bearer {self.rest_token}",
79
+ "Content-Type": "application/json"
80
+ }
81
+ )
82
+ return self._client
83
 
84
  # Stats tracking
85
  self.stats = {
 
99
  logger.info(f" Free Tier: 256 MB data, 50 GB/month bandwidth")
100
  logger.info("=" * 70)
101
 
102
+ async def _execute_command(self, command: list) -> Optional[Any]:
103
  """
104
  Execute Redis command via REST API
105
 
 
113
  return None
114
 
115
  try:
116
+ client = self._get_client()
117
+ response = await client.post(
118
  f"{self.rest_url}",
119
  json=command
120
  )
 
132
  self.stats['errors'] += 1
133
  return None
134
 
135
+ async def get(self, key: str) -> Optional[Any]:
136
  """
137
  Get value from cache
138
 
 
146
  return None
147
 
148
  try:
149
+ result = await self._execute_command(["GET", key])
150
 
151
  if result is None:
152
  self.stats['misses'] += 1
 
164
  self.stats['errors'] += 1
165
  return None
166
 
167
+ async def set(
168
  self,
169
  key: str,
170
  value: Any,
 
197
  ttl_seconds = ttl if ttl is not None else self.default_ttl
198
 
199
  # SETEX command (set with expiration)
200
+ result = await self._execute_command(["SETEX", key, ttl_seconds, serialized])
201
 
202
  if result == "OK" or result is not None:
203
  self.stats['sets'] += 1
 
211
  self.stats['errors'] += 1
212
  return False
213
 
214
+ async def delete(self, key: str) -> bool:
215
  """
216
  Delete key from cache
217
 
 
225
  return False
226
 
227
  try:
228
+ result = await self._execute_command(["DEL", key])
229
  deleted = result == 1
230
 
231
  if deleted:
 
237
  logger.error(f"❌ Cache delete error for {key}: {e}")
238
  return False
239
 
240
+ async def invalidate_pattern(self, pattern: str) -> int:
241
  """
242
  Invalidate all keys matching pattern
243
 
 
252
 
253
  try:
254
  # Get all matching keys
255
+ keys = await self._execute_command(["KEYS", pattern])
256
 
257
  if not keys:
258
  return 0
259
 
260
  # Delete all keys
261
  for key in keys:
262
+ await self._execute_command(["DEL", key])
263
 
264
  logger.info(f"🗑️ Invalidated {len(keys)} keys matching '{pattern}'")
265
  return len(keys)
 
300
  logger.info("=" * 70)
301
  logger.info("")
302
 
303
+ async def health_check(self) -> bool:
304
  """
305
  Check if Upstash is reachable
306
 
 
308
  True if healthy, False otherwise
309
  """
310
  try:
311
+ result = await self._execute_command(["PING"])
312
  healthy = result == "PONG"
313
 
314
  if healthy:
 
322
  logger.error(f"❌ Upstash health check error: {e}")
323
  return False
324
 
325
+ async def close(self):
326
  """Close HTTP client"""
327
+ if hasattr(self, '_client') and self._client is not None:
328
+ await self._client.aclose()
329
 
330
 
331
  # Global singleton instance
app/utils/data_validation.py CHANGED
@@ -238,6 +238,8 @@ def is_relevant_to_category(article: Union[Dict, 'Article'], category: str) -> b
238
  article_dict = article
239
 
240
  # Category keyword dictionaries
 
 
241
  CATEGORY_KEYWORDS = {
242
  'ai': [
243
  'ai', 'artificial intelligence', 'machine learning', 'deep learning',
@@ -260,6 +262,11 @@ def is_relevant_to_category(article: Union[Dict, 'Article'], category: str) -> b
260
  'data engineering', 'pipeline', 'etl', 'big data', 'spark',
261
  'hadoop', 'kafka', 'airflow', 'data warehouse', 'snowflake'
262
  ],
 
 
 
 
 
263
  'business-intelligence': [
264
  'business intelligence', 'bi', 'analytics', 'dashboard',
265
  'tableau', 'power bi', 'looker', 'reporting', 'kpi'
@@ -281,6 +288,40 @@ def is_relevant_to_category(article: Union[Dict, 'Article'], category: str) -> b
281
  'alibaba cloud', 'tencent cloud', 'huawei cloud', 'cloudflare',
282
  'saas', 'paas', 'iaas', 'serverless', 'kubernetes'
283
  ],
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
284
  'medium-article': [
285
  'medium', 'article', 'blog', 'writing', 'publishing',
286
  'content', 'story', 'author', 'blogging'
@@ -298,11 +339,27 @@ def is_relevant_to_category(article: Union[Dict, 'Article'], category: str) -> b
298
  # Unknown category - allow (don't reject)
299
  return True
300
 
301
- # Combine title and description for checking
302
- # FIX: Use (value or '') pattern to handle explicit None values from messy RSS feeds
 
 
 
 
303
  title = (article_dict.get('title') or '').lower()
304
  description = (article_dict.get('description') or '').lower()
305
- text = f"{title} {description}"
 
 
 
 
 
 
 
 
 
 
 
 
306
 
307
  # Count keyword matches
308
  matches = sum(1 for keyword in keywords if keyword.lower() in text)
 
238
  article_dict = article
239
 
240
  # Category keyword dictionaries
241
+ # Each category has a list of words we scan for in the article's title,
242
+ # description, AND URL path. If at least one word matches, the article passes.
243
  CATEGORY_KEYWORDS = {
244
  'ai': [
245
  'ai', 'artificial intelligence', 'machine learning', 'deep learning',
 
262
  'data engineering', 'pipeline', 'etl', 'big data', 'spark',
263
  'hadoop', 'kafka', 'airflow', 'data warehouse', 'snowflake'
264
  ],
265
+ 'data-management': [
266
+ 'data management', 'master data', 'mdm', 'data catalog',
267
+ 'data quality', 'data lineage', 'data stewardship',
268
+ 'data governance', 'data integration', 'reference data'
269
+ ],
270
  'business-intelligence': [
271
  'business intelligence', 'bi', 'analytics', 'dashboard',
272
  'tableau', 'power bi', 'looker', 'reporting', 'kpi'
 
288
  'alibaba cloud', 'tencent cloud', 'huawei cloud', 'cloudflare',
289
  'saas', 'paas', 'iaas', 'serverless', 'kubernetes'
290
  ],
291
+ # ── Cloud sub-categories (each maps to a specific provider) ──────────
292
+ 'cloud-aws': [
293
+ 'aws', 'amazon web services', 's3', 'ec2', 'lambda',
294
+ 'cloudfront', 'sagemaker', 'dynamodb', 'amazon'
295
+ ],
296
+ 'cloud-azure': [
297
+ 'azure', 'microsoft azure', 'azure devops', 'azure ml',
298
+ 'azure openai', 'microsoft cloud'
299
+ ],
300
+ 'cloud-gcp': [
301
+ 'gcp', 'google cloud', 'bigquery', 'vertex ai',
302
+ 'cloud run', 'dataflow', 'google cloud platform'
303
+ ],
304
+ 'cloud-oracle': [
305
+ 'oracle cloud', 'oci', 'oracle database', 'oracle fusion',
306
+ 'oracle cloud infrastructure'
307
+ ],
308
+ 'cloud-ibm': [
309
+ 'ibm cloud', 'ibm watson', 'red hat', 'openshift', 'ibm z'
310
+ ],
311
+ 'cloud-alibaba': [
312
+ 'alibaba cloud', 'aliyun', 'alicloud'
313
+ ],
314
+ 'cloud-digitalocean': [
315
+ 'digitalocean', 'droplet', 'app platform'
316
+ ],
317
+ 'cloud-huawei': [
318
+ 'huawei cloud', 'huaweicloud'
319
+ ],
320
+ 'cloud-cloudflare': [
321
+ 'cloudflare', 'cloudflare workers', 'cloudflare r2',
322
+ 'cloudflare pages', 'zero trust'
323
+ ],
324
+ # ── Content / publishing categories ───────────────────────────────────
325
  'medium-article': [
326
  'medium', 'article', 'blog', 'writing', 'publishing',
327
  'content', 'story', 'author', 'blogging'
 
339
  # Unknown category - allow (don't reject)
340
  return True
341
 
342
+ # Build the text we will search for keywords.
343
+ # We use title + description as the primary source.
344
+ # We also append the article's URL path because RSS feeds (especially Google News)
345
+ # often return empty descriptions. The URL itself usually tells you what the
346
+ # article is about — e.g. "/aws-launches-new-s3-feature" clearly contains 'aws' and 's3'.
347
+ # Hyphens and slashes are replaced with spaces so words can be matched individually.
348
  title = (article_dict.get('title') or '').lower()
349
  description = (article_dict.get('description') or '').lower()
350
+
351
+ # Extract the URL path safely.
352
+ raw_url = article_dict.get('url') or ''
353
+ url_str = str(raw_url).lower()
354
+ try:
355
+ parsed_url = urlparse(url_str)
356
+ # Replace hyphens and slashes with spaces so
357
+ # "/aws-new-s3-launch" becomes "aws new s3 launch".
358
+ url_words = parsed_url.path.replace('-', ' ').replace('/', ' ')
359
+ except Exception:
360
+ url_words = ''
361
+
362
+ text = f"{title} {description} {url_words}"
363
 
364
  # Count keyword matches
365
  matches = sum(1 for keyword in keywords if keyword.lower() in text)
app/utils/redis_dedup.py ADDED
@@ -0,0 +1,102 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Redis URL Deduplication Bouncer
3
+ ================================
4
+
5
+ This is the 48-hour memory for the ingestion pipeline.
6
+
7
+ How it works (simple version):
8
+ Imagine a nightclub bouncer who keeps a list of everyone who came in
9
+ today and yesterday. If you try to enter again while still on the list,
10
+ you are turned away. After 48 hours, your name falls off the list and
11
+ you are welcome back.
12
+
13
+ That is exactly what this module does for article URLs.
14
+
15
+ Each article URL is:
16
+ 1. Cleaned and normalized (canonicalized).
17
+ 2. Converted to a short SHA-256 fingerprint (so we store 16 chars not full URLs).
18
+ 3. Checked against Upstash Redis with the command: SET key 1 EX 172800 NX
19
+ - EX 172800 = expire after 172800 seconds = 48 hours
20
+ - NX = only set if Not eXists
21
+
22
+ Redis response:
23
+ - "OK" → key did NOT exist → article is NEW → return False (not seen before)
24
+ - null → key already existed → article is DUPLICATE → return True (seen before)
25
+
26
+ Fallback:
27
+ If Upstash is not configured or Redis is unreachable, this function
28
+ safely returns False (treats every article as new). The Appwrite
29
+ database constraint is still the final safety net in that case.
30
+ """
31
+
32
+ import logging
33
+ from app.utils.url_canonicalization import canonicalize_url, get_url_hash
34
+ from app.services.upstash_cache import get_upstash_cache
35
+
36
+ logger = logging.getLogger(__name__)
37
+
38
+ # Redis key prefix for URL deduplication keys.
39
+ # Keeps our keys clearly separate from the article-cache keys.
40
+ _KEY_PREFIX = "seen_url:"
41
+
42
+ # 48 hours expressed in seconds.
43
+ # This matches the cleanup janitor in scheduler.py which also deletes
44
+ # articles older than 48 hours. When an article is deleted from the
45
+ # database, its Redis key will also expire around the same time,
46
+ # allowing the article to be re-ingested if it genuinely resurfaces.
47
+ _TTL_SECONDS = 172_800 # 48 * 60 * 60
48
+
49
+
50
+ async def is_url_seen_or_mark(raw_url: str) -> bool:
51
+ """
52
+ Check if we have seen this article URL in the last 48 hours.
53
+ If we have NOT seen it, mark it as seen so future checks catch it.
54
+
55
+ Args:
56
+ raw_url: The article URL (any format — we normalize it internally).
57
+
58
+ Returns:
59
+ True → We have seen this URL before. It is a duplicate. Skip it.
60
+ False → This URL is brand new. The article was also marked in Redis
61
+ so the next run will correctly identify it as a duplicate.
62
+ """
63
+ if not raw_url:
64
+ # No URL means we cannot deduplicate. Let it through.
65
+ return False
66
+
67
+ try:
68
+ # Step 1: Normalize the URL so different versions of the same link
69
+ # (http vs https, trailing slash, utm_ params) all produce the same key.
70
+ canonical = canonicalize_url(str(raw_url))
71
+
72
+ # Step 2: Convert to a short hash so our Redis keys are tiny and uniform.
73
+ url_hash = get_url_hash(canonical)
74
+ redis_key = f"{_KEY_PREFIX}{url_hash}"
75
+
76
+ # Step 3: Get the Upstash client (shared singleton — already used by cache).
77
+ cache = get_upstash_cache()
78
+
79
+ # Step 4: Try to set the key WITH NX (only if it does not already exist).
80
+ # This is an atomic check-and-set: no race condition possible.
81
+ # Command: SET seen_url:{hash} 1 EX 172800 NX
82
+ result = await cache._execute_command(
83
+ ["SET", redis_key, "1", "EX", _TTL_SECONDS, "NX"]
84
+ )
85
+
86
+ if result == "OK":
87
+ # Redis successfully created the key → this URL is NEW.
88
+ return False # Not a duplicate — let the article through.
89
+ else:
90
+ # Redis returned null → key already existed → DUPLICATE.
91
+ logger.debug("[REDIS DEDUP] Duplicate detected: %s", redis_key)
92
+ return True # It's a duplicate — skip this article.
93
+
94
+ except Exception as e:
95
+ # Something went wrong with Redis (network error, timeout, etc.).
96
+ # We NEVER block an article because of a Redis failure.
97
+ # The Appwrite database will still catch true duplicates as a safety net.
98
+ logger.warning(
99
+ "[REDIS DEDUP] Redis check failed (%s) — letting article through as safe fallback.",
100
+ e
101
+ )
102
+ return False # Safe fallback: treat as new article.
app/verify_manual_fetch.py DELETED
@@ -1,87 +0,0 @@
1
-
2
- import asyncio
3
- import logging
4
- from app.services.appwrite_db import get_appwrite_db
5
- from app.config import settings
6
- from appwrite.query import Query
7
- from datetime import datetime, timedelta
8
-
9
- # Setup logging
10
- logging.basicConfig(level=logging.INFO)
11
- logger = logging.getLogger("Verifier")
12
-
13
- async def verify_stored_articles():
14
- print("="*60)
15
- print("🔍 VERIFYING FETCHED ARTICLES IN APPWRITE")
16
- print("="*60)
17
-
18
- appwrite_db = get_appwrite_db()
19
-
20
- # Check a few key categories
21
- categories_to_check = ["ai", "cloud-computing", "data-security"]
22
-
23
- total_found = 0
24
-
25
- for category in categories_to_check:
26
- try:
27
- # Query for articles created in the last 1 hour
28
- # Note: 'created_at' is internal Appwrite, 'publishedAt' is article time
29
- # We'll check 'publishedAt' as a proxy for recent content
30
- # OR just list the latest documents
31
-
32
- # Using the collection ID for the category (which is actually just the main collection with category filter in this architecture)
33
- # Wait, the architecture uses specific collections for specific types OR one collection with category field?
34
- # scheduler.py uses:
35
- # settings.APPWRITE_COLLECTION_ID for "Regular News"
36
- # settings.APPWRITE_CLOUD_COLLECTION_ID for "Cloud News"
37
- # settings.APPWRITE_AI_COLLECTION_ID for "AI News"
38
-
39
- collection_id = None
40
- if category == "ai":
41
- collection_id = settings.APPWRITE_AI_COLLECTION_ID
42
- elif category == "cloud-computing":
43
- collection_id = settings.APPWRITE_CLOUD_COLLECTION_ID
44
- else:
45
- collection_id = settings.APPWRITE_COLLECTION_ID # Default/Data
46
-
47
- print(f"\n📂 Checking collection for category: {category.upper()}")
48
- print(f" ID: {collection_id}")
49
-
50
- if not collection_id:
51
- print(" ⚠️ Collection ID not configured")
52
- continue
53
-
54
- response = appwrite_db.databases.list_documents(
55
- database_id=settings.APPWRITE_DATABASE_ID,
56
- collection_id=collection_id,
57
- queries=[
58
- Query.limit(5),
59
- Query.order_desc('$createdAt') # Get most recently created
60
- # Query.equal('category', category) # Optional if collection is mixed
61
- ]
62
- )
63
-
64
- count = len(response['documents'])
65
- print(f" ✅ Found {count} recent documents")
66
-
67
- if count > 0:
68
- for doc in response['documents']:
69
- title = doc.get('title', 'No Title')
70
- created_at = doc.get('$createdAt', 'Unknown')
71
- print(f" - [{created_at}] {title[:60]}...")
72
- total_found += 1
73
- else:
74
- print(" ❌ No documents found. Fetch may have failed.")
75
-
76
- except Exception as e:
77
- print(f" ❌ Error querying collection: {e}")
78
-
79
- print("\n" + "="*60)
80
- if total_found > 0:
81
- print(f"✅ VERIFICATION PASSED: Found {total_found} recent articles.")
82
- else:
83
- print("❌ VERIFICATION FAILED: No recent articles found.")
84
- print("="*60)
85
-
86
- if __name__ == "__main__":
87
- asyncio.run(verify_stored_articles())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app/verify_simple.py DELETED
@@ -1,30 +0,0 @@
1
-
2
- import asyncio
3
- import sys
4
- from app.services.appwrite_db import get_appwrite_db
5
- from app.config import settings
6
- from appwrite.query import Query
7
-
8
- async def verify():
9
- print("STARTING VERIFICATION", flush=True)
10
- try:
11
- appwrite_db = get_appwrite_db()
12
- print(f"DB Initialized: {appwrite_db.initialized}", flush=True)
13
-
14
- # Check AI news
15
- print(f"Checking AI News collection: {settings.APPWRITE_AI_COLLECTION_ID}", flush=True)
16
- response = appwrite_db.databases.list_documents(
17
- database_id=settings.APPWRITE_DATABASE_ID,
18
- collection_id=settings.APPWRITE_AI_COLLECTION_ID,
19
- queries=[Query.limit(5), Query.order_desc('$createdAt')]
20
- )
21
- print(f"Found {len(response['documents'])} docs", flush=True)
22
- for doc in response['documents']:
23
- print(f"- {doc.get('title', 'No Title')[:50]}...", flush=True)
24
-
25
- except Exception as e:
26
- print(f"ERROR: {e}", flush=True)
27
- print("DONE", flush=True)
28
-
29
- if __name__ == "__main__":
30
- asyncio.run(verify())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
data/velocity_tracking.json CHANGED
@@ -1,210 +1,210 @@
1
  {
2
  "ai": {
3
- "interval": 15,
4
  "history": [
5
- 26,
6
  0,
7
  0,
8
  6,
9
- 9
 
10
  ],
11
- "last_fetch": "2026-02-14T13:58:05.405121",
12
- "total_fetches": 21,
13
- "total_articles": 283
14
  },
15
  "data-security": {
16
- "interval": 15,
17
  "history": [
18
- 22,
19
  0,
20
  0,
21
  3,
22
- 2
 
23
  ],
24
- "last_fetch": "2026-02-14T13:58:05.410615",
25
- "total_fetches": 21,
26
- "total_articles": 186
27
  },
28
  "data-governance": {
29
  "interval": 60,
30
  "history": [
31
- 19,
32
  0,
33
  0,
34
  3,
35
- 2
 
36
  ],
37
- "last_fetch": "2026-02-14T13:58:05.416621",
38
- "total_fetches": 21,
39
- "total_articles": 178
40
  },
41
  "data-privacy": {
42
- "interval": 15,
43
  "history": [
44
- 23,
45
  0,
46
  0,
47
  5,
 
48
  5
49
  ],
50
- "last_fetch": "2026-02-14T13:58:05.421392",
51
- "total_fetches": 21,
52
- "total_articles": 210
53
  },
54
  "data-engineering": {
55
  "interval": 60,
56
  "history": [
57
- 14,
58
  0,
59
  0,
60
  2,
 
61
  5
62
  ],
63
- "last_fetch": "2026-02-14T13:58:05.424148",
64
- "total_fetches": 21,
65
- "total_articles": 191
66
  },
67
  "data-management": {
68
  "interval": 15,
69
  "history": [
70
- 10,
71
  0,
72
  0,
73
  10,
 
74
  10
75
  ],
76
- "last_fetch": "2026-02-14T13:58:05.430647",
77
- "total_fetches": 21,
78
- "total_articles": 271
79
  },
80
  "business-intelligence": {
81
- "interval": 15,
82
  "history": [
83
- 23,
84
  0,
85
  0,
86
  8,
87
- 8
 
88
  ],
89
- "last_fetch": "2026-02-14T13:58:05.435766",
90
- "total_fetches": 21,
91
- "total_articles": 255
92
  },
93
  "business-analytics": {
94
- "interval": 15,
95
  "history": [
96
- 20,
97
  0,
98
  0,
99
  6,
100
- 6
 
101
  ],
102
- "last_fetch": "2026-02-14T13:58:05.438090",
103
- "total_fetches": 21,
104
- "total_articles": 203
105
  },
106
  "customer-data-platform": {
107
  "interval": 15,
108
  "history": [
109
- 26,
110
  0,
111
  0,
112
  9,
 
113
  9
114
  ],
115
- "last_fetch": "2026-02-14T13:58:05.440124",
116
- "total_fetches": 21,
117
- "total_articles": 247
118
  },
119
  "data-centers": {
120
- "interval": 15,
121
  "history": [
122
- 27,
123
  0,
124
  0,
125
  6,
126
- 6
 
127
  ],
128
- "last_fetch": "2026-02-14T13:58:05.443427",
129
- "total_fetches": 21,
130
- "total_articles": 318
131
  },
132
  "cloud-computing": {
133
- "interval": 15,
134
  "history": [
135
- 24,
136
  0,
137
  0,
138
  5,
139
- 7
 
140
  ],
141
- "last_fetch": "2026-02-14T13:58:05.448267",
142
- "total_fetches": 21,
143
- "total_articles": 400
144
  },
145
  "magazines": {
146
  "interval": 60,
147
  "history": [
148
- 0,
149
  0,
150
  0,
151
  6,
152
- 5
 
153
  ],
154
- "last_fetch": "2026-02-14T13:58:05.451427",
155
- "total_fetches": 21,
156
- "total_articles": 106
157
  },
158
  "data-laws": {
159
  "interval": 15,
160
  "history": [
161
- 29,
162
  0,
163
  0,
164
  9,
165
- 9
 
166
  ],
167
- "last_fetch": "2026-02-14T13:58:05.455106",
168
- "total_fetches": 21,
169
- "total_articles": 327
170
  },
171
  "cloud-aws": {
172
  "interval": 15,
173
  "history": [
174
- 29,
175
  0,
176
  0,
177
  10,
 
178
  10
179
  ],
180
- "last_fetch": "2026-02-14T13:58:05.456886",
181
- "total_fetches": 21,
182
- "total_articles": 514
183
  },
184
  "cloud-azure": {
185
  "interval": 15,
186
  "history": [
187
- 20,
188
  0,
189
  0,
190
  10,
 
191
  10
192
  ],
193
- "last_fetch": "2026-02-14T13:58:05.459585",
194
- "total_fetches": 21,
195
- "total_articles": 360
196
  },
197
  "cloud-gcp": {
198
  "interval": 60,
199
  "history": [
200
- 20,
201
  0,
202
  0,
203
  0,
204
  0
205
  ],
206
- "last_fetch": "2026-02-14T13:58:05.464655",
207
- "total_fetches": 21,
208
  "total_articles": 380
209
  },
210
  "cloud-oracle": {
@@ -216,8 +216,8 @@
216
  0,
217
  0
218
  ],
219
- "last_fetch": "2026-02-14T13:58:05.467651",
220
- "total_fetches": 21,
221
  "total_articles": 20
222
  },
223
  "cloud-ibm": {
@@ -229,8 +229,8 @@
229
  0,
230
  0
231
  ],
232
- "last_fetch": "2026-02-14T13:58:05.471190",
233
- "total_fetches": 21,
234
  "total_articles": 20
235
  },
236
  "cloud-alibaba": {
@@ -242,8 +242,8 @@
242
  0,
243
  0
244
  ],
245
- "last_fetch": "2026-02-14T13:58:05.472734",
246
- "total_fetches": 21,
247
  "total_articles": 20
248
  },
249
  "cloud-digitalocean": {
@@ -255,8 +255,8 @@
255
  0,
256
  0
257
  ],
258
- "last_fetch": "2026-02-14T13:58:05.474379",
259
- "total_fetches": 21,
260
  "total_articles": 20
261
  },
262
  "cloud-huawei": {
@@ -268,21 +268,21 @@
268
  0,
269
  0
270
  ],
271
- "last_fetch": "2026-02-14T13:58:05.478287",
272
- "total_fetches": 21,
273
  "total_articles": 20
274
  },
275
  "cloud-cloudflare": {
276
  "interval": 60,
277
  "history": [
278
- 20,
279
  0,
280
  0,
281
  0,
282
  0
283
  ],
284
- "last_fetch": "2026-02-14T13:58:05.483222",
285
- "total_fetches": 21,
286
  "total_articles": 360
287
  }
288
  }
 
1
  {
2
  "ai": {
3
+ "interval": 60,
4
  "history": [
 
5
  0,
6
  0,
7
  6,
8
+ 9,
9
+ 8
10
  ],
11
+ "last_fetch": "2026-02-18T12:47:11.378844",
12
+ "total_fetches": 22,
13
+ "total_articles": 291
14
  },
15
  "data-security": {
16
+ "interval": 60,
17
  "history": [
 
18
  0,
19
  0,
20
  3,
21
+ 2,
22
+ 6
23
  ],
24
+ "last_fetch": "2026-02-18T12:47:11.386589",
25
+ "total_fetches": 22,
26
+ "total_articles": 192
27
  },
28
  "data-governance": {
29
  "interval": 60,
30
  "history": [
 
31
  0,
32
  0,
33
  3,
34
+ 2,
35
+ 6
36
  ],
37
+ "last_fetch": "2026-02-18T12:47:11.406642",
38
+ "total_fetches": 22,
39
+ "total_articles": 184
40
  },
41
  "data-privacy": {
42
+ "interval": 60,
43
  "history": [
 
44
  0,
45
  0,
46
  5,
47
+ 5,
48
  5
49
  ],
50
+ "last_fetch": "2026-02-18T12:47:11.415936",
51
+ "total_fetches": 22,
52
+ "total_articles": 215
53
  },
54
  "data-engineering": {
55
  "interval": 60,
56
  "history": [
 
57
  0,
58
  0,
59
  2,
60
+ 5,
61
  5
62
  ],
63
+ "last_fetch": "2026-02-18T12:47:11.419143",
64
+ "total_fetches": 22,
65
+ "total_articles": 196
66
  },
67
  "data-management": {
68
  "interval": 15,
69
  "history": [
 
70
  0,
71
  0,
72
  10,
73
+ 10,
74
  10
75
  ],
76
+ "last_fetch": "2026-02-18T12:47:11.429711",
77
+ "total_fetches": 22,
78
+ "total_articles": 281
79
  },
80
  "business-intelligence": {
81
+ "interval": 60,
82
  "history": [
 
83
  0,
84
  0,
85
  8,
86
+ 8,
87
+ 7
88
  ],
89
+ "last_fetch": "2026-02-18T12:47:11.431810",
90
+ "total_fetches": 22,
91
+ "total_articles": 262
92
  },
93
  "business-analytics": {
94
+ "interval": 60,
95
  "history": [
 
96
  0,
97
  0,
98
  6,
99
+ 6,
100
+ 7
101
  ],
102
+ "last_fetch": "2026-02-18T12:47:11.433518",
103
+ "total_fetches": 22,
104
+ "total_articles": 210
105
  },
106
  "customer-data-platform": {
107
  "interval": 15,
108
  "history": [
 
109
  0,
110
  0,
111
  9,
112
+ 9,
113
  9
114
  ],
115
+ "last_fetch": "2026-02-18T12:47:11.435120",
116
+ "total_fetches": 22,
117
+ "total_articles": 256
118
  },
119
  "data-centers": {
120
+ "interval": 60,
121
  "history": [
 
122
  0,
123
  0,
124
  6,
125
+ 6,
126
+ 8
127
  ],
128
+ "last_fetch": "2026-02-18T12:47:11.436689",
129
+ "total_fetches": 22,
130
+ "total_articles": 326
131
  },
132
  "cloud-computing": {
133
+ "interval": 60,
134
  "history": [
 
135
  0,
136
  0,
137
  5,
138
+ 7,
139
+ 8
140
  ],
141
+ "last_fetch": "2026-02-18T12:47:11.437988",
142
+ "total_fetches": 22,
143
+ "total_articles": 408
144
  },
145
  "magazines": {
146
  "interval": 60,
147
  "history": [
 
148
  0,
149
  0,
150
  6,
151
+ 5,
152
+ 1
153
  ],
154
+ "last_fetch": "2026-02-18T12:47:11.439034",
155
+ "total_fetches": 22,
156
+ "total_articles": 107
157
  },
158
  "data-laws": {
159
  "interval": 15,
160
  "history": [
 
161
  0,
162
  0,
163
  9,
164
+ 9,
165
+ 10
166
  ],
167
+ "last_fetch": "2026-02-18T12:47:11.440244",
168
+ "total_fetches": 22,
169
+ "total_articles": 337
170
  },
171
  "cloud-aws": {
172
  "interval": 15,
173
  "history": [
 
174
  0,
175
  0,
176
  10,
177
+ 10,
178
  10
179
  ],
180
+ "last_fetch": "2026-02-18T12:47:11.441717",
181
+ "total_fetches": 22,
182
+ "total_articles": 524
183
  },
184
  "cloud-azure": {
185
  "interval": 15,
186
  "history": [
 
187
  0,
188
  0,
189
  10,
190
+ 10,
191
  10
192
  ],
193
+ "last_fetch": "2026-02-18T12:47:11.443201",
194
+ "total_fetches": 22,
195
+ "total_articles": 370
196
  },
197
  "cloud-gcp": {
198
  "interval": 60,
199
  "history": [
200
+ 0,
201
  0,
202
  0,
203
  0,
204
  0
205
  ],
206
+ "last_fetch": "2026-02-18T12:47:11.444443",
207
+ "total_fetches": 22,
208
  "total_articles": 380
209
  },
210
  "cloud-oracle": {
 
216
  0,
217
  0
218
  ],
219
+ "last_fetch": "2026-02-18T12:47:11.445683",
220
+ "total_fetches": 22,
221
  "total_articles": 20
222
  },
223
  "cloud-ibm": {
 
229
  0,
230
  0
231
  ],
232
+ "last_fetch": "2026-02-18T12:47:11.446826",
233
+ "total_fetches": 22,
234
  "total_articles": 20
235
  },
236
  "cloud-alibaba": {
 
242
  0,
243
  0
244
  ],
245
+ "last_fetch": "2026-02-18T12:47:11.450091",
246
+ "total_fetches": 22,
247
  "total_articles": 20
248
  },
249
  "cloud-digitalocean": {
 
255
  0,
256
  0
257
  ],
258
+ "last_fetch": "2026-02-18T12:47:11.455437",
259
+ "total_fetches": 22,
260
  "total_articles": 20
261
  },
262
  "cloud-huawei": {
 
268
  0,
269
  0
270
  ],
271
+ "last_fetch": "2026-02-18T12:47:11.461316",
272
+ "total_fetches": 22,
273
  "total_articles": 20
274
  },
275
  "cloud-cloudflare": {
276
  "interval": 60,
277
  "history": [
278
+ 0,
279
  0,
280
  0,
281
  0,
282
  0
283
  ],
284
+ "last_fetch": "2026-02-18T12:47:11.464407",
285
+ "total_fetches": 22,
286
  "total_articles": 360
287
  }
288
  }
docs/appwrite_schema.md DELETED
@@ -1,77 +0,0 @@
1
- # Appwrite Database Schema Configuration
2
- # Instructions for setting up indexes in Appwrite Console
3
-
4
- ## Collection: articles
5
-
6
- ### Attributes
7
- ```json
8
- {
9
- "$id": "string (16 chars, auto-generated)",
10
- "url_hash": "string (16 chars, required, unique)",
11
- "title": "string (500 chars, required)",
12
- "url": "string (2048 chars, required)",
13
- "description": "string (2000 chars, optional)",
14
- "image_url": "string (1000 chars, optional)",
15
- "published_at": "string (50 chars, required, ISO format)",
16
- "category": "string (50 chars, required)",
17
- "source": "string (200 chars, optional)",
18
- "fetched_at": "string (50 chars, required, ISO format)",
19
- "slug": "string (200 chars, optional)",
20
- "quality_score": "integer (optional, default: 50)"
21
- }
22
- ```
23
-
24
- ### Indexes (CRITICAL FOR PERFORMANCE)
25
-
26
- #### 1. Primary Index: url_hash (Unique Constraint)
27
- - **Type:** unique
28
- - **Attribute:** url_hash
29
- - **Order:** ASC
30
- - **Purpose:** Prevents duplicate articles at database level
31
- - **Impact:** Enforces data integrity, eliminates dedup logic in code
32
-
33
- #### 2. Composite Index: category + published_at (MOST IMPORTANT)
34
- - **Type:** key
35
- - **Attributes:** [category, published_at]
36
- - **Orders:** [ASC, DESC]
37
- - **Purpose:** Powers the main query: "Get latest articles for category X"
38
- - **Impact:** 40x faster than full table scan
39
- - **Query Example:**
40
- ```sql
41
- WHERE category = 'ai' ORDER BY published_at DESC LIMIT 20
42
- ```
43
-
44
- #### 3. Index: published_at (For Global Feed)
45
- - **Type:** key
46
- - **Attribute:** published_at
47
- - **Order:** DESC
48
- - **Purpose:** Get latest articles across all categories
49
- - **Impact:** Fast global news feed
50
- - **Query Example:**
51
- ```sql
52
- ORDER BY published_at DESC LIMIT 50
53
- ```
54
-
55
- #### 4. Index: source (For Analytics)
56
- - **Type:** key
57
- - **Attribute:** source
58
- - **Order:** ASC
59
- - **Purpose:** Provider statistics and filtering
60
- - **Impact:** Fast source-based queries
61
-
62
- ## Setup Instructions
63
-
64
- ### Via Appwrite Console:
65
- 1. Go to Databases → articles collection
66
- 2. Click "Indexes" tab
67
- 3. Add each index with the specifications above
68
-
69
- ### Expected Performance Gains:
70
- - List query (category filter): 40x faster
71
- - Global feed query: 30x faster
72
- - Deduplication: Automatic (no code needed)
73
-
74
- ## Migration Notes
75
- - Existing articles will be automatically indexed
76
- - Index creation may take a few minutes for large collections
77
- - No downtime required
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
docs/phase2_implementation_guide.md DELETED
@@ -1,269 +0,0 @@
1
- # Phase 2: Database Schema Enhancement - Implementation Guide
2
-
3
- ## Overview
4
- This guide walks you through adding indexes and new fields to your Appwrite database for FAANG-level performance.
5
-
6
- ---
7
-
8
- ## Step 1: Add New Attributes (Appwrite Console)
9
-
10
- ### Navigate to Database
11
- 1. Go to https://cloud.appwrite.io/console
12
- 2. Select your project
13
- 3. Go to Databases → Your Database → `articles` collection
14
- 4. Click "Attributes" tab
15
-
16
- ### Add New Attributes
17
-
18
- #### Attribute 1: slug
19
- - **Key:** `slug`
20
- - **Type:** String
21
- - **Size:** 200
22
- - **Required:** No (will be populated by migration)
23
- - **Default:** "" (empty string)
24
- - **Purpose:** SEO-friendly URL slugs
25
-
26
- #### Attribute 2: quality_score
27
- - **Key:** `quality_score`
28
- - **Type:** Integer
29
- - **Required:** No
30
- - **Default:** 50
31
- - **Min:** 0
32
- - **Max:** 100
33
- - **Purpose:** Article quality ranking
34
-
35
- ### Click "Create" for each attribute
36
-
37
- ---
38
-
39
- ## Step 2: Create Indexes (Critical for Performance!)
40
-
41
- ### Navigate to Indexes
42
- 1. In the same collection, click "Indexes" tab
43
- 2. Click "Create Index" button
44
-
45
- ### Index 1: url_hash (UNIQUE CONSTRAINT)
46
- - **Key:** `idx_url_hash_unique`
47
- - **Type:** Unique
48
- - **Attributes:** Select `url_hash`
49
- - **Order:** ASC
50
- - **Purpose:** Prevents duplicate articles automatically
51
- - **Impact:** Database-level deduplication
52
-
53
- ### Index 2: category + published_at (COMPOSITE - MOST IMPORTANT!)
54
- - **Key:** `idx_category_published`
55
- - **Type:** Key
56
- - **Attributes:** Select `category` AND `published_at` (in that order)
57
- - **Orders:** `category` ASC, `published_at` DESC
58
- - **Purpose:** Powers main query: "Get latest AI articles"
59
- - **Impact:** 40x faster than without index
60
-
61
- ### Index 3: published_at (GLOBAL FEED)
62
- - **Key:** `idx_published_desc`
63
- - **Type:** Key
64
- - **Attributes:** Select `published_at`
65
- - **Order:** DESC
66
- - **Purpose:** Get latest articles across all categories
67
- - **Impact:** Fast global news feed
68
-
69
- ### Index 4: source (ANALYTICS)
70
- - **Key:** `idx_source`
71
- - **Type:** Key
72
- - **Attributes:** Select `source`
73
- - **Order:** ASC
74
- - **Purpose:** Provider statistics
75
- - **Impact:** Fast source-based filtering
76
-
77
- ### Click "Create" for each index
78
-
79
- ---
80
-
81
- ## Step 3: Run Migration Script
82
-
83
- The migration script will backfill `slug` and `quality_score` for all existing articles.
84
-
85
- ### Option A: Manual Run (Recommended for first time)
86
-
87
- ```bash
88
- # Navigate to backend directory
89
- cd SegmentoPulse/backend
90
-
91
- # Activate virtual environment (if using)
92
- source venv/bin/activate # Linux/Mac
93
- # or
94
- .venv\Scripts\activate # Windows
95
-
96
- # Run migration script
97
- python scripts/migrate_article_fields.py
98
- ```
99
-
100
- **Expected Output:**
101
- ```
102
- ========================================================
103
- 📊 Appwrite Article Migration Script
104
- ========================================================
105
- Database: segmento_db
106
- Collection: articles
107
-
108
- 📥 Fetching articles 1 to 100...
109
- 📝 Processing 100 articles...
110
- ✓ Updated: Google Announces New AI... (score: 85)
111
- ✓ Updated: Data Security Report 2026... (score: 70)
112
- ...
113
-
114
- 📥 Fetching articles 101 to 200...
115
- ...
116
-
117
- ========================================================
118
- 📊 MIGRATION SUMMARY
119
- ========================================================
120
- ✅ Updated: 1,250 articles
121
- ⏭️ Skipped: 0 articles
122
- ❌ Errors: 0 articles
123
- 📈 Total Processed: 1,250
124
- ========================================================
125
- ```
126
-
127
- ### Option B: Via Admin API (Future)
128
-
129
- ```bash
130
- # Trigger via admin endpoint (once implemented)
131
- curl -X POST http://localhost:8000/api/admin/migrate/articles
132
- ```
133
-
134
- ---
135
-
136
- ## Step 4: Verify Implementation
137
-
138
- ### Test 1: Check Indexes Are Used
139
-
140
- ```python
141
- # In Python console
142
- from app.services.appwrite_db import get_appwrite_db
143
-
144
- db = get_appwrite_db()
145
- articles = await db.get_articles('ai', limit=20)
146
-
147
- # Should see in logs:
148
- # ✓ Retrieved 20 articles for 'ai' (offset: 0, projection: ON)
149
- ```
150
-
151
- ### Test 2: Check New Fields Are Populated
152
-
153
- ```python
154
- # Verify slug and quality_score exist
155
- for article in articles[:5]:
156
- print(f"{article.get('title')}")
157
- print(f" Slug: {article.get('slug')}")
158
- print(f" Quality: {article.get('quality_score')}")
159
- print()
160
- ```
161
-
162
- **Expected:**
163
- ```
164
- Google Announces New AI Model
165
- Slug: google-announces-new-ai-model
166
- Quality: 85
167
-
168
- Apple Vision Pro 2 Released
169
- Slug: apple-vision-pro-2-released
170
- Quality: 90
171
- ```
172
-
173
- ### Test 3: Verify Deduplication
174
-
175
- ```bash
176
- # Try to trigger a news fetch manually
177
- curl -X POST http://localhost:8000/api/admin/scheduler/fetch-now
178
-
179
- # Check logs for:
180
- # ✅ ai: 20 fetched, 2 saved, 18 duplicates
181
- ```
182
-
183
- ---
184
-
185
- ## Step 5: Monitor Performance
186
-
187
- ### Before Indexes (Baseline)
188
- ```bash
189
- # Query time without indexes: ~2000ms for 1000+ articles
190
- ```
191
-
192
- ### After Indexes (Expected)
193
- ```bash
194
- # Query time with indexes: ~50ms (40x faster!) ✅
195
- ```
196
-
197
- ### Check Index Usage (Appwrite Console)
198
- 1. Go to your collection
199
- 2. Click "Indexes" tab
200
- 3. Each index should show usage statistics
201
-
202
- ---
203
-
204
- ## Troubleshooting
205
-
206
- ### Issue: "Attribute already exists"
207
- - **Solution:** The attribute was already created. Skip to next step.
208
-
209
- ### Issue: "Index creation failed"
210
- - **Cause:** May need to specify different index type or attributes
211
- - **Solution:** Check Appwrite documentation for your SDK version
212
-
213
- ### Issue: Migration script can't find articles
214
- - **Cause:** Wrong database/collection ID
215
- - **Solution:** Verify environment variables:
216
- ```bash
217
- echo $APPWRITE_DATABASE_ID
218
- echo $APPWRITE_COLLECTION_ID
219
- ```
220
-
221
- ### Issue: Migration is slow
222
- - **Cause:** Large collection (10k+ articles)
223
- - **Solution:** This is normal. Script processes 100 articles at a time.
224
- - **Time estimate:** ~1 minute per 1,000 articles
225
-
226
- ---
227
-
228
- ## Rollback Plan (If Needed)
229
-
230
- ### Remove Attributes (if needed)
231
- 1. Go to Appwrite Console → Attributes
232
- 2. Click ⋮ menu next to `slug` or `quality_score`
233
- 3. Select "Delete"
234
-
235
- ### Remove Indexes
236
- 1. Go to Appwrite Console → Indexes
237
- 2. Click ⋮ menu next to index
238
- 3. Select "Delete"
239
-
240
- **Note:** Deleting indexes won't delete data, just the index structure.
241
-
242
- ---
243
-
244
- ## Performance Impact Summary
245
-
246
- | Operation | Before | After | Improvement |
247
- |-----------|--------|-------|-------------|
248
- | **Category Query** | 2000ms | 50ms | **40x faster** |
249
- | **Duplicate Check** | App logic | DB unique constraint | **Automatic** |
250
- | **Deduplication Rate** | ~47% | ~47% | **More reliable** |
251
- | **Quality Ranking** | Not possible | Sort by score | **New feature** |
252
-
253
- ---
254
-
255
- ## Next Steps
256
-
257
- After completing Phase 2:
258
- - [ ] Verify all indexes are created
259
- - [ ] Run migration script successfully
260
- - [ ] Test query performance
261
- - [ ] Move to Phase 3: Cursor Pagination
262
-
263
- ---
264
-
265
- ## Questions?
266
-
267
- - **How often should I re-run migration?** Never. New articles automatically get slug and quality_score.
268
- - **What if I add more articles?** They'll automatically have the new fields from the updated save_articles() method.
269
- - **Can I skip indexes?** No! Indexes are critical for performance at scale.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
requirements.txt CHANGED
Binary files a/requirements.txt and b/requirements.txt differ