SHAFI committed on
Commit ·
15c86bc
1
Parent(s): 5b38e69
fix: return 5-tuple on all fetch_and_validate_category paths; circuits/reset covers all 15 providers
Browse files
- app/routes/admin.py +17 -6
- app/services/scheduler.py +4 -4
app/routes/admin.py
CHANGED
|
@@ -773,25 +773,36 @@ async def reset_circuit_breakers():
|
|
| 773 |
|
| 774 |
circuit = get_circuit_breaker()
|
| 775 |
|
| 776 |
-
# Step 1: Wipe all Redis circuit keys for known providers
|
|
|
|
| 777 |
cache = get_upstash_cache()
|
| 778 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 779 |
deleted_keys = []
|
| 780 |
|
| 781 |
-
for provider in
|
| 782 |
key = f"circuit:{provider}:state"
|
| 783 |
result = await cache._execute_command(["DEL", key])
|
| 784 |
if result and int(result) > 0:
|
| 785 |
deleted_keys.append(key)
|
| 786 |
|
| 787 |
-
# Step 2: Reset
|
|
|
|
| 788 |
circuit.reset()
|
| 789 |
|
| 790 |
return {
|
| 791 |
"success": True,
|
| 792 |
-
"message": "All circuit breakers have been reset.
|
| 793 |
"redis_keys_deleted": deleted_keys,
|
| 794 |
-
"redis_keys_checked": [f"circuit:{p}:state" for p in
|
|
|
|
| 795 |
"note": "Run this after fixing a broken API key and redeploying."
|
| 796 |
}
|
| 797 |
|
|
|
|
| 773 |
|
| 774 |
circuit = get_circuit_breaker()
|
| 775 |
|
| 776 |
+
# Step 1: Wipe all Redis circuit keys for ALL known providers
|
| 777 |
+
# (legacy Phase 1-2 + new modular Phase 3-11).
|
| 778 |
cache = get_upstash_cache()
|
| 779 |
+
all_known_providers = [
|
| 780 |
+
# Legacy providers (Phase 1-2)
|
| 781 |
+
"gnews", "newsapi", "newsdata",
|
| 782 |
+
"google_rss", "medium", "official_cloud",
|
| 783 |
+
# New modular providers (Phase 3-11)
|
| 784 |
+
"hacker_news", "direct_rss", "thenewsapi",
|
| 785 |
+
"inshorts", "saurav_static", "worldnewsai",
|
| 786 |
+
"openrss", "webz", "wikinews",
|
| 787 |
+
]
|
| 788 |
deleted_keys = []
|
| 789 |
|
| 790 |
+
for provider in all_known_providers:
|
| 791 |
key = f"circuit:{provider}:state"
|
| 792 |
result = await cache._execute_command(["DEL", key])
|
| 793 |
if result and int(result) > 0:
|
| 794 |
deleted_keys.append(key)
|
| 795 |
|
| 796 |
+
# Step 2: Reset ALL in-memory circuit states back to CLOSED.
|
| 797 |
+
# circuit.reset() with no argument resets every tracked provider.
|
| 798 |
circuit.reset()
|
| 799 |
|
| 800 |
return {
|
| 801 |
"success": True,
|
| 802 |
+
"message": "All circuit breakers have been reset. All providers will be tried on the next scheduler run.",
|
| 803 |
"redis_keys_deleted": deleted_keys,
|
| 804 |
+
"redis_keys_checked": [f"circuit:{p}:state" for p in all_known_providers],
|
| 805 |
+
"providers_reset": all_known_providers,
|
| 806 |
"note": "Run this after fixing a broken API key and redeploying."
|
| 807 |
}
|
| 808 |
|
app/services/scheduler.py
CHANGED
|
@@ -538,7 +538,7 @@ async def fetch_and_validate_category(category: str, aggregator) -> tuple:
|
|
| 538 |
Using a shared instance means all 22 parallel tasks
|
| 539 |
share the same quota counters and circuit-breaker state.
|
| 540 |
|
| 541 |
-
Returns: (category, valid_articles, invalid_count, irrelevant_count)
|
| 542 |
"""
|
| 543 |
from app.utils.data_validation import is_valid_article, sanitize_article, is_relevant_to_category
|
| 544 |
from app.utils.date_parser import normalize_article_date
|
|
@@ -559,7 +559,7 @@ async def fetch_and_validate_category(category: str, aggregator) -> tuple:
|
|
| 559 |
raw_articles = await aggregator.fetch_by_category(category)
|
| 560 |
|
| 561 |
if not raw_articles:
|
| 562 |
-
return (category, [], 0, 0)
|
| 563 |
|
| 564 |
# ------------------------------------------------------------------
|
| 565 |
# IN-BATCH DEDUPLICATION
|
|
@@ -670,10 +670,10 @@ async def fetch_and_validate_category(category: str, aggregator) -> tuple:
|
|
| 670 |
|
| 671 |
except asyncio.TimeoutError:
|
| 672 |
logger.error("%s Timeout fetching [%s] (>30s)", TAG_ERROR, category)
|
| 673 |
-
return (category, [], 0, 0)
|
| 674 |
except Exception as e:
|
| 675 |
logger.exception("%s Error fetching [%s]", TAG_ERROR, category)
|
| 676 |
-
return (category, [], 0, 0)
|
| 677 |
|
| 678 |
|
| 679 |
async def cleanup_old_news():
|
|
|
|
| 538 |
Using a shared instance means all 22 parallel tasks
|
| 539 |
share the same quota counters and circuit-breaker state.
|
| 540 |
|
| 541 |
+
Returns: (category, valid_articles, invalid_count, irrelevant_count, relevant_count)
|
| 542 |
"""
|
| 543 |
from app.utils.data_validation import is_valid_article, sanitize_article, is_relevant_to_category
|
| 544 |
from app.utils.date_parser import normalize_article_date
|
|
|
|
| 559 |
raw_articles = await aggregator.fetch_by_category(category)
|
| 560 |
|
| 561 |
if not raw_articles:
|
| 562 |
+
return (category, [], 0, 0, 0)
|
| 563 |
|
| 564 |
# ------------------------------------------------------------------
|
| 565 |
# IN-BATCH DEDUPLICATION
|
|
|
|
| 670 |
|
| 671 |
except asyncio.TimeoutError:
|
| 672 |
logger.error("%s Timeout fetching [%s] (>30s)", TAG_ERROR, category)
|
| 673 |
+
return (category, [], 0, 0, 0)
|
| 674 |
except Exception as e:
|
| 675 |
logger.exception("%s Error fetching [%s]", TAG_ERROR, category)
|
| 676 |
+
return (category, [], 0, 0, 0)
|
| 677 |
|
| 678 |
|
| 679 |
async def cleanup_old_news():
|