| |
| """ |
| هماهنگ‌کننده جمع‌آوری داده |
| Data Collection Orchestrator - Manages all collectors |
| """ |
|
|
| import asyncio |
| import sys |
| import os |
| from pathlib import Path |
| from typing import Dict, List, Any, Optional |
| from datetime import datetime, timedelta |
| import logging |
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
| from crypto_data_bank.database import get_db |
| from crypto_data_bank.collectors.free_price_collector import FreePriceCollector |
| from crypto_data_bank.collectors.rss_news_collector import RSSNewsCollector |
| from crypto_data_bank.collectors.sentiment_collector import SentimentCollector |
| from crypto_data_bank.ai.huggingface_models import get_analyzer |
|
|
# Configure root logging at import time so every collector's messages share
# one timestamped format, then create this module's own logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
|
|
class DataCollectionOrchestrator:
    """Main orchestrator for data collection from all FREE sources.

    Holds the price, RSS-news and market-sentiment collectors, the AI
    analyzer and the database handle.  Data can be collected once
    (``collect_all_data_once``) or continuously through background asyncio
    tasks (``start_background_collection`` / ``stop_background_collection``).
    """

    # Only this many de-duplicated news items are handed to the AI analyzer.
    MAX_NEWS_FOR_AI = 50
    # Aggregated AI sentiment is stored for at most this many trending coins.
    MAX_TRENDING_COINS = 10

    def __init__(self):
        """Create the collectors, analyzer and DB handle; reset run state."""
        self.db = get_db()
        self.price_collector = FreePriceCollector()
        self.news_collector = RSSNewsCollector()
        self.sentiment_collector = SentimentCollector()
        self.ai_analyzer = get_analyzer()

        # asyncio tasks of the background loops (empty while stopped).
        self.collection_tasks = []
        self.is_running = False

        # Seconds each background loop sleeps between two collection runs.
        self.intervals = {
            'prices': 60,
            'news': 300,
            'sentiment': 180,
        }

        # When each kind of data was last collected (None = not yet).
        self.last_collection = {
            'prices': None,
            'news': None,
            'sentiment': None,
        }

    async def collect_and_store_prices(self):
        """Collect prices, aggregate them and save each one to the database.

        A failure to save one price is logged and skipped so the remaining
        prices are still stored.

        Returns:
            A dict with ``success`` and ``timestamp``; on success also
            ``prices_collected`` and ``prices_saved``, on failure ``error``.
        """
        try:
            logger.info("💰 Collecting prices from FREE sources...")

            all_prices = await self.price_collector.collect_all_free_sources()
            aggregated = self.price_collector.aggregate_prices(all_prices)

            saved_count = 0
            for price_data in aggregated:
                try:
                    self.db.save_price(
                        symbol=price_data['symbol'],
                        price_data=price_data,
                        source='free_aggregated'
                    )
                    saved_count += 1
                except Exception as e:
                    logger.error(f"Error saving price for {price_data.get('symbol')}: {e}")

            self.last_collection['prices'] = datetime.now()

            logger.info(f"✅ Saved {saved_count}/{len(aggregated)} prices to database")

            return {
                "success": True,
                "prices_collected": len(aggregated),
                "prices_saved": saved_count,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"❌ Error collecting prices: {e}")
            return {
                "success": False,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    async def collect_and_store_news(self):
        """Collect RSS news, optionally analyze it with AI, and save it.

        When the analyzer exposes ``analyze_news_batch`` the news is analyzed
        first, and aggregated sentiment for the trending coins is stored as
        well.  Failures while saving a single item, or while storing the
        trending analysis, are logged and do not abort the run.

        Returns:
            A dict with ``success`` and ``timestamp``; on success also
            ``news_collected``, ``news_saved`` and ``ai_analyzed``, on
            failure ``error``.
        """
        try:
            logger.info("📰 Collecting news from FREE RSS feeds...")

            all_news = await self.news_collector.collect_all_rss_feeds()
            unique_news = self.news_collector.deduplicate_news(all_news)

            if hasattr(self.ai_analyzer, 'analyze_news_batch'):
                logger.info("🤖 Analyzing news with AI...")
                # NOTE(review): only the first MAX_NEWS_FOR_AI items are sent
                # to the analyzer and only the analyzer's output is saved
                # below, so later items are not stored -- confirm intended.
                analyzed_news = await self.ai_analyzer.analyze_news_batch(
                    unique_news[:self.MAX_NEWS_FOR_AI]
                )
            else:
                analyzed_news = unique_news

            saved_count = 0
            for news_item in analyzed_news:
                try:
                    # NOTE(review): the stored 'sentiment' is the AI
                    # confidence value, not the AI label -- confirm against
                    # the database schema.  Guard on 'ai_confidence' so an
                    # item lacking it is still saved instead of being dropped
                    # by a KeyError.
                    if 'ai_sentiment' in news_item and 'ai_confidence' in news_item:
                        news_item['sentiment'] = news_item['ai_confidence']

                    self.db.save_news(news_item)
                    saved_count += 1
                except Exception as e:
                    logger.error(f"Error saving news: {e}")

            self.last_collection['news'] = datetime.now()

            logger.info(f"✅ Saved {saved_count}/{len(analyzed_news)} news items to database")

            # The first item tells whether the batch carries AI results.
            ai_analyzed = 'ai_sentiment' in analyzed_news[0] if analyzed_news else False

            if ai_analyzed:
                try:
                    analysis_saved = await self._save_trending_analysis(analyzed_news)
                    logger.info(f"✅ Saved AI analysis for {analysis_saved} trending coins")
                except Exception as e:
                    logger.error(f"Error saving AI analysis: {e}")

            return {
                "success": True,
                "news_collected": len(unique_news),
                "news_saved": saved_count,
                "ai_analyzed": ai_analyzed,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"❌ Error collecting news: {e}")
            return {
                "success": False,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    async def _save_trending_analysis(self, analyzed_news) -> int:
        """Store aggregated AI sentiment for the top trending coins.

        Returns:
            The number of analysis records actually saved (coins without any
            matching news item are skipped).
        """
        trending = self.news_collector.get_trending_coins(analyzed_news)

        saved = 0
        for trend in trending[:self.MAX_TRENDING_COINS]:
            symbol = trend['coin']
            symbol_news = [n for n in analyzed_news if symbol in n.get('coins', [])]
            if not symbol_news:
                continue

            agg_sentiment = await self.ai_analyzer.calculate_aggregated_sentiment(
                symbol_news,
                symbol
            )

            self.db.save_ai_analysis({
                'symbol': symbol,
                'analysis_type': 'news_sentiment',
                'model_used': 'finbert',
                'input_data': {
                    'news_count': len(symbol_news),
                    'mentions': trend['mentions']
                },
                'output_data': agg_sentiment,
                'confidence': agg_sentiment.get('confidence', 0.0)
            })
            saved += 1

        return saved

    async def collect_and_store_sentiment(self):
        """Collect market sentiment and save the overall figure.

        If the collector returns no ``overall_sentiment`` nothing is saved,
        ``last_collection`` is left untouched and a failure result with an
        explicit message is returned.

        Returns:
            A dict with ``success`` and ``timestamp``; on success also
            ``sentiment``, on failure ``error``.
        """
        try:
            logger.info("😊 Collecting market sentiment from FREE sources...")

            sentiment_data = await self.sentiment_collector.collect_all_sentiment_data()

            overall = sentiment_data.get('overall_sentiment') if sentiment_data else None
            if not overall:
                logger.warning("No overall sentiment returned by collector; nothing saved")
                return {
                    "success": False,
                    "error": "No overall sentiment data available",
                    "timestamp": datetime.now().isoformat()
                }

            self.db.save_sentiment(overall, source='free_aggregated')
            self.last_collection['sentiment'] = datetime.now()

            # Build the log label defensively: a payload without the inner
            # key must not turn a completed save into a reported failure.
            label = overall.get('overall_sentiment') if isinstance(overall, dict) else overall
            logger.info(f"✅ Saved market sentiment: {label}")

            return {
                "success": True,
                "sentiment": overall,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"❌ Error collecting sentiment: {e}")
            return {
                "success": False,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    async def collect_all_data_once(self) -> Dict[str, Any]:
        """Collect all data once (prices, news, sentiment), concurrently.

        Returns:
            A dict with one entry per data kind (``prices``, ``news``,
            ``sentiment``) holding that collector's result dict, plus a
            ``timestamp``.  If a collector raised, its entry is
            ``{"success": False, "error": <message>}``.
        """
        logger.info("🚀 Starting full data collection cycle...")

        results = await asyncio.gather(
            self.collect_and_store_prices(),
            self.collect_and_store_news(),
            self.collect_and_store_sentiment(),
            return_exceptions=True
        )

        summary: Dict[str, Any] = {}
        for kind, outcome in zip(("prices", "news", "sentiment"), results):
            # BaseException, not Exception: gather() may also hand back a
            # CancelledError, which is not an Exception subclass on 3.8+.
            if isinstance(outcome, BaseException):
                summary[kind] = {"success": False, "error": str(outcome)}
            else:
                summary[kind] = outcome
        summary["timestamp"] = datetime.now().isoformat()
        return summary

    async def price_collection_loop(self):
        """Collect prices repeatedly while ``is_running`` is true."""
        while self.is_running:
            try:
                await self.collect_and_store_prices()
                await asyncio.sleep(self.intervals['prices'])
            except Exception as e:
                logger.error(f"Error in price collection loop: {e}")
                # Fixed back-off before retrying after an unexpected error.
                await asyncio.sleep(60)

    async def news_collection_loop(self):
        """Collect news repeatedly while ``is_running`` is true."""
        while self.is_running:
            try:
                await self.collect_and_store_news()
                await asyncio.sleep(self.intervals['news'])
            except Exception as e:
                logger.error(f"Error in news collection loop: {e}")
                # Fixed back-off before retrying after an unexpected error.
                await asyncio.sleep(300)

    async def sentiment_collection_loop(self):
        """Collect sentiment repeatedly while ``is_running`` is true."""
        while self.is_running:
            try:
                await self.collect_and_store_sentiment()
                await asyncio.sleep(self.intervals['sentiment'])
            except Exception as e:
                logger.error(f"Error in sentiment collection loop: {e}")
                # Fixed back-off before retrying after an unexpected error.
                await asyncio.sleep(180)

    async def start_background_collection(self):
        """Start continuous background data collection.

        Creates one asyncio task per collection loop.  Calling this while
        collection is already running is a no-op, so the existing tasks are
        never overwritten and left impossible to cancel.
        """
        if self.is_running and any(not task.done() for task in self.collection_tasks):
            logger.warning("Background collection is already running; start request ignored")
            return

        logger.info("🚀 Starting background data collection...")

        self.is_running = True

        self.collection_tasks = [
            asyncio.create_task(self.price_collection_loop()),
            asyncio.create_task(self.news_collection_loop()),
            asyncio.create_task(self.sentiment_collection_loop()),
        ]

        logger.info("✅ Background collection started!")
        logger.info(f" Prices: every {self.intervals['prices']}s")
        logger.info(f" News: every {self.intervals['news']}s")
        logger.info(f" Sentiment: every {self.intervals['sentiment']}s")

    async def stop_background_collection(self):
        """Stop background collection and wait for the loop tasks to end."""
        logger.info("🛑 Stopping background data collection...")

        self.is_running = False

        for task in self.collection_tasks:
            task.cancel()

        # return_exceptions=True keeps the tasks' CancelledError from
        # propagating out of this coroutine.
        await asyncio.gather(*self.collection_tasks, return_exceptions=True)

        # Drop the finished tasks so no stale handles are kept around.
        self.collection_tasks = []

        logger.info("✅ Background collection stopped!")

    def get_collection_status(self) -> Dict[str, Any]:
        """Return a snapshot of the run state.

        Returns:
            A dict with ``is_running``, ``last_collection`` (ISO strings or
            ``None``), a copy of ``intervals``, ``database_stats`` as
            reported by the database, and ``timestamp``.
        """
        return {
            "is_running": self.is_running,
            "last_collection": {
                k: v.isoformat() if v else None
                for k, v in self.last_collection.items()
            },
            # A copy, so callers cannot change the live configuration.
            "intervals": dict(self.intervals),
            "database_stats": self.db.get_statistics(),
            "timestamp": datetime.now().isoformat()
        }
|
|
|
|
| |
# Module-level slot for the lazily created orchestrator instance.
_orchestrator = None


def get_orchestrator() -> DataCollectionOrchestrator:
    """Return the shared orchestrator, creating it on the first call."""
    global _orchestrator
    if _orchestrator is not None:
        return _orchestrator
    _orchestrator = DataCollectionOrchestrator()
    return _orchestrator
|
|
|
|
async def main():
    """Run one collection cycle and print the results and database stats."""
    divider = "=" * 70
    print("\n" + divider)
    print("🧪 Testing Data Collection Orchestrator")
    print(divider)

    orchestrator = get_orchestrator()

    # Step 1: a single full collection cycle.
    print("\n1️⃣ Testing Single Collection Cycle...")
    results = await orchestrator.collect_all_data_once()

    print("\n📊 Results:")
    prices_saved = results['prices'].get('prices_saved', 0)
    print(f" Prices: {prices_saved} saved")
    news_saved = results['news'].get('news_saved', 0)
    print(f" News: {news_saved} saved")
    sentiment_ok = results['sentiment'].get('success', False)
    print(f" Sentiment: {sentiment_ok}")

    # Step 2: what the database holds after the cycle.
    print("\n2️⃣ Database Statistics:")
    stats = orchestrator.get_collection_status()
    db_stats = stats['database_stats']
    print(f" Database size: {db_stats.get('database_size', 0):,} bytes")
    print(f" Prices: {db_stats.get('prices_count', 0)}")
    print(f" News: {db_stats.get('news_count', 0)}")
    print(f" AI Analysis: {db_stats.get('ai_analysis_count', 0)}")

    print("\n✅ Orchestrator test complete!")


# Run the smoke test only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
|
|