| |
| """ |
| Gap Filling Service - Intelligently fills missing data. |
| Uses AI models first, then falls back to external providers. |
| Priority: HF Models → HF Space API → External Providers |
| """ |
|
|
| import asyncio |
| import time |
| from typing import Dict, List, Optional, Any |
| from enum import Enum |
| from datetime import datetime |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class GapType(Enum):
    """Categories of missing or incomplete data that can be detected and filled.

    Each member's ``value`` is the string stored under the ``"gap_type"`` key
    of a gap description dictionary.
    """

    # Price / candlestick series
    MISSING_OHLC = "missing_ohlc"
    # Order-book depth
    MISSING_DEPTH = "missing_depth"
    # Whale / large-transfer activity
    MISSING_WHALE_DATA = "missing_whale_data"
    # Sentiment scores
    MISSING_SENTIMENT = "missing_sentiment"
    # Any other required field that is absent
    INCOMPLETE_METADATA = "incomplete_metadata"
    # Transaction history
    MISSING_TRANSACTIONS = "missing_transactions"
    # Account / wallet balance
    MISSING_BALANCE = "missing_balance"
|
|
|
|
class GapFillStrategy(Enum):
    """Named approaches that may be used to fill a detected gap.

    Each member's ``value`` is the string stored under the
    ``"recommended_strategy"`` / ``"strategy_used"`` keys of gap and result
    dictionaries.
    """

    # Synthesize the missing values with an AI model
    AI_MODEL_SYNTHESIS = "ai_model_synthesis"
    # Derive missing points from neighbouring known points
    INTERPOLATION = "interpolation"
    # Fetch the data from an external provider API
    EXTERNAL_PROVIDER = "external_provider"
    # Combination of strategies
    HYBRID = "hybrid"
    # Estimate the values statistically
    STATISTICAL_ESTIMATION = "statistical_estimation"
|
|
|
|
class GapFillerService:
    """Main orchestrator for gap filling operations.

    Typical flow: :meth:`detect_gaps` produces a list of gap descriptions,
    :meth:`fill_gap` tries the recommended strategy for one gap and then falls
    back to an external provider, and :meth:`fill_all_gaps` chains both and
    writes successful fills back into the data dictionary. Every fill attempt
    is summarised in an in-memory audit log (see :meth:`get_audit_log` and
    :meth:`get_statistics`).
    """

    # Upper bound on retained in-memory audit entries.
    _MAX_AUDIT_ENTRIES = 1000
    # Upper bound on the number of missing timestamps reported per series.
    _MAX_MISSING_TIMESTAMPS = 100
    # Mid price used for order-book estimation when neither data nor context
    # supplies one.
    _DEFAULT_MID_PRICE = 50000

    def __init__(self, model_registry=None, provider_manager=None, database=None):
        """
        Initialize gap filler service

        Args:
            model_registry: AI model registry for ML-based gap filling
            provider_manager: Provider manager for external API fallback
            database: Database instance for storing gap filling audit logs
        """
        self.models = model_registry
        self.providers = provider_manager
        self.db = database
        self.gap_fill_cache = {}
        self.audit_log = []

        logger.info("GapFillerService initialized")

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as the deprecated
        ``datetime.utcnow().isoformat()`` (no UTC offset suffix).
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    async def detect_gaps(
        self,
        data: Dict[str, Any],
        required_fields: List[str],
        context: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Detect all missing/incomplete data in provided dataset

        Three checks are performed:
          1. every name in ``required_fields`` that is absent or ``None``
             becomes a "high" severity gap;
          2. if ``data["timestamps"]`` is a list, holes in the series become
             one "medium" severity OHLC gap;
          3. if ``data["prices"]`` is a list/tuple, entries lacking OHLC
             fields each become a "medium" severity gap.

        Args:
            data: Dataset to analyze for gaps
            required_fields: List of required field names
            context: Additional context, copied into required-field gaps

        Returns:
            List of detected gaps with recommended strategies
        """
        gaps: List[Dict[str, Any]] = []

        # 1. Required fields that are absent or explicitly None.
        for field in required_fields:
            if data.get(field) is None:
                gaps.append({
                    "gap_type": self._infer_gap_type(field),
                    "field": field,
                    "severity": "high",
                    "recommended_strategy": self._recommend_strategy(field, data),
                    "context": context or {}
                })

        # 2. Holes in the timestamp series.
        timestamps = data.get("timestamps")
        if isinstance(timestamps, list):
            missing_timestamps = self._detect_missing_timestamps(timestamps, context)
            if missing_timestamps:
                gaps.append({
                    "gap_type": GapType.MISSING_OHLC.value,
                    "field": "ohlc_data",
                    "missing_count": len(missing_timestamps),
                    "missing_timestamps": missing_timestamps,
                    "severity": "medium",
                    "recommended_strategy": GapFillStrategy.INTERPOLATION.value
                })

        # 3. Price entries with missing OHLC fields. A ``None`` value (already
        #    reported by check 1 when required) must not be iterated.
        prices = data.get("prices")
        if isinstance(prices, (list, tuple)):
            gaps.extend(self._detect_price_gaps(prices))

        logger.info("Detected %d gaps in data", len(gaps))
        return gaps

    def _infer_gap_type(self, field: str) -> str:
        """Infer gap type from field name.

        The lower-cased field name is matched against keyword groups in
        priority order; the first group with a matching substring wins and
        anything unmatched is treated as incomplete metadata.

        Returns:
            The ``value`` string of the matching :class:`GapType` member.
        """
        name = field.lower()
        # Order matters: earlier groups take precedence over later ones.
        keyword_groups = (
            (("ohlc", "price", "candle"), GapType.MISSING_OHLC),
            (("depth", "orderbook"), GapType.MISSING_DEPTH),
            (("whale", "large_transfer"), GapType.MISSING_WHALE_DATA),
            (("sentiment",), GapType.MISSING_SENTIMENT),
            (("transaction",), GapType.MISSING_TRANSACTIONS),
            (("balance",), GapType.MISSING_BALANCE),
        )
        for keywords, gap_type in keyword_groups:
            if any(keyword in name for keyword in keywords):
                return gap_type.value
        return GapType.INCOMPLETE_METADATA.value

    def _recommend_strategy(self, field: str, data: Dict[str, Any]) -> str:
        """Recommend best strategy for filling this gap.

        Returns:
            The ``value`` string of a :class:`GapFillStrategy` member:
            interpolation for OHLC gaps when more than two price entries
            exist (external provider otherwise), AI synthesis for sentiment,
            statistical estimation for depth, external provider for the rest.
        """
        gap_type = self._infer_gap_type(field)

        if gap_type == GapType.MISSING_OHLC.value:
            # ``or []`` guards against an explicit ``None`` under "prices".
            known_prices = data.get("prices") or []
            if len(known_prices) > 2:
                return GapFillStrategy.INTERPOLATION.value
            return GapFillStrategy.EXTERNAL_PROVIDER.value

        if gap_type == GapType.MISSING_SENTIMENT.value:
            return GapFillStrategy.AI_MODEL_SYNTHESIS.value

        if gap_type == GapType.MISSING_DEPTH.value:
            return GapFillStrategy.STATISTICAL_ESTIMATION.value

        return GapFillStrategy.EXTERNAL_PROVIDER.value

    def _detect_missing_timestamps(
        self,
        timestamps: List[int],
        context: Optional[Dict[str, Any]]
    ) -> List[int]:
        """Detect missing timestamps in a time series.

        The expected step is the smallest gap between distinct, sorted
        timestamps. Any gap larger than 1.5x that step is assumed to contain
        missing points, which are generated at multiples of the step.

        Args:
            timestamps: Observed timestamps, in any order; duplicates allowed.
            context: Currently unused.

        Returns:
            At most ``_MAX_MISSING_TIMESTAMPS`` missing timestamps, ascending.
        """
        if not timestamps or len(timestamps) < 2:
            return []

        # De-duplicate: a repeated timestamp would yield a zero interval and
        # a division by zero below.
        ordered = sorted(set(timestamps))
        if len(ordered) < 2:
            return []

        intervals = [later - earlier for earlier, later in zip(ordered, ordered[1:])]
        expected_interval = min(intervals)  # > 0 because values are distinct

        limit = self._MAX_MISSING_TIMESTAMPS
        missing: List[int] = []
        for current, diff in zip(ordered, intervals):
            if diff > expected_interval * 1.5:
                num_missing = int(diff / expected_interval) - 1
                for step in range(1, num_missing + 1):
                    missing.append(current + step * expected_interval)
                    # Stop as soon as the cap is hit rather than materialising
                    # a potentially huge list and truncating afterwards.
                    if len(missing) >= limit:
                        return missing

        return missing

    def _detect_price_gaps(self, prices: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Detect gaps in price data (e.g., missing OHLC fields).

        Args:
            prices: Sequence of price entries. Only dict entries are
                inspected; anything else (e.g. a bare number) is skipped
                because it has no OHLC fields to check.

        Returns:
            One gap dict per entry that lacks (or has ``None`` for) any of
            ``open``/``high``/``low``/``close``, carrying the entry's index.
        """
        required_ohlc_fields = ("open", "high", "low", "close")
        gaps = []

        for index, price_data in enumerate(prices):
            if not isinstance(price_data, dict):
                continue
            missing_fields = [
                name for name in required_ohlc_fields if price_data.get(name) is None
            ]
            if missing_fields:
                gaps.append({
                    "gap_type": GapType.MISSING_OHLC.value,
                    "index": index,
                    "missing_fields": missing_fields,
                    "severity": "medium",
                    "recommended_strategy": GapFillStrategy.INTERPOLATION.value
                })

        return gaps

    async def fill_gap(
        self,
        gap: Dict[str, Any],
        data: Dict[str, Any],
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Fill a single gap using best available strategy

        The gap's ``recommended_strategy`` is tried first (AI synthesis only
        when a model registry was supplied); if that does not fill the gap and
        a provider manager was supplied, the external provider is tried as a
        fallback. Every attempt is appended to ``result["attempts"]`` and the
        outcome is recorded in the audit log.

        Args:
            gap: Gap definition from detect_gaps()
            data: Original data containing the gap
            context: Additional context for gap filling

        Returns:
            Dict with keys ``gap``, ``filled``, ``strategy_used``,
            ``confidence``, ``filled_data``, ``attempts``,
            ``execution_time_ms`` and ``error``.
        """
        # perf_counter is monotonic, so durations are immune to clock changes.
        started = time.perf_counter()
        strategy = gap.get("recommended_strategy")

        result = {
            "gap": gap,
            "filled": False,
            "strategy_used": None,
            "confidence": 0.0,
            "filled_data": None,
            "attempts": [],
            "execution_time_ms": 0,
            "error": None
        }

        # (strategy name, handler, default confidence, enabled)
        primary_handlers = (
            (GapFillStrategy.AI_MODEL_SYNTHESIS.value,
             self._fill_with_ai_model, 0.7, bool(self.models)),
            (GapFillStrategy.INTERPOLATION.value,
             self._fill_with_interpolation, 0.8, True),
            (GapFillStrategy.STATISTICAL_ESTIMATION.value,
             self._fill_with_statistics, 0.65, True),
        )
        plan = [
            (name, handler, default_confidence)
            for name, handler, default_confidence, enabled in primary_handlers
            if enabled and strategy == name
        ]
        # The external provider is the universal fallback when configured.
        if self.providers:
            plan.append((
                GapFillStrategy.EXTERNAL_PROVIDER.value,
                self._fill_with_external_provider,
                0.9,
            ))

        try:
            for name, handler, default_confidence in plan:
                attempt = await handler(gap, data, context)
                result["attempts"].append(attempt)

                if attempt["success"]:
                    result["filled"] = True
                    result["strategy_used"] = name
                    result["confidence"] = attempt.get("confidence", default_confidence)
                    result["filled_data"] = attempt["data"]
                    break
        except Exception as e:
            # Boundary handler: a failing strategy must not abort the caller.
            logger.error("Error filling gap: %s", e)
            result["error"] = str(e)

        result["execution_time_ms"] = int((time.perf_counter() - started) * 1000)

        await self._log_gap_fill_attempt(result)

        return result

    async def _fill_with_ai_model(
        self,
        gap: Dict[str, Any],
        data: Dict[str, Any],
        context: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Fill gap using AI models.

        Sentiment gaps are filled from ``context["text"]`` (or
        ``data["text"]``) via ``ai_models.ensemble_crypto_sentiment``; OHLC
        gaps via the gap filler returned by ``ai_models.get_gap_filler``.

        Returns:
            ``{"success": True, "data", "confidence", "method"}`` on success,
            otherwise ``{"success": False, "error"}``. Never raises.
        """
        ctx = context or {}
        try:
            gap_type = gap.get("gap_type")

            if gap_type == GapType.MISSING_SENTIMENT.value:
                text = ctx.get("text") or data.get("text")

                if text:
                    from ai_models import ensemble_crypto_sentiment
                    sentiment = ensemble_crypto_sentiment(text)

                    return {
                        "success": True,
                        "data": sentiment,
                        "confidence": sentiment.get("confidence", 0.7),
                        "method": "ai_sentiment_model"
                    }

            elif gap_type == GapType.MISSING_OHLC.value:
                # The gap filler is only needed on this path.
                from ai_models import get_gap_filler
                gap_filler = get_gap_filler()

                # Default also applies when context exists without a symbol.
                symbol = ctx.get("symbol") or "BTC"
                existing_data = data.get("prices", [])
                missing_timestamps = gap.get("missing_timestamps", [])

                if existing_data and missing_timestamps:
                    result = await gap_filler.fill_missing_ohlc(
                        symbol, existing_data, missing_timestamps
                    )
                    if result["status"] == "success":
                        return {
                            "success": True,
                            "data": result["filled_data"],
                            "confidence": result["average_confidence"],
                            "method": "ai_ohlc_interpolation"
                        }

            return {"success": False, "error": "No suitable AI model found"}

        except Exception as e:
            logger.warning("AI model fill failed: %s", e)
            return {"success": False, "error": str(e)}

    async def _fill_with_interpolation(
        self,
        gap: Dict[str, Any],
        data: Dict[str, Any],
        context: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Fill gap using interpolation.

        Requires both ``data["prices"]`` and ``gap["missing_timestamps"]`` to
        be non-empty; delegates to the gap filler's ``fill_missing_ohlc``.

        Returns:
            ``{"success": True, "data", "confidence", "method"}`` on success,
            otherwise ``{"success": False, "error"}``. Never raises.
        """
        try:
            from ai_models import get_gap_filler
            gap_filler = get_gap_filler()

            # Default also applies when context exists without a symbol.
            symbol = (context or {}).get("symbol") or "UNKNOWN"
            existing_data = data.get("prices", [])
            missing_timestamps = gap.get("missing_timestamps", [])

            if not existing_data or not missing_timestamps:
                return {"success": False, "error": "Insufficient data for interpolation"}

            result = await gap_filler.fill_missing_ohlc(symbol, existing_data, missing_timestamps)

            if result["status"] == "success":
                return {
                    "success": True,
                    "data": result["filled_data"],
                    "confidence": result["average_confidence"],
                    "method": "linear_interpolation"
                }

            return {"success": False, "error": result.get("message", "Interpolation failed")}

        except Exception as e:
            logger.warning("Interpolation fill failed: %s", e)
            return {"success": False, "error": str(e)}

    async def _fill_with_statistics(
        self,
        gap: Dict[str, Any],
        data: Dict[str, Any],
        context: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Fill gap using statistical estimation.

        Only order-book depth gaps are supported. The mid price is taken from
        ``data["price"]``, then ``context["price"]``, then a built-in default.

        Returns:
            ``{"success": True, "data", "confidence", "method"}`` on success,
            otherwise ``{"success": False, "error"}``. Never raises.
        """
        ctx = context or {}
        try:
            from ai_models import get_gap_filler
            gap_filler = get_gap_filler()

            gap_type = gap.get("gap_type")

            if gap_type == GapType.MISSING_DEPTH.value:
                symbol = ctx.get("symbol") or "BTCUSDT"
                # Explicit chain: the previous one-liner ignored data["price"]
                # whenever context was None (conditional-expression precedence).
                mid_price = data.get("price") or ctx.get("price") or self._DEFAULT_MID_PRICE

                result = await gap_filler.estimate_orderbook_depth(symbol, mid_price)

                if result["status"] == "success":
                    return {
                        "success": True,
                        "data": result,
                        "confidence": result["confidence"],
                        "method": "statistical_orderbook_estimation"
                    }

            return {"success": False, "error": "No suitable statistical method found"}

        except Exception as e:
            logger.warning("Statistical fill failed: %s", e)
            return {"success": False, "error": str(e)}

    async def _fill_with_external_provider(
        self,
        gap: Dict[str, Any],
        data: Dict[str, Any],
        context: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Fill gap using external provider API.

        OHLC and metadata gaps map to the "coinmarketcap" provider;
        transaction gaps map to a block explorer chosen by
        ``context["chain"]`` (default "ethereum").

        NOTE(review): no provider request is issued here. A successful result
        only carries a marker dict naming the provider, and
        ``fill_all_gaps`` writes that marker into the data field. Confirm
        whether the real fetch is still to be implemented.

        Returns:
            ``{"success": True, "data", "confidence", "method"}`` when an
            available provider exists, otherwise
            ``{"success": False, "error"}``. Never raises.
        """
        try:
            if not self.providers:
                return {"success": False, "error": "No provider manager available"}

            gap_type = gap.get("gap_type")

            if gap_type in (GapType.MISSING_OHLC.value, GapType.INCOMPLETE_METADATA.value):
                provider = self.providers.get_provider("coinmarketcap")
                if provider and provider.is_available:
                    return {
                        "success": True,
                        "data": {"source": "coinmarketcap", "provider_used": True},
                        "confidence": 0.9,
                        "method": "external_coinmarketcap"
                    }

            elif gap_type == GapType.MISSING_TRANSACTIONS.value:
                explorers = {
                    "ethereum": "etherscan",
                    "bsc": "bscscan",
                    "tron": "tronscan",
                }
                # Default also applies when context exists without a chain.
                chain = (context or {}).get("chain") or "ethereum"
                explorer = explorers.get(chain)
                provider = self.providers.get_provider(explorer) if explorer else None

                if provider and provider.is_available:
                    return {
                        "success": True,
                        "data": {"source": provider.name, "provider_used": True},
                        "confidence": 0.9,
                        "method": f"external_{provider.provider_id}"
                    }

            return {"success": False, "error": "No suitable provider found"}

        except Exception as e:
            logger.warning("External provider fill failed: %s", e)
            return {"success": False, "error": str(e)}

    async def fill_all_gaps(
        self,
        data: Dict[str, Any],
        required_fields: List[str],
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Detect and fill all gaps in one operation

        ``data`` is updated in place: each successfully filled gap that names
        a ``field`` has its filled value stored under that key. A shallow
        snapshot taken before any fill is returned as ``original_data``.

        Returns:
            Enriched data with metadata about what was filled. Note that
            ``average_confidence`` averages over all detected gaps, so
            unfilled gaps contribute a confidence of 0.
        """
        started = time.perf_counter()

        # Shallow copy is sufficient: fills only assign top-level keys.
        original_data = dict(data)

        gaps = await self.detect_gaps(data, required_fields, context)

        fill_results = []
        for gap in gaps:
            result = await self.fill_gap(gap, data, context)
            fill_results.append(result)

            if result["filled"] and result["filled_data"]:
                field = gap.get("field")
                if field:
                    data[field] = result["filled_data"]

        execution_time = int((time.perf_counter() - started) * 1000)

        gaps_detected = len(gaps)
        gaps_filled = sum(1 for r in fill_results if r["filled"])
        if gaps_detected > 0:
            avg_confidence = sum(r["confidence"] for r in fill_results) / gaps_detected
            fill_rate = gaps_filled / gaps_detected
        else:
            avg_confidence = 0
            fill_rate = 0

        return {
            "status": "success",
            "original_data": original_data,
            "enriched_data": data,
            "gaps_detected": gaps_detected,
            "gaps_filled": gaps_filled,
            "fill_rate": fill_rate,
            "fill_results": fill_results,
            "average_confidence": avg_confidence,
            "execution_time_ms": execution_time,
            "metadata": {
                # Sorted for a deterministic order.
                "strategies_used": sorted(
                    {r["strategy_used"] for r in fill_results if r["strategy_used"]}
                ),
                "timestamp": self._utc_now_iso()
            }
        }

    async def _log_gap_fill_attempt(self, result: Dict[str, Any]):
        """Log gap fill attempt for audit trail.

        Appends a summary of ``result`` (as produced by :meth:`fill_gap`) to
        ``self.audit_log`` and trims the log to the most recent
        ``_MAX_AUDIT_ENTRIES`` entries.
        """
        log_entry = {
            "timestamp": self._utc_now_iso(),
            "gap_type": result["gap"].get("gap_type"),
            "field": result["gap"].get("field"),
            "filled": result["filled"],
            "strategy_used": result["strategy_used"],
            "confidence": result["confidence"],
            "execution_time_ms": result["execution_time_ms"],
            "attempts_count": len(result["attempts"])
        }

        self.audit_log.append(log_entry)

        if len(self.audit_log) > self._MAX_AUDIT_ENTRIES:
            self.audit_log = self.audit_log[-self._MAX_AUDIT_ENTRIES:]

        # TODO: persist log_entry to self.db when a database is configured;
        # no persistence is implemented yet, so the log is in-memory only.

    def get_audit_log(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Get recent gap filling audit logs.

        Args:
            limit: Maximum number of most-recent entries to return. Zero or a
                negative value yields an empty list.

        Returns:
            A new list holding up to ``limit`` entries, oldest first.
        """
        # Guard needed because ``lst[-0:]`` is the whole list, not an empty one.
        if limit <= 0:
            return []
        return self.audit_log[-limit:]

    def get_statistics(self) -> Dict[str, Any]:
        """Get gap filling statistics.

        Returns:
            Dict with ``total_attempts``, ``successful_fills``,
            ``success_rate``, ``average_confidence``,
            ``average_execution_time_ms`` and ``strategies_used`` (a count per
            strategy name). The same keys are present when the log is empty.
        """
        entries = self.audit_log
        total = len(entries)
        if total == 0:
            return {
                "total_attempts": 0,
                "successful_fills": 0,
                "success_rate": 0,
                "average_confidence": 0,
                "average_execution_time_ms": 0,
                "strategies_used": {}
            }

        successful = sum(1 for entry in entries if entry["filled"])

        strategy_counts: Dict[str, int] = {}
        for entry in entries:
            strategy = entry.get("strategy_used")
            if strategy:
                strategy_counts[strategy] = strategy_counts.get(strategy, 0) + 1

        return {
            "total_attempts": total,
            "successful_fills": successful,
            "success_rate": successful / total,
            "average_confidence": sum(entry["confidence"] for entry in entries) / total,
            "average_execution_time_ms": sum(
                entry["execution_time_ms"] for entry in entries
            ) / total,
            "strategies_used": strategy_counts
        }
|
|