Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
File size: 6,668 Bytes
d745844 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 | """Coordination module for the Text2SPARQL repair pipeline.
Merges validation and expert feedback into one decision:
accept, repair, or discard. Selects exactly one repair action.
"""
from __future__ import annotations
import logging
from collections import Counter
from .config import RuntimeConfig
from .models import (
CandidateQuery,
CoordinatorDecision,
ExpertFeedback,
ValidationResult,
)
logger = logging.getLogger(__name__)
# Fixed action priority order for tie-breaking
_ACTION_PRIORITY = [
"syntax_fix",
"entity_relink",
"predicate_replace",
"direction_fix",
"form_fix",
"projection_fix",
"constraint_fix",
]
def decide_action(
candidate: CandidateQuery,
validation: ValidationResult,
feedbacks: list[ExpertFeedback],
repair_iteration: int,
runtime: RuntimeConfig,
) -> CoordinatorDecision:
"""Decide on accept, repair, or discard based on validation and expert feedback.
Fixed decision logic:
1. If parse_ok is false β repair with syntax_fix
2. If all applicable experts say "ok" β accept
3. Otherwise β repair using the best actionable judge suggestion
Action selection: highest-confidence suggested action among experts,
with fixed priority order for tie-breaking.
Args:
candidate: The candidate under inspection.
validation: Validation result for this candidate.
feedbacks: Expert feedback list (may be empty).
repair_iteration: Current repair iteration (0-indexed).
runtime: Runtime configuration.
Returns:
CoordinatorDecision with decision and optional action.
"""
rationale: list[str] = []
# Rule 1: parse failure β syntax fix
if not validation.parse_ok:
rationale.append("Query failed to parse β applying syntax fix.")
return CoordinatorDecision(
candidate_id=candidate.candidate_id,
decision="repair",
selected_action="syntax_fix",
rationale=rationale,
)
# Rule 2: all applicable experts ok β accept
if feedbacks:
all_ok = all(f.verdict == "ok" for f in feedbacks)
if all_ok and validation.parse_ok and validation.execute_ok:
rationale.append("All applicable judges approved the syntax-valid query.")
return CoordinatorDecision(
candidate_id=candidate.candidate_id,
decision="accept",
selected_action=None,
rationale=rationale,
)
# Rule 3: discard if multiple experts complain strongly
bad_count = 0
for f in feedbacks:
if f.verdict == "bad" and f.confidence >= 0.8:
bad_count += 1
has_action = any(bool(f.suggested_action) for f in feedbacks)
if bad_count >= 2 and not has_action:
rationale.append("Multiple experts reported fatal errors with high confidence and no available actions.")
return CoordinatorDecision(
candidate_id=candidate.candidate_id,
decision="discard",
selected_action=None,
rationale=rationale,
)
# Rule 4: otherwise β repair
# Find the best action
selected_action = _select_best_action(feedbacks, validation, rationale)
return CoordinatorDecision(
candidate_id=candidate.candidate_id,
decision="repair",
selected_action=selected_action,
rationale=rationale,
)
def _select_best_action(
feedbacks: list[ExpertFeedback],
validation: ValidationResult,
rationale: list[str],
) -> str:
"""Select the best repair action from expert feedback.
Selection rule:
- Choose the highest-confidence suggested action among experts
- On tie, prefer actions earlier in _ACTION_PRIORITY
Args:
feedbacks: Expert feedback list.
validation: Validation result.
rationale: Rationale list to append reasoning to.
Returns:
Selected action string.
"""
# Collect all suggested actions with their confidence
action_scores: list[tuple[str, float, int]] = []
for feedback in feedbacks:
if feedback.suggested_action and feedback.verdict != "ok":
action = feedback.suggested_action
priority = (
_ACTION_PRIORITY.index(action)
if action in _ACTION_PRIORITY
else len(_ACTION_PRIORITY)
)
action_scores.append((action, feedback.confidence, priority))
if not action_scores:
# No expert suggested an action β infer from validation flags
if "form_mismatch" in validation.suspicious_flags:
rationale.append("No expert action β inferring form_fix from validation flags.")
return "form_fix"
if "execute_fail" in validation.suspicious_flags:
rationale.append("No expert action β inferring entity_relink from execution failure.")
return "entity_relink"
if "empty_result" in validation.suspicious_flags:
rationale.append("No expert action β inferring predicate_replace from empty results.")
return "predicate_replace"
rationale.append("No expert action available β defaulting to entity_relink.")
return "entity_relink"
# Sort by confidence DESC, then by priority ASC
action_scores.sort(key=lambda x: (-x[1], x[2]))
best_action = action_scores[0][0]
best_confidence = action_scores[0][1]
rationale.append(
f"Selected action '{best_action}' with confidence {best_confidence:.2f} "
f"from {len(action_scores)} suggested actions."
)
return best_action
def should_stop(
decision: CoordinatorDecision,
repair_iteration: int,
runtime: RuntimeConfig,
) -> bool:
"""Determine whether the repair loop should stop.
Stop conditions:
- Decision is "accept"
- Decision is "discard"
- Repair iteration reached max_repair_iterations
Args:
decision: The coordinator's decision.
repair_iteration: Current iteration (0-indexed).
runtime: Runtime configuration.
Returns:
True if the loop should stop.
"""
if decision.decision == "accept":
logger.info("Stopping: candidate accepted.")
return True
if decision.decision == "discard":
logger.info("Stopping: candidate discarded.")
return True
if repair_iteration >= runtime.max_repair_iterations:
logger.info(
"Stopping: reached max repair iterations (%d).",
runtime.max_repair_iterations,
)
return True
return False
|