#!/usr/bin/env python3
"""
Merge per-rank process list CSVs into a single consolidated CSV.

Default behavior:
- Searches an input directory for files matching the pattern
  'process_list_autogen_rank*.csv'
- Concatenates them (union of columns)
- If a column named 'slide_id' exists, drops duplicates by that column,
  preferring completed rows (see merge_csvs for the exact policy)
- Writes the merged CSV as 'process_list_autogen.csv' into the output
  directory (defaults to the input directory)

Usage examples:
    python create_patches/merge_process_lists.py \
        --input-dir create_patches/results_temp \
        --output-dir create_patches/results_temp

    # Custom pattern or output filename
    python create_patches/merge_process_lists.py \
        --input-dir /path/to/folder \
        --pattern "process_list_autogen_rank*.csv" \
        --output-filename process_list_autogen.csv
"""
import argparse
import glob
import os
import sys
from typing import Dict, List

import pandas as pd

def parse_args() -> argparse.Namespace:
    """Parse CLI arguments."""
    p = argparse.ArgumentParser(description="Merge rank CSVs into one process_list_autogen.csv")
    p.add_argument(
        "--input-dir",
        required=True,
        help="Directory that contains rank CSVs (e.g., results_temp)",
    )
    p.add_argument(
        "--output-dir",
        default=None,
        help="Directory to write the merged CSV. Defaults to --input-dir",
    )
    p.add_argument(
        "--pattern",
        default="process_list_autogen_rank*.csv",
        help="Glob pattern to match rank CSVs in input-dir",
    )
    p.add_argument(
        "--output-filename",
        default="process_list_autogen.csv",
        help="Filename of the merged CSV",
    )
    p.add_argument(
        "--no-dedupe",
        action="store_true",
        help="Disable de-duplication. By default, duplicates by 'slide_id' are dropped if that column exists.",
    )
    p.add_argument(
        "--verbose",
        action="store_true",
        help="Print detailed progress information",
    )
    return p.parse_args()

def find_rank_csvs(input_dir: str, pattern: str, verbose: bool = False) -> List[str]:
    """Find matching CSV paths in input_dir using the provided pattern."""
    search_glob = os.path.join(os.path.abspath(input_dir), pattern)
    paths = sorted(glob.glob(search_glob))
    if verbose:
        print(f"Searching: {search_glob}")
        print(f"Found {len(paths)} files")
        for p in paths:
            print(f"  - {p}")
    return paths

def merge_csvs(paths: List[str], dedupe_by_slide_id: bool = True, verbose: bool = False) -> pd.DataFrame:
    """Read and concatenate CSVs; optionally de-duplicate by 'slide_id'.

    De-duplication policy:
    - Prefer rows whose status indicates completion (processed > already_exist > others)
    - For ties, prefer the row from the most recently modified source CSV
    - After selecting a representative row per slide, reconcile 'process' so that if ANY
      source row for that slide indicates completion (status processed/already_exist or
      process == 0), the merged row's 'process' is set to 0 (do not reprocess)
    """
    if not paths:
        raise FileNotFoundError("No CSV files matched the given pattern.")
    frames = []
    for p in paths:
        try:
            df = pd.read_csv(p)
            # Keep source file provenance for debugging
            df["_source_file"] = os.path.basename(p)
            try:
                # Add source mtime to help resolve conflicts deterministically
                df["_source_mtime"] = os.path.getmtime(p)
            except OSError:
                df["_source_mtime"] = float("nan")
            frames.append(df)
            if verbose:
                print(f"Loaded {p}: {len(df)} rows, {len(df.columns)} columns")
        except Exception as e:
            raise RuntimeError(f"Failed to read '{p}': {e}") from e

    merged = pd.concat(frames, axis=0, ignore_index=True, sort=False)
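    # Note: with sort=False, pd.concat keeps the column order of the first
    # frame and appends columns seen later; values missing from a given rank
    # CSV are filled with NaN.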

    # TEMPORARY FILTER: exclude rows whose slide_id ends with .jpg (case-insensitive).
    # This is a safeguard against merging mistakenly generated JPG entries;
    # remove this block once the source CSV generation is corrected.
    # (Requested behavior: only .jpg; .jpeg is deliberately not filtered.)
    if "slide_id" in merged.columns:
        jpg_mask = merged["slide_id"].astype(str).str.lower().str.endswith(".jpg")
        if verbose:
            print(f"Filtering out JPG slide_id rows: removed {int(jpg_mask.sum())}")
        merged = merged.loc[~jpg_mask].reset_index(drop=True)
    # END TEMPORARY FILTER
    if verbose:
        print(f"Concatenated rows: {len(merged)}")

    if dedupe_by_slide_id and "slide_id" in merged.columns:
        before = len(merged)
        # Status precedence: higher is better (i.e., more complete)
        status_priority: Dict[str, int] = {
            "processed": 3,
            "already_exist": 2,
            # Failures or unknowns get lowest by default
            "failed": 1,
            "failed_seg": 1,
            "failed_patch": 1,
            # Typical in this repo for in-progress/to-be-processed
            "tbp": 0,
        }

        def map_status(s) -> int:
            return status_priority.get(str(s), 1)

        # Compute helper columns
        if "status" in merged.columns:
            merged["_status_priority"] = merged["status"].map(map_status)
        else:
            merged["_status_priority"] = 1
        if "_source_mtime" not in merged.columns:
            merged["_source_mtime"] = float("nan")

        # For robust reconciliation of 'process', compute per slide whether ANY
        # source row indicates completion.
        def slide_any_done(g: pd.DataFrame) -> bool:
            if "status" in g.columns:
                done_status = g["status"].astype(str).isin(["processed", "already_exist"])
            else:
                done_status = pd.Series(False, index=g.index)
            if "process" in g.columns:
                proc_is_zero = g["process"] == 0
            else:
                proc_is_zero = pd.Series(False, index=g.index)
            return bool((done_status | proc_is_zero).any())

        any_done_map = merged.groupby("slide_id", as_index=True).apply(slide_any_done)

        # Pick the best representative row per slide deterministically
        merged_sorted = merged.sort_values(
            by=["slide_id", "_status_priority", "_source_mtime"],
            ascending=[True, False, False],
            kind="mergesort",  # stable sort keeps ties deterministic
        )
        deduped = merged_sorted.drop_duplicates(subset=["slide_id"], keep="first").reset_index(drop=True)

        # Reconcile 'process': if any source row says the slide is done, mark it
        # 0 (no reprocessing); this also creates the column when absent.
        deduped["process"] = (
            deduped["slide_id"].map(any_done_map).fillna(False).astype(bool).map(lambda done: 0 if done else 1)
        )
        # Drop the sort helper; keep _source_file/_source_mtime as provenance
        merged = deduped.drop(columns=["_status_priority"])
        if verbose:
            print(f"De-duplicated by 'slide_id': {before} -> {len(merged)}")
    elif dedupe_by_slide_id:
        if verbose:
            print("Column 'slide_id' not found; skipping de-duplication by slide_id.")

    return merged
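

# Exit codes used by main(): 0 = success, 2 = failed to read/merge inputs,
# 3 = failed to write the merged CSV.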
def main() -> int:
    args = parse_args()
    input_dir = args.input_dir
    output_dir = args.output_dir or input_dir
    os.makedirs(output_dir, exist_ok=True)

    paths = find_rank_csvs(input_dir, args.pattern, verbose=args.verbose)
    try:
        merged = merge_csvs(paths, dedupe_by_slide_id=(not args.no_dedupe), verbose=args.verbose)
    except Exception as e:
        print(f"[ERROR] {e}", file=sys.stderr)
        return 2

    out_path = os.path.join(output_dir, args.output_filename)
    try:
        merged.to_csv(out_path, index=False)
    except Exception as e:
        print(f"[ERROR] Failed to write output CSV '{out_path}': {e}", file=sys.stderr)
        return 3

    if args.verbose:
        print(f"Saved: {out_path} ({len(merged)} rows, {len(merged.columns)} columns)")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())