Spaces:
Paused
Paused
| import asyncio | |
| import logging | |
| import time | |
| from collections.abc import Callable | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from typing import Any | |
| from core.infrastructure.registry import kernel_registry | |
| from core.plugin_system.registry import plugin_registry_service | |
# Module-level logger named after this module, so its records can be filtered
# by source in the logging configuration.
logger: logging.Logger = logging.getLogger(name=__name__)
@dataclass
class ShadowResult:
    """Outcome of one shadow-plugin execution compared against production.

    Instances are created with keyword arguments (see
    ``ShadowExecutor._execute_shadow``), which requires the ``__init__``
    generated by ``@dataclass``; without the decorator construction fails.
    """

    plugin_id: str  # id of the plugin that ran in shadow mode
    execution_time_ms: float  # elapsed time of the shadow plugin's execute call, in ms
    matches_production: bool  # verdict returned by the comparison function
    match_percentage: float  # score returned by the comparison (1.0 == identical for the default)
    production_result: Any  # value returned to the caller by production code
    shadow_result: Any  # value produced by the shadow plugin
    differences: dict[str, Any]  # per-field / per-value differences from the comparison
    timestamp: datetime  # when the comparison was recorded (naive, from datetime.utcnow())
| class ShadowExecutor: | |
| """ | |
| Execute plugins in shadow mode: | |
| - Run plugin alongside production code | |
| - Compare results | |
| - Log discrepancies | |
| - Zero production impact | |
| """ | |
| def __init__(self, metrics_service, registry_service): | |
| self.metrics = metrics_service | |
| self.registry = registry_service | |
| self.comparison_threshold = 0.99 # 99% match required | |
| async def execute_with_shadow( | |
| self, | |
| plugin_id: str, | |
| production_function: Callable, | |
| input_data: dict[str, Any], | |
| comparison_function: Callable | None = None, | |
| db_session=None, | |
| ) -> Any: | |
| """ | |
| Execute production code + plugin in parallel. | |
| ALWAYS return production result. | |
| """ | |
| # Execute production code | |
| start_time = time.time() | |
| # Check if production_function is async | |
| if asyncio.iscoroutinefunction(production_function): | |
| production_result = await production_function(input_data) | |
| else: | |
| production_result = production_function(input_data) | |
| production_time_ms = (time.time() - start_time) * 1000 | |
| # Execute shadow plugin (non-blocking, fire-and-forget) | |
| # This is intentionally a fire-and-forget task for shadow execution | |
| asyncio.create_task( | |
| self._execute_shadow( | |
| plugin_id=plugin_id, | |
| input_data=input_data, | |
| expected_result=production_result, | |
| expected_time_ms=production_time_ms, | |
| comparison_function=comparison_function, | |
| db_session=db_session, | |
| ) | |
| ) | |
| # ALWAYS return production result | |
| return production_result | |
| async def _execute_shadow( | |
| self, | |
| plugin_id: str, | |
| input_data: dict[str, Any], | |
| expected_result: Any, | |
| expected_time_ms: float, | |
| comparison_function: Callable | None = None, | |
| db_session=None, | |
| ): | |
| """Execute and compare shadow plugin (background task)""" | |
| try: | |
| # Load plugin | |
| plugin = await self.registry.get_plugin(plugin_id, db=db_session) | |
| # Execute with timeout (2x production time or min 1s) | |
| timeout = max(expected_time_ms * 2 / 1000, 1.0) | |
| start_time = time.time() | |
| shadow_result = await asyncio.wait_for( | |
| plugin.execute(input_data), timeout=timeout | |
| ) | |
| shadow_time_ms = (time.time() - start_time) * 1000 | |
| # Compare results | |
| if comparison_function: | |
| matches, match_pct, diffs = comparison_function( | |
| expected_result, shadow_result | |
| ) | |
| else: | |
| matches, match_pct, diffs = self._default_comparison( | |
| expected_result, shadow_result | |
| ) | |
| # Record metrics | |
| result = ShadowResult( | |
| plugin_id=plugin_id, | |
| execution_time_ms=shadow_time_ms, | |
| matches_production=matches, | |
| match_percentage=match_pct, | |
| production_result=expected_result, | |
| shadow_result=shadow_result, | |
| differences=diffs, | |
| timestamp=datetime.utcnow(), | |
| ) | |
| await self._record_shadow_result(result, db_session) | |
| # Alert if significant discrepancy | |
| if match_pct < self.comparison_threshold: | |
| self.metrics.record_error( | |
| "shadow_mismatch", | |
| f"Shadow plugin {plugin_id} mismatch ({match_pct:.2f})", | |
| {"plugin_id": plugin_id, "diffs": str(diffs)}, | |
| ) | |
| except TimeoutError: | |
| self.metrics.record_error( | |
| "shadow_timeout", f"Shadow execution timed out for {plugin_id}" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Shadow execution error for {plugin_id}: {e}") | |
| self.metrics.record_error("shadow_error", str(e), {"plugin_id": plugin_id}) | |
| def _default_comparison(self, expected: Any, actual: Any) -> tuple: | |
| """Default result comparison""" | |
| if expected == actual: | |
| return True, 1.0, {} | |
| # For dict results, calculate field-level match | |
| if isinstance(expected, dict) and isinstance(actual, dict): | |
| total_fields = len(expected) | |
| matching_fields = sum( | |
| 1 for k in expected if k in actual and expected[k] == actual[k] | |
| ) | |
| match_pct = matching_fields / total_fields if total_fields > 0 else 0.0 | |
| diffs = { | |
| k: {"expected": expected.get(k), "actual": actual.get(k)} | |
| for k in set(expected.keys()) | set(actual.keys()) | |
| if expected.get(k) != actual.get(k) | |
| } | |
| return match_pct >= self.comparison_threshold, match_pct, diffs | |
| # Binary match for non-dict results | |
| return False, 0.0, {"expected": str(expected), "actual": str(actual)} | |
| async def _record_shadow_result(self, result: ShadowResult, db_session): | |
| """Store shadow execution results for analysis""" | |
| self.metrics.record_metric( | |
| "shadow.match_rate", | |
| 1.0 if result.matches_production else 0.0, | |
| {"plugin_id": result.plugin_id}, | |
| ) | |
| self.metrics.record_metric( | |
| "shadow.execution_time", | |
| result.execution_time_ms, | |
| {"plugin_id": result.plugin_id}, | |
| ) | |
| # Store in database for analysis | |
| # await self.registry.store_shadow_result(result, db=db_session) | |
# Module-level ShadowExecutor instance shared by everything that imports it.
# NOTE(review): ``kernel_registry.monitoring_service`` is read once, at import
# time of this module; it is only "late-binding" if that attribute returns a
# lazily-resolving object — confirm in core.infrastructure.registry.
shadow_executor = ShadowExecutor(
    metrics_service=kernel_registry.monitoring_service,
    registry_service=plugin_registry_service,
)