"""Coordination module for the Text2SPARQL repair pipeline. Merges validation and expert feedback into one decision: accept, repair, or discard. Selects exactly one repair action. """ from __future__ import annotations import logging from collections import Counter from .config import RuntimeConfig from .models import ( CandidateQuery, CoordinatorDecision, ExpertFeedback, ValidationResult, ) logger = logging.getLogger(__name__) # Fixed action priority order for tie-breaking _ACTION_PRIORITY = [ "syntax_fix", "entity_relink", "predicate_replace", "direction_fix", "form_fix", "projection_fix", "constraint_fix", ] def decide_action( candidate: CandidateQuery, validation: ValidationResult, feedbacks: list[ExpertFeedback], repair_iteration: int, runtime: RuntimeConfig, ) -> CoordinatorDecision: """Decide on accept, repair, or discard based on validation and expert feedback. Fixed decision logic: 1. If parse_ok is false → repair with syntax_fix 2. If all applicable experts say "ok" → accept 3. Otherwise → repair using the best actionable judge suggestion Action selection: highest-confidence suggested action among experts, with fixed priority order for tie-breaking. Args: candidate: The candidate under inspection. validation: Validation result for this candidate. feedbacks: Expert feedback list (may be empty). repair_iteration: Current repair iteration (0-indexed). runtime: Runtime configuration. Returns: CoordinatorDecision with decision and optional action. """ rationale: list[str] = [] # Rule 1: parse failure → syntax fix if not validation.parse_ok: rationale.append("Query failed to parse — applying syntax fix.") return CoordinatorDecision( candidate_id=candidate.candidate_id, decision="repair", selected_action="syntax_fix", rationale=rationale, ) # Rule 2: all applicable experts ok → accept if feedbacks: all_ok = all(f.verdict == "ok" for f in feedbacks) if all_ok and validation.parse_ok and validation.execute_ok: rationale.append("All applicable judges approved the syntax-valid query.") return CoordinatorDecision( candidate_id=candidate.candidate_id, decision="accept", selected_action=None, rationale=rationale, ) # Rule 3: discard if multiple experts complain strongly bad_count = 0 for f in feedbacks: if f.verdict == "bad" and f.confidence >= 0.8: bad_count += 1 has_action = any(bool(f.suggested_action) for f in feedbacks) if bad_count >= 2 and not has_action: rationale.append("Multiple experts reported fatal errors with high confidence and no available actions.") return CoordinatorDecision( candidate_id=candidate.candidate_id, decision="discard", selected_action=None, rationale=rationale, ) # Rule 4: otherwise → repair # Find the best action selected_action = _select_best_action(feedbacks, validation, rationale) return CoordinatorDecision( candidate_id=candidate.candidate_id, decision="repair", selected_action=selected_action, rationale=rationale, ) def _select_best_action( feedbacks: list[ExpertFeedback], validation: ValidationResult, rationale: list[str], ) -> str: """Select the best repair action from expert feedback. Selection rule: - Choose the highest-confidence suggested action among experts - On tie, prefer actions earlier in _ACTION_PRIORITY Args: feedbacks: Expert feedback list. validation: Validation result. rationale: Rationale list to append reasoning to. Returns: Selected action string. """ # Collect all suggested actions with their confidence action_scores: list[tuple[str, float, int]] = [] for feedback in feedbacks: if feedback.suggested_action and feedback.verdict != "ok": action = feedback.suggested_action priority = ( _ACTION_PRIORITY.index(action) if action in _ACTION_PRIORITY else len(_ACTION_PRIORITY) ) action_scores.append((action, feedback.confidence, priority)) if not action_scores: # No expert suggested an action — infer from validation flags if "form_mismatch" in validation.suspicious_flags: rationale.append("No expert action — inferring form_fix from validation flags.") return "form_fix" if "execute_fail" in validation.suspicious_flags: rationale.append("No expert action — inferring entity_relink from execution failure.") return "entity_relink" if "empty_result" in validation.suspicious_flags: rationale.append("No expert action — inferring predicate_replace from empty results.") return "predicate_replace" rationale.append("No expert action available — defaulting to entity_relink.") return "entity_relink" # Sort by confidence DESC, then by priority ASC action_scores.sort(key=lambda x: (-x[1], x[2])) best_action = action_scores[0][0] best_confidence = action_scores[0][1] rationale.append( f"Selected action '{best_action}' with confidence {best_confidence:.2f} " f"from {len(action_scores)} suggested actions." ) return best_action def should_stop( decision: CoordinatorDecision, repair_iteration: int, runtime: RuntimeConfig, ) -> bool: """Determine whether the repair loop should stop. Stop conditions: - Decision is "accept" - Decision is "discard" - Repair iteration reached max_repair_iterations Args: decision: The coordinator's decision. repair_iteration: Current iteration (0-indexed). runtime: Runtime configuration. Returns: True if the loop should stop. """ if decision.decision == "accept": logger.info("Stopping: candidate accepted.") return True if decision.decision == "discard": logger.info("Stopping: candidate discarded.") return True if repair_iteration >= runtime.max_repair_iterations: logger.info( "Stopping: reached max repair iterations (%d).", runtime.max_repair_iterations, ) return True return False