SHAFI committed on
Commit
6068eaf
·
1 Parent(s): 7fe7f42

Bug fixes; paid API fetching fixed

Browse files
app/routes/admin.py CHANGED
@@ -3,6 +3,7 @@ from typing import Dict, List
3
  import asyncio
4
  from app.services.news_aggregator import NewsAggregator
5
  from app.services.cache_service import CacheService
 
6
  from app.config import settings
7
 
8
  router = APIRouter()
@@ -50,7 +51,11 @@ async def warm_cache():
50
  - categories_failed: Number of categories that failed
51
  - details: Per-category results with article counts
52
  """
53
- news_aggregator = NewsAggregator()
 
 
 
 
54
  cache_service = CacheService()
55
 
56
  results = {
@@ -63,7 +68,7 @@ async def warm_cache():
63
  print(f"[Cache Warming] Fetching {category}...")
64
 
65
  # Fetch articles from news aggregator (tries all providers with failover)
66
- articles = await news_aggregator.fetch_by_category(category)
67
 
68
  if articles:
69
  # Cache the articles with configured TTL (600 seconds)
@@ -117,8 +122,12 @@ async def get_cache_stats():
117
  - Cache TTL configuration
118
  - Provider statistics
119
  """
 
 
120
  cache_service = CacheService()
121
- news_aggregator = NewsAggregator()
 
 
122
 
123
  cached_categories = []
124
 
@@ -131,7 +140,7 @@ async def get_cache_stats():
131
  })
132
 
133
  # Get provider statistics
134
- provider_stats = news_aggregator.get_stats()
135
 
136
  return {
137
  "cache_ttl": settings.CACHE_TTL,
@@ -239,12 +248,15 @@ async def populate_database():
239
  - Recovery after database cleanup
240
  """
241
  from app.services.appwrite_db import get_appwrite_db
242
- from app.services.news_aggregator import NewsAggregator
243
  import asyncio
244
 
245
  try:
246
  appwrite_db = get_appwrite_db()
247
- news_aggregator = NewsAggregator()
 
 
 
248
 
249
  results = {
250
  "successful": [],
@@ -256,7 +268,7 @@ async def populate_database():
256
  print(f"[DB Populate] Fetching {category}...")
257
 
258
  # Fetch articles from external APIs
259
- articles = await news_aggregator.fetch_by_category(category)
260
 
261
  if articles:
262
  # Save to Appwrite database
@@ -770,3 +782,60 @@ async def bloom_filter_health_check():
770
  "recommendation": "Check server logs for detailed error information"
771
  }
772
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3
  import asyncio
4
  from app.services.news_aggregator import NewsAggregator
5
  from app.services.cache_service import CacheService
6
+ from app.services.circuit_breaker import get_circuit_breaker
7
  from app.config import settings
8
 
9
  router = APIRouter()
 
51
  - categories_failed: Number of categories that failed
52
  - details: Per-category results with article counts
53
  """
54
+ from app.services.scheduler import _get_shared_aggregator
55
+
56
+ # Fix 3: Using the shared singleton ensures admin cache-warming
57
+ # respects the same API quotas and circuit breakers as the scheduler.
58
+ shared_aggregator = _get_shared_aggregator()
59
  cache_service = CacheService()
60
 
61
  results = {
 
68
  print(f"[Cache Warming] Fetching {category}...")
69
 
70
  # Fetch articles from news aggregator (tries all providers with failover)
71
+ articles = await shared_aggregator.fetch_by_category(category)
72
 
73
  if articles:
74
  # Cache the articles with configured TTL (600 seconds)
 
122
  - Cache TTL configuration
123
  - Provider statistics
124
  """
125
+ from app.services.scheduler import _get_shared_aggregator
126
+
127
  cache_service = CacheService()
128
+
129
+ # Fix 3: Get stats from the exact same instance that is doing the fetching
130
+ shared_aggregator = _get_shared_aggregator()
131
 
132
  cached_categories = []
133
 
 
140
  })
141
 
142
  # Get provider statistics
143
+ provider_stats = shared_aggregator.get_stats()
144
 
145
  return {
146
  "cache_ttl": settings.CACHE_TTL,
 
248
  - Recovery after database cleanup
249
  """
250
  from app.services.appwrite_db import get_appwrite_db
251
+ from app.services.scheduler import _get_shared_aggregator
252
  import asyncio
253
 
254
  try:
255
  appwrite_db = get_appwrite_db()
256
+
257
+ # Fix 3: Use the shared aggregator to respect quotas and circuit breakers
258
+ # during massive full-database populate operations.
259
+ shared_aggregator = _get_shared_aggregator()
260
 
261
  results = {
262
  "successful": [],
 
268
  print(f"[DB Populate] Fetching {category}...")
269
 
270
  # Fetch articles from external APIs
271
+ articles = await shared_aggregator.fetch_by_category(category)
272
 
273
  if articles:
274
  # Save to Appwrite database
 
782
  "recommendation": "Check server logs for detailed error information"
783
  }
784
 
785
+
786
+ # ===========================================
787
+ # Circuit Breaker Management (Fix 1b)
788
+ # ===========================================
789
+
790
@router.post("/circuits/reset")
async def reset_circuit_breakers():
    """
    Emergency reset for all provider circuit breakers.

    Intended to be called right after a redeployment that fixed a broken
    API key. Two steps:

    1. Delete every 'circuit:{provider}:state' key from Upstash Redis, so a
       freshly fixed provider is not blocked by stale persisted state.
    2. Reset the in-memory circuit state of all providers to CLOSED, so the
       very next scheduler run can try the provider again immediately.

    Without this, a stale Redis record written by the previously broken key
    could keep the provider blocked for up to 1 hour after redeploying.

    Usage:
        curl -X POST http://your-server/api/admin/circuits/reset

    Returns:
        dict with success flag, operator message, the Redis keys actually
        deleted, the keys checked, and a usage note.

    Raises:
        HTTPException(500): if the Redis cleanup or the in-memory reset fails.
    """
    try:
        from app.services.upstash_cache import get_upstash_cache

        breaker = get_circuit_breaker()

        # Step 1: clear persisted circuit state in Redis for every known provider.
        redis_cache = get_upstash_cache()
        known_providers = ["gnews", "newsapi", "newsdata", "google_rss", "medium", "official_cloud"]
        deleted_keys = []

        for name in known_providers:
            state_key = f"circuit:{name}:state"
            del_count = await redis_cache._execute_command(["DEL", state_key])
            if del_count and int(del_count) > 0:
                deleted_keys.append(state_key)

        # Step 2: flip every in-memory circuit back to CLOSED.
        breaker.reset()

        return {
            "success": True,
            "message": "All circuit breakers have been reset. Paid providers will be tried on the next scheduler run.",
            "redis_keys_deleted": deleted_keys,
            "redis_keys_checked": [f"circuit:{p}:state" for p in known_providers],
            "note": "Run this after fixing a broken API key and redeploying."
        }

    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to reset circuit breakers: {str(e)}"
        )
app/services/appwrite_db.py CHANGED
@@ -856,7 +856,7 @@ class AppwriteDatabase:
856
  if text_summary:
857
  data['text_summary'] = text_summary
858
 
859
- self.tablesDB.update_row(
860
  database_id=settings.APPWRITE_DATABASE_ID,
861
  collection_id=collection_id,
862
  document_id=document_id,
 
856
  if text_summary:
857
  data['text_summary'] = text_summary
858
 
859
+ await self.tablesDB.update_row(
860
  database_id=settings.APPWRITE_DATABASE_ID,
861
  collection_id=collection_id,
862
  document_id=document_id,
app/services/circuit_breaker.py CHANGED
@@ -9,9 +9,11 @@ Features:
9
  - Exponential backoff
10
  - Circuit state: CLOSED → OPEN → HALF_OPEN → CLOSED
11
  - Per-provider tracking
 
12
  """
13
 
14
  import time
 
15
  import logging
16
  from typing import Dict, Optional
17
  from enum import Enum
@@ -30,19 +32,25 @@ class CircuitState(str, Enum):
30
  class ProviderCircuitBreaker:
31
  """
32
  Circuit breaker for news API providers
33
-
34
  Prevents repeatedly calling providers that are:
35
  - Rate limited (HTTP 429)
36
  - Down (HTTP 5xx)
37
  - Slow to respond
38
-
39
  Strategy:
40
  - After 3 failures in 5 minutes → OPEN circuit (skip for 1 hour)
41
  - After 1 hour → HALF_OPEN (allow 1 test request)
42
  - If test succeeds → CLOSED (normal operation)
43
  - If test fails → OPEN for another hour
 
 
 
 
 
 
44
  """
45
-
46
  def __init__(
47
  self,
48
  failure_threshold: int = 3,
@@ -52,7 +60,7 @@ class ProviderCircuitBreaker:
52
  ):
53
  """
54
  Initialize circuit breaker
55
-
56
  Args:
57
  failure_threshold: Number of failures before opening circuit
58
  failure_window: Time window for counting failures (seconds)
@@ -63,42 +71,127 @@ class ProviderCircuitBreaker:
63
  self.failure_window = failure_window
64
  self.open_duration = open_duration
65
  self.half_open_max_attempts = half_open_max_attempts
66
-
67
  # Provider state tracking
68
  self.states: Dict[str, CircuitState] = defaultdict(lambda: CircuitState.CLOSED)
69
- self.failure_counts: Dict[str, int] = defaultdict(int)
70
- self.last_failure_time: Dict[str, float] = {}
71
  self.circuit_open_time: Dict[str, float] = {}
72
  self.half_open_attempts: Dict[str, int] = defaultdict(int)
73
-
 
 
 
74
  logger.info("=" * 70)
75
  logger.info("⚡ [CIRCUIT BREAKER] Provider protection initialized")
76
  logger.info(f" Failure threshold: {failure_threshold} failures")
77
  logger.info(f" Failure window: {failure_window}s")
78
  logger.info(f" Open duration: {open_duration}s ({open_duration//60} min)")
 
79
  logger.info("=" * 70)
80
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81
  def should_skip(self, provider: str) -> bool:
82
  """
83
  Check if provider should be skipped
84
-
85
  Args:
86
  provider: Provider name (e.g., "gnews", "newsapi")
87
-
88
  Returns:
89
  True if provider should be skipped, False otherwise
90
  """
91
  current_state = self.states[provider]
92
  current_time = time.time()
93
-
94
  # CLOSED = normal operation, don't skip
95
  if current_state == CircuitState.CLOSED:
96
  return False
97
-
98
  # OPEN = provider failing, check if should move to HALF_OPEN
99
  if current_state == CircuitState.OPEN:
100
  open_time = self.circuit_open_time.get(provider, 0)
101
-
102
  # Check if open duration has elapsed
103
  if current_time - open_time >= self.open_duration:
104
  # Move to HALF_OPEN (allow test request)
@@ -111,44 +204,83 @@ class ProviderCircuitBreaker:
111
  remaining = int(self.open_duration - (current_time - open_time))
112
  logger.debug(f"⚡ Circuit OPEN for {provider} ({remaining}s remaining)")
113
  return True
114
-
115
  # HALF_OPEN = testing recovery
116
  if current_state == CircuitState.HALF_OPEN:
117
  # Allow limited test requests
118
  if self.half_open_attempts[provider] < self.half_open_max_attempts:
 
 
 
 
119
  return False # Allow test
120
  else:
121
- logger.debug(f"⚡ Circuit HALF_OPEN for {provider} (max tests reached)")
122
- return True # Max tests reached
123
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
124
  return False
125
-
126
  def record_success(self, provider: str):
127
  """
128
  Record successful request
129
-
130
  Args:
131
  provider: Provider name
132
  """
133
  current_state = self.states[provider]
134
-
135
- # Reset failure count
136
- self.failure_counts[provider] = 0
137
-
138
  # Close circuit if it was open/half-open
139
  if current_state != CircuitState.CLOSED:
140
  self.states[provider] = CircuitState.CLOSED
141
  logger.info(f"✅ Circuit CLOSED for {provider} (recovered)")
142
-
 
 
 
 
 
 
 
143
  def record_failure(
144
- self,
145
- provider: str,
146
  error_type: str = "unknown",
147
  status_code: Optional[int] = None
148
  ):
149
  """
150
  Record failed request
151
-
152
  Args:
153
  provider: Provider name
154
  error_type: Type of error ("rate_limit", "timeout", "server_error")
@@ -156,90 +288,132 @@ class ProviderCircuitBreaker:
156
  """
157
  current_state = self.states[provider]
158
  current_time = time.time()
159
-
160
- # Increment failure count
161
- self.failure_counts[provider] += 1
162
- self.last_failure_time[provider] = current_time
163
-
 
 
 
 
 
 
 
164
  # Log failure with details
165
  status_str = f" (HTTP {status_code})" if status_code else ""
166
  logger.warning(
167
- f"⚠️ {provider} failure #{self.failure_counts[provider]}: "
168
  f"{error_type}{status_str}"
169
  )
170
-
171
  # Check if should open circuit
172
  if current_state == CircuitState.CLOSED:
173
  # Check failure window
174
- if self.failure_counts[provider] >= self.failure_threshold:
175
- # Open circuit
176
  self.states[provider] = CircuitState.OPEN
177
  self.circuit_open_time[provider] = current_time
178
-
179
  logger.warning(
180
  f"🔴 Circuit OPEN for {provider} "
181
- f"({self.failure_counts[provider]} failures) "
182
  f"- skipping for {self.open_duration//60} minutes"
183
  )
184
-
 
 
 
 
 
 
 
185
  # If in HALF_OPEN and fails, go back to OPEN
186
  elif current_state == CircuitState.HALF_OPEN:
187
  self.states[provider] = CircuitState.OPEN
188
  self.circuit_open_time[provider] = current_time
189
-
190
  logger.warning(
191
  f"🔴 Circuit back to OPEN for {provider} "
192
  f"(test failed) - skipping for {self.open_duration//60} minutes"
193
  )
194
-
 
 
 
 
 
 
 
195
  def reset(self, provider: Optional[str] = None):
196
  """
197
  Reset circuit breaker
198
-
199
  Args:
200
  provider: Provider to reset (None = reset all)
201
  """
202
  if provider:
203
- # Reset specific provider
204
  self.states[provider] = CircuitState.CLOSED
205
- self.failure_counts[provider] = 0
206
  self.half_open_attempts[provider] = 0
207
  logger.info(f"🔄 Circuit reset for {provider}")
 
 
 
 
 
 
 
208
  else:
209
- # Reset all providers
210
  self.states.clear()
211
- self.failure_counts.clear()
212
- self.last_failure_time.clear()
213
  self.circuit_open_time.clear()
214
  self.half_open_attempts.clear()
215
  logger.info("🔄 All circuits reset")
216
-
 
 
 
 
 
 
 
 
 
 
 
 
 
217
  def get_stats(self) -> dict:
218
  """Get circuit breaker statistics"""
219
  total_open = sum(1 for s in self.states.values() if s == CircuitState.OPEN)
220
  total_half_open = sum(1 for s in self.states.values() if s == CircuitState.HALF_OPEN)
221
  total_closed = sum(1 for s in self.states.values() if s == CircuitState.CLOSED)
222
-
223
  # Provider details
224
  provider_details = {}
225
  for provider, state in self.states.items():
 
 
226
  provider_details[provider] = {
227
  'state': state.value,
228
- 'failures': self.failure_counts.get(provider, 0),
229
- 'last_failure': self.last_failure_time.get(provider)
230
  }
231
-
232
  return {
233
  'total_open': total_open,
234
  'total_half_open': total_half_open,
235
  'total_closed': total_closed,
236
  'providers': provider_details
237
  }
238
-
239
  def print_stats(self):
240
  """Print circuit breaker statistics"""
241
  stats = self.get_stats()
242
-
243
  logger.info("")
244
  logger.info("=" * 70)
245
  logger.info("⚡ [CIRCUIT BREAKER] Provider Status")
@@ -248,20 +422,20 @@ class ProviderCircuitBreaker:
248
  logger.info(f" 🔹 Half-Open Circuits: {stats['total_half_open']}")
249
  logger.info(f" 🔹 Closed Circuits: {stats['total_closed']}")
250
  logger.info("")
251
-
252
  for provider, details in stats['providers'].items():
253
  state_emoji = {
254
  'closed': '✅',
255
  'open': '🔴',
256
  'half_open': '🟡'
257
  }.get(details['state'], '❓')
258
-
259
  logger.info(
260
  f" {state_emoji} {provider.upper()}: "
261
  f"{details['state'].upper()} "
262
  f"({details['failures']} failures)"
263
  )
264
-
265
  logger.info("=" * 70)
266
  logger.info("")
267
 
@@ -273,12 +447,12 @@ _circuit_breaker: Optional[ProviderCircuitBreaker] = None
273
  def get_circuit_breaker() -> ProviderCircuitBreaker:
274
  """
275
  Get or create global circuit breaker instance
276
-
277
  Returns:
278
  ProviderCircuitBreaker: Singleton instance
279
  """
280
  global _circuit_breaker
281
-
282
  if _circuit_breaker is None:
283
  _circuit_breaker = ProviderCircuitBreaker(
284
  failure_threshold=3, # 3 failures
@@ -286,5 +460,5 @@ def get_circuit_breaker() -> ProviderCircuitBreaker:
286
  open_duration=3600, # skip for 1 hour
287
  half_open_max_attempts=1 # 1 test request
288
  )
289
-
290
  return _circuit_breaker
 
9
  - Exponential backoff
10
  - Circuit state: CLOSED → OPEN → HALF_OPEN → CLOSED
11
  - Per-provider tracking
12
+ - Redis-backed persistence: circuit state survives server restarts
13
  """
14
 
15
  import time
16
+ import asyncio
17
  import logging
18
  from typing import Dict, Optional
19
  from enum import Enum
 
32
  class ProviderCircuitBreaker:
33
  """
34
  Circuit breaker for news API providers
35
+
36
  Prevents repeatedly calling providers that are:
37
  - Rate limited (HTTP 429)
38
  - Down (HTTP 5xx)
39
  - Slow to respond
40
+
41
  Strategy:
42
  - After 3 failures in 5 minutes → OPEN circuit (skip for 1 hour)
43
  - After 1 hour → HALF_OPEN (allow 1 test request)
44
  - If test succeeds → CLOSED (normal operation)
45
  - If test fails → OPEN for another hour
46
+
47
+ Redis Persistence (NEW):
48
+ - When a circuit opens, we write the state to Redis with a 1-hour TTL.
49
+ - On server boot, we read from Redis to restore any previously open circuits.
50
+ - When a circuit closes, we delete the Redis key so it doesn't block a recovered provider.
51
+ - If Redis is unavailable, we fall back gracefully to in-memory only.
52
  """
53
+
54
  def __init__(
55
  self,
56
  failure_threshold: int = 3,
 
60
  ):
61
  """
62
  Initialize circuit breaker
63
+
64
  Args:
65
  failure_threshold: Number of failures before opening circuit
66
  failure_window: Time window for counting failures (seconds)
 
71
  self.failure_window = failure_window
72
  self.open_duration = open_duration
73
  self.half_open_max_attempts = half_open_max_attempts
74
+
75
  # Provider state tracking
76
  self.states: Dict[str, CircuitState] = defaultdict(lambda: CircuitState.CLOSED)
77
+ # Fix 1: Track actual timestamps of failures so we can enforce the failure_window.
78
+ self.failure_timestamps: Dict[str, list[float]] = defaultdict(list)
79
  self.circuit_open_time: Dict[str, float] = {}
80
  self.half_open_attempts: Dict[str, int] = defaultdict(int)
81
+
82
+ # Known providers — used by the boot-time Redis restore
83
+ self._known_providers = ["gnews", "newsapi", "newsdata", "google_rss", "medium", "official_cloud"]
84
+
85
  logger.info("=" * 70)
86
  logger.info("⚡ [CIRCUIT BREAKER] Provider protection initialized")
87
  logger.info(f" Failure threshold: {failure_threshold} failures")
88
  logger.info(f" Failure window: {failure_window}s")
89
  logger.info(f" Open duration: {open_duration}s ({open_duration//60} min)")
90
+ logger.info(f" Redis persistence: ENABLED")
91
  logger.info("=" * 70)
92
+
93
+ # Kick off a background restore from Redis.
94
+ # Fix 2: asyncio.get_event_loop() is deprecated.
95
+ try:
96
+ loop = asyncio.get_running_loop()
97
+ loop.create_task(self._load_from_redis())
98
+ except RuntimeError:
99
+ # No running event loop (e.g., during synchronous initialization or tests).
100
+ pass
101
+
102
+ # ──────────────────────────────────────────────────────────────────────────
103
+ # Redis Helpers
104
+ # ──────────────────────────────────────────────────────────────────────────
105
+
106
+ def _redis_key(self, provider: str) -> str:
107
+ """Build the Redis key for a provider's circuit state."""
108
+ return f"circuit:{provider}:state"
109
+
110
+ async def _load_from_redis(self):
111
+ """
112
+ On server boot, check Redis for any circuit states that were open
113
+ before the server restarted. If we find one, restore it in memory
114
+ so we don't call a broken API immediately after booting.
115
+ """
116
+ try:
117
+ from app.services.upstash_cache import get_upstash_cache
118
+ cache = get_upstash_cache()
119
+
120
+ for provider in self._known_providers:
121
+ key = self._redis_key(provider)
122
+ value = await cache._execute_command(["GET", key])
123
+
124
+ if value == "open":
125
+ # The circuit was open before the restart — keep it open.
126
+ self.states[provider] = CircuitState.OPEN
127
+ # We don't know the exact open time, so we set it to "now".
128
+ # This means the 1-hour timeout will count from this boot,
129
+ # which is safe — the TTL on the Redis key is the real gate.
130
+ self.circuit_open_time[provider] = time.time()
131
+ logger.info(
132
+ "⚡ [CIRCUIT BREAKER] Restored OPEN state for %s from Redis (was open before restart).",
133
+ provider
134
+ )
135
+
136
+ except Exception as e:
137
+ # Redis is unavailable. That's fine — we start with all circuits CLOSED.
138
+ logger.debug("[CIRCUIT BREAKER] Redis restore skipped (%s) — starting with clean state.", e)
139
+
140
+ async def _persist_open_to_redis(self, provider: str):
141
+ """
142
+ Write 'circuit:{provider}:state = open' to Redis with a 1-hour TTL.
143
+ Called whenever a circuit trips to OPEN.
144
+ This is fire-and-forget: if Redis is unavailable, we log and move on.
145
+ """
146
+ try:
147
+ from app.services.upstash_cache import get_upstash_cache
148
+ cache = get_upstash_cache()
149
+ key = self._redis_key(provider)
150
+ # SET key "open" EX 3600 — expires in exactly 1 hour, same as open_duration.
151
+ await cache._execute_command(["SET", key, "open", "EX", self.open_duration])
152
+ logger.debug("[CIRCUIT BREAKER] Persisted OPEN state for %s to Redis.", provider)
153
+ except Exception as e:
154
+ logger.debug("[CIRCUIT BREAKER] Redis write failed for %s (%s) — in-memory state still protects us.", provider, e)
155
+
156
+ async def _delete_from_redis(self, provider: str):
157
+ """
158
+ Delete 'circuit:{provider}:state' from Redis.
159
+ Called whenever a circuit recovers to CLOSED, or on a full reset.
160
+ """
161
+ try:
162
+ from app.services.upstash_cache import get_upstash_cache
163
+ cache = get_upstash_cache()
164
+ key = self._redis_key(provider)
165
+ await cache._execute_command(["DEL", key])
166
+ logger.debug("[CIRCUIT BREAKER] Cleared Redis state for %s.", provider)
167
+ except Exception as e:
168
+ logger.debug("[CIRCUIT BREAKER] Redis delete failed for %s (%s) — not a blocker.", provider, e)
169
+
170
+ # ──────────────────────────────────────────────────────────────────────────
171
+ # Core Circuit Breaker Logic
172
+ # ──────────────────────────────────────────────────────────────────────────
173
+
174
  def should_skip(self, provider: str) -> bool:
175
  """
176
  Check if provider should be skipped
177
+
178
  Args:
179
  provider: Provider name (e.g., "gnews", "newsapi")
180
+
181
  Returns:
182
  True if provider should be skipped, False otherwise
183
  """
184
  current_state = self.states[provider]
185
  current_time = time.time()
186
+
187
  # CLOSED = normal operation, don't skip
188
  if current_state == CircuitState.CLOSED:
189
  return False
190
+
191
  # OPEN = provider failing, check if should move to HALF_OPEN
192
  if current_state == CircuitState.OPEN:
193
  open_time = self.circuit_open_time.get(provider, 0)
194
+
195
  # Check if open duration has elapsed
196
  if current_time - open_time >= self.open_duration:
197
  # Move to HALF_OPEN (allow test request)
 
204
  remaining = int(self.open_duration - (current_time - open_time))
205
  logger.debug(f"⚡ Circuit OPEN for {provider} ({remaining}s remaining)")
206
  return True
207
+
208
  # HALF_OPEN = testing recovery
209
  if current_state == CircuitState.HALF_OPEN:
210
  # Allow limited test requests
211
  if self.half_open_attempts[provider] < self.half_open_max_attempts:
212
+ # FIX (Bug A): Increment the counter so we don't let infinite
213
+ # test requests through. The old code checked this counter but
214
+ # never actually increased it, causing an endless loop.
215
+ self.half_open_attempts[provider] += 1
216
  return False # Allow test
217
  else:
218
+ # FIX (Bug B — self-rescue):
219
+ # The test request went through but no success or failure was
220
+ # recorded. This happens when the API returned HTTP 200 with
221
+ # 0 articles — a quiet day with no news, not a broken key.
222
+ #
223
+ # If we just return True here and do nothing, the circuit stays
224
+ # frozen in HALF_OPEN forever because there is no other path out.
225
+ #
226
+ # Solution: push it back to OPEN with a fresh 1-hour timer.
227
+ # The cycle will be:
228
+ # OPEN (1 hour) → HALF_OPEN (1 test) → inconclusive → OPEN again
229
+ # Eventually an actual article response will trigger record_success()
230
+ # and the circuit will properly close. No permanent freeze.
231
+ logger.warning(
232
+ "⚡ [%s] HALF_OPEN test inconclusive (API reached but returned no articles). "
233
+ "Resetting to OPEN for another %d minutes.",
234
+ provider, self.open_duration // 60
235
+ )
236
+ self.states[provider] = CircuitState.OPEN
237
+ self.circuit_open_time[provider] = time.time()
238
+
239
+ # Persist the new OPEN state to Redis so it survives a restart.
240
+ try:
241
+ loop = asyncio.get_running_loop()
242
+ loop.create_task(self._persist_open_to_redis(provider))
243
+ except RuntimeError:
244
+ pass
245
+
246
+ return True
247
+
248
+
249
  return False
250
+
251
  def record_success(self, provider: str):
252
  """
253
  Record successful request
254
+
255
  Args:
256
  provider: Provider name
257
  """
258
  current_state = self.states[provider]
259
+
260
+ # Reset failure tracking
261
+ self.failure_timestamps[provider].clear()
262
+
263
  # Close circuit if it was open/half-open
264
  if current_state != CircuitState.CLOSED:
265
  self.states[provider] = CircuitState.CLOSED
266
  logger.info(f"✅ Circuit CLOSED for {provider} (recovered)")
267
+
268
+ # Clean up the Redis key so this provider isn't blocked after the next restart.
269
+ try:
270
+ loop = asyncio.get_running_loop()
271
+ loop.create_task(self._delete_from_redis(provider))
272
+ except RuntimeError:
273
+ pass
274
+
275
  def record_failure(
276
+ self,
277
+ provider: str,
278
  error_type: str = "unknown",
279
  status_code: Optional[int] = None
280
  ):
281
  """
282
  Record failed request
283
+
284
  Args:
285
  provider: Provider name
286
  error_type: Type of error ("rate_limit", "timeout", "server_error")
 
288
  """
289
  current_state = self.states[provider]
290
  current_time = time.time()
291
+
292
+ # Fix 1: Enforce the failure_window by pruning old failures.
293
+ # Remove any timestamps older than (current_time - self.failure_window)
294
+ cutoff_time = current_time - self.failure_window
295
+ self.failure_timestamps[provider] = [
296
+ ts for ts in self.failure_timestamps[provider] if ts >= cutoff_time
297
+ ]
298
+
299
+ # Append this new failure
300
+ self.failure_timestamps[provider].append(current_time)
301
+ current_failure_count = len(self.failure_timestamps[provider])
302
+
303
  # Log failure with details
304
  status_str = f" (HTTP {status_code})" if status_code else ""
305
  logger.warning(
306
+ f"⚠️ {provider} failure #{current_failure_count} (in last {self.failure_window}s): "
307
  f"{error_type}{status_str}"
308
  )
309
+
310
  # Check if should open circuit
311
  if current_state == CircuitState.CLOSED:
312
  # Check failure window
313
+ if current_failure_count >= self.failure_threshold:
314
+ # Open circuit in memory first (instant protection)
315
  self.states[provider] = CircuitState.OPEN
316
  self.circuit_open_time[provider] = current_time
317
+
318
  logger.warning(
319
  f"🔴 Circuit OPEN for {provider} "
320
+ f"({current_failure_count} failures in {self.failure_window}s) "
321
  f"- skipping for {self.open_duration//60} minutes"
322
  )
323
+
324
+ # Persist to Redis so the state survives a server restart.
325
+ try:
326
+ loop = asyncio.get_running_loop()
327
+ loop.create_task(self._persist_open_to_redis(provider))
328
+ except RuntimeError:
329
+ pass
330
+
331
  # If in HALF_OPEN and fails, go back to OPEN
332
  elif current_state == CircuitState.HALF_OPEN:
333
  self.states[provider] = CircuitState.OPEN
334
  self.circuit_open_time[provider] = current_time
335
+
336
  logger.warning(
337
  f"🔴 Circuit back to OPEN for {provider} "
338
  f"(test failed) - skipping for {self.open_duration//60} minutes"
339
  )
340
+
341
+ # Persist the re-opened state to Redis too.
342
+ try:
343
+ loop = asyncio.get_running_loop()
344
+ loop.create_task(self._persist_open_to_redis(provider))
345
+ except RuntimeError:
346
+ pass
347
+
348
  def reset(self, provider: Optional[str] = None):
349
  """
350
  Reset circuit breaker
351
+
352
  Args:
353
  provider: Provider to reset (None = reset all)
354
  """
355
  if provider:
356
+ # Reset specific provider in memory
357
  self.states[provider] = CircuitState.CLOSED
358
+ self.failure_timestamps[provider].clear()
359
  self.half_open_attempts[provider] = 0
360
  logger.info(f"🔄 Circuit reset for {provider}")
361
+
362
+ # Also remove the Redis key for this provider
363
+ try:
364
+ loop = asyncio.get_running_loop()
365
+ loop.create_task(self._delete_from_redis(provider))
366
+ except RuntimeError:
367
+ pass
368
  else:
369
+ # Reset all providers in memory
370
  self.states.clear()
371
+ self.failure_timestamps.clear()
 
372
  self.circuit_open_time.clear()
373
  self.half_open_attempts.clear()
374
  logger.info("🔄 All circuits reset")
375
+
376
+ # Remove all Redis keys for known providers
377
+ try:
378
+ loop = asyncio.get_running_loop()
379
+ loop.create_task(self._reset_all_redis_keys())
380
+ except RuntimeError:
381
+ pass
382
+
383
+ async def _reset_all_redis_keys(self):
384
+ """Delete all circuit state keys from Redis. Called by reset()."""
385
+ for provider in self._known_providers:
386
+ await self._delete_from_redis(provider)
387
+ logger.info("[CIRCUIT BREAKER] All Redis circuit keys cleared.")
388
+
389
  def get_stats(self) -> dict:
390
  """Get circuit breaker statistics"""
391
  total_open = sum(1 for s in self.states.values() if s == CircuitState.OPEN)
392
  total_half_open = sum(1 for s in self.states.values() if s == CircuitState.HALF_OPEN)
393
  total_closed = sum(1 for s in self.states.values() if s == CircuitState.CLOSED)
394
+
395
  # Provider details
396
  provider_details = {}
397
  for provider, state in self.states.items():
398
+ timestamps = self.failure_timestamps.get(provider, [])
399
+ last_fail = timestamps[-1] if timestamps else None
400
  provider_details[provider] = {
401
  'state': state.value,
402
+ 'failures': len(timestamps),
403
+ 'last_failure': last_fail
404
  }
405
+
406
  return {
407
  'total_open': total_open,
408
  'total_half_open': total_half_open,
409
  'total_closed': total_closed,
410
  'providers': provider_details
411
  }
412
+
413
  def print_stats(self):
414
  """Print circuit breaker statistics"""
415
  stats = self.get_stats()
416
+
417
  logger.info("")
418
  logger.info("=" * 70)
419
  logger.info("⚡ [CIRCUIT BREAKER] Provider Status")
 
422
  logger.info(f" 🔹 Half-Open Circuits: {stats['total_half_open']}")
423
  logger.info(f" 🔹 Closed Circuits: {stats['total_closed']}")
424
  logger.info("")
425
+
426
  for provider, details in stats['providers'].items():
427
  state_emoji = {
428
  'closed': '✅',
429
  'open': '🔴',
430
  'half_open': '🟡'
431
  }.get(details['state'], '❓')
432
+
433
  logger.info(
434
  f" {state_emoji} {provider.upper()}: "
435
  f"{details['state'].upper()} "
436
  f"({details['failures']} failures)"
437
  )
438
+
439
  logger.info("=" * 70)
440
  logger.info("")
441
 
 
447
  def get_circuit_breaker() -> ProviderCircuitBreaker:
448
  """
449
  Get or create global circuit breaker instance
450
+
451
  Returns:
452
  ProviderCircuitBreaker: Singleton instance
453
  """
454
  global _circuit_breaker
455
+
456
  if _circuit_breaker is None:
457
  _circuit_breaker = ProviderCircuitBreaker(
458
  failure_threshold=3, # 3 failures
 
460
  open_duration=3600, # skip for 1 hour
461
  half_open_max_attempts=1 # 1 test request
462
  )
463
+
464
  return _circuit_breaker
app/services/news_providers.py CHANGED
@@ -80,62 +80,70 @@ class GNewsProvider(NewsProvider):
80
  }
81
 
82
  async def fetch_news(self, category: str, limit: int = 20) -> List[Article]:
83
- """Fetch news from GNews API"""
 
 
 
 
 
 
 
 
 
84
  if not self.api_key:
85
  return []
86
-
87
  try:
88
  query = self.category_map.get(category, category)
89
  url = f"{self.base_url}/search"
90
 
91
- # Build a window from midnight IST today (converted to UTC) to right now.
92
- #
93
- # Why IST midnight and not UTC midnight?
94
- # IST is UTC+5:30. If we used UTC midnight as the "from" date, GNews
95
- # would skip articles published in India between 12:00 AM IST and
96
- # 5:30 AM IST — the first 5.5 hours of the Indian day.
97
- # By computing IST midnight and converting it to UTC, we tell GNews:
98
- # "Give me everything published since the Indian day started".
99
- _ist_zone = ZoneInfo("Asia/Kolkata")
100
- _now_ist = datetime.now(_ist_zone)
101
- _cutoff_ist = _now_ist.replace(hour=0, minute=0, second=0, microsecond=0)
102
- # Convert IST midnight → UTC so the API gets a valid UTC timestamp.
103
- _cutoff_utc = _cutoff_ist.astimezone(timezone.utc)
104
- # Current moment in UTC for the "to" bound.
105
- _now_utc = datetime.now(timezone.utc)
106
-
107
  params = {
108
  'q': query,
109
  'lang': 'en',
110
  'country': 'us',
111
  'max': min(limit, 10), # GNews free tier max 10
112
  'apikey': self.api_key,
113
- 'from': _cutoff_utc.strftime('%Y-%m-%dT%H:%M:%SZ'), # IST midnight in UTC
114
- 'to': _now_utc.strftime('%Y-%m-%dT%H:%M:%SZ'),
115
  }
116
-
117
  async with httpx.AsyncClient(timeout=10.0) as client:
118
  response = await client.get(url, params=params)
119
-
120
  if response.status_code == 429:
121
  print("[WARN] [GNews] Rate limit hit! Switching to next provider...")
122
  self.mark_rate_limited()
123
  return []
124
-
125
  if response.status_code == 200:
126
  self.request_count += 1
127
  data = response.json()
128
- articles = self._parse_response(data, category)
 
 
 
 
 
 
 
 
 
 
 
 
129
  articles = self._parse_response(data, category)
130
  if articles:
131
  print(f"[SUCCESS] [GNews] Fetched {len(articles)} articles successfully")
132
  else:
133
- print("[WARN] [GNews] No articles found in response")
134
  return articles
135
  else:
136
  print(f"[ERROR] [GNews] HTTP {response.status_code} error")
137
-
138
  return []
 
 
 
 
139
  except Exception as e:
140
  print(f"[ERROR] [GNews] API error: {e}")
141
  return []
@@ -197,45 +205,63 @@ class NewsAPIProvider(NewsProvider):
197
  }
198
 
199
  async def fetch_news(self, category: str, limit: int = 20) -> List[Article]:
200
- """Fetch news from NewsAPI"""
 
 
 
 
 
 
 
 
201
  if not self.api_key:
202
  return []
203
-
204
  try:
205
  query = self.category_keywords.get(category, category)
206
  url = f"{self.base_url}/everything"
207
 
208
- # Ask NewsAPI for articles published since midnight IST today.
209
- # We compute IST midnight and convert it to UTC before sending it
210
- # to the API, because NewsAPI expects UTC timestamps.
211
- # This gives Indian users full coverage from their midnight onwards.
212
- _ist_zone = ZoneInfo("Asia/Kolkata")
213
- _now_ist = datetime.now(_ist_zone)
214
- _cutoff_ist = _now_ist.replace(hour=0, minute=0, second=0, microsecond=0)
215
- _cutoff_utc = _cutoff_ist.astimezone(timezone.utc) # Convert to UTC for the API
216
-
217
  params = {
218
  'q': query,
219
  'language': 'en',
220
  'sortBy': 'publishedAt',
221
  'pageSize': min(limit, 20),
222
  'apiKey': self.api_key,
223
- 'from': _cutoff_utc.strftime('%Y-%m-%dT%H:%M:%SZ'), # IST midnight in UTC
224
  }
225
-
226
  async with httpx.AsyncClient(timeout=10.0) as client:
227
  response = await client.get(url, params=params)
228
-
229
  if response.status_code == 429 or response.status_code == 426:
230
  self.mark_rate_limited()
231
  return []
232
-
233
  if response.status_code == 200:
234
  self.request_count += 1
235
  data = response.json()
236
- return self._parse_response(data, category)
237
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
238
  return []
 
 
 
 
239
  except Exception as e:
240
  print(f"NewsAPI error: {e}")
241
  return []
 
80
  }
81
 
82
  async def fetch_news(self, category: str, limit: int = 20) -> List[Article]:
83
+ """
84
+ Fetch news from GNews API.
85
+
86
+ Why no 'from'/'to' date filter here?
87
+ GNews free/basic tier does NOT support date filtering and returns HTTP 403
88
+ or an error payload when those params are sent. Removing them lets GNews
89
+ work reliably on any plan tier. Our data_validation.py freshness gate
90
+ already rejects old articles downstream, so date filtering still happens
91
+ — just at the right place.
92
+ """
93
  if not self.api_key:
94
  return []
95
+
96
  try:
97
  query = self.category_map.get(category, category)
98
  url = f"{self.base_url}/search"
99
 
100
+ # Simple, plan-compatible request no date window.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
101
  params = {
102
  'q': query,
103
  'lang': 'en',
104
  'country': 'us',
105
  'max': min(limit, 10), # GNews free tier max 10
106
  'apikey': self.api_key,
 
 
107
  }
108
+
109
  async with httpx.AsyncClient(timeout=10.0) as client:
110
  response = await client.get(url, params=params)
111
+
112
  if response.status_code == 429:
113
  print("[WARN] [GNews] Rate limit hit! Switching to next provider...")
114
  self.mark_rate_limited()
115
  return []
116
+
117
  if response.status_code == 200:
118
  self.request_count += 1
119
  data = response.json()
120
+
121
+ # FIX (Bug B): GNews sometimes returns HTTP 200 but puts an
122
+ # 'errors' key in the JSON body when the API key is wrong or
123
+ # a plan restriction is hit.
124
+ # We raise here so the aggregator's except block catches it
125
+ # and calls circuit.record_failure() automatically.
126
+ # That way the circuit breaker knows this is a real failure,
127
+ # not just a quiet day with no news.
128
+ if data.get('errors'):
129
+ raise RuntimeError(
130
+ f"[GNews] API error payload: {data.get('errors')}"
131
+ )
132
+
133
  articles = self._parse_response(data, category)
134
  if articles:
135
  print(f"[SUCCESS] [GNews] Fetched {len(articles)} articles successfully")
136
  else:
137
+ print("[WARN] [GNews] No articles this run (API is healthy, just quiet)")
138
  return articles
139
  else:
140
  print(f"[ERROR] [GNews] HTTP {response.status_code} error")
141
+
142
  return []
143
+ except RuntimeError:
144
+ # Re-raise RuntimeError (our intentional error-payload signal)
145
+ # so the aggregator's except block records this as a circuit failure.
146
+ raise
147
  except Exception as e:
148
  print(f"[ERROR] [GNews] API error: {e}")
149
  return []
 
205
  }
206
 
207
  async def fetch_news(self, category: str, limit: int = 20) -> List[Article]:
208
+ """
209
+ Fetch news from NewsAPI.
210
+
211
+ Why no 'from' date filter here?
212
+ Some NewsAPI plan tiers restrict date filtering and return status='error'
213
+ when a date param is included. Removing the filter makes the request
214
+ plan-agnostic. Our data_validation.py freshness gate handles date
215
+ filtering downstream.
216
+ """
217
  if not self.api_key:
218
  return []
219
+
220
  try:
221
  query = self.category_keywords.get(category, category)
222
  url = f"{self.base_url}/everything"
223
 
224
+ # Simple, plan-compatible request no date window.
 
 
 
 
 
 
 
 
225
  params = {
226
  'q': query,
227
  'language': 'en',
228
  'sortBy': 'publishedAt',
229
  'pageSize': min(limit, 20),
230
  'apiKey': self.api_key,
 
231
  }
232
+
233
  async with httpx.AsyncClient(timeout=10.0) as client:
234
  response = await client.get(url, params=params)
235
+
236
  if response.status_code == 429 or response.status_code == 426:
237
  self.mark_rate_limited()
238
  return []
239
+
240
  if response.status_code == 200:
241
  self.request_count += 1
242
  data = response.json()
243
+
244
+ # FIX (Bug B): NewsAPI returns HTTP 200 but sets status='error'
245
+ # in the JSON when the API key is invalid or a plan restriction
246
+ # is hit. We raise here so the aggregator's except block catches
247
+ # it and calls circuit.record_failure() automatically.
248
+ if data.get('status') == 'error':
249
+ raise RuntimeError(
250
+ f"[NewsAPI] API error: {data.get('message', 'unknown error')}"
251
+ )
252
+
253
+ articles = self._parse_response(data, category)
254
+ if articles:
255
+ print(f"[SUCCESS] [NewsAPI] Fetched {len(articles)} articles successfully")
256
+ else:
257
+ print("[WARN] [NewsAPI] No articles this run (API is healthy, just quiet)")
258
+ return articles
259
+
260
  return []
261
+ except RuntimeError:
262
+ # Re-raise RuntimeError (our intentional error-payload signal)
263
+ # so the aggregator's except block records this as a circuit failure.
264
+ raise
265
  except Exception as e:
266
  print(f"NewsAPI error: {e}")
267
  return []