| """ |
| Admin-facing service layer for Phase 3 API endpoints. |
| Provides business logic for admin operations. |
| """ |
|
|
| from datetime import date, datetime, timedelta |
| from typing import Optional, List |
| from uuid import UUID |
|
|
| from sqlalchemy import select, and_, func, update |
| from sqlalchemy.ext.asyncio import AsyncSession |
| from sqlalchemy.orm import selectinload |
|
|
| from app.models import ( |
| Driver, Assignment, Route, DriverFeedback, |
| Appeal, AppealStatus, |
| ManualOverride, |
| FairnessConfig, |
| AllocationRun, AllocationRunStatus, |
| DecisionLog, |
| ) |
| from app.schemas.admin import ( |
| HealthResponse, LatestAllocationRunInfo, |
| AllocationRunResponse, AllocationRunsListResponse, |
| AdminAssignmentsListResponse, AdminAssignmentResponse, |
| AdminDriverInfo, AdminRouteInfo, AdminFeedbackInfo, |
| FairnessMetricsResponse, FairnessMetricsPoint, |
| WorkloadHeatmapResponse, HeatmapDriver, HeatmapCell, |
| DriverHistoryResponse, DriverHistoryDay, |
| AppealsListResponse, AppealResponse, AppealDriverInfo, AppealContext, |
| AppealDecisionResponse, |
| ManualOverrideResponse, ManualOverrideInfo, MetricsSnapshot, UpdatedAssignment, |
| FairnessConfigResponse, |
| AgentTimelineResponse, DecisionLogStep, |
| ) |
| from app.services.fairness import calculate_global_fairness |
|
|
|
|
| async def get_system_health(db: AsyncSession) -> HealthResponse: |
| """ |
| Get system health status including latest allocation run. |
| """ |
| |
| try: |
| await db.execute(select(func.now())) |
| db_status = "up" |
| except Exception: |
| db_status = "down" |
| |
| |
| result = await db.execute( |
| select(AllocationRun) |
| .order_by(AllocationRun.started_at.desc()) |
| .limit(1) |
| ) |
| latest_run = result.scalar_one_or_none() |
| |
| latest_run_info = None |
| if latest_run: |
| latest_run_info = LatestAllocationRunInfo( |
| id=latest_run.id, |
| run_date=latest_run.date, |
| status=latest_run.status.value, |
| started_at=latest_run.started_at, |
| finished_at=latest_run.finished_at, |
| ) |
| |
| return HealthResponse( |
| status="ok" if db_status == "up" else "degraded", |
| database=db_status, |
| latest_allocation_run=latest_run_info, |
| ) |
|
|
|
|
| async def get_allocation_runs( |
| db: AsyncSession, |
| target_date: date, |
| ) -> AllocationRunsListResponse: |
| """ |
| Get allocation runs for a specific date. |
| """ |
| result = await db.execute( |
| select(AllocationRun) |
| .where(AllocationRun.date == target_date) |
| .order_by(AllocationRun.started_at.desc()) |
| ) |
| runs = result.scalars().all() |
| |
| return AllocationRunsListResponse( |
| runs=[ |
| AllocationRunResponse( |
| id=run.id, |
| run_date=run.date, |
| num_drivers=run.num_drivers, |
| num_routes=run.num_routes, |
| num_packages=run.num_packages, |
| global_gini_index=run.global_gini_index, |
| global_std_dev=run.global_std_dev, |
| global_max_gap=run.global_max_gap, |
| status=run.status.value, |
| error_message=run.error_message, |
| started_at=run.started_at, |
| finished_at=run.finished_at, |
| ) |
| for run in runs |
| ] |
| ) |
|
|
|
|
| async def get_assignments_paginated( |
| db: AsyncSession, |
| target_date: date, |
| driver_id: Optional[UUID] = None, |
| min_fairness: Optional[float] = None, |
| max_fairness: Optional[float] = None, |
| page: int = 1, |
| page_size: int = 50, |
| ) -> AdminAssignmentsListResponse: |
| """ |
| Get paginated assignments with filters. |
| """ |
| |
| query = select(Assignment, Driver, Route).join( |
| Driver, Assignment.driver_id == Driver.id |
| ).join( |
| Route, Assignment.route_id == Route.id |
| ).where( |
| Assignment.date == target_date |
| ) |
| |
| if driver_id: |
| query = query.where(Assignment.driver_id == driver_id) |
| if min_fairness is not None: |
| query = query.where(Assignment.fairness_score >= min_fairness) |
| if max_fairness is not None: |
| query = query.where(Assignment.fairness_score <= max_fairness) |
| |
| |
| count_query = select(func.count()).select_from(query.subquery()) |
| total_result = await db.execute(count_query) |
| total_items = total_result.scalar() or 0 |
| |
| |
| offset = (page - 1) * page_size |
| query = query.offset(offset).limit(page_size).order_by(Assignment.created_at) |
| |
| result = await db.execute(query) |
| rows = result.all() |
| |
| items = [] |
| for assignment, driver, route in rows: |
| |
| feedback_result = await db.execute( |
| select(DriverFeedback) |
| .where(DriverFeedback.assignment_id == assignment.id) |
| .limit(1) |
| ) |
| feedback = feedback_result.scalar_one_or_none() |
| |
| items.append(AdminAssignmentResponse( |
| assignment_id=assignment.id, |
| allocation_run_id=assignment.allocation_run_id, |
| driver=AdminDriverInfo( |
| id=driver.id, |
| name=driver.name, |
| vehicle_type=driver.vehicle_type.value if driver.vehicle_type else None, |
| ), |
| route=AdminRouteInfo( |
| id=route.id, |
| num_packages=route.num_packages, |
| total_weight_kg=route.total_weight_kg, |
| num_stops=route.num_stops, |
| route_difficulty_score=route.route_difficulty_score, |
| estimated_time_minutes=route.estimated_time_minutes, |
| ), |
| workload_score=assignment.workload_score, |
| fairness_score=assignment.fairness_score, |
| explanation=assignment.explanation, |
| feedback=AdminFeedbackInfo( |
| fairness_rating=feedback.fairness_rating if feedback else None, |
| stress_level=feedback.stress_level if feedback else None, |
| ) if feedback else None, |
| )) |
| |
| return AdminAssignmentsListResponse( |
| items=items, |
| page=page, |
| page_size=page_size, |
| total_items=total_items, |
| ) |
|
|
|
|
| async def get_fairness_metrics_series( |
| db: AsyncSession, |
| start_date: date, |
| end_date: date, |
| ) -> FairnessMetricsResponse: |
| """ |
| Get fairness metrics time series. |
| """ |
| |
| result = await db.execute( |
| select(AllocationRun) |
| .where( |
| and_( |
| AllocationRun.date >= start_date, |
| AllocationRun.date <= end_date, |
| AllocationRun.status == AllocationRunStatus.SUCCESS, |
| ) |
| ) |
| .order_by(AllocationRun.date) |
| ) |
| runs = result.scalars().all() |
| |
| points = [] |
| for run in runs: |
| |
| appeals_result = await db.execute( |
| select(func.count(Appeal.id)) |
| .join(Assignment, Appeal.assignment_id == Assignment.id) |
| .where(Assignment.date == run.date) |
| ) |
| appeals_count = appeals_result.scalar() or 0 |
| |
| |
| |
| outlier_count = 0 |
| if run.global_std_dev > 0: |
| assignments_result = await db.execute( |
| select(Assignment.workload_score) |
| .where(Assignment.allocation_run_id == run.id) |
| ) |
| workloads = [w for (w,) in assignments_result.all()] |
| if workloads: |
| avg = sum(workloads) / len(workloads) |
| threshold = run.global_std_dev * 1.5 |
| outlier_count = sum(1 for w in workloads if abs(w - avg) > threshold) |
| |
| points.append(FairnessMetricsPoint( |
| point_date=run.date, |
| gini_index=run.global_gini_index, |
| std_dev=run.global_std_dev, |
| max_gap=run.global_max_gap, |
| outlier_count=outlier_count, |
| appeals_count=appeals_count, |
| )) |
| |
| return FairnessMetricsResponse(points=points) |
|
|
|
|
| async def get_workload_heatmap( |
| db: AsyncSession, |
| start_date: date, |
| end_date: date, |
| ) -> WorkloadHeatmapResponse: |
| """ |
| Get workload heatmap data. |
| """ |
| |
| result = await db.execute( |
| select(Assignment, Driver) |
| .join(Driver, Assignment.driver_id == Driver.id) |
| .where( |
| and_( |
| Assignment.date >= start_date, |
| Assignment.date <= end_date, |
| ) |
| ) |
| .order_by(Assignment.date, Driver.name) |
| ) |
| rows = result.all() |
| |
| |
| drivers_map = {} |
| dates_set = set() |
| cells = [] |
| |
| for assignment, driver in rows: |
| if driver.id not in drivers_map: |
| drivers_map[driver.id] = HeatmapDriver(id=driver.id, name=driver.name) |
| dates_set.add(assignment.date) |
| cells.append(HeatmapCell( |
| driver_id=driver.id, |
| cell_date=assignment.date, |
| workload_score=assignment.workload_score, |
| fairness_score=assignment.fairness_score, |
| )) |
| |
| return WorkloadHeatmapResponse( |
| drivers=list(drivers_map.values()), |
| dates=sorted(dates_set), |
| cells=cells, |
| ) |
|
|
|
|
| async def get_driver_history( |
| db: AsyncSession, |
| driver_id: UUID, |
| window_days: int = 30, |
| ) -> Optional[DriverHistoryResponse]: |
| """ |
| Get detailed driver history including appeals and overrides. |
| """ |
| |
| driver_result = await db.execute( |
| select(Driver).where(Driver.id == driver_id) |
| ) |
| driver = driver_result.scalar_one_or_none() |
| if not driver: |
| return None |
| |
| end_date = date.today() |
| start_date = end_date - timedelta(days=window_days) |
| |
| |
| assignments_result = await db.execute( |
| select(Assignment, DriverFeedback) |
| .outerjoin( |
| DriverFeedback, |
| and_( |
| DriverFeedback.assignment_id == Assignment.id, |
| DriverFeedback.driver_id == Assignment.driver_id, |
| ) |
| ) |
| .where( |
| and_( |
| Assignment.driver_id == driver_id, |
| Assignment.date >= start_date, |
| Assignment.date <= end_date, |
| ) |
| ) |
| .order_by(Assignment.date) |
| ) |
| |
| days = [] |
| for assignment, feedback in assignments_result.all(): |
| |
| appeals_result = await db.execute( |
| select(func.count(Appeal.id)) |
| .where(Appeal.assignment_id == assignment.id) |
| ) |
| appeals_count = appeals_result.scalar() or 0 |
| |
| |
| overrides_result = await db.execute( |
| select(func.count(ManualOverride.id)) |
| .where( |
| and_( |
| ManualOverride.allocation_run_id == assignment.allocation_run_id, |
| (ManualOverride.old_driver_id == driver_id) | |
| (ManualOverride.new_driver_id == driver_id), |
| ) |
| ) |
| ) |
| overrides_count = overrides_result.scalar() or 0 |
| |
| days.append(DriverHistoryDay( |
| day_date=assignment.date, |
| workload_score=assignment.workload_score, |
| fairness_score=assignment.fairness_score, |
| reported_stress_level=float(feedback.stress_level) if feedback else None, |
| reported_fairness_rating=feedback.fairness_rating if feedback else None, |
| appeals_count=appeals_count, |
| manual_overrides_affecting_driver=overrides_count, |
| )) |
| |
| return DriverHistoryResponse( |
| driver_id=driver_id, |
| window_days=window_days, |
| days=days, |
| ) |
|
|
|
|
| async def list_appeals( |
| db: AsyncSession, |
| status_filter: Optional[str] = None, |
| ) -> AppealsListResponse: |
| """ |
| List appeals with optional status filter. |
| """ |
| query = select(Appeal, Driver, Assignment).join( |
| Driver, Appeal.driver_id == Driver.id |
| ).join( |
| Assignment, Appeal.assignment_id == Assignment.id |
| ) |
| |
| if status_filter: |
| try: |
| status = AppealStatus(status_filter) |
| query = query.where(Appeal.status == status) |
| except ValueError: |
| pass |
| |
| query = query.order_by(Appeal.created_at.desc()) |
| result = await db.execute(query) |
| rows = result.all() |
| |
| items = [] |
| for appeal, driver, assignment in rows: |
| items.append(AppealResponse( |
| id=appeal.id, |
| driver=AppealDriverInfo(id=driver.id, name=driver.name), |
| assignment_id=appeal.assignment_id, |
| appeal_date=assignment.date, |
| reason=appeal.reason, |
| status=appeal.status.value, |
| admin_note=appeal.admin_note, |
| created_at=appeal.created_at, |
| updated_at=appeal.updated_at, |
| context=AppealContext( |
| workload_score=assignment.workload_score, |
| fairness_score=assignment.fairness_score, |
| recent_streak_hard_days=None, |
| ), |
| )) |
| |
| return AppealsListResponse(items=items) |
|
|
|
|
| async def decide_appeal( |
| db: AsyncSession, |
| appeal_id: UUID, |
| new_status: str, |
| admin_note: Optional[str] = None, |
| ) -> Optional[AppealDecisionResponse]: |
| """ |
| Update appeal status. |
| """ |
| |
| result = await db.execute( |
| select(Appeal).where(Appeal.id == appeal_id) |
| ) |
| appeal = result.scalar_one_or_none() |
| if not appeal: |
| return None |
| |
| |
| try: |
| status = AppealStatus(new_status) |
| except ValueError: |
| raise ValueError(f"Invalid status: {new_status}") |
| |
| |
| appeal.status = status |
| appeal.admin_note = admin_note |
| appeal.updated_at = datetime.utcnow() |
| |
| await db.flush() |
| await db.refresh(appeal) |
| |
| return AppealDecisionResponse( |
| id=appeal.id, |
| status=appeal.status.value, |
| admin_note=appeal.admin_note, |
| updated_at=appeal.updated_at, |
| ) |
|
|
|
|
| async def perform_manual_override( |
| db: AsyncSession, |
| allocation_run_id: UUID, |
| old_driver_id: UUID, |
| new_driver_id: UUID, |
| route_id: UUID, |
| reason: Optional[str] = None, |
| ) -> ManualOverrideResponse: |
| """ |
| Perform manual override of route assignment. |
| """ |
| |
| old_assignment_result = await db.execute( |
| select(Assignment).where( |
| and_( |
| Assignment.allocation_run_id == allocation_run_id, |
| Assignment.driver_id == old_driver_id, |
| Assignment.route_id == route_id, |
| ) |
| ) |
| ) |
| old_assignment = old_assignment_result.scalar_one_or_none() |
| if not old_assignment: |
| raise ValueError("Original assignment not found") |
| |
| |
| before_result = await db.execute( |
| select(Assignment.workload_score) |
| .where(Assignment.allocation_run_id == allocation_run_id) |
| ) |
| before_workloads = [w for (w,) in before_result.all()] |
| before_metrics = calculate_global_fairness(before_workloads) |
| before_max_gap = max(before_workloads) - min(before_workloads) if before_workloads else 0 |
| |
| |
| old_assignment.driver_id = new_driver_id |
| await db.flush() |
| |
| |
| after_result = await db.execute( |
| select(Assignment.workload_score) |
| .where(Assignment.allocation_run_id == allocation_run_id) |
| ) |
| after_workloads = [w for (w,) in after_result.all()] |
| after_metrics = calculate_global_fairness(after_workloads) |
| after_max_gap = max(after_workloads) - min(after_workloads) if after_workloads else 0 |
| |
| |
| override = ManualOverride( |
| allocation_run_id=allocation_run_id, |
| old_driver_id=old_driver_id, |
| new_driver_id=new_driver_id, |
| route_id=route_id, |
| reason=reason, |
| before_metrics={ |
| "gini_index": before_metrics.gini_index, |
| "std_dev": before_metrics.std_dev, |
| "max_gap": before_max_gap, |
| }, |
| after_metrics={ |
| "gini_index": after_metrics.gini_index, |
| "std_dev": after_metrics.std_dev, |
| "max_gap": after_max_gap, |
| }, |
| ) |
| db.add(override) |
| await db.flush() |
| await db.refresh(override) |
| |
| return ManualOverrideResponse( |
| manual_override=ManualOverrideInfo( |
| id=override.id, |
| allocation_run_id=override.allocation_run_id, |
| old_driver_id=override.old_driver_id, |
| new_driver_id=override.new_driver_id, |
| route_id=override.route_id, |
| reason=override.reason, |
| before_metrics=MetricsSnapshot( |
| gini_index=before_metrics.gini_index, |
| std_dev=before_metrics.std_dev, |
| max_gap=before_max_gap, |
| ), |
| after_metrics=MetricsSnapshot( |
| gini_index=after_metrics.gini_index, |
| std_dev=after_metrics.std_dev, |
| max_gap=after_max_gap, |
| ), |
| created_at=override.created_at, |
| ), |
| updated_assignments=[ |
| UpdatedAssignment( |
| assignment_id=old_assignment.id, |
| driver_id=new_driver_id, |
| route_id=route_id, |
| ), |
| ], |
| ) |
|
|
|
|
| async def get_active_fairness_config(db: AsyncSession) -> Optional[FairnessConfigResponse]: |
| """ |
| Get the currently active fairness config. |
| """ |
| result = await db.execute( |
| select(FairnessConfig) |
| .where(FairnessConfig.is_active == True) |
| .order_by(FairnessConfig.created_at.desc()) |
| .limit(1) |
| ) |
| config = result.scalar_one_or_none() |
| |
| if not config: |
| return None |
| |
| return FairnessConfigResponse( |
| id=config.id, |
| is_active=config.is_active, |
| workload_weight_packages=config.workload_weight_packages, |
| workload_weight_weight_kg=config.workload_weight_weight_kg, |
| workload_weight_difficulty=config.workload_weight_difficulty, |
| workload_weight_time=config.workload_weight_time, |
| gini_threshold=config.gini_threshold, |
| stddev_threshold=config.stddev_threshold, |
| max_gap_threshold=config.max_gap_threshold, |
| recovery_mode_enabled=config.recovery_mode_enabled, |
| created_at=config.created_at, |
| updated_at=config.updated_at, |
| ) |
|
|
|
|
| async def create_fairness_config( |
| db: AsyncSession, |
| workload_weight_packages: float, |
| workload_weight_weight_kg: float, |
| workload_weight_difficulty: float, |
| workload_weight_time: float, |
| gini_threshold: float, |
| stddev_threshold: float, |
| max_gap_threshold: float, |
| recovery_mode_enabled: bool, |
| ) -> FairnessConfigResponse: |
| """ |
| Create new fairness config and deactivate old ones. |
| """ |
| |
| await db.execute( |
| update(FairnessConfig) |
| .where(FairnessConfig.is_active == True) |
| .values(is_active=False) |
| ) |
| |
| |
| config = FairnessConfig( |
| is_active=True, |
| workload_weight_packages=workload_weight_packages, |
| workload_weight_weight_kg=workload_weight_weight_kg, |
| workload_weight_difficulty=workload_weight_difficulty, |
| workload_weight_time=workload_weight_time, |
| gini_threshold=gini_threshold, |
| stddev_threshold=stddev_threshold, |
| max_gap_threshold=max_gap_threshold, |
| recovery_mode_enabled=recovery_mode_enabled, |
| ) |
| db.add(config) |
| await db.flush() |
| await db.refresh(config) |
| |
| return FairnessConfigResponse( |
| id=config.id, |
| is_active=config.is_active, |
| workload_weight_packages=config.workload_weight_packages, |
| workload_weight_weight_kg=config.workload_weight_weight_kg, |
| workload_weight_difficulty=config.workload_weight_difficulty, |
| workload_weight_time=config.workload_weight_time, |
| gini_threshold=config.gini_threshold, |
| stddev_threshold=config.stddev_threshold, |
| max_gap_threshold=config.max_gap_threshold, |
| recovery_mode_enabled=config.recovery_mode_enabled, |
| created_at=config.created_at, |
| updated_at=config.updated_at, |
| ) |
|
|
|
|
| async def get_agent_timeline( |
| db: AsyncSession, |
| allocation_run_id: UUID, |
| ) -> AgentTimelineResponse: |
| """ |
| Get enhanced agent decision timeline for an allocation run. |
| """ |
| from app.schemas.admin import ( |
| AllocationRunInfo, AgentTimelineEvent, |
| ) |
| |
| |
| run_result = await db.execute( |
| select(AllocationRun).where(AllocationRun.id == allocation_run_id) |
| ) |
| allocation_run = run_result.scalar_one_or_none() |
| |
| if not allocation_run: |
| |
| return AgentTimelineResponse( |
| allocation_run=AllocationRunInfo( |
| id=allocation_run_id, |
| date=date.today(), |
| num_drivers=0, |
| num_routes=0, |
| num_packages=0, |
| global_metrics={}, |
| status="NOT_FOUND", |
| started_at=datetime.utcnow(), |
| ), |
| timeline=[], |
| allocation_run_id=allocation_run_id, |
| steps=[], |
| ) |
| |
| |
| duration = None |
| if allocation_run.finished_at and allocation_run.started_at: |
| duration = (allocation_run.finished_at - allocation_run.started_at).total_seconds() |
| |
| |
| avg_effort = 0.0 |
| assignments_result = await db.execute( |
| select(func.avg(Assignment.workload_score)) |
| .where(Assignment.allocation_run_id == allocation_run_id) |
| ) |
| avg_val = assignments_result.scalar() |
| if avg_val: |
| avg_effort = float(avg_val) |
| |
| run_info = AllocationRunInfo( |
| id=allocation_run.id, |
| date=allocation_run.date, |
| num_drivers=allocation_run.num_drivers, |
| num_routes=allocation_run.num_routes, |
| num_packages=allocation_run.num_packages, |
| global_metrics={ |
| "gini_index": allocation_run.global_gini_index, |
| "std_dev": allocation_run.global_std_dev, |
| "max_gap": allocation_run.global_max_gap, |
| "avg_effort": round(avg_effort, 2), |
| }, |
| status=allocation_run.status.value, |
| started_at=allocation_run.started_at, |
| finished_at=allocation_run.finished_at, |
| duration_seconds=duration, |
| ) |
| |
| |
| result = await db.execute( |
| select(DecisionLog) |
| .where(DecisionLog.allocation_run_id == allocation_run_id) |
| .order_by(DecisionLog.created_at, DecisionLog.id) |
| ) |
| logs = result.scalars().all() |
| |
| |
| timeline = [] |
| for log in logs: |
| short_message = _generate_short_message(log) |
| details = _extract_details(log) |
| |
| timeline.append(AgentTimelineEvent( |
| id=log.id, |
| timestamp=log.created_at, |
| agent_name=log.agent_name, |
| step_type=log.step_type, |
| short_message=short_message, |
| details=details, |
| )) |
| |
| |
| legacy_steps = [ |
| DecisionLogStep( |
| id=log.id, |
| agent_name=log.agent_name, |
| step_type=log.step_type, |
| input_snapshot=log.input_snapshot, |
| output_snapshot=log.output_snapshot, |
| created_at=log.created_at, |
| ) |
| for log in logs |
| ] |
| |
| return AgentTimelineResponse( |
| allocation_run=run_info, |
| timeline=timeline, |
| allocation_run_id=allocation_run_id, |
| steps=legacy_steps, |
| ) |
|
|
|
|
| def _generate_short_message(log: DecisionLog) -> str: |
| """Generate a human-readable short message for a decision log entry.""" |
| agent = log.agent_name |
| step = log.step_type |
| inp = log.input_snapshot or {} |
| out = log.output_snapshot or {} |
| |
| if agent == "ML_EFFORT": |
| num_d = inp.get("num_drivers", out.get("num_drivers", "?")) |
| num_r = inp.get("num_routes", out.get("num_routes", "?")) |
| return f"Computed effort matrix for {num_d} drivers × {num_r} routes" |
| |
| if agent == "ROUTE_PLANNER": |
| if step == "PROPOSAL_1": |
| return "Generated initial route assignment proposal" |
| if step == "PROPOSAL_2": |
| return "Generated re-optimized proposal with fairness penalties" |
| if step == "FINAL_RESOLUTION": |
| swaps = out.get("swaps_applied", out.get("num_swaps", 0)) |
| return f"Applied {swaps} swaps after negotiation" |
| |
| if agent == "FAIRNESS_MANAGER": |
| status = out.get("status", "UNKNOWN") |
| if status == "REOPTIMIZE": |
| return "Fairness check requested re-optimization" |
| return "Fairness check accepted proposal" |
| |
| if agent == "DRIVER_LIAISON": |
| accept = out.get("num_accept", 0) |
| counter = out.get("num_counter", 0) |
| force = out.get("num_force_accept", 0) |
| return f"Drivers: {accept} ACCEPT, {counter} COUNTER, {force} FORCE_ACCEPT" |
| |
| if agent == "EXPLAINABILITY": |
| total = out.get("total_explanations", "?") |
| cats = out.get("category_counts", {}) |
| num_cats = len(cats) |
| return f"Generated {total} explanations in {num_cats} categories" |
| |
| return f"{agent}: {step}" |
|
|
|
|
| def _extract_details(log: DecisionLog) -> dict: |
| """Extract relevant details from log snapshots.""" |
| inp = log.input_snapshot or {} |
| out = log.output_snapshot or {} |
| details = {} |
| |
| |
| relevant_keys = { |
| "num_drivers", "num_routes", "min_effort", "max_effort", "avg_effort", |
| "total_effort", "gini_index", "std_dev", "max_gap", "status", |
| "num_accept", "num_counter", "num_force_accept", |
| "swaps_applied", "unfulfilled_counters", |
| "total_explanations", "category_counts", |
| "final_gini_index", "final_std_dev", "final_max_gap", |
| "matrix_shape", "num_packages", |
| } |
| |
| for key in relevant_keys: |
| if key in out: |
| details[key] = out[key] |
| elif key in inp: |
| details[key] = inp[key] |
| |
| return details |
|
|
|
|
| async def get_driver_allocation_story( |
| db: AsyncSession, |
| driver_id: UUID, |
| target_date: date, |
| ) -> Optional["DriverAllocationStoryResponse"]: |
| """ |
| Get the complete allocation story for a driver on a specific date. |
| """ |
| from app.schemas.admin import ( |
| DriverAllocationStoryResponse, |
| StoryDriverInfo, StoryRouteInfo, StoryRouteSummary, |
| StoryEffortInfo, StoryTodayInfo, |
| StoryHistoryDay, StoryRecoveryInfo, |
| StoryNegotiationInfo, StoryManualOverride, StorySwapDetails, |
| StoryTimelineEvent, StoryGlobalMetrics, StoryAllocationRun, |
| ) |
| |
| |
| assignment_result = await db.execute( |
| select(Assignment, Driver, Route) |
| .join(Driver, Assignment.driver_id == Driver.id) |
| .join(Route, Assignment.route_id == Route.id) |
| .where( |
| and_( |
| Assignment.driver_id == driver_id, |
| Assignment.date == target_date, |
| ) |
| ) |
| ) |
| row = assignment_result.first() |
| |
| if not row: |
| return None |
| |
| assignment, driver, route = row |
| |
| |
| run_result = await db.execute( |
| select(AllocationRun).where(AllocationRun.id == assignment.allocation_run_id) |
| ) |
| allocation_run = run_result.scalar_one_or_none() |
| |
| if not allocation_run: |
| return None |
| |
| |
| all_assignments_result = await db.execute( |
| select(Assignment.driver_id, Assignment.workload_score) |
| .where(Assignment.allocation_run_id == assignment.allocation_run_id) |
| ) |
| all_efforts = list(all_assignments_result.all()) |
| |
| efforts_by_driver = {str(did): score for did, score in all_efforts} |
| sorted_efforts = sorted(efforts_by_driver.items(), key=lambda x: x[1], reverse=True) |
| |
| rank = 1 |
| for idx, (did, _) in enumerate(sorted_efforts): |
| if did == str(driver_id): |
| rank = idx + 1 |
| break |
| |
| num_drivers = len(all_efforts) |
| avg_effort = sum(e for _, e in all_efforts) / max(num_drivers, 1) |
| percent_vs_avg = ((assignment.workload_score - avg_effort) / max(avg_effort, 1)) * 100 |
| |
| |
| today_info = StoryTodayInfo( |
| assignment_id=assignment.id, |
| route=StoryRouteInfo( |
| id=route.id, |
| summary=StoryRouteSummary( |
| num_packages=route.num_packages, |
| total_weight_kg=route.total_weight_kg, |
| num_stops=route.num_stops, |
| route_difficulty_score=route.route_difficulty_score, |
| estimated_time_minutes=route.estimated_time_minutes, |
| ), |
| ), |
| effort=StoryEffortInfo( |
| value=assignment.workload_score, |
| rank=rank, |
| num_drivers=num_drivers, |
| percent_vs_avg=round(percent_vs_avg, 1), |
| ), |
| fairness_score=assignment.fairness_score, |
| driver_explanation=getattr(assignment, 'driver_explanation', None) or assignment.explanation, |
| admin_explanation=getattr(assignment, 'admin_explanation', None), |
| explainability_category=None, |
| ) |
| |
| |
| history_start = target_date - timedelta(days=7) |
| history_result = await db.execute( |
| select(Assignment, DriverFeedback) |
| .outerjoin( |
| DriverFeedback, |
| and_( |
| DriverFeedback.assignment_id == Assignment.id, |
| DriverFeedback.driver_id == Assignment.driver_id, |
| ) |
| ) |
| .where( |
| and_( |
| Assignment.driver_id == driver_id, |
| Assignment.date >= history_start, |
| Assignment.date < target_date, |
| ) |
| ) |
| .order_by(Assignment.date) |
| ) |
| |
| history_days = [] |
| hard_day_count = 0 |
| |
| for hist_assignment, feedback in history_result.all(): |
| |
| if hist_assignment.workload_score > avg_effort * 1.15: |
| tag = "HARD" |
| hard_day_count += 1 |
| elif hist_assignment.workload_score < avg_effort * 0.85: |
| tag = "LIGHT" |
| else: |
| tag = "NORMAL" |
| |
| history_days.append(StoryHistoryDay( |
| date=hist_assignment.date, |
| effort=hist_assignment.workload_score, |
| fairness_score=hist_assignment.fairness_score, |
| stress_level=feedback.stress_level if feedback else None, |
| fairness_rating=feedback.fairness_rating if feedback else None, |
| tag=tag, |
| )) |
| |
| |
| is_recovery = ( |
| hard_day_count >= 2 and |
| assignment.workload_score < avg_effort * 0.9 |
| ) |
| |
| recovery_info = StoryRecoveryInfo( |
| is_recovery_day=is_recovery, |
| recent_hard_days=hard_day_count, |
| ) |
| |
| |
| liaison_decision = None |
| swap_applied = False |
| |
| liaison_log_result = await db.execute( |
| select(DecisionLog) |
| .where( |
| and_( |
| DecisionLog.allocation_run_id == assignment.allocation_run_id, |
| DecisionLog.agent_name == "DRIVER_LIAISON", |
| ) |
| ) |
| .limit(1) |
| ) |
| liaison_log = liaison_log_result.scalar_one_or_none() |
| |
| |
| resolution_log_result = await db.execute( |
| select(DecisionLog) |
| .where( |
| and_( |
| DecisionLog.allocation_run_id == assignment.allocation_run_id, |
| DecisionLog.agent_name == "ROUTE_PLANNER", |
| DecisionLog.step_type == "FINAL_RESOLUTION", |
| ) |
| ) |
| .limit(1) |
| ) |
| resolution_log = resolution_log_result.scalar_one_or_none() |
| |
| if resolution_log and resolution_log.output_snapshot: |
| swaps = resolution_log.output_snapshot.get("swaps_applied", []) |
| if isinstance(swaps, list): |
| for swap in swaps: |
| if str(driver_id) in [swap.get("driver_a"), swap.get("driver_b")]: |
| swap_applied = True |
| break |
| elif isinstance(swaps, int) and swaps > 0: |
| |
| pass |
| |
| |
| override_result = await db.execute( |
| select(ManualOverride) |
| .where( |
| and_( |
| ManualOverride.allocation_run_id == assignment.allocation_run_id, |
| (ManualOverride.old_driver_id == driver_id) | |
| (ManualOverride.new_driver_id == driver_id), |
| ) |
| ) |
| .limit(1) |
| ) |
| override = override_result.scalar_one_or_none() |
| |
| negotiation_info = StoryNegotiationInfo( |
| liaison_decision=liaison_decision, |
| liaison_reason=None, |
| swap_applied=swap_applied, |
| swap_details=None, |
| manual_override=StoryManualOverride( |
| affected=override is not None, |
| details=override.reason if override else None, |
| ), |
| ) |
| |
| |
| timeline_slice = [] |
| |
| all_logs_result = await db.execute( |
| select(DecisionLog) |
| .where(DecisionLog.allocation_run_id == assignment.allocation_run_id) |
| .order_by(DecisionLog.created_at) |
| ) |
| all_logs = all_logs_result.scalars().all() |
| |
| for log in all_logs: |
| description = _generate_driver_specific_description(log, driver_id, driver.name) |
| timeline_slice.append(StoryTimelineEvent( |
| timestamp=log.created_at, |
| agent_name=log.agent_name, |
| step_type=log.step_type, |
| description=description, |
| )) |
| |
| return DriverAllocationStoryResponse( |
| driver=StoryDriverInfo(id=driver.id, name=driver.name), |
| date=target_date, |
| allocation_run=StoryAllocationRun( |
| id=allocation_run.id, |
| global_metrics=StoryGlobalMetrics( |
| gini_index=allocation_run.global_gini_index, |
| std_dev=allocation_run.global_std_dev, |
| max_gap=allocation_run.global_max_gap, |
| avg_effort=round(avg_effort, 1), |
| ), |
| ), |
| today=today_info, |
| history_last_7_days=history_days, |
| recovery=recovery_info, |
| negotiation=negotiation_info, |
| agent_timeline_slice=timeline_slice, |
| ) |
|
|
|
|
| def _generate_driver_specific_description(log: DecisionLog, driver_id: UUID, driver_name: str) -> str: |
| """Generate driver-specific description for timeline event.""" |
| agent = log.agent_name |
| step = log.step_type |
| out = log.output_snapshot or {} |
| |
| if agent == "ML_EFFORT": |
| return "Effort matrix computed including this driver" |
| |
| if agent == "ROUTE_PLANNER": |
| if step == "PROPOSAL_1": |
| return "Driver included in initial route proposal" |
| if step == "PROPOSAL_2": |
| return "Driver's assignment adjusted in re-optimized proposal" |
| if step == "FINAL_RESOLUTION": |
| return "Final route assignments determined after negotiation" |
| |
| if agent == "FAIRNESS_MANAGER": |
| status = out.get("status", "UNKNOWN") |
| if status == "REOPTIMIZE": |
| return "Fairness check triggered re-optimization affecting all drivers" |
| return "Fairness check passed for current assignments" |
| |
| if agent == "DRIVER_LIAISON": |
| return "Negotiation decisions processed for all drivers" |
| |
| if agent == "EXPLAINABILITY": |
| return "Explanation generated for this driver's assignment" |
| |
| return f"{agent}: {step}" |
|
|
|
|