| |
| """ |
Data Sources Database Model
مدل دیتابیس برای مدیریت منابع داده

این مدل برای ذخیره و مدیریت منابع داده استفاده می‌شود.
شامل اطلاعات منبع، وضعیت فعال/غیرفعال، و آمار استفاده.
| """ |
|
|
import enum
import json
import logging
from datetime import datetime
from typing import Dict, Any, List, Optional

from sqlalchemy import Column, Integer, String, Float, Boolean, DateTime, Text, Enum, Index
from sqlalchemy.orm import relationship
|
|
| |
try:
    from database.models import Base
except ImportError:
    # Standalone fallback: a private declarative base. It owns its own
    # MetaData, so tables registered on it are NOT part of the project's
    # shared ``Base.metadata``.
    try:
        # Canonical location since SQLAlchemy 1.4; the ext.declarative alias
        # is deprecated in 2.0.
        from sqlalchemy.orm import declarative_base
    except ImportError:
        from sqlalchemy.ext.declarative import declarative_base
    Base = declarative_base()
|
|
|
|
@enum.unique
class DataSourceType(enum.Enum):
    """
    Type (category) of a data source.

    The values are plain lowercase strings. They presumably mirror the strings
    stored in ``DataSource.source_type``, which is a String column, not an
    Enum column. TODO confirm.
    """

    MARKET = "market"
    NEWS = "news"
    SENTIMENT = "sentiment"
    SOCIAL = "social"
    ONCHAIN = "onchain"
    DEFI = "defi"
    HISTORICAL = "historical"
    TECHNICAL = "technical"
    AGGREGATED = "aggregated"
|
|
|
|
@enum.unique
class DataSourceStatus(enum.Enum):
    """
    Status of a data source.

    The values are plain lowercase strings. They presumably mirror the strings
    stored in ``DataSource.status``, which is a String column. TODO confirm.
    """

    ACTIVE = "active"
    INACTIVE = "inactive"
    RATE_LIMITED = "rate_limited"
    ERROR = "error"
    MAINTENANCE = "maintenance"
|
|
|
|
@enum.unique
class CollectionInterval(enum.Enum):
    """
    Data collection interval.

    Values are compact duration strings: a number followed by ``m`` (minutes),
    ``h`` (hours) or ``d`` (days). The one exception is the ``"realtime"``
    marker.
    """

    REALTIME = "realtime"
    # minute-based
    MINUTES_1 = "1m"
    MINUTES_5 = "5m"
    MINUTES_15 = "15m"
    MINUTES_30 = "30m"
    # hour-based
    HOURLY = "1h"
    HOURS_4 = "4h"
    # day-based
    DAILY = "1d"
|
|
|
|
class DataSource(Base):
    """
    Data Source Model.

    Stores the configuration, status and usage statistics of one data source
    in the ``data_sources`` table. The list-valued fields
    ``supported_timeframes``, ``categories`` and ``features`` are stored as
    JSON-encoded text and decoded back to lists by ``to_dict``.
    """
    __tablename__ = 'data_sources'

    id = Column(Integer, primary_key=True, autoincrement=True)

    # Identity and description
    source_id = Column(String(100), nullable=False, unique=True, index=True)
    name = Column(String(255), nullable=False)
    source_type = Column(String(50), nullable=False, index=True)
    description = Column(Text, nullable=True)

    # Endpoint
    base_url = Column(String(500), nullable=False)
    api_version = Column(String(20), nullable=True)

    # API key configuration (no column holds the key value itself)
    requires_api_key = Column(Boolean, default=False)
    api_key_env_var = Column(String(100), nullable=True)
    has_api_key_configured = Column(Boolean, default=False)

    # Rate limits: free-text description plus optional numeric limits
    rate_limit_description = Column(String(100), nullable=True)
    rate_limit_per_minute = Column(Integer, nullable=True)
    rate_limit_per_hour = Column(Integer, nullable=True)
    rate_limit_per_day = Column(Integer, nullable=True)

    # Collection settings; interval strings such as "30m"
    collection_interval = Column(String(20), default="30m")
    supports_realtime = Column(Boolean, default=False)

    # JSON-encoded lists
    supported_timeframes = Column(Text, nullable=True)
    categories = Column(Text, nullable=True)
    features = Column(Text, nullable=True)

    # Status
    is_active = Column(Boolean, default=True, index=True)
    status = Column(String(50), default="active", index=True)
    status_message = Column(Text, nullable=True)

    # Ordering / weighting
    priority = Column(Integer, default=5)
    weight = Column(Integer, default=1)

    # Flags
    is_verified = Column(Boolean, default=False)
    is_free_tier = Column(Boolean, default=True)

    # Usage statistics
    total_requests = Column(Integer, default=0)
    successful_requests = Column(Integer, default=0)
    failed_requests = Column(Integer, default=0)
    avg_response_time_ms = Column(Float, default=0.0)
    last_success_at = Column(DateTime, nullable=True)
    last_failure_at = Column(DateTime, nullable=True)
    last_error_message = Column(Text, nullable=True)

    # Timestamps (naive UTC, produced by datetime.utcnow)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    last_checked_at = Column(DateTime, nullable=True)

    __table_args__ = (
        Index('idx_source_type_active', 'source_type', 'is_active'),
        Index('idx_status_priority', 'status', 'priority'),
    )

    def __repr__(self):
        # The printed value is source_id (the string key), not the integer
        # primary key, so label it accordingly.
        return f"<DataSource(source_id={self.source_id}, name={self.name}, active={self.is_active})>"

    @staticmethod
    def _isoformat(value: Optional[datetime]) -> Optional[str]:
        """Return ``value.isoformat()``, or ``None`` when the value is unset."""
        return value.isoformat() if value else None

    @staticmethod
    def _json_list(raw: Optional[str]) -> List[Any]:
        """
        Decode a JSON-encoded list column.

        Returns ``[]`` for an unset/empty column and for a stored JSON
        ``null`` (which ``json.dumps(None)`` produces). Raises
        ``json.JSONDecodeError`` (a ``ValueError``) on malformed text.
        """
        if not raw:
            return []
        decoded = json.loads(raw)
        return [] if decoded is None else decoded

    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize the source to a plain dict.

        Datetimes become ISO-8601 strings (or ``None``), and JSON list columns
        are decoded to lists. ``success_rate`` is a percentage in [0, 100].
        The ``rate_limit_per_*`` columns and ``last_error_message`` are not
        included.
        """
        # Column defaults only apply at INSERT, so the counters can still be
        # None on an unflushed instance; treat them as 0 to avoid a TypeError.
        total = self.total_requests or 0
        successful = self.successful_requests or 0
        success_rate = (successful / total * 100) if total > 0 else 0

        return {
            "id": self.id,
            "source_id": self.source_id,
            "name": self.name,
            "source_type": self.source_type,
            "description": self.description,
            "base_url": self.base_url,
            "api_version": self.api_version,
            "requires_api_key": self.requires_api_key,
            "api_key_env_var": self.api_key_env_var,
            "has_api_key_configured": self.has_api_key_configured,
            "rate_limit_description": self.rate_limit_description,
            "collection_interval": self.collection_interval,
            "supports_realtime": self.supports_realtime,
            "supported_timeframes": self._json_list(self.supported_timeframes),
            "categories": self._json_list(self.categories),
            "features": self._json_list(self.features),
            "is_active": self.is_active,
            "status": self.status,
            "status_message": self.status_message,
            "priority": self.priority,
            "weight": self.weight,
            "is_verified": self.is_verified,
            "is_free_tier": self.is_free_tier,
            "total_requests": self.total_requests,
            "successful_requests": self.successful_requests,
            "failed_requests": self.failed_requests,
            "success_rate": success_rate,
            "avg_response_time_ms": self.avg_response_time_ms,
            "last_success_at": self._isoformat(self.last_success_at),
            "last_failure_at": self._isoformat(self.last_failure_at),
            "created_at": self._isoformat(self.created_at),
            "updated_at": self._isoformat(self.updated_at),
            "last_checked_at": self._isoformat(self.last_checked_at),
        }
|
|
|
|
class DataCollectionLog(Base):
    """
    Data Collection Log.

    Records the history of data collection runs against data sources:
    timing, outcome, record counts, error details and HTTP metadata.
    """
    __tablename__ = 'data_collection_logs'

    id = Column(Integer, primary_key=True, autoincrement=True)
    source_id = Column(String(100), nullable=False, index=True)

    # What was collected
    collection_type = Column(String(50), nullable=False)
    interval_used = Column(String(20), nullable=True)

    # Timing (started_at defaults to naive UTC now)
    started_at = Column(DateTime, nullable=False, default=datetime.utcnow)
    completed_at = Column(DateTime, nullable=True)
    duration_ms = Column(Integer, nullable=True)

    # Outcome
    success = Column(Boolean, default=False)
    records_fetched = Column(Integer, default=0)
    records_stored = Column(Integer, default=0)

    # Failure details
    error_type = Column(String(100), nullable=True)
    error_message = Column(Text, nullable=True)

    # HTTP response metadata
    http_status_code = Column(Integer, nullable=True)
    response_size_bytes = Column(Integer, nullable=True)

    __table_args__ = (
        Index('idx_collection_source_time', 'source_id', 'started_at'),
        Index('idx_collection_success', 'success', 'started_at'),
    )

    def to_dict(self) -> Dict[str, Any]:
        """Return the log entry as a dict; datetimes become ISO-8601 strings (or None)."""
        datetime_fields = {"started_at", "completed_at"}
        # Key order is part of the output (e.g. JSON rendering), so keep it fixed.
        field_order = (
            "id",
            "source_id",
            "collection_type",
            "interval_used",
            "started_at",
            "completed_at",
            "duration_ms",
            "success",
            "records_fetched",
            "records_stored",
            "error_type",
            "error_message",
            "http_status_code",
            "response_size_bytes",
        )

        serialized: Dict[str, Any] = {}
        for field_name in field_order:
            value = getattr(self, field_name)
            if field_name in datetime_fields:
                value = value.isoformat() if value else None
            serialized[field_name] = value
        return serialized
|
|
|
|
class CollectionSchedule(Base):
    """
    Collection Schedule.

    Per-source collection settings. There is one row per ``source_id``
    (unique constraint), holding the interval, an enabled flag, run
    bookkeeping, run counters and backoff fields.
    """
    __tablename__ = 'collection_schedules'

    id = Column(Integer, primary_key=True, autoincrement=True)
    source_id = Column(String(100), nullable=False, unique=True, index=True)

    # Interval and on/off switch
    collection_interval = Column(String(20), nullable=False, default="30m")
    is_enabled = Column(Boolean, default=True)

    # Run bookkeeping
    last_run_at = Column(DateTime, nullable=True)
    next_run_at = Column(DateTime, nullable=True)

    # Run counters
    consecutive_failures = Column(Integer, default=0)
    total_runs = Column(Integer, default=0)
    successful_runs = Column(Integer, default=0)

    # Backoff fields
    backoff_until = Column(DateTime, nullable=True)
    backoff_multiplier = Column(Float, default=1.0)

    # Timestamps (naive UTC, produced by datetime.utcnow)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    @staticmethod
    def _isoformat(value: Optional[datetime]) -> Optional[str]:
        """Return ``value.isoformat()``, or ``None`` when the value is unset."""
        return value.isoformat() if value else None

    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize the schedule to a plain dict.

        Datetimes become ISO-8601 strings (or ``None``). ``success_rate`` is
        ``successful_runs / total_runs`` as a percentage, or 0 when no runs
        have been recorded.
        """
        # Column defaults only apply at INSERT, so the counters can still be
        # None on an unflushed instance; treat them as 0 to avoid a TypeError.
        total_runs = self.total_runs or 0
        successful_runs = self.successful_runs or 0
        success_rate = (successful_runs / total_runs * 100) if total_runs > 0 else 0

        return {
            "id": self.id,
            "source_id": self.source_id,
            "collection_interval": self.collection_interval,
            "is_enabled": self.is_enabled,
            "last_run_at": self._isoformat(self.last_run_at),
            "next_run_at": self._isoformat(self.next_run_at),
            "consecutive_failures": self.consecutive_failures,
            "total_runs": self.total_runs,
            "successful_runs": self.successful_runs,
            "success_rate": success_rate,
            "backoff_until": self._isoformat(self.backoff_until),
            "backoff_multiplier": self.backoff_multiplier,
            "created_at": self._isoformat(self.created_at),
            "updated_at": self._isoformat(self.updated_at),
        }
|
|
|
|
| |
|
|
class DataSourceManager:
    """
    Data Source Manager for database operations.

    Wraps a SQLAlchemy session with CRUD and statistics helpers for
    ``DataSource`` rows. Mutating methods commit on success. On any error
    they roll the session back, log the exception, and signal failure through
    their return value (``None`` / ``False``) instead of raising.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self, session):
        """Bind the manager to ``session`` (used via query/add/commit/rollback)."""
        self.session = session

    def create_source(self, source_data: Dict[str, Any]) -> Optional[DataSource]:
        """
        Create and commit a new ``DataSource``.

        Args:
            source_data: Mapping with the required keys ``source_id``, ``name``
                and ``base_url``. All other keys are optional and fall back to
                the defaults below.

        Returns:
            The persisted source, or ``None`` if creation failed (missing
            required key, constraint violation, database error). On failure
            the session is rolled back.
        """
        try:
            source = DataSource(
                source_id=source_data["source_id"],
                name=source_data["name"],
                source_type=source_data.get("source_type", "market"),
                description=source_data.get("description"),
                base_url=source_data["base_url"],
                api_version=source_data.get("api_version"),
                requires_api_key=source_data.get("requires_api_key", False),
                api_key_env_var=source_data.get("api_key_env_var"),
                has_api_key_configured=source_data.get("has_api_key_configured", False),
                rate_limit_description=source_data.get("rate_limit_description"),
                rate_limit_per_minute=source_data.get("rate_limit_per_minute"),
                rate_limit_per_hour=source_data.get("rate_limit_per_hour"),
                rate_limit_per_day=source_data.get("rate_limit_per_day"),
                collection_interval=source_data.get("collection_interval", "30m"),
                supports_realtime=source_data.get("supports_realtime", False),
                # `or []` so an explicit None is stored as "[]" rather than "null".
                supported_timeframes=json.dumps(source_data.get("supported_timeframes") or []),
                categories=json.dumps(source_data.get("categories") or []),
                features=json.dumps(source_data.get("features") or []),
                is_active=source_data.get("is_active", True),
                status=source_data.get("status", "active"),
                priority=source_data.get("priority", 5),
                weight=source_data.get("weight", 1),
                is_verified=source_data.get("is_verified", False),
                is_free_tier=source_data.get("is_free_tier", True),
            )
            self.session.add(source)
            self.session.commit()
            return source
        except Exception:
            self.session.rollback()
            self._logger.exception("Error creating source %r", source_data.get("source_id"))
            return None

    def get_source(self, source_id: str) -> Optional[DataSource]:
        """Return the source with the given ``source_id``, or ``None`` if absent."""
        return self.session.query(DataSource).filter_by(source_id=source_id).first()

    def get_all_sources(self) -> List[DataSource]:
        """Return every source, active or not."""
        return self.session.query(DataSource).all()

    def get_active_sources(self) -> List[DataSource]:
        """Return all sources with ``is_active`` set."""
        return self.session.query(DataSource).filter_by(is_active=True).all()

    def get_sources_by_type(self, source_type: str) -> List[DataSource]:
        """Return the *active* sources of the given ``source_type``."""
        return self.session.query(DataSource).filter_by(source_type=source_type, is_active=True).all()

    def update_source_status(self, source_id: str, is_active: bool, status: Optional[str] = None,
                             status_message: Optional[str] = None) -> bool:
        """
        Update a source's active flag and, optionally, its status fields.

        Args:
            source_id: Identifier of the source to update.
            is_active: New value for ``is_active`` (always applied).
            status: New ``status``. Ignored when empty or ``None``.
            status_message: New ``status_message``. Ignored only when ``None``,
                so passing ``""`` clears a previous message.

        Returns:
            ``True`` if the source exists and the change was committed,
            ``False`` otherwise.
        """
        try:
            source = self.get_source(source_id)
            if source is None:
                return False
            source.is_active = is_active
            if status:
                source.status = status
            if status_message is not None:
                source.status_message = status_message
            source.updated_at = datetime.utcnow()
            self.session.commit()
            return True
        except Exception:
            self.session.rollback()
            self._logger.exception("Error updating source status for %r", source_id)
            return False

    def record_request(self, source_id: str, success: bool, response_time_ms: float,
                       error_message: Optional[str] = None) -> bool:
        """
        Record the outcome of one request against a source and update its stats.

        Increments the total and success/failure counters and stamps
        ``last_success_at`` / ``last_failure_at`` and ``last_checked_at``.
        On failure it also stores ``error_message`` when one is given.
        ``avg_response_time_ms`` is maintained as the running mean over all
        recorded requests.

        Returns:
            ``True`` if the source exists and the update was committed,
            ``False`` otherwise.
        """
        try:
            source = self.get_source(source_id)
            if source is None:
                return False

            now = datetime.utcnow()
            # `or 0` guards rows whose counters are NULL (e.g. inserted outside
            # the ORM, where the Python-side column defaults did not apply).
            source.total_requests = (source.total_requests or 0) + 1
            if success:
                source.successful_requests = (source.successful_requests or 0) + 1
                source.last_success_at = now
            else:
                source.failed_requests = (source.failed_requests or 0) + 1
                source.last_failure_at = now
                if error_message:
                    source.last_error_message = error_message

            # Incremental mean: avg_n = avg_{n-1} + (x - avg_{n-1}) / n.
            # (The previous `(avg + x) / 2` weighted the latest request at 50%,
            # which is not an average over requests.)
            previous_avg = source.avg_response_time_ms or 0.0
            source.avg_response_time_ms = previous_avg + (response_time_ms - previous_avg) / source.total_requests

            source.last_checked_at = now
            self.session.commit()
            return True
        except Exception:
            self.session.rollback()
            self._logger.exception("Error recording request for %r", source_id)
            return False

    def get_sources_for_collection(self, interval: str) -> List[DataSource]:
        """
        Return the active, non-errored sources scheduled at ``interval``.

        Results are ordered by ascending ``priority``. Rows whose ``status`` is
        NULL are also excluded, because SQL ``NULL != 'error'`` is not true.
        """
        return self.session.query(DataSource).filter(
            # `==` (not `is`) is required so SQLAlchemy builds a SQL expression.
            DataSource.is_active == True,  # noqa: E712
            DataSource.collection_interval == interval,
            DataSource.status != "error"
        ).order_by(DataSource.priority).all()

    def get_statistics(self) -> Dict[str, Any]:
        """
        Return aggregate statistics over all sources.

        ``by_type`` maps each ``source_type`` to its number of sources (active
        or not). ``success_rate`` is a percentage of successful requests, or 0
        when no requests are recorded.
        """
        all_sources = self.get_all_sources()
        active_sources = [s for s in all_sources if s.is_active]

        by_type: Dict[str, int] = {}
        for source in all_sources:
            by_type[source.source_type] = by_type.get(source.source_type, 0) + 1

        # `or 0` tolerates NULL counters.
        total_requests = sum(s.total_requests or 0 for s in all_sources)
        successful_requests = sum(s.successful_requests or 0 for s in all_sources)

        return {
            "total_sources": len(all_sources),
            "active_sources": len(active_sources),
            "by_type": by_type,
            "total_requests": total_requests,
            "successful_requests": successful_requests,
            "success_rate": (successful_requests / total_requests * 100) if total_requests > 0 else 0,
            "sources_with_errors": sum(1 for s in all_sources if s.status == "error"),
        }
|
|
|
|
| |
|
|
def init_data_sources_from_registry(session, registry):
    """
    Initialize data sources in the database from a registry.

    Inserts every registry entry whose ``source_id`` is not yet stored.
    Existing rows are left untouched; no update or sync is performed.

    Args:
        session: Session handed to a new ``DataSourceManager``.
        registry: Object whose ``to_dict()`` returns ``{source_id: source_info}``.
            Each ``source_info`` must contain ``name``, ``source_type`` and
            ``url``. All other keys are optional.

    Returns:
        The ``DataSourceManager`` bound to ``session``.
    """
    logger = logging.getLogger(__name__)
    manager = DataSourceManager(session)

    for source_id, source_info in registry.to_dict().items():
        if manager.get_source(source_id) is not None:
            logger.info("Data source already exists: %s", source_id)
            continue

        # `or []` guards entries where the key is present but set to None.
        timeframes = source_info.get("supported_timeframes") or []
        source_data = {
            "source_id": source_id,
            "name": source_info["name"],
            "source_type": source_info["source_type"],
            "description": source_info.get("description"),
            "base_url": source_info["url"],
            "requires_api_key": source_info.get("requires_api_key", False),
            "api_key_env_var": source_info.get("api_key_env"),
            "rate_limit_description": source_info.get("rate_limit"),
            # NOTE(review): always "30m"; the per-type COLLECTION_INTERVALS
            # mapping is not consulted here. Confirm this is intended.
            "collection_interval": "30m",
            "supports_realtime": "realtime" in timeframes,
            "supported_timeframes": timeframes,
            "categories": source_info.get("categories", []),
            "features": source_info.get("features", []),
            "is_active": source_info.get("is_active", True),
            "priority": source_info.get("priority", 5),
            "is_verified": source_info.get("verified", False),
            "is_free_tier": source_info.get("free_tier", True),
        }

        # create_source returns None on failure. Only report success when a
        # row was actually created.
        if manager.create_source(source_data) is not None:
            logger.info("Created data source: %s", source_id)
        else:
            logger.warning("Failed to create data source: %s", source_id)

    return manager
|
|
|
|
| |
|
|
| |
# Default collection interval per source type. Keys match the
# DataSourceType values, and the interval strings use the same format as the
# CollectionInterval values (e.g. "15m", "30m").
COLLECTION_INTERVALS = dict(
    market="15m",
    historical="30m",
    onchain="30m",
    defi="15m",
    news="15m",
    social="30m",
    sentiment="15m",
    technical="15m",
    aggregated="15m",
)

# Source IDs listed as realtime sources.
REALTIME_SOURCES = list((
    "binance_historical",
    "coingecko_historical",
    "coincap_realtime",
    "fear_greed_index",
))
|
|