File size: 6,401 Bytes
4a2ab42
 
 
 
 
 
 
 
a61bfa6
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a61bfa6
 
 
4a2ab42
 
 
 
a61bfa6
 
 
4a2ab42
a61bfa6
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a61bfa6
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
a61bfa6
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a61bfa6
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import asyncio
import logging
import time
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any

from core.infrastructure.registry import kernel_registry
from core.plugin_system.registry import plugin_registry_service

logger = logging.getLogger(__name__)


@dataclass
class ShadowResult:
    """Outcome of a single shadow execution compared against production."""

    plugin_id: str  # id of the shadow plugin that was executed
    execution_time_ms: float  # shadow execution wall time, in milliseconds
    matches_production: bool  # True when the comparison met the match threshold
    match_percentage: float  # fraction of matching content, 0.0-1.0
    production_result: Any  # result returned by the production code path
    shadow_result: Any  # result returned by the shadow plugin
    differences: dict[str, Any]  # per-field diffs between the two results
    timestamp: datetime  # when the comparison was performed


class ShadowExecutor:
    """
    Execute plugins in shadow mode:
    - Run plugin alongside production code
    - Compare results
    - Log discrepancies
    - Zero production impact
    """

    def __init__(self, metrics_service, registry_service):
        self.metrics = metrics_service
        self.registry = registry_service
        self.comparison_threshold = 0.99  # 99% match required

    async def execute_with_shadow(
        self,
        plugin_id: str,
        production_function: Callable,
        input_data: dict[str, Any],
        comparison_function: Callable | None = None,
        db_session=None,
    ) -> Any:
        """
        Execute production code + plugin in parallel.
        ALWAYS return production result.
        """
        # Execute production code
        start_time = time.time()

        # Check if production_function is async
        if asyncio.iscoroutinefunction(production_function):
            production_result = await production_function(input_data)
        else:
            production_result = production_function(input_data)

        production_time_ms = (time.time() - start_time) * 1000

        # Execute shadow plugin (non-blocking, fire-and-forget)
        # This is intentionally a fire-and-forget task for shadow execution
        asyncio.create_task(
            self._execute_shadow(
                plugin_id=plugin_id,
                input_data=input_data,
                expected_result=production_result,
                expected_time_ms=production_time_ms,
                comparison_function=comparison_function,
                db_session=db_session,
            )
        )

        # ALWAYS return production result
        return production_result

    async def _execute_shadow(
        self,
        plugin_id: str,
        input_data: dict[str, Any],
        expected_result: Any,
        expected_time_ms: float,
        comparison_function: Callable | None = None,
        db_session=None,
    ):
        """Execute and compare shadow plugin (background task)"""
        try:
            # Load plugin
            plugin = await self.registry.get_plugin(plugin_id, db=db_session)

            # Execute with timeout (2x production time or min 1s)
            timeout = max(expected_time_ms * 2 / 1000, 1.0)

            start_time = time.time()
            shadow_result = await asyncio.wait_for(
                plugin.execute(input_data), timeout=timeout
            )
            shadow_time_ms = (time.time() - start_time) * 1000

            # Compare results
            if comparison_function:
                matches, match_pct, diffs = comparison_function(
                    expected_result, shadow_result
                )
            else:
                matches, match_pct, diffs = self._default_comparison(
                    expected_result, shadow_result
                )

            # Record metrics
            result = ShadowResult(
                plugin_id=plugin_id,
                execution_time_ms=shadow_time_ms,
                matches_production=matches,
                match_percentage=match_pct,
                production_result=expected_result,
                shadow_result=shadow_result,
                differences=diffs,
                timestamp=datetime.utcnow(),
            )

            await self._record_shadow_result(result, db_session)

            # Alert if significant discrepancy
            if match_pct < self.comparison_threshold:
                self.metrics.record_error(
                    "shadow_mismatch",
                    f"Shadow plugin {plugin_id} mismatch ({match_pct:.2f})",
                    {"plugin_id": plugin_id, "diffs": str(diffs)},
                )

        except TimeoutError:
            self.metrics.record_error(
                "shadow_timeout", f"Shadow execution timed out for {plugin_id}"
            )
        except Exception as e:
            logger.error(f"Shadow execution error for {plugin_id}: {e}")
            self.metrics.record_error("shadow_error", str(e), {"plugin_id": plugin_id})

    def _default_comparison(self, expected: Any, actual: Any) -> tuple:
        """Default result comparison"""
        if expected == actual:
            return True, 1.0, {}

        # For dict results, calculate field-level match
        if isinstance(expected, dict) and isinstance(actual, dict):
            total_fields = len(expected)
            matching_fields = sum(
                1 for k in expected if k in actual and expected[k] == actual[k]
            )

            match_pct = matching_fields / total_fields if total_fields > 0 else 0.0

            diffs = {
                k: {"expected": expected.get(k), "actual": actual.get(k)}
                for k in set(expected.keys()) | set(actual.keys())
                if expected.get(k) != actual.get(k)
            }

            return match_pct >= self.comparison_threshold, match_pct, diffs

        # Binary match for non-dict results
        return False, 0.0, {"expected": str(expected), "actual": str(actual)}

    async def _record_shadow_result(self, result: ShadowResult, db_session):
        """Store shadow execution results for analysis"""
        self.metrics.record_metric(
            "shadow.match_rate",
            1.0 if result.matches_production else 0.0,
            {"plugin_id": result.plugin_id},
        )
        self.metrics.record_metric(
            "shadow.execution_time",
            result.execution_time_ms,
            {"plugin_id": result.plugin_id},
        )

        # Store in database for analysis
        # await self.registry.store_shadow_result(result, db=db_session)


# Global instance using the kernel registry for late-binding dependencies.
# NOTE(review): constructed at import time — assumes kernel_registry.monitoring_service
# is already initialized when this module is first imported; confirm import ordering.
shadow_executor = ShadowExecutor(
    kernel_registry.monitoring_service, plugin_registry_service
)