# crypto-api-clean / background_agents.py
# Really-amin's picture
# Update background_agents.py
# 6e1537c verified
"""
Background Agents for Data Collection and System Monitoring
Two agents: one for periodic data collection and one for system monitoring.
"""
import asyncio
import logging
from datetime import datetime, timezone
from typing import Dict, Any

import httpx
# Module-level logger used by both agent classes and the start/stop helpers.
logger = logging.getLogger(__name__)
class DataCollectionAgent:
    """Background agent that periodically collects crypto data over HTTP.

    Each cycle requests four endpoints concurrently (CoinGecko global market
    data, the alternative.me ``fng`` sentiment endpoint, CoinGecko trending
    searches and CryptoPanic news) and keeps the latest JSON payload of each
    in ``collected_data``.

    Args:
        interval: Seconds to sleep after each collection cycle.
        retry_delay: Seconds to sleep after the loop itself raised.
    """

    def __init__(self, interval: float = 300.0, retry_delay: float = 60.0):
        self.is_running = False
        self.interval = interval
        self.retry_delay = retry_delay
        # Strong reference to the loop task. The event loop only keeps weak
        # references to tasks, so an unreferenced task may be garbage
        # collected mid-execution; the handle is also needed to cancel it.
        self._task = None
        self.stats = {
            "total_collections": 0,
            "successful_collections": 0,
            "failed_collections": 0,
            "last_collection": None,
        }
        self.collected_data = {
            "market": None,
            "sentiment": None,
            "trending": None,
            "news": None,
        }

    async def start(self):
        """Start the background collection loop; no-op if already running."""
        if self.is_running:
            logger.warning("Data collection agent is already running")
            return
        self.is_running = True
        logger.info("🤖 Data Collection Agent started")
        # Keep the task handle so stop() can cancel it.
        self._task = asyncio.create_task(self._collection_loop())

    async def stop(self):
        """Stop the agent and cancel the background loop.

        Cancelling (instead of only clearing ``is_running``) ends the loop
        right away rather than after its current sleep, so a quick
        stop()/start() sequence cannot leave two loops running.
        """
        self.is_running = False
        task, self._task = self._task, None
        if (
            task is not None
            and not task.done()
            and task is not asyncio.current_task()
        ):
            task.cancel()
            # return_exceptions=True absorbs the task's CancelledError while
            # a cancellation of stop() itself still propagates.
            await asyncio.gather(task, return_exceptions=True)
        logger.info("🛑 Data Collection Agent stopped")

    async def _collection_loop(self):
        """Main loop: collect, sleep ``interval``, repeat while running."""
        while self.is_running:
            try:
                await self.collect_all_data()
                await asyncio.sleep(self.interval)
            except asyncio.CancelledError:
                # Never swallow cancellation (it is an Exception subclass on
                # Python 3.7), otherwise stop() could not end the loop.
                raise
            except Exception as e:
                logger.error(f"Error in collection loop: {e}")
                await asyncio.sleep(self.retry_delay)

    async def collect_all_data(self):
        """Run one collection cycle over all four sources concurrently.

        The cycle counts as successful when at least one source succeeded;
        ``stats`` is updated accordingly. Errors are logged, not raised
        (except task cancellation).
        """
        self.stats["total_collections"] += 1
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                # Fetch all sources in parallel; failures come back as
                # exception objects instead of aborting the other requests.
                results = await asyncio.gather(
                    self._collect_market_data(client),
                    self._collect_sentiment_data(client),
                    self._collect_trending_data(client),
                    self._collect_news_data(client),
                    return_exceptions=True,
                )

            success_count = sum(
                1 for r in results if not isinstance(r, BaseException)
            )
            if success_count > 0:
                self.stats["successful_collections"] += 1
            else:
                self.stats["failed_collections"] += 1

            # Timezone-aware replacement for the deprecated utcnow(); the
            # rendered string keeps the same "...Z" format.
            self.stats["last_collection"] = (
                datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
            )
            logger.info(
                f"✅ Data collection completed: "
                f"{success_count}/{len(results)} successful"
            )
        except asyncio.CancelledError:
            raise
        except Exception as e:
            self.stats["failed_collections"] += 1
            logger.error(f"Data collection failed: {e}")

    async def _fetch_into(
        self, key: str, client: httpx.AsyncClient, url: str, params=None
    ):
        """Fetch *url* as JSON and store it under ``collected_data[key]``.

        Raises:
            RuntimeError: If the response status is not 200, so that the
                caller counts the source as failed instead of successful.
            Exception: Any request or JSON decoding error is logged at
                debug level and re-raised.
        """
        try:
            response = await client.get(url, params=params)
            if response.status_code != 200:
                raise RuntimeError(f"HTTP {response.status_code} from {url}")
            self.collected_data[key] = response.json()
            logger.debug(f"✅ {key.capitalize()} data collected")
        except Exception as e:
            logger.debug(f"Failed to collect {key} data: {e}")
            raise

    async def _collect_market_data(self, client: httpx.AsyncClient):
        """Collect global market data."""
        await self._fetch_into(
            "market", client, "https://api.coingecko.com/api/v3/global"
        )

    async def _collect_sentiment_data(self, client: httpx.AsyncClient):
        """Collect sentiment data."""
        await self._fetch_into(
            "sentiment", client, "https://api.alternative.me/fng/?limit=1"
        )

    async def _collect_trending_data(self, client: httpx.AsyncClient):
        """Collect trending data."""
        await self._fetch_into(
            "trending", client, "https://api.coingecko.com/api/v3/search/trending"
        )

    async def _collect_news_data(self, client: httpx.AsyncClient):
        """Collect news posts."""
        await self._fetch_into(
            "news",
            client,
            "https://cryptopanic.com/api/v1/posts/",
            params={"auth_token": "free", "public": "true", "kind": "news"},
        )

    def get_stats(self) -> Dict[str, Any]:
        """Return the counters plus ``is_running`` and a ``has_data`` flag."""
        return {
            **self.stats,
            "is_running": self.is_running,
            "has_data": any(v is not None for v in self.collected_data.values()),
        }

    def get_collected_data(self) -> Dict[str, Any]:
        """Return the dict holding the latest payload per source."""
        return self.collected_data
class SystemMonitorAgent:
    """Background agent that periodically checks system health.

    The metrics are currently simulated with ``random``; their ranges stay
    below every alert threshold, so no alert fires until real measurements
    are plugged in.

    Args:
        interval: Seconds to sleep after each health check.
        retry_delay: Seconds to sleep after the loop itself raised.
    """

    # (metric key, threshold, alert message); a metric strictly above its
    # threshold produces the alert. Order defines the order of alerts.
    _ALERT_RULES = (
        ("cpu_usage", 80, "High CPU usage detected"),
        ("memory_usage", 85, "High memory usage detected"),
        ("error_rate", 10, "High error rate detected"),
    )

    def __init__(self, interval: float = 60.0, retry_delay: float = 30.0):
        self.is_running = False
        self.interval = interval
        self.retry_delay = retry_delay
        # Strong reference to the loop task. The event loop only keeps weak
        # references to tasks, so an unreferenced task may be garbage
        # collected mid-execution; the handle is also needed to cancel it.
        self._task = None
        self.stats = {
            "total_checks": 0,
            "system_healthy": True,
            "last_check": None,
            "alerts": [],
        }
        self.system_metrics = {
            "cpu_usage": 0,
            "memory_usage": 0,
            "active_connections": 0,
            "requests_per_minute": 0,
            "error_rate": 0,
        }

    async def start(self):
        """Start the background monitoring loop; no-op if already running."""
        if self.is_running:
            logger.warning("System monitor agent is already running")
            return
        self.is_running = True
        logger.info("🤖 System Monitor Agent started")
        # Keep the task handle so stop() can cancel it.
        self._task = asyncio.create_task(self._monitoring_loop())

    async def stop(self):
        """Stop the agent and cancel the background loop.

        Cancelling (instead of only clearing ``is_running``) ends the loop
        right away rather than after its current sleep, so a quick
        stop()/start() sequence cannot leave two loops running.
        """
        self.is_running = False
        task, self._task = self._task, None
        if (
            task is not None
            and not task.done()
            and task is not asyncio.current_task()
        ):
            task.cancel()
            # return_exceptions=True absorbs the task's CancelledError while
            # a cancellation of stop() itself still propagates.
            await asyncio.gather(task, return_exceptions=True)
        logger.info("🛑 System Monitor Agent stopped")

    async def _monitoring_loop(self):
        """Main loop: check health, sleep ``interval``, repeat while running."""
        while self.is_running:
            try:
                await self.check_system_health()
                await asyncio.sleep(self.interval)
            except asyncio.CancelledError:
                # Never swallow cancellation (it is an Exception subclass on
                # Python 3.7), otherwise stop() could not end the loop.
                raise
            except Exception as e:
                logger.error(f"Error in monitoring loop: {e}")
                await asyncio.sleep(self.retry_delay)

    async def check_system_health(self):
        """Sample metrics, evaluate the alert rules and update ``stats``.

        Errors are logged and mark the system as unhealthy; they are not
        raised.
        """
        self.stats["total_checks"] += 1
        try:
            # Simulated metrics (use psutil in production).
            import random

            self.system_metrics = {
                "cpu_usage": random.randint(20, 60),
                "memory_usage": random.randint(40, 70),
                "active_connections": random.randint(1, 10),
                "requests_per_minute": random.randint(50, 150),
                "error_rate": random.uniform(0, 5),
            }

            # Threshold checks.
            alerts = [
                message
                for metric, limit, message in self._ALERT_RULES
                if self.system_metrics[metric] > limit
            ]
            self.stats["alerts"] = alerts
            self.stats["system_healthy"] = not alerts
            # Timezone-aware replacement for the deprecated utcnow(); the
            # rendered string keeps the same "...Z" format.
            self.stats["last_check"] = (
                datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
            )

            if alerts:
                logger.warning(f"⚠️ System alerts: {', '.join(alerts)}")
            else:
                logger.debug("✅ System health check passed")
        except Exception as e:
            logger.error(f"System health check failed: {e}")
            self.stats["system_healthy"] = False

    def get_stats(self) -> Dict[str, Any]:
        """Return the health statistics plus the ``is_running`` flag."""
        return {
            **self.stats,
            "is_running": self.is_running,
        }

    def get_metrics(self) -> Dict[str, Any]:
        """Return the most recently sampled system metrics."""
        return self.system_metrics
# Global instances: module-level singletons used by start_agents(),
# stop_agents() and get_agents_status().
data_agent = DataCollectionAgent()
monitor_agent = SystemMonitorAgent()
async def start_agents():
    """Start every background agent: data collection first, then monitoring."""
    for agent in (data_agent, monitor_agent):
        await agent.start()
    logger.info("✅ All background agents started")
async def stop_agents():
    """Stop every background agent: data collection first, then monitoring."""
    for agent in (data_agent, monitor_agent):
        await agent.stop()
    logger.info("✅ All background agents stopped")
def get_agents_status() -> Dict[str, Any]:
    """Return a status snapshot of all background agents.

    Returns:
        A dict with the ``get_stats()`` result of each agent under
        ``"data_collection_agent"`` and ``"system_monitor_agent"``, plus a
        ``"timestamp"`` holding the current UTC time as an ISO-8601 string
        ending in ``"Z"``.
    """
    return {
        "data_collection_agent": data_agent.get_stats(),
        "system_monitor_agent": monitor_agent.get_stats(),
        # Timezone-aware replacement for the deprecated utcnow(); the
        # rendered string keeps the same "...Z" format.
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
    }