import asyncio
import logging
import time
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any

from core.infrastructure.registry import kernel_registry
from core.plugin_system.registry import plugin_registry_service
logger = logging.getLogger(__name__)
@dataclass
class ShadowResult:
plugin_id: str
execution_time_ms: float
matches_production: bool
match_percentage: float
production_result: Any
shadow_result: Any
differences: dict[str, Any]
timestamp: datetime
class ShadowExecutor:
"""
Execute plugins in shadow mode:
- Run plugin alongside production code
- Compare results
- Log discrepancies
- Zero production impact
"""
def __init__(self, metrics_service, registry_service):
self.metrics = metrics_service
self.registry = registry_service
self.comparison_threshold = 0.99 # 99% match required
async def execute_with_shadow(
self,
plugin_id: str,
production_function: Callable,
input_data: dict[str, Any],
comparison_function: Callable | None = None,
db_session=None,
) -> Any:
"""
Execute production code + plugin in parallel.
ALWAYS return production result.
"""
# Execute production code
start_time = time.time()
# Check if production_function is async
if asyncio.iscoroutinefunction(production_function):
production_result = await production_function(input_data)
else:
production_result = production_function(input_data)
production_time_ms = (time.time() - start_time) * 1000
# Execute shadow plugin (non-blocking, fire-and-forget)
# This is intentionally a fire-and-forget task for shadow execution
asyncio.create_task(
self._execute_shadow(
plugin_id=plugin_id,
input_data=input_data,
expected_result=production_result,
expected_time_ms=production_time_ms,
comparison_function=comparison_function,
db_session=db_session,
)
)
# ALWAYS return production result
return production_result
async def _execute_shadow(
self,
plugin_id: str,
input_data: dict[str, Any],
expected_result: Any,
expected_time_ms: float,
comparison_function: Callable | None = None,
db_session=None,
):
"""Execute and compare shadow plugin (background task)"""
try:
# Load plugin
plugin = await self.registry.get_plugin(plugin_id, db=db_session)
# Execute with timeout (2x production time or min 1s)
timeout = max(expected_time_ms * 2 / 1000, 1.0)
start_time = time.time()
shadow_result = await asyncio.wait_for(
plugin.execute(input_data), timeout=timeout
)
shadow_time_ms = (time.time() - start_time) * 1000
# Compare results
if comparison_function:
matches, match_pct, diffs = comparison_function(
expected_result, shadow_result
)
else:
matches, match_pct, diffs = self._default_comparison(
expected_result, shadow_result
)
# Record metrics
result = ShadowResult(
plugin_id=plugin_id,
execution_time_ms=shadow_time_ms,
matches_production=matches,
match_percentage=match_pct,
production_result=expected_result,
shadow_result=shadow_result,
differences=diffs,
timestamp=datetime.utcnow(),
)
await self._record_shadow_result(result, db_session)
# Alert if significant discrepancy
if match_pct < self.comparison_threshold:
self.metrics.record_error(
"shadow_mismatch",
f"Shadow plugin {plugin_id} mismatch ({match_pct:.2f})",
{"plugin_id": plugin_id, "diffs": str(diffs)},
)
except TimeoutError:
self.metrics.record_error(
"shadow_timeout", f"Shadow execution timed out for {plugin_id}"
)
except Exception as e:
logger.error(f"Shadow execution error for {plugin_id}: {e}")
self.metrics.record_error("shadow_error", str(e), {"plugin_id": plugin_id})
def _default_comparison(self, expected: Any, actual: Any) -> tuple:
"""Default result comparison"""
if expected == actual:
return True, 1.0, {}
# For dict results, calculate field-level match
if isinstance(expected, dict) and isinstance(actual, dict):
total_fields = len(expected)
matching_fields = sum(
1 for k in expected if k in actual and expected[k] == actual[k]
)
match_pct = matching_fields / total_fields if total_fields > 0 else 0.0
diffs = {
k: {"expected": expected.get(k), "actual": actual.get(k)}
for k in set(expected.keys()) | set(actual.keys())
if expected.get(k) != actual.get(k)
}
return match_pct >= self.comparison_threshold, match_pct, diffs
# Binary match for non-dict results
return False, 0.0, {"expected": str(expected), "actual": str(actual)}
async def _record_shadow_result(self, result: ShadowResult, db_session):
"""Store shadow execution results for analysis"""
self.metrics.record_metric(
"shadow.match_rate",
1.0 if result.matches_production else 0.0,
{"plugin_id": result.plugin_id},
)
self.metrics.record_metric(
"shadow.execution_time",
result.execution_time_ms,
{"plugin_id": result.plugin_id},
)
# Store in database for analysis
# await self.registry.store_shadow_result(result, db=db_session)
# Global instance using the kernel registry for late-binding dependencies
shadow_executor = ShadowExecutor(
kernel_registry.monitoring_service, plugin_registry_service
)