"""
Data Collection Module for Crypto Data Aggregator
Collects price data, news, and sentiment from various sources
"""

import requests
import aiohttp
import asyncio
import json
import logging
import time
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
import re

# Optional dependencies: degrade gracefully when they are missing.
try:
    import feedparser
    FEEDPARSER_AVAILABLE = True
except ImportError:
    FEEDPARSER_AVAILABLE = False
    logging.warning("feedparser not installed. RSS feed parsing will be limited.")

try:
    from bs4 import BeautifulSoup
    BS4_AVAILABLE = True
except ImportError:
    BS4_AVAILABLE = False
    logging.warning("beautifulsoup4 not installed. HTML parsing will be limited.")

import config
import database

logging.basicConfig(
    level=getattr(logging, config.LOG_LEVEL),
    format=config.LOG_FORMAT,
    handlers=[
        logging.FileHandler(config.LOG_FILE),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Shared database handle and scheduler state.
db = database.get_database()

_collection_timers = []
_is_collecting = False


def analyze_sentiment(text: str) -> Dict[str, Any]:
    """
    Simple sentiment analysis based on keyword matching.
    Returns a sentiment score and label.

    Args:
        text: Text to analyze

    Returns:
        Dict with 'score' and 'label'
    """
    if not text:
        return {'score': 0.0, 'label': 'neutral'}

    text_lower = text.lower()

    # Keywords are matched against lowercased text, so they must be lowercase
    # themselves ('ath' rather than 'ATH', which would never match).
    positive_words = [
        'bullish', 'moon', 'rally', 'surge', 'gain', 'profit', 'up', 'green',
        'buy', 'long', 'growth', 'rise', 'pump', 'ath', 'breakthrough',
        'adoption', 'positive', 'optimistic', 'upgrade', 'partnership'
    ]

    negative_words = [
        'bearish', 'crash', 'dump', 'drop', 'loss', 'down', 'red', 'sell',
        'short', 'decline', 'fall', 'fear', 'scam', 'hack', 'vulnerability',
        'negative', 'pessimistic', 'concern', 'warning', 'risk'
    ]

    # Note: substring matching, so 'up' also matches 'upgrade', 'supply', etc.
    positive_count = sum(1 for word in positive_words if word in text_lower)
    negative_count = sum(1 for word in negative_words if word in text_lower)

    # Normalize to [-1, 1] and bucket into five labels.
    total = positive_count + negative_count
    if total == 0:
        score = 0.0
        label = 'neutral'
    else:
        score = (positive_count - negative_count) / total

        if score <= -0.6:
            label = 'very_negative'
        elif score <= -0.2:
            label = 'negative'
        elif score <= 0.2:
            label = 'neutral'
        elif score <= 0.6:
            label = 'positive'
        else:
            label = 'very_positive'

    return {'score': score, 'label': label}


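# Example (illustrative): with the keyword lists above, a headline such as
# "Bitcoin rally continues as bullish momentum builds" matches two positive
# words ('rally', 'bullish') and no negative ones:
#
#   >>> analyze_sentiment("Bitcoin rally continues as bullish momentum builds")
#   {'score': 1.0, 'label': 'very_positive'}

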
def summarize_text(text: str, max_length: int = 150) -> str:
    """
    Simple text summarization: takes leading sentences up to max_length.

    Args:
        text: Text to summarize
        max_length: Maximum length of summary

    Returns:
        Summarized text
    """
    if not text:
        return ""

    # Collapse runs of whitespace.
    text = ' '.join(text.split())

    if len(text) <= max_length:
        return text

    # Accumulate whole sentences while they fit within max_length.
    sentences = re.split(r'[.!?]+', text)
    summary = ""

    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue

        if len(summary) + len(sentence) + 2 <= max_length:
            summary += sentence + ". "
        else:
            break

    # Fall back to a hard truncation if even the first sentence did not fit.
    if not summary:
        summary = text[:max_length - 3] + "..."

    return summary.strip()


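# Example (illustrative): whole sentences are kept only while they fit.
#
#   >>> summarize_text("First point. Second point. A much longer third point "
#   ...                "that will not fit.", max_length=30)
#   'First point. Second point.'

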
# Prefer AI-based implementations when the optional ai_models module is
# available; it is expected to expose drop-in replacements with the same
# signatures as the keyword-based functions above.
try:
    import ai_models

    analyze_sentiment = ai_models.analyze_sentiment
    summarize_text = ai_models.summarize_text
    logger.info("Using AI models for sentiment analysis and summarization")
except ImportError:
    logger.info("AI models not available, using simple keyword-based analysis")


def safe_api_call(url: str, timeout: int = 10, headers: Optional[Dict] = None) -> Optional[Dict]:
    """
    Make an HTTP GET request with error handling and retry logic.

    Args:
        url: URL to fetch
        timeout: Request timeout in seconds
        headers: Optional request headers

    Returns:
        Response JSON or None on failure
    """
    if headers is None:
        headers = {'User-Agent': config.USER_AGENT}

    for attempt in range(config.MAX_RETRIES):
        try:
            logger.debug(f"API call attempt {attempt + 1}/{config.MAX_RETRIES}: {url}")
            response = requests.get(url, timeout=timeout, headers=headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            logger.warning(f"HTTP error on attempt {attempt + 1}: {e}")
            status_code = e.response.status_code if e.response is not None else None
            if status_code == 429:
                # Back off progressively when rate limited.
                wait_time = (attempt + 1) * 5
                logger.info(f"Rate limited, waiting {wait_time}s...")
                time.sleep(wait_time)
            elif status_code is not None and status_code >= 500:
                time.sleep(attempt + 1)
            else:
                # Other 4xx errors will not succeed on retry.
                break
        except requests.exceptions.Timeout:
            logger.warning(f"Timeout on attempt {attempt + 1}")
            time.sleep(attempt + 1)
        # Checked before RequestException: in newer requests versions the
        # JSON error subclasses RequestException, and bad JSON should not
        # trigger a retry.
        except json.JSONDecodeError as e:
            logger.error(f"JSON decode error: {e}")
            break
        except requests.exceptions.RequestException as e:
            logger.warning(f"Request error on attempt {attempt + 1}: {e}")
            time.sleep(attempt + 1)
        except Exception as e:
            logger.error(f"Unexpected error on attempt {attempt + 1}: {e}")
            break

    logger.error(f"All retry attempts failed for {url}")
    return None


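# Example (illustrative; the URL is hypothetical): safe_api_call returns
# parsed JSON on success and None after exhausting retries, so callers only
# need a None check:
#
#   data = safe_api_call("https://api.example.com/v1/ping", timeout=5)
#   if data is not None:
#       print(data)

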
def extract_mentioned_coins(text: str) -> List[str]:
    """
    Extract cryptocurrency symbols/names mentioned in text.

    Args:
        text: Text to search for coin mentions

    Returns:
        List of coin symbols mentioned
    """
    if not text:
        return []

    text_upper = text.upper()
    mentioned = []

    common_symbols = {
        'BTC': 'bitcoin', 'ETH': 'ethereum', 'BNB': 'binancecoin',
        'XRP': 'ripple', 'ADA': 'cardano', 'SOL': 'solana',
        'DOT': 'polkadot', 'DOGE': 'dogecoin', 'AVAX': 'avalanche-2',
        'MATIC': 'polygon', 'LINK': 'chainlink', 'UNI': 'uniswap',
        'LTC': 'litecoin', 'ATOM': 'cosmos', 'ALGO': 'algorand'
    }

    # Match bare symbols on word boundaries, plus the $SYMBOL convention.
    for symbol in common_symbols:
        pattern = r'\b' + symbol + r'\b|\$' + symbol + r'\b'
        if re.search(pattern, text_upper):
            mentioned.append(symbol)

    # Also match full coin names in lowercase text.
    coin_names = {
        'bitcoin': 'BTC', 'ethereum': 'ETH', 'binance': 'BNB',
        'ripple': 'XRP', 'cardano': 'ADA', 'solana': 'SOL',
        'polkadot': 'DOT', 'dogecoin': 'DOGE'
    }

    text_lower = text.lower()
    for name, symbol in coin_names.items():
        if name in text_lower and symbol not in mentioned:
            mentioned.append(symbol)

    # Deduplicate while preserving first-mention order (a plain set would
    # return the symbols in arbitrary order).
    return list(dict.fromkeys(mentioned))


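# Example (illustrative): both bare and $-prefixed tickers are detected, as
# are full coin names:
#
#   >>> extract_mentioned_coins("$BTC broke out while ethereum lagged")
#   ['BTC', 'ETH']

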
def collect_price_data() -> Tuple[bool, int]:
    """
    Fetch price data from the CoinGecko API, falling back to CoinCap on failure.

    Returns:
        Tuple of (success: bool, count: int)
    """
    logger.info("Starting price data collection...")

    try:
        url = f"{config.COINGECKO_BASE_URL}{config.COINGECKO_ENDPOINTS['coins_markets']}"
        params = {
            'vs_currency': 'usd',
            'order': 'market_cap_desc',
            'per_page': config.TOP_COINS_LIMIT,
            'page': 1,
            'sparkline': 'false',
            'price_change_percentage': '1h,24h,7d'
        }

        # Build the query string by hand since safe_api_call takes a full URL.
        param_str = '&'.join(f"{k}={v}" for k, v in params.items())
        full_url = f"{url}?{param_str}"

        data = safe_api_call(full_url, timeout=config.REQUEST_TIMEOUT)

        if data is None:
            logger.warning("CoinGecko API failed, trying CoinCap backup...")
            return collect_price_data_coincap()

        prices = []
        for item in data:
            try:
                price = item.get('current_price', 0)

                # Skip entries with implausible prices.
                if not config.MIN_PRICE <= price <= config.MAX_PRICE:
                    logger.warning(f"Invalid price for {item.get('symbol')}: {price}")
                    continue

                price_data = {
                    'symbol': item.get('symbol', '').upper(),
                    'name': item.get('name', ''),
                    'price_usd': price,
                    'volume_24h': item.get('total_volume', 0),
                    'market_cap': item.get('market_cap', 0),
                    'percent_change_1h': item.get('price_change_percentage_1h_in_currency'),
                    'percent_change_24h': item.get('price_change_percentage_24h'),
                    'percent_change_7d': item.get('price_change_percentage_7d'),
                    'rank': item.get('market_cap_rank', 999)
                }

                # Apply minimum market-cap and volume filters.
                if price_data['market_cap'] and price_data['market_cap'] < config.MIN_MARKET_CAP:
                    continue
                if price_data['volume_24h'] and price_data['volume_24h'] < config.MIN_VOLUME:
                    continue

                prices.append(price_data)

            except Exception as e:
                logger.error(f"Error parsing price data item: {e}")
                continue

        if prices:
            count = db.save_prices_batch(prices)
            logger.info(f"Successfully collected and saved {count} price records from CoinGecko")
            return True, count
        else:
            logger.warning("No valid price data to save")
            return False, 0

    except Exception as e:
        logger.error(f"Error in collect_price_data: {e}")
        return False, 0


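# Example (illustrative; the exact base URL and limit come from config): the
# request built above has the shape
#   https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd
#     &order=market_cap_desc&per_page=100&page=1&sparkline=false
#     &price_change_percentage=1h,24h,7d
# assuming COINGECKO_BASE_URL points at the public v3 API and
# TOP_COINS_LIMIT is 100.

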
def collect_price_data_coincap() -> Tuple[bool, int]:
    """
    Backup price collection using the CoinCap API.

    Returns:
        Tuple of (success: bool, count: int)
    """
    logger.info("Starting CoinCap price data collection...")

    try:
        url = f"{config.COINCAP_BASE_URL}{config.COINCAP_ENDPOINTS['assets']}"
        params = {
            'limit': config.TOP_COINS_LIMIT
        }

        param_str = '&'.join(f"{k}={v}" for k, v in params.items())
        full_url = f"{url}?{param_str}"

        response = safe_api_call(full_url, timeout=config.REQUEST_TIMEOUT)

        if response is None or 'data' not in response:
            logger.error("CoinCap API failed")
            return False, 0

        data = response['data']

        # CoinCap returns numeric fields as strings, so convert explicitly.
        prices = []
        for idx, item in enumerate(data):
            try:
                price = float(item.get('priceUsd', 0))

                if not config.MIN_PRICE <= price <= config.MAX_PRICE:
                    logger.warning(f"Invalid price for {item.get('symbol')}: {price}")
                    continue

                price_data = {
                    'symbol': item.get('symbol', '').upper(),
                    'name': item.get('name', ''),
                    'price_usd': price,
                    'volume_24h': float(item.get('volumeUsd24Hr', 0)) if item.get('volumeUsd24Hr') else None,
                    'market_cap': float(item.get('marketCapUsd', 0)) if item.get('marketCapUsd') else None,
                    'percent_change_1h': None,  # not provided by CoinCap
                    'percent_change_24h': float(item.get('changePercent24Hr', 0)) if item.get('changePercent24Hr') else None,
                    'percent_change_7d': None,  # not provided by CoinCap
                    'rank': int(item.get('rank', idx + 1))
                }

                if price_data['market_cap'] and price_data['market_cap'] < config.MIN_MARKET_CAP:
                    continue
                if price_data['volume_24h'] and price_data['volume_24h'] < config.MIN_VOLUME:
                    continue

                prices.append(price_data)

            except Exception as e:
                logger.error(f"Error parsing CoinCap data item: {e}")
                continue

        if prices:
            count = db.save_prices_batch(prices)
            logger.info(f"Successfully collected and saved {count} price records from CoinCap")
            return True, count
        else:
            logger.warning("No valid price data to save from CoinCap")
            return False, 0

    except Exception as e:
        logger.error(f"Error in collect_price_data_coincap: {e}")
        return False, 0


def collect_news_data() -> int:
    """
    Parse RSS feeds and Reddit posts, analyze sentiment, and save to database.

    Returns:
        Count of articles collected
    """
    logger.info("Starting news data collection...")
    articles_collected = 0

    if FEEDPARSER_AVAILABLE:
        articles_collected += _collect_rss_feeds()
    else:
        logger.warning("feedparser not available, skipping RSS feeds")

    articles_collected += _collect_reddit_posts()

    logger.info(f"News collection completed. Total articles: {articles_collected}")
    return articles_collected


def _collect_rss_feeds() -> int:
    """Collect articles from RSS feeds."""
    count = 0

    for source_name, feed_url in config.RSS_FEEDS.items():
        try:
            logger.debug(f"Parsing RSS feed: {source_name}")
            feed = feedparser.parse(feed_url)

            for entry in feed.entries[:20]:
                try:
                    title = entry.get('title', '')
                    url = entry.get('link', '')

                    if not url:
                        continue

                    # Parse the published date when feedparser could decode it.
                    published_date = None
                    if hasattr(entry, 'published_parsed') and entry.published_parsed:
                        try:
                            published_date = datetime(*entry.published_parsed[:6]).isoformat()
                        except (TypeError, ValueError):
                            pass

                    # Prefer the summary field; strip HTML if bs4 is available.
                    summary = entry.get('summary', '') or entry.get('description', '')
                    if summary and BS4_AVAILABLE:
                        soup = BeautifulSoup(summary, 'html.parser')
                        summary = soup.get_text()

                    full_text = f"{title} {summary}"

                    related_coins = extract_mentioned_coins(full_text)
                    sentiment_result = analyze_sentiment(full_text)
                    summary_text = summarize_text(summary or title, max_length=200)

                    news_data = {
                        'title': title,
                        'summary': summary_text,
                        'url': url,
                        'source': source_name,
                        'sentiment_score': sentiment_result['score'],
                        'sentiment_label': sentiment_result['label'],
                        'related_coins': related_coins,
                        'published_date': published_date
                    }

                    if db.save_news(news_data):
                        count += 1

                except Exception as e:
                    logger.error(f"Error processing RSS entry from {source_name}: {e}")
                    continue

        except Exception as e:
            logger.error(f"Error parsing RSS feed {source_name}: {e}")
            continue

    logger.info(f"Collected {count} articles from RSS feeds")
    return count


def _collect_reddit_posts() -> int:
    """Collect posts from Reddit."""
    count = 0

    for subreddit_name, endpoint_url in config.REDDIT_ENDPOINTS.items():
        try:
            logger.debug(f"Fetching Reddit posts from r/{subreddit_name}")

            # Reddit serves JSON when the listing URL ends with .json.
            if not endpoint_url.endswith('.json'):
                endpoint_url = endpoint_url.rstrip('/') + '.json'

            headers = {'User-Agent': config.USER_AGENT}
            data = safe_api_call(endpoint_url, headers=headers)

            if not data or 'data' not in data or 'children' not in data['data']:
                logger.warning(f"Invalid response from Reddit: {subreddit_name}")
                continue

            posts = data['data']['children']

            for post_data in posts[:15]:
                try:
                    post = post_data.get('data', {})

                    title = post.get('title', '')
                    selftext = post.get('selftext', '')
                    # Use the permalink as the canonical article URL; the post's
                    # own 'url' field may point at external media instead.
                    article_url = f"https://reddit.com{post.get('permalink', '')}"

                    if not title:
                        continue

                    created_utc = post.get('created_utc')
                    published_date = None
                    if created_utc:
                        try:
                            published_date = datetime.fromtimestamp(created_utc).isoformat()
                        except (TypeError, ValueError, OSError):
                            pass

                    full_text = f"{title} {selftext}"

                    related_coins = extract_mentioned_coins(full_text)
                    sentiment_result = analyze_sentiment(full_text)
                    summary_text = summarize_text(selftext or title, max_length=200)

                    news_data = {
                        'title': title,
                        'summary': summary_text,
                        'url': article_url,
                        'source': f"reddit_{subreddit_name}",
                        'sentiment_score': sentiment_result['score'],
                        'sentiment_label': sentiment_result['label'],
                        'related_coins': related_coins,
                        'published_date': published_date
                    }

                    if db.save_news(news_data):
                        count += 1

                except Exception as e:
                    logger.error(f"Error processing Reddit post from {subreddit_name}: {e}")
                    continue

        except Exception as e:
            logger.error(f"Error fetching Reddit posts from {subreddit_name}: {e}")
            continue

    logger.info(f"Collected {count} posts from Reddit")
    return count


def collect_sentiment_data() -> Optional[Dict[str, Any]]:
    """
    Fetch the Fear & Greed Index from Alternative.me.

    Returns:
        Sentiment data or None on failure
    """
    logger.info("Starting sentiment data collection...")

    try:
        data = safe_api_call(config.ALTERNATIVE_ME_URL, timeout=config.REQUEST_TIMEOUT)

        if data is None or 'data' not in data:
            logger.error("Failed to fetch Fear & Greed Index")
            return None

        fng_data = data['data'][0] if data['data'] else {}

        value = fng_data.get('value')
        classification = fng_data.get('value_classification', 'Unknown')
        timestamp = fng_data.get('timestamp')

        if value is None:
            logger.warning("No value in Fear & Greed response")
            return None

        # Map the 0-100 index linearly onto a [-1, 1] sentiment score.
        sentiment_score = (int(value) - 50) / 50.0

        if int(value) <= 25:
            sentiment_label = 'extreme_fear'
        elif int(value) <= 45:
            sentiment_label = 'fear'
        elif int(value) <= 55:
            sentiment_label = 'neutral'
        elif int(value) <= 75:
            sentiment_label = 'greed'
        else:
            sentiment_label = 'extreme_greed'

        sentiment_data = {
            'value': int(value),
            'classification': classification,
            'sentiment_score': sentiment_score,
            'sentiment_label': sentiment_label,
            'timestamp': timestamp
        }

        # Also store the reading as a pseudo news item for the feed.
        news_data = {
            'title': f"Market Sentiment: {classification}",
            'summary': f"Fear & Greed Index: {value}/100 - {classification}",
            'url': config.ALTERNATIVE_ME_URL,
            'source': 'alternative_me',
            'sentiment_score': sentiment_score,
            'sentiment_label': sentiment_label,
            'related_coins': ['BTC', 'ETH'],
            'published_date': datetime.now().isoformat()
        }

        db.save_news(news_data)

        logger.info(f"Sentiment collected: {classification} ({value}/100)")
        return sentiment_data

    except Exception as e:
        logger.error(f"Error in collect_sentiment_data: {e}")
        return None


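# Worked example (illustrative): an index reading of 20 maps to a score of
# (20 - 50) / 50 = -0.6 and, being <= 25, the label 'extreme_fear'; a reading
# of 60 maps to (60 - 50) / 50 = 0.2 with the label 'greed'.

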
def schedule_data_collection():
    """
    Schedule periodic data collection using threading.Timer.
    Runs collection tasks in the background at configured intervals.
    """
    global _is_collecting, _collection_timers

    if _is_collecting:
        logger.warning("Data collection already running")
        return

    _is_collecting = True
    logger.info("Starting scheduled data collection...")

    def run_price_collection():
        """Wrapper for price collection with rescheduling."""
        try:
            collect_price_data()
        except Exception as e:
            logger.error(f"Error in scheduled price collection: {e}")
        finally:
            # Reschedule only while collection is still enabled.
            if _is_collecting:
                timer = threading.Timer(
                    config.COLLECTION_INTERVALS['price_data'],
                    run_price_collection
                )
                timer.daemon = True
                timer.start()
                _collection_timers.append(timer)

    def run_news_collection():
        """Wrapper for news collection with rescheduling."""
        try:
            collect_news_data()
        except Exception as e:
            logger.error(f"Error in scheduled news collection: {e}")
        finally:
            if _is_collecting:
                timer = threading.Timer(
                    config.COLLECTION_INTERVALS['news_data'],
                    run_news_collection
                )
                timer.daemon = True
                timer.start()
                _collection_timers.append(timer)

    def run_sentiment_collection():
        """Wrapper for sentiment collection with rescheduling."""
        try:
            collect_sentiment_data()
        except Exception as e:
            logger.error(f"Error in scheduled sentiment collection: {e}")
        finally:
            if _is_collecting:
                timer = threading.Timer(
                    config.COLLECTION_INTERVALS['sentiment_data'],
                    run_sentiment_collection
                )
                timer.daemon = True
                timer.start()
                _collection_timers.append(timer)

    # Kick off the first run of each task, staggered slightly so all APIs
    # are not hit at once.
    logger.info("Running initial data collection...")

    threading.Thread(target=run_price_collection, daemon=True).start()
    time.sleep(2)
    threading.Thread(target=run_news_collection, daemon=True).start()
    time.sleep(2)
    threading.Thread(target=run_sentiment_collection, daemon=True).start()

    logger.info("Scheduled data collection started successfully")
    logger.info(f"Price data: every {config.COLLECTION_INTERVALS['price_data']}s")
    logger.info(f"News data: every {config.COLLECTION_INTERVALS['news_data']}s")
    logger.info(f"Sentiment data: every {config.COLLECTION_INTERVALS['sentiment_data']}s")


def stop_scheduled_collection():
    """Stop all scheduled collection tasks."""
    global _is_collecting, _collection_timers

    logger.info("Stopping scheduled data collection...")
    _is_collecting = False

    # Cancel any timers that have not fired yet.
    for timer in _collection_timers:
        try:
            timer.cancel()
        except Exception:
            pass

    _collection_timers.clear()
    logger.info("Scheduled data collection stopped")


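# Usage sketch (illustrative): start the background schedulers, keep the
# process alive, and stop cleanly on Ctrl+C.
#
#   schedule_data_collection()
#   try:
#       while True:
#           time.sleep(60)
#   except KeyboardInterrupt:
#       stop_scheduled_collection()

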
async def collect_price_data_async() -> Tuple[bool, int]:
    """
    Async version of price data collection using aiohttp.

    Returns:
        Tuple of (success: bool, count: int)
    """
    logger.info("Starting async price data collection...")

    try:
        url = f"{config.COINGECKO_BASE_URL}{config.COINGECKO_ENDPOINTS['coins_markets']}"
        params = {
            'vs_currency': 'usd',
            'order': 'market_cap_desc',
            'per_page': config.TOP_COINS_LIMIT,
            'page': 1,
            'sparkline': 'false',
            'price_change_percentage': '1h,24h,7d'
        }

        # aiohttp expects a ClientTimeout object rather than a bare number.
        timeout = aiohttp.ClientTimeout(total=config.REQUEST_TIMEOUT)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, params=params) as response:
                if response.status != 200:
                    logger.error(f"API returned status {response.status}")
                    return False, 0

                data = await response.json()

        prices = []
        for item in data:
            try:
                price = item.get('current_price', 0)

                if not config.MIN_PRICE <= price <= config.MAX_PRICE:
                    continue

                price_data = {
                    'symbol': item.get('symbol', '').upper(),
                    'name': item.get('name', ''),
                    'price_usd': price,
                    'volume_24h': item.get('total_volume', 0),
                    'market_cap': item.get('market_cap', 0),
                    'percent_change_1h': item.get('price_change_percentage_1h_in_currency'),
                    'percent_change_24h': item.get('price_change_percentage_24h'),
                    'percent_change_7d': item.get('price_change_percentage_7d'),
                    'rank': item.get('market_cap_rank', 999)
                }

                if price_data['market_cap'] and price_data['market_cap'] < config.MIN_MARKET_CAP:
                    continue
                if price_data['volume_24h'] and price_data['volume_24h'] < config.MIN_VOLUME:
                    continue

                prices.append(price_data)

            except Exception as e:
                logger.error(f"Error parsing price data item: {e}")
                continue

        # Note: db.save_prices_batch is a synchronous call inside a coroutine;
        # acceptable here since nothing else is awaited after the fetch.
        if prices:
            count = db.save_prices_batch(prices)
            logger.info(f"Async collected and saved {count} price records")
            return True, count
        else:
            return False, 0

    except Exception as e:
        logger.error(f"Error in collect_price_data_async: {e}")
        return False, 0


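# Usage sketch (illustrative): run the coroutine from synchronous code.
#
#   success, count = asyncio.run(collect_price_data_async())
#   print(f"async collection: {success}, {count} records")

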
if __name__ == "__main__":
    logger.info("=" * 60)
    logger.info("Crypto Data Collector - Manual Test Run")
    logger.info("=" * 60)

    # Test price collection.
    logger.info("--- Testing Price Collection ---")
    success, count = collect_price_data()
    print(f"Price collection: {'SUCCESS' if success else 'FAILED'} - {count} records")

    # Test news collection.
    logger.info("--- Testing News Collection ---")
    news_count = collect_news_data()
    print(f"News collection: {news_count} articles collected")

    # Test sentiment collection.
    logger.info("--- Testing Sentiment Collection ---")
    sentiment = collect_sentiment_data()
    if sentiment:
        print(f"Sentiment: {sentiment['classification']} ({sentiment['value']}/100)")
    else:
        print("Sentiment collection: FAILED")

    logger.info("=" * 60)
    logger.info("Manual test run completed")
    logger.info("=" * 60)