#!/usr/bin/env python3
"""Materialize processed datasets under ``data/processed`` from local downloads.

``main`` builds the single-source PhenoBench ground-weed dataset (via
``autofarm.unified_weed_dataset``) and the RELLIS-3D replay bag entrypoint,
then writes a combined ``data/processed/manifest.json``.

The ``process_*`` builders and the RUGD/RELLIS helper loaders below are not
called from ``main``; the note in ``prepare_ground_weeds_phenobench`` marks the
mixed-source public builders as legacy.
"""
from __future__ import annotations

import csv
import hashlib
import io
import json
import os
import shutil
import sys
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from zipfile import ZipFile

import numpy as np
import pandas as pd
import yaml
from PIL import Image

REPO_ROOT = Path(__file__).resolve().parents[1]
DOWNLOADS_ROOT = REPO_ROOT / "data_local" / "downloads"
PROCESSED_ROOT = REPO_ROOT / "data" / "processed"

# Make the in-repo ``autofarm`` package importable when run as a script.
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))

from autofarm.unified_weed_dataset import (  # noqa: E402  (needs sys.path tweak above)
    PROCESSED_DATASET_NAME as ACTIVE_GROUND_WEEDS_DATASET_NAME,
    build_phenobench_weed_dataset,
)

# Class-id -> class-name maps keyed by annotation type. The
# ``semantic_segmentation`` map is embedded in PhenoBench label files.
GROUND_WEED_CLASSES = {
    "segmentation": {0: "background", 1: "crop", 2: "weed"},
    "binary_vegetation": {0: "vegetation", 255: "background"},
    "detection": {0: "weed"},
    "semantic_segmentation": {
        0: "background",
        1: "crop",
        2: "weed",
        3: "partial_crop",
        4: "partial_weed",
    },
}


@dataclass(frozen=True)
class DatasetPaths:
    """Output directory layout for one processed dataset (see ``dataset_paths``)."""

    root: Path  # dataset root; paths stored in labels/records are relative to it
    images: Path  # <root>/images
    masks: Path  # <root>/masks
    labels: Path  # <root>/labels (one JSON file per sample)


def main() -> None:
    """Build the active processed datasets and write ``data/processed/manifest.json``."""
    PROCESSED_ROOT.mkdir(parents=True, exist_ok=True)
    summary: dict[str, Any] = {
        "generated_from": str(REPO_ROOT),
        "blocked": {
            "zone_state_bootstrap": (
                "Missing data/raw/field/field_boundary.geojson, so no field-specific "
                "geospatial bootstrap could be materialized."
            )
        },
    }

    ground_weeds = prepare_ground_weeds_phenobench()
    replay = prepare_replay_multimodal()

    summary["datasets"] = {
        ACTIVE_GROUND_WEEDS_DATASET_NAME: ground_weeds,
        "replay_multimodal": replay,
    }
    write_json(summary, PROCESSED_ROOT / "manifest.json")

    print("Prepared datasets:")
    for key, value in summary["datasets"].items():
        print(f" - {key}: {value['records']} records")


def prepare_ground_weeds_phenobench() -> dict[str, Any]:
    """Build the active ground-weed dataset and return the builder's summary dict."""
    # The active weed pipeline is single-source PhenoBench only; mixed-source public builders are legacy.
    return build_phenobench_weed_dataset(
        downloads_root=DOWNLOADS_ROOT,
        output_root=PROCESSED_ROOT / ACTIVE_GROUND_WEEDS_DATASET_NAME,
    )


def process_cwfid(paths: DatasetPaths, records: list[dict[str, Any]]) -> dict[str, Any]:
    """Import CWFID from ``data_local/downloads/cwfid`` (legacy builder).

    For every ``images/*_image.png`` the image, the binary vegetation mask and
    the crop/weed colour mask are hard-linked (or copied) into ``paths``, the
    YAML polygon annotations are converted into a JSON label file, and one
    index row is appended to ``records``.

    Args:
        paths: Output layout to write into.
        records: Index rows; mutated in place.

    Returns:
        Summary with the record count and the sizes of the provided splits.
    """
    source_root = DOWNLOADS_ROOT / "cwfid"
    split_config = yaml.safe_load((source_root / "train_test_split.yaml").read_text())
    # Split ids are formatted as 3-digit, zero-padded ids to match file names.
    provided_train = {f"{idx:03d}" for idx in split_config["train"]}
    provided_test = {f"{idx:03d}" for idx in split_config["test"]}

    count = 0
    for image_path in sorted((source_root / "images").glob("*_image.png")):
        sample_id = image_path.stem.split("_")[0]
        if sample_id in provided_train:
            split = "train"
        elif sample_id in provided_test:
            split = "test"
        else:
            split = "unspecified"

        vegetation_mask_path = source_root / "masks" / f"{sample_id}_mask.png"
        crop_weed_mask_path = source_root / "annotations" / f"{sample_id}_annotation.png"
        polygon_yaml_path = source_root / "annotations" / f"{sample_id}_annotation.yaml"
        width, height = image_size_from_file(image_path)

        image_target = paths.images / f"cwfid__{sample_id}.png"
        vegetation_target = paths.masks / f"cwfid__{sample_id}__vegetation.png"
        crop_weed_target = paths.masks / f"cwfid__{sample_id}__crop_weed.png"
        label_target = paths.labels / f"cwfid__{sample_id}.json"

        link_or_copy_file(image_path, image_target)
        link_or_copy_file(vegetation_mask_path, vegetation_target)
        link_or_copy_file(crop_weed_mask_path, crop_weed_target)

        annotations = yaml.safe_load(polygon_yaml_path.read_text())
        # A polygon with a single vertex may store scalar x/y values; ensure_list
        # normalizes them so zip(strict=True) pairs coordinates reliably.
        polygons = [
            {
                "class_name": obj["type"],
                "points_xy": list(
                    zip(
                        ensure_list(obj["points"]["x"]),
                        ensure_list(obj["points"]["y"]),
                        strict=True,
                    )
                ),
            }
            for obj in annotations["annotation"]
        ]

        label_payload = {
            "dataset": "cwfid",
            "sample_id": sample_id,
            "split": split,
            "image": {
                "path": rel_to_root(image_target, paths.root),
                "width": width,
                "height": height,
            },
            "annotation_type": "segmentation_and_polygons",
            "binary_vegetation_mask": {
                "path": rel_to_root(vegetation_target, paths.root),
                "value_map": {"vegetation": 0, "background": 255},
            },
            "crop_weed_mask": {
                "path": rel_to_root(crop_weed_target, paths.root),
                "color_map": {
                    "background": [0, 0, 0],
                    "weed": [255, 0, 0],
                    "crop": [0, 255, 0],
                },
            },
            "polygons": polygons,
        }
        write_json(label_payload, label_target)

        records.append(
            {
                "sample_id": sample_id,
                "source_dataset": "cwfid",
                "split": split,
                "annotation_type": "segmentation_and_polygons",
                "image_path": rel_to_root(image_target, paths.root),
                "label_path": rel_to_root(label_target, paths.root),
                "mask_path": rel_to_root(crop_weed_target, paths.root),
                "aux_mask_path": rel_to_root(vegetation_target, paths.root),
                "width": width,
                "height": height,
                "scene": None,
                "native_annotation_format": "png_mask + yaml_polygons",
            }
        )
        count += 1

    return {
        "raw_source": "data_local/downloads/cwfid/",
        "records": count,
        "provided_splits": {
            "train": len(provided_train),
            "test": len(provided_test),
        },
    }


def _voc_coord(box: ET.Element, tag: str) -> int:
    """Return the pixel coordinate stored in ``box/<tag>`` as an int.

    Missing or empty elements read as 0. Text is parsed through ``float`` so
    fractional values (e.g. ``"12.5"``) do not raise; they are rounded with
    Python's ``round``. Integer text yields the same value as ``int(text)``.
    """
    text = (box.findtext(tag) or "").strip() or "0"
    return int(round(float(text)))


def process_ronin(paths: DatasetPaths, records: list[dict[str, Any]]) -> dict[str, Any]:
    """Import Ronin_OPEN_DB detection boxes from its zip archive (legacy builder).

    Each Pascal-VOC XML under ``Ronin_OPEN_DB/annotations/`` whose referenced
    image exists under ``Ronin_OPEN_DB/raw images/`` yields an extracted image,
    a JSON label file with its boxes, and one row appended to ``records``.
    Splits come from ``stable_hash_split``.

    Args:
        paths: Output layout to write into.
        records: Index rows; mutated in place.

    Returns:
        Summary with the record count and the split policy.
    """
    ronin_zip = DOWNLOADS_ROOT / "Ronin_OPEN_DB.zip"
    count = 0
    with ZipFile(ronin_zip) as zf:
        # Build the member set once; testing against zf.namelist() per XML
        # was a linear scan each time (quadratic over the archive).
        members = set(zf.namelist())
        annotation_members = sorted(
            name
            for name in members
            if name.startswith("Ronin_OPEN_DB/annotations/") and name.endswith(".xml")
        )
        for xml_member in annotation_members:
            root = ET.fromstring(zf.read(xml_member))
            image_name = root.findtext("filename")
            if not image_name:
                continue
            sample_id = Path(image_name).stem
            image_member = f"Ronin_OPEN_DB/raw images/{image_name}"
            if image_member not in members:
                continue

            split = stable_hash_split(sample_id)
            image_bytes = zf.read(image_member)
            width, height = image_size_from_bytes(image_bytes)
            image_target = paths.images / f"ronin__{sample_id}.jpg"
            label_target = paths.labels / f"ronin__{sample_id}.json"
            write_bytes(image_target, image_bytes)

            boxes = []
            for obj in root.findall("object"):
                box = obj.find("bndbox")
                if box is None:
                    continue
                boxes.append(
                    {
                        "class_name": obj.findtext("name", default="weed"),
                        "bbox_xyxy": [
                            _voc_coord(box, "xmin"),
                            _voc_coord(box, "ymin"),
                            _voc_coord(box, "xmax"),
                            _voc_coord(box, "ymax"),
                        ],
                    }
                )

            label_payload = {
                "dataset": "ronin_open_db",
                "sample_id": sample_id,
                "split": split,
                "image": {
                    "path": rel_to_root(image_target, paths.root),
                    "width": width,
                    "height": height,
                },
                "annotation_type": "detection_boxes",
                "objects": boxes,
            }
            write_json(label_payload, label_target)

            records.append(
                {
                    "sample_id": sample_id,
                    "source_dataset": "ronin_open_db",
                    "split": split,
                    "annotation_type": "detection_boxes",
                    "image_path": rel_to_root(image_target, paths.root),
                    "label_path": rel_to_root(label_target, paths.root),
                    "mask_path": None,
                    "aux_mask_path": None,
                    "width": width,
                    "height": height,
                    "scene": None,
                    "native_annotation_format": "pascal_voc_xml",
                }
            )
            count += 1

    return {
        "raw_source": "data_local/downloads/Ronin_OPEN_DB.zip",
        "records": count,
        "split_policy": "Deterministic hash split: 80% train, 10% val, 10% test.",
    }


def process_phenobench(paths: DatasetPaths, records: list[dict[str, Any]]) -> dict[str, Any]:
    """Import labelled PhenoBench train/val samples from its zip archive (legacy builder).

    For each semantic mask with a matching image, writes the image, the mask
    (cast to uint8) and a JSON label with per-class pixel fractions and
    derived tags, and appends one row to ``records``. Returns a summary with
    zero records and a note if the archive is absent.

    Args:
        paths: Output layout to write into.
        records: Index rows; mutated in place.

    Returns:
        Summary with record counts per source/processed split and tag counts.
    """
    phenobench_zip = DOWNLOADS_ROOT / "PhenoBench-v110.zip"
    if not phenobench_zip.exists():
        return {
            "raw_source": str(phenobench_zip),
            "records": 0,
            "note": "PhenoBench archive not found locally; weed-negative reference images were not imported.",
        }

    count = 0
    clean_crop_count = 0
    weed_positive_count = 0
    split_counts: dict[str, int] = {"train": 0, "val": 0, "test": 0}
    source_split_counts: dict[str, int] = {"train": 0, "val": 0}

    with ZipFile(phenobench_zip) as zf:
        members = set(zf.namelist())
        for source_split in ("train", "val"):
            semantic_members = sorted(
                name
                for name in members
                if name.startswith(f"PhenoBench/{source_split}/semantics/") and name.endswith(".png")
            )
            for semantic_member in semantic_members:
                sample_id = Path(semantic_member).stem
                image_member = f"PhenoBench/{source_split}/images/{sample_id}.png"
                if image_member not in members:
                    continue

                image_bytes = zf.read(image_member)
                width, height = image_size_from_bytes(image_bytes)
                # Close both the zip member stream and the PIL image once the
                # pixels have been copied into the numpy array.
                with zf.open(semantic_member) as mask_file, Image.open(mask_file) as mask_image:
                    semantic_mask = np.asarray(mask_image, dtype=np.uint16)
                semantic_summary = phenobench_semantic_summary(semantic_mask)
                split = phenobench_processed_split(source_split=source_split, sample_id=sample_id)

                image_target = paths.images / f"phenobench__{source_split}__{sample_id}.png"
                mask_target = paths.masks / f"phenobench__{source_split}__{sample_id}.png"
                label_target = paths.labels / f"phenobench__{source_split}__{sample_id}.json"

                write_bytes(image_target, image_bytes)
                write_mask(mask_target, semantic_mask.astype(np.uint8))

                label_payload = {
                    "dataset": "phenobench",
                    "sample_id": sample_id,
                    "source_split": source_split,
                    "split": split,
                    "image": {
                        "path": rel_to_root(image_target, paths.root),
                        "width": width,
                        "height": height,
                    },
                    "annotation_type": "semantic_segmentation",
                    "semantic_mask": {
                        "path": rel_to_root(mask_target, paths.root),
                        "class_map": GROUND_WEED_CLASSES["semantic_segmentation"],
                    },
                    "semantic_summary": semantic_summary["fractions"],
                    "derived_tags": semantic_summary["derived_tags"],
                }
                write_json(label_payload, label_target)

                fractions = semantic_summary["fractions"]
                records.append(
                    {
                        "sample_id": sample_id,
                        "source_dataset": "phenobench",
                        "split": split,
                        "annotation_type": "semantic_segmentation",
                        "image_path": rel_to_root(image_target, paths.root),
                        "label_path": rel_to_root(label_target, paths.root),
                        "mask_path": rel_to_root(mask_target, paths.root),
                        "aux_mask_path": None,
                        "width": width,
                        "height": height,
                        "scene": source_split,
                        "native_annotation_format": "uint16_semantic_mask",
                        "source_split": source_split,
                        "crop_fraction": fractions["crop_fraction"],
                        "partial_crop_fraction": fractions["partial_crop_fraction"],
                        "weed_fraction": fractions["weed_fraction"],
                        "partial_weed_fraction": fractions["partial_weed_fraction"],
                        # Stored as a JSON string so the index stays a flat table.
                        "derived_tags": json.dumps(semantic_summary["derived_tags"]),
                    }
                )

                if "weed_presence" in semantic_summary["derived_tags"]:
                    weed_positive_count += 1
                if "weed_free_crop_context" in semantic_summary["derived_tags"]:
                    clean_crop_count += 1
                split_counts[split] = split_counts.get(split, 0) + 1
                source_split_counts[source_split] = source_split_counts.get(source_split, 0) + 1
                count += 1

    return {
        "raw_source": str(phenobench_zip),
        "records": count,
        "source_splits": source_split_counts,
        "processed_splits": split_counts,
        "weed_positive_records": weed_positive_count,
        "weed_free_crop_context_records": clean_crop_count,
        "split_policy": (
            "Source train is deterministically partitioned into processed train/val; "
            "source val is reserved for processed test because PhenoBench test images ship without labels."
        ),
    }


def prepare_replay_multimodal() -> dict[str, Any]:
    """Symlink the RELLIS-3D 60 s bag into ``replay_multimodal`` and write its manifest.

    Returns:
        The manifest dict that was written to ``<dataset_root>/manifest.json``.
    """
    dataset_root = PROCESSED_ROOT / "replay_multimodal"
    dataset_root.mkdir(parents=True, exist_ok=True)
    bag_src = DOWNLOADS_ROOT / "RELLIS-3D-full-stack-merged-60s.bag"
    bag_target = dataset_root / "rellis_3d_full_stack_merged_60s.bag"

    # Path.exists() follows symlinks, so a stale link (e.g. the downloads
    # directory moved) reports False and os.symlink would then raise
    # FileExistsError. Drop the stale link so it can be recreated.
    if bag_target.is_symlink() and not bag_target.exists():
        bag_target.unlink()
    if not bag_target.exists():
        os.symlink(bag_src.resolve(), bag_target)

    topics = [
        "/img_node/intensity_image",
        "/img_node/noise_image",
        "/img_node/range_image",
        "/imu/data",
        "/imu/data_raw",
        "/nerian/left/image_raw",
        "/nerian/right/image_raw",
        "/odometry/filtered",
        "/os1_cloud_node/points",
        "/pylon_camera_node/image_raw",
        "/tf",
        "/tf_static",
        "/vectornav/GPS",
        "/vectornav/IMU",
        "/vectornav/Odom",
        "/velodyne_points",
        "/warthog_velocity_controller/cmd_vel",
        "/warthog_velocity_controller/odom",
    ]
    metadata = {
        "dataset": "replay_multimodal",
        "description": "Stable replay entrypoint for the 60-second RELLIS-3D full-stack ROS bag.",
        "records": 1,
        "bag_path": rel_to_root(bag_target, dataset_root),
        "source_bag_path": str(bag_src),
        "topics": topics,
        "note": "This dataset is a replay/evaluation asset, not a supervised training table.",
    }
    write_json(metadata, dataset_root / "manifest.json")
    return metadata


def dataset_paths(root: Path) -> DatasetPaths:
    """Create ``images``/``masks``/``labels`` under ``root`` and return the layout."""
    images = root / "images"
    masks = root / "masks"
    labels = root / "labels"
    for path in (images, masks, labels):
        path.mkdir(parents=True, exist_ok=True)
    return DatasetPaths(root=root, images=images, masks=masks, labels=labels)


def write_index_files(df: pd.DataFrame, dataset_root: Path) -> None:
    """Write ``df`` as ``index.parquet`` and ``index.jsonl`` (one record per line)."""
    df.to_parquet(dataset_root / "index.parquet", index=False)
    df.to_json(dataset_root / "index.jsonl", orient="records", lines=True)


def write_json(payload: dict[str, Any], path: Path) -> None:
    """Serialize ``payload`` to ``path`` (indented, sorted keys), creating parents."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(
        json.dumps(payload, indent=2, sort_keys=True, default=json_default),
        encoding="utf-8",
    )


def write_bytes(path: Path, payload: bytes) -> None:
    """Write ``payload`` to ``path`` unless the file already exists."""
    path.parent.mkdir(parents=True, exist_ok=True)
    if path.exists():
        return
    path.write_bytes(payload)


def write_mask(path: Path, mask: np.ndarray) -> None:
    """Save a 2-D label mask as an 8-bit greyscale image unless ``path`` exists.

    Values are cast with ``astype(np.uint8)``, so values outside 0-255 wrap.

    Raises:
        ValueError: if ``mask`` is not two-dimensional.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    if path.exists():
        return
    mask_u8 = np.asarray(mask).astype(np.uint8)
    if mask_u8.ndim != 2:
        raise ValueError(f"Expected a 2-D mask, got shape {mask_u8.shape}")
    # Pillow infers mode "L" for a 2-D uint8 array; recent Pillow releases
    # deprecate passing mode= to fromarray explicitly.
    Image.fromarray(mask_u8).save(path)


def link_or_copy_file(source: Path, target: Path) -> None:
    """Hard-link ``source`` to ``target``, falling back to a copy; skip if target exists."""
    target.parent.mkdir(parents=True, exist_ok=True)
    if target.exists():
        return
    try:
        os.link(source, target)
    except OSError:
        # Hard links fail across filesystems or on unsupported platforms.
        shutil.copy2(source, target)


def image_size_from_file(path: Path) -> tuple[int, int]:
    """Return ``(width, height)`` of the image at ``path``."""
    with Image.open(path) as image:
        return image.size


def image_size_from_bytes(payload: bytes) -> tuple[int, int]:
    """Return ``(width, height)`` of an encoded image held in memory."""
    with Image.open(io_from_bytes(payload)) as image:
        return image.size


def io_from_bytes(payload: bytes) -> io.BytesIO:
    """Wrap ``payload`` in a seekable in-memory binary stream."""
    return io.BytesIO(payload)


def rel_to_root(path: Path, root: Path) -> str:
    """Return ``path`` relative to ``root`` using forward slashes."""
    return path.relative_to(root).as_posix()


def _hash_bucket(key: str, buckets: int = 10) -> int:
    """Map ``key`` to a deterministic bucket in ``[0, buckets)`` via SHA-1."""
    return int(hashlib.sha1(key.encode()).hexdigest()[:8], 16) % buckets


def stable_hash_split(sample_id: str) -> str:
    """Deterministically assign ~10% val, ~10% test, ~80% train by hashing ``sample_id``."""
    bucket = _hash_bucket(sample_id)
    if bucket == 0:
        return "val"
    if bucket == 1:
        return "test"
    return "train"


def phenobench_processed_split(*, source_split: str, sample_id: str) -> str:
    """Map a PhenoBench sample to its processed split.

    Source ``val`` becomes processed ``test``; source ``train`` is hashed into
    ~10% ``val`` and ~90% ``train``.
    """
    if source_split == "val":
        return "test"
    if _hash_bucket(f"phenobench:{sample_id}") == 0:
        return "val"
    return "train"


def phenobench_semantic_summary(mask: np.ndarray) -> dict[str, Any]:
    """Summarize a PhenoBench semantic mask.

    Returns per-class pixel fractions (ids 0-4 per
    ``GROUND_WEED_CLASSES["semantic_segmentation"]``, rounded to 4 decimals)
    and derived tags: ``crop_present``, plus ``weed_presence`` when any (full
    or partial) weed pixel exists, else ``weed_free_crop_context`` when crop
    is present.
    """
    total_pixels = max(1, int(mask.size))  # guard against division by zero
    unique_values, counts = np.unique(mask.astype(np.uint16), return_counts=True)
    fractions = {
        int(value): float(count) / float(total_pixels)
        for value, count in zip(unique_values, counts, strict=True)
    }
    crop_fraction = fractions.get(1, 0.0)
    weed_fraction = fractions.get(2, 0.0)
    partial_crop_fraction = fractions.get(3, 0.0)
    partial_weed_fraction = fractions.get(4, 0.0)
    crop_present = (crop_fraction + partial_crop_fraction) > 0.0
    weed_present = (weed_fraction + partial_weed_fraction) > 0.0

    derived_tags: list[str] = []
    if crop_present:
        derived_tags.append("crop_present")
    if weed_present:
        derived_tags.append("weed_presence")
    elif crop_present:
        derived_tags.append("weed_free_crop_context")

    return {
        "fractions": {
            "background_fraction": round(fractions.get(0, 0.0), 4),
            "crop_fraction": round(crop_fraction, 4),
            "weed_fraction": round(weed_fraction, 4),
            "partial_crop_fraction": round(partial_crop_fraction, 4),
            "partial_weed_fraction": round(partial_weed_fraction, 4),
        },
        "derived_tags": derived_tags,
    }


def count_by(df: pd.DataFrame, column: str) -> dict[str, int]:
    """Count rows per value of ``column`` (NaN counted as ``"none"``), sorted by value."""
    series = df[column].fillna("none").value_counts().sort_index()
    return {str(key): int(value) for key, value in series.items()}


def json_default(value: Any) -> Any:
    """``json.dumps`` fallback: sets become sorted lists, paths become strings."""
    if isinstance(value, set):
        return sorted(value)
    if isinstance(value, Path):
        return str(value)
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def ensure_list(value: Any) -> list[Any]:
    """Return ``value`` unchanged if it is a list, else wrap it in a one-element list."""
    if isinstance(value, list):
        return value
    return [value]


def load_rugd_color_map(path: Path) -> dict[int, str]:
    """Read the RUGD colour map from its annotations zip.

    Each non-blank line is ``<id> <name> <r> <g> <b>``; the result maps the
    packed ``0xRRGGBB`` integer (as produced by ``encode_rgb_mask``) to the
    class name.
    """
    mapping: dict[int, str] = {}
    with ZipFile(path) as zf:
        raw = zf.read("RUGD_annotations/RUGD_annotation-colormap.txt").decode().splitlines()
    for line in raw:
        if not line.strip():
            continue  # tolerate blank/trailing lines
        _class_id, class_name, red, green, blue = line.split()
        encoded = (int(red) << 16) | (int(green) << 8) | int(blue)
        mapping[encoded] = class_name
    return mapping


def load_rellis_ontology(path: Path) -> dict[int, str]:
    """Read ``Rellis_3D_ontology/ontology.csv`` from a zip into ``{output_value: class_name}``."""
    mapping: dict[int, str] = {}
    with ZipFile(path) as zf:
        raw = zf.read("Rellis_3D_ontology/ontology.csv").decode().splitlines()
    for row in csv.DictReader(raw):
        mapping[int(row["output_value"])] = row["class_name"]
    return mapping


def load_rellis_split_map(path: Path) -> dict[str, str]:
    """Map each image path listed in ``train/val/test.lst`` of a zip to its split name.

    Each non-blank line is expected to hold ``<image_rel> <mask_rel>``.
    """
    mapping: dict[str, str] = {}
    with ZipFile(path) as zf:
        for split_name in ("train.lst", "val.lst", "test.lst"):
            split = split_name.split(".")[0]
            for line in zf.read(split_name).decode().splitlines():
                if not line.strip():
                    continue  # tolerate blank/trailing lines
                image_rel, _mask_rel = line.split()
                mapping[image_rel] = split
    return mapping


def scene_to_split(scene: str, split_map: dict[str, set[str]]) -> str:
    """Return the first split whose scene set contains ``scene``.

    Raises:
        KeyError: if no split lists ``scene``.
    """
    for split, scenes in split_map.items():
        if scene in scenes:
            return split
    raise KeyError(f"No split registered for scene: {scene}")


def encode_rgb_mask(mask: np.ndarray) -> np.ndarray:
    """Pack an ``(H, W, >=3)`` RGB array into ``(H, W)`` uint32 ``0xRRGGBB`` codes."""
    return (
        (mask[:, :, 0].astype(np.uint32) << 16)
        | (mask[:, :, 1].astype(np.uint32) << 8)
        | mask[:, :, 2].astype(np.uint32)
    )


def map_encoded_mask(encoded: np.ndarray, keys: np.ndarray, values: np.ndarray) -> np.ndarray:
    """Replace each code in ``encoded`` with ``values[i]`` where ``keys[i]`` equals it.

    ``keys`` must be sorted ascending (required by ``np.searchsorted``) and
    aligned element-wise with ``values``.

    Raises:
        ValueError: if ``encoded`` contains a code absent from ``keys``
            (up to 10 offending codes are reported).
    """
    flat = encoded.ravel()
    indices = np.searchsorted(keys, flat)
    in_range = indices < len(keys)
    known = np.zeros(flat.shape, dtype=bool)
    known[in_range] = keys[indices[in_range]] == flat[in_range]
    if not np.all(known):
        missing = np.unique(flat[~known])[:10]
        raise ValueError(f"Encountered unknown RGB labels: {missing.tolist()}")
    return values[indices].reshape(encoded.shape)


def validate_coarse_mapping(native_classes: set[str], coarse_map: dict[str, int], name: str) -> None:
    """Raise ``ValueError`` if any native class has no entry in ``coarse_map``."""
    missing = sorted(native_classes - set(coarse_map))
    if missing:
        raise ValueError(f"{name} coarse mapping is missing classes: {missing}")


if __name__ == "__main__":
    main()