FairRelay / brain /app /services /admin_service.py
MouleeswaranM's picture
Upload folder using huggingface_hub
fcf8749 verified
"""
Admin-facing service layer for Phase 3 API endpoints.
Provides business logic for admin operations.
"""
from datetime import date, datetime, timedelta
from typing import Optional, List
from uuid import UUID
from sqlalchemy import select, and_, func, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models import (
Driver, Assignment, Route, DriverFeedback,
Appeal, AppealStatus,
ManualOverride,
FairnessConfig,
AllocationRun, AllocationRunStatus,
DecisionLog,
)
from app.schemas.admin import (
HealthResponse, LatestAllocationRunInfo,
AllocationRunResponse, AllocationRunsListResponse,
AdminAssignmentsListResponse, AdminAssignmentResponse,
AdminDriverInfo, AdminRouteInfo, AdminFeedbackInfo,
FairnessMetricsResponse, FairnessMetricsPoint,
WorkloadHeatmapResponse, HeatmapDriver, HeatmapCell,
DriverHistoryResponse, DriverHistoryDay,
AppealsListResponse, AppealResponse, AppealDriverInfo, AppealContext,
AppealDecisionResponse,
ManualOverrideResponse, ManualOverrideInfo, MetricsSnapshot, UpdatedAssignment,
FairnessConfigResponse,
AgentTimelineResponse, DecisionLogStep,
)
from app.services.fairness import calculate_global_fairness
async def get_system_health(db: AsyncSession) -> HealthResponse:
"""
Get system health status including latest allocation run.
"""
# Check database connection
try:
await db.execute(select(func.now()))
db_status = "up"
except Exception:
db_status = "down"
# Get latest allocation run
result = await db.execute(
select(AllocationRun)
.order_by(AllocationRun.started_at.desc())
.limit(1)
)
latest_run = result.scalar_one_or_none()
latest_run_info = None
if latest_run:
latest_run_info = LatestAllocationRunInfo(
id=latest_run.id,
run_date=latest_run.date,
status=latest_run.status.value,
started_at=latest_run.started_at,
finished_at=latest_run.finished_at,
)
return HealthResponse(
status="ok" if db_status == "up" else "degraded",
database=db_status,
latest_allocation_run=latest_run_info,
)
async def get_allocation_runs(
db: AsyncSession,
target_date: date,
) -> AllocationRunsListResponse:
"""
Get allocation runs for a specific date.
"""
result = await db.execute(
select(AllocationRun)
.where(AllocationRun.date == target_date)
.order_by(AllocationRun.started_at.desc())
)
runs = result.scalars().all()
return AllocationRunsListResponse(
runs=[
AllocationRunResponse(
id=run.id,
run_date=run.date,
num_drivers=run.num_drivers,
num_routes=run.num_routes,
num_packages=run.num_packages,
global_gini_index=run.global_gini_index,
global_std_dev=run.global_std_dev,
global_max_gap=run.global_max_gap,
status=run.status.value,
error_message=run.error_message,
started_at=run.started_at,
finished_at=run.finished_at,
)
for run in runs
]
)
async def get_assignments_paginated(
db: AsyncSession,
target_date: date,
driver_id: Optional[UUID] = None,
min_fairness: Optional[float] = None,
max_fairness: Optional[float] = None,
page: int = 1,
page_size: int = 50,
) -> AdminAssignmentsListResponse:
"""
Get paginated assignments with filters.
"""
# Build query
query = select(Assignment, Driver, Route).join(
Driver, Assignment.driver_id == Driver.id
).join(
Route, Assignment.route_id == Route.id
).where(
Assignment.date == target_date
)
if driver_id:
query = query.where(Assignment.driver_id == driver_id)
if min_fairness is not None:
query = query.where(Assignment.fairness_score >= min_fairness)
if max_fairness is not None:
query = query.where(Assignment.fairness_score <= max_fairness)
# Get total count
count_query = select(func.count()).select_from(query.subquery())
total_result = await db.execute(count_query)
total_items = total_result.scalar() or 0
# Apply pagination
offset = (page - 1) * page_size
query = query.offset(offset).limit(page_size).order_by(Assignment.created_at)
result = await db.execute(query)
rows = result.all()
items = []
for assignment, driver, route in rows:
# Get feedback if exists
feedback_result = await db.execute(
select(DriverFeedback)
.where(DriverFeedback.assignment_id == assignment.id)
.limit(1)
)
feedback = feedback_result.scalar_one_or_none()
items.append(AdminAssignmentResponse(
assignment_id=assignment.id,
allocation_run_id=assignment.allocation_run_id,
driver=AdminDriverInfo(
id=driver.id,
name=driver.name,
vehicle_type=driver.vehicle_type.value if driver.vehicle_type else None,
),
route=AdminRouteInfo(
id=route.id,
num_packages=route.num_packages,
total_weight_kg=route.total_weight_kg,
num_stops=route.num_stops,
route_difficulty_score=route.route_difficulty_score,
estimated_time_minutes=route.estimated_time_minutes,
),
workload_score=assignment.workload_score,
fairness_score=assignment.fairness_score,
explanation=assignment.explanation,
feedback=AdminFeedbackInfo(
fairness_rating=feedback.fairness_rating if feedback else None,
stress_level=feedback.stress_level if feedback else None,
) if feedback else None,
))
return AdminAssignmentsListResponse(
items=items,
page=page,
page_size=page_size,
total_items=total_items,
)
async def get_fairness_metrics_series(
db: AsyncSession,
start_date: date,
end_date: date,
) -> FairnessMetricsResponse:
"""
Get fairness metrics time series.
"""
# Get allocation runs in range
result = await db.execute(
select(AllocationRun)
.where(
and_(
AllocationRun.date >= start_date,
AllocationRun.date <= end_date,
AllocationRun.status == AllocationRunStatus.SUCCESS,
)
)
.order_by(AllocationRun.date)
)
runs = result.scalars().all()
points = []
for run in runs:
# Get appeals count for date
appeals_result = await db.execute(
select(func.count(Appeal.id))
.join(Assignment, Appeal.assignment_id == Assignment.id)
.where(Assignment.date == run.date)
)
appeals_count = appeals_result.scalar() or 0
# Calculate outliers (assignments > 1.5 std dev from mean)
# For now, use a simplified calculation
outlier_count = 0
if run.global_std_dev > 0:
assignments_result = await db.execute(
select(Assignment.workload_score)
.where(Assignment.allocation_run_id == run.id)
)
workloads = [w for (w,) in assignments_result.all()]
if workloads:
avg = sum(workloads) / len(workloads)
threshold = run.global_std_dev * 1.5
outlier_count = sum(1 for w in workloads if abs(w - avg) > threshold)
points.append(FairnessMetricsPoint(
point_date=run.date,
gini_index=run.global_gini_index,
std_dev=run.global_std_dev,
max_gap=run.global_max_gap,
outlier_count=outlier_count,
appeals_count=appeals_count,
))
return FairnessMetricsResponse(points=points)
async def get_workload_heatmap(
db: AsyncSession,
start_date: date,
end_date: date,
) -> WorkloadHeatmapResponse:
"""
Get workload heatmap data.
"""
# Get all assignments in range
result = await db.execute(
select(Assignment, Driver)
.join(Driver, Assignment.driver_id == Driver.id)
.where(
and_(
Assignment.date >= start_date,
Assignment.date <= end_date,
)
)
.order_by(Assignment.date, Driver.name)
)
rows = result.all()
# Collect unique drivers and dates
drivers_map = {}
dates_set = set()
cells = []
for assignment, driver in rows:
if driver.id not in drivers_map:
drivers_map[driver.id] = HeatmapDriver(id=driver.id, name=driver.name)
dates_set.add(assignment.date)
cells.append(HeatmapCell(
driver_id=driver.id,
cell_date=assignment.date,
workload_score=assignment.workload_score,
fairness_score=assignment.fairness_score,
))
return WorkloadHeatmapResponse(
drivers=list(drivers_map.values()),
dates=sorted(dates_set),
cells=cells,
)
async def get_driver_history(
db: AsyncSession,
driver_id: UUID,
window_days: int = 30,
) -> Optional[DriverHistoryResponse]:
"""
Get detailed driver history including appeals and overrides.
"""
# Verify driver exists
driver_result = await db.execute(
select(Driver).where(Driver.id == driver_id)
)
driver = driver_result.scalar_one_or_none()
if not driver:
return None
end_date = date.today()
start_date = end_date - timedelta(days=window_days)
# Get assignments with feedback
assignments_result = await db.execute(
select(Assignment, DriverFeedback)
.outerjoin(
DriverFeedback,
and_(
DriverFeedback.assignment_id == Assignment.id,
DriverFeedback.driver_id == Assignment.driver_id,
)
)
.where(
and_(
Assignment.driver_id == driver_id,
Assignment.date >= start_date,
Assignment.date <= end_date,
)
)
.order_by(Assignment.date)
)
days = []
for assignment, feedback in assignments_result.all():
# Count appeals for this assignment
appeals_result = await db.execute(
select(func.count(Appeal.id))
.where(Appeal.assignment_id == assignment.id)
)
appeals_count = appeals_result.scalar() or 0
# Count overrides affecting this driver
overrides_result = await db.execute(
select(func.count(ManualOverride.id))
.where(
and_(
ManualOverride.allocation_run_id == assignment.allocation_run_id,
(ManualOverride.old_driver_id == driver_id) |
(ManualOverride.new_driver_id == driver_id),
)
)
)
overrides_count = overrides_result.scalar() or 0
days.append(DriverHistoryDay(
day_date=assignment.date,
workload_score=assignment.workload_score,
fairness_score=assignment.fairness_score,
reported_stress_level=float(feedback.stress_level) if feedback else None,
reported_fairness_rating=feedback.fairness_rating if feedback else None,
appeals_count=appeals_count,
manual_overrides_affecting_driver=overrides_count,
))
return DriverHistoryResponse(
driver_id=driver_id,
window_days=window_days,
days=days,
)
async def list_appeals(
db: AsyncSession,
status_filter: Optional[str] = None,
) -> AppealsListResponse:
"""
List appeals with optional status filter.
"""
query = select(Appeal, Driver, Assignment).join(
Driver, Appeal.driver_id == Driver.id
).join(
Assignment, Appeal.assignment_id == Assignment.id
)
if status_filter:
try:
status = AppealStatus(status_filter)
query = query.where(Appeal.status == status)
except ValueError:
pass # Invalid status, ignore filter
query = query.order_by(Appeal.created_at.desc())
result = await db.execute(query)
rows = result.all()
items = []
for appeal, driver, assignment in rows:
items.append(AppealResponse(
id=appeal.id,
driver=AppealDriverInfo(id=driver.id, name=driver.name),
assignment_id=appeal.assignment_id,
appeal_date=assignment.date,
reason=appeal.reason,
status=appeal.status.value,
admin_note=appeal.admin_note,
created_at=appeal.created_at,
updated_at=appeal.updated_at,
context=AppealContext(
workload_score=assignment.workload_score,
fairness_score=assignment.fairness_score,
recent_streak_hard_days=None, # Would need more complex query
),
))
return AppealsListResponse(items=items)
async def decide_appeal(
db: AsyncSession,
appeal_id: UUID,
new_status: str,
admin_note: Optional[str] = None,
) -> Optional[AppealDecisionResponse]:
"""
Update appeal status.
"""
# Get appeal
result = await db.execute(
select(Appeal).where(Appeal.id == appeal_id)
)
appeal = result.scalar_one_or_none()
if not appeal:
return None
# Parse status
try:
status = AppealStatus(new_status)
except ValueError:
raise ValueError(f"Invalid status: {new_status}")
# Update
appeal.status = status
appeal.admin_note = admin_note
appeal.updated_at = datetime.utcnow()
await db.flush()
await db.refresh(appeal)
return AppealDecisionResponse(
id=appeal.id,
status=appeal.status.value,
admin_note=appeal.admin_note,
updated_at=appeal.updated_at,
)
async def perform_manual_override(
db: AsyncSession,
allocation_run_id: UUID,
old_driver_id: UUID,
new_driver_id: UUID,
route_id: UUID,
reason: Optional[str] = None,
) -> ManualOverrideResponse:
"""
Perform manual override of route assignment.
"""
# Get current assignment
old_assignment_result = await db.execute(
select(Assignment).where(
and_(
Assignment.allocation_run_id == allocation_run_id,
Assignment.driver_id == old_driver_id,
Assignment.route_id == route_id,
)
)
)
old_assignment = old_assignment_result.scalar_one_or_none()
if not old_assignment:
raise ValueError("Original assignment not found")
# Get all workloads before override
before_result = await db.execute(
select(Assignment.workload_score)
.where(Assignment.allocation_run_id == allocation_run_id)
)
before_workloads = [w for (w,) in before_result.all()]
before_metrics = calculate_global_fairness(before_workloads)
before_max_gap = max(before_workloads) - min(before_workloads) if before_workloads else 0
# Update assignment to new driver
old_assignment.driver_id = new_driver_id
await db.flush()
# Get workloads after override
after_result = await db.execute(
select(Assignment.workload_score)
.where(Assignment.allocation_run_id == allocation_run_id)
)
after_workloads = [w for (w,) in after_result.all()]
after_metrics = calculate_global_fairness(after_workloads)
after_max_gap = max(after_workloads) - min(after_workloads) if after_workloads else 0
# Create override record
override = ManualOverride(
allocation_run_id=allocation_run_id,
old_driver_id=old_driver_id,
new_driver_id=new_driver_id,
route_id=route_id,
reason=reason,
before_metrics={
"gini_index": before_metrics.gini_index,
"std_dev": before_metrics.std_dev,
"max_gap": before_max_gap,
},
after_metrics={
"gini_index": after_metrics.gini_index,
"std_dev": after_metrics.std_dev,
"max_gap": after_max_gap,
},
)
db.add(override)
await db.flush()
await db.refresh(override)
return ManualOverrideResponse(
manual_override=ManualOverrideInfo(
id=override.id,
allocation_run_id=override.allocation_run_id,
old_driver_id=override.old_driver_id,
new_driver_id=override.new_driver_id,
route_id=override.route_id,
reason=override.reason,
before_metrics=MetricsSnapshot(
gini_index=before_metrics.gini_index,
std_dev=before_metrics.std_dev,
max_gap=before_max_gap,
),
after_metrics=MetricsSnapshot(
gini_index=after_metrics.gini_index,
std_dev=after_metrics.std_dev,
max_gap=after_max_gap,
),
created_at=override.created_at,
),
updated_assignments=[
UpdatedAssignment(
assignment_id=old_assignment.id,
driver_id=new_driver_id,
route_id=route_id,
),
],
)
async def get_active_fairness_config(db: AsyncSession) -> Optional[FairnessConfigResponse]:
"""
Get the currently active fairness config.
"""
result = await db.execute(
select(FairnessConfig)
.where(FairnessConfig.is_active == True)
.order_by(FairnessConfig.created_at.desc())
.limit(1)
)
config = result.scalar_one_or_none()
if not config:
return None
return FairnessConfigResponse(
id=config.id,
is_active=config.is_active,
workload_weight_packages=config.workload_weight_packages,
workload_weight_weight_kg=config.workload_weight_weight_kg,
workload_weight_difficulty=config.workload_weight_difficulty,
workload_weight_time=config.workload_weight_time,
gini_threshold=config.gini_threshold,
stddev_threshold=config.stddev_threshold,
max_gap_threshold=config.max_gap_threshold,
recovery_mode_enabled=config.recovery_mode_enabled,
created_at=config.created_at,
updated_at=config.updated_at,
)
async def create_fairness_config(
db: AsyncSession,
workload_weight_packages: float,
workload_weight_weight_kg: float,
workload_weight_difficulty: float,
workload_weight_time: float,
gini_threshold: float,
stddev_threshold: float,
max_gap_threshold: float,
recovery_mode_enabled: bool,
) -> FairnessConfigResponse:
"""
Create new fairness config and deactivate old ones.
"""
# Deactivate existing configs
await db.execute(
update(FairnessConfig)
.where(FairnessConfig.is_active == True)
.values(is_active=False)
)
# Create new config
config = FairnessConfig(
is_active=True,
workload_weight_packages=workload_weight_packages,
workload_weight_weight_kg=workload_weight_weight_kg,
workload_weight_difficulty=workload_weight_difficulty,
workload_weight_time=workload_weight_time,
gini_threshold=gini_threshold,
stddev_threshold=stddev_threshold,
max_gap_threshold=max_gap_threshold,
recovery_mode_enabled=recovery_mode_enabled,
)
db.add(config)
await db.flush()
await db.refresh(config)
return FairnessConfigResponse(
id=config.id,
is_active=config.is_active,
workload_weight_packages=config.workload_weight_packages,
workload_weight_weight_kg=config.workload_weight_weight_kg,
workload_weight_difficulty=config.workload_weight_difficulty,
workload_weight_time=config.workload_weight_time,
gini_threshold=config.gini_threshold,
stddev_threshold=config.stddev_threshold,
max_gap_threshold=config.max_gap_threshold,
recovery_mode_enabled=config.recovery_mode_enabled,
created_at=config.created_at,
updated_at=config.updated_at,
)
async def get_agent_timeline(
db: AsyncSession,
allocation_run_id: UUID,
) -> AgentTimelineResponse:
"""
Get enhanced agent decision timeline for an allocation run.
"""
from app.schemas.admin import (
AllocationRunInfo, AgentTimelineEvent,
)
# Fetch allocation run
run_result = await db.execute(
select(AllocationRun).where(AllocationRun.id == allocation_run_id)
)
allocation_run = run_result.scalar_one_or_none()
if not allocation_run:
# Return empty timeline if run not found
return AgentTimelineResponse(
allocation_run=AllocationRunInfo(
id=allocation_run_id,
date=date.today(),
num_drivers=0,
num_routes=0,
num_packages=0,
global_metrics={},
status="NOT_FOUND",
started_at=datetime.utcnow(),
),
timeline=[],
allocation_run_id=allocation_run_id,
steps=[],
)
# Calculate duration
duration = None
if allocation_run.finished_at and allocation_run.started_at:
duration = (allocation_run.finished_at - allocation_run.started_at).total_seconds()
# Compute avg_effort from assignments if not stored
avg_effort = 0.0
assignments_result = await db.execute(
select(func.avg(Assignment.workload_score))
.where(Assignment.allocation_run_id == allocation_run_id)
)
avg_val = assignments_result.scalar()
if avg_val:
avg_effort = float(avg_val)
run_info = AllocationRunInfo(
id=allocation_run.id,
date=allocation_run.date,
num_drivers=allocation_run.num_drivers,
num_routes=allocation_run.num_routes,
num_packages=allocation_run.num_packages,
global_metrics={
"gini_index": allocation_run.global_gini_index,
"std_dev": allocation_run.global_std_dev,
"max_gap": allocation_run.global_max_gap,
"avg_effort": round(avg_effort, 2),
},
status=allocation_run.status.value,
started_at=allocation_run.started_at,
finished_at=allocation_run.finished_at,
duration_seconds=duration,
)
# Fetch decision logs
result = await db.execute(
select(DecisionLog)
.where(DecisionLog.allocation_run_id == allocation_run_id)
.order_by(DecisionLog.created_at, DecisionLog.id)
)
logs = result.scalars().all()
# Build timeline events with short messages
timeline = []
for log in logs:
short_message = _generate_short_message(log)
details = _extract_details(log)
timeline.append(AgentTimelineEvent(
id=log.id,
timestamp=log.created_at,
agent_name=log.agent_name,
step_type=log.step_type,
short_message=short_message,
details=details,
))
# Also build legacy steps for backward compatibility
legacy_steps = [
DecisionLogStep(
id=log.id,
agent_name=log.agent_name,
step_type=log.step_type,
input_snapshot=log.input_snapshot,
output_snapshot=log.output_snapshot,
created_at=log.created_at,
)
for log in logs
]
return AgentTimelineResponse(
allocation_run=run_info,
timeline=timeline,
allocation_run_id=allocation_run_id,
steps=legacy_steps,
)
def _generate_short_message(log: DecisionLog) -> str:
"""Generate a human-readable short message for a decision log entry."""
agent = log.agent_name
step = log.step_type
inp = log.input_snapshot or {}
out = log.output_snapshot or {}
if agent == "ML_EFFORT":
num_d = inp.get("num_drivers", out.get("num_drivers", "?"))
num_r = inp.get("num_routes", out.get("num_routes", "?"))
return f"Computed effort matrix for {num_d} drivers × {num_r} routes"
if agent == "ROUTE_PLANNER":
if step == "PROPOSAL_1":
return "Generated initial route assignment proposal"
if step == "PROPOSAL_2":
return "Generated re-optimized proposal with fairness penalties"
if step == "FINAL_RESOLUTION":
swaps = out.get("swaps_applied", out.get("num_swaps", 0))
return f"Applied {swaps} swaps after negotiation"
if agent == "FAIRNESS_MANAGER":
status = out.get("status", "UNKNOWN")
if status == "REOPTIMIZE":
return "Fairness check requested re-optimization"
return "Fairness check accepted proposal"
if agent == "DRIVER_LIAISON":
accept = out.get("num_accept", 0)
counter = out.get("num_counter", 0)
force = out.get("num_force_accept", 0)
return f"Drivers: {accept} ACCEPT, {counter} COUNTER, {force} FORCE_ACCEPT"
if agent == "EXPLAINABILITY":
total = out.get("total_explanations", "?")
cats = out.get("category_counts", {})
num_cats = len(cats)
return f"Generated {total} explanations in {num_cats} categories"
return f"{agent}: {step}"
def _extract_details(log: DecisionLog) -> dict:
"""Extract relevant details from log snapshots."""
inp = log.input_snapshot or {}
out = log.output_snapshot or {}
details = {}
# Whitelist relevant keys
relevant_keys = {
"num_drivers", "num_routes", "min_effort", "max_effort", "avg_effort",
"total_effort", "gini_index", "std_dev", "max_gap", "status",
"num_accept", "num_counter", "num_force_accept",
"swaps_applied", "unfulfilled_counters",
"total_explanations", "category_counts",
"final_gini_index", "final_std_dev", "final_max_gap",
"matrix_shape", "num_packages",
}
for key in relevant_keys:
if key in out:
details[key] = out[key]
elif key in inp:
details[key] = inp[key]
return details
async def get_driver_allocation_story(
db: AsyncSession,
driver_id: UUID,
target_date: date,
) -> Optional["DriverAllocationStoryResponse"]:
"""
Get the complete allocation story for a driver on a specific date.
"""
from app.schemas.admin import (
DriverAllocationStoryResponse,
StoryDriverInfo, StoryRouteInfo, StoryRouteSummary,
StoryEffortInfo, StoryTodayInfo,
StoryHistoryDay, StoryRecoveryInfo,
StoryNegotiationInfo, StoryManualOverride, StorySwapDetails,
StoryTimelineEvent, StoryGlobalMetrics, StoryAllocationRun,
)
# Get assignment for driver on date
assignment_result = await db.execute(
select(Assignment, Driver, Route)
.join(Driver, Assignment.driver_id == Driver.id)
.join(Route, Assignment.route_id == Route.id)
.where(
and_(
Assignment.driver_id == driver_id,
Assignment.date == target_date,
)
)
)
row = assignment_result.first()
if not row:
return None
assignment, driver, route = row
# Get allocation run
run_result = await db.execute(
select(AllocationRun).where(AllocationRun.id == assignment.allocation_run_id)
)
allocation_run = run_result.scalar_one_or_none()
if not allocation_run:
return None
# Compute avg effort and rank
all_assignments_result = await db.execute(
select(Assignment.driver_id, Assignment.workload_score)
.where(Assignment.allocation_run_id == assignment.allocation_run_id)
)
all_efforts = list(all_assignments_result.all())
efforts_by_driver = {str(did): score for did, score in all_efforts}
sorted_efforts = sorted(efforts_by_driver.items(), key=lambda x: x[1], reverse=True)
rank = 1
for idx, (did, _) in enumerate(sorted_efforts):
if did == str(driver_id):
rank = idx + 1
break
num_drivers = len(all_efforts)
avg_effort = sum(e for _, e in all_efforts) / max(num_drivers, 1)
percent_vs_avg = ((assignment.workload_score - avg_effort) / max(avg_effort, 1)) * 100
# Build today info
today_info = StoryTodayInfo(
assignment_id=assignment.id,
route=StoryRouteInfo(
id=route.id,
summary=StoryRouteSummary(
num_packages=route.num_packages,
total_weight_kg=route.total_weight_kg,
num_stops=route.num_stops,
route_difficulty_score=route.route_difficulty_score,
estimated_time_minutes=route.estimated_time_minutes,
),
),
effort=StoryEffortInfo(
value=assignment.workload_score,
rank=rank,
num_drivers=num_drivers,
percent_vs_avg=round(percent_vs_avg, 1),
),
fairness_score=assignment.fairness_score,
driver_explanation=getattr(assignment, 'driver_explanation', None) or assignment.explanation,
admin_explanation=getattr(assignment, 'admin_explanation', None),
explainability_category=None, # Would need to extract from DecisionLog
)
# Get 7-day history
history_start = target_date - timedelta(days=7)
history_result = await db.execute(
select(Assignment, DriverFeedback)
.outerjoin(
DriverFeedback,
and_(
DriverFeedback.assignment_id == Assignment.id,
DriverFeedback.driver_id == Assignment.driver_id,
)
)
.where(
and_(
Assignment.driver_id == driver_id,
Assignment.date >= history_start,
Assignment.date < target_date,
)
)
.order_by(Assignment.date)
)
history_days = []
hard_day_count = 0
for hist_assignment, feedback in history_result.all():
# Determine tag based on effort relative to avg
if hist_assignment.workload_score > avg_effort * 1.15:
tag = "HARD"
hard_day_count += 1
elif hist_assignment.workload_score < avg_effort * 0.85:
tag = "LIGHT"
else:
tag = "NORMAL"
history_days.append(StoryHistoryDay(
date=hist_assignment.date,
effort=hist_assignment.workload_score,
fairness_score=hist_assignment.fairness_score,
stress_level=feedback.stress_level if feedback else None,
fairness_rating=feedback.fairness_rating if feedback else None,
tag=tag,
))
# Determine if recovery day
is_recovery = (
hard_day_count >= 2 and
assignment.workload_score < avg_effort * 0.9
)
recovery_info = StoryRecoveryInfo(
is_recovery_day=is_recovery,
recent_hard_days=hard_day_count,
)
# Get negotiation info from DecisionLog
liaison_decision = None
swap_applied = False
liaison_log_result = await db.execute(
select(DecisionLog)
.where(
and_(
DecisionLog.allocation_run_id == assignment.allocation_run_id,
DecisionLog.agent_name == "DRIVER_LIAISON",
)
)
.limit(1)
)
liaison_log = liaison_log_result.scalar_one_or_none()
# Check for swaps
resolution_log_result = await db.execute(
select(DecisionLog)
.where(
and_(
DecisionLog.allocation_run_id == assignment.allocation_run_id,
DecisionLog.agent_name == "ROUTE_PLANNER",
DecisionLog.step_type == "FINAL_RESOLUTION",
)
)
.limit(1)
)
resolution_log = resolution_log_result.scalar_one_or_none()
if resolution_log and resolution_log.output_snapshot:
swaps = resolution_log.output_snapshot.get("swaps_applied", [])
if isinstance(swaps, list):
for swap in swaps:
if str(driver_id) in [swap.get("driver_a"), swap.get("driver_b")]:
swap_applied = True
break
elif isinstance(swaps, int) and swaps > 0:
# Can't determine per-driver, assume no swap for this driver
pass
# Check for manual override
override_result = await db.execute(
select(ManualOverride)
.where(
and_(
ManualOverride.allocation_run_id == assignment.allocation_run_id,
(ManualOverride.old_driver_id == driver_id) |
(ManualOverride.new_driver_id == driver_id),
)
)
.limit(1)
)
override = override_result.scalar_one_or_none()
negotiation_info = StoryNegotiationInfo(
liaison_decision=liaison_decision,
liaison_reason=None,
swap_applied=swap_applied,
swap_details=None,
manual_override=StoryManualOverride(
affected=override is not None,
details=override.reason if override else None,
),
)
# Build agent timeline slice
timeline_slice = []
all_logs_result = await db.execute(
select(DecisionLog)
.where(DecisionLog.allocation_run_id == assignment.allocation_run_id)
.order_by(DecisionLog.created_at)
)
all_logs = all_logs_result.scalars().all()
for log in all_logs:
description = _generate_driver_specific_description(log, driver_id, driver.name)
timeline_slice.append(StoryTimelineEvent(
timestamp=log.created_at,
agent_name=log.agent_name,
step_type=log.step_type,
description=description,
))
return DriverAllocationStoryResponse(
driver=StoryDriverInfo(id=driver.id, name=driver.name),
date=target_date,
allocation_run=StoryAllocationRun(
id=allocation_run.id,
global_metrics=StoryGlobalMetrics(
gini_index=allocation_run.global_gini_index,
std_dev=allocation_run.global_std_dev,
max_gap=allocation_run.global_max_gap,
avg_effort=round(avg_effort, 1),
),
),
today=today_info,
history_last_7_days=history_days,
recovery=recovery_info,
negotiation=negotiation_info,
agent_timeline_slice=timeline_slice,
)
def _generate_driver_specific_description(log: DecisionLog, driver_id: UUID, driver_name: str) -> str:
"""Generate driver-specific description for timeline event."""
agent = log.agent_name
step = log.step_type
out = log.output_snapshot or {}
if agent == "ML_EFFORT":
return "Effort matrix computed including this driver"
if agent == "ROUTE_PLANNER":
if step == "PROPOSAL_1":
return "Driver included in initial route proposal"
if step == "PROPOSAL_2":
return "Driver's assignment adjusted in re-optimized proposal"
if step == "FINAL_RESOLUTION":
return "Final route assignments determined after negotiation"
if agent == "FAIRNESS_MANAGER":
status = out.get("status", "UNKNOWN")
if status == "REOPTIMIZE":
return "Fairness check triggered re-optimization affecting all drivers"
return "Fairness check passed for current assignments"
if agent == "DRIVER_LIAISON":
return "Negotiation decisions processed for all drivers"
if agent == "EXPLAINABILITY":
return "Explanation generated for this driver's assignment"
return f"{agent}: {step}"