File size: 11,301 Bytes
"""
Final Resolution Agent - Local swap logic for honoring COUNTER decisions.
Phase 4.2 implementation for post-negotiation optimization.
"""
import statistics
from typing import Dict, List, Optional
from app.schemas.agent_schemas import (
DriverLiaisonDecision,
FinalResolutionResult,
FairnessMetrics,
RoutePlanResult,
SwapRecord,
)
class FinalResolutionAgent:
    """
    Final Resolution Agent attempts to honor COUNTER decisions using local swaps.

    Strategy:
    1. Build driver↔route mappings from approved proposal
    2. For each COUNTER decision:
       - Try 1-to-1 swap with current route holder
       - Evaluate fairness impact
       - Accept swap only if fairness stays within tolerance
    3. Return updated allocation with swap records
    """

    # Default tolerance for accepting swaps
    METRIC_EPSILON: float = 0.02  # Allow slight worsening (2%)

    def __init__(self, metric_epsilon: float = 0.02):
        """
        Initialize agent with configurable tolerance.

        Args:
            metric_epsilon: Maximum allowed metric degradation (as ratio)
        """
        # Instance attribute shadows the class-level default of the same name.
        self.METRIC_EPSILON = metric_epsilon

    def resolve_counters(
        self,
        approved_proposal: RoutePlanResult,
        decisions: List[DriverLiaisonDecision],
        effort_matrix: List[List[float]],
        driver_ids: List[str],
        route_ids: List[str],
        current_metrics: FairnessMetrics,
    ) -> FinalResolutionResult:
        """
        Attempt to resolve COUNTER decisions via local swaps.

        Counters are processed in the order they appear in ``decisions``.
        Once a driver's counter has been honored, later counters may not take
        that driver's route away again; such later counters are reported as
        unfulfilled instead.

        All driver and route IDs are normalized with ``str()`` so that IDs
        coming from decisions match the keys built from the proposal.

        Args:
            approved_proposal: The accepted proposal from fairness manager
            decisions: All driver liaison decisions
            effort_matrix: 2D effort matrix [driver_idx][route_idx]
            driver_ids: Driver IDs in matrix row order
            route_ids: Route IDs in matrix column order
            current_metrics: Current fairness metrics of approved proposal

        Returns:
            FinalResolutionResult with potentially updated allocation
        """
        # Build index maps (keys normalized to str, like the mappings below)
        driver_idx_map = {str(did): idx for idx, did in enumerate(driver_ids)}
        route_idx_map = {str(rid): idx for idx, rid in enumerate(route_ids)}

        # Build current mappings from proposal
        driver_to_route: Dict[str, str] = {}
        route_to_driver: Dict[str, str] = {}
        per_driver_effort: Dict[str, float] = {}
        for item in approved_proposal.allocation:
            did = str(item.driver_id)
            rid = str(item.route_id)
            driver_to_route[did] = rid
            route_to_driver[rid] = did
            per_driver_effort[did] = item.effort

        # Track current metrics; updated after every accepted swap so each
        # candidate is judged against the allocation as it stands now.
        current_gini = current_metrics.gini_index
        current_std = current_metrics.std_dev
        current_max_gap = current_metrics.max_gap

        swaps_applied: List[SwapRecord] = []
        unfulfilled_counters: List[str] = []

        # Drivers whose counter was honored; their route is locked against
        # being swapped away by a later counter.
        honored_drivers = set()

        # Filter COUNTER decisions
        counter_decisions = [d for d in decisions if d.decision == "COUNTER"]

        # Process each counter
        for counter in counter_decisions:
            # Normalize to str: the mappings above are keyed by str(...)
            driver_a = str(counter.driver_id)
            raw_preferred = counter.preferred_route_id

            # Skip if no preferred route specified (check before str() so
            # that None does not become the truthy string "None")
            if not raw_preferred:
                unfulfilled_counters.append(driver_a)
                continue
            preferred_route = str(raw_preferred)

            # Skip if driver A not in mapping
            if driver_a not in driver_to_route:
                unfulfilled_counters.append(driver_a)
                continue
            route_a = driver_to_route[driver_a]

            # Skip if preferred route not assigned to anyone
            if preferred_route not in route_to_driver:
                unfulfilled_counters.append(driver_a)
                continue
            driver_b = route_to_driver[preferred_route]
            route_b = preferred_route

            # Skip if trying to swap with self
            if driver_a == driver_b:
                unfulfilled_counters.append(driver_a)
                continue

            # Skip if the current holder obtained this route through an
            # earlier honored counter; swapping would silently undo it.
            if driver_b in honored_drivers:
                unfulfilled_counters.append(driver_a)
                continue

            # Get indices for effort lookup
            idx_a = driver_idx_map.get(driver_a)
            idx_b = driver_idx_map.get(driver_b)
            idx_route_a = route_idx_map.get(route_a)
            idx_route_b = route_idx_map.get(route_b)
            if any(x is None for x in [idx_a, idx_b, idx_route_a, idx_route_b]):
                unfulfilled_counters.append(driver_a)
                continue

            # Current efforts
            effort_a_before = per_driver_effort[driver_a]
            effort_b_before = per_driver_effort[driver_b]

            # New efforts after swap
            effort_a_after = effort_matrix[idx_a][idx_route_b]  # A gets route B
            effort_b_after = effort_matrix[idx_b][idx_route_a]  # B gets route A

            # Evaluate swap: compute new metrics on a trial copy
            test_efforts = per_driver_effort.copy()
            test_efforts[driver_a] = effort_a_after
            test_efforts[driver_b] = effort_b_after
            new_metrics = self._compute_metrics(list(test_efforts.values()))

            # Check if swap is acceptable
            if not self._is_swap_acceptable(
                current_gini, current_std, current_max_gap,
                new_metrics, effort_b_before, effort_b_after
            ):
                unfulfilled_counters.append(driver_a)
                continue

            # Apply swap
            driver_to_route[driver_a] = route_b
            driver_to_route[driver_b] = route_a
            route_to_driver[route_a] = driver_b
            route_to_driver[route_b] = driver_a
            per_driver_effort[driver_a] = effort_a_after
            per_driver_effort[driver_b] = effort_b_after
            honored_drivers.add(driver_a)

            # Update current metrics
            current_gini = new_metrics["gini_index"]
            current_std = new_metrics["std_dev"]
            current_max_gap = new_metrics["max_gap"]

            # Record swap
            swaps_applied.append(SwapRecord(
                driver_a=driver_a,
                driver_b=driver_b,
                route_a=route_a,
                route_b=route_b,
                effort_a_before=effort_a_before,
                effort_a_after=effort_a_after,
                effort_b_before=effort_b_before,
                effort_b_after=effort_b_after,
            ))

        # Build final allocation list
        allocation = [
            {"driver_id": did, "route_id": rid, "effort": per_driver_effort[did]}
            for did, rid in driver_to_route.items()
        ]
        final_metrics = self._compute_metrics(list(per_driver_effort.values()))

        return FinalResolutionResult(
            allocation=allocation,
            per_driver_effort=per_driver_effort,
            metrics=final_metrics,
            swaps_applied=swaps_applied,
            unfulfilled_counters=unfulfilled_counters,
        )

    def _compute_metrics(self, efforts: List[float]) -> Dict[str, float]:
        """
        Compute fairness metrics from effort values.

        Args:
            efforts: Per-driver effort values (may be empty)

        Returns:
            Dict with keys ``avg_effort``, ``std_dev``, ``max_gap``,
            ``gini_index``, ``min_effort`` and ``max_effort``. All values are
            0.0 for empty input; otherwise they are rounded to 2 decimals
            (4 for ``gini_index``).
        """
        if not efforts:
            # Same key set as the non-empty case so callers can index freely
            return {
                "avg_effort": 0.0,
                "std_dev": 0.0,
                "max_gap": 0.0,
                "gini_index": 0.0,
                "min_effort": 0.0,
                "max_effort": 0.0,
            }

        n = len(efforts)
        avg = statistics.mean(efforts)
        min_e = min(efforts)
        max_e = max(efforts)
        # statistics.stdev (sample std dev) needs at least two data points
        std = statistics.stdev(efforts) if n > 1 else 0.0
        gini = self._compute_gini(efforts)

        return {
            "avg_effort": round(avg, 2),
            "std_dev": round(std, 2),
            "max_gap": round(max_e - min_e, 2),
            "gini_index": round(gini, 4),
            "min_effort": round(min_e, 2),
            "max_effort": round(max_e, 2),
        }

    def _compute_gini(self, values: List[float]) -> float:
        """
        Compute Gini coefficient.

        Uses the sorted-rank form of the mean absolute difference, which is
        O(n log n) instead of the O(n^2) pairwise double loop:
        ``sum_{i,j} |x_i - x_j| == 2 * sum_i (2*i - n + 1) * x_(i)`` for the
        ascending-sorted values ``x_(i)`` with 0-based ``i``.

        Args:
            values: Effort values

        Returns:
            Gini coefficient capped at 1.0; 0.0 for fewer than two values or
            when the mean is zero.
        """
        n = len(values)
        if n < 2:
            return 0.0

        mean = statistics.mean(values)
        if mean == 0:
            return 0.0

        ordered = sorted(values)
        # Half of the total pairwise absolute difference
        weighted_sum = sum((2 * i - n + 1) * v for i, v in enumerate(ordered))
        return min(weighted_sum / (n * n * mean), 1.0)

    def _is_swap_acceptable(
        self,
        old_gini: float,
        old_std: float,
        old_max_gap: float,
        new_metrics: Dict[str, float],
        effort_b_before: float,
        effort_b_after: float,
    ) -> bool:
        """
        Check if a swap is acceptable based on fairness impact.

        Accept if:
        (all new metrics stay within tolerance OR any metric improves)
        AND driver B's new effort is not drastically worse.

        Args:
            old_gini: Gini index before the swap
            old_std: Standard deviation before the swap
            old_max_gap: Max gap before the swap
            new_metrics: Metrics after the swap; must contain ``gini_index``,
                ``std_dev`` and ``max_gap``
            effort_b_before: Driver B's effort before the swap
            effort_b_after: Driver B's effort after the swap

        Returns:
            True if the swap may be applied.
        """
        new_gini = new_metrics["gini_index"]
        new_std = new_metrics["std_dev"]
        new_max_gap = new_metrics["max_gap"]

        # Check if any metric improves by more than a small dead band
        # (0.001 for gini, 0.1 for std dev and max gap)
        improves = (
            new_gini < old_gini - 0.001 or
            new_std < old_std - 0.1 or
            new_max_gap < old_max_gap - 0.1
        )

        # Check if metrics stay within tolerance: relative epsilon for all
        # three, plus an absolute slack of 0.5 for std dev and max gap
        gini_ok = new_gini <= old_gini * (1 + self.METRIC_EPSILON)
        std_ok = new_std <= old_std * (1 + self.METRIC_EPSILON) + 0.5
        gap_ok = new_max_gap <= old_max_gap * (1 + self.METRIC_EPSILON) + 0.5
        within_tolerance = gini_ok and std_ok and gap_ok

        # Check driver B impact - don't drastically increase their effort.
        # Allow up to a 30% increase plus 5.0 absolute slack for driver B
        b_increase_ok = effort_b_after <= effort_b_before * 1.30 + 5.0

        return (within_tolerance or improves) and b_increase_ok

    def get_input_snapshot(
        self,
        num_counters: int,
        current_metrics: FairnessMetrics,
        global_avg: float,
    ) -> dict:
        """
        Generate input snapshot for DecisionLog.

        Args:
            num_counters: Number of COUNTER decisions being processed
            current_metrics: Fairness metrics before resolution
            global_avg: Global average effort (rounded to 2 decimals here)

        Returns:
            Dict of the inputs plus the configured ``metric_epsilon``.
        """
        return {
            "num_counters": num_counters,
            "original_gini": current_metrics.gini_index,
            "original_std_dev": current_metrics.std_dev,
            "original_max_gap": current_metrics.max_gap,
            "global_avg_effort": round(global_avg, 2),
            "metric_epsilon": self.METRIC_EPSILON,
        }

    def get_output_snapshot(self, result: FinalResolutionResult) -> dict:
        """
        Generate output snapshot for DecisionLog.

        Args:
            result: Result returned by ``resolve_counters``

        Returns:
            Dict with swap/unfulfilled counts, final metrics (0 when a metric
            key is missing) and the driver pairs of the first 5 swaps.
        """
        return {
            "num_swaps_applied": len(result.swaps_applied),
            "num_unfulfilled": len(result.unfulfilled_counters),
            "final_gini": result.metrics.get("gini_index", 0),
            "final_std_dev": result.metrics.get("std_dev", 0),
            "final_max_gap": result.metrics.get("max_gap", 0),
            "swaps": [
                {"driver_a": s.driver_a, "driver_b": s.driver_b}
                for s in result.swaps_applied[:5]  # First 5 swaps
            ],
        }
|