#!/usr/bin/env python3
"""Merge per-rank process list CSVs into a single consolidated CSV.

Default behavior:
- Searches an input directory for files matching pattern
  'process_list_autogen_rank*.csv'
- Concatenates them (union of columns)
- If a column named 'slide_id' exists, drops duplicates by that column,
  preferring completed rows (processed > already_exist > others) and,
  on ties, rows from the most recently modified source CSV
- Writes the merged CSV as 'process_list_autogen.csv' into the output
  directory (defaults to input)

Usage examples:
    python create_patches/merge_process_lists.py \
        --input-dir create_patches/results_temp \
        --output-dir create_patches/results_temp

    # Custom pattern or output filename
    python create_patches/merge_process_lists.py \
        --input-dir /path/to/folder \
        --pattern "process_list_autogen_rank*.csv" \
        --output-filename process_list_autogen.csv
"""

import argparse
import glob
import os
import sys
from typing import Dict, List

import pandas as pd


def parse_args() -> argparse.Namespace:
    """Parse CLI arguments."""
    p = argparse.ArgumentParser(
        description="Merge rank CSVs into one process_list_autogen.csv"
    )
    p.add_argument(
        "--input-dir",
        required=True,
        help="Directory that contains rank CSVs (e.g., results_temp)",
    )
    p.add_argument(
        "--output-dir",
        default=None,
        help="Directory to write the merged CSV. Defaults to --input-dir",
    )
    p.add_argument(
        "--pattern",
        default="process_list_autogen_rank*.csv",
        help="Glob pattern to match rank CSVs in input-dir",
    )
    p.add_argument(
        "--output-filename",
        default="process_list_autogen.csv",
        help="Filename of the merged CSV",
    )
    p.add_argument(
        "--no-dedupe",
        action="store_true",
        help=(
            "Disable de-duplication. By default, duplicates by 'slide_id' "
            "are dropped if that column exists."
        ),
    )
    p.add_argument(
        "--verbose",
        action="store_true",
        help="Print detailed progress information",
    )
    return p.parse_args()


def find_rank_csvs(input_dir: str, pattern: str, verbose: bool = False) -> List[str]:
    """Find matching CSV paths in *input_dir* using the provided glob *pattern*.

    Returns the matches sorted for deterministic ordering.
    """
    search_glob = os.path.join(os.path.abspath(input_dir), pattern)
    paths = sorted(glob.glob(search_glob))
    if verbose:
        print(f"Searching: {search_glob}")
        print(f"Found {len(paths)} files")
        for p in paths:
            print(f"  - {p}")
    return paths


def merge_csvs(
    paths: List[str], dedupe_by_slide_id: bool = True, verbose: bool = False
) -> pd.DataFrame:
    """Read and concatenate CSVs; optionally de-duplicate by 'slide_id'.

    De-duplication policy:
    - Prefer rows with status indicating completion first
      (processed > already_exist > others)
    - For ties, prefer the row from the most recently modified source CSV
    - After selecting a representative row per slide, reconcile 'process' so
      that if ANY source row for that slide indicates completion (status
      processed/already_exist or process==0), the merged row's process is set
      to 0 (do not reprocess).

    Args:
        paths: CSV file paths to merge. Must be non-empty.
        dedupe_by_slide_id: If True and a 'slide_id' column exists, keep one
            row per slide according to the policy above.
        verbose: Print per-file and per-step progress.

    Returns:
        The merged DataFrame. Provenance columns '_source_file' and
        '_source_mtime' are retained for debugging.

    Raises:
        FileNotFoundError: If *paths* is empty.
        RuntimeError: If any CSV fails to read (chained to the original error).
    """
    if not paths:
        raise FileNotFoundError("No CSV files matched the given pattern.")

    frames = []
    for p in paths:
        try:
            df = pd.read_csv(p)
            # Keep source file provenance for debugging
            df["_source_file"] = os.path.basename(p)
            try:
                # Add source mtime to help resolve conflicts deterministically
                df["_source_mtime"] = os.path.getmtime(p)
            except Exception:
                df["_source_mtime"] = float("nan")
            frames.append(df)
            if verbose:
                print(f"Loaded {p}: {len(df)} rows, {len(df.columns)} columns")
        except Exception as e:
            raise RuntimeError(f"Failed to read '{p}': {e}") from e

    merged = pd.concat(frames, axis=0, ignore_index=True, sort=False)

    # TEMPORARY FILTER: exclude rows whose slide_id ends with .jpg
    # (case-insensitive). This is a temporary safeguard to avoid merging
    # mistakenly generated JPG entries.
    # Remove this block once the source CSV generation is corrected.
    # (Requested behavior: only .jpg; not filtering .jpeg on purpose.)
    if "slide_id" in merged.columns:
        jpg_mask = merged["slide_id"].astype(str).str.lower().str.endswith(".jpg")
        if verbose:
            try:
                removed = int(jpg_mask.sum())
            except Exception:
                removed = 0
            print(f"Filtering out JPG slide_id rows: removed {removed}")
        merged = merged.loc[~jpg_mask].reset_index(drop=True)
    # END TEMPORARY FILTER

    if verbose:
        print(f"Concatenated rows: {len(merged)}")

    if dedupe_by_slide_id and "slide_id" in merged.columns:
        before = len(merged)

        # Determine status precedence; higher is better (more complete).
        status_priority: Dict[str, int] = {
            "processed": 3,
            "already_exist": 2,
            # Failures or unknowns get lowest by default
            "failed": 1,
            "failed_seg": 1,
            "failed_patch": 1,
            # Typical in this repo for in-progress/to-be-processed
            "tbp": 0,
        }

        def map_status(s) -> int:
            """Map a status value to its precedence; unknowns rank as 1."""
            try:
                return status_priority.get(str(s), 1)
            except Exception:
                return 1

        # Compute helper columns used only for sorting (dropped before return).
        if "status" in merged.columns:
            merged["_status_priority"] = merged["status"].map(map_status)
        else:
            merged["_status_priority"] = 1
        if "_source_mtime" not in merged.columns:
            merged["_source_mtime"] = float("nan")

        # For robust reconciliation of 'process': per-row "done" mask, then
        # reduce per slide with any(). Vectorized equivalent of a groupby
        # apply, without building per-group DataFrames.
        done_mask = pd.Series(False, index=merged.index)
        if "status" in merged.columns:
            done_mask |= merged["status"].astype(str).isin(
                ["processed", "already_exist"]
            )
        if "process" in merged.columns:
            done_mask |= merged["process"] == 0
        any_done_map = done_mask.groupby(merged["slide_id"]).any()

        # Pick the best representative row per slide deterministically.
        merged_sorted = merged.sort_values(
            by=["slide_id", "_status_priority", "_source_mtime"],
            ascending=[True, False, False],
            kind="mergesort",  # stable to keep deterministic ties
        )
        deduped = merged_sorted.drop_duplicates(
            subset=["slide_id"], keep="first"
        ).reset_index(drop=True)

        # Reconcile 'process': if any source row says it's done, mark as 0.
        # (Creates the column if the sources lacked it.)
        any_done = (
            deduped["slide_id"].map(any_done_map).fillna(False).astype(bool)
        )
        deduped["process"] = (~any_done).astype(int)

        # Drop the internal sort key so it does not leak into the output CSV.
        merged = deduped.drop(columns=["_status_priority"])
        if verbose:
            print(f"De-duplicated by 'slide_id': {before} -> {len(merged)}")
    elif dedupe_by_slide_id:
        if verbose:
            print("Column 'slide_id' not found; skipping de-duplication by slide_id.")

    return merged


def main() -> int:
    """CLI entry point: locate rank CSVs, merge them, write the result.

    Returns 0 on success, 2 on merge failure, 3 on write failure.
    """
    args = parse_args()
    input_dir = args.input_dir
    output_dir = args.output_dir or input_dir
    os.makedirs(output_dir, exist_ok=True)

    paths = find_rank_csvs(input_dir, args.pattern, verbose=args.verbose)
    try:
        merged = merge_csvs(
            paths, dedupe_by_slide_id=(not args.no_dedupe), verbose=args.verbose
        )
    except Exception as e:
        print(f"[ERROR] {e}", file=sys.stderr)
        return 2

    out_path = os.path.join(output_dir, args.output_filename)
    try:
        merged.to_csv(out_path, index=False)
    except Exception as e:
        print(f"[ERROR] Failed to write output CSV '{out_path}': {e}", file=sys.stderr)
        return 3

    if args.verbose:
        print(f"Saved: {out_path} ({len(merged)} rows, {len(merged.columns)} columns)")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())