# NOTE(review): the three lines below are Hugging Face web-page residue that was
# accidentally captured with the file; commented out so the script parses.
# st24hour's picture
# Upload folder using huggingface_hub
# e101805 verified
#!/usr/bin/env python3
"""
Merge per-rank process list CSVs into a single consolidated CSV.
Default behavior:
- Searches an input directory for files matching pattern 'process_list_autogen_rank*.csv'
- Concatenates them (union of columns)
- If a column named 'slide_id' exists, drop duplicates by that column keeping the last
- Writes the merged CSV as 'process_list_autogen.csv' into the output directory (defaults to input)
Usage examples:
python create_patches/merge_process_lists.py \
--input-dir create_patches/results_temp \
--output-dir create_patches/results_temp
# Custom pattern or output filename
python create_patches/merge_process_lists.py \
--input-dir /path/to/folder \
--pattern "process_list_autogen_rank*.csv" \
--output-filename process_list_autogen.csv
"""
import argparse
import glob
import os
import sys
from typing import List, Dict
import pandas as pd
def parse_args() -> argparse.Namespace:
    """Define and parse the command-line interface for the merge script.

    Returns:
        argparse.Namespace with input_dir, output_dir, pattern,
        output_filename, no_dedupe, and verbose attributes.
    """
    parser = argparse.ArgumentParser(
        description="Merge rank CSVs into one process_list_autogen.csv"
    )
    # (flag names, add_argument keyword options) pairs, registered below.
    option_specs = [
        (
            "--input-dir",
            {"required": True, "help": "Directory that contains rank CSVs (e.g., results_temp)"},
        ),
        (
            "--output-dir",
            {"default": None, "help": "Directory to write the merged CSV. Defaults to --input-dir"},
        ),
        (
            "--pattern",
            {
                "default": "process_list_autogen_rank*.csv",
                "help": "Glob pattern to match rank CSVs in input-dir",
            },
        ),
        (
            "--output-filename",
            {"default": "process_list_autogen.csv", "help": "Filename of the merged CSV"},
        ),
        (
            "--no-dedupe",
            {
                "action": "store_true",
                "help": "Disable de-duplication. By default, duplicates by 'slide_id' are dropped if that column exists.",
            },
        ),
        (
            "--verbose",
            {"action": "store_true", "help": "Print detailed progress information"},
        ),
    ]
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser.parse_args()
def find_rank_csvs(input_dir: str, pattern: str, verbose: bool = False) -> List[str]:
    """Return the sorted list of CSV paths in *input_dir* matching *pattern*.

    Args:
        input_dir: Directory to search (made absolute before globbing).
        pattern: Glob pattern, e.g. 'process_list_autogen_rank*.csv'.
        verbose: When True, print the search glob and every match found.
    """
    search_glob = os.path.join(os.path.abspath(input_dir), pattern)
    matches = glob.glob(search_glob)
    matches.sort()  # deterministic order regardless of filesystem enumeration
    if verbose:
        print(f"Searching: {search_glob}")
        print(f"Found {len(matches)} files")
        for match in matches:
            print(f" - {match}")
    return matches
def merge_csvs(paths: List[str], dedupe_by_slide_id: bool = True, verbose: bool = False) -> pd.DataFrame:
    """Read and concatenate CSVs; optionally de-duplicate by 'slide_id'.

    De-duplication policy:
    - Prefer rows with status indicating completion first (processed > already_exist > others)
    - For ties, prefer the row from the most recently modified source CSV
    - After selecting a representative row per slide, reconcile 'process' so that if ANY
      source row for that slide indicates completion (status processed/already_exist or
      process==0), the merged row's process will be set to 0 (do not reprocess).

    Args:
        paths: CSV file paths to merge; must be non-empty.
        dedupe_by_slide_id: When True and a 'slide_id' column exists, keep one row per slide.
        verbose: Print per-file and per-stage progress information.

    Returns:
        Merged DataFrame. Includes provenance columns '_source_file'/'_source_mtime'
        (and '_status_priority' when de-duplication ran), matching prior behavior.

    Raises:
        FileNotFoundError: If *paths* is empty.
        RuntimeError: If any input CSV cannot be read.
    """
    if not paths:
        raise FileNotFoundError("No CSV files matched the given pattern.")

    frames = []
    for p in paths:
        try:
            df = pd.read_csv(p)
            # Keep source file provenance for debugging
            df["_source_file"] = os.path.basename(p)
            try:
                # Add source mtime to help resolve conflicts deterministically
                df["_source_mtime"] = os.path.getmtime(p)
            except OSError:
                # File vanished or stat failed between glob and here; no tiebreaker available.
                df["_source_mtime"] = float("nan")
            frames.append(df)
            if verbose:
                print(f"Loaded {p}: {len(df)} rows, {len(df.columns)} columns")
        except Exception as e:
            raise RuntimeError(f"Failed to read '{p}': {e}") from e

    merged = pd.concat(frames, axis=0, ignore_index=True, sort=False)

    # TEMPORARY FILTER: exclude rows whose slide_id ends with .jpg (case-insensitive)
    # This is a temporary safeguard to avoid merging mistakenly generated JPG entries.
    # Remove this block once the source CSV generation is corrected.
    # (Requested behavior: only .jpg; not filtering .jpeg on purpose.)
    if "slide_id" in merged.columns:
        jpg_mask = merged["slide_id"].astype(str).str.lower().str.endswith(".jpg")
        if verbose:
            print(f"Filtering out JPG slide_id rows: removed {int(jpg_mask.sum())}")
        merged = merged.loc[~jpg_mask].reset_index(drop=True)
    # END TEMPORARY FILTER

    if verbose:
        print(f"Concatenated rows: {len(merged)}")

    if dedupe_by_slide_id and "slide_id" in merged.columns:
        before = len(merged)
        # Status precedence: higher is better (i.e., more complete).
        status_priority: Dict[str, int] = {
            "processed": 3,
            "already_exist": 2,
            # Failures or unknowns get lowest by default
            "failed": 1,
            "failed_seg": 1,
            "failed_patch": 1,
            # Typical in this repo for in-progress/to-be-processed
            "tbp": 0,
        }
        if "status" in merged.columns:
            # Unknown statuses (and NaN, stringified) fall back to priority 1.
            merged["_status_priority"] = merged["status"].map(
                lambda s: status_priority.get(str(s), 1)
            )
        else:
            merged["_status_priority"] = 1
        if "_source_mtime" not in merged.columns:
            merged["_source_mtime"] = float("nan")

        # Per-row "looks done" flag, reduced per slide with a vectorized groupby-any
        # (avoids the deprecated DataFrame groupby.apply path).
        done_mask = pd.Series(False, index=merged.index)
        if "status" in merged.columns:
            done_mask |= merged["status"].astype(str).isin(["processed", "already_exist"])
        if "process" in merged.columns:
            done_mask |= merged["process"] == 0
        any_done_map = done_mask.groupby(merged["slide_id"]).any()

        # Pick the best representative row per slide deterministically.
        merged_sorted = merged.sort_values(
            by=["slide_id", "_status_priority", "_source_mtime"],
            ascending=[True, False, False],
            kind="mergesort",  # stable to keep deterministic ties
        )
        deduped = merged_sorted.drop_duplicates(subset=["slide_id"], keep="first").reset_index(drop=True)

        # Reconcile 'process': 0 (skip) if any source row reported the slide done, else 1.
        # (The original had two byte-identical if/else branches here; collapsed to one.)
        deduped["process"] = (
            deduped["slide_id"]
            .map(any_done_map)
            .fillna(False)
            .astype(bool)
            .map(lambda done: 0 if done else 1)
        )
        merged = deduped
        if verbose:
            print(f"De-duplicated by 'slide_id': {before} -> {len(merged)}")
    elif dedupe_by_slide_id:
        if verbose:
            print("Column 'slide_id' not found; skipping de-duplication by slide_id.")
    return merged
def main() -> int:
    """CLI entry point: locate rank CSVs, merge them, and write the result.

    Returns:
        0 on success, 2 if merging failed, 3 if the output could not be written.
    """
    args = parse_args()
    in_dir = args.input_dir
    out_dir = args.output_dir if args.output_dir else in_dir
    os.makedirs(out_dir, exist_ok=True)

    csv_paths = find_rank_csvs(in_dir, args.pattern, verbose=args.verbose)
    try:
        result = merge_csvs(
            csv_paths,
            dedupe_by_slide_id=not args.no_dedupe,
            verbose=args.verbose,
        )
    except Exception as exc:
        print(f"[ERROR] {exc}", file=sys.stderr)
        return 2

    destination = os.path.join(out_dir, args.output_filename)
    try:
        result.to_csv(destination, index=False)
    except Exception as exc:
        print(f"[ERROR] Failed to write output CSV '{destination}': {exc}", file=sys.stderr)
        return 3

    if args.verbose:
        print(f"Saved: {destination} ({len(result)} rows, {len(result.columns)} columns)")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())