| |
| """ |
| Comprehensive Data Worker - Collect ALL Data from ALL Sources |
| Uses all resources from crypto_resources_unified_2025-11-11.json |
| |
| This worker ensures ZERO data sources are left unused: |
| - 23 Market Data APIs |
| - 15 News APIs |
| - 12 Sentiment APIs |
| - 13 On-chain Analytics APIs |
| - 9 Whale Tracking APIs |
| - 18 Block Explorers |
| - 1 Community Sentiment API |
| - 24 RPC Nodes |
| - 7 HuggingFace Resources |
| - 13 Free HTTP Endpoints |
| |
| ALL data is uploaded to HuggingFace Datasets |
| """ |
|
|
| import asyncio |
| import time |
| import logging |
| import os |
| from datetime import datetime, timedelta |
| from typing import List, Dict, Any, Optional |
| import httpx |
|
|
| from database.cache_queries import get_cache_queries |
| from database.db_manager import db_manager |
| from utils.logger import setup_logger |
| from unified_resource_loader import get_loader |
|
|
# Module-wide logger used by every fetcher and uploader in this file.
logger = setup_logger("comprehensive_worker")

# Resource registry loader and cache query helper, both created at import time.
resource_loader = get_loader()
cache = get_cache_queries(db_manager)

# HuggingFace upload is attempted only when a token is present in the
# environment. If creating the uploader fails, the feature is switched off
# again and the worker carries on without it.
hf_uploader = None
HF_UPLOAD_ENABLED = bool(os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN"))
if not HF_UPLOAD_ENABLED:
    logger.info("ℹ️ HuggingFace Dataset upload DISABLED (no HF_TOKEN)")
else:
    try:
        # Imported lazily so the module still loads when the uploader
        # package is unavailable.
        from hf_dataset_uploader import get_dataset_uploader

        hf_uploader = get_dataset_uploader()
        logger.info("✅ HuggingFace Dataset upload ENABLED for comprehensive worker")
    except Exception as exc:
        logger.warning(f"HuggingFace Dataset upload disabled: {exc}")
        HF_UPLOAD_ENABLED = False
        hf_uploader = None
|
|
|
|
| |
| |
| |
|
|
async def fetch_news_from_cryptopanic() -> List[Dict[str, Any]]:
    """Return up to 15 "rising" news posts from the CryptoPanic posts endpoint.

    Every post is mapped onto the news record layout used by this worker.
    Any failure (network, HTTP status, JSON decoding, unexpected payload) is
    logged at debug level and results in an empty list.
    """
    endpoint = "https://cryptopanic.com/api/v1/posts/"
    query = {"auth_token": "free", "public": "true", "kind": "news", "filter": "rising"}

    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            reply = await http.get(endpoint, params=query)
            reply.raise_for_status()
            payload = reply.json()

        # The post title is reused as the description field.
        collected = [
            {
                "title": post.get("title", ""),
                "description": post.get("title", ""),
                "url": post.get("url", ""),
                "published_at": post.get("created_at", ""),
                "source": "CryptoPanic",
                "source_id": "cryptopanic",
                "category": "news",
                "fetched_at": datetime.utcnow().isoformat() + "Z",
            }
            for post in payload.get("results", [])[:15]
        ]
    except Exception as exc:
        logger.debug(f"CryptoPanic error: {exc}")
        return []

    logger.info(f"✅ CryptoPanic: {len(collected)} articles")
    return collected
|
|
|
|
async def fetch_news_from_coinstats() -> List[Dict[str, Any]]:
    """Return up to 15 articles from the CoinStats news endpoint.

    Twenty articles are requested and the first 15 are mapped onto the news
    record layout used by this worker. Any failure is logged at debug level
    and results in an empty list.
    """
    endpoint = "https://api.coin-stats.com/v2/news"
    query = {"limit": 20}

    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            reply = await http.get(endpoint, params=query)
            reply.raise_for_status()
            payload = reply.json()

        collected = [
            {
                "title": entry.get("title", ""),
                "description": entry.get("description", ""),
                "url": entry.get("link", ""),
                "published_at": entry.get("published", ""),
                "source": "CoinStats",
                "source_id": "coinstats",
                "category": "news",
                "fetched_at": datetime.utcnow().isoformat() + "Z",
            }
            for entry in payload.get("news", [])[:15]
        ]
    except Exception as exc:
        logger.debug(f"CoinStats error: {exc}")
        return []

    logger.info(f"✅ CoinStats: {len(collected)} articles")
    return collected
|
|
|
|
async def fetch_news_data() -> List[Dict[str, Any]]:
    """
    Fetch news from the built-in free feeds and from the resource registry.

    Sources:
    - CryptoPanic (dedicated fetcher)
    - CoinStats (dedicated fetcher)
    - Every resource in the registry's "news" category

    Registry resources that need an API key but have none are skipped, as are
    CryptoPanic entries (already covered by the dedicated fetcher). At most 10
    articles are taken from each registry resource. A failure in one source is
    logged at debug level and never prevents the others from being queried.

    Returns:
        A list of article dicts with the keys ``title``, ``description``,
        ``url``, ``published_at``, ``source``, ``source_id``, ``category``
        and ``fetched_at``.
    """
    news_data: List[Dict[str, Any]] = []

    # Dedicated fetchers for the free feeds.
    builtin_fetchers = (
        ("CryptoPanic", fetch_news_from_cryptopanic),
        ("CoinStats", fetch_news_from_coinstats),
    )
    for label, fetcher in builtin_fetchers:
        try:
            news_data.extend(await fetcher())
        except Exception as e:
            logger.debug(f"Error fetching {label}: {e}")

    news_resources = resource_loader.get_resources_by_category("news")
    logger.info(f"📰 Fetching news from {len(news_resources)} additional sources...")

    for resource in news_resources:
        try:
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            # CryptoPanic is handled by fetch_news_from_cryptopanic() above.
            if "cryptopanic" in resource.id:
                continue

            url = resource.base_url
            headers: Dict[str, str] = {}
            params: Dict[str, Any] = {}

            if resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["Authorization"] = f"Bearer {resource.api_key}"
            elif resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["apiKey"] = resource.api_key

            # Per-provider endpoint and query adjustments.
            if "newsapi" in resource.id:
                # Keys starting with "pub_" are rejected here as invalid.
                if not resource.api_key or resource.api_key.startswith("pub_"):
                    logger.debug(f"Skipping {resource.name} (invalid API key)")
                    continue
                url = f"{resource.base_url}/everything"
                params.update({
                    "q": "cryptocurrency OR bitcoin OR ethereum",
                    "language": "en",
                    "sortBy": "publishedAt",
                    "pageSize": 20,
                })
            elif "cryptocontrol" in resource.id:
                url = f"{resource.base_url}/news"

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()

            # Some registry entries point at HTML pages; only JSON is parsed.
            content_type = response.headers.get("content-type", "")
            if "application/json" not in content_type and "text/json" not in content_type:
                logger.debug(f"Non-JSON response from {resource.name}: {content_type}")
                continue

            data = response.json()

            # Locate the article list inside the payload.
            if isinstance(data, list):
                articles = data
            elif isinstance(data, dict):
                list_key = "articles" if "newsapi" in resource.id else "news"
                articles = data.get(list_key, [])
            else:
                articles = None

            # A null or non-list payload would otherwise abort the resource
            # with a TypeError on slicing.
            if not isinstance(articles, list):
                logger.debug(f"Unexpected payload shape from {resource.name}")
                continue

            added = 0
            for article in articles[:10]:
                if not isinstance(article, dict):
                    logger.debug(f"Error parsing article: unexpected {type(article).__name__}")
                    continue
                news_data.append({
                    "title": article.get("title", article.get("name", "")),
                    "description": article.get("description", article.get("summary", "")),
                    "url": article.get("url", article.get("link", "")),
                    "published_at": article.get(
                        "publishedAt",
                        article.get("published_at", article.get("created_at", "")),
                    ),
                    "source": resource.name,
                    "source_id": resource.id,
                    "category": "news",
                    "fetched_at": datetime.utcnow().isoformat() + "Z",
                })
                added += 1

            # Report what was actually stored, not what was offered.
            logger.info(f"✅ {resource.name}: {added} articles")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error fetching from {resource.name}: {e}")

    logger.info(f"📰 Total news articles collected: {len(news_data)}")
    return news_data
|
|
|
|
| |
| |
| |
|
|
async def fetch_fear_greed_index() -> List[Dict[str, Any]]:
    """Fetch the latest Fear & Greed Index reading from Alternative.me.

    Returns:
        A one-element list holding the sentiment record, or an empty list when
        the request fails or the payload contains no readings. Failures are
        logged at debug level.
    """
    try:
        url = "https://api.alternative.me/fng/"
        params = {"limit": "1"}

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

        fng_list = data.get("data", [])
        if isinstance(fng_list, list) and fng_list:
            fng_data = fng_list[0]

            # The record is labelled with a trailing "Z", so the epoch value
            # must be converted in UTC. datetime.fromtimestamp() would yield
            # the host's local time and mislabel it.
            epoch_seconds = int(fng_data.get("timestamp", time.time()))
            reported_at = datetime.utcfromtimestamp(epoch_seconds)

            sentiment = {
                "metric": "fear_greed_index",
                "value": float(fng_data.get("value", 0)),
                "classification": fng_data.get("value_classification", ""),
                "source": "Alternative.me",
                "source_id": "alternative-me-fng",
                "timestamp": reported_at.isoformat() + "Z",
                "fetched_at": datetime.utcnow().isoformat() + "Z",
            }
            logger.info(f"✅ Fear & Greed Index: {fng_data.get('value')} ({fng_data.get('value_classification')})")
            return [sentiment]
    except Exception as e:
        logger.debug(f"Fear & Greed Index error: {e}")

    return []
|
|
|
|
async def fetch_sentiment_data() -> List[Dict[str, Any]]:
    """
    Fetch sentiment data from the Fear & Greed Index and the resource registry.

    Sources:
    - Alternative.me Fear & Greed Index (dedicated fetcher)
    - LunarCrush (galaxy score for BTC, ETH and BNB)
    - Every other resource in the registry's "sentiment" category; these are
      requested, but only LunarCrush responses are parsed into records

    Resources that need an API key but have none are skipped, as are
    Alternative.me entries (already covered by the dedicated fetcher). A
    failure in one source is logged at debug level and never prevents the
    others from being queried.

    Returns:
        A list of sentiment record dicts.
    """
    sentiment_data: List[Dict[str, Any]] = []

    try:
        sentiment_data.extend(await fetch_fear_greed_index())
    except Exception as e:
        logger.debug(f"Error fetching Fear & Greed Index: {e}")

    sentiment_resources = resource_loader.get_resources_by_category("sentiment")
    logger.info(f"😊 Fetching sentiment from {len(sentiment_resources)} additional sources...")

    for resource in sentiment_resources:
        try:
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            # Alternative.me is handled by fetch_fear_greed_index() above.
            if "alternative.me" in resource.id or "alternative-me" in resource.id:
                continue

            url = resource.base_url
            headers: Dict[str, str] = {}
            params: Dict[str, Any] = {}

            if resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["Authorization"] = f"Bearer {resource.api_key}"
            elif resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["api_key"] = resource.api_key

            is_lunarcrush = "lunarcrush" in resource.id
            if is_lunarcrush:
                url = f"{resource.base_url}/assets"
                params.update({"symbol": "BTC,ETH,BNB", "data_points": 1})

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()

            content_type = response.headers.get("content-type", "")
            if "application/json" not in content_type and "text/json" not in content_type:
                logger.debug(f"Non-JSON response from {resource.name}: {content_type}")
                continue

            data = response.json()

            if not is_lunarcrush:
                # No parser exists for this provider; say so instead of
                # dropping the response without a trace.
                logger.debug(f"No sentiment parser for {resource.name}; response ignored")
                continue

            assets = data.get("data", []) if isinstance(data, dict) else []
            if not isinstance(assets, list):
                assets = []

            added = 0
            for asset in assets:
                if not isinstance(asset, dict):
                    continue
                now_iso = datetime.utcnow().isoformat() + "Z"
                sentiment_data.append({
                    "symbol": asset.get("symbol", ""),
                    "metric": "galaxy_score",
                    # A null score must not abort the rest of the batch.
                    "value": float(asset.get("galaxy_score") or 0),
                    "alt_rank": asset.get("alt_rank"),
                    "social_volume": asset.get("social_volume"),
                    "source": resource.name,
                    "source_id": resource.id,
                    "timestamp": now_iso,
                    "fetched_at": now_iso,
                })
                added += 1
            logger.info(f"✅ {resource.name}: {added} assets")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error fetching from {resource.name}: {e}")

    logger.info(f"😊 Total sentiment data collected: {len(sentiment_data)}")
    return sentiment_data
|
|
|
|
| |
| |
| |
|
|
async def fetch_onchain_data() -> List[Dict[str, Any]]:
    """
    Query every resource in the registry's "onchain_analytics" category.

    Each resource's base URL is requested as-is, with the API key attached as
    either an ``api_key`` query parameter or a Bearer ``Authorization``
    header, depending on the resource's auth type. The decoded JSON payload
    is stored untouched under the ``data`` key of the returned record.

    Resources that need a key but have none are skipped. Per-resource
    failures are logged at debug level and do not stop the loop.
    """
    resources = resource_loader.get_resources_by_category("onchain_analytics")
    logger.info(f"⛓️ Fetching on-chain data from {len(resources)} sources...")

    collected: List[Dict[str, Any]] = []
    for resource in resources:
        try:
            key = resource.api_key
            if resource.auth_type != "none" and not key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            request_headers = {}
            query = {}
            if key:
                if resource.auth_type == "apiKeyQuery":
                    query["api_key"] = key
                elif resource.auth_type == "apiKeyHeader":
                    request_headers["Authorization"] = f"Bearer {key}"

            logger.debug(f"Attempting {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0) as http:
                reply = await http.get(resource.base_url, headers=request_headers, params=query)
                reply.raise_for_status()
                payload = reply.json()

            record = {
                "source": resource.name,
                "source_id": resource.id,
                "data": payload,
                "fetched_at": datetime.utcnow().isoformat() + "Z",
            }
            collected.append(record)
            logger.info(f"✅ {resource.name}: Data received")

        except httpx.HTTPError as exc:
            logger.debug(f"HTTP error from {resource.name}: {exc}")
        except Exception as exc:
            logger.debug(f"Error from {resource.name}: {exc}")

    logger.info(f"⛓️ Total on-chain data points: {len(collected)}")
    return collected
|
|
|
|
| |
| |
| |
|
|
async def fetch_whale_data() -> List[Dict[str, Any]]:
    """
    Query every resource in the registry's "whale_tracking" category.

    The API key, when present, is sent either as an ``api_key`` query
    parameter or as an ``X-API-Key`` header, depending on the resource's auth
    type. Whale Alert resources that declare endpoints are queried at
    ``/transactions`` with a minimum value of 500000.

    At most 20 transactions are kept per resource, each wrapped in a record
    that names its source. Resources that need a key but have none are
    skipped. Per-resource failures are logged at debug level and do not stop
    the loop.
    """
    resources = resource_loader.get_resources_by_category("whale_tracking")
    logger.info(f"🐋 Fetching whale data from {len(resources)} sources...")

    collected: List[Dict[str, Any]] = []
    for resource in resources:
        try:
            key = resource.api_key
            if resource.auth_type != "none" and not key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            target = resource.base_url
            request_headers = {}
            query = {}
            if key:
                if resource.auth_type == "apiKeyQuery":
                    query["api_key"] = key
                elif resource.auth_type == "apiKeyHeader":
                    request_headers["X-API-Key"] = key

            if "whale-alert" in resource.id and resource.endpoints:
                target = f"{resource.base_url}/transactions"
                query["min_value"] = 500000

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0) as http:
                reply = await http.get(target, headers=request_headers, params=query)
                reply.raise_for_status()
                payload = reply.json()

            # Dict payloads carry the list under "transactions"; anything
            # else is treated as the list itself.
            if isinstance(payload, dict):
                transactions = payload.get("transactions", [])
            else:
                transactions = payload

            recent = transactions[:20]
            collected.extend(
                {
                    "source": resource.name,
                    "source_id": resource.id,
                    "transaction": tx,
                    "fetched_at": datetime.utcnow().isoformat() + "Z",
                }
                for tx in recent
            )
            logger.info(f"✅ {resource.name}: {len(recent)} transactions")

        except httpx.HTTPError as exc:
            logger.debug(f"HTTP error from {resource.name}: {exc}")
        except Exception as exc:
            logger.debug(f"Error from {resource.name}: {exc}")

    logger.info(f"🐋 Total whale transactions: {len(collected)}")
    return collected
|
|
|
|
| |
| |
| |
|
|
async def fetch_block_explorer_data() -> List[Dict[str, Any]]:
    """
    Query every resource in the registry's "block_explorers" category.

    Each explorer is asked for ``module=stats&action=ethprice`` at its
    ``/api`` path. A record is stored only when the response reports
    ``status == "1"``. Resources that need a key but have none are skipped.
    Per-resource failures are logged at debug level and do not stop the loop.

    NOTE(review): the same ``ethprice`` action is sent to every explorer and
    the result is read from ``ethusd`` / ``ethbtc``; confirm that explorers
    for other chains accept this action.
    """
    resources = resource_loader.get_resources_by_category("block_explorers")
    logger.info(f"🔍 Fetching from {len(resources)} block explorers...")

    collected: List[Dict[str, Any]] = []
    for resource in resources:
        try:
            key = resource.api_key
            if resource.auth_type != "none" and not key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            query = {"module": "stats", "action": "ethprice"}
            if key:
                query["apikey"] = key

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0) as http:
                reply = await http.get(f"{resource.base_url}/api", params=query)
                reply.raise_for_status()
                payload = reply.json()

            if payload.get("status") != "1":
                continue

            price = payload.get("result", {})
            collected.append({
                "chain": getattr(resource, "chain", "unknown"),
                "source": resource.name,
                "source_id": resource.id,
                "price_usd": price.get("ethusd"),
                "price_btc": price.get("ethbtc"),
                "fetched_at": datetime.utcnow().isoformat() + "Z",
            })
            logger.info(f"✅ {resource.name}: Price data received")

        except httpx.HTTPError as exc:
            logger.debug(f"HTTP error from {resource.name}: {exc}")
        except Exception as exc:
            logger.debug(f"Error from {resource.name}: {exc}")

    logger.info(f"🔍 Total block explorer data: {len(collected)}")
    return collected
|
|
|
|
| |
| |
| |
|
|
async def _upload_records(
    records: List[Dict[str, Any]],
    uploader_method: str,
    plural: str,
    label: str,
    error_label: str,
) -> bool:
    """Shared upload routine behind every ``save_and_upload_*`` function.

    Args:
        records: Records to upload; an empty list short-circuits to ``False``.
        uploader_method: Name of the coroutine method on ``hf_uploader`` that
            performs the upload (called with ``records`` and ``append=True``).
        plural: Wording used in the "Saving ..." / "Uploading ..." messages.
        label: Wording used in the success and failure messages.
        error_label: Wording used in the exception message.

    Returns:
        ``False`` when there is nothing to upload, the uploader reports
        failure, or the upload raises. ``True`` when the upload succeeds or
        when HuggingFace upload is disabled.
    """
    if not records:
        return False

    logger.info(f"💾 Saving {len(records)} {plural}...")

    if not (HF_UPLOAD_ENABLED and hf_uploader):
        # Upload is disabled; nothing further is done with the records here.
        return True

    try:
        logger.info(f"📤 Uploading {len(records)} {plural} to HuggingFace...")
        # Resolved inside the try so a missing method is reported like any
        # other upload error.
        upload = getattr(hf_uploader, uploader_method)
        success = await upload(records, append=True)
    except Exception as e:
        logger.error(f"Error uploading {error_label}: {e}")
        return False

    if success:
        logger.info(f"✅ Successfully uploaded {label} to HuggingFace")
        return True

    logger.warning(f"⚠️ Failed to upload {label} to HuggingFace")
    return False


async def save_and_upload_news(news_data: List[Dict[str, Any]]) -> bool:
    """Upload news articles to HuggingFace; see ``_upload_records`` for the return contract."""
    return await _upload_records(
        news_data, "upload_news_data", "news articles", "news", "news to HuggingFace"
    )


async def save_and_upload_sentiment(sentiment_data: List[Dict[str, Any]]) -> bool:
    """Upload sentiment records to HuggingFace; see ``_upload_records`` for the return contract."""
    return await _upload_records(
        sentiment_data, "upload_sentiment_data", "sentiment records", "sentiment", "sentiment"
    )


async def save_and_upload_onchain(onchain_data: List[Dict[str, Any]]) -> bool:
    """Upload on-chain records to HuggingFace; see ``_upload_records`` for the return contract."""
    return await _upload_records(
        onchain_data, "upload_onchain_data", "on-chain records", "on-chain data", "on-chain data"
    )


async def save_and_upload_whale(whale_data: List[Dict[str, Any]]) -> bool:
    """Upload whale records to HuggingFace; see ``_upload_records`` for the return contract."""
    return await _upload_records(
        whale_data, "upload_whale_data", "whale records", "whale data", "whale data"
    )


async def save_and_upload_explorer(explorer_data: List[Dict[str, Any]]) -> bool:
    """Upload block explorer records to HuggingFace; see ``_upload_records`` for the return contract."""
    return await _upload_records(
        explorer_data, "upload_explorer_data", "explorer records", "explorer data", "explorer data"
    )
|
|
|
|
| |
| |
| |
|
|
async def comprehensive_worker_loop(interval_seconds: float = 300):
    """
    Main worker loop: fetch every data category, upload it, then sleep.

    All five fetchers run concurrently, followed by the five uploaders. A
    fetcher or uploader that raises is logged and treated as having produced
    no records, so one failing category never blocks the others.

    Args:
        interval_seconds: Pause between iterations. Defaults to 300 (five
            minutes) to stay clear of provider rate limits.

    The loop never returns on its own; it ends only when the task is cancelled.
    """
    logger.info("🚀 Starting comprehensive data worker")
    logger.info(f"📊 Resource statistics: {resource_loader.get_stats()}")

    # Each category pairs a fetcher with the uploader for its records.
    pipeline = (
        ("news", fetch_news_data, save_and_upload_news),
        ("sentiment", fetch_sentiment_data, save_and_upload_sentiment),
        ("on-chain", fetch_onchain_data, save_and_upload_onchain),
        ("whale", fetch_whale_data, save_and_upload_whale),
        ("explorer", fetch_block_explorer_data, save_and_upload_explorer),
    )

    iteration = 0

    while True:
        try:
            iteration += 1
            start_time = time.time()

            logger.info(f"\n{'='*80}")
            logger.info(f"[Iteration {iteration}] Starting comprehensive data collection")
            logger.info(f"{'='*80}")

            fetched = await asyncio.gather(
                *(fetch() for _, fetch, _ in pipeline),
                return_exceptions=True,
            )

            # Normalise once. gather() can hand back BaseException instances
            # (e.g. CancelledError from a child), not only Exception.
            batches = []
            for (label, _, _), result in zip(pipeline, fetched):
                if isinstance(result, BaseException):
                    logger.error(f"[Iteration {iteration}] {label} fetch failed: {result!r}")
                    batches.append([])
                else:
                    batches.append(result)

            uploaded = await asyncio.gather(
                *(upload(batch) for (_, _, upload), batch in zip(pipeline, batches)),
                return_exceptions=True,
            )
            for (label, _, _), outcome in zip(pipeline, uploaded):
                if isinstance(outcome, BaseException):
                    logger.error(f"[Iteration {iteration}] {label} upload failed: {outcome!r}")

            elapsed = time.time() - start_time
            total_records = sum(len(batch) for batch in batches)

            logger.info(f"\n{'='*80}")
            logger.info(f"[Iteration {iteration}] Completed in {elapsed:.2f}s")
            logger.info(f"Total records collected: {total_records}")
            logger.info(f"{'='*80}\n")

        except Exception as e:
            logger.error(f"[Iteration {iteration}] Worker error: {e}", exc_info=True)

        # Same pause after a successful or a failed iteration.
        await asyncio.sleep(interval_seconds)
|
|
|
|
# Strong references to running worker tasks. The event loop keeps only weak
# references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_worker_tasks: set = set()


async def start_comprehensive_worker():
    """Schedule the comprehensive worker loop as a background task.

    Must be called from within a running event loop. The function returns as
    soon as the task has been scheduled; it does not wait for any data to be
    fetched. Start-up failures are logged rather than raised.
    """
    try:
        logger.info("Initializing comprehensive data worker...")

        logger.info("Running initial data fetch...")
        task = asyncio.create_task(comprehensive_worker_loop())
        # Keep the task alive for as long as it runs, then drop the reference.
        _worker_tasks.add(task)
        task.add_done_callback(_worker_tasks.discard)
        logger.info("Comprehensive data worker started successfully")

    except Exception as e:
        logger.error(f"Failed to start comprehensive worker: {e}", exc_info=True)
|
|
|
|
| |
if __name__ == "__main__":
    async def test():
        """Run each fetcher once, in sequence, and log how many records it returned."""
        logger.info("Testing comprehensive data worker...")

        # (fetcher, log template) pairs, executed in this order.
        checks = (
            (fetch_news_data, "\n✅ News: {} articles"),
            (fetch_sentiment_data, "✅ Sentiment: {} records"),
            (fetch_onchain_data, "✅ On-chain: {} records"),
            (fetch_whale_data, "✅ Whale: {} transactions"),
            (fetch_block_explorer_data, "✅ Explorer: {} records"),
        )
        for fetch, template in checks:
            records = await fetch()
            logger.info(template.format(len(records)))

    asyncio.run(test())
|
|