autofarm / scripts /prepare_public_datasets.py
isabelku's picture
AutoFarm Space deploy
826dd96
#!/usr/bin/env python3
from __future__ import annotations
import hashlib
import json
import os
import shutil
import sys
import xml.etree.ElementTree as ET
import csv
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from zipfile import ZipFile
import numpy as np
import pandas as pd
import yaml
from PIL import Image
# Repository root: two levels up from this file (the directory containing this
# script's parent directory).
REPO_ROOT = Path(__file__).resolve().parents[1]
# Location the raw public-dataset archives and folders are read from.
DOWNLOADS_ROOT = REPO_ROOT / "data_local" / "downloads"
# Location every prepared dataset and manifest is written to.
PROCESSED_ROOT = REPO_ROOT / "data" / "processed"
# Put the repository root on sys.path so the `autofarm` package import that
# follows resolves when this file is run directly as a script.
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))
from autofarm.unified_weed_dataset import (
PROCESSED_DATASET_NAME as ACTIVE_GROUND_WEEDS_DATASET_NAME,
build_phenobench_weed_dataset,
)
# Class-id -> class-name tables, keyed by annotation flavour.
GROUND_WEED_CLASSES = {
    # Three-class crop/weed segmentation.
    "segmentation": {0: "background", 1: "crop", 2: "weed"},
    # Binary vegetation mask: 0 marks vegetation, 255 marks background.
    "binary_vegetation": {0: "vegetation", 255: "background"},
    # Single-class detection.
    "detection": {0: "weed"},
    # Five-class semantic mask; this table is embedded in the PhenoBench label files.
    "semantic_segmentation": {
        0: "background",
        1: "crop",
        2: "weed",
        3: "partial_crop",
        4: "partial_weed",
    },
}
@dataclass(frozen=True)
class DatasetPaths:
    """Immutable bundle of the directories that make up one processed dataset."""

    # Dataset root; paths stored in labels and index records are relative to it.
    root: Path
    # Directory that receives image files.
    images: Path
    # Directory that receives mask files.
    masks: Path
    # Directory that receives per-sample JSON label files.
    labels: Path
def main() -> None:
    """Prepare the active datasets and write the top-level processed manifest.

    Builds the ground-weeds dataset and the replay dataset, records their
    summaries in ``PROCESSED_ROOT / "manifest.json"``, and prints a record
    count per dataset.
    """
    PROCESSED_ROOT.mkdir(parents=True, exist_ok=True)

    blocked_reason = (
        "Missing data/raw/field/field_boundary.geojson, so no field-specific "
        "geospatial bootstrap could be materialized."
    )

    # Ground weeds are prepared first, then the replay asset.
    datasets: dict[str, Any] = {}
    datasets[ACTIVE_GROUND_WEEDS_DATASET_NAME] = prepare_ground_weeds_phenobench()
    datasets["replay_multimodal"] = prepare_replay_multimodal()

    summary: dict[str, Any] = {
        "generated_from": str(REPO_ROOT),
        "blocked": {"zone_state_bootstrap": blocked_reason},
        "datasets": datasets,
    }
    write_json(summary, PROCESSED_ROOT / "manifest.json")

    print("Prepared datasets:")
    for name, info in datasets.items():
        print(f" - {name}: {info['records']} records")
def prepare_ground_weeds_phenobench() -> dict[str, Any]:
    """Build the active ground-weeds dataset and return the builder's summary."""
    # The active weed pipeline is single-source PhenoBench only; mixed-source public builders are legacy.
    output_root = PROCESSED_ROOT / ACTIVE_GROUND_WEEDS_DATASET_NAME
    summary = build_phenobench_weed_dataset(
        downloads_root=DOWNLOADS_ROOT,
        output_root=output_root,
    )
    return summary
def process_cwfid(paths: DatasetPaths, records: list[dict[str, Any]]) -> dict[str, Any]:
    """Import the CWFID folder under ``DOWNLOADS_ROOT`` into a processed dataset.

    For every ``*_image.png`` the image and both masks are linked (or copied)
    into ``paths``, a JSON label file is written, and one index row is
    appended to ``records``.

    Args:
        paths: Target directories of the processed dataset.
        records: Index rows; mutated in place.

    Returns:
        Summary with the raw source, the imported record count and the sizes
        of the provided train/test splits.
    """
    source_root = DOWNLOADS_ROOT / "cwfid"
    split_config = yaml.safe_load((source_root / "train_test_split.yaml").read_text())
    # Split ids are zero-padded to three digits so they compare equal to the
    # id prefix taken from the image file names.
    provided_train = {f"{idx:03d}" for idx in split_config["train"]}
    provided_test = {f"{idx:03d}" for idx in split_config["test"]}

    images_dir = source_root / "images"
    masks_dir = source_root / "masks"
    annotations_dir = source_root / "annotations"

    imported = 0
    for image_path in sorted(images_dir.glob("*_image.png")):
        sample_id = image_path.stem.split("_")[0]
        if sample_id in provided_train:
            split = "train"
        elif sample_id in provided_test:
            split = "test"
        else:
            split = "unspecified"

        vegetation_mask_path = masks_dir / f"{sample_id}_mask.png"
        crop_weed_mask_path = annotations_dir / f"{sample_id}_annotation.png"
        polygon_yaml_path = annotations_dir / f"{sample_id}_annotation.yaml"
        width, height = image_size_from_file(image_path)

        stem = f"cwfid__{sample_id}"
        image_target = paths.images / f"{stem}.png"
        vegetation_target = paths.masks / f"{stem}__vegetation.png"
        crop_weed_target = paths.masks / f"{stem}__crop_weed.png"
        label_target = paths.labels / f"{stem}.json"

        for origin, destination in (
            (image_path, image_target),
            (vegetation_mask_path, vegetation_target),
            (crop_weed_mask_path, crop_weed_target),
        ):
            link_or_copy_file(origin, destination)

        annotations = yaml.safe_load(polygon_yaml_path.read_text())
        polygons = []
        for obj in annotations["annotation"]:
            # ensure_list wraps a scalar coordinate so zip always sees lists.
            xs = ensure_list(obj["points"]["x"])
            ys = ensure_list(obj["points"]["y"])
            polygons.append(
                {
                    "class_name": obj["type"],
                    "points_xy": list(zip(xs, ys, strict=True)),
                }
            )

        image_rel = rel_to_root(image_target, paths.root)
        vegetation_rel = rel_to_root(vegetation_target, paths.root)
        crop_weed_rel = rel_to_root(crop_weed_target, paths.root)

        label_payload = {
            "dataset": "cwfid",
            "sample_id": sample_id,
            "split": split,
            "image": {
                "path": image_rel,
                "width": width,
                "height": height,
            },
            "annotation_type": "segmentation_and_polygons",
            "binary_vegetation_mask": {
                "path": vegetation_rel,
                "value_map": {"vegetation": 0, "background": 255},
            },
            "crop_weed_mask": {
                "path": crop_weed_rel,
                "color_map": {
                    "background": [0, 0, 0],
                    "weed": [255, 0, 0],
                    "crop": [0, 255, 0],
                },
            },
            "polygons": polygons,
        }
        write_json(label_payload, label_target)

        index_row = {
            "sample_id": sample_id,
            "source_dataset": "cwfid",
            "split": split,
            "annotation_type": "segmentation_and_polygons",
            "image_path": image_rel,
            "label_path": rel_to_root(label_target, paths.root),
            "mask_path": crop_weed_rel,
            "aux_mask_path": vegetation_rel,
            "width": width,
            "height": height,
            "scene": None,
            "native_annotation_format": "png_mask + yaml_polygons",
        }
        records.append(index_row)
        imported += 1

    return {
        "raw_source": "data_local/downloads/cwfid/",
        "records": imported,
        "provided_splits": {
            "train": len(provided_train),
            "test": len(provided_test),
        },
    }
def process_ronin(paths: DatasetPaths, records: list[dict[str, Any]]) -> dict[str, Any]:
    """Import the Ronin open database archive into a processed dataset.

    Each Pascal-VOC XML annotation in ``Ronin_OPEN_DB.zip`` whose image is
    also present in the archive yields one extracted image, one JSON label
    file and one index row appended to ``records``. Annotations without a
    ``filename`` element, or whose image is missing, are skipped.

    Args:
        paths: Target directories of the processed dataset.
        records: Index rows; mutated in place.

    Returns:
        Summary with the raw source, the imported record count and the split
        policy description.
    """
    ronin_zip = DOWNLOADS_ROOT / "Ronin_OPEN_DB.zip"
    count = 0
    with ZipFile(ronin_zip) as zf:
        member_names = zf.namelist()
        # Build the membership set once; testing against namelist() inside the
        # loop rebuilt and scanned the full list for every annotation.
        member_lookup = set(member_names)
        annotation_members = sorted(
            name
            for name in member_names
            if name.startswith("Ronin_OPEN_DB/annotations/") and name.endswith(".xml")
        )
        for xml_member in annotation_members:
            root = ET.fromstring(zf.read(xml_member))
            image_name = root.findtext("filename")
            if not image_name:
                continue
            sample_id = Path(image_name).stem
            image_member = f"Ronin_OPEN_DB/raw images/{image_name}"
            if image_member not in member_lookup:
                continue
            split = stable_hash_split(sample_id)
            image_bytes = zf.read(image_member)
            width, height = image_size_from_bytes(image_bytes)
            image_target = paths.images / f"ronin__{sample_id}.jpg"
            label_target = paths.labels / f"ronin__{sample_id}.json"
            write_bytes(image_target, image_bytes)
            boxes = []
            for obj in root.findall("object"):
                box = obj.find("bndbox")
                if box is None:
                    # Objects without a bounding box carry nothing to export.
                    continue
                boxes.append(
                    {
                        "class_name": obj.findtext("name", default="weed"),
                        "bbox_xyxy": [
                            int(box.findtext("xmin", default="0")),
                            int(box.findtext("ymin", default="0")),
                            int(box.findtext("xmax", default="0")),
                            int(box.findtext("ymax", default="0")),
                        ],
                    }
                )
            label_payload = {
                "dataset": "ronin_open_db",
                "sample_id": sample_id,
                "split": split,
                "image": {
                    "path": rel_to_root(image_target, paths.root),
                    "width": width,
                    "height": height,
                },
                "annotation_type": "detection_boxes",
                "objects": boxes,
            }
            write_json(label_payload, label_target)
            records.append(
                {
                    "sample_id": sample_id,
                    "source_dataset": "ronin_open_db",
                    "split": split,
                    "annotation_type": "detection_boxes",
                    "image_path": rel_to_root(image_target, paths.root),
                    "label_path": rel_to_root(label_target, paths.root),
                    "mask_path": None,
                    "aux_mask_path": None,
                    "width": width,
                    "height": height,
                    "scene": None,
                    "native_annotation_format": "pascal_voc_xml",
                }
            )
            count += 1
    return {
        "raw_source": "data_local/downloads/Ronin_OPEN_DB.zip",
        "records": count,
        "split_policy": "Deterministic hash split: 80% train, 10% val, 10% test.",
    }
def process_phenobench(paths: DatasetPaths, records: list[dict[str, Any]]) -> dict[str, Any]:
    """Import labelled PhenoBench images from the local archive.

    Walks the ``train`` and ``val`` source splits of ``PhenoBench-v110.zip``.
    For every semantic mask with a matching image, the image and an 8-bit
    copy of the mask are written into ``paths``, a JSON label file is
    written, and one index row is appended to ``records``.

    Args:
        paths: Target directories of the processed dataset.
        records: Index rows; mutated in place.

    Returns:
        Summary with record counts per source split and processed split, the
        weed-positive and weed-free counts, and the split policy. When the
        archive is missing, a zero-record summary with a note is returned
        instead.
    """
    phenobench_zip = DOWNLOADS_ROOT / "PhenoBench-v110.zip"
    if not phenobench_zip.exists():
        return {
            "raw_source": str(phenobench_zip),
            "records": 0,
            "note": "PhenoBench archive not found locally; weed-negative reference images were not imported.",
        }
    count = 0
    clean_crop_count = 0
    weed_positive_count = 0
    split_counts: dict[str, int] = {"train": 0, "val": 0, "test": 0}
    source_split_counts: dict[str, int] = {"train": 0, "val": 0}
    with ZipFile(phenobench_zip) as zf:
        members = set(zf.namelist())
        for source_split in ("train", "val"):
            semantic_members = sorted(
                name
                for name in members
                if name.startswith(f"PhenoBench/{source_split}/semantics/") and name.endswith(".png")
            )
            for semantic_member in semantic_members:
                sample_id = Path(semantic_member).stem
                image_member = f"PhenoBench/{source_split}/images/{sample_id}.png"
                if image_member not in members:
                    # A mask without its image cannot form a training sample.
                    continue
                image_bytes = zf.read(image_member)
                width, height = image_size_from_bytes(image_bytes)
                # Close both the archive member and the decoder as soon as the
                # pixels are copied out; previously neither was closed.
                with zf.open(semantic_member) as mask_file, Image.open(mask_file) as mask_image:
                    semantic_mask = np.asarray(mask_image, dtype=np.uint16)
                semantic_summary = phenobench_semantic_summary(semantic_mask)
                split = phenobench_processed_split(source_split=source_split, sample_id=sample_id)
                image_target = paths.images / f"phenobench__{source_split}__{sample_id}.png"
                mask_target = paths.masks / f"phenobench__{source_split}__{sample_id}.png"
                label_target = paths.labels / f"phenobench__{source_split}__{sample_id}.json"
                write_bytes(image_target, image_bytes)
                # NOTE(review): the uint8 cast wraps values above 255; the class
                # map embedded below only lists ids 0-4 -- confirm no other ids occur.
                write_mask(mask_target, semantic_mask.astype(np.uint8))
                label_payload = {
                    "dataset": "phenobench",
                    "sample_id": sample_id,
                    "source_split": source_split,
                    "split": split,
                    "image": {
                        "path": rel_to_root(image_target, paths.root),
                        "width": width,
                        "height": height,
                    },
                    "annotation_type": "semantic_segmentation",
                    "semantic_mask": {
                        "path": rel_to_root(mask_target, paths.root),
                        "class_map": GROUND_WEED_CLASSES["semantic_segmentation"],
                    },
                    "semantic_summary": semantic_summary["fractions"],
                    "derived_tags": semantic_summary["derived_tags"],
                }
                write_json(label_payload, label_target)
                records.append(
                    {
                        "sample_id": sample_id,
                        "source_dataset": "phenobench",
                        "split": split,
                        "annotation_type": "semantic_segmentation",
                        "image_path": rel_to_root(image_target, paths.root),
                        "label_path": rel_to_root(label_target, paths.root),
                        "mask_path": rel_to_root(mask_target, paths.root),
                        "aux_mask_path": None,
                        "width": width,
                        "height": height,
                        "scene": source_split,
                        "native_annotation_format": "uint16_semantic_mask",
                        "source_split": source_split,
                        "crop_fraction": semantic_summary["fractions"]["crop_fraction"],
                        "partial_crop_fraction": semantic_summary["fractions"]["partial_crop_fraction"],
                        "weed_fraction": semantic_summary["fractions"]["weed_fraction"],
                        "partial_weed_fraction": semantic_summary["fractions"]["partial_weed_fraction"],
                        # Stored as a JSON string so the index row stays flat.
                        "derived_tags": json.dumps(semantic_summary["derived_tags"]),
                    }
                )
                if "weed_presence" in semantic_summary["derived_tags"]:
                    weed_positive_count += 1
                if "weed_free_crop_context" in semantic_summary["derived_tags"]:
                    clean_crop_count += 1
                split_counts[split] = split_counts.get(split, 0) + 1
                source_split_counts[source_split] = source_split_counts.get(source_split, 0) + 1
                count += 1
    return {
        "raw_source": str(phenobench_zip),
        "records": count,
        "source_splits": source_split_counts,
        "processed_splits": split_counts,
        "weed_positive_records": weed_positive_count,
        "weed_free_crop_context_records": clean_crop_count,
        "split_policy": (
            "Source train is deterministically partitioned into processed train/val; "
            "source val is reserved for processed test because PhenoBench test images ship without labels."
        ),
    }
def prepare_replay_multimodal() -> dict[str, Any]:
    """Expose the RELLIS-3D 60-second ROS bag under a stable processed path.

    Creates ``PROCESSED_ROOT / "replay_multimodal"``, symlinks the downloaded
    bag into it if no link or file is there yet, and writes a manifest
    describing the bag and its topics.

    Returns:
        The metadata dictionary that was written to the dataset manifest.
    """
    dataset_root = PROCESSED_ROOT / "replay_multimodal"
    dataset_root.mkdir(parents=True, exist_ok=True)
    bag_src = DOWNLOADS_ROOT / "RELLIS-3D-full-stack-merged-60s.bag"
    bag_target = dataset_root / "rellis_3d_full_stack_merged_60s.bag"
    # Path.exists() follows symlinks, so a dangling link left by an earlier
    # run reports False and os.symlink would then fail with FileExistsError.
    # Treat an existing link, dangling or not, as already prepared.
    if not (bag_target.is_symlink() or bag_target.exists()):
        os.symlink(bag_src.resolve(), bag_target)
    topics = [
        "/img_node/intensity_image",
        "/img_node/noise_image",
        "/img_node/range_image",
        "/imu/data",
        "/imu/data_raw",
        "/nerian/left/image_raw",
        "/nerian/right/image_raw",
        "/odometry/filtered",
        "/os1_cloud_node/points",
        "/pylon_camera_node/image_raw",
        "/tf",
        "/tf_static",
        "/vectornav/GPS",
        "/vectornav/IMU",
        "/vectornav/Odom",
        "/velodyne_points",
        "/warthog_velocity_controller/cmd_vel",
        "/warthog_velocity_controller/odom",
    ]
    metadata = {
        "dataset": "replay_multimodal",
        "description": "Stable replay entrypoint for the 60-second RELLIS-3D full-stack ROS bag.",
        # The bag counts as a single record so callers can report it like any other dataset.
        "records": 1,
        "bag_path": rel_to_root(bag_target, dataset_root),
        "source_bag_path": str(bag_src),
        "topics": topics,
        "note": "This dataset is a replay/evaluation asset, not a supervised training table.",
    }
    write_json(metadata, dataset_root / "manifest.json")
    return metadata
def dataset_paths(root: Path) -> DatasetPaths:
    """Create the images/masks/labels directories under ``root`` and bundle them."""
    layout = {name: root / name for name in ("images", "masks", "labels")}
    for directory in layout.values():
        directory.mkdir(parents=True, exist_ok=True)
    return DatasetPaths(root=root, **layout)
def write_index_files(df: pd.DataFrame, dataset_root: Path) -> None:
    """Write the dataset index as both ``index.parquet`` and ``index.jsonl``."""
    parquet_path = dataset_root / "index.parquet"
    jsonl_path = dataset_root / "index.jsonl"
    df.to_parquet(parquet_path, index=False)
    # One JSON object per line, one line per index row.
    df.to_json(jsonl_path, orient="records", lines=True)
def write_json(payload: dict[str, Any], path: Path) -> None:
    """Serialize ``payload`` as indented, key-sorted JSON at ``path``.

    Parent directories are created as needed; values the json module cannot
    encode are passed through ``json_default``.
    """
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    text = json.dumps(payload, indent=2, sort_keys=True, default=json_default)
    path.write_text(text)
def write_bytes(path: Path, payload: bytes) -> None:
    """Write ``payload`` to ``path`` unless something already exists there.

    Parent directories are created as needed; an existing target is left
    untouched.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    if not path.exists():
        path.write_bytes(payload)
def write_mask(path: Path, mask: np.ndarray) -> None:
    """Save ``mask`` as an 8-bit grayscale image unless ``path`` already exists."""
    path.parent.mkdir(parents=True, exist_ok=True)
    if not path.exists():
        pixels = mask.astype(np.uint8)
        Image.fromarray(pixels, mode="L").save(path)
def link_or_copy_file(source: Path, target: Path) -> None:
    """Hard-link ``source`` at ``target``, copying instead if linking fails.

    Parent directories are created as needed; an existing target is left
    untouched.
    """
    target.parent.mkdir(parents=True, exist_ok=True)
    if not target.exists():
        try:
            os.link(source, target)
        except OSError:
            # Fall back to a copy that also carries over file metadata.
            shutil.copy2(source, target)
def image_size_from_file(path: Path) -> tuple[int, int]:
    """Return the ``size`` pair PIL reports for the image at ``path``."""
    with Image.open(path) as opened:
        dimensions = opened.size
    return dimensions
def image_size_from_bytes(payload: bytes) -> tuple[int, int]:
    """Return the ``size`` pair PIL reports for an encoded image held in memory."""
    stream = io_from_bytes(payload)
    with Image.open(stream) as opened:
        dimensions = opened.size
    return dimensions
def io_from_bytes(payload: bytes):
    """Wrap ``payload`` in an in-memory binary stream positioned at its start."""
    import io

    return io.BytesIO(payload)
def rel_to_root(path: Path, root: Path) -> str:
    """Return ``path`` relative to ``root`` using forward slashes.

    Raises:
        ValueError: If ``path`` is not located under ``root``.
    """
    relative = path.relative_to(root)
    return relative.as_posix()
def stable_hash_split(sample_id: str) -> str:
    """Assign ``sample_id`` to a split deterministically from its SHA-1 digest.

    The first eight hex digits of the digest are reduced modulo 10: bucket 0
    maps to ``"val"``, bucket 1 to ``"test"`` and every other bucket to
    ``"train"``.
    """
    digest = hashlib.sha1(sample_id.encode()).hexdigest()
    bucket = int(digest[:8], 16) % 10
    return {0: "val", 1: "test"}.get(bucket, "train")
def phenobench_processed_split(*, source_split: str, sample_id: str) -> str:
    """Map a PhenoBench source split and sample id to the processed split.

    Source ``"val"`` always becomes ``"test"``. Any other source split is
    divided by a SHA-1 bucket of ``"phenobench:<sample_id>"``: bucket 0 of 10
    becomes ``"val"``, the rest ``"train"``.
    """
    if source_split != "val":
        digest = hashlib.sha1(f"phenobench:{sample_id}".encode()).hexdigest()
        return "val" if int(digest[:8], 16) % 10 == 0 else "train"
    return "test"
def phenobench_semantic_summary(mask: np.ndarray) -> dict[str, Any]:
total_pixels = max(1, int(mask.size))
unique_values, counts = np.unique(mask.astype(np.uint16), return_counts=True)
fractions = {int(value): float(count) / float(total_pixels) for value, count in zip(unique_values, counts, strict=True)}
crop_fraction = fractions.get(1, 0.0)
weed_fraction = fractions.get(2, 0.0)
partial_crop_fraction = fractions.get(3, 0.0)
partial_weed_fraction = fractions.get(4, 0.0)
crop_present = (crop_fraction + partial_crop_fraction) > 0.0
weed_present = (weed_fraction + partial_weed_fraction) > 0.0
derived_tags: list[str] = []
if crop_present:
derived_tags.append("crop_present")
if weed_present:
derived_tags.append("weed_presence")
elif crop_present:
derived_tags.append("weed_free_crop_context")
return {
"fractions": {
"background_fraction": round(fractions.get(0, 0.0), 4),
"crop_fraction": round(crop_fraction, 4),
"weed_fraction": round(weed_fraction, 4),
"partial_crop_fraction": round(partial_crop_fraction, 4),
"partial_weed_fraction": round(partial_weed_fraction, 4),
},
"derived_tags": derived_tags,
}
def count_by(df: pd.DataFrame, column: str) -> dict[str, int]:
    """Count rows per value of ``column``, reporting missing values as ``"none"``.

    Keys are stringified and appear in sorted order of the original values.
    """
    filled = df[column].fillna("none")
    tallies = filled.value_counts().sort_index()
    result: dict[str, int] = {}
    for label, total in tallies.items():
        result[str(label)] = int(total)
    return result
def json_default(value: Any) -> Any:
    """Fallback encoder for ``json.dumps``: paths become strings, sets sorted lists.

    Raises:
        TypeError: For any other type.
    """
    if isinstance(value, Path):
        return str(value)
    if isinstance(value, set):
        return sorted(value)
    type_name = type(value).__name__
    raise TypeError(f"Object of type {type_name} is not JSON serializable")
def ensure_list(value: Any) -> list[Any]:
    """Return ``value`` unchanged if it is a list, otherwise wrap it in one."""
    return value if isinstance(value, list) else [value]
def load_rugd_color_map(path: Path) -> dict[int, str]:
    """Read the RUGD annotation colormap from a zip archive.

    Each non-blank line of ``RUGD_annotations/RUGD_annotation-colormap.txt``
    is expected to hold five whitespace-separated fields: class id, class
    name, red, green, blue.

    Args:
        path: Zip archive containing the colormap file.

    Returns:
        Mapping from the packed colour ``(red << 16) | (green << 8) | blue``
        to the class name.

    Raises:
        ValueError: If a non-blank line does not have exactly five fields.
    """
    mapping: dict[int, str] = {}
    with ZipFile(path) as zf:
        raw = zf.read("RUGD_annotations/RUGD_annotation-colormap.txt").decode().splitlines()
    for line in raw:
        if not line.strip():
            # Blank or whitespace-only lines would break the 5-way unpack below.
            continue
        # The class id is not needed; the packed colour is the lookup key.
        _class_id, class_name, red, green, blue = line.split()
        encoded = (int(red) << 16) | (int(green) << 8) | int(blue)
        mapping[encoded] = class_name
    return mapping
def load_rellis_ontology(path: Path) -> dict[int, str]:
    """Read ``Rellis_3D_ontology/ontology.csv`` from a zip archive.

    Returns:
        Mapping from each row's integer ``output_value`` to its ``class_name``.
    """
    with ZipFile(path) as zf:
        csv_lines = zf.read("Rellis_3D_ontology/ontology.csv").decode().splitlines()
    reader = csv.DictReader(csv_lines)
    return {int(row["output_value"]): row["class_name"] for row in reader}
def load_rellis_split_map(path: Path) -> dict[str, str]:
    """Read the RELLIS split lists from a zip archive.

    ``train.lst``, ``val.lst`` and ``test.lst`` are read from the archive
    root. Each non-blank line is expected to hold an image path and a mask
    path separated by whitespace.

    Args:
        path: Zip archive containing the three ``.lst`` files.

    Returns:
        Mapping from image path to split name (``"train"``, ``"val"`` or
        ``"test"``). An image listed more than once keeps the last split read.

    Raises:
        ValueError: If a non-blank line does not have exactly two fields.
    """
    mapping: dict[str, str] = {}
    with ZipFile(path) as zf:
        for split_name in ("train.lst", "val.lst", "test.lst"):
            split = split_name.split(".")[0]
            for line in zf.read(split_name).decode().splitlines():
                if not line.strip():
                    # Blank lines would break the 2-way unpack below.
                    continue
                image_rel, _mask_rel = line.split()
                mapping[image_rel] = split
    return mapping
def scene_to_split(scene: str, split_map: dict[str, set[str]]) -> str:
    """Return the first split in ``split_map`` whose scene set contains ``scene``.

    Raises:
        KeyError: If no split lists the scene.
    """
    not_found = object()
    owner = next(
        (split for split, scenes in split_map.items() if scene in scenes),
        not_found,
    )
    if owner is not_found:
        raise KeyError(f"No split registered for scene: {scene}")
    return owner
def encode_rgb_mask(mask: np.ndarray) -> np.ndarray:
    """Pack the first three channels of ``mask`` into one uint32 value per pixel.

    The result is ``(channel0 << 16) | (channel1 << 8) | channel2``.
    """
    red, green, blue = (mask[:, :, channel].astype(np.uint32) for channel in range(3))
    return (red << 16) | (green << 8) | blue
def map_encoded_mask(encoded: np.ndarray, keys: np.ndarray, values: np.ndarray) -> np.ndarray:
    """Translate every element of ``encoded`` through a ``keys`` -> ``values`` table.

    Lookup uses ``np.searchsorted``, so ``keys`` is assumed to be sorted
    ascending, with ``values`` aligned to it -- TODO confirm at call sites.

    Raises:
        ValueError: If any element of ``encoded`` is absent from ``keys``; the
            message lists up to ten of the unknown values.
    """
    flat = encoded.ravel()
    positions = np.searchsorted(keys, flat)
    # A position equal to len(keys) means the value is larger than every key.
    in_range = positions < len(keys)
    hits = np.zeros_like(in_range, dtype=bool)
    hits[in_range] = keys[positions[in_range]] == flat[in_range]
    if not hits.all():
        unknown = np.unique(flat[~hits])[:10]
        raise ValueError(f"Encountered unknown RGB labels: {unknown.tolist()}")
    return values[positions].reshape(encoded.shape)
def validate_coarse_mapping(native_classes: set[str], coarse_map: dict[str, int], name: str) -> None:
    """Check that every native class has an entry in ``coarse_map``.

    Raises:
        ValueError: Listing, in sorted order, the classes without an entry.
    """
    uncovered = sorted(cls for cls in native_classes if cls not in coarse_map)
    if not uncovered:
        return
    raise ValueError(f"{name} coarse mapping is missing classes: {uncovered}")
# Run the preparation pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()