import json
import math
import os
import ast
import re
from dataclasses import dataclass
from functools import lru_cache
from html import escape
from itertools import permutations
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple
import gradio as gr
import pandas as pd
DEPOT = {
"customer": "Depot",
"lat": 40.7280,
"lng": -73.9980,
}
SAMPLE_PATH = Path(__file__).with_name("sample_orders.csv")
AVG_SPEED_KMPH = 22.0
CAPACITY = 18
START_MINUTE = 8 * 60
MINICPM_REPO = "openbmb/MiniCPM5-1B-GGUF"
MINICPM_FILE = "MiniCPM5-1B-Q4_K_M.gguf"
MINICPM_PARAMS = "1.08B"
@dataclass(frozen=True)
class Stop:
order_id: str
customer: str
lat: float
lng: float
demand: int
service_min: int
ready_time: int
due_time: int
priority: str
notes: str
manual_sequence: int
@dataclass(frozen=True)
class PlanStop:
stop: Stop
route_id: int
arrival: int
start: int
depart: int
distance_km: float
late_min: int
wait_min: int
def time_to_min(value: str) -> int:
value = str(value or "").strip()
if not value:
return 17 * 60
match = re.match(r"^(\d{1,2}):(\d{2})$", value)
if not match:
return 17 * 60
hour, minute = int(match.group(1)), int(match.group(2))
return max(0, min(23 * 60 + 59, hour * 60 + minute))
def min_to_time(value: int) -> str:
value = max(0, int(round(value)))
return f"{value // 60:02d}:{value % 60:02d}"
def haversine_km(a_lat: float, a_lng: float, b_lat: float, b_lng: float) -> float:
radius = 6371.0
lat1, lat2 = math.radians(a_lat), math.radians(b_lat)
d_lat = math.radians(b_lat - a_lat)
d_lng = math.radians(b_lng - a_lng)
h = (
math.sin(d_lat / 2) ** 2
+ math.cos(lat1) * math.cos(lat2) * math.sin(d_lng / 2) ** 2
)
return 2 * radius * math.asin(math.sqrt(h))
def travel_minutes(distance_km: float) -> int:
return int(math.ceil((distance_km / AVG_SPEED_KMPH) * 60))
def parse_orders(file_obj) -> List[Stop]:
if file_obj is None:
df = pd.read_csv(SAMPLE_PATH)
else:
file_path = file_obj if isinstance(file_obj, str) else file_obj.name
df = pd.read_csv(file_path)
required = {
"order_id",
"customer",
"lat",
"lng",
"demand",
"service_min",
"ready_time",
"due_time",
"priority",
"notes",
}
missing = sorted(required - set(df.columns))
if missing:
raise gr.Error(f"CSV is missing required columns: {', '.join(missing)}")
if "manual_sequence" not in df.columns:
df["manual_sequence"] = range(1, len(df) + 1)
stops: List[Stop] = []
for row in df.to_dict("records"):
stops.append(
Stop(
order_id=str(row["order_id"]),
customer=str(row["customer"]),
lat=float(row["lat"]),
lng=float(row["lng"]),
demand=int(row["demand"]),
service_min=int(row["service_min"]),
ready_time=time_to_min(str(row["ready_time"])),
due_time=time_to_min(str(row["due_time"])),
priority=str(row["priority"]).lower(),
notes=str(row.get("notes", "")),
manual_sequence=int(row.get("manual_sequence", len(stops) + 1)),
)
)
return stops
def parse_dispatch_notes(notes: str) -> Dict[str, object]:
text = (notes or "").lower()
constraints: Dict[str, object] = {
"prefer_early_priority": True,
"avoid_late_penalty": 2.0,
"max_route_load": CAPACITY,
"depot_start": START_MINUTE,
"boost_terms": [],
}
if "cold" in text or "fresh" in text or "produce" in text:
constraints["boost_terms"].append("fresh")
if "medical" in text or "clinic" in text or "medicine" in text:
constraints["boost_terms"].append("medical")
if "school" in text:
constraints["boost_terms"].append("school")
if "lunch" in text or "noon" in text:
constraints["soft_due_before"] = 12 * 60
hour_match = re.search(r"(?:start|leave|depart)\D{0,12}(\d{1,2})(?::(\d{2}))?", text)
if hour_match:
hour = int(hour_match.group(1))
minute = int(hour_match.group(2) or 0)
if 1 <= hour <= 23:
constraints["depot_start"] = hour * 60 + minute
capacity_match = re.search(r"(?:capacity|load|max load|van)\D{0,12}(\d{1,3})", text)
if capacity_match:
constraints["max_route_load"] = max(1, int(capacity_match.group(1)))
return constraints
def normalize_constraints(raw: Dict[str, object]) -> Dict[str, object]:
constraints = {
"prefer_early_priority": bool(raw.get("prefer_early_priority", True)),
"avoid_late_penalty": float(raw.get("avoid_late_penalty", 2.0) or 2.0),
"max_route_load": int(raw.get("max_route_load", CAPACITY) or CAPACITY),
"depot_start": int(raw.get("depot_start", START_MINUTE) or START_MINUTE),
"boost_terms": list(raw.get("boost_terms", []) or []),
"source": raw.get("source", "rule-fallback"),
}
if raw.get("soft_due_before") is not None:
constraints["soft_due_before"] = int(raw["soft_due_before"])
constraints["max_route_load"] = max(1, min(200, constraints["max_route_load"]))
constraints["avoid_late_penalty"] = max(0.5, min(10.0, constraints["avoid_late_penalty"]))
return constraints
def extract_json_object(text: str) -> Optional[Dict[str, object]]:
cleaned = (text or "").strip()
fenced = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", cleaned, re.DOTALL | re.IGNORECASE)
if fenced:
cleaned = fenced.group(1)
match = re.search(r"\{.*\}", cleaned, re.DOTALL)
if not match:
return None
raw = match.group(0)
try:
parsed = json.loads(raw)
except json.JSONDecodeError:
try:
parsed = ast.literal_eval(raw)
except (SyntaxError, ValueError):
normalized = (
raw.replace("True", "true")
.replace("False", "false")
.replace("None", "null")
)
try:
parsed = json.loads(normalized)
except json.JSONDecodeError:
return None
return parsed if isinstance(parsed, dict) else None
@lru_cache(maxsize=1)
def get_minicpm_llm():
if os.environ.get("DISABLE_MINICPM", "").lower() in {"1", "true", "yes"}:
return None
try:
from huggingface_hub import hf_hub_download
from llama_cpp import Llama
except Exception:
return None
try:
model_path = hf_hub_download(repo_id=MINICPM_REPO, filename=MINICPM_FILE)
return Llama(
model_path=model_path,
n_ctx=768,
n_threads=max(1, min(4, os.cpu_count() or 2)),
n_batch=32,
n_gpu_layers=0,
verbose=False,
)
except Exception:
return None
def minicpm_parse_dispatch_notes(notes: str, use_minicpm: bool = False) -> Tuple[Dict[str, object], str]:
fallback = normalize_constraints(parse_dispatch_notes(notes))
if not use_minicpm:
fallback["source"] = "rule-fallback"
return (
fallback,
"Fast CPU Basic mode used the deterministic parser. Enable MiniCPM5 parser to run the local GGUF model path.",
)
llm = get_minicpm_llm()
if llm is None:
fallback["source"] = "rule-fallback"
return fallback, "MiniCPM5 local runtime is unavailable, so the deterministic parser handled the notes."
prompt = f"""<|im_start|>system
You convert dispatcher notes into one compact JSON object.
Return JSON only. No markdown. No explanation.
Schema:
{{
"prefer_early_priority": true,
"avoid_late_penalty": 2.0,
"max_route_load": 18,
"depot_start": 480,
"soft_due_before": 720,
"boost_terms": ["school", "medical", "fresh"]
}}
Use minutes after midnight for times. Omit soft_due_before if no lunch/noon constraint is present.
<|im_end|>
<|im_start|>user
Dispatcher notes: {notes}
<|im_end|>
<|im_start|>assistant
"""
try:
result = llm(
prompt,
max_tokens=96,
temperature=0.0,
top_p=1.0,
stop=["<|im_end|>", "\n\n\n"],
)
text = result["choices"][0]["text"]
parsed = extract_json_object(text)
if parsed:
parsed["source"] = f"{MINICPM_REPO}/{MINICPM_FILE}"
return normalize_constraints(parsed), text.strip()
except Exception as exc:
fallback["source"] = "rule-fallback"
return fallback, f"MiniCPM5 parsing failed and fallback parser was used: {exc}"
fallback["source"] = "rule-fallback"
return fallback, "MiniCPM5 returned no valid JSON, so the deterministic parser handled the notes."
def priority_weight(stop: Stop, constraints: Dict[str, object]) -> float:
score = 0.0
if stop.priority == "high":
score -= 1.4
if constraints.get("soft_due_before") and stop.due_time <= int(constraints["soft_due_before"]):
score -= 0.8
searchable = f"{stop.customer} {stop.notes}".lower()
for term in constraints.get("boost_terms", []):
if term in searchable:
score -= 1.0
return score
def score_route(route: List[Stop], start_minute: int) -> Tuple[int, float, int]:
plan, metrics = simulate_single_route(route, start_minute, route_id=1)
late_stops = sum(1 for item in plan if item.late_min > 0)
return int(metrics["late_min"]), float(metrics["distance_km"]), late_stops
def best_order_for_group(stops: List[Stop], start_minute: int) -> List[Stop]:
if len(stops) <= 1:
return stops[:]
if len(stops) <= 7:
candidates = permutations(stops)
else:
ordered = sorted(stops, key=lambda s: (s.due_time, s.ready_time, -priority_weight(s, {})))
candidates = [ordered]
best_route: Optional[List[Stop]] = None
best_score: Optional[Tuple[int, float, int]] = None
for candidate in candidates:
route = list(candidate)
score = score_route(route, start_minute)
if best_score is None or score < best_score:
best_score = score
best_route = route
return best_route or stops[:]
def build_capacity_routes(stops: List[Stop], constraints: Dict[str, object]) -> List[List[Stop]]:
capacity = int(constraints["max_route_load"])
ordered = sorted(
stops,
key=lambda stop: (
stop.due_time,
stop.ready_time,
0 if stop.priority == "high" else 1,
priority_weight(stop, constraints),
),
)
routes: List[List[Stop]] = []
loads: List[int] = []
for stop in ordered:
best_idx = None
best_score = None
for idx, route in enumerate(routes):
if loads[idx] + stop.demand > capacity:
continue
trial = best_order_for_group(route + [stop], int(constraints["depot_start"]))
late, dist, late_stops = score_route(trial, int(constraints["depot_start"]))
score = (late, late_stops, dist)
if best_score is None or score < best_score:
best_score = score
best_idx = idx
if best_idx is None:
routes.append([stop])
loads.append(stop.demand)
else:
routes[best_idx] = best_order_for_group(routes[best_idx] + [stop], int(constraints["depot_start"]))
loads[best_idx] += stop.demand
return [best_order_for_group(route, int(constraints["depot_start"])) for route in routes]
def route_distance(route: Iterable[Stop]) -> float:
cur_lat, cur_lng = DEPOT["lat"], DEPOT["lng"]
total = 0.0
last_lat, last_lng = cur_lat, cur_lng
for stop in route:
total += haversine_km(last_lat, last_lng, stop.lat, stop.lng)
last_lat, last_lng = stop.lat, stop.lng
total += haversine_km(last_lat, last_lng, cur_lat, cur_lng)
return total
def simulate_single_route(route: List[Stop], start_minute: int, route_id: int) -> Tuple[List[PlanStop], Dict[str, float]]:
cur_lat, cur_lng = DEPOT["lat"], DEPOT["lng"]
current = start_minute
plan: List[PlanStop] = []
load = 0
total_distance = 0.0
total_late = 0
total_wait = 0
for stop in route:
distance = haversine_km(cur_lat, cur_lng, stop.lat, stop.lng)
arrival = current + travel_minutes(distance)
start = max(arrival, stop.ready_time)
wait = max(0, start - arrival)
late = max(0, start - stop.due_time)
depart = start + stop.service_min
plan.append(
PlanStop(
stop=stop,
route_id=route_id,
arrival=arrival,
start=start,
depart=depart,
distance_km=distance,
late_min=late,
wait_min=wait,
)
)
total_distance += distance
total_late += late
total_wait += wait
load += stop.demand
current = depart
cur_lat, cur_lng = stop.lat, stop.lng
back = haversine_km(cur_lat, cur_lng, DEPOT["lat"], DEPOT["lng"])
total_distance += back
finish = current + travel_minutes(back)
metrics = {
"distance_km": total_distance,
"late_min": total_late,
"wait_min": total_wait,
"finish_min": finish,
"load": load,
"on_time_rate": 100.0 * (1 - sum(1 for p in plan if p.late_min > 0) / max(1, len(plan))),
}
return plan, metrics
def simulate_routes(routes: List[List[Stop]], start_minute: int) -> Tuple[List[PlanStop], Dict[str, float]]:
all_plan: List[PlanStop] = []
total_distance = 0.0
total_late = 0
total_wait = 0
total_load = 0
finish_min = start_minute
for route_id, route in enumerate(routes, start=1):
plan, metrics = simulate_single_route(route, start_minute, route_id)
all_plan.extend(plan)
total_distance += metrics["distance_km"]
total_late += metrics["late_min"]
total_wait += metrics["wait_min"]
total_load += metrics["load"]
finish_min = max(finish_min, metrics["finish_min"])
metrics = {
"distance_km": total_distance,
"late_min": total_late,
"wait_min": total_wait,
"finish_min": finish_min,
"load": total_load,
"routes": len(routes),
"on_time_rate": 100.0 * (1 - sum(1 for p in all_plan if p.late_min > 0) / max(1, len(all_plan))),
}
return all_plan, metrics
def manual_route(stops: List[Stop]) -> List[Stop]:
return sorted(stops, key=lambda stop: stop.manual_sequence)
def route_table(plan: List[PlanStop]) -> pd.DataFrame:
return pd.DataFrame(
[
{
"Route": item.route_id,
"Stop #": sum(1 for prev in plan[:idx] if prev.route_id == item.route_id) + 1,
"Order": item.stop.order_id,
"Customer": item.stop.customer,
"Arrive": min_to_time(item.arrival),
"Start": min_to_time(item.start),
"Depart": min_to_time(item.depart),
"Window": f"{min_to_time(item.stop.ready_time)}-{min_to_time(item.stop.due_time)}",
"Demand": item.stop.demand,
"Late min": item.late_min,
"Notes": item.stop.notes,
}
for idx, item in enumerate(plan)
]
)
def metrics_markdown(auto_metrics: Dict[str, float], manual_metrics: Dict[str, float]) -> str:
distance_delta = manual_metrics["distance_km"] - auto_metrics["distance_km"]
late_delta = manual_metrics["late_min"] - auto_metrics["late_min"]
return f"""
### Dispatch Score
| Metric | Manual baseline | Tiny Dispatch Coach | Change |
|---|---:|---:|---:|
| Routes / trips | {manual_metrics.get('routes', 1):.0f} | {auto_metrics.get('routes', 1):.0f} | |
| Distance | {manual_metrics['distance_km']:.1f} km | {auto_metrics['distance_km']:.1f} km | {distance_delta:+.1f} km |
| Late minutes | {manual_metrics['late_min']:.0f} | {auto_metrics['late_min']:.0f} | {late_delta:+.0f} |
| Waiting minutes | {manual_metrics['wait_min']:.0f} | {auto_metrics['wait_min']:.0f} | {manual_metrics['wait_min'] - auto_metrics['wait_min']:+.0f} |
| Finish time | {min_to_time(manual_metrics['finish_min'])} | {min_to_time(auto_metrics['finish_min'])} | |
| On-time rate | {manual_metrics['on_time_rate']:.0f}% | {auto_metrics['on_time_rate']:.0f}% | {auto_metrics['on_time_rate'] - manual_metrics['on_time_rate']:+.0f} pts |
**Coach note:** The planner treats time-window risk as the first objective, then uses distance as a tie-breaker. It may split the day into multiple feasible trips when the notes imply a small van capacity.
"""
def constraints_markdown(constraints: Dict[str, object], model_trace: str) -> str:
rows = "\n".join(f"- **{key}**: `{value}`" for key, value in constraints.items())
trace = escape(model_trace or "")
return f"""### OpenBMB MiniCPM5 Constraint Parse
**Model path:** `{MINICPM_REPO}` / `{MINICPM_FILE}`
**Parameter count:** `{MINICPM_PARAMS}`
**Runtime target:** local GGUF via llama.cpp; deterministic parser fallback if the runtime is unavailable.
{rows}
Parser trace
```text
{trace}
```
Turn a small delivery sheet and messy dispatcher notes into route plans, tradeoff explanations, and driver-ready cards. Built around OpenBMB MiniCPM5-1B-GGUF plus a deterministic planner.
{MINICPM_REPO}{MINICPM_FILE}