| | from __future__ import annotations |
| |
|
| | import base64 |
| | import imghdr |
| | import io |
| | import json |
| | import logging |
| | import os |
| | import random |
| | import re |
| | import shutil |
| | import stat |
| | import tempfile |
| | import zipfile |
| | from collections import Counter |
| | from concurrent.futures import ThreadPoolExecutor, as_completed |
| | from dataclasses import dataclass |
| | from pathlib import Path |
| | from typing import Dict, List, Tuple |
| |
|
| | import gradio as gr |
| | import numpy as np |
| | import pandas as pd |
| | import yaml |
| | from PIL import Image |
| |
|
| | |
| | try: |
| | import cv2 |
| | except ImportError: |
| | cv2 = None |
| | try: |
| | import imagehash |
| | except ImportError: |
| | imagehash = None |
| | try: |
| | import fastdup |
| | except ImportError: |
| | fastdup = None |
| | try: |
| | from ultralytics import YOLO |
| | except ImportError: |
| | YOLO = None |
| | try: |
| | from roboflow import Roboflow |
| | except ImportError: |
| | Roboflow = None |
| | try: |
| | from cleanlab.pruning import get_noise_indices |
| | except ImportError: |
| | get_noise_indices = None |
| |
|
| | |
# Scratch area for downloaded / unpacked datasets; created eagerly at import.
TMP_ROOT = Path(tempfile.gettempdir()).joinpath("rf_datasets")
TMP_ROOT.mkdir(parents=True, exist_ok=True)

# Runtime knobs, each overridable through an environment variable.
CPU_COUNT = int(os.getenv("QC_CPU", 1))        # worker threads for QC checks
BATCH_SIZE = int(os.getenv("QC_BATCH", 4))     # images per model.predict call
SAMPLE_LIMIT = int(os.getenv("QC_SAMPLE", 200))  # max images inspected per check

# Relative weight of every QC check in the aggregated score (sums to 1.0).
DEFAULT_W = {
    "Integrity": 0.25,
    "Class balance": 0.10,
    "Image quality": 0.15,
    "Duplicates": 0.10,
    "Model QA": 0.30,
    "Label issues": 0.10,
}

logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(message)s",
    level=logging.INFO,
)

# Loaded YOLO models keyed by weights path, so each file is loaded only once.
_model_cache: dict[str, YOLO] = {}
# Counter used to number unpacked ZIP directories.
autoinc = 0
| |
|
| | |
| | |
| | |
@dataclass
class QCConfig:
    """Settings shared by all quality-control checks.

    The first four fields are positional and required; the remaining ones
    default to the module-level environment-driven constants.
    """

    # Laplacian-variance threshold below which an image is flagged as blurry.
    blur_thr: float
    # Minimum IoU for a ground-truth box to count as matched by the model.
    iou_thr: float
    # Detections with confidence below this value are ignored.
    conf_thr: float
    # Path to YOLO weights, or None to skip model-based checks.
    weights: str | None
    # Number of worker threads used by the threaded checks.
    cpu_count: int = CPU_COUNT
    # Number of images sent to the model per predict call.
    batch_size: int = BATCH_SIZE
    # Upper bound on how many images each check inspects.
    sample_limit: int = SAMPLE_LIMIT
| |
|
def load_yaml(path: Path) -> Dict:
    """Parse a YAML file and return its top-level mapping.

    Args:
        path: Location of the YAML file.

    Returns:
        The parsed document. An empty document (which ``yaml.safe_load``
        reports as ``None``) is returned as ``{}`` so callers can use
        ``.get`` without a None check.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
    """
    with path.open("r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    return {} if data is None else data
| |
|
def load_class_names(yaml_path: Path) -> List[str]:
    """Return the class names declared under ``names`` in a dataset YAML.

    Both YAML layouts are supported: a plain list, or a mapping from class id
    to name (returned ordered by numeric id).

    Args:
        yaml_path: Path to the dataset YAML file.

    Returns:
        Class names in class-id order; an empty list when the file is empty
        or has no ``names`` entry.
    """
    # An empty file parses to None and an explicit ``names:`` with no value
    # is also None; treat both as "no classes" instead of crashing.
    data = load_yaml(yaml_path) or {}
    names = data.get("names") or []
    if isinstance(names, dict):
        return [names[k] for k in sorted(names, key=int)]
    return list(names)
| |
|
def parse_label_file(path: Path) -> list[tuple[int, float, float, float, float]]:
    """Read a YOLO label file into a list of numeric rows.

    Each row is parsed on its own, so one malformed line (or rows of
    different lengths, as in polygon labels) no longer discards the whole
    file. The class id is returned as ``int``; the remaining fields are
    floats.

    Args:
        path: Label file path; ``None``, a missing file or an empty file all
            produce an empty result.

    Returns:
        One tuple per valid line: ``(class_id, v1, v2, v3, v4, ...)``. Lines
        with fewer than five fields or with non-numeric content are skipped.
        Rows longer than five fields are returned in full.
    """
    if not path or not path.exists() or path.stat().st_size == 0:
        return []
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # Best effort: an unreadable label file counts as "no labels".
        return []

    rows = []
    for raw in text.splitlines():
        # Drop trailing "#" comments, matching what np.loadtxt did before.
        fields = raw.split("#", 1)[0].split()
        if len(fields) < 5:
            continue
        try:
            values = [float(tok) for tok in fields]
            class_id = int(values[0])
        except (ValueError, OverflowError):
            # Non-numeric token, or a NaN/inf class id.
            continue
        rows.append((class_id, *values[1:]))
    return rows
| |
|
def guess_image_dirs(root: Path) -> List[Path]:
    """List the conventional image folders that exist under *root*.

    Checked in order: ``images``, ``train/images``, ``valid/images``,
    ``val/images`` and ``test/images``.
    """
    layout = (
        ("images",),
        ("train", "images"),
        ("valid", "images"),
        ("val", "images"),
        ("test", "images"),
    )
    found: List[Path] = []
    for parts in layout:
        candidate = root.joinpath(*parts)
        if candidate.exists():
            found.append(candidate)
    return found
| |
|
def gather_dataset(root: Path, yaml_path: Path | None):
    """Collect images, their label files and the YAML metadata of a dataset.

    Args:
        root: Dataset root directory.
        yaml_path: Dataset YAML; when ``None`` the alphabetically first
            ``*.yaml`` directly under *root* is used.

    Returns:
        ``(imgs, lbls, meta)`` where ``imgs`` is a list of image paths,
        ``lbls`` is a parallel list holding each image's label file (or
        ``None`` when no label file exists) and ``meta`` is the parsed YAML
        mapping.

    Raises:
        FileNotFoundError: If no YAML or no image directory is found.
    """
    if yaml_path is None:
        # Sorted so the choice is deterministic when several YAMLs exist.
        yamls = sorted(root.glob("*.yaml"))
        if not yamls:
            raise FileNotFoundError("Dataset YAML not found")
        yaml_path = yamls[0]
    meta = load_yaml(yaml_path) or {}

    img_dirs = guess_image_dirs(root)
    if not img_dirs:
        raise FileNotFoundError("images/ directory missing")

    def _is_image(candidate: Path) -> bool:
        # rglob("*.*") also yields directories whose name contains a dot;
        # imghdr would raise on those, so filter to regular files first.
        if not candidate.is_file():
            return False
        try:
            return bool(imghdr.what(candidate))
        except OSError:
            return False

    imgs: List[Path] = []
    lbls: List[Path | None] = []
    for img_dir in img_dirs:
        # Prefer the labels folder of the image's own split so that files
        # sharing a stem across splits are not paired with the wrong label;
        # fall back to the other splits to keep the previous leniency.
        label_roots = [img_dir.parent / "labels"]
        label_roots += [
            d.parent / "labels" for d in img_dirs
            if d.parent / "labels" not in label_roots
        ]
        for p in sorted(img_dir.rglob("*.*")):
            if not _is_image(p):
                continue
            label = next(
                (lr / f"{p.stem}.txt" for lr in label_roots
                 if (lr / f"{p.stem}.txt").exists()),
                None,
            )
            imgs.append(p)
            lbls.append(label)
    return imgs, lbls, meta
| |
|
def get_model(weights: str) -> YOLO | None:
    """Return a cached YOLO model for *weights*, loading it on first use.

    Returns ``None`` when no weights are given or ultralytics is not
    installed.
    """
    if YOLO is None or not weights:
        return None
    try:
        return _model_cache[weights]
    except KeyError:
        loaded = YOLO(weights)
        _model_cache[weights] = loaded
        return loaded
| |
|
| | |
def _quality_stat_args(args: Tuple[Path, float]) -> Tuple[Path, bool, bool, bool]:
    """Compute blur / dark / bright flags for one image.

    Takes a single ``(path, blur_threshold)`` tuple so it can be used with
    ``Executor.map``. Returns ``(path, is_blurry, is_dark, is_bright)``; all
    flags are False when OpenCV is unavailable or the image cannot be read.
    """
    image_path, blur_threshold = args
    unreadable = (image_path, False, False, False)
    if cv2 is None:
        return unreadable
    pixels = cv2.imread(str(image_path))
    if pixels is None:
        return unreadable
    gray = cv2.cvtColor(pixels, cv2.COLOR_BGR2GRAY)
    # Variance of the Laplacian: low values mean few sharp edges.
    sharpness = cv2.Laplacian(gray, cv2.CV_64F).var()
    brightness = gray.mean()
    is_blurry = sharpness < blur_threshold
    is_dark = brightness < 25
    is_bright = brightness > 230
    return image_path, is_blurry, is_dark, is_bright
| |
|
def _is_corrupt(path: Path) -> bool:
    """Return True when PIL cannot open or verify the image at *path*."""
    try:
        with Image.open(path) as handle:
            handle.verify()
    except Exception:
        # Any failure to open or verify counts as corruption.
        return True
    return False
| |
|
def qc_integrity(imgs: List[Path], lbls: List[Path], cfg: QCConfig) -> Dict:
    """Score the dataset on missing label files and unreadable images.

    Missing labels are counted over all images; corruption is checked only
    on the first ``cfg.sample_limit`` images, in a thread pool. The score is
    100 minus the percentage of problems relative to the total image count,
    clamped at 0.
    """
    unlabeled = [img for img, lbl in zip(imgs, lbls) if lbl is None]

    checked = imgs[:cfg.sample_limit]
    broken = []
    with ThreadPoolExecutor(max_workers=cfg.cpu_count) as pool:
        pending = {pool.submit(_is_corrupt, img): img for img in checked}
        for done in as_completed(pending):
            if done.result():
                broken.append(pending[done])

    problems = len(unlabeled) + len(broken)
    score = 100 - problems / max(len(imgs), 1) * 100
    details = {
        "missing_label_files": [str(p) for p in unlabeled],
        "corrupt_images": [str(p) for p in broken],
    }
    return {"name": "Integrity", "score": max(score, 0), "details": details}
| |
|
def qc_class_balance(lbls: List[Path], cfg: QCConfig) -> Dict:
    """Score how evenly box annotations are spread across classes.

    Only the first ``cfg.sample_limit`` label files are read. The score is
    the ratio (in percent) between the rarest and the most frequent class
    that appears in the sample; it is 0 with details ``"No labels"`` when no
    box is found.
    """
    per_class = Counter()
    per_image = []
    for label_path in lbls[:cfg.sample_limit]:
        rows = parse_label_file(label_path) if label_path else []
        per_image.append(len(rows))
        per_class.update(int(row[0]) for row in rows)

    if not per_class:
        return {"name": "Class balance", "score": 0, "details": "No labels"}

    rarest = min(per_class.values())
    commonest = max(per_class.values())
    box_stats = {
        "min": min(per_image),
        "max": max(per_image),
        "mean": float(np.mean(per_image)),
    }
    return {
        "name": "Class balance",
        "score": rarest / commonest * 100,
        "details": {
            "class_counts": dict(per_class),
            "boxes_per_image": box_stats,
        },
    }
| |
|
def qc_image_quality(imgs: List[Path], cfg: QCConfig) -> Dict:
    """Score the sample on blurry, too dark and too bright images.

    Inspects the first ``cfg.sample_limit`` images in a thread pool. The
    score is 100 minus the percentage of sampled images flagged for at least
    one problem. Returns a perfect score when OpenCV is unavailable.
    """
    if cv2 is None:
        return {"name": "Image quality", "score": 100, "details": "cv2 missing"}

    sample = imgs[:cfg.sample_limit]
    jobs = [(img, cfg.blur_thr) for img in sample]
    blurry, dark, bright = [], [], []
    with ThreadPoolExecutor(max_workers=cfg.cpu_count) as pool:
        for img, is_blurry, is_dark, is_bright in pool.map(_quality_stat_args, jobs):
            if is_blurry:
                blurry.append(img)
            if is_dark:
                dark.append(img)
            if is_bright:
                bright.append(img)

    # An image with several problems is only counted once.
    flagged = set(blurry) | set(dark) | set(bright)
    score = 100 - len(flagged) / max(len(sample), 1) * 100
    details = {
        "blurry": [str(p) for p in blurry],
        "dark": [str(p) for p in dark],
        "bright": [str(p) for p in bright],
    }
    return {"name": "Image quality", "score": score, "details": details}
| |
|
def qc_duplicates(imgs: List[Path], cfg: QCConfig) -> Dict:
    """Score the dataset on near-duplicate images using fastdup.

    The check is skipped (score 100) when fastdup is not installed or the
    dataset has 50 images or fewer. Any fastdup failure is reported in the
    details and also yields a score of 100.
    """
    if fastdup is None or len(imgs) <= 50:
        return {"name": "Duplicates", "score": 100.0, "details": {"note": "skipped"}}

    try:
        # fastdup scans the grandparent of the first image.
        scan_dir = str(Path(imgs[0]).parent.parent)
        fd = fastdup.create(input_dir=scan_dir, work_dir=str(TMP_ROOT / "fastdup"))
        fd.run()
        try:
            cc = fd.connected_components_grouped(sort_by="comp_size", ascending=False)
            if "files" in cc.columns:
                clusters = cc["files"].tolist()
            else:
                clusters = cc.groupby("component")["filename"].apply(list).tolist()
        except Exception:
            # NOTE(review): fallback keeps whatever connected_components()
            # returns; confirm its shape matches a list of clusters.
            clusters = fd.connected_components()
        # Every cluster member beyond the first counts as one duplicate.
        dup = sum(len(c) - 1 for c in clusters)
        score = max(0.0, 100 - dup / len(imgs) * 100)
        return {"name": "Duplicates", "score": score, "details": {"groups": clusters[:50]}}
    except Exception as e:
        return {"name": "Duplicates", "score": 100.0, "details": {"fastdup_error": str(e)}}
| |
|
| | def _rel_iou(b1, b2): |
| | x1, y1, w1, h1 = b1 |
| | x2, y2, w2, h2 = b2 |
| | xa1, ya1 = x1 - w1/2, y1 - h1/2 |
| | xa2, ya2 = x1 + w1/2, y1 + h1/2 |
| | xb1, yb1 = x2 - w2/2, y2 - h2/2 |
| | xb2, yb2 = x2 + w2/2, y2 + h2/2 |
| | ix1 = max(xa1, xb1); iy1 = max(ya1, yb1) |
| | ix2 = min(xa2, xb2); iy2 = min(ya2, yb2) |
| | inter = max(ix2 - ix1, 0) * max(iy2 - iy1, 0) |
| | union = w1*h1 + w2*h2 - inter |
| | return inter/union if union else 0.0 |
| |
|
def qc_model_qa(imgs: List[Path], lbls: List[Path], cfg: QCConfig) -> Dict:
    """Compare ground-truth boxes with model predictions.

    For each ground-truth box in the first ``cfg.sample_limit`` images, the
    best IoU against same-class predictions with confidence of at least
    ``cfg.conf_thr`` is recorded. The score is the mean of those IoUs times
    100. Images with at least one box below ``cfg.iou_thr`` are listed as
    mismatches.

    Args:
        imgs: Image paths.
        lbls: Label file per image (parallel to *imgs*); a ``None`` entry
            falls back to ``<image>/../../labels/<stem>.txt``.
        cfg: QC settings (weights, thresholds, batch size, sample limit).

    Returns:
        Result dict with ``name``, ``score`` and ``details``; a skipped
        result with score 100 when no model is available.
    """
    model = get_model(cfg.weights)
    if model is None:
        return {"name": "Model QA", "score": 100, "details": "skipped"}

    ious: List[float] = []
    mism: List[str] = []
    sample = imgs[:cfg.sample_limit]
    sample_lbls = list(lbls[:cfg.sample_limit])
    # Guard against a zero/negative batch size, which would break range().
    step = max(int(cfg.batch_size), 1)

    for start in range(0, len(sample), step):
        batch = sample[start:start + step]
        # NOTE(review): `dynamic` looks like an export option rather than a
        # predict option; kept as in the original call, confirm it is needed.
        results = model.predict(batch, verbose=False, half=True, dynamic=True)
        for offset, (p, res) in enumerate(zip(batch, results)):
            idx = start + offset
            lbl = sample_lbls[idx] if idx < len(sample_lbls) else None
            if lbl is None:
                lbl = Path(p).parent.parent / "labels" / f"{Path(p).stem}.txt"
            gt = parse_label_file(lbl)
            if not gt:
                continue

            # YOLO label files are normalized to [0, 1], so predictions must
            # be read in normalized form too (xywhn); the pixel-space xywh
            # would make every IoU meaningless. Converted once per image.
            pred_boxes = res.boxes.xywhn.cpu().numpy()
            pred_cls = res.boxes.cls.cpu().numpy()
            pred_conf = res.boxes.conf.cpu().numpy()

            for row in gt:
                if len(row) < 5:
                    continue
                # Only the first five fields describe a box; extra columns
                # (e.g. polygon points) are ignored instead of crashing.
                cls, x, y, w, h = row[:5]
                best = 0.0
                for b, c, conf in zip(pred_boxes, pred_cls, pred_conf):
                    if conf < cfg.conf_thr or int(c) != int(cls):
                        continue
                    best = max(best, float(_rel_iou((x, y, w, h), tuple(b))))
                ious.append(best)
                if best < cfg.iou_thr:
                    mism.append(str(p))

    # Report each offending image once, in first-seen order.
    mism = list(dict.fromkeys(mism))
    miou = float(np.mean(ious)) if ious else 1.0
    return {
        "name": "Model QA",
        "score": miou * 100,
        "details": {"mean_iou": miou, "mismatches": mism[:50]},
    }
| |
|
def qc_label_issues(imgs: List[Path], lbls: List[Path], cfg: QCConfig) -> Dict:
    """Flag images whose labels cleanlab considers noisy.

    Collects the class id of every box in the first ``cfg.sample_limit``
    images and passes them to ``get_noise_indices`` with one-hot
    probabilities built from those same labels. The score is 100 minus the
    number of flagged images as a percentage of the number of boxes.
    Skipped (score 100) when cleanlab is missing or no labels are found.
    """
    if get_noise_indices is None:
        return {"name": "Label issues", "score": 100, "details": "skipped"}

    sample = imgs[:cfg.sample_limit]
    labels, idxs = [], []
    for i, _img in enumerate(sample):
        label_path = lbls[i]
        rows = parse_label_file(label_path) if label_path else []
        for cls, *_ in rows:
            labels.append(int(cls))
            idxs.append(i)

    if not labels:
        return {"name": "Label issues", "score": 100, "details": "no GT"}

    labels_arr = np.array(labels)
    uniq = sorted(set(labels_arr))
    # One-hot "probabilities" derived from the labels themselves.
    one_hot_rows = np.searchsorted(uniq, labels_arr)
    probs = np.eye(len(uniq))[one_hot_rows]
    # NOTE(review): the keyword names `labels`/`probabilities` and the use of
    # raw (possibly non-contiguous) class ids should be checked against the
    # installed cleanlab version - TODO confirm.
    noise = get_noise_indices(labels=labels_arr, probabilities=probs)

    flagged = sorted({idxs[n] for n in noise})
    files = [str(sample[i]) for i in flagged]
    score = 100 - len(flagged)/len(labels)*100
    return {"name": "Label issues", "score": score, "details": {"files": files[:50]}}
| |
|
def aggregate(results: List[Dict]) -> float:
    """Combine per-check scores into one weighted score using DEFAULT_W.

    Raises ``KeyError`` if a result's name has no weight.
    """
    total = 0
    for result in results:
        weight = DEFAULT_W[result["name"]]
        total = total + weight * result["score"]
    return total
| |
|
| | |
def gather_class_counts(
    dataset_info_list: List[Tuple[str, List[str], List[str], str]]
) -> Counter[str]:
    """Count box annotations per class name across the loaded datasets.

    Each entry of *dataset_info_list* is ``(location, class_names, splits,
    label)``. Every ``*.txt`` under ``<location>/<split>/labels`` is read;
    class ids outside the dataset's name list are ignored.
    """
    totals: Counter[str] = Counter()
    for location, names, splits, _ in dataset_info_list:
        n_classes = len(names)
        for split in splits:
            labels_dir = Path(location) / split / "labels"
            if not labels_dir.exists():
                continue
            for label_file in labels_dir.rglob("*.txt"):
                rows = parse_label_file(label_file)
                class_ids = (int(row[0]) for row in rows)
                totals.update(names[cid] for cid in class_ids if 0 <= cid < n_classes)
    return totals
| |
|
| | |
# Matches Roboflow Universe dataset URLs; groups: workspace, project, version.
RF_RE = re.compile(r"https?://universe\.roboflow\.com/([^/]+)/([^/]+)/dataset/(\d+)")


def download_rf_dataset(url: str, rf_api: Roboflow, dest: Path) -> Path:
    """Download a Roboflow Universe dataset in YOLOv8 format.

    The dataset is stored under ``dest/<workspace>_<project>_v<version>``.
    If that directory already exists it is returned without contacting
    Roboflow.

    Raises:
        ValueError: If *url* is not a recognised Universe dataset URL.
    """
    match = RF_RE.match(url.strip())
    if match is None:
        raise ValueError(f"Bad RF URL: {url}")

    workspace, project, version = match.groups()
    target = dest / f"{workspace}_{project}_v{version}"
    if not target.exists():
        rf_project = rf_api.workspace(workspace).project(project)
        rf_project.version(int(version)).download("yolov8", location=str(target))
    return target
| |
|
| | |
def run_quality(
    root: Path,
    yaml_file: Path | None,
    weights: Path | None,
    cfg: QCConfig,
    run_dup: bool,
    run_modelqa: bool
) -> Tuple[str, pd.DataFrame]:
    """Run all QC checks on one dataset and render a Markdown report.

    Args:
        root: Dataset root directory.
        yaml_file: Dataset YAML, or ``None`` to auto-detect under *root*.
        weights: Unused here; the model weights are taken from ``cfg``.
            Kept for call compatibility.
        cfg: QC settings.
        run_dup: Whether to run the duplicate check.
        run_modelqa: Whether to run the model QA and label-issue checks.

    Returns:
        ``(markdown_report, class_count_frame)``. The frame is indexed by
        class id with a single ``count`` column and is empty when the
        dataset has no labels.

    Raises:
        FileNotFoundError: Propagated from ``gather_dataset``.
    """
    imgs, lbls, meta = gather_dataset(root, yaml_file)
    meta = meta or {}
    results = [
        qc_integrity(imgs, lbls, cfg),
        qc_class_balance(lbls, cfg),
        qc_image_quality(imgs, cfg),
        qc_duplicates(imgs, cfg) if run_dup else {"name": "Duplicates", "score": 100, "details": "skipped"},
        qc_model_qa(imgs, lbls, cfg) if run_modelqa else {"name": "Model QA", "score": 100, "details": "skipped"},
        qc_label_issues(imgs, lbls, cfg) if run_modelqa else {"name": "Label issues", "score": 100, "details": "skipped"},
    ]
    final = aggregate(results)

    md = [f"## **{meta.get('name', root.name)}** — Score {final:.1f}/100"]
    for r in results:
        md.append(f"### {r['name']} {r['score']:.1f}")
        md.append("<details><summary>details</summary>\n```json")
        # default=str keeps the report rendering even if a check returns
        # values json cannot encode natively (e.g. numpy scalars or paths).
        md.append(json.dumps(r["details"], indent=2, default=str))
        md.append("```\n</details>\n")

    # The class-balance check reports the plain string "No labels" instead
    # of a dict when nothing is annotated; fall back to an empty table
    # rather than indexing into a string.
    balance_details = next(r for r in results if r["name"] == "Class balance")["details"]
    class_counts = balance_details.get("class_counts", {}) if isinstance(balance_details, dict) else {}
    df = pd.DataFrame.from_dict(class_counts, orient="index", columns=["count"])
    df.index.name = "class"
    return "\n".join(md), df
| |
|
# Image extensions probed when pairing a label file with its image.
_MERGE_IMAGE_SUFFIXES = (".jpg", ".jpeg", ".png", ".bmp", ".webp", ".tif", ".tiff")


def _merge_flag(value) -> bool:
    """Interpret a 'remove' table cell (bool, string or missing) as a bool."""
    if isinstance(value, str):
        return value.strip().lower() in {"true", "1", "yes", "y"}
    if pd.isna(value):
        return False
    return bool(value)


def _merge_find_image(label_path: Path) -> Path | None:
    """Locate the image belonging to *label_path* in the sibling images dir."""
    images_dir = label_path.parent.parent / "images"
    for suffix in _MERGE_IMAGE_SUFFIXES:
        for candidate in (
            images_dir / f"{label_path.stem}{suffix}",
            images_dir / f"{label_path.stem}{suffix.upper()}",
        ):
            if candidate.is_file():
                return candidate
    return None


def merge_datasets(
    dataset_info_list: List[Tuple[str, List[str], List[str], str]],
    class_map_df: pd.DataFrame,
    out_dir: Path = Path("merged_dataset"),
    seed: int = 1234,
) -> Path:
    """Merge several YOLO datasets into one, remapping and limiting classes.

    Args:
        dataset_info_list: One ``(location, class_names, splits, label)``
            tuple per source dataset.
        class_map_df: Table with columns ``original_class``, ``new_name``,
            ``max_images`` and ``remove``. Removed classes are dropped;
            the others are renamed to ``new_name`` and capped at
            ``max_images`` images per merged class.
        out_dir: Output directory; deleted and recreated if it exists.
        seed: Seed for image selection and the 90/10 train/valid split.

    Returns:
        *out_dir*, populated with ``train``/``valid`` images and labels and a
        ``data.yaml``.
    """
    # A private generator keeps the result reproducible without reseeding
    # the process-wide random state.
    rng = random.Random(seed)

    if out_dir.exists():
        # Clear the read-only bit and retry when a file refuses deletion.
        shutil.rmtree(out_dir, onerror=lambda func, p, _exc: (os.chmod(p, stat.S_IWRITE), func(p)))
    for sub in ("train/images", "train/labels", "valid/images", "valid/labels"):
        (out_dir / sub).mkdir(parents=True, exist_ok=True)

    class_name_mapping: dict[str, str] = {}
    limits_per_merged: dict[str, int] = {}
    for _, row in class_map_df.iterrows():
        if _merge_flag(row["remove"]):
            class_name_mapping[row["original_class"]] = "__REMOVED__"
        else:
            class_name_mapping[row["original_class"]] = row["new_name"]
            limits_per_merged[row["new_name"]] = int(row["max_images"])

    active_classes = [c for c in sorted(set(class_name_mapping.values())) if c != "__REMOVED__"]
    id_map = {cls: idx for idx, cls in enumerate(active_classes)}

    image_to_classes: dict[str, set[str]] = {}
    image_to_label: dict[str, Path] = {}
    # The class-name list of the dataset each image came from, recorded at
    # discovery time so labels are later remapped with the right names
    # (a path-prefix test would confuse e.g. ".../zip_1" with ".../zip_10").
    image_to_names: dict[str, List[str]] = {}

    for dloc, class_names_dataset, splits, _ in dataset_info_list:
        for split in splits:
            labels_root = Path(dloc) / split / "labels"
            if not labels_root.exists():
                continue
            for lp in sorted(labels_root.rglob("*.txt")):
                cls_set: set[str] = set()
                for cls_id, *_ in parse_label_file(lp):
                    idx = int(cls_id)
                    if 0 <= idx < len(class_names_dataset):
                        orig = class_names_dataset[idx]
                        new = class_name_mapping.get(orig, orig)
                        if new in id_map:
                            cls_set.add(new)
                if not cls_set:
                    continue
                img_file = _merge_find_image(lp)
                if img_file is None:
                    logging.warning("No image found for label %s; skipped", lp)
                    continue
                img_path = str(img_file)
                image_to_classes[img_path] = cls_set
                image_to_label[img_path] = lp
                image_to_names[img_path] = list(class_names_dataset)

    # Each image appears once in the pool (so per-class counters are not
    # inflated for multi-class images) and the pool is sorted before the
    # shuffle so the seed fully determines the outcome.
    pool = sorted(image_to_classes)
    rng.shuffle(pool)

    counters = dict.fromkeys(active_classes, 0)
    selected_images: List[str] = []
    for img in pool:
        cs = image_to_classes[img]
        if any(counters[c] >= limits_per_merged.get(c, 0) for c in cs):
            continue
        selected_images.append(img)
        for c in cs:
            counters[c] += 1

    used_stems: set[str] = set()
    for img in selected_images:
        split = "train" if rng.random() < 0.9 else "valid"
        names = image_to_names[img]

        new_lines: List[str] = []
        for line in image_to_label[img].read_text(encoding="utf-8").splitlines():
            parts = line.split()
            if not parts:
                continue
            try:
                cid = int(float(parts[0]))
            except ValueError:
                continue
            if not 0 <= cid < len(names):
                continue
            merged = class_name_mapping.get(names[cid], names[cid])
            new_id = id_map.get(merged)
            if new_id is not None:
                new_lines.append(" ".join([str(new_id), *parts[1:]]))
        if not new_lines:
            # Nothing survives the remapping: do not emit the image at all.
            continue

        # Files with the same name from different datasets must not
        # overwrite each other; add a numeric suffix on collision.
        src = Path(img)
        stem, n = src.stem, 1
        while stem in used_stems:
            stem = f"{src.stem}_{n}"
            n += 1
        used_stems.add(stem)

        dst_img = out_dir / split / "images" / f"{stem}{src.suffix}"
        dst_lbl = out_dir / split / "labels" / f"{stem}.txt"
        dst_img.parent.mkdir(parents=True, exist_ok=True)
        dst_lbl.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy(src, dst_img)
        dst_lbl.write_text("\n".join(new_lines))

    data_yaml = {
        "path": str(out_dir.resolve()),
        "train": "train/images",
        "val": "valid/images",
        "nc": len(active_classes),
        "names": active_classes,
    }
    (out_dir / "data.yaml").write_text(yaml.safe_dump(data_yaml))
    return out_dir
| |
|
| | |
| | |
| | |
def _extracted_dataset_root(extracted: Path) -> Path:
    """Return the dataset root inside an unpacked archive.

    Archives often wrap the dataset in a single top-level folder; in that
    case the folder holding the first ``*.yaml`` is used instead of the
    extraction directory itself.
    """
    if guess_image_dirs(extracted):
        return extracted
    yaml_hit = next(iter(sorted(extracted.rglob("*.yaml"))), None)
    return yaml_hit.parent if yaml_hit else extracted


# Gradio UI: an "Evaluate" tab that runs the QC checks and a "Merge / Edit"
# tab that loads datasets, edits the class mapping and builds a merged ZIP.
with gr.Blocks(css="#classdf td{min-width:120px}") as demo:
    gr.Markdown(
        "\n# 🏹 **YOLO\u00a0Dataset Toolkit**\n"
        "_Evaluate • Merge • Edit • Download_\n"
    )

    with gr.Tab("Evaluate"):
        api_in = gr.Textbox(label="Roboflow API key", type="password")
        url_txt = gr.File(label=".txt of RF dataset URLs", file_types=['.txt'])
        zip_in = gr.File(label="Dataset ZIP")
        path_in = gr.Textbox(label="Server path")
        yaml_in = gr.File(label="Custom YAML", file_types=['.yaml'])
        weights_in = gr.File(label="YOLO weights (.pt)")

        blur_sl = gr.Slider(0.0, 500.0, value=100.0, label="Blur threshold")
        iou_sl = gr.Slider(0.0, 1.0, value=0.5, label="IOU threshold")
        conf_sl = gr.Slider(0.0, 1.0, value=0.25, label="Min detection confidence")

        run_dup = gr.Checkbox(label="Check duplicates (fastdup)", value=False)
        run_modelqa = gr.Checkbox(label="Run Model QA & cleanlab", value=False)

        run_eval = gr.Button("Run Evaluation")
        out_md = gr.Markdown()
        out_df = gr.Dataframe()

        def _evaluate_cb(
            api_key, url_txt, zip_file, server_path, yaml_file, weights,
            blur_thr, iou_thr, conf_thr, run_dup, run_modelqa
        ):
            """Evaluate every supplied dataset source and merge the reports.

            Sources are processed in order: Roboflow URLs, the uploaded ZIP,
            then the server path. A failure in one source is reported in the
            Markdown output and does not stop the remaining sources.
            """
            reports, dfs = [], []
            weights_path = Path(weights.name) if weights else None
            yaml_path = Path(yaml_file.name) if yaml_file else None
            cfg = QCConfig(blur_thr, iou_thr, conf_thr, weights.name if weights else None)
            rf = Roboflow(api_key) if api_key and Roboflow else None

            def _evaluate(label, root, yaml_p):
                # Run one dataset; turn any failure into a report entry.
                try:
                    md, df = run_quality(root, yaml_p, weights_path, cfg, run_dup, run_modelqa)
                except Exception as e:
                    logging.exception("Evaluation failed for %s", label)
                    reports.append(f"### {label}\n⚠️ {e}")
                else:
                    reports.append(md)
                    dfs.append(df)

            if url_txt and not rf:
                reports.append(
                    "### Roboflow URLs\n⚠️ skipped: API key missing or the "
                    "roboflow package is not installed"
                )
            if url_txt and rf:
                for line in Path(url_txt.name).read_text().splitlines():
                    if not line.strip():
                        continue
                    try:
                        ds = download_rf_dataset(line, rf, TMP_ROOT)
                    except Exception as e:
                        reports.append(f"### {line}\n⚠️ {e}")
                        continue
                    _evaluate(line, ds, None)

            if zip_file:
                tmp = Path(tempfile.mkdtemp())
                try:
                    shutil.unpack_archive(zip_file.name, tmp)
                except Exception as e:
                    reports.append(f"### {Path(zip_file.name).name}\n⚠️ {e}")
                else:
                    _evaluate(Path(zip_file.name).name, _extracted_dataset_root(tmp), yaml_path)
                finally:
                    # Always remove the extraction dir, even on failure.
                    shutil.rmtree(tmp, ignore_errors=True)

            if server_path:
                _evaluate(server_path, Path(server_path), yaml_path)

            summary = "\n---\n".join(reports) if reports else ""
            combined = pd.concat(dfs).groupby(level=0).sum() if dfs else pd.DataFrame()
            return summary, combined

        run_eval.click(
            _evaluate_cb,
            inputs=[api_in, url_txt, zip_in, path_in, yaml_in, weights_in,
                    blur_sl, iou_sl, conf_sl, run_dup, run_modelqa],
            outputs=[out_md, out_df]
        )

    with gr.Tab("Merge / Edit"):
        gr.Markdown("### 1️⃣\u00a0Load one or more datasets")
        rf_key = gr.Textbox(label="Roboflow API key", type="password")
        rf_urls = gr.File(label=".txt of RF URLs", file_types=['.txt'])
        zips_in = gr.Files(label="One or more dataset ZIPs")
        load_btn = gr.Button("Load datasets")
        load_log = gr.Markdown()
        ds_state = gr.State([])

        def _load_cb(rf_key, rf_urls_file, zip_files):
            """Download / unpack the requested datasets.

            Returns the dataset info list ``(location, class_names, splits,
            label)`` for the state, plus a Markdown log.
            """
            info_list, log_lines = [], []
            rf = Roboflow(rf_key) if rf_key and Roboflow else None

            if rf_urls_file and rf:
                for url in Path(rf_urls_file.name).read_text().splitlines():
                    url = url.strip()
                    if not url:
                        continue
                    try:
                        ds = download_rf_dataset(url, rf, TMP_ROOT)
                        names = load_class_names(ds / "data.yaml")
                        splits = [s for s in ("train", "valid", "test") if (ds / s).exists()]
                        info_list.append((str(ds), names, splits, Path(ds).name))
                        log_lines.append(f"✔️ RF dataset **{Path(ds).name}** loaded ({len(names)} classes)")
                    except Exception as e:
                        log_lines.append(f"⚠️ RF load failed for {url!r}: {e}")

            for f in zip_files or []:
                # A fresh, uniquely named directory per upload avoids mixing
                # with content left over from an earlier run.
                tmp = Path(tempfile.mkdtemp(prefix="zip_", dir=TMP_ROOT))
                try:
                    shutil.unpack_archive(f.name, tmp)
                    yaml_p = next(iter(sorted(tmp.rglob("*.yaml"))), None)
                    if yaml_p is None:
                        raise FileNotFoundError("no dataset YAML found in archive")
                    names = load_class_names(yaml_p)
                except Exception as e:
                    shutil.rmtree(tmp, ignore_errors=True)
                    log_lines.append(f"⚠️ ZIP **{Path(f.name).name}** skipped: {e}")
                    continue
                # The YAML marks the dataset root, which may sit inside a
                # wrapper folder of the archive.
                ds_root = yaml_p.parent
                splits = [s for s in ("train", "valid", "test") if (ds_root / s).exists()]
                info_list.append((str(ds_root), names, splits, tmp.name))
                log_lines.append(f"✔️ ZIP **{tmp.name}** loaded")

            return info_list, "\n".join(log_lines) or "No datasets loaded."

        load_btn.click(_load_cb, [rf_key, rf_urls, zips_in], [ds_state, load_log])

        gr.Markdown("### 2️⃣\u00a0Edit class mapping / limits / removal")
        class_df = gr.Dataframe(
            headers=["original_class", "new_name", "max_images", "remove"],
            datatype=["str", "str", "number", "bool"],
            interactive=True, elem_id="classdf"
        )
        refresh_btn = gr.Button("Build class table from loaded datasets")

        def _build_class_df(ds_info):
            """Build the editable class table from the loaded datasets."""
            counts = gather_class_counts(ds_info)
            all_names = sorted(counts.keys())
            return pd.DataFrame({
                "original_class": all_names,
                "new_name": all_names,
                "max_images": [counts[n] for n in all_names],
                "remove": [False] * len(all_names),
            })

        refresh_btn.click(_build_class_df, [ds_state], [class_df])

        merge_btn = gr.Button("Merge datasets ✨")
        zip_out = gr.File(label="Download merged ZIP")
        merge_log = gr.Markdown()

        def _merge_cb(ds_info, class_df):
            """Merge the loaded datasets and return the ZIP path plus a log."""
            if not ds_info:
                return None, "⚠️\u00a0Load datasets first."
            out_dir = merge_datasets(ds_info, class_df)
            zip_path = shutil.make_archive(str(out_dir), "zip", out_dir)
            # Count every file in an images/ folder, whatever its extension.
            count = sum(
                1 for p in Path(out_dir).rglob("*")
                if p.is_file() and p.parent.name == "images"
            )
            return zip_path, f"✅\u00a0Merged dataset at **{out_dir}** with {count} images."

        merge_btn.click(_merge_cb, [ds_state, class_df], [zip_out, merge_log])
| |
|
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from $PORT (default 7860).
    listen_port = int(os.getenv("PORT", 7860))
    demo.launch(server_name="0.0.0.0", server_port=listen_port)
| |
|