| |
| from __future__ import annotations |
|
|
| import hashlib |
| import json |
| import os |
| import shutil |
| import sys |
| import xml.etree.ElementTree as ET |
| import csv |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any |
| from zipfile import ZipFile |
|
|
| import numpy as np |
| import pandas as pd |
| import yaml |
| from PIL import Image |
|
|
|
|
# Directory two levels above this file, i.e. the parent of the folder holding it.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Raw downloads and processed outputs are both addressed relative to REPO_ROOT.
DOWNLOADS_ROOT = REPO_ROOT.joinpath("data_local", "downloads")
PROCESSED_ROOT = REPO_ROOT.joinpath("data", "processed")

# Make REPO_ROOT importable (once) before the package import that follows;
# presumably this lets the file run directly as a script.
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))
|
|
| from autofarm.unified_weed_dataset import ( |
| PROCESSED_DATASET_NAME as ACTIVE_GROUND_WEEDS_DATASET_NAME, |
| build_phenobench_weed_dataset, |
| ) |
|
|
|
|
# Class-id -> class-name tables, keyed by annotation family.
GROUND_WEED_CLASSES = {
    "segmentation": dict(enumerate(("background", "crop", "weed"))),
    # Binary masks use 0 for vegetation and 255 for background.
    "binary_vegetation": {0: "vegetation", 255: "background"},
    "detection": {0: "weed"},
    "semantic_segmentation": dict(
        enumerate(("background", "crop", "weed", "partial_crop", "partial_weed"))
    ),
}
|
|
@dataclass(frozen=True)
class DatasetPaths:
    """Immutable bundle of the directories belonging to one processed dataset."""

    # Dataset root; relative paths in labels/records are computed against it.
    root: Path
    # Directory receiving image files.
    images: Path
    # Directory receiving mask files.
    masks: Path
    # Directory receiving per-sample label JSON files.
    labels: Path
|
|
|
|
def main() -> None:
    """Prepare every dataset and write the top-level processed manifest.

    Creates ``PROCESSED_ROOT``, runs the ground-weeds and replay preparation
    steps, writes ``PROCESSED_ROOT / "manifest.json"`` and prints one line per
    dataset with its ``records`` count.
    """
    PROCESSED_ROOT.mkdir(parents=True, exist_ok=True)

    blocked_reason = (
        "Missing data/raw/field/field_boundary.geojson, so no field-specific "
        "geospatial bootstrap could be materialized."
    )
    summary: dict[str, Any] = {
        "generated_from": str(REPO_ROOT),
        "blocked": {"zone_state_bootstrap": blocked_reason},
    }

    # Ground weeds are prepared first, then the replay asset.
    datasets: dict[str, Any] = {}
    datasets[ACTIVE_GROUND_WEEDS_DATASET_NAME] = prepare_ground_weeds_phenobench()
    datasets["replay_multimodal"] = prepare_replay_multimodal()
    summary["datasets"] = datasets

    write_json(summary, PROCESSED_ROOT / "manifest.json")

    print("Prepared datasets:")
    for dataset_name, dataset_summary in datasets.items():
        print(f" - {dataset_name}: {dataset_summary['records']} records")
|
|
|
|
def prepare_ground_weeds_phenobench() -> dict[str, Any]:
    """Build the active ground-weeds dataset via ``build_phenobench_weed_dataset``.

    The builder reads from ``DOWNLOADS_ROOT`` and writes under
    ``PROCESSED_ROOT / ACTIVE_GROUND_WEEDS_DATASET_NAME``.

    Returns:
        The builder's result, unchanged.
    """
    output_root = PROCESSED_ROOT / ACTIVE_GROUND_WEEDS_DATASET_NAME
    return build_phenobench_weed_dataset(
        downloads_root=DOWNLOADS_ROOT,
        output_root=output_root,
    )
|
|
|
|
def process_cwfid(paths: DatasetPaths, records: list[dict[str, Any]]) -> dict[str, Any]:
    """Import the CWFID download into the processed dataset layout.

    For every ``*_image.png`` under ``DOWNLOADS_ROOT / "cwfid" / "images"`` the
    image, its vegetation mask and its crop/weed annotation mask are hard-linked
    (or copied) into ``paths``, the polygon YAML is converted into a per-sample
    label JSON, and one index row is appended to ``records``.

    Args:
        paths: Target directories for images, masks and labels.
        records: Index rows; mutated in place (one dict appended per sample).

    Returns:
        A summary with the raw source location, the number of imported samples
        and the sizes of the train/test lists from ``train_test_split.yaml``.
    """
    source_root = DOWNLOADS_ROOT / "cwfid"
    # Decode YAML as UTF-8 explicitly; a bare read_text() uses the locale's
    # preferred encoding, which varies between machines.
    split_config = yaml.safe_load(
        (source_root / "train_test_split.yaml").read_text(encoding="utf-8")
    )
    # Sample ids in file names are zero-padded to three digits.
    provided_train = {f"{idx:03d}" for idx in split_config["train"]}
    provided_test = {f"{idx:03d}" for idx in split_config["test"]}

    count = 0
    for image_path in sorted((source_root / "images").glob("*_image.png")):
        sample_id = image_path.stem.split("_")[0]
        if sample_id in provided_train:
            split = "train"
        elif sample_id in provided_test:
            split = "test"
        else:
            split = "unspecified"

        vegetation_mask_path = source_root / "masks" / f"{sample_id}_mask.png"
        crop_weed_mask_path = source_root / "annotations" / f"{sample_id}_annotation.png"
        polygon_yaml_path = source_root / "annotations" / f"{sample_id}_annotation.yaml"

        width, height = image_size_from_file(image_path)

        image_target = paths.images / f"cwfid__{sample_id}.png"
        vegetation_target = paths.masks / f"cwfid__{sample_id}__vegetation.png"
        crop_weed_target = paths.masks / f"cwfid__{sample_id}__crop_weed.png"
        label_target = paths.labels / f"cwfid__{sample_id}.json"

        link_or_copy_file(image_path, image_target)
        link_or_copy_file(vegetation_mask_path, vegetation_target)
        link_or_copy_file(crop_weed_mask_path, crop_weed_target)

        annotations = yaml.safe_load(polygon_yaml_path.read_text(encoding="utf-8"))
        polygons = [
            {
                "class_name": obj["type"],
                # ensure_list wraps a non-list x/y value; strict=True rejects
                # x/y lists of unequal length.
                "points_xy": list(
                    zip(
                        ensure_list(obj["points"]["x"]),
                        ensure_list(obj["points"]["y"]),
                        strict=True,
                    )
                ),
            }
            for obj in annotations["annotation"]
        ]

        label_payload = {
            "dataset": "cwfid",
            "sample_id": sample_id,
            "split": split,
            "image": {
                "path": rel_to_root(image_target, paths.root),
                "width": width,
                "height": height,
            },
            "annotation_type": "segmentation_and_polygons",
            "binary_vegetation_mask": {
                "path": rel_to_root(vegetation_target, paths.root),
                "value_map": {"vegetation": 0, "background": 255},
            },
            "crop_weed_mask": {
                "path": rel_to_root(crop_weed_target, paths.root),
                "color_map": {
                    "background": [0, 0, 0],
                    "weed": [255, 0, 0],
                    "crop": [0, 255, 0],
                },
            },
            "polygons": polygons,
        }
        write_json(label_payload, label_target)

        records.append(
            {
                "sample_id": sample_id,
                "source_dataset": "cwfid",
                "split": split,
                "annotation_type": "segmentation_and_polygons",
                "image_path": rel_to_root(image_target, paths.root),
                "label_path": rel_to_root(label_target, paths.root),
                "mask_path": rel_to_root(crop_weed_target, paths.root),
                "aux_mask_path": rel_to_root(vegetation_target, paths.root),
                "width": width,
                "height": height,
                "scene": None,
                "native_annotation_format": "png_mask + yaml_polygons",
            }
        )
        count += 1

    return {
        "raw_source": "data_local/downloads/cwfid/",
        "records": count,
        "provided_splits": {
            "train": len(provided_train),
            "test": len(provided_test),
        },
    }
|
|
|
|
def process_ronin(paths: DatasetPaths, records: list[dict[str, Any]]) -> dict[str, Any]:
    """Import the Ronin open database archive into the processed dataset layout.

    Each XML annotation in ``Ronin_OPEN_DB.zip`` is paired with its raw image;
    the image bytes are written out, the bounding boxes are stored in a
    per-sample label JSON, and one index row is appended to ``records``.
    Annotations without a ``filename`` or without a matching image are skipped.

    Args:
        paths: Target directories for images and labels.
        records: Index rows; mutated in place (one dict appended per sample).

    Returns:
        A summary with the raw source location, the number of imported samples
        and a description of the split policy.
    """
    ronin_zip = DOWNLOADS_ROOT / "Ronin_OPEN_DB.zip"
    count = 0

    with ZipFile(ronin_zip) as zf:
        member_names = zf.namelist()
        # Build the lookup set once: testing membership against the list from
        # namelist() for every annotation is quadratic in the archive size.
        members = set(member_names)
        annotation_members = sorted(
            name
            for name in member_names
            if name.startswith("Ronin_OPEN_DB/annotations/") and name.endswith(".xml")
        )

        for xml_member in annotation_members:
            root = ET.fromstring(zf.read(xml_member))
            image_name = root.findtext("filename")
            if not image_name:
                continue

            sample_id = Path(image_name).stem
            image_member = f"Ronin_OPEN_DB/raw images/{image_name}"
            if image_member not in members:
                continue

            split = stable_hash_split(sample_id)
            image_bytes = zf.read(image_member)
            width, height = image_size_from_bytes(image_bytes)

            image_target = paths.images / f"ronin__{sample_id}.jpg"
            label_target = paths.labels / f"ronin__{sample_id}.json"

            write_bytes(image_target, image_bytes)

            boxes = []
            for obj in root.findall("object"):
                box = obj.find("bndbox")
                if box is None:
                    continue
                boxes.append(
                    {
                        # Objects without a <name> default to "weed"; missing
                        # coordinates default to 0.
                        "class_name": obj.findtext("name", default="weed"),
                        "bbox_xyxy": [
                            int(box.findtext("xmin", default="0")),
                            int(box.findtext("ymin", default="0")),
                            int(box.findtext("xmax", default="0")),
                            int(box.findtext("ymax", default="0")),
                        ],
                    }
                )

            label_payload = {
                "dataset": "ronin_open_db",
                "sample_id": sample_id,
                "split": split,
                "image": {
                    "path": rel_to_root(image_target, paths.root),
                    "width": width,
                    "height": height,
                },
                "annotation_type": "detection_boxes",
                "objects": boxes,
            }
            write_json(label_payload, label_target)

            records.append(
                {
                    "sample_id": sample_id,
                    "source_dataset": "ronin_open_db",
                    "split": split,
                    "annotation_type": "detection_boxes",
                    "image_path": rel_to_root(image_target, paths.root),
                    "label_path": rel_to_root(label_target, paths.root),
                    "mask_path": None,
                    "aux_mask_path": None,
                    "width": width,
                    "height": height,
                    "scene": None,
                    "native_annotation_format": "pascal_voc_xml",
                }
            )
            count += 1

    return {
        "raw_source": "data_local/downloads/Ronin_OPEN_DB.zip",
        "records": count,
        "split_policy": "Deterministic hash split: 80% train, 10% val, 10% test.",
    }
|
|
|
|
def process_phenobench(paths: DatasetPaths, records: list[dict[str, Any]]) -> dict[str, Any]:
    """Import labelled PhenoBench samples into the processed dataset layout.

    Walks the ``train`` and ``val`` source splits of ``PhenoBench-v110.zip``.
    For each semantic mask that has a matching image, the image and an 8-bit
    copy of the mask are written out, a label JSON with class fractions and
    derived tags is produced, and one index row is appended to ``records``.

    Args:
        paths: Target directories for images, masks and labels.
        records: Index rows; mutated in place (one dict appended per sample).

    Returns:
        A summary with record counts per source/processed split and per tag.
        When the archive is missing, a zero-record summary with a note.
    """
    phenobench_zip = DOWNLOADS_ROOT / "PhenoBench-v110.zip"
    if not phenobench_zip.exists():
        return {
            "raw_source": str(phenobench_zip),
            "records": 0,
            "note": "PhenoBench archive not found locally; weed-negative reference images were not imported.",
        }

    count = 0
    clean_crop_count = 0
    weed_positive_count = 0
    split_counts: dict[str, int] = {"train": 0, "val": 0, "test": 0}
    source_split_counts: dict[str, int] = {"train": 0, "val": 0}

    with ZipFile(phenobench_zip) as zf:
        members = set(zf.namelist())
        for source_split in ("train", "val"):
            semantic_members = sorted(
                name
                for name in members
                if name.startswith(f"PhenoBench/{source_split}/semantics/") and name.endswith(".png")
            )
            for semantic_member in semantic_members:
                sample_id = Path(semantic_member).stem
                image_member = f"PhenoBench/{source_split}/images/{sample_id}.png"
                if image_member not in members:
                    continue

                image_bytes = zf.read(image_member)
                width, height = image_size_from_bytes(image_bytes)
                # Close the zip member and the decoder once the pixels have
                # been copied into the array, instead of leaving both handles
                # to the garbage collector.
                with zf.open(semantic_member) as mask_file, Image.open(mask_file) as mask_image:
                    semantic_mask = np.asarray(mask_image, dtype=np.uint16)
                semantic_summary = phenobench_semantic_summary(semantic_mask)
                split = phenobench_processed_split(source_split=source_split, sample_id=sample_id)

                image_target = paths.images / f"phenobench__{source_split}__{sample_id}.png"
                mask_target = paths.masks / f"phenobench__{source_split}__{sample_id}.png"
                label_target = paths.labels / f"phenobench__{source_split}__{sample_id}.json"

                write_bytes(image_target, image_bytes)
                # NOTE(review): the mask is narrowed to uint8 on write; this is
                # lossless for the class ids 0-4 listed in GROUND_WEED_CLASSES,
                # larger ids would wrap - confirm against the source data.
                write_mask(mask_target, semantic_mask.astype(np.uint8))

                label_payload = {
                    "dataset": "phenobench",
                    "sample_id": sample_id,
                    "source_split": source_split,
                    "split": split,
                    "image": {
                        "path": rel_to_root(image_target, paths.root),
                        "width": width,
                        "height": height,
                    },
                    "annotation_type": "semantic_segmentation",
                    "semantic_mask": {
                        "path": rel_to_root(mask_target, paths.root),
                        "class_map": GROUND_WEED_CLASSES["semantic_segmentation"],
                    },
                    "semantic_summary": semantic_summary["fractions"],
                    "derived_tags": semantic_summary["derived_tags"],
                }
                write_json(label_payload, label_target)

                records.append(
                    {
                        "sample_id": sample_id,
                        "source_dataset": "phenobench",
                        "split": split,
                        "annotation_type": "semantic_segmentation",
                        "image_path": rel_to_root(image_target, paths.root),
                        "label_path": rel_to_root(label_target, paths.root),
                        "mask_path": rel_to_root(mask_target, paths.root),
                        "aux_mask_path": None,
                        "width": width,
                        "height": height,
                        "scene": source_split,
                        "native_annotation_format": "uint16_semantic_mask",
                        "source_split": source_split,
                        "crop_fraction": semantic_summary["fractions"]["crop_fraction"],
                        "partial_crop_fraction": semantic_summary["fractions"]["partial_crop_fraction"],
                        "weed_fraction": semantic_summary["fractions"]["weed_fraction"],
                        "partial_weed_fraction": semantic_summary["fractions"]["partial_weed_fraction"],
                        # Stored as a JSON string so the index column stays scalar.
                        "derived_tags": json.dumps(semantic_summary["derived_tags"]),
                    }
                )

                if "weed_presence" in semantic_summary["derived_tags"]:
                    weed_positive_count += 1
                if "weed_free_crop_context" in semantic_summary["derived_tags"]:
                    clean_crop_count += 1
                split_counts[split] = split_counts.get(split, 0) + 1
                source_split_counts[source_split] = source_split_counts.get(source_split, 0) + 1
                count += 1

    return {
        "raw_source": str(phenobench_zip),
        "records": count,
        "source_splits": source_split_counts,
        "processed_splits": split_counts,
        "weed_positive_records": weed_positive_count,
        "weed_free_crop_context_records": clean_crop_count,
        "split_policy": (
            "Source train is deterministically partitioned into processed train/val; "
            "source val is reserved for processed test because PhenoBench test images ship without labels."
        ),
    }
|
|
def prepare_replay_multimodal() -> dict[str, Any]:
    """Expose the RELLIS-3D replay bag under the processed tree.

    Creates ``PROCESSED_ROOT / "replay_multimodal"``, symlinks the downloaded
    bag into it under a fixed name, and writes a ``manifest.json`` listing the
    bag location and its topics.

    Returns:
        The metadata dictionary that was written to the manifest.
    """
    dataset_root = PROCESSED_ROOT / "replay_multimodal"
    dataset_root.mkdir(parents=True, exist_ok=True)

    bag_src = DOWNLOADS_ROOT / "RELLIS-3D-full-stack-merged-60s.bag"
    bag_target = dataset_root / "rellis_3d_full_stack_merged_60s.bag"
    # Path.exists() follows symlinks, so a dangling link left by an earlier run
    # reports False and os.symlink() would then fail with FileExistsError.
    # Remove such a stale link so it can be recreated below.
    if bag_target.is_symlink() and not bag_target.exists():
        bag_target.unlink()
    if not bag_target.exists():
        os.symlink(bag_src.resolve(), bag_target)

    topics = [
        "/img_node/intensity_image",
        "/img_node/noise_image",
        "/img_node/range_image",
        "/imu/data",
        "/imu/data_raw",
        "/nerian/left/image_raw",
        "/nerian/right/image_raw",
        "/odometry/filtered",
        "/os1_cloud_node/points",
        "/pylon_camera_node/image_raw",
        "/tf",
        "/tf_static",
        "/vectornav/GPS",
        "/vectornav/IMU",
        "/vectornav/Odom",
        "/velodyne_points",
        "/warthog_velocity_controller/cmd_vel",
        "/warthog_velocity_controller/odom",
    ]
    metadata = {
        "dataset": "replay_multimodal",
        "description": "Stable replay entrypoint for the 60-second RELLIS-3D full-stack ROS bag.",
        # Reported as a single record: the dataset is one bag file.
        "records": 1,
        "bag_path": rel_to_root(bag_target, dataset_root),
        "source_bag_path": str(bag_src),
        "topics": topics,
        "note": "This dataset is a replay/evaluation asset, not a supervised training table.",
    }
    write_json(metadata, dataset_root / "manifest.json")
    return metadata
|
|
|
|
def dataset_paths(root: Path) -> DatasetPaths:
    """Create the ``images``/``masks``/``labels`` folders under *root*.

    Args:
        root: Dataset root directory.

    Returns:
        A ``DatasetPaths`` pointing at *root* and its three subdirectories.
    """
    layout = DatasetPaths(
        root=root,
        images=root / "images",
        masks=root / "masks",
        labels=root / "labels",
    )
    for directory in (layout.images, layout.masks, layout.labels):
        directory.mkdir(parents=True, exist_ok=True)
    return layout
|
|
|
|
def write_index_files(df: pd.DataFrame, dataset_root: Path) -> None:
    """Write *df* as ``index.parquet`` and ``index.jsonl`` inside *dataset_root*.

    Args:
        df: Index table to serialize.
        dataset_root: Directory receiving both index files.
    """
    parquet_path = dataset_root / "index.parquet"
    jsonl_path = dataset_root / "index.jsonl"
    df.to_parquet(parquet_path, index=False)
    # One JSON object per line, one line per row.
    df.to_json(jsonl_path, orient="records", lines=True)
|
|
|
|
def write_json(payload: dict[str, Any], path: Path) -> None:
    """Serialize *payload* to *path* as indented JSON with sorted keys.

    Parent directories are created as needed. Values the ``json`` module cannot
    encode are passed through ``json_default``.

    Args:
        payload: Mapping to serialize.
        path: Destination file; overwritten if it already exists.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    document = json.dumps(payload, indent=2, sort_keys=True, default=json_default)
    path.write_text(document)
|
|
|
|
def write_bytes(path: Path, payload: bytes) -> None:
    """Write *payload* to *path* unless the file already exists.

    Parent directories are created as needed; an existing file is left as is.

    Args:
        path: Destination file.
        payload: Raw bytes to store.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    if not path.exists():
        path.write_bytes(payload)
|
|
|
|
def write_mask(path: Path, mask: np.ndarray) -> None:
    """Save *mask* as an 8-bit grayscale image unless *path* already exists.

    Parent directories are created as needed; an existing file is left as is.

    Args:
        path: Destination image file.
        mask: Array of label values; cast to ``uint8`` before saving.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    if not path.exists():
        grayscale = Image.fromarray(mask.astype(np.uint8), mode="L")
        grayscale.save(path)
|
|
|
|
def link_or_copy_file(source: Path, target: Path) -> None:
    """Materialize *source* at *target*, preferring a hard link over a copy.

    Parent directories are created as needed. Nothing happens when *target*
    already exists. If hard-linking raises ``OSError`` the file is copied with
    its metadata instead.

    Args:
        source: Existing file to expose.
        target: Destination path.
    """
    target.parent.mkdir(parents=True, exist_ok=True)
    if not target.exists():
        try:
            os.link(source, target)
        except OSError:
            shutil.copy2(source, target)
|
|
|
|
def image_size_from_file(path: Path) -> tuple[int, int]:
    """Return the ``(width, height)`` of the image stored at *path*."""
    with Image.open(path) as handle:
        dimensions = handle.size
    return dimensions
|
|
|
|
def image_size_from_bytes(payload: bytes) -> tuple[int, int]:
    """Return the ``(width, height)`` of an encoded image held in memory."""
    buffer = io_from_bytes(payload)
    with Image.open(buffer) as handle:
        dimensions = handle.size
    return dimensions
|
|
|
|
def io_from_bytes(payload: bytes):
    """Wrap *payload* in a fresh in-memory binary stream (``io.BytesIO``)."""
    # Imported locally, like the original, to keep the module's import list unchanged.
    import io

    return io.BytesIO(payload)
|
|
|
|
def rel_to_root(path: Path, root: Path) -> str:
    """Return *path* relative to *root* using forward slashes.

    Raises:
        ValueError: If *path* is not located under *root*.
    """
    relative = path.relative_to(root)
    return relative.as_posix()
|
|
|
|
def stable_hash_split(sample_id: str) -> str:
    """Assign *sample_id* to a split deterministically.

    The first eight hex digits of the SHA-1 of the id are reduced modulo 10:
    bucket 0 is ``"val"``, bucket 1 is ``"test"``, every other bucket is
    ``"train"``.
    """
    digest = hashlib.sha1(sample_id.encode()).hexdigest()
    bucket = int(digest[:8], 16) % 10
    return {0: "val", 1: "test"}.get(bucket, "train")
|
|
|
|
def phenobench_processed_split(*, source_split: str, sample_id: str) -> str:
    """Map a PhenoBench sample to its processed split.

    Source ``"val"`` samples always become ``"test"``. Any other sample is
    hashed (SHA-1 of ``"phenobench:<sample_id>"``, first eight hex digits,
    modulo 10): bucket 0 becomes ``"val"``, the rest ``"train"``.
    """
    if source_split == "val":
        return "test"
    salted_id = f"phenobench:{sample_id}".encode()
    bucket = int(hashlib.sha1(salted_id).hexdigest()[:8], 16) % 10
    return "val" if bucket == 0 else "train"
|
|
|
|
| def phenobench_semantic_summary(mask: np.ndarray) -> dict[str, Any]: |
| total_pixels = max(1, int(mask.size)) |
| unique_values, counts = np.unique(mask.astype(np.uint16), return_counts=True) |
| fractions = {int(value): float(count) / float(total_pixels) for value, count in zip(unique_values, counts, strict=True)} |
| crop_fraction = fractions.get(1, 0.0) |
| weed_fraction = fractions.get(2, 0.0) |
| partial_crop_fraction = fractions.get(3, 0.0) |
| partial_weed_fraction = fractions.get(4, 0.0) |
| crop_present = (crop_fraction + partial_crop_fraction) > 0.0 |
| weed_present = (weed_fraction + partial_weed_fraction) > 0.0 |
| derived_tags: list[str] = [] |
| if crop_present: |
| derived_tags.append("crop_present") |
| if weed_present: |
| derived_tags.append("weed_presence") |
| elif crop_present: |
| derived_tags.append("weed_free_crop_context") |
| return { |
| "fractions": { |
| "background_fraction": round(fractions.get(0, 0.0), 4), |
| "crop_fraction": round(crop_fraction, 4), |
| "weed_fraction": round(weed_fraction, 4), |
| "partial_crop_fraction": round(partial_crop_fraction, 4), |
| "partial_weed_fraction": round(partial_weed_fraction, 4), |
| }, |
| "derived_tags": derived_tags, |
| } |
|
|
|
|
def count_by(df: pd.DataFrame, column: str) -> dict[str, int]:
    """Count rows per value of *column*, reporting missing values as ``"none"``.

    Returns:
        Mapping from the stringified value to its row count, built in sorted
        value order.
    """
    filled = df[column].fillna("none")
    tallies = filled.value_counts().sort_index()
    return {str(label): int(total) for label, total in tallies.items()}
|
|
|
|
def json_default(value: Any) -> Any:
    """Fallback encoder for ``json.dumps``: sets become sorted lists, paths strings.

    Raises:
        TypeError: For any other type.
    """
    # Checked in order: set first, then Path.
    converters = ((set, sorted), (Path, str))
    for kind, convert in converters:
        if isinstance(value, kind):
            return convert(value)
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")
|
|
|
|
def ensure_list(value: Any) -> list[Any]:
    """Return *value* itself when it is a list, otherwise wrap it in a one-item list."""
    return value if isinstance(value, list) else [value]
|
|
|
|
def load_rugd_color_map(path: Path) -> dict[int, str]:
    """Read the RUGD annotation colormap from a zip archive.

    Each non-blank line of ``RUGD_annotations/RUGD_annotation-colormap.txt`` is
    expected to hold five whitespace-separated fields: class id, class name,
    red, green, blue.

    Args:
        path: Zip archive containing the colormap file.

    Returns:
        Mapping from the packed colour ``(red << 16) | (green << 8) | blue`` to
        the class name.

    Raises:
        KeyError: If the colormap file is missing from the archive.
        ValueError: If a non-blank line does not have exactly five fields or a
            colour component is not an integer.
    """
    mapping: dict[int, str] = {}
    with ZipFile(path) as zf:
        raw = zf.read("RUGD_annotations/RUGD_annotation-colormap.txt").decode().splitlines()
    for line in raw:
        fields = line.split()
        if not fields:
            # Blank or whitespace-only lines carry no entry; skip them instead
            # of failing on the unpack below.
            continue
        _class_id, class_name, red, green, blue = fields
        encoded = (int(red) << 16) | (int(green) << 8) | int(blue)
        mapping[encoded] = class_name
    return mapping
|
|
|
|
def load_rellis_ontology(path: Path) -> dict[int, str]:
    """Read ``Rellis_3D_ontology/ontology.csv`` from a zip archive.

    Args:
        path: Zip archive containing the ontology CSV.

    Returns:
        Mapping from each row's integer ``output_value`` to its ``class_name``.
    """
    with ZipFile(path) as zf:
        csv_lines = zf.read("Rellis_3D_ontology/ontology.csv").decode().splitlines()
    return {
        int(entry["output_value"]): entry["class_name"]
        for entry in csv.DictReader(csv_lines)
    }
|
|
|
|
def load_rellis_split_map(path: Path) -> dict[str, str]:
    """Read the RELLIS split lists from a zip archive.

    The archive must contain ``train.lst``, ``val.lst`` and ``test.lst``; each
    non-blank line holds an image path and a mask path separated by whitespace.

    Args:
        path: Zip archive containing the three list files.

    Returns:
        Mapping from image path (as written in the list) to ``"train"``,
        ``"val"`` or ``"test"``.

    Raises:
        KeyError: If one of the list files is missing from the archive.
        ValueError: If a non-blank line does not have exactly two fields.
    """
    mapping: dict[str, str] = {}
    with ZipFile(path) as zf:
        for split_name in ("train.lst", "val.lst", "test.lst"):
            split = split_name.split(".")[0]
            for line in zf.read(split_name).decode().splitlines():
                fields = line.split()
                if not fields:
                    # Blank or whitespace-only lines carry no entry; skip them
                    # instead of failing on the unpack below.
                    continue
                image_rel, _mask_rel = fields
                mapping[image_rel] = split
    return mapping
|
|
|
|
def scene_to_split(scene: str, split_map: dict[str, set[str]]) -> str:
    """Return the first split in *split_map* whose scene set contains *scene*.

    Raises:
        KeyError: If no split lists *scene*.
    """
    not_found = object()
    match = next(
        (name for name, members in split_map.items() if scene in members),
        not_found,
    )
    if match is not_found:
        raise KeyError(f"No split registered for scene: {scene}")
    return match
|
|
|
|
def encode_rgb_mask(mask: np.ndarray) -> np.ndarray:
    """Pack the first three channels of *mask* into one ``uint32`` per pixel.

    The result is ``(c0 << 16) | (c1 << 8) | c2`` where ``cN`` is
    ``mask[:, :, N]``.
    """
    red = mask[:, :, 0].astype(np.uint32)
    green = mask[:, :, 1].astype(np.uint32)
    blue = mask[:, :, 2].astype(np.uint32)
    return (red << 16) | (green << 8) | blue
|
|
|
|
def map_encoded_mask(encoded: np.ndarray, keys: np.ndarray, values: np.ndarray) -> np.ndarray:
    """Translate every entry of *encoded* through a key -> value lookup table.

    Args:
        encoded: Array of packed labels.
        keys: 1-D array of known labels; looked up with ``np.searchsorted``,
            so it is assumed to be sorted ascending.
        values: Array aligned with *keys* holding the replacement values.

    Returns:
        Array with the shape of *encoded* holding the mapped values.

    Raises:
        ValueError: If *encoded* contains a label absent from *keys*; the
            message lists up to ten of the offending labels.
    """
    flat = encoded.ravel()
    slots = np.searchsorted(keys, flat)
    # A slot equal to len(keys) means the label is larger than every key.
    in_range = slots < len(keys)
    hit = np.zeros_like(in_range, dtype=bool)
    hit[in_range] = keys[slots[in_range]] == flat[in_range]
    known = in_range & hit
    if not np.all(known):
        missing = np.unique(flat[~known])[:10]
        raise ValueError(f"Encountered unknown RGB labels: {missing.tolist()}")
    return values[slots].reshape(encoded.shape)
|
|
|
|
def validate_coarse_mapping(native_classes: set[str], coarse_map: dict[str, int], name: str) -> None:
    """Check that every native class has an entry in *coarse_map*.

    Args:
        native_classes: Class names that must be covered.
        coarse_map: Mapping whose keys are the covered class names.
        name: Label used as the prefix of the error message.

    Raises:
        ValueError: If any class is unmapped; the message lists them sorted.
    """
    unmapped = sorted(cls for cls in native_classes if cls not in coarse_map)
    if unmapped:
        raise ValueError(f"{name} coarse mapping is missing classes: {unmapped}")
|
|
|
|
# Run the preparation pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|