Roopalgn's picture
Add queue-planning helpdesk routing mechanics
8241eb5
from __future__ import annotations
# Step scores at or above this value receive MILESTONE_BONUS.
MILESTONE_HIGH_THRESHOLD = 0.8
# Step scores strictly below this value receive -MILESTONE_PENALTY.
MILESTONE_LOW_THRESHOLD = 0.2
# Amount added to the step reward when the high threshold is met.
MILESTONE_BONUS = 0.05
# Amount subtracted from the step reward when the score is under the low threshold.
MILESTONE_PENALTY = 0.05
# Multiplier applied to (clamped score - previous average) before capping.
DELTA_REWARD_WEIGHT = 0.08
# Symmetric bound on the delta adjustment: it is kept within [-cap, +cap].
DELTA_REWARD_CAP = 0.04
# Largest process bonus that a single step may contribute.
PROCESS_BONUS_CAP = 0.08
# Largest risk penalty that a single step may subtract.
RISK_PENALTY_CAP = 0.12
def _clamp_unit_interval(value: float) -> float:
    """Clamp *value* to the closed interval ``[0.0, 1.0]``.

    Values at or above 1 (including ``+inf``) map to ``1.0``; values at or
    below 0 (including ``-inf``) map to ``0.0``. NaN maps to ``0.0``.

    The comparisons are ordered so that NaN, for which every comparison is
    false, falls through to the ``0.0`` branch. The previous
    ``max(0.0, min(1.0, value))`` form returned ``1.0`` for NaN, which turned
    an invalid score into the maximum reward.
    """
    if value >= 1.0:
        return 1.0
    if value > 0.0:
        return value
    # Reached for value <= 0.0 and for NaN.
    return 0.0
def clamp_open_unit_interval(value: float, epsilon: float = 0.0) -> float:
    """Clamp *value* to the unit interval by delegating to ``_clamp_unit_interval``.

    ``epsilon`` is accepted but not consulted by this implementation, so the
    result is the same for every ``epsilon``.
    """
    clamped = _clamp_unit_interval(value)
    return clamped
def compute_step_adjustments(
    score: float,
    *,
    previous_average: float = 0.0,
    process_bonus: float = 0.0,
    risk_penalty: float = 0.0,
) -> dict[str, float]:
    """Break a single step's reward down into its components.

    Args:
        score: Raw score for the step; clamped to the unit interval to form
            the base reward.
        previous_average: Value the clamped score is compared against to
            derive the delta adjustment.
        process_bonus: Requested bonus, limited to ``[0, PROCESS_BONUS_CAP]``.
        risk_penalty: Requested penalty, limited to ``[0, RISK_PENALTY_CAP]``.

    Returns:
        A dict with keys ``base_reward``, ``milestone_adjustment``,
        ``delta_adjustment``, ``process_bonus``, ``risk_penalty`` and
        ``final_reward`` (the clamped combination of the other five).
    """
    clamped_score = _clamp_unit_interval(score)

    # The milestone thresholds are checked against the raw score, not the
    # clamped one.
    milestone = 0.0
    if score >= MILESTONE_HIGH_THRESHOLD:
        milestone = MILESTONE_BONUS
    elif score < MILESTONE_LOW_THRESHOLD:
        milestone = -MILESTONE_PENALTY

    improvement = clamped_score - previous_average
    delta = _clamp_delta(improvement * DELTA_REWARD_WEIGHT)

    capped_bonus = min(PROCESS_BONUS_CAP, process_bonus)
    bonus = max(0.0, capped_bonus)
    capped_penalty = min(RISK_PENALTY_CAP, risk_penalty)
    penalty = max(0.0, capped_penalty)

    breakdown = {
        "base_reward": clamped_score,
        "milestone_adjustment": milestone,
        "delta_adjustment": delta,
        "process_bonus": bonus,
        "risk_penalty": penalty,
    }
    # Terms are combined left to right in this fixed order.
    combined = clamped_score + milestone + delta + bonus - penalty
    breakdown["final_reward"] = _clamp_unit_interval(combined)
    return breakdown
def _clamp_delta(value: float) -> float:
    """Limit *value* to ``[-DELTA_REWARD_CAP, +DELTA_REWARD_CAP]``."""
    upper_limited = min(DELTA_REWARD_CAP, value)
    return max(-DELTA_REWARD_CAP, upper_limited)
def compute_step_reward(
    score: float,
    *,
    previous_average: float = 0.0,
    process_bonus: float = 0.0,
    risk_penalty: float = 0.0,
) -> float:
    """Return only the ``final_reward`` entry of ``compute_step_adjustments``.

    All arguments are forwarded unchanged.
    """
    breakdown = compute_step_adjustments(
        score,
        previous_average=previous_average,
        process_bonus=process_bonus,
        risk_penalty=risk_penalty,
    )
    return breakdown["final_reward"]
# Upper bounds on the optional trajectory-level bonuses, named to match the
# step-level ``*_CAP`` constants instead of appearing as inline literals.
COMPLETION_BONUS_CAP = 0.08
CONSISTENCY_BONUS_CAP = 0.05


def compute_trajectory_adjustments(
    per_ticket_scores: list[float],
    queue_size: int,
    steps_taken: int,
    *,
    completion_bonus: float = 0.0,
    consistency_bonus: float = 0.0,
) -> dict[str, float]:
    """Break a whole trajectory's reward down into its components.

    Args:
        per_ticket_scores: Scores to average. When empty, every entry of the
            result is ``0.0`` and the bonuses are ignored.
        queue_size: Accepted for interface compatibility; not used here.
        steps_taken: Accepted for interface compatibility; not used here.
        completion_bonus: Requested bonus, limited to
            ``[0, COMPLETION_BONUS_CAP]``.
        consistency_bonus: Requested bonus, limited to
            ``[0, CONSISTENCY_BONUS_CAP]``.

    Returns:
        A dict with keys ``average_reward`` (the plain mean, not clamped),
        ``completion_bonus``, ``consistency_bonus`` and ``final_reward``
        (mean plus both bounded bonuses, clamped to the unit interval).
    """
    if not per_ticket_scores:
        return {
            "average_reward": 0.0,
            "completion_bonus": 0.0,
            "consistency_bonus": 0.0,
            "final_reward": 0.0,
        }

    avg = sum(per_ticket_scores) / len(per_ticket_scores)
    bounded_completion_bonus = max(0.0, min(COMPLETION_BONUS_CAP, completion_bonus))
    bounded_consistency_bonus = max(0.0, min(CONSISTENCY_BONUS_CAP, consistency_bonus))
    final_reward = _clamp_unit_interval(
        avg + bounded_completion_bonus + bounded_consistency_bonus
    )
    return {
        "average_reward": avg,
        "completion_bonus": bounded_completion_bonus,
        "consistency_bonus": bounded_consistency_bonus,
        "final_reward": final_reward,
    }
def compute_trajectory_reward(
    per_ticket_scores: list[float],
    queue_size: int,
    steps_taken: int,
    *,
    completion_bonus: float = 0.0,
    consistency_bonus: float = 0.0,
) -> float:
    """Return only the ``final_reward`` entry of ``compute_trajectory_adjustments``.

    All arguments are forwarded unchanged.
    """
    breakdown = compute_trajectory_adjustments(
        per_ticket_scores,
        queue_size,
        steps_taken,
        completion_bonus=completion_bonus,
        consistency_bonus=consistency_bonus,
    )
    return breakdown["final_reward"]