| """ |
| Extended Sentiment Collectors |
| Fetches sentiment data from LunarCrush, Santiment, and other sentiment APIs |
| """ |
|
|
| import asyncio |
| from datetime import datetime, timezone |
| from typing import Dict, List, Optional, Any |
| from utils.api_client import get_client |
| from utils.logger import setup_logger, log_api_request, log_error |
|
|
| logger = setup_logger("sentiment_extended_collector") |
|
|
|
|
| async def get_lunarcrush_global() -> Dict[str, Any]: |
| """ |
| Fetch global market sentiment from LunarCrush |
| |
| Note: LunarCrush API v3 requires API key |
| Free tier available with limited requests |
| |
| Returns: |
| Dict with provider, category, data, timestamp, success, error |
| """ |
| provider = "LunarCrush" |
| category = "sentiment" |
| endpoint = "/public/metrics/global" |
|
|
| logger.info(f"Fetching global sentiment from {provider}") |
|
|
| try: |
| client = get_client() |
|
|
| |
| url = "https://lunarcrush.com/api3/public/metrics/global" |
|
|
| |
| response = await client.get(url, timeout=10) |
|
|
| |
| log_api_request( |
| logger, |
| provider, |
| endpoint, |
| response.get("response_time_ms", 0), |
| "success" if response["success"] else "error", |
| response.get("status_code") |
| ) |
|
|
| if not response["success"]: |
| |
| logger.warning(f"{provider} - API requires authentication, returning placeholder") |
| return { |
| "provider": provider, |
| "category": category, |
| "data": { |
| "status": "placeholder", |
| "message": "LunarCrush API requires authentication", |
| "planned_features": [ |
| "Social media sentiment tracking", |
| "Galaxy Score (social activity metric)", |
| "AltRank (relative social dominance)", |
| "Influencer tracking", |
| "Social volume and engagement metrics" |
| ] |
| }, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "success": True, |
| "error": None, |
| "is_placeholder": True |
| } |
|
|
| |
| data = response["data"] |
|
|
| sentiment_data = None |
| if isinstance(data, dict): |
| sentiment_data = { |
| "social_volume": data.get("social_volume"), |
| "social_score": data.get("social_score"), |
| "market_sentiment": data.get("sentiment"), |
| "timestamp": data.get("timestamp") |
| } |
|
|
| logger.info(f"{provider} - {endpoint} - Retrieved sentiment data") |
|
|
| return { |
| "provider": provider, |
| "category": category, |
| "data": sentiment_data, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "success": True, |
| "error": None, |
| "response_time_ms": response.get("response_time_ms", 0) |
| } |
|
|
| except Exception as e: |
| error_msg = f"Unexpected error: {str(e)}" |
| log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
| return { |
| "provider": provider, |
| "category": category, |
| "data": { |
| "status": "placeholder", |
| "message": f"LunarCrush integration error: {str(e)}" |
| }, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "success": True, |
| "error": None, |
| "is_placeholder": True |
| } |
|
|
|
|
| async def get_santiment_metrics() -> Dict[str, Any]: |
| """ |
| Fetch sentiment metrics from Santiment |
| |
| Note: Santiment API requires authentication |
| Provides on-chain, social, and development activity metrics |
| |
| Returns: |
| Dict with provider, category, data, timestamp, success, error |
| """ |
| provider = "Santiment" |
| category = "sentiment" |
| endpoint = "/graphql" |
|
|
| logger.info(f"Fetching sentiment metrics from {provider} (placeholder)") |
|
|
| try: |
| |
| |
|
|
| placeholder_data = { |
| "status": "placeholder", |
| "message": "Santiment API requires authentication and GraphQL queries", |
| "planned_metrics": [ |
| "Social volume and trends", |
| "Development activity", |
| "Network growth", |
| "Exchange flow", |
| "MVRV ratio", |
| "Daily active addresses", |
| "Token age consumed", |
| "Crowd sentiment" |
| ], |
| "note": "Requires Santiment API key and SAN tokens for full access" |
| } |
|
|
| logger.info(f"{provider} - {endpoint} - Placeholder data returned") |
|
|
| return { |
| "provider": provider, |
| "category": category, |
| "data": placeholder_data, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "success": True, |
| "error": None, |
| "is_placeholder": True |
| } |
|
|
| except Exception as e: |
| error_msg = f"Unexpected error: {str(e)}" |
| log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
| return { |
| "provider": provider, |
| "category": category, |
| "data": None, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "success": False, |
| "error": error_msg, |
| "error_type": "exception" |
| } |
|
|
|
|
| async def get_cryptoquant_sentiment() -> Dict[str, Any]: |
| """ |
| Fetch on-chain sentiment from CryptoQuant |
| |
| Returns: |
| Dict with provider, category, data, timestamp, success, error |
| """ |
| provider = "CryptoQuant" |
| category = "sentiment" |
| endpoint = "/sentiment" |
|
|
| logger.info(f"Fetching sentiment from {provider} (placeholder)") |
|
|
| try: |
| |
| |
|
|
| placeholder_data = { |
| "status": "placeholder", |
| "message": "CryptoQuant API requires authentication", |
| "planned_metrics": [ |
| "Exchange reserves", |
| "Miner flows", |
| "Whale transactions", |
| "Stablecoin supply ratio", |
| "Funding rates", |
| "Open interest" |
| ] |
| } |
|
|
| return { |
| "provider": provider, |
| "category": category, |
| "data": placeholder_data, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "success": True, |
| "error": None, |
| "is_placeholder": True |
| } |
|
|
| except Exception as e: |
| return { |
| "provider": provider, |
| "category": category, |
| "data": None, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "success": False, |
| "error": str(e), |
| "error_type": "exception" |
| } |
|
|
|
|
| async def get_augmento_signals() -> Dict[str, Any]: |
| """ |
| Fetch market sentiment signals from Augmento.ai |
| |
| Returns: |
| Dict with provider, category, data, timestamp, success, error |
| """ |
| provider = "Augmento" |
| category = "sentiment" |
| endpoint = "/signals" |
|
|
| logger.info(f"Fetching sentiment signals from {provider} (placeholder)") |
|
|
| try: |
| |
| |
|
|
| placeholder_data = { |
| "status": "placeholder", |
| "message": "Augmento API requires authentication", |
| "planned_features": [ |
| "AI-powered sentiment signals", |
| "Topic extraction from social media", |
| "Emerging trend detection", |
| "Sentiment momentum indicators" |
| ] |
| } |
|
|
| return { |
| "provider": provider, |
| "category": category, |
| "data": placeholder_data, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "success": True, |
| "error": None, |
| "is_placeholder": True |
| } |
|
|
| except Exception as e: |
| return { |
| "provider": provider, |
| "category": category, |
| "data": None, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "success": False, |
| "error": str(e), |
| "error_type": "exception" |
| } |
|
|
|
|
| async def get_thetie_sentiment() -> Dict[str, Any]: |
| """ |
| Fetch sentiment data from TheTie.io |
| |
| Returns: |
| Dict with provider, category, data, timestamp, success, error |
| """ |
| provider = "TheTie" |
| category = "sentiment" |
| endpoint = "/sentiment" |
|
|
| logger.info(f"Fetching sentiment from {provider} (placeholder)") |
|
|
| try: |
| |
| |
|
|
| placeholder_data = { |
| "status": "placeholder", |
| "message": "TheTie API requires authentication", |
| "planned_metrics": [ |
| "Twitter sentiment scores", |
| "Social media momentum", |
| "Influencer tracking", |
| "Sentiment trends over time" |
| ] |
| } |
|
|
| return { |
| "provider": provider, |
| "category": category, |
| "data": placeholder_data, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "success": True, |
| "error": None, |
| "is_placeholder": True |
| } |
|
|
| except Exception as e: |
| return { |
| "provider": provider, |
| "category": category, |
| "data": None, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "success": False, |
| "error": str(e), |
| "error_type": "exception" |
| } |
|
|
|
|
| async def get_coinmarketcal_events() -> Dict[str, Any]: |
| """ |
| Fetch upcoming crypto events from CoinMarketCal (free API) |
| |
| Returns: |
| Dict with provider, category, data, timestamp, success, error |
| """ |
| provider = "CoinMarketCal" |
| category = "sentiment" |
| endpoint = "/events" |
|
|
| logger.info(f"Fetching events from {provider}") |
|
|
| try: |
| client = get_client() |
|
|
| |
| url = "https://developers.coinmarketcal.com/v1/events" |
|
|
| params = { |
| "page": 1, |
| "max": 20, |
| "showOnly": "hot_events" |
| } |
|
|
| |
| response = await client.get(url, params=params, timeout=10) |
|
|
| |
| log_api_request( |
| logger, |
| provider, |
| endpoint, |
| response.get("response_time_ms", 0), |
| "success" if response["success"] else "error", |
| response.get("status_code") |
| ) |
|
|
| if not response["success"]: |
| |
| logger.warning(f"{provider} - API may require authentication, returning placeholder") |
| return { |
| "provider": provider, |
| "category": category, |
| "data": { |
| "status": "placeholder", |
| "message": "CoinMarketCal API may require authentication", |
| "planned_features": [ |
| "Upcoming crypto events calendar", |
| "Project updates and announcements", |
| "Conferences and meetups", |
| "Hard forks and mainnet launches" |
| ] |
| }, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "success": True, |
| "error": None, |
| "is_placeholder": True |
| } |
|
|
| |
| data = response["data"] |
|
|
| events_data = None |
| if isinstance(data, dict) and "body" in data: |
| events = data["body"] |
|
|
| events_data = { |
| "total_events": len(events) if isinstance(events, list) else 0, |
| "upcoming_events": [ |
| { |
| "title": event.get("title", {}).get("en"), |
| "coins": [coin.get("symbol") for coin in event.get("coins", [])], |
| "date": event.get("date_event"), |
| "proof": event.get("proof"), |
| "source": event.get("source") |
| } |
| for event in (events[:10] if isinstance(events, list) else []) |
| ] |
| } |
|
|
| logger.info(f"{provider} - {endpoint} - Retrieved {events_data.get('total_events', 0)} events") |
|
|
| return { |
| "provider": provider, |
| "category": category, |
| "data": events_data, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "success": True, |
| "error": None, |
| "response_time_ms": response.get("response_time_ms", 0) |
| } |
|
|
| except Exception as e: |
| error_msg = f"Unexpected error: {str(e)}" |
| log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
| return { |
| "provider": provider, |
| "category": category, |
| "data": { |
| "status": "placeholder", |
| "message": f"CoinMarketCal integration error: {str(e)}" |
| }, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "success": True, |
| "error": None, |
| "is_placeholder": True |
| } |
|
|
|
|
| async def collect_extended_sentiment_data() -> List[Dict[str, Any]]: |
| """ |
| Main function to collect extended sentiment data from all sources |
| |
| Returns: |
| List of results from all sentiment collectors |
| """ |
| logger.info("Starting extended sentiment data collection from all sources") |
|
|
| |
| results = await asyncio.gather( |
| get_lunarcrush_global(), |
| get_santiment_metrics(), |
| get_cryptoquant_sentiment(), |
| get_augmento_signals(), |
| get_thetie_sentiment(), |
| get_coinmarketcal_events(), |
| return_exceptions=True |
| ) |
|
|
| |
| processed_results = [] |
| for result in results: |
| if isinstance(result, Exception): |
| logger.error(f"Collector failed with exception: {str(result)}") |
| processed_results.append({ |
| "provider": "Unknown", |
| "category": "sentiment", |
| "data": None, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "success": False, |
| "error": str(result), |
| "error_type": "exception" |
| }) |
| else: |
| processed_results.append(result) |
|
|
| |
| successful = sum(1 for r in processed_results if r.get("success", False)) |
| placeholder_count = sum(1 for r in processed_results if r.get("is_placeholder", False)) |
|
|
| logger.info( |
| f"Extended sentiment collection complete: {successful}/{len(processed_results)} successful " |
| f"({placeholder_count} placeholders)" |
| ) |
|
|
| return processed_results |
|
|
|
|
| |
| if __name__ == "__main__": |
| async def main(): |
| results = await collect_extended_sentiment_data() |
|
|
| print("\n=== Extended Sentiment Data Collection Results ===") |
| for result in results: |
| print(f"\nProvider: {result['provider']}") |
| print(f"Success: {result['success']}") |
| print(f"Is Placeholder: {result.get('is_placeholder', False)}") |
|
|
| if result['success']: |
| data = result.get('data', {}) |
| if isinstance(data, dict): |
| if data.get('status') == 'placeholder': |
| print(f"Status: {data.get('message', 'N/A')}") |
| else: |
| print(f"Data keys: {list(data.keys())}") |
| else: |
| print(f"Error: {result.get('error', 'Unknown')}") |
|
|
| asyncio.run(main()) |
|
|