import json import math import os import ast import re from dataclasses import dataclass from functools import lru_cache from html import escape from itertools import permutations from pathlib import Path from typing import Dict, Iterable, List, Optional, Tuple import gradio as gr import pandas as pd DEPOT = { "customer": "Depot", "lat": 40.7280, "lng": -73.9980, } SAMPLE_PATH = Path(__file__).with_name("sample_orders.csv") AVG_SPEED_KMPH = 22.0 CAPACITY = 18 START_MINUTE = 8 * 60 MINICPM_REPO = "openbmb/MiniCPM5-1B-GGUF" MINICPM_FILE = "MiniCPM5-1B-Q4_K_M.gguf" MINICPM_PARAMS = "1.08B" @dataclass(frozen=True) class Stop: order_id: str customer: str lat: float lng: float demand: int service_min: int ready_time: int due_time: int priority: str notes: str manual_sequence: int @dataclass(frozen=True) class PlanStop: stop: Stop route_id: int arrival: int start: int depart: int distance_km: float late_min: int wait_min: int def time_to_min(value: str) -> int: value = str(value or "").strip() if not value: return 17 * 60 match = re.match(r"^(\d{1,2}):(\d{2})$", value) if not match: return 17 * 60 hour, minute = int(match.group(1)), int(match.group(2)) return max(0, min(23 * 60 + 59, hour * 60 + minute)) def min_to_time(value: int) -> str: value = max(0, int(round(value))) return f"{value // 60:02d}:{value % 60:02d}" def haversine_km(a_lat: float, a_lng: float, b_lat: float, b_lng: float) -> float: radius = 6371.0 lat1, lat2 = math.radians(a_lat), math.radians(b_lat) d_lat = math.radians(b_lat - a_lat) d_lng = math.radians(b_lng - a_lng) h = ( math.sin(d_lat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(d_lng / 2) ** 2 ) return 2 * radius * math.asin(math.sqrt(h)) def travel_minutes(distance_km: float) -> int: return int(math.ceil((distance_km / AVG_SPEED_KMPH) * 60)) def parse_orders(file_obj) -> List[Stop]: if file_obj is None: df = pd.read_csv(SAMPLE_PATH) else: file_path = file_obj if isinstance(file_obj, str) else file_obj.name df = pd.read_csv(file_path) required = { "order_id", "customer", "lat", "lng", "demand", "service_min", "ready_time", "due_time", "priority", "notes", } missing = sorted(required - set(df.columns)) if missing: raise gr.Error(f"CSV is missing required columns: {', '.join(missing)}") if "manual_sequence" not in df.columns: df["manual_sequence"] = range(1, len(df) + 1) stops: List[Stop] = [] for row in df.to_dict("records"): stops.append( Stop( order_id=str(row["order_id"]), customer=str(row["customer"]), lat=float(row["lat"]), lng=float(row["lng"]), demand=int(row["demand"]), service_min=int(row["service_min"]), ready_time=time_to_min(str(row["ready_time"])), due_time=time_to_min(str(row["due_time"])), priority=str(row["priority"]).lower(), notes=str(row.get("notes", "")), manual_sequence=int(row.get("manual_sequence", len(stops) + 1)), ) ) return stops def parse_dispatch_notes(notes: str) -> Dict[str, object]: text = (notes or "").lower() constraints: Dict[str, object] = { "prefer_early_priority": True, "avoid_late_penalty": 2.0, "max_route_load": CAPACITY, "depot_start": START_MINUTE, "boost_terms": [], } if "cold" in text or "fresh" in text or "produce" in text: constraints["boost_terms"].append("fresh") if "medical" in text or "clinic" in text or "medicine" in text: constraints["boost_terms"].append("medical") if "school" in text: constraints["boost_terms"].append("school") if "lunch" in text or "noon" in text: constraints["soft_due_before"] = 12 * 60 hour_match = re.search(r"(?:start|leave|depart)\D{0,12}(\d{1,2})(?::(\d{2}))?", text) if hour_match: hour = int(hour_match.group(1)) minute = int(hour_match.group(2) or 0) if 1 <= hour <= 23: constraints["depot_start"] = hour * 60 + minute capacity_match = re.search(r"(?:capacity|load|max load|van)\D{0,12}(\d{1,3})", text) if capacity_match: constraints["max_route_load"] = max(1, int(capacity_match.group(1))) return constraints def normalize_constraints(raw: Dict[str, object]) -> Dict[str, object]: constraints = { "prefer_early_priority": bool(raw.get("prefer_early_priority", True)), "avoid_late_penalty": float(raw.get("avoid_late_penalty", 2.0) or 2.0), "max_route_load": int(raw.get("max_route_load", CAPACITY) or CAPACITY), "depot_start": int(raw.get("depot_start", START_MINUTE) or START_MINUTE), "boost_terms": list(raw.get("boost_terms", []) or []), "source": raw.get("source", "rule-fallback"), } if raw.get("soft_due_before") is not None: constraints["soft_due_before"] = int(raw["soft_due_before"]) constraints["max_route_load"] = max(1, min(200, constraints["max_route_load"])) constraints["avoid_late_penalty"] = max(0.5, min(10.0, constraints["avoid_late_penalty"])) return constraints def extract_json_object(text: str) -> Optional[Dict[str, object]]: cleaned = (text or "").strip() fenced = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", cleaned, re.DOTALL | re.IGNORECASE) if fenced: cleaned = fenced.group(1) match = re.search(r"\{.*\}", cleaned, re.DOTALL) if not match: return None raw = match.group(0) try: parsed = json.loads(raw) except json.JSONDecodeError: try: parsed = ast.literal_eval(raw) except (SyntaxError, ValueError): normalized = ( raw.replace("True", "true") .replace("False", "false") .replace("None", "null") ) try: parsed = json.loads(normalized) except json.JSONDecodeError: return None return parsed if isinstance(parsed, dict) else None @lru_cache(maxsize=1) def get_minicpm_llm(): if os.environ.get("DISABLE_MINICPM", "").lower() in {"1", "true", "yes"}: return None try: from huggingface_hub import hf_hub_download from llama_cpp import Llama except Exception: return None try: model_path = hf_hub_download(repo_id=MINICPM_REPO, filename=MINICPM_FILE) return Llama( model_path=model_path, n_ctx=768, n_threads=max(1, min(4, os.cpu_count() or 2)), n_batch=32, n_gpu_layers=0, verbose=False, ) except Exception: return None def minicpm_parse_dispatch_notes(notes: str, use_minicpm: bool = False) -> Tuple[Dict[str, object], str]: fallback = normalize_constraints(parse_dispatch_notes(notes)) if not use_minicpm: fallback["source"] = "rule-fallback" return ( fallback, "Fast CPU Basic mode used the deterministic parser. Enable MiniCPM5 parser to run the local GGUF model path.", ) llm = get_minicpm_llm() if llm is None: fallback["source"] = "rule-fallback" return fallback, "MiniCPM5 local runtime is unavailable, so the deterministic parser handled the notes." prompt = f"""<|im_start|>system You convert dispatcher notes into one compact JSON object. Return JSON only. No markdown. No explanation. Schema: {{ "prefer_early_priority": true, "avoid_late_penalty": 2.0, "max_route_load": 18, "depot_start": 480, "soft_due_before": 720, "boost_terms": ["school", "medical", "fresh"] }} Use minutes after midnight for times. Omit soft_due_before if no lunch/noon constraint is present. <|im_end|> <|im_start|>user Dispatcher notes: {notes} <|im_end|> <|im_start|>assistant """ try: result = llm( prompt, max_tokens=96, temperature=0.0, top_p=1.0, stop=["<|im_end|>", "\n\n\n"], ) text = result["choices"][0]["text"] parsed = extract_json_object(text) if parsed: parsed["source"] = f"{MINICPM_REPO}/{MINICPM_FILE}" return normalize_constraints(parsed), text.strip() except Exception as exc: fallback["source"] = "rule-fallback" return fallback, f"MiniCPM5 parsing failed and fallback parser was used: {exc}" fallback["source"] = "rule-fallback" return fallback, "MiniCPM5 returned no valid JSON, so the deterministic parser handled the notes." def priority_weight(stop: Stop, constraints: Dict[str, object]) -> float: score = 0.0 if stop.priority == "high": score -= 1.4 if constraints.get("soft_due_before") and stop.due_time <= int(constraints["soft_due_before"]): score -= 0.8 searchable = f"{stop.customer} {stop.notes}".lower() for term in constraints.get("boost_terms", []): if term in searchable: score -= 1.0 return score def score_route(route: List[Stop], start_minute: int) -> Tuple[int, float, int]: plan, metrics = simulate_single_route(route, start_minute, route_id=1) late_stops = sum(1 for item in plan if item.late_min > 0) return int(metrics["late_min"]), float(metrics["distance_km"]), late_stops def best_order_for_group(stops: List[Stop], start_minute: int) -> List[Stop]: if len(stops) <= 1: return stops[:] if len(stops) <= 7: candidates = permutations(stops) else: ordered = sorted(stops, key=lambda s: (s.due_time, s.ready_time, -priority_weight(s, {}))) candidates = [ordered] best_route: Optional[List[Stop]] = None best_score: Optional[Tuple[int, float, int]] = None for candidate in candidates: route = list(candidate) score = score_route(route, start_minute) if best_score is None or score < best_score: best_score = score best_route = route return best_route or stops[:] def build_capacity_routes(stops: List[Stop], constraints: Dict[str, object]) -> List[List[Stop]]: capacity = int(constraints["max_route_load"]) ordered = sorted( stops, key=lambda stop: ( stop.due_time, stop.ready_time, 0 if stop.priority == "high" else 1, priority_weight(stop, constraints), ), ) routes: List[List[Stop]] = [] loads: List[int] = [] for stop in ordered: best_idx = None best_score = None for idx, route in enumerate(routes): if loads[idx] + stop.demand > capacity: continue trial = best_order_for_group(route + [stop], int(constraints["depot_start"])) late, dist, late_stops = score_route(trial, int(constraints["depot_start"])) score = (late, late_stops, dist) if best_score is None or score < best_score: best_score = score best_idx = idx if best_idx is None: routes.append([stop]) loads.append(stop.demand) else: routes[best_idx] = best_order_for_group(routes[best_idx] + [stop], int(constraints["depot_start"])) loads[best_idx] += stop.demand return [best_order_for_group(route, int(constraints["depot_start"])) for route in routes] def route_distance(route: Iterable[Stop]) -> float: cur_lat, cur_lng = DEPOT["lat"], DEPOT["lng"] total = 0.0 last_lat, last_lng = cur_lat, cur_lng for stop in route: total += haversine_km(last_lat, last_lng, stop.lat, stop.lng) last_lat, last_lng = stop.lat, stop.lng total += haversine_km(last_lat, last_lng, cur_lat, cur_lng) return total def simulate_single_route(route: List[Stop], start_minute: int, route_id: int) -> Tuple[List[PlanStop], Dict[str, float]]: cur_lat, cur_lng = DEPOT["lat"], DEPOT["lng"] current = start_minute plan: List[PlanStop] = [] load = 0 total_distance = 0.0 total_late = 0 total_wait = 0 for stop in route: distance = haversine_km(cur_lat, cur_lng, stop.lat, stop.lng) arrival = current + travel_minutes(distance) start = max(arrival, stop.ready_time) wait = max(0, start - arrival) late = max(0, start - stop.due_time) depart = start + stop.service_min plan.append( PlanStop( stop=stop, route_id=route_id, arrival=arrival, start=start, depart=depart, distance_km=distance, late_min=late, wait_min=wait, ) ) total_distance += distance total_late += late total_wait += wait load += stop.demand current = depart cur_lat, cur_lng = stop.lat, stop.lng back = haversine_km(cur_lat, cur_lng, DEPOT["lat"], DEPOT["lng"]) total_distance += back finish = current + travel_minutes(back) metrics = { "distance_km": total_distance, "late_min": total_late, "wait_min": total_wait, "finish_min": finish, "load": load, "on_time_rate": 100.0 * (1 - sum(1 for p in plan if p.late_min > 0) / max(1, len(plan))), } return plan, metrics def simulate_routes(routes: List[List[Stop]], start_minute: int) -> Tuple[List[PlanStop], Dict[str, float]]: all_plan: List[PlanStop] = [] total_distance = 0.0 total_late = 0 total_wait = 0 total_load = 0 finish_min = start_minute for route_id, route in enumerate(routes, start=1): plan, metrics = simulate_single_route(route, start_minute, route_id) all_plan.extend(plan) total_distance += metrics["distance_km"] total_late += metrics["late_min"] total_wait += metrics["wait_min"] total_load += metrics["load"] finish_min = max(finish_min, metrics["finish_min"]) metrics = { "distance_km": total_distance, "late_min": total_late, "wait_min": total_wait, "finish_min": finish_min, "load": total_load, "routes": len(routes), "on_time_rate": 100.0 * (1 - sum(1 for p in all_plan if p.late_min > 0) / max(1, len(all_plan))), } return all_plan, metrics def manual_route(stops: List[Stop]) -> List[Stop]: return sorted(stops, key=lambda stop: stop.manual_sequence) def route_table(plan: List[PlanStop]) -> pd.DataFrame: return pd.DataFrame( [ { "Route": item.route_id, "Stop #": sum(1 for prev in plan[:idx] if prev.route_id == item.route_id) + 1, "Order": item.stop.order_id, "Customer": item.stop.customer, "Arrive": min_to_time(item.arrival), "Start": min_to_time(item.start), "Depart": min_to_time(item.depart), "Window": f"{min_to_time(item.stop.ready_time)}-{min_to_time(item.stop.due_time)}", "Demand": item.stop.demand, "Late min": item.late_min, "Notes": item.stop.notes, } for idx, item in enumerate(plan) ] ) def metrics_markdown(auto_metrics: Dict[str, float], manual_metrics: Dict[str, float]) -> str: distance_delta = manual_metrics["distance_km"] - auto_metrics["distance_km"] late_delta = manual_metrics["late_min"] - auto_metrics["late_min"] return f""" ### Dispatch Score | Metric | Manual baseline | Tiny Dispatch Coach | Change | |---|---:|---:|---:| | Routes / trips | {manual_metrics.get('routes', 1):.0f} | {auto_metrics.get('routes', 1):.0f} | | | Distance | {manual_metrics['distance_km']:.1f} km | {auto_metrics['distance_km']:.1f} km | {distance_delta:+.1f} km | | Late minutes | {manual_metrics['late_min']:.0f} | {auto_metrics['late_min']:.0f} | {late_delta:+.0f} | | Waiting minutes | {manual_metrics['wait_min']:.0f} | {auto_metrics['wait_min']:.0f} | {manual_metrics['wait_min'] - auto_metrics['wait_min']:+.0f} | | Finish time | {min_to_time(manual_metrics['finish_min'])} | {min_to_time(auto_metrics['finish_min'])} | | | On-time rate | {manual_metrics['on_time_rate']:.0f}% | {auto_metrics['on_time_rate']:.0f}% | {auto_metrics['on_time_rate'] - manual_metrics['on_time_rate']:+.0f} pts | **Coach note:** The planner treats time-window risk as the first objective, then uses distance as a tie-breaker. It may split the day into multiple feasible trips when the notes imply a small van capacity. """ def constraints_markdown(constraints: Dict[str, object], model_trace: str) -> str: rows = "\n".join(f"- **{key}**: `{value}`" for key, value in constraints.items()) trace = escape(model_trace or "") return f"""### OpenBMB MiniCPM5 Constraint Parse **Model path:** `{MINICPM_REPO}` / `{MINICPM_FILE}` **Parameter count:** `{MINICPM_PARAMS}` **Runtime target:** local GGUF via llama.cpp; deterministic parser fallback if the runtime is unavailable. {rows}
Parser trace ```text {trace} ```
""" def route_cards(plan: List[PlanStop]) -> str: cards = [] for idx, item in enumerate(plan, start=1): status = "late" if item.late_min else "on time" cards.append( f"""
{item.route_id}.{sum(1 for prev in plan[:idx - 1] if prev.route_id == item.route_id) + 1} {escape(item.stop.customer)} {status}
{escape(item.stop.order_id)} · route {item.route_id} · arrive {min_to_time(item.arrival)} · depart {min_to_time(item.depart)} · load {item.stop.demand}
{escape(item.stop.notes)}
""" ) return "
" + "\n".join(cards) + "
" def route_map(plan: List[PlanStop]) -> str: points = [(DEPOT["lat"], DEPOT["lng"], "Depot")] points.extend((item.stop.lat, item.stop.lng, item.stop.customer) for item in plan) lat_values = [p[0] for p in points] lng_values = [p[1] for p in points] min_lat, max_lat = min(lat_values), max(lat_values) min_lng, max_lng = min(lng_values), max(lng_values) pad_lat = max(0.002, (max_lat - min_lat) * 0.12) pad_lng = max(0.002, (max_lng - min_lng) * 0.12) min_lat -= pad_lat max_lat += pad_lat min_lng -= pad_lng max_lng += pad_lng def xy(lat: float, lng: float) -> Tuple[float, float]: x = 40 + (lng - min_lng) / (max_lng - min_lng) * 820 y = 520 - (lat - min_lat) / (max_lat - min_lat) * 460 return x, y coords = [xy(lat, lng) for lat, lng, _ in points] route_paths = [] for route_id in sorted({item.route_id for item in plan}): route_items = [item for item in plan if item.route_id == route_id] route_points = [coords[0]] for item in route_items: idx = plan.index(item) + 1 route_points.append(coords[idx]) route_points.append(coords[0]) route_paths.append(" ".join(f"{x:.1f},{y:.1f}" for x, y in route_points)) marker_html = [] for idx, ((lat, lng, label), (x, y)) in enumerate(zip(points, coords)): is_depot = idx == 0 fill = "#0f766e" if is_depot else "#f59e0b" text = "D" if is_depot else str(idx) marker_html.append( f""" {text} {label} """ ) return f"""
{''.join(f'' for path in route_paths)} {''.join(marker_html)}
""" def analyze(file_obj, notes: str, use_minicpm: bool): stops = parse_orders(file_obj) constraints, model_trace = minicpm_parse_dispatch_notes(notes, use_minicpm) auto_routes = build_capacity_routes(stops, constraints) manual = manual_route(stops) auto_plan, auto_metrics = simulate_routes(auto_routes, int(constraints["depot_start"])) manual_plan, manual_metrics = simulate_routes([manual], int(constraints["depot_start"])) return ( metrics_markdown(auto_metrics, manual_metrics), constraints_markdown(constraints, model_trace), route_table(auto_plan), route_cards(auto_plan), route_map(auto_plan), ) CUSTOM_CSS = """ .gradio-container { --radius-lg: 8px; } .hero { min-height: 260px; border-radius: 8px; padding: 36px; background: linear-gradient(rgba(9, 47, 44, .72), rgba(9, 47, 44, .62)), url('https://images.unsplash.com/photo-1601584115197-04ecc0da31d7?auto=format&fit=crop&w=1600&q=80'); background-size: cover; background-position: center; color: white; display: flex; flex-direction: column; justify-content: end; } .hero h1 { font-size: 42px; line-height: 1.05; margin: 0 0 10px 0; letter-spacing: 0; } .hero p { max-width: 760px; font-size: 16px; margin: 0; } .badges { display: flex; flex-wrap: wrap; gap: 8px; margin-top: 18px; } .badge { border: 1px solid rgba(255, 255, 255, .42); border-radius: 999px; padding: 5px 10px; color: #f8fafc; background: rgba(15, 118, 110, .58); font-size: 13px; font-weight: 700; } .model-note { border: 1px solid #d6d3d1; border-radius: 8px; padding: 12px; background: #f8fafc; color: #292524; font-size: 14px; } .route-cards { display: grid; grid-template-columns: repeat(auto-fit, minmax(260px, 1fr)); gap: 10px; } .route-card { border: 1px solid #d6d3d1; border-radius: 8px; padding: 12px; background: #fff; } .route-card-top { display: flex; align-items: center; gap: 8px; } .route-index { display: inline-grid; place-items: center; width: 26px; height: 26px; border-radius: 50%; background: #0f766e; color: white; font-weight: 700; } .route-title { font-weight: 700; flex: 1; } .route-status { border-radius: 999px; padding: 3px 8px; font-size: 12px; background: #dcfce7; color: #166534; } .route-status.late { background: #fee2e2; color: #991b1b; } .route-meta { color: #57534e; font-size: 13px; margin-top: 8px; } .route-note { color: #292524; font-size: 14px; margin-top: 6px; } .map-wrap { border: 1px solid #d6d3d1; border-radius: 8px; overflow: hidden; background: white; } """ DEFAULT_NOTES = ( "Start at 8:00. School and clinic stops are urgent. Fresh produce should be " "delivered before lunch. Van capacity 18." ) with gr.Blocks( title="Tiny Dispatch Coach", css=CUSTOM_CSS, theme=gr.themes.Soft(primary_hue="emerald", secondary_hue="amber", neutral_hue="stone"), ) as demo: gr.HTML( """

Tiny Dispatch Coach

Turn a small delivery sheet and messy dispatcher notes into route plans, tradeoff explanations, and driver-ready cards. Built around OpenBMB MiniCPM5-1B-GGUF plus a deterministic planner.

OpenBMB MiniCPM5 1.08B params GGUF / llama.cpp No cloud LLM API Synthetic demo data
""" ) with gr.Row(): with gr.Column(scale=2): order_file = gr.File( label="Orders CSV", file_types=[".csv"], type="filepath", ) notes = gr.Textbox( label="Dispatcher notes", value=DEFAULT_NOTES, lines=5, ) use_minicpm = gr.Checkbox( label="Use MiniCPM5 parser", value=False, info="Optional on CPU Basic. Default fast mode keeps the demo responsive.", ) run = gr.Button("Plan route", variant="primary") with gr.Column(scale=1): gr.HTML( f"""
Small-model core:
{MINICPM_REPO}
{MINICPM_FILE}
MiniCPM5 parses human dispatch notes when llama.cpp is available. CPU Basic falls back to the same auditable constraint schema.
""" ) gr.Markdown( """ ### CSV columns `order_id`, `customer`, `lat`, `lng`, `demand`, `service_min`, `ready_time`, `due_time`, `priority`, `notes`, optional `manual_sequence`. Leave the file empty to run the included sample route. """ ) gr.Markdown( """ ### Build Small fit OpenBMB MiniCPM5, 1.08B parameters, local GGUF path, no cloud LLM API, synthetic sample data, explicit parser trace. """ ) metrics = gr.Markdown() constraints = gr.Markdown( "### OpenBMB MiniCPM5 Constraint Parse\nClick **Plan route** to parse notes with MiniCPM5-1B-GGUF when available, or the deterministic fallback on CPU Basic." ) table = gr.Dataframe(label="Optimized route", interactive=False) cards = gr.HTML(label="Driver cards") map_html = gr.HTML(label="Route map") run.click( analyze, inputs=[order_file, notes, use_minicpm], outputs=[metrics, constraints, table, cards, map_html], ) if __name__ == "__main__": demo.launch()