# uzoagro-api / matching_engine.py
"""
UzoAgro AI - Spatio-Temporal Logistics Matching Engine
Calculates intelligent driver-to-cargo matches using a 5-dimensional scoring model:
1. Capacity Fit (Binary Mask): Verifies strict tonnage requirements.
2. Temporal Fit: Scores schedule alignment (0-2 days tolerance).
3. Deadhead Score: Proximity for empty pickup transit.
4. Corridor Score (Vector Deviation): Cross-track distance from backhaul trajectory.
5. Cargo Affinity Matrix: Intelligently matches exact crops, broad categories, and safe pivots.
Outputs `data/matches.csv` containing the top matched drivers based on a weighted composite score.
"""
import os

import numpy as np
import pandas as pd
# ----------------- Cargo Matrix Definitions -----------------
CROP_CATEGORIES = {
"Grains": ["Rice", "Maize", "Beans", "Millet", "Sorghum"],
"Tubers": ["Yam", "Cassava", "Potatoes"],
"Perishables": ["Tomatoes", "Onions", "Peppers"]
}
def get_category(crop):
"""Return the category name for a given crop."""
for category, crops in CROP_CATEGORIES.items():
if crop in crops:
return category
return "Unknown"
# ----------------- I/O & Parsing -----------------
def load_data(drivers_path="data/drivers.csv", requests_path="data/requests.csv"):
    """Load the driver and request datasets, normalizing dates to midnight."""
    drivers = pd.read_csv(drivers_path)
requests = pd.read_csv(requests_path)
# Parse Dates
if "available_date" in drivers.columns:
drivers["available_date"] = pd.to_datetime(drivers["available_date"], errors="coerce").dt.normalize()
if "requested_date" in requests.columns:
requests["requested_date"] = pd.to_datetime(requests["requested_date"], errors="coerce").dt.normalize()
return drivers, requests
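# Column expectations, inferred from the scoring code below: drivers.csv needs
# driver_id, name and current_lat/current_lon, and may carry available_capacity,
# available_date, allowed_crops ("|"-separated) and home_base_lat/home_base_lon;
# requests.csv needs request_id, crop_type, pickup/dropoff coordinates,
# required_capacity and requested_date.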
# ----------------- Core Mathematical Operations -----------------
def compute_distance(lat1, lon1, lat2, lon2):
    """Vectorized Euclidean distance between coordinate pairs, in degree units."""
    return np.sqrt((lat1 - lat2) ** 2 + (lon1 - lon2) ** 2)
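# Note: a flat-plane approximation over raw degrees. It is adequate for
# ranking candidates within a regional corridor but is not a kilometre figure;
# a haversine formula would be the drop-in refinement if true distances matter.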
def compute_cross_track_deviation(A_lat, A_lon, B_lat, B_lon, P_lat, P_lon):
    """Perpendicular (cross-track) distance of point P from the line A -> B.

    Uses the 2D cross-product identity |AB x AP| / |AB|; falls back to the
    direct A-P distance when the trajectory has zero length.
    """
    AB_lat = B_lat - A_lat
    AB_lon = B_lon - A_lon
    AP_lat = P_lat - A_lat
    AP_lon = P_lon - A_lon
    norm_AB = np.sqrt(AB_lat**2 + AB_lon**2)
    with np.errstate(divide='ignore', invalid='ignore'):
        cross_prod = np.abs(AB_lon * AP_lat - AB_lat * AP_lon)
        deviation = np.divide(cross_prod, norm_AB)
    # Zero-length trajectory (driver already at home base): use point distance.
    deviation = np.where(norm_AB == 0, compute_distance(A_lat, A_lon, P_lat, P_lon), deviation)
    return deviation
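# Worked example (illustrative coordinates): for a trajectory A = (6.0, 3.0)
# to B = (9.0, 7.0) and a pickup P = (7.0, 5.0), the deviation is
# |4*1 - 3*2| / sqrt(3**2 + 4**2) = 2 / 5 = 0.4 degrees off-corridor.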
# ----------------- Feature Engineering -----------------
def extract_temporal_score(driver_dates, request_date):
    """Score schedule alignment: same day = 1.0, 1 day off = 0.8, 2 days = 0.4."""
    if pd.isna(request_date):
        # An undated request accepts any driver availability.
        return np.ones(len(driver_dates))
days_diff = np.abs((driver_dates - request_date).dt.days).to_numpy(dtype=float)
conditions = [days_diff == 0, days_diff == 1, days_diff == 2]
choices = [1.0, 0.8, 0.4]
return np.select(conditions, choices, default=0.0)
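# For instance, a driver available 2024-05-03 scores 0.8 against a request
# dated 2024-05-04 (one day apart) and 0.0 against 2024-05-07 (beyond the
# two-day tolerance). Dates here are illustrative.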
def extract_affinity_score(driver_crops_series, request_crop):
"""
Evaluates cargo compatibility.
- Exact crop match = 1.0
- Same category match = 0.8
- Safe pivot (Grains <-> Tubers) = 0.4
    - Unsafe pivot (e.g., Perishables vs. dry goods) = 0.0
"""
req_cat = get_category(request_crop)
scores = []
for allowed_str in driver_crops_series:
if pd.isna(allowed_str):
scores.append(0.0)
continue
        driver_crops = [c.strip() for c in allowed_str.split("|")]
# 1. Exact Match
if request_crop in driver_crops:
scores.append(1.0)
continue
# Determine categories the driver handles
driver_categories = {get_category(c) for c in driver_crops}
# 2. Category Match
if req_cat in driver_categories:
scores.append(0.8)
continue
# 3. Safe Pivots (Grains and Tubers are cross-compatible dry/hardy goods)
safe_pivots = [{"Grains", "Tubers"}]
is_pivot = False
for pair in safe_pivots:
if req_cat in pair and any(dc in pair for dc in driver_categories):
scores.append(0.4)
is_pivot = True
break
if is_pivot:
continue
# 4. Unsafe Pivot (e.g., Perishables to Grains)
scores.append(0.0)
return np.array(scores, dtype=float)
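# Illustrative scoring: a driver whose allowed_crops is "Maize|Millet" earns
# 1.0 for a Maize request (exact), 0.8 for Rice (shared Grains category),
# 0.4 for Yam (Grains <-> Tubers safe pivot) and 0.0 for Tomatoes.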
def compute_scores(drivers_df, request_row):
    """Build the per-driver feature table (the five scores) for one request."""
    p_lat, p_lon = float(request_row.get("pickup_lat")), float(request_row.get("pickup_lon"))
d_lat, d_lon = float(request_row.get("dropoff_lat")), float(request_row.get("dropoff_lon"))
req_cap = float(request_row.get("required_capacity", 0))
req_date = request_row.get("requested_date")
req_crop = request_row.get("crop_type", "")
curr_lats = drivers_df["current_lat"].to_numpy(dtype=float)
curr_lons = drivers_df["current_lon"].to_numpy(dtype=float)
home_lats = drivers_df.get("home_base_lat", drivers_df["current_lat"]).to_numpy(dtype=float)
home_lons = drivers_df.get("home_base_lon", drivers_df["current_lon"]).to_numpy(dtype=float)
    avail_caps = drivers_df.get("available_capacity", pd.Series(0.0, index=drivers_df.index)).to_numpy(dtype=float)
    # Series defaults (rather than None) keep downstream scoring alive when
    # the optional columns are missing from drivers.csv.
    driver_dates = drivers_df.get("available_date", pd.Series(pd.NaT, index=drivers_df.index))
    driver_crops = drivers_df.get("allowed_crops", pd.Series(np.nan, index=drivers_df.index))
# 1. Capacity
capacity_score = (avail_caps >= req_cap).astype(float)
# 2. Temporal
time_score = extract_temporal_score(driver_dates, req_date)
# 3. Cargo Affinity
affinity_score = extract_affinity_score(driver_crops, req_crop)
# 4. Deadhead
deadhead_dists = compute_distance(curr_lats, curr_lons, p_lat, p_lon)
# 5. Corridor
pickup_dev = compute_cross_track_deviation(curr_lats, curr_lons, home_lats, home_lons, p_lat, p_lon)
dropoff_dev = compute_cross_track_deviation(curr_lats, curr_lons, home_lats, home_lons, d_lat, d_lon)
total_dev = pickup_dev + dropoff_dev
    def inv_normalize(arr):
        # Min-max normalize, then invert so the closest candidate scores 1.0.
        minv, maxv = arr.min(), arr.max()
if maxv - minv <= 1e-12:
return np.ones_like(arr, dtype=float)
return 1.0 - ((arr - minv) / (maxv - minv))
features_df = pd.DataFrame({
"driver_id": drivers_df["driver_id"].values,
"driver_name": drivers_df["name"].values,
"capacity_score": capacity_score,
"time_score": time_score,
"affinity_score": affinity_score,
"deadhead_score": inv_normalize(deadhead_dists),
"corridor_score": inv_normalize(total_dev)
})
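    # All five feature columns live in [0, 1], so any convex weighting of them
    # (like the composite in run_matching_engine) is bounded in [0, 1] as well.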
return features_df
# ----------------- Execution & Ranking -----------------
def run_matching_engine(drivers_df, requests_df, top_k=3):
    """Score all drivers against each request and keep the top_k viable matches."""
    matches = []
    # Algorithm weights (sum to 1.0); capacity acts separately as a binary mask.
W_DEADHEAD = 0.25
W_CORRIDOR = 0.35
W_AFFINITY = 0.20
W_TIME = 0.20
for _, req in requests_df.iterrows():
features = compute_scores(drivers_df, req)
# Calculate composite score (Capacity acts as a strict multiplier mask)
features["final_score"] = (
(W_DEADHEAD * features["deadhead_score"]) +
(W_CORRIDOR * features["corridor_score"]) +
(W_AFFINITY * features["affinity_score"]) +
(W_TIME * features["time_score"])
) * features["capacity_score"]
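        # e.g. a capacity-feasible driver scoring 0.9 deadhead, 0.8 corridor,
        # 1.0 affinity and 0.8 time lands at 0.25*0.9 + 0.35*0.8 + 0.20*1.0 +
        # 0.20*0.8 = 0.865, while any capacity-infeasible driver is zeroed out.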
top = features.sort_values(by="final_score", ascending=False).head(top_k)
for _, row in top.iterrows():
# Only save matches that are viable (score > 0)
if float(row["final_score"]) > 0:
matches.append({
"request_id": req["request_id"],
"driver_id": row["driver_id"],
"driver_name": str(row["driver_name"]),
"final_score": float(row["final_score"]),
"capacity_score": float(row["capacity_score"]),
"time_score": float(row["time_score"]),
"affinity_score": float(row["affinity_score"]),
"deadhead_score": float(row["deadhead_score"]),
"corridor_score": float(row["corridor_score"])
})
return pd.DataFrame(matches)
# ----------------- Entry Point -----------------
def main():
drivers_path = os.path.join("data", "drivers.csv")
requests_path = os.path.join("data", "requests.csv")
if not os.path.exists(drivers_path) or not os.path.exists(requests_path):
raise FileNotFoundError("Missing datasets. Ensure data/drivers.csv and data/requests.csv exist.")
drivers_df, requests_df = load_data(drivers_path, requests_path)
matches_df = run_matching_engine(drivers_df, requests_df, top_k=3)
out_path = os.path.join("data", "matches.csv")
matches_df.to_csv(out_path, index=False, encoding="utf-8-sig")
print(f"Intelligence Engine Complete. Saved {len(matches_df)} matches to {out_path}.")
if __name__ == "__main__":
main()
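# Expected invocation (assuming the repo layout above): run from the project
# root with both CSVs present:
#   python matching_engine.py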