| |
|
|
| |
| |
| |
| |
|
|
| import os, uuid, threading, shutil, time, heapq, cv2, numpy as np |
| from PIL import Image |
| import uvicorn |
| from fastapi import FastAPI, File, UploadFile, Request |
| from fastapi.responses import HTMLResponse, StreamingResponse, Response |
| from fastapi.staticfiles import StaticFiles |
|
|
|
|
| |
| import torch, yolov5, ffmpeg |
| from ultralytics import YOLO |
| from transformers import ( |
| DetrImageProcessor, DetrForObjectDetection, |
| SegformerFeatureExtractor, SegformerForSemanticSegmentation |
| ) |
| |
| from inference_sdk import InferenceHTTPClient |
|
|
|
|
| |
| BASE = "/home/user/app" |
| CACHE = f"{BASE}/cache" |
| UPLOAD_DIR = f"{CACHE}/uploads" |
| OUTPUT_DIR = f"{BASE}/outputs" |
| MODEL_DIR = f"{BASE}/model" |
| SPRITE = f"{BASE}/sprite.png" |
|
|
| os.makedirs(UPLOAD_DIR, exist_ok=True) |
| os.makedirs(OUTPUT_DIR, exist_ok=True) |
| os.makedirs(CACHE , exist_ok=True) |
| os.environ["TRANSFORMERS_CACHE"] = CACHE |
| os.environ["HF_HOME"] = CACHE |
|
|
| |
| print("🔄 Loading models …") |
| |
| model_self = YOLO(f"{MODEL_DIR}/garbage_detector.pt") |
| model_yolo5 = yolov5.load(f"{MODEL_DIR}/yolov5-detect-trash-classification.pt") |
| processor_detr = DetrImageProcessor.from_pretrained(f"{MODEL_DIR}/detr") |
| model_detr = DetrForObjectDetection.from_pretrained(f"{MODEL_DIR}/detr") |
| feat_extractor = SegformerFeatureExtractor.from_pretrained( |
| "nvidia/segformer-b4-finetuned-ade-512-512") |
| segformer = SegformerForSemanticSegmentation.from_pretrained( |
| "nvidia/segformer-b4-finetuned-ade-512-512") |
| |
| model_animal = YOLO(f"{MODEL_DIR}/yolov8n.pt") |
| |
| model_garbage_cls = YOLO(f"{MODEL_DIR}/garbage_cls_yolov8s.pt") |
| print("✅ Models ready\n") |
|
|
| |
| |
| ade_palette = np.array([ |
| [0, 0, 0], [120, 120, 120], [180, 120, 120], [6, 230, 230], [80, 50, 50], |
| [4, 200, 3], [120, 120, 80], [140, 140, 140], [204, 5, 255], [230, 230, 230], |
| [4, 250, 7], [224, 5, 255], [235, 255, 7], [150, 5, 61], [120, 120, 70], |
| [8, 255, 51], [255, 6, 82], [143, 255, 140], [204, 255, 4], [255, 51, 7], |
| [204, 70, 3], [0, 102, 200], [61, 230, 250], [255, 6, 51], [11, 102, 255], |
| [255, 7, 71], [255, 9, 224], [9, 7, 230], [220, 220, 220], |
| [112, 9, 255], [8, 255, 214], [7, 255, 224], [255, 184, 6], [10, 255, 71], |
| [255, 41, 10], [7, 255, 255], [224, 255, 8], [102, 8, 255], [255, 61, 6], |
| [255, 194, 7], [255, 122, 8], [0, 255, 20], [255, 8, 41], [255, 5, 153], |
| [6, 51, 255], [235, 12, 255], [160, 150, 20], [0, 163, 255], [140, 140, 140], |
| [250, 10, 15], [20, 255, 0], [31, 255, 0], [255, 31, 0], [255, 224, 0], |
| [153, 255, 0], [0, 0, 255], [255, 71, 0], [0, 235, 255], [0, 173, 255], |
| [31, 0, 255], [11, 200, 200], [255, 82, 0], [0, 255, 245], [0, 61, 255], |
| [0, 255, 112], [0, 255, 133], [255, 0, 0], [255, 163, 0], [255, 102, 0], |
| [194, 255, 0], [0, 143, 255], [51, 255, 0], [0, 82, 255], [0, 255, 41], |
| [255, 0, 255], [255, 0, 245], [255, 0, 102], [255, 173, 0], [255, 0, 20], |
| [255, 184, 184], [0, 31, 255], [0, 255, 61], [0, 71, 255], [255, 0, 204], |
| [0, 255, 194], [0, 255, 82], [0, 10, 255], [0, 112, 255], [51, 0, 255], |
| [0, 194, 255], [0, 122, 255], [0, 255, 163], [255, 153, 0], [0, 255, 10], |
| [255, 112, 0], [143, 255, 0], [82, 0, 255], [163, 255, 0], [255, 235, 0], |
| [8, 184, 170], [133, 0, 255], [0, 255, 92], [184, 0, 255], [255, 0, 31], |
| [0, 184, 255], [0, 214, 255], [255, 0, 112], [92, 255, 0], [0, 224, 255], |
| [112, 224, 255], [70, 184, 160], [163, 0, 255], [153, 0, 255], [71, 255, 0], |
| [255, 0, 163], [255, 204, 0], [255, 0, 143], [0, 255, 235], [133, 255, 0], |
| [255, 0, 235], [245, 0, 255], [255, 0, 122], [255, 245, 0], [10, 190, 212], |
| [214, 255, 0], [0, 204, 255], [20, 0, 255], [255, 255, 0], [0, 153, 255], |
| [0, 41, 255], [0, 255, 204], [41, 0, 255], [41, 255, 0], [173, 0, 255], |
| [0, 245, 255], [71, 0, 255], [122, 0, 255], [0, 255, 184], [0, 92, 255], |
| [184, 255, 0], [0, 133, 255], [255, 214, 0], [25, 194, 194], [102, 255, 0], |
| [92, 0, 255] |
| ], dtype=np.uint8) |
| if ade_palette.shape[0] < 150: |
| missing = 150 - ade_palette.shape[0] |
| padding = np.zeros((missing, 3), dtype=np.uint8) |
| ade_palette = np.vstack([ade_palette, padding]) |
|
|
| custom_class_map = { |
| "Garbage": [(255, 8, 41), (235, 255, 7), (255, 5, 153), (255, 0, 102)], |
| "Water": [(0, 102, 200), (11, 102, 255), (31, 0, 255), (10, 0, 255), (9, 7, 230)], |
| "Grass / Vegetation": [(10, 255, 71), (143, 255, 140)], |
| "Tree / Natural Obstacle": [(4, 200, 3), (235, 12, 255), (255, 6, 82), (255, 163, 0)], |
| "Sand / Soil / Ground": [(80, 50, 50), (230, 230, 230)], |
| "Buildings / Structures": [(255, 0, 255), (184, 0, 255), (120, 120, 120), (7, 255, 224)], |
| "Sky / Background": [(180, 120, 120)], |
| "Undetecable": [(0, 0, 0)], |
| "Unknown Class": [] |
| } |
| TOL = 30 |
|
|
| |
| def interpret_rgb_class(decoded_img): |
| ambiguous_rgb = np.array([150, 5, 61]) |
| matches = np.all(np.abs(decoded_img - ambiguous_rgb) <= TOL, axis=-1) |
| match_ratio = np.count_nonzero(matches) / matches.size |
| return "garbage" if match_ratio > 0.15 else "sand" |
|
|
| |
| def build_masks(seg): |
| """ |
| Returns three binary masks at (H,W): |
| water_mask – 1 = water |
| garbage_mask – 1 = semantic “Garbage” pixels |
| movable_mask – union of water & garbage (robot can travel here) |
| """ |
| decoded = ade_palette[seg] |
| water_mask = np.zeros(seg.shape, np.uint8) |
| garbage_mask = np.zeros_like(water_mask) |
| |
| context_label = interpret_rgb_class(decoded) |
| resolved_map = custom_class_map.copy() |
| |
| if context_label == "garbage": |
| resolved_map["Garbage"].append((150, 5, 61)) |
| resolved_map["Sand / Soil / Ground"] = [rgb for rgb in resolved_map["Sand / Soil / Ground"] if rgb != (150, 5, 61)] |
| else: |
| resolved_map["Sand / Soil / Ground"].append((150, 5, 61)) |
| resolved_map["Garbage"] = [rgb for rgb in resolved_map["Garbage"] if rgb != (150, 5, 61)] |
| |
| for rgb in custom_class_map["Water"]: |
| water_mask |= (np.abs(decoded - rgb).max(axis=-1) <= TOL) |
| |
| for rgb in custom_class_map["Garbage"]: |
| garbage_mask |= (np.abs(decoded - rgb).max(axis=-1) <= TOL) |
| movable_mask = water_mask | garbage_mask |
| return water_mask, garbage_mask, movable_mask |
|
|
| |
| def highlight_chunk_masks_on_frame( |
| frame, labels, objs, |
| color_uncollected=(0, 0, 128), |
| color_collected=(0, 128, 0), |
| color_unreachable=(0, 255, 255), |
| alpha=0.8 |
| ): |
| """ |
| Overlays semi-transparent colored regions for garbage chunks on the frame. |
| `objs` must have 'pos' and 'col' keys. The collection status changes the overlay color. |
| """ |
| overlay = frame.copy() |
| for i, obj in enumerate(objs): |
| x, y = obj["pos"] |
| lab = labels[y, x] |
| if lab == 0: |
| continue |
| mask = (labels == lab).astype(np.uint8) |
| contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) |
| |
| if obj.get("unreachable"): |
| color = color_unreachable |
| elif obj["col"]: |
| color = color_collected |
| else: |
| color = color_uncollected |
| cv2.drawContours(overlay, contours, -1, color, thickness=cv2.FILLED) |
| |
| return cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0) |
| |
| def highlight_water_mask_on_frame(frame, binary_mask, color=(255, 0, 0), alpha=0.3): |
| """ |
| Overlays semi-transparent colored mask (binary) on the frame. |
| """ |
| overlay = frame.copy() |
| mask = binary_mask.astype(np.uint8) * 255 |
| contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) |
| |
| cv2.drawContours(overlay, contours, -1, color, thickness=cv2.FILLED) |
| return cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0) |
|
|
|
|
| |
| def astar(start, goal, occ): |
| h = lambda a,b: abs(a[0]-b[0])+abs(a[1]-b[1]) |
| N8 = [(-1,-1),(-1,0),(-1,1),(0,-1),(0,1),(1,-1),(1,0),(1,1)] |
| openq=[(0,start)]; g={start:0}; came={} |
| while openq: |
| _,cur=heapq.heappop(openq) |
| if cur==goal: |
| p=[cur]; |
| while cur in came: cur=came[cur]; p.append(cur) |
| return p[::-1] |
| for dx,dy in N8: |
| nx,ny=cur[0]+dx,cur[1]+dy |
| |
| if not (0<=nx<640 and 0<=ny<640) or occ[ny,nx]==0: continue |
| |
| if abs(dx)==1 and abs(dy)==1: |
| if occ[cur[1]+dy, cur[0]]==0 or occ[cur[1], cur[0]+dx]==0: |
| continue |
| ng=g[cur]+1 |
| if (nx,ny) not in g or ng<g[(nx,ny)]: |
| g[(nx,ny)]=ng |
| f=ng+h((nx,ny),goal) |
| heapq.heappush(openq,(f,(nx,ny))) |
| came[(nx,ny)]=cur |
| return [] |
| |
| def knn_path(start, targets, occ): |
| todo = targets[:]; path=[] |
| cur = tuple(start) |
| reachable = []; unreachable = [] |
| while todo: |
| |
| best = None |
| best_len = float('inf') |
| best_seg = [] |
| |
| for t in todo: |
| seg = astar(cur, tuple(t), occ) |
| if seg and len(seg) < best_len: |
| best = tuple(t) |
| best_len = len(seg) |
| best_seg = seg |
| if not best: |
| |
| for u in todo: |
| print(f"⚠️ Garbage unreachable at {u}") |
| unreachable.append(u) |
| break |
| if path and path[-1] == best_seg[0]: |
| best_seg = best_seg[1:] |
| path.extend(best_seg) |
| reachable.append(list(best)) |
| cur = best |
| todo.remove(list(best)) |
| return path, unreachable |
|
|
|
|
| |
| class Robot: |
| def __init__(self, sprite, speed=2000): |
| img = Image.open(sprite).convert("RGBA").resize((40, 40)) |
| self.png = np.array(img) |
| if self.png.shape[-1] != 4: |
| raise ValueError("Sprite image must have 4 channels (RGBA)") |
| self.png = np.array(Image.open(sprite).convert("RGBA").resize((40,40))) |
| self.speed = speed |
| self.pos = [20, 20] |
| def step(self, path): |
| while path: |
| dx, dy = path[0][0] - self.pos[0], path[0][1] - self.pos[1] |
| dist = (dx * dx + dy * dy) ** 0.5 |
| if dist <= self.speed: |
| self.pos = list(path.pop(0)) |
| else: |
| r = self.speed / dist |
| new_x = self.pos[0] + dx * r |
| new_y = self.pos[1] + dy * r |
| |
| self.pos = [ |
| int(np.clip(new_x, 20, 640 - 20)), |
| int(np.clip(new_y, 20, 640 - 20)) |
| ] |
| |
| break |
|
|
|
|
| |
| from fastapi.responses import JSONResponse, FileResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| app = FastAPI() |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
| app.mount("/statics", StaticFiles(directory="statics"), name="statics") |
| video_ready={} |
| @app.get("/ui", response_class=HTMLResponse) |
| async def serve_index(): |
| p = "statics/index.html" |
| if os.path.exists(p): |
| print("[STATIC] Serving index.html") |
| return FileResponse(p) |
| print("[STATIC] index.html not found") |
| return JSONResponse(status_code=404, content={"detail":"Not found"}) |
| def _uid(): return uuid.uuid4().hex[:8] |
|
|
|
|
| |
| |
| @app.post("/upload/") |
| async def upload(file:UploadFile=File(...)): |
| uid=_uid(); dest=f"{UPLOAD_DIR}/{uid}_{file.filename}" |
| with open(dest,"wb") as bf: shutil.copyfileobj(file.file,bf) |
| threading.Thread(target=_pipeline, args=(uid,dest)).start() |
| return {"user_id":uid} |
|
|
| |
| @app.get("/check_video/{uid}") |
| def chk(uid:str): return {"ready":video_ready.get(uid,False)} |
|
|
| |
| @app.get("/video/{uid}") |
| def stream(uid:str): |
| vid=f"{OUTPUT_DIR}/{uid}.mp4" |
| if not os.path.exists(vid): return Response(status_code=404) |
| return StreamingResponse(open(vid,"rb"), media_type="video/mp4") |
|
|
| |
| |
| |
| import base64, requests |
| def roboflow_infer(image_path, api_url, api_key): |
| with open(image_path, "rb") as image_file: |
| files = {"file": image_file} |
| res = requests.post( |
| f"{api_url}?api_key={api_key}&confidence=70", |
| files=files |
| ) |
| print(f"[Roboflow] {res.status_code} response") |
| try: |
| return res.json() |
| except Exception as e: |
| print("[Roboflow JSON decode error]", e) |
| return {} |
|
|
| |
| @app.post("/animal/") |
| async def detect_animals(file: UploadFile = File(...)): |
| img_id = _uid() |
| img_path = f"{UPLOAD_DIR}/{img_id}_{file.filename}" |
| with open(img_path, "wb") as f: |
| shutil.copyfileobj(file.file, f) |
| print(f"[Animal] Uploaded image: {img_path}") |
| |
| image = cv2.imread(img_path) |
| detections = [] |
|
|
| |
| print("[Animal] Detecting via YOLOv8…") |
| try: |
| results = model_animal(image)[0] |
| for box in results.boxes: |
| conf = box.conf[0].item() |
| if conf >= 0.70: |
| cls_id = int(box.cls[0].item()) |
| label = model_animal.names[cls_id].lower() |
| if label in ["dog", "cat", "cow", "horse", "elephant", "bear", "zebra", "giraffe", "bird"]: |
| x1, y1, x2, y2 = map(int, box.xyxy[0].tolist()) |
| detections.append(((x1, y1, x2, y2), f"Animal Alert {conf}")) |
| except Exception as e: |
| print("[YOLOv8 Error]", e) |
|
|
| |
| print("[API] Roboflow key:", os.getenv("ROBOFLOW_KEY", "❌ not set")) |
|
|
| |
| try: |
| print("[Animal] Detecting via Roboflow Fish model…") |
| fish_response = roboflow_infer( |
| img_path, |
| "https://detect.roboflow.com/hydroquest/1", |
| api_key=os.getenv("ROBOFLOW_KEY", "") |
| ) |
| for pred in fish_response.get("predictions", []): |
| if pred["confidence"] >= 0.70: |
| acc = pred["confidence"] |
| x1 = int(pred["x"] - pred["width"] / 2) |
| y1 = int(pred["y"] - pred["height"] / 2) |
| x2 = int(pred["x"] + pred["width"] / 2) |
| y2 = int(pred["y"] + pred["height"] / 2) |
| detections.append(((x1, y1, x2, y2), f"Fish Alert {acc}")) |
| print("[Roboflow Fish Response]", fish_response) |
| except Exception as e: |
| print("[Roboflow Fish Error]", e) |
|
|
| |
| try: |
| print("[Animal] Detecting via Roboflow Bird model…") |
| bird_response = roboflow_infer( |
| img_path, |
| "https://detect.roboflow.com/bird_only-pt0bm/1", |
| api_key=os.getenv("ROBOFLOW_KEY", "") |
| ) |
| for pred in bird_response.get("predictions", []): |
| if pred["confidence"] >= 0.70: |
| acc = pred["confidence"] |
| x1 = int(pred["x"] - pred["width"] / 2) |
| y1 = int(pred["y"] - pred["height"] / 2) |
| x2 = int(pred["x"] + pred["width"] / 2) |
| y2 = int(pred["y"] + pred["height"] / 2) |
| detections.append(((x1, y1, x2, y2), f"Bird Alert {acc}")) |
| print("[Roboflow Bird Response]", bird_response) |
| except Exception as e: |
| print("[Roboflow Bird Error]", e) |
|
|
| |
| print(f"[Animal] Total detections: {len(detections)}") |
| |
| for (x1, y1, x2, y2), label in detections: |
| cv2.rectangle(image, (x1, y1), (x2, y2), (0, 0, 255), 2) |
| cv2.putText(image, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) |
| |
| result_path = f"{OUTPUT_DIR}/{img_id}_animal.jpg" |
| cv2.imwrite(result_path, image) |
| return FileResponse(result_path, media_type="image/jpeg") |
|
|
|
|
| |
| @app.post("/classification/") |
| async def classify_garbage(file: UploadFile = File(...)): |
| img_id = _uid() |
| img_path = f"{UPLOAD_DIR}/{img_id}_{file.filename}" |
| out_path = f"{OUTPUT_DIR}/{img_id}_classified.jpg" |
| |
| with open(img_path, "wb") as f: |
| shutil.copyfileobj(file.file, f) |
| |
| print(f"[Classification] Received image: {img_path}") |
| image = cv2.imread(img_path) |
| rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) |
| pil = Image.fromarray(rgb) |
| |
| detections = [] |
| |
| for r in model_self(image): |
| detections += [b.xyxy[0].tolist() for b in r.boxes] |
| |
| r = model_yolo5(image) |
| if hasattr(r, 'pred') and len(r.pred) > 0: |
| detections += [p[:4].tolist() for p in r.pred[0]] |
| |
| with torch.no_grad(): |
| out = model_detr(**processor_detr(images=pil, return_tensors="pt")) |
| results = processor_detr.post_process_object_detection( |
| outputs=out, |
| target_sizes=torch.tensor([pil.size[::-1]]), |
| threshold=0.5 |
| )[0] |
| detections += [b.tolist() for b in results["boxes"]] |
| print(f"[Classification] Total detections from 3 models: {len(detections)}") |
| |
| for box in detections: |
| x1, y1, x2, y2 = map(int, box) |
| x1, x2 = max(0, min(x1, 639)), max(0, min(x2, 639)) |
| y1, y2 = max(0, min(y1, 639)), max(0, min(y2, 639)) |
| |
| crop = image[y1:y2, x1:x2] |
| if crop.shape[0] < 10 or crop.shape[1] < 10: |
| continue |
| |
| pil_crop = Image.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)) |
| with torch.no_grad(): |
| pred = model_garbage_cls(pil_crop, verbose=False)[0] |
| class_id = int(pred.probs.top1) |
| class_name = model_garbage_cls.names[class_id] |
| conf = float(pred.probs.top1conf) |
| |
| label = f"{class_name} ({conf:.2f})" |
| |
| if conf < 0.4: |
| color = (0, 0, 255) |
| elif conf < 0.6: |
| color = (0, 255, 0) |
| elif conf < 0.8: |
| color = (255, 255, 0) |
| else: |
| color = (255, 0, 255) |
| |
| cv2.rectangle(image, (x1, y1), (x2, y2), color, 2) |
| cv2.putText(image, label, (x1, y1 - 6), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) |
| |
| cv2.imwrite(out_path, image) |
| print(f"[Classification] Output saved: {out_path}") |
| return FileResponse(out_path, media_type="image/jpeg") |
|
|
|
|
|
|
| |
| def _pipeline(uid,img_path): |
| print(f"▶️ [{uid}] processing") |
| bgr=cv2.resize(cv2.imread(img_path),(640,640)); rgb=cv2.cvtColor(bgr,cv2.COLOR_BGR2RGB) |
| pil=Image.fromarray(rgb) |
|
|
| |
| with torch.no_grad(): |
| inputs = feat_extractor(pil, return_tensors="pt") |
| seg_logits = segformer(**inputs).logits |
| |
| seg_tensor = seg_logits.argmax(1)[0].cpu() |
| if seg_tensor.numel() == 0: |
| print(f"❌ [{uid}] segmentation failed (empty tensor)") |
| video_ready[uid] = True |
| return |
| |
| seg = cv2.resize(seg_tensor.numpy(), (640, 640), interpolation=cv2.INTER_NEAREST) |
| print(f"🧪 [{uid}] segmentation input shape: {inputs['pixel_values'].shape}") |
| water_mask, garbage_mask, movable_mask = build_masks(seg) |
|
|
| |
| detections=[] |
| |
| num_cc, labels = cv2.connectedComponents(garbage_mask.astype(np.uint8)) |
| chunk_centres = [] |
| for lab in range(1, num_cc): |
| ys, xs = np.where(labels == lab) |
| if xs.size == 0: |
| continue |
| chunk_centres.append([int(xs.mean()), int(ys.mean())]) |
| print(f"🧠 {len(chunk_centres)} garbage chunk detected") |
| |
| for r in model_self(bgr): |
| detections += [b.xyxy[0].tolist() for b in r.boxes] |
| r = model_yolo5(bgr) |
| if hasattr(r, 'pred') and len(r.pred) > 0: |
| detections += [p[:4].tolist() for p in r.pred[0]] |
| inp=processor_detr(images=pil,return_tensors="pt") |
| with torch.no_grad(): out=model_detr(**inp) |
| post = processor_detr.post_process_object_detection( |
| outputs=out, |
| target_sizes=torch.tensor([pil.size[::-1]]), |
| threshold=0.5 |
| )[0] |
| detections += [b.tolist() for b in post["boxes"]] |
| |
| centres = [] |
| for x1, y1, x2, y2 in detections: |
| ''' |
| We conduct a 20% allowance whether the center |
| of the detected garbage's bbox lies within the travelable zone |
| which was segmented earlier to be the water and garbage zone |
| ''' |
| x1, y1, x2, y2 = map(int, [x1, y1, x2, y2]) |
| x1 = max(0, min(x1, 639)); y1 = max(0, min(y1, 639)) |
| x2 = max(0, min(x2, 639)); y2 = max(0, min(y2, 639)) |
| box_mask = movable_mask[y1:y2, x1:x2] |
| if box_mask.size == 0: |
| continue |
| if np.count_nonzero(box_mask) / box_mask.size >= 0.5: |
| centres.append([int((x1 + x2) / 2), int((y1 + y2) / 2)]) |
| |
| centres.extend(chunk_centres) |
| |
| |
| |
| |
| |
| |
| if not centres: |
| print(f"🛑 [{uid}] no reachable garbage"); video_ready[uid]=True; return |
| else: |
| print(f"🧠 {len(centres)} garbage objects on water selected from {len(detections)} detections") |
|
|
| |
| |
| ys, xs = np.where(water_mask) |
| if len(ys)==0: |
| |
| print(f"❌ [{uid}] no water to spawn on") |
| video_ready[uid] = True |
| return |
| |
| idx = np.lexsort((xs, ys)) |
| spawn_y, spawn_x = int(ys[idx[0]]), int(xs[idx[0]]) |
| |
| spawn_x = np.clip(spawn_x, 20, 640-20) |
| spawn_y = np.clip(spawn_y, 20, 640-20) |
| robot = Robot(SPRITE) |
| |
| robot.pos = [spawn_x, spawn_y] |
| path, unreachable = knn_path(robot.pos, centres, movable_mask) |
| if unreachable: |
| print(f"⚠️ Unreachable garbage chunks at: {unreachable}") |
|
|
| |
| out_tmp=f"{OUTPUT_DIR}/{uid}_tmp.mp4" |
| vw=cv2.VideoWriter(out_tmp,cv2.VideoWriter_fourcc(*"mp4v"),10.0,(640,640)) |
| objs = [{"pos": p, "col": False, "unreachable": False} for p in centres if p not in unreachable] |
| objs += [{"pos": p, "col": False, "unreachable": True} for p in unreachable] |
| bg = bgr.copy() |
| for _ in range(15000): |
| frame=bg.copy() |
| |
| frame = highlight_chunk_masks_on_frame( |
| frame, |
| labels, |
| objs, |
| color_uncollected=(0, 0, 128), |
| color_collected=(0, 128, 0), |
| color_unreachable=(0, 255, 255) |
| ) |
| frame = highlight_water_mask_on_frame(frame, water_mask) |
| |
| for o in objs: |
| color = (0, 0, 128) if not o["col"] else (0, 128, 0) |
| x, y = o["pos"] |
| cv2.circle(frame, (x, y), 6, color, -1) |
| |
| robot.step(path) |
| sp = robot.png |
| sprite_h, sprite_w = sp.shape[:2] |
| rx, ry = robot.pos |
| x1, y1 = rx - sprite_w // 2, ry - sprite_h // 2 |
| x2, y2 = x1 + sprite_w, y1 + sprite_h |
| |
| x1_clip, x2_clip = max(0, x1), min(frame.shape[1], x2) |
| y1_clip, y2_clip = max(0, y1), min(frame.shape[0], y2) |
| |
| sx1, sy1 = x1_clip - x1, y1_clip - y1 |
| sx2, sy2 = sprite_w - (x2 - x2_clip), sprite_h - (y2 - y2_clip) |
| sprite_crop = sp[sy1:sy2, sx1:sx2] |
| alpha = sprite_crop[:, :, 3] / 255.0 |
| alpha = np.stack([alpha] * 3, axis=-1) |
| bgroi = frame[y1_clip:y2_clip, x1_clip:x2_clip] |
| blended = (alpha * sprite_crop[:, :, :3] + (1 - alpha) * bgroi).astype(np.uint8) |
| frame[y1_clip:y2_clip, x1_clip:x2_clip] = blended |
| |
| for o in objs: |
| if not o["col"] and np.hypot(o["pos"][0]-robot.pos[0], o["pos"][1]-robot.pos[1]) <= 20: |
| o["col"]=True |
| vw.write(frame) |
| if all(o["col"] for o in objs): break |
| if not path: break |
| vw.release() |
|
|
| |
| final=f"{OUTPUT_DIR}/{uid}.mp4" |
| ffmpeg.input(out_tmp).output(final,vcodec="libx264",pix_fmt="yuv420p").run(overwrite_output=True,quiet=True) |
| os.remove(out_tmp); video_ready[uid]=True |
| print(f"✅ [{uid}] video ready → {final}") |
|
|
|
|
| |
| if __name__=="__main__": |
| uvicorn.run(app,host="0.0.0.0",port=7860) |
|
|