#!/usr/bin/env python3
"""
Merge per-rank process list CSVs into a single consolidated CSV.

Default behavior:
- Searches an input directory for files matching pattern 'process_list_autogen_rank*.csv'
- Concatenates them (union of columns)
- If a column named 'slide_id' exists, de-duplicates rows by that column, preferring completed rows (see merge_csvs for the full policy)
- Writes the merged CSV as 'process_list_autogen.csv' into the output directory (defaults to input)
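- Keeps the '_source_file' and '_source_mtime' provenance columns that the merge adds to each row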

Usage examples:
  python create_patches/merge_process_lists.py \
      --input-dir create_patches/results_temp \
      --output-dir create_patches/results_temp

  # Custom pattern or output filename
  python create_patches/merge_process_lists.py \
      --input-dir /path/to/folder \
      --pattern "process_list_autogen_rank*.csv" \
      --output-filename process_list_autogen.csv
"""

import argparse
import glob
import os
import sys
from typing import List, Dict

import pandas as pd


def parse_args() -> argparse.Namespace:
    """Parse CLI arguments."""
    p = argparse.ArgumentParser(description="Merge rank CSVs into one process_list_autogen.csv")
    p.add_argument(
        "--input-dir",
        required=True,
        help="Directory that contains rank CSVs (e.g., results_temp)",
    )
    p.add_argument(
        "--output-dir",
        default=None,
        help="Directory to write the merged CSV. Defaults to --input-dir",
    )
    p.add_argument(
        "--pattern",
        default="process_list_autogen_rank*.csv",
        help="Glob pattern to match rank CSVs in input-dir",
    )
    p.add_argument(
        "--output-filename",
        default="process_list_autogen.csv",
        help="Filename of the merged CSV",
    )
    p.add_argument(
        "--no-dedupe",
        action="store_true",
        help="Disable de-duplication. By default, duplicates by 'slide_id' are dropped if that column exists.",
    )
    p.add_argument(
        "--verbose",
        action="store_true",
        help="Print detailed progress information",
    )
    return p.parse_args()


def find_rank_csvs(input_dir: str, pattern: str, verbose: bool = False) -> List[str]:
    """Find matching CSV paths in input_dir using the provided pattern."""
    search_glob = os.path.join(os.path.abspath(input_dir), pattern)
    paths = sorted(glob.glob(search_glob))
    if verbose:
        print(f"Searching: {search_glob}")
        print(f"Found {len(paths)} files")
        for p in paths:
            print(f" - {p}")
    return paths


def merge_csvs(paths: List[str], dedupe_by_slide_id: bool = True, verbose: bool = False) -> pd.DataFrame:
    """Read and concatenate CSVs; optionally de-duplicate by 'slide_id'.

    De-duplication policy (improved):
    - Prefer rows with status indicating completion first (processed > already_exist > others)
    - For ties, prefer the row from the most recently modified source CSV
    - After selecting a representative row per slide, reconcile 'process' so that if ANY
      source row for that slide indicates completion (status processed/already_exist or process==0),
      the merged row's process will be set to 0 (do not reprocess).
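
    Illustrative example (hypothetical values): if rank 0 reports
    (slide_id=A, status=tbp, process=1) and rank 1 reports
    (slide_id=A, status=processed, process=0), the merged output keeps
    rank 1's row and sets process=0, so slide A is not reprocessed.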
    """
    if not paths:
        raise FileNotFoundError("No CSV files matched the given pattern.")

    frames = []
    for p in paths:
        try:
            df = pd.read_csv(p)
            # Keep source file provenance for debugging
            df["_source_file"] = os.path.basename(p)
            try:
                # Add source mtime to help resolve conflicts deterministically
                df["_source_mtime"] = os.path.getmtime(p)
            except Exception:
                df["_source_mtime"] = float("nan")
            frames.append(df)
            if verbose:
                print(f"Loaded {p}: {len(df)} rows, {len(df.columns)} columns")
        except Exception as e:
            raise RuntimeError(f"Failed to read '{p}': {e}") from e

    # Union of columns across all ranks; columns missing from a frame become NaN.
    merged = pd.concat(frames, axis=0, ignore_index=True, sort=False)

    # TEMPORARY FILTER: exclude rows whose slide_id ends with .jpg (case-insensitive)
    # This is a temporary safeguard to avoid merging mistakenly generated JPG entries.
    # Remove this block once the source CSV generation is corrected.
    # (Requested behavior: only .jpg; not filtering .jpeg on purpose.)
    if "slide_id" in merged.columns:
        jpg_mask = merged["slide_id"].astype(str).str.lower().str.endswith(".jpg")
        if verbose:
            try:
                removed = int(jpg_mask.sum())
            except Exception:
                removed = 0
            print(f"Filtering out JPG slide_id rows: removed {removed}")
        merged = merged.loc[~jpg_mask].reset_index(drop=True)
    # END TEMPORARY FILTER

    if verbose:
        print(f"Concatenated rows: {len(merged)}")

    if dedupe_by_slide_id and "slide_id" in merged.columns:
        before = len(merged)

        # Determine status precedence
        status_priority: Dict[str, int] = {
            # Higher is better (i.e., more complete)
            "processed": 3,
            "already_exist": 2,
            # Failures or unknowns get lowest by default
            "failed": 1,
            "failed_seg": 1,
            "failed_patch": 1,
            # Typical in this repo for in-progress/to-be-processed
            "tbp": 0,
        }

        def map_status(s):
            try:
                return status_priority.get(str(s), 1)
            except Exception:
                return 1

        # Compute helper columns used only for ordering during de-duplication
        if "status" in merged.columns:
            merged["_status_priority"] = merged["status"].map(map_status)
        else:
            merged["_status_priority"] = 1
        if "_source_mtime" not in merged.columns:
            merged["_source_mtime"] = float("nan")

        # For robust reconciliation of 'process', compute per slide whether any row indicates completion
        def slide_any_done(g: pd.DataFrame) -> bool:
            false_col = pd.Series(False, index=g.index)
            done_status = g["status"].astype(str).isin(["processed", "already_exist"]) if "status" in g.columns else false_col
            proc_is_zero = (g["process"] == 0) if "process" in g.columns else false_col
            return bool((done_status | proc_is_zero).any())

        any_done_map = merged.groupby("slide_id", as_index=True).apply(slide_any_done)
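        # any_done_map: Series indexed by slide_id; True when any rank's row marks the slide complete.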

        # Pick the best representative row per slide deterministically
        merged_sorted = merged.sort_values(
            by=["slide_id", "_status_priority", "_source_mtime"],
            ascending=[True, False, False],
            kind="mergesort",  # stable to keep deterministic ties
        )
        deduped = merged_sorted.drop_duplicates(subset=["slide_id"], keep="first").reset_index(drop=True)

        # Reconcile 'process': if any source row says the slide is done, mark 0 (skip); otherwise 1.
        deduped["process"] = (
            deduped["slide_id"].map(any_done_map).fillna(False).astype(bool).map(lambda done: 0 if done else 1)
        )

        # Drop the internal sort key; the provenance columns are kept.
        merged = deduped.drop(columns=["_status_priority"])

        if verbose:
            print(f"De-duplicated by 'slide_id': {before} -> {len(merged)}")
    elif dedupe_by_slide_id:
        if verbose:
            print("Column 'slide_id' not found; skipping de-duplication by slide_id.")

    return merged


def main() -> int:
    args = parse_args()

    input_dir = args.input_dir
    output_dir = args.output_dir or input_dir
    os.makedirs(output_dir, exist_ok=True)

    paths = find_rank_csvs(input_dir, args.pattern, verbose=args.verbose)
    try:
        merged = merge_csvs(paths, dedupe_by_slide_id=(not args.no_dedupe), verbose=args.verbose)
    except Exception as e:
        print(f"[ERROR] {e}", file=sys.stderr)
        return 2

    out_path = os.path.join(output_dir, args.output_filename)
    try:
        merged.to_csv(out_path, index=False)
    except Exception as e:
        print(f"[ERROR] Failed to write output CSV '{out_path}': {e}", file=sys.stderr)
        return 3

    if args.verbose:
        print(f"Saved: {out_path} ({len(merged)} rows, {len(merged.columns)} columns)")

    return 0


if __name__ == "__main__":
    raise SystemExit(main())