""" |
|
|
Merge per-rank process list CSVs into a single consolidated CSV. |
|
|
|
|
|
Default behavior: |
|
|
- Searches an input directory for files matching pattern 'process_list_autogen_rank*.csv' |
|
|
- Concatenates them (union of columns) |
|
|
- If a column named 'slide_id' exists, drop duplicates by that column keeping the last |
|
|
- Writes the merged CSV as 'process_list_autogen.csv' into the output directory (defaults to input) |
|
|
|
|
|
Usage examples: |
|
|
python create_patches/merge_process_lists.py \ |
|
|
--input-dir create_patches/results_temp \ |
|
|
--output-dir create_patches/results_temp |
|
|
|
|
|
# Custom pattern or output filename |
|
|
python create_patches/merge_process_lists.py \ |
|
|
--input-dir /path/to/folder \ |
|
|
--pattern "process_list_autogen_rank*.csv" \ |
|
|
--output-filename process_list_autogen.csv |
|
|
""" |
|
|
|
|
|
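
# Illustrative shape of a rank CSV (hypothetical rows; any extra columns are
# carried through unchanged). Only 'slide_id', 'status', and 'process' are
# interpreted by the merge logic below:
#
#   slide_id,status,process
#   TCGA-AA-0001.svs,processed,0
#   TCGA-AA-0002.svs,tbp,1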

import argparse
import glob
import os
import sys
from typing import Dict, List

import pandas as pd


def parse_args() -> argparse.Namespace:
    """Parse CLI arguments."""
    p = argparse.ArgumentParser(description="Merge rank CSVs into one process_list_autogen.csv")
    p.add_argument(
        "--input-dir",
        required=True,
        help="Directory that contains rank CSVs (e.g., results_temp)",
    )
    p.add_argument(
        "--output-dir",
        default=None,
        help="Directory to write the merged CSV. Defaults to --input-dir",
    )
    p.add_argument(
        "--pattern",
        default="process_list_autogen_rank*.csv",
        help="Glob pattern to match rank CSVs in input-dir",
    )
    p.add_argument(
        "--output-filename",
        default="process_list_autogen.csv",
        help="Filename of the merged CSV",
    )
    p.add_argument(
        "--no-dedupe",
        action="store_true",
        help="Disable de-duplication. By default, duplicates by 'slide_id' are dropped if that column exists.",
    )
    p.add_argument(
        "--verbose",
        action="store_true",
        help="Print detailed progress information",
    )
    return p.parse_args()


def find_rank_csvs(input_dir: str, pattern: str, verbose: bool = False) -> List[str]:
    """Find matching CSV paths in input_dir using the provided pattern."""
    search_glob = os.path.join(os.path.abspath(input_dir), pattern)
    paths = sorted(glob.glob(search_glob))
    if verbose:
        print(f"Searching: {search_glob}")
        print(f"Found {len(paths)} files")
        for p in paths:
            print(f"  - {p}")
    return paths
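

# With the default pattern, the matched files typically look like
# (hypothetical listing):
#   results_temp/process_list_autogen_rank0.csv
#   results_temp/process_list_autogen_rank1.csv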


def merge_csvs(paths: List[str], dedupe_by_slide_id: bool = True, verbose: bool = False) -> pd.DataFrame:
    """Read and concatenate CSVs; optionally de-duplicate by 'slide_id'.

    De-duplication policy:
    - Prefer rows whose status indicates completion (processed > already_exist > others)
    - For ties, prefer the row from the most recently modified source CSV
    - After selecting a representative row per slide, reconcile 'process' so that
      if ANY source row for that slide indicates completion (status
      processed/already_exist or process == 0), the merged row's process is set
      to 0 (do not reprocess).
    """
    if not paths:
        raise FileNotFoundError("No CSV files matched the given pattern.")

    frames = []
    for p in paths:
        try:
            df = pd.read_csv(p)
            # Record provenance so ties can be broken by source recency.
            df["_source_file"] = os.path.basename(p)
            try:
                df["_source_mtime"] = os.path.getmtime(p)
            except Exception:
                df["_source_mtime"] = float("nan")
            frames.append(df)
            if verbose:
                print(f"Loaded {p}: {len(df)} rows, {len(df.columns)} columns")
        except Exception as e:
            raise RuntimeError(f"Failed to read '{p}': {e}") from e

    merged = pd.concat(frames, axis=0, ignore_index=True, sort=False)

    # Drop rows whose slide_id points at a JPG; those are not whole-slide images.
    if "slide_id" in merged.columns:
        jpg_mask = merged["slide_id"].astype(str).str.lower().str.endswith(".jpg")
        if verbose:
            print(f"Filtering out JPG slide_id rows: removed {int(jpg_mask.sum())}")
        merged = merged.loc[~jpg_mask].reset_index(drop=True)

    if verbose:
        print(f"Concatenated rows: {len(merged)}")

    if dedupe_by_slide_id and "slide_id" in merged.columns:
        before = len(merged)

        # Higher priority wins when several rows share a slide_id.
        status_priority: Dict[str, int] = {
            "processed": 3,
            "already_exist": 2,
            "failed": 1,
            "failed_seg": 1,
            "failed_patch": 1,
            "tbp": 0,
        }

        def map_status(s) -> int:
            # Unknown statuses rank between 'tbp' and completion.
            return status_priority.get(str(s), 1)

        if "status" in merged.columns:
            merged["_status_priority"] = merged["status"].map(map_status)
        else:
            merged["_status_priority"] = 1
        if "_source_mtime" not in merged.columns:
            merged["_source_mtime"] = float("nan")

        def slide_any_done(g: pd.DataFrame) -> bool:
            """True if any source row marks this slide as completed."""
            done_status = (
                g["status"].astype(str).isin(["processed", "already_exist"])
                if "status" in g.columns
                else pd.Series(False, index=g.index)
            )
            proc_is_zero = (
                (g["process"] == 0) if "process" in g.columns else pd.Series(False, index=g.index)
            )
            return bool((done_status | proc_is_zero).any())

        any_done_map = merged.groupby("slide_id", as_index=True).apply(slide_any_done)

        # Stable sort so the highest-priority, most recent row comes first per slide.
        merged_sorted = merged.sort_values(
            by=["slide_id", "_status_priority", "_source_mtime"],
            ascending=[True, False, False],
            kind="mergesort",
        )
        deduped = merged_sorted.drop_duplicates(subset=["slide_id"], keep="first").reset_index(drop=True)

        # Reconcile 'process': if any source row marked the slide done, do not reprocess.
        deduped["process"] = (
            deduped["slide_id"].map(any_done_map).fillna(False).astype(bool).map(lambda done: 0 if done else 1)
        )

        # The sort key is internal; keep _source_file/_source_mtime as provenance.
        merged = deduped.drop(columns=["_status_priority"])

        if verbose:
            print(f"De-duplicated by 'slide_id': {before} -> {len(merged)}")
    elif dedupe_by_slide_id and verbose:
        print("Column 'slide_id' not found; skipping de-duplication by slide_id.")

    return merged
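

# Dedupe policy illustration (hypothetical rows): given two source rows for
# the same slide,
#
#   slide_id,status,process,_source_mtime
#   s1,tbp,1,200.0
#   s1,processed,0,100.0
#
# the 'processed' row wins on status priority despite coming from an older
# source file, and the reconciliation step keeps process == 0 so the slide is
# not reprocessed.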


def main() -> int:
    args = parse_args()

    input_dir = args.input_dir
    output_dir = args.output_dir or input_dir
    os.makedirs(output_dir, exist_ok=True)

    paths = find_rank_csvs(input_dir, args.pattern, verbose=args.verbose)
    try:
        merged = merge_csvs(paths, dedupe_by_slide_id=(not args.no_dedupe), verbose=args.verbose)
    except Exception as e:
        print(f"[ERROR] {e}", file=sys.stderr)
        return 2

    out_path = os.path.join(output_dir, args.output_filename)
    try:
        merged.to_csv(out_path, index=False)
    except Exception as e:
        print(f"[ERROR] Failed to write output CSV '{out_path}': {e}", file=sys.stderr)
        return 3

    if args.verbose:
        print(f"Saved: {out_path} ({len(merged)} rows, {len(merged.columns)} columns)")

    return 0


if __name__ == "__main__":
    raise SystemExit(main())