File size: 3,242 Bytes
02ff91f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""
Fallback chain resolver — handles specialist failures with graceful degradation.

Fallback chains are loaded from the specialist catalog (optional field).
If not defined in the catalog, a default strategy is used:
  - Try any specialist that shares a complexity_affinity with the failed one
  - Fall back to the lowest-latency specialist as last resort
"""

from __future__ import annotations
import yaml
from pathlib import Path
from reward.failure_reward import SpecialistResult, SpecialistStatus


class FallbackChainResolver:
    """
    If a specialist fails, automatically selects a fallback specialist.
    Chains are loaded from the catalog; no hardcoded specialist IDs.
    """

    def __init__(self, catalog_path: str = "configs/specialist_catalog.yaml"):
        self._chains: dict[str, list[str]] = {}
        self._specialists: list[dict] = []
        self._load_catalog(catalog_path)

    def _load_catalog(self, catalog_path: str) -> None:
        with open(catalog_path) as f:
            catalog = yaml.safe_load(f)

        self._specialists = catalog.get("specialists", [])

        # Load explicit fallback chains if defined in catalog
        for spec in self._specialists:
            if "fallback_to" in spec:
                self._chains[spec["id"]] = spec["fallback_to"]

    def get_fallback(
        self, failed_specialist_id: str, already_called: list[str]
    ) -> str | None:
        """
        Return the next fallback specialist, or None if exhausted.

        Priority:
        1. Explicit fallback_to chain from catalog
        2. Specialist sharing complexity_affinity with the failed one
        3. Lowest-latency available specialist
        """
        # 1. Explicit chain
        if failed_specialist_id in self._chains:
            for fallback_id in self._chains[failed_specialist_id]:
                if fallback_id not in already_called:
                    return fallback_id

        # 2. Shared complexity affinity
        failed_spec = next(
            (s for s in self._specialists if s["id"] == failed_specialist_id), None
        )
        if failed_spec:
            failed_affinities = set(failed_spec.get("complexity_affinity", []))
            candidates = [
                s for s in self._specialists
                if s["id"] != failed_specialist_id
                and s["id"] not in already_called
                and set(s.get("complexity_affinity", [])) & failed_affinities
            ]
            if candidates:
                # Pick lowest latency among affinity-compatible specialists
                candidates.sort(key=lambda s: s.get("avg_latency_ms", 9999))
                return candidates[0]["id"]

        # 3. Any available specialist (lowest latency)
        available = [
            s for s in self._specialists
            if s["id"] != failed_specialist_id
            and s["id"] not in already_called
        ]
        if available:
            available.sort(key=lambda s: s.get("avg_latency_ms", 9999))
            return available[0]["id"]

        return None

    def needs_fallback(self, result: SpecialistResult) -> bool:
        return result.status in (
            SpecialistStatus.TIMEOUT,
            SpecialistStatus.ERROR,
        )