# Cryptocurrency Data Collectors
Comprehensive data collection modules for cryptocurrency APIs, blockchain explorers, news sources, sentiment indicators, and on-chain analytics.
## Overview
This package provides production-ready collectors for gathering cryptocurrency data from various sources. Each collector is designed with robust error handling, logging, staleness tracking, and standardized output formats.
## Modules
### 1. Market Data (`market_data.py`)
Collects cryptocurrency market data from multiple providers.
**Providers:**
- **CoinGecko** - Free API for BTC, ETH, BNB prices with market cap and volume
- **CoinMarketCap** - Professional market data with API key
- **Binance** - Real-time ticker data from Binance exchange
**Functions:**
```python
from collectors.market_data import (
    get_coingecko_simple_price,
    get_coinmarketcap_quotes,
    get_binance_ticker,
    collect_market_data,  # Collects from all sources
)
# Collect from all market data sources
results = await collect_market_data()
```
**Features:**
- Concurrent data collection
- Price tracking with volume and market cap
- 24-hour change percentages
- Timestamp extraction for staleness calculation
### 2. Blockchain Explorers (`explorers.py`)
Collects data from blockchain explorers and network statistics.
**Providers:**
- **Etherscan** - Ethereum gas prices and network stats
- **BscScan** - BNB prices and BSC network data
- **TronScan** - TRON network statistics
**Functions:**
```python
from collectors.explorers import (
    get_etherscan_gas_price,
    get_bscscan_bnb_price,
    get_tronscan_stats,
    collect_explorer_data,  # Collects from all sources
)
# Collect from all explorers
results = await collect_explorer_data()
```
**Features:**
- Real-time gas price tracking
- Network health monitoring
- API key management
- Rate limit handling
### 3. News Aggregation (`news.py`)
Collects cryptocurrency news from multiple sources.
**Providers:**
- **CryptoPanic** - Cryptocurrency news aggregator with sentiment
- **NewsAPI** - General news with crypto filtering
**Functions:**
```python
from collectors.news import (
    get_cryptopanic_posts,
    get_newsapi_headlines,
    collect_news_data,  # Collects from all sources
)
# Collect from all news sources
results = await collect_news_data()
```
**Features:**
- News post aggregation
- Article timestamps for freshness tracking
- Article count reporting
- Content filtering
### 4. Sentiment Analysis (`sentiment.py`)
Collects cryptocurrency market sentiment data.
**Providers:**
- **Alternative.me** - Fear & Greed Index (0-100 scale)
**Functions:**
```python
from collectors.sentiment import (
    get_fear_greed_index,
    collect_sentiment_data,  # Collects from all sources
)
# Collect sentiment data
results = await collect_sentiment_data()
```
**Features:**
- Market sentiment indicator (Fear/Greed)
- Historical sentiment tracking
- Classification (Extreme Fear, Fear, Neutral, Greed, Extreme Greed)
### 5. On-Chain Analytics (`onchain.py`)
Placeholder implementations for on-chain data sources.
**Providers (Placeholder):**
- **The Graph** - GraphQL-based blockchain data
- **Blockchair** - Blockchain explorer and statistics
- **Glassnode** - Advanced on-chain metrics
**Functions:**
```python
from collectors.onchain import (
    get_the_graph_data,
    get_blockchair_data,
    get_glassnode_metrics,
    collect_onchain_data,  # Collects from all sources
)
# Collect on-chain data (placeholder)
results = await collect_onchain_data()
```
**Planned Features:**
- DEX volume and liquidity tracking
- Token holder analytics
- NUPL, SOPR, and other on-chain metrics
- Exchange flow monitoring
- Whale transaction tracking
## Standard Output Format
All collectors return a standardized dictionary format:
```python
{
    "provider": str,                  # Provider name (e.g., "CoinGecko")
    "category": str,                  # Category (e.g., "market_data")
    "data": dict/list/None,           # Raw API response data
    "timestamp": str,                 # Collection timestamp (ISO format)
    "data_timestamp": str/None,       # Data timestamp from API (ISO format)
    "staleness_minutes": float/None,  # Age of data in minutes
    "success": bool,                  # Whether collection succeeded
    "error": str/None,                # Error message if failed
    "error_type": str/None,           # Error classification
    "response_time_ms": float         # API response time
}
```
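As a rough illustration of the format above, the sketch below builds a result dictionary with every standard key present and reports any keys that are missing. `make_result`, `missing_keys`, and `REQUIRED_KEYS` are hypothetical names introduced here for illustration; they are not part of the package.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List

# Keys listed in the standard output format.
REQUIRED_KEYS = (
    "provider", "category", "data", "timestamp", "data_timestamp",
    "staleness_minutes", "success", "error", "error_type", "response_time_ms",
)


def make_result(provider: str, category: str, **fields: Any) -> Dict[str, Any]:
    """Hypothetical helper: build a result dict with every standard key set."""
    result: Dict[str, Any] = {
        "provider": provider,
        "category": category,
        "data": None,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "data_timestamp": None,
        "staleness_minutes": None,
        "success": False,
        "error": None,
        "error_type": None,
        "response_time_ms": 0.0,
    }
    # Reject keys that are not part of the standard format.
    unknown = set(fields) - set(result)
    if unknown:
        raise KeyError(f"Unknown result fields: {sorted(unknown)}")
    result.update(fields)
    return result


def missing_keys(result: Dict[str, Any]) -> List[str]:
    """Return the standard keys that a result dict lacks, in format order."""
    return [key for key in REQUIRED_KEYS if key not in result]
```

A check like `missing_keys(result)` could be useful in tests for a new collector, since failure paths are the easiest place to forget a field.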
## Common Features
All collectors implement:
1. **Error Handling**
- Graceful failure with detailed error messages
- Exception catching and logging
- API-specific error parsing
2. **Logging**
- Structured JSON logging
- Request/response logging
- Error logging with context
3. **Staleness Tracking**
- Extracts timestamps from API responses
- Calculates data age in minutes
- Handles missing timestamps
4. **Rate Limiting**
- Respects provider rate limits
- Exponential backoff on failures
- Rate limit error detection
5. **Retry Logic**
- Automatic retries on failure
- Configurable retry attempts
- Timeout handling
6. **API Key Management**
- Loads keys from config
- Handles missing keys gracefully
- API key masking in logs
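The staleness tracking described in item 3 can be sketched as below. This is a minimal illustration and not the package's own `calculate_staleness_minutes`; the function name `staleness_minutes` and the choice to treat naive timestamps as UTC are assumptions.

```python
from datetime import datetime, timezone
from typing import Optional


def staleness_minutes(
    data_timestamp: Optional[datetime],
    now: Optional[datetime] = None,
) -> Optional[float]:
    """Age of the data in minutes, or None when the API gave no timestamp."""
    if data_timestamp is None:
        return None
    if data_timestamp.tzinfo is None:
        # Assumption: naive timestamps are interpreted as UTC.
        data_timestamp = data_timestamp.replace(tzinfo=timezone.utc)
    if now is None:
        now = datetime.now(timezone.utc)
    return (now - data_timestamp).total_seconds() / 60.0
```

Passing `now` explicitly keeps the calculation deterministic in tests.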
## Usage Examples
### Basic Usage
```python
import asyncio

from collectors import collect_market_data


async def main():
    results = await collect_market_data()
    for result in results:
        if result['success']:
            print(f"{result['provider']}: Success")
            # staleness_minutes is None when the API returned no timestamp
            staleness = result['staleness_minutes']
            if staleness is not None:
                print(f"  Staleness: {staleness:.2f}m")
        else:
            print(f"{result['provider']}: Failed - {result['error']}")


asyncio.run(main())
```
### Collecting All Data
```python
import asyncio

from collectors import (
    collect_market_data,
    collect_explorer_data,
    collect_news_data,
    collect_sentiment_data,
    collect_onchain_data,
)


async def collect_all():
    results = await asyncio.gather(
        collect_market_data(),
        collect_explorer_data(),
        collect_news_data(),
        collect_sentiment_data(),
        collect_onchain_data(),
    )
    market, explorers, news, sentiment, onchain = results
    return {
        "market_data": market,
        "explorers": explorers,
        "news": news,
        "sentiment": sentiment,
        "onchain": onchain,
    }


all_data = asyncio.run(collect_all())
```
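By default, `asyncio.gather()` propagates the first exception raised by any awaited coroutine. The collectors are described as catching their own errors, but if a category-level function did raise, a variant along the following lines would keep the other categories' results. `gather_categories` is a hypothetical helper, and it assumes each category collector takes no arguments and returns a list of result dictionaries.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Collector = Callable[[], Awaitable[List[Dict[str, Any]]]]


async def gather_categories(collectors: Dict[str, Collector]) -> Dict[str, Any]:
    """Run category collectors concurrently; keep going if one raises."""
    names = list(collectors)
    outcomes = await asyncio.gather(
        *(collectors[name]() for name in names),
        return_exceptions=True,  # exceptions are returned instead of raised
    )
    combined: Dict[str, Any] = {}
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, BaseException):
            # Record the failure as a single failed result for this category.
            combined[name] = [{
                "provider": name,
                "category": name,
                "success": False,
                "error": str(outcome),
                "error_type": "exception",
            }]
        else:
            combined[name] = outcome
    return combined
```

It would be called as `await gather_categories({"market_data": collect_market_data, "news": collect_news_data})`, passing the functions themselves rather than calling them.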
### Individual Collector Usage
```python
import asyncio

from collectors.market_data import get_coingecko_simple_price


async def get_prices():
    result = await get_coingecko_simple_price()
    if result['success']:
        data = result['data']
        print(f"Bitcoin: ${data['bitcoin']['usd']}")
        print(f"Ethereum: ${data['ethereum']['usd']}")
        print(f"BNB: ${data['binancecoin']['usd']}")


asyncio.run(get_prices())
```
## Demo Script
Run the comprehensive demo to test all collectors:
```bash
python collectors/demo_collectors.py
```
This will:
- Execute all collectors concurrently
- Display detailed results for each category
- Show overall statistics
- Save results to a JSON file
## Configuration
Collectors use the central configuration system from `config.py`:
```python
from config import config
# Get provider configuration
provider = config.get_provider('CoinGecko')
# Get API key
api_key = config.get_api_key('coinmarketcap')
# Get providers by category
market_providers = config.get_providers_by_category('market_data')
```
## API Keys
API keys are loaded from environment variables:
```bash
# Market Data
export COINMARKETCAP_KEY_1="your_key_here"
export COINMARKETCAP_KEY_2="backup_key"
# Blockchain Explorers
export ETHERSCAN_KEY_1="your_key_here"
export ETHERSCAN_KEY_2="backup_key"
export BSCSCAN_KEY="your_key_here"
export TRONSCAN_KEY="your_key_here"
# News
export NEWSAPI_KEY="your_key_here"
# Analytics
export CRYPTOCOMPARE_KEY="your_key_here"
```
Or use a `.env` file with `python-dotenv`:
```env
COINMARKETCAP_KEY_1=your_key_here
ETHERSCAN_KEY_1=your_key_here
BSCSCAN_KEY=your_key_here
NEWSAPI_KEY=your_key_here
```
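The naming scheme above (a single `*_KEY` variable, or numbered `*_KEY_1`, `*_KEY_2` variables for primary and backup keys) could be read with a helper like the one below. This is a sketch under assumptions: `load_keys` and `mask_key` are hypothetical names, and the project's `config.py` may load and mask keys differently.

```python
import os
from typing import List, Mapping, Optional


def load_keys(prefix: str, env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Collect PREFIX_KEY, then PREFIX_KEY_1, PREFIX_KEY_2, ... in order."""
    env = os.environ if env is None else env
    keys: List[str] = []
    single = env.get(f"{prefix}_KEY")
    if single:
        keys.append(single)
    index = 1
    while True:
        value = env.get(f"{prefix}_KEY_{index}")
        if not value:
            break  # stop at the first missing numbered key
        keys.append(value)
        index += 1
    return keys


def mask_key(key: str, visible: int = 4) -> str:
    """Hide all but the last few characters so a key can appear in logs."""
    if len(key) <= visible:
        return "*" * len(key)
    return "*" * (len(key) - visible) + key[-visible:]
```

An empty list from `load_keys` corresponds to the `missing_api_key` case, which a collector can report without making a request.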
## Dependencies
- `aiohttp` - Async HTTP client
- `asyncio` - Async programming (Python standard library)
- `datetime` - Timestamp handling (Python standard library)
- `utils.api_client` - Robust API client with retry logic
- `utils.logger` - Structured JSON logging
- `config` - Centralized configuration
## Error Handling
Collectors handle various error types:
- **config_error** - Provider not configured
- **missing_api_key** - API key required but not available
- **authentication** - API key invalid or expired
- **rate_limit** - Rate limit exceeded
- **timeout** - Request timeout
- **server_error** - API server error (5xx)
- **network_error** - Network connectivity issue
- **api_error** - API-specific error
- **exception** - Unexpected Python exception
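One plausible way to derive several of these error types is from the HTTP status code of the response. The mapping below is an illustrative assumption, not the classification used by `utils.api_client`; `classify_http_status` is a hypothetical name.

```python
from typing import Optional


def classify_http_status(status_code: Optional[int]) -> Optional[str]:
    """Map an HTTP status code to an error type; None means no error."""
    if status_code is None:
        return "network_error"  # no HTTP response was received at all
    if 200 <= status_code < 300:
        return None
    if status_code in (401, 403):
        return "authentication"
    if status_code == 429:
        return "rate_limit"
    if status_code == 408:
        return "timeout"
    if 500 <= status_code < 600:
        return "server_error"
    return "api_error"  # any other non-success status
```

Client-side timeouts and exceptions never produce a status code, so they would still need to be classified where the request is made.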
## Extending Collectors
To add a new collector:
1. Create a new module or add to an existing category module
2. Implement a collector function following the standard pattern
3. Use `get_client()` for API requests
4. Extract and calculate staleness from timestamps
5. Return standardized output format
6. Add to `__init__.py` exports
7. Update this README
Example:
```python
from datetime import datetime, timezone
from typing import Any, Dict

from config import config
from utils.api_client import get_client  # assumed location of get_client()

# logger, log_api_request, log_error, and calculate_staleness_minutes are
# project helpers; import them the same way the existing collectors do.


async def get_new_provider_data() -> Dict[str, Any]:
    """Fetch data from a new provider."""
    provider = "NewProvider"
    category = "market_data"
    endpoint = "/api/v1/data"
    logger.info(f"Fetching data from {provider}")
    try:
        client = get_client()
        provider_config = config.get_provider(provider)

        # Make the request
        url = f"{provider_config.endpoint_url}{endpoint}"
        response = await client.get(url)

        # Log the request outcome
        log_api_request(
            logger, provider, endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code"),
        )

        if not response["success"]:
            # Report the failure to the caller
            return {
                "provider": provider,
                "category": category,
                "success": False,
                "error": response.get("error_message"),
            }

        # Parse data and timestamps
        data = response["data"]
        data_timestamp = None  # TODO: parse a datetime from the response
        staleness = (
            calculate_staleness_minutes(data_timestamp)
            if data_timestamp else None
        )
        return {
            "provider": provider,
            "category": category,
            "data": data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data_timestamp": (
                data_timestamp.isoformat() if data_timestamp else None
            ),
            "staleness_minutes": staleness,
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0),
        }
    except Exception as e:
        log_error(logger, provider, "exception", str(e), endpoint, exc_info=True)
        return {
            "provider": provider,
            "category": category,
            "success": False,
            "error": str(e),
            "error_type": "exception",
        }
```
## Testing
Test individual collectors:
```bash
# Test market data collector
python -m collectors.market_data
# Test explorers
python -m collectors.explorers
# Test news
python -m collectors.news
# Test sentiment
python -m collectors.sentiment
# Test on-chain (placeholder)
python -m collectors.onchain
```
## Performance
- Collectors run concurrently using `asyncio.gather()`
- Typical response times: 100-2000ms per collector
- Connection pooling for efficiency
- Configurable timeouts
- Automatic retry with exponential backoff
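To make the retry behavior concrete, here is a minimal sketch of retrying an async call with a timeout and exponential backoff. It is not the implementation in `utils.api_client`; `backoff_delays` and `retry_with_backoff`, their default values, and the use of random jitter are all assumptions for illustration.

```python
import asyncio
import random
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


def backoff_delays(retries: int, base: float = 0.5, cap: float = 30.0) -> List[float]:
    """Delay before each retry: base, 2*base, 4*base, ... capped at `cap`."""
    return [min(cap, base * (2 ** attempt)) for attempt in range(retries)]


async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    retries: int = 3,
    base: float = 0.5,
    timeout: float = 10.0,
) -> T:
    """Await `call()` with a timeout, retrying failures with backoff."""
    delays = backoff_delays(retries, base)
    for attempt in range(retries + 1):
        try:
            return await asyncio.wait_for(call(), timeout=timeout)
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            # Jitter: sleep a random amount up to the scheduled delay.
            await asyncio.sleep(random.uniform(0, delays[attempt]))
    raise AssertionError("unreachable")
```

The jitter spreads retries from concurrent collectors so they do not all hit a rate-limited provider at the same instant.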
## Monitoring
All collectors provide metrics for monitoring:
- **Success Rate** - Percentage of successful collections
- **Response Time** - API response time in milliseconds
- **Staleness** - Data age in minutes
- **Error Types** - Classification of failures
- **Retry Count** - Number of retries needed
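Most of these metrics can be derived from the standardized result dictionaries. The sketch below shows one way to aggregate them; `summarize` is a hypothetical helper, and retry count is left out because the standard output format does not include a retry field.

```python
from collections import Counter
from typing import Any, Dict, List


def summarize(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate success rate, response time, staleness, and error types."""
    total = len(results)
    ok = [r for r in results if r.get("success")]
    failed = [r for r in results if not r.get("success")]
    times = [
        r["response_time_ms"] for r in results
        if r.get("response_time_ms") is not None
    ]
    ages = [
        r["staleness_minutes"] for r in ok
        if r.get("staleness_minutes") is not None
    ]
    return {
        "success_rate_pct": 100.0 * len(ok) / total if total else 0.0,
        "avg_response_time_ms": sum(times) / len(times) if times else None,
        "max_staleness_minutes": max(ages) if ages else None,
        # Failures without an error_type are counted as "unknown".
        "error_types": dict(Counter(r.get("error_type") or "unknown" for r in failed)),
    }
```

For example, `summarize(await collect_market_data())` would give a per-category snapshot that can be logged or exported.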
## Future Enhancements
1. **On-Chain Implementation**
- Complete The Graph integration
- Implement Blockchair endpoints
- Add Glassnode metrics
2. **Additional Providers**
- Messari
- DeFiLlama
- CoinAPI
- Nomics
3. **Advanced Features**
- Circuit breaker pattern
- Data caching
- Webhook notifications
- Real-time streaming
4. **Performance**
- Redis caching
- Database persistence
- Rate limit optimization
- Parallel processing
## Support
For issues or questions:
1. Check the logs for detailed error messages
2. Verify API keys are configured correctly
3. Review provider rate limits
4. Check network connectivity
5. Consult provider documentation
## License
Part of the Crypto API Monitoring system.