| """ |
| Extended News Collectors |
| Fetches news from RSS feeds, CoinDesk, CoinTelegraph, and other crypto news sources |
| """ |
|
|
| import asyncio |
| import feedparser |
| from datetime import datetime, timezone |
| from typing import Dict, List, Optional, Any |
| from utils.api_client import get_client |
| from utils.logger import setup_logger, log_api_request, log_error |
|
|
# Module-level logger used by every collector defined in this file.
logger = setup_logger("news_extended_collector")
|
|
|
|
async def get_rss_feed(
    provider: str,
    feed_url: str,
    *,
    max_articles: int = 10,
    summary_length: int = 200,
    timeout: int = 15,
) -> Dict[str, Any]:
    """
    Fetch and parse RSS feed from a news source

    Args:
        provider: Provider name (used for logging and in the result dict)
        feed_url: RSS feed URL
        max_articles: Maximum number of feed entries converted to articles
        summary_length: Maximum number of characters kept from each summary
        timeout: Request timeout forwarded to the HTTP client

    Returns:
        Dict with provider, category, data, timestamp, success, error.
        Successful results also carry response_time_ms; failed results
        carry error_type. This function never raises: any exception is
        logged and converted into a failed result.
    """
    # Local import keeps this block self-contained; io is standard library.
    import io

    category = "news"
    endpoint = "/rss"

    logger.info(f"Fetching RSS feed from {provider}")

    try:
        client = get_client()
        response = await client.get(feed_url, timeout=timeout)

        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )

        if not response["success"]:
            error_msg = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
            return {
                "provider": provider,
                "category": category,
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "success": False,
                "error": error_msg,
                "error_type": response.get("error_type")
            }

        # Prefer the raw body; fall back to the client's "data" field.
        payload = response.get("raw_content") or response.get("data")
        if payload is None:
            # str(None) would feed the literal text "None" to the parser.
            payload = ""
        if isinstance(payload, (bytes, bytearray)):
            # str(bytes) yields the "b'...'" repr, which is not parseable XML.
            raw_bytes = bytes(payload)
        else:
            raw_bytes = str(payload).encode("utf-8")

        # Hand feedparser a stream rather than a plain string: a string
        # argument may be interpreted as a URL or file path and opened,
        # which must never happen with content received from a remote server.
        feed = feedparser.parse(io.BytesIO(raw_bytes))

        # Negative limits would slice from the end; clamp them to zero.
        article_cap = max(max_articles, 0)
        summary_cap = max(summary_length, 0)

        news_data = None
        if feed and hasattr(feed, 'entries'):
            articles = [
                {
                    "title": entry.get("title", ""),
                    "link": entry.get("link", ""),
                    "published": entry.get("published", ""),
                    "summary": entry.get("summary", "")[:summary_cap] if "summary" in entry else None
                }
                for entry in feed.entries[:article_cap]
            ]

            news_data = {
                "feed_title": feed.feed.get("title", provider) if hasattr(feed, 'feed') else provider,
                # total_entries counts every entry in the feed, not only
                # the capped list returned in "articles".
                "total_entries": len(feed.entries),
                "articles": articles
            }

        logger.info(f"{provider} - {endpoint} - Retrieved {len(feed.entries) if feed else 0} articles")

        return {
            "provider": provider,
            "category": category,
            "data": news_data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0)
        }

    except Exception as e:
        # Collector boundary: report the failure as a result dict so one
        # broken feed cannot abort a whole collection run.
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": False,
            "error": error_msg,
            "error_type": "exception"
        }
|
|
|
|
async def get_coindesk_news() -> Dict[str, Any]:
    """
    Collect the latest CoinDesk articles through its RSS feed

    Returns:
        Dict with provider, category, data, timestamp, success, error
    """
    provider_name = "CoinDesk"
    rss_url = "https://www.coindesk.com/arc/outboundfeeds/rss/"
    feed_result = await get_rss_feed(provider_name, rss_url)
    return feed_result
|
|
|
|
async def get_cointelegraph_news() -> Dict[str, Any]:
    """
    Collect the latest CoinTelegraph articles through its RSS feed

    Returns:
        Dict with provider, category, data, timestamp, success, error
    """
    provider_name = "CoinTelegraph"
    rss_url = "https://cointelegraph.com/rss"
    feed_result = await get_rss_feed(provider_name, rss_url)
    return feed_result
|
|
|
|
async def get_decrypt_news() -> Dict[str, Any]:
    """
    Collect the latest Decrypt articles through its RSS feed

    Returns:
        Dict with provider, category, data, timestamp, success, error
    """
    provider_name = "Decrypt"
    rss_url = "https://decrypt.co/feed"
    feed_result = await get_rss_feed(provider_name, rss_url)
    return feed_result
|
|
|
|
async def get_bitcoinmagazine_news() -> Dict[str, Any]:
    """
    Collect the latest Bitcoin Magazine articles through its RSS feed

    Returns:
        Dict with provider, category, data, timestamp, success, error
    """
    provider_name = "BitcoinMagazine"
    rss_url = "https://bitcoinmagazine.com/.rss/full/"
    feed_result = await get_rss_feed(provider_name, rss_url)
    return feed_result
|
|
|
|
async def get_theblock_news() -> Dict[str, Any]:
    """
    Collect the latest articles from The Block through its RSS feed

    Returns:
        Dict with provider, category, data, timestamp, success, error
    """
    provider_name = "TheBlock"
    rss_url = "https://www.theblock.co/rss.xml"
    feed_result = await get_rss_feed(provider_name, rss_url)
    return feed_result
|
|
|
|
async def get_cryptoslate_news() -> Dict[str, Any]:
    """
    Fetch news from CryptoSlate

    Queries the CryptoSlate JSON posts endpoint first. If that request is
    unsuccessful, or anything raises while handling it, the CryptoSlate
    RSS feed is fetched through get_rss_feed instead.

    Returns:
        Dict with provider, category, data, timestamp, success, error.
        When the JSON endpoint succeeds, data holds total_entries and
        articles (title, link, published, excerpt), or None if the payload
        is not a list. On fallback the result is whatever get_rss_feed
        returns.
    """
    provider = "CryptoSlate"
    category = "news"
    endpoint = "/newslist"
    # Single definition of the fallback feed used by both failure paths.
    rss_fallback_url = "https://cryptoslate.com/feed/"

    logger.info(f"Fetching news from {provider}")

    try:
        client = get_client()

        url = "https://cryptoslate.com/wp-json/cs/v1/posts"
        params = {
            "per_page": 10,
            "orderby": "date"
        }

        response = await client.get(url, params=params, timeout=10)

        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )

        if not response["success"]:
            logger.info(f"{provider} - API failed, trying RSS feed")
            return await get_rss_feed(provider, rss_fallback_url)

        data = response["data"]

        news_data = None
        if isinstance(data, list):
            # Items are read as dicts whose title and excerpt are nested
            # under "rendered"; any other shape raises and is handled by
            # the RSS fallback below.
            articles = [
                {
                    "title": article.get("title", {}).get("rendered", ""),
                    "link": article.get("link", ""),
                    "published": article.get("date", ""),
                    "excerpt": article.get("excerpt", {}).get("rendered", "")[:200]
                }
                for article in data
            ]

            news_data = {
                "total_entries": len(articles),
                "articles": articles
            }

        logger.info(f"{provider} - {endpoint} - Retrieved {len(data) if isinstance(data, list) else 0} articles")

        return {
            "provider": provider,
            "category": category,
            "data": news_data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0)
        }

    except Exception as e:
        # Record why the JSON path failed before falling back; previously
        # the exception was discarded and the cause was lost.
        log_error(logger, provider, "exception", f"Unexpected error: {str(e)}", endpoint, exc_info=True)
        logger.info(f"{provider} - Exception occurred, trying RSS feed")
        return await get_rss_feed(provider, rss_fallback_url)
|
|
|
|
async def get_cryptonews_feed() -> Dict[str, Any]:
    """
    Collect the latest Crypto.news articles through its RSS feed

    Returns:
        Dict with provider, category, data, timestamp, success, error
    """
    provider_name = "CryptoNews"
    rss_url = "https://crypto.news/feed/"
    feed_result = await get_rss_feed(provider_name, rss_url)
    return feed_result
|
|
|
|
async def get_coinjournal_news() -> Dict[str, Any]:
    """
    Collect the latest CoinJournal articles through its RSS feed

    Returns:
        Dict with provider, category, data, timestamp, success, error
    """
    provider_name = "CoinJournal"
    rss_url = "https://coinjournal.net/feed/"
    feed_result = await get_rss_feed(provider_name, rss_url)
    return feed_result
|
|
|
|
async def get_beincrypto_news() -> Dict[str, Any]:
    """
    Collect the latest BeInCrypto articles through its RSS feed

    Returns:
        Dict with provider, category, data, timestamp, success, error
    """
    provider_name = "BeInCrypto"
    rss_url = "https://beincrypto.com/feed/"
    feed_result = await get_rss_feed(provider_name, rss_url)
    return feed_result
|
|
|
|
async def get_cryptobriefing_news() -> Dict[str, Any]:
    """
    Collect the latest CryptoBriefing articles through its RSS feed

    Returns:
        Dict with provider, category, data, timestamp, success, error
    """
    provider_name = "CryptoBriefing"
    rss_url = "https://cryptobriefing.com/feed/"
    feed_result = await get_rss_feed(provider_name, rss_url)
    return feed_result
|
|
|
|
async def collect_extended_news() -> List[Dict[str, Any]]:
    """
    Main function to collect news from all extended sources

    Runs every collector concurrently and normalises the outcome so that
    each element of the returned list is a result dict, even if a
    collector raised or was cancelled.

    Returns:
        List of results from all news collectors, in the same order as
        the collectors are listed below
    """
    logger.info("Starting extended news collection from all sources")

    # Provider labels are paired with their collectors so that a failure
    # can be attributed to its source instead of being reported as "Unknown".
    collectors = [
        ("CoinDesk", get_coindesk_news),
        ("CoinTelegraph", get_cointelegraph_news),
        ("Decrypt", get_decrypt_news),
        ("BitcoinMagazine", get_bitcoinmagazine_news),
        ("TheBlock", get_theblock_news),
        ("CryptoSlate", get_cryptoslate_news),
        ("CryptoNews", get_cryptonews_feed),
        ("CoinJournal", get_coinjournal_news),
        ("BeInCrypto", get_beincrypto_news),
        ("CryptoBriefing", get_cryptobriefing_news),
    ]

    # gather() returns results in the order the awaitables were passed.
    results = await asyncio.gather(
        *(collector() for _, collector in collectors),
        return_exceptions=True
    )

    processed_results = []
    for (provider, _), result in zip(collectors, results):
        # BaseException rather than Exception: asyncio.CancelledError is
        # not an Exception subclass on Python 3.8+, yet gather() can return
        # it as a result; it must not be mistaken for a result dict.
        if isinstance(result, BaseException):
            logger.error(f"Collector failed with exception: {str(result)}")
            processed_results.append({
                "provider": provider,
                "category": "news",
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "success": False,
                "error": str(result),
                "error_type": "exception"
            })
        else:
            processed_results.append(result)

    successful = sum(1 for r in processed_results if r.get("success", False))
    # Only successful results with a non-empty data payload contribute.
    total_articles = sum(
        r.get("data", {}).get("total_entries", 0)
        for r in processed_results
        if r.get("success", False) and r.get("data")
    )

    logger.info(
        f"Extended news collection complete: {successful}/{len(processed_results)} sources successful, "
        f"{total_articles} total articles"
    )

    return processed_results
|
|
|
|
| |
if __name__ == "__main__":
    async def main():
        """Run every collector once and print a short per-provider report."""
        collected = await collect_extended_news()

        print("\n=== Extended News Collection Results ===")
        for item in collected:
            print(f"\nProvider: {item['provider']}")
            print(f"Success: {item['success']}")

            # Failed sources only report their error message.
            if not item['success']:
                print(f"Error: {item.get('error', 'Unknown')}")
                continue

            payload = item.get('data', {})
            if not payload:
                continue

            print(f"Total Articles: {payload.get('total_entries', 'N/A')}")
            headlines = payload.get('articles', [])
            if headlines:
                # Show the first article's title, truncated to 60 characters.
                print(f"Latest: {headlines[0].get('title', 'N/A')[:60]}...")

    asyncio.run(main())
|
|