File size: 9,415 Bytes
1ea9c72
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
#!/usr/bin/env python
"""
Run validator in parallel across multiple processes
"""

import subprocess
import sys
import os
import argparse
import math
from concurrent.futures import ProcessPoolExecutor, as_completed
import pandas as pd

def run_validator_range(args):
    """Run ``universal_validator.py`` on one contiguous range of questions.

    Args:
        args: A single tuple ``(excel_file, solver, reconciler, start, end,
            images, batch_size, output_base, compile_latex)`` -- one argument
            so the function can be handed straight to ``executor.submit``.
            ``start``/``end`` are forwarded as ``--start``/``--end``; the log
            messages treat them as a 0-based start and an exclusive end.

    Returns:
        ``(start, end, status, detail)``: ``status`` is ``"success"`` (exit
        code 0, empty detail), ``"failed"`` (non-zero exit; detail is the last
        20 lines of the child's combined stdout/stderr) or ``"error"`` (an
        exception while launching or streaming; detail is its message).
    """
    from collections import deque

    excel_file, solver, reconciler, start, end, images, batch_size, output_base, compile_latex = args

    # Per-range output file. merge_results() and the cleanup in main() derive
    # the same name, so this pattern must stay in sync with them.
    range_output = output_base.replace('.xlsx', f'_p{start}_{end}.xlsx')

    # NOTE(review): the validator script path is relative, so it is resolved
    # against the current working directory, not this file's directory.
    cmd = [
        sys.executable, "universal_validator.py",
        excel_file,
        "--model", solver,
        "--reconciliation-model", reconciler,
        "--images", images,
        "--start", str(start),
        "--end", str(end),
        "--batch-size", str(batch_size),
        "--output", range_output
    ]

    if compile_latex:
        cmd.append("--compile-latex")

    # Tag streamed lines with the range itself. Deriving a process number from
    # start//100 collided whenever a process handled other than 100 questions.
    label = f"[P{start+1}-{end}]"

    print(f"[PARALLEL] Starting process for questions {start+1}-{end}...")

    try:
        # Capture stdout+stderr through one pipe and re-print each line with a
        # prefix so interleaved output from parallel workers stays attributable.
        # The context manager closes the pipe and reaps the child even if
        # streaming raises.
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding='utf-8',
            errors='replace',
            bufsize=1
        ) as process:
            # Only the tail is reported on failure, so don't keep the whole log.
            tail = deque(maxlen=20)
            for line in process.stdout:
                # Flush so output reaches the console/GUI as it is produced,
                # even when this worker's stdout is a block-buffered pipe.
                print(f"{label} {line.rstrip()}", flush=True)
                tail.append(line)
            returncode = process.wait()

        if returncode == 0:
            print(f"[PARALLEL] Completed range {start+1}-{end}")
            return (start, end, "success", "")
        error_msg = "".join(tail)  # Last 20 lines
        print(f"[FAIL] Failed range {start+1}-{end}")
        return (start, end, "failed", error_msg)

    except Exception as e:
        print(f"[ERROR] Error in range {start+1}-{end}: {e}")
        return (start, end, "error", str(e))

def main():
    """Validate an Excel file's math questions in parallel and merge the results.

    Parses the command line, counts the rows of the 'Data' sheet whose
    ``raw_subject`` matches a math-related keyword, splits them into
    contiguous ranges, runs ``run_validator_range`` for each range in a
    process pool, merges the per-range workbooks with ``merge_results`` and
    deletes the per-range files afterwards. Without ``--output`` the user is
    asked to confirm before any process starts.
    """
    parser = argparse.ArgumentParser(description='Run validator in parallel')
    parser.add_argument('file', help='Excel file to process')
    parser.add_argument('--num-processes', type=int, default=4,
                       help='Number of parallel processes (default: 4)')
    parser.add_argument('--solver', default='o3-mini',
                       help='Solver model (default: o3-mini)')
    parser.add_argument('--reconciler', default='gpt-4o',
                       help='Reconciliation model (default: gpt-4o)')
    parser.add_argument('--images', default='when_needed',
                       help='Image handling (default: when_needed)')
    parser.add_argument('--batch-size', type=int, default=5,
                       help='Questions per batch (default: 5)')
    parser.add_argument('--questions-per-process', type=int, default=100,
                       help='Questions per process (default: 100)')
    parser.add_argument('--output', type=str, default=None,
                       help='Output filename for merged results')
    parser.add_argument('--start-range', type=int, default=0,
                       help='Start of question range')
    parser.add_argument('--end-range', type=int, default=None,
                       help='End of question range')
    parser.add_argument('--compile-latex', action='store_true',
                       help='Compile LaTeX files to PDF')

    args = parser.parse_args()

    # Fail fast on arguments that would otherwise crash later (division by
    # zero when sizing ranges) or produce negative question ranges.
    if args.num_processes < 1:
        parser.error('--num-processes must be at least 1')
    if args.start_range < 0:
        parser.error('--start-range must not be negative')
    # Per-range file names are derived by replacing '.xlsx' in the output
    # name. Without that suffix every worker would write the same file and
    # the cleanup loop below would delete the merged result itself.
    if args.output and not args.output.endswith('.xlsx'):
        parser.error("--output must end with '.xlsx'")

    # Count total questions
    print(f"Loading {args.file} to count questions...")
    df = pd.read_excel(args.file, sheet_name='Data')

    # Keep only math-like subjects (case-insensitive; missing subjects dropped).
    if 'raw_subject' in df.columns:
        math_filter = df['raw_subject'].str.lower().str.contains(
            'math|statistic|calculus|algebra|geometry|trigonometry',
            na=False, regex=True
        )
        df = df[math_filter]

    # Apply range if specified. The slice is positional over the filtered rows.
    # NOTE(review): merge_results() uses start/end as row labels of the
    # unfiltered sheet -- confirm universal_validator.py uses the same convention.
    if args.start_range > 0 or args.end_range:
        start_idx = args.start_range
        end_idx = args.end_range if args.end_range else len(df)
        df = df.iloc[start_idx:end_idx]
        print(f"Processing range: questions {start_idx+1} to {end_idx}")

    total_questions = len(df)
    print(f"Found {total_questions} math questions to process")

    # With nothing to do the pool below would be created with max_workers=0,
    # which raises ValueError; stop cleanly instead.
    if total_questions == 0:
        print("No questions to process")
        return

    # Give each process at least --questions-per-process questions, and start
    # no more than --num-processes processes.
    questions_per_process = max(args.questions_per_process, math.ceil(total_questions / args.num_processes))
    num_processes = min(args.num_processes, math.ceil(total_questions / questions_per_process))

    # Generate output base filename
    if args.output:
        output_base = args.output
    else:
        from datetime import datetime
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        base_name = os.path.basename(args.file).replace('.xlsx', '')
        output_base = f"{base_name}_validated_{timestamp}_parallel.xlsx"

    ranges = []
    base_start = args.start_range

    for i in range(num_processes):
        start = base_start + i * questions_per_process
        end = min(base_start + (i + 1) * questions_per_process, base_start + total_questions)
        if start < base_start + total_questions:
            ranges.append((
                args.file,
                args.solver,
                args.reconciler,
                start,
                end,
                args.images,
                args.batch_size,
                output_base,
                args.compile_latex
            ))

    print(f"\nWill run {len(ranges)} parallel processes:")
    for i, (_, _, _, start, end, _, _, _, _) in enumerate(ranges, 1):
        print(f"  Process {i}: questions {start+1}-{end}")

    # Skip confirmation in GUI mode (when output is specified)
    if not args.output:
        confirm = input("\nProceed? (Y/n): ").strip().lower()
        if confirm == 'n':
            print("Cancelled")
            return

    # Run in parallel
    print(f"\nStarting {len(ranges)} parallel processes...")

    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        futures = {executor.submit(run_validator_range, r): r for r in ranges}

        completed = 0
        failed = []

        for future in as_completed(futures):
            completed += 1
            try:
                start, end, status, error = future.result()
            except Exception as e:
                # run_validator_range handles its own errors, so this only
                # fires when the worker itself died (e.g. BrokenProcessPool).
                # Record the range as failed instead of aborting the merge.
                submitted = futures[future]
                start, end, status, error = submitted[3], submitted[4], "error", str(e)

            if status != "success":
                failed.append((start, end, error))

            print(f"Progress: {completed}/{len(ranges)} processes completed")

    # Summary
    print("\n" + "="*60)
    print("PARALLEL VALIDATION COMPLETE")
    print("="*60)

    if failed:
        print(f"\nFailed ranges ({len(failed)}):")
        for start, end, error in failed:
            print(f"  {start}-{end}: {error[:100]}")
        print("\nRerun these ranges individually to retry")
    else:
        print("\nAll ranges completed successfully!")

    # Merge results from all processes
    print("\nMerging results from all processes...")
    merge_results(args.file, output_base, ranges)

    # Clean up intermediate files (same naming as run_validator_range)
    for _, _, _, start, end, _, _, _, _ in ranges:
        temp_file = output_base.replace('.xlsx', f'_p{start}_{end}.xlsx')
        if os.path.exists(temp_file):
            os.remove(temp_file)
            print(f"  Cleaned up: {temp_file}")

    print(f"\nFinal results saved to: {output_base}")
    print(f"Results from {len(ranges)} processes have been merged")

def merge_results(original_file, output_file, ranges):
    """Merge the per-range validator outputs into a single workbook.

    Starts from the 'Data' sheet of *original_file*, copies the validator's
    result columns for each range from its ``*_p{start}_{end}.xlsx`` file,
    and writes the merged 'Data' sheet plus every other sheet of
    *original_file* to *output_file*.

    Args:
        original_file: Path of the source workbook; must have a 'Data' sheet.
        output_file: Path of the merged workbook. The per-range file names
            are derived from it by replacing '.xlsx'.
        ranges: The 9-tuples built by ``main``; only ``start`` (index 3) and
            ``end`` (index 4) are used.

    Missing per-range files are skipped and unreadable ones are reported and
    skipped, so the merge is best-effort.
    """
    result_columns = ('model_answer_file', 'answer_match', 'latex_file',
                      'quality_rating', 'difficulty_level', 'quality_comment')

    # Load original data
    original_df = pd.read_excel(original_file, sheet_name='Data')

    for _, _, _, start, end, _, _, _, _ in ranges:
        temp_file = output_file.replace('.xlsx', f'_p{start}_{end}.xlsx')
        if not os.path.exists(temp_file):
            continue
        try:
            temp_df = pd.read_excel(temp_file, sheet_name='Data')
            # Rows are matched by index label. Both sheets are read with the
            # default RangeIndex, so row i of the range file is taken as row i
            # of the original. NOTE(review): this presumes the validator writes
            # back the full, unfiltered 'Data' sheet -- confirm.
            stop = min(end, len(temp_df), len(original_df))
            if stop > start:
                for col in result_columns:
                    if col not in temp_df.columns:
                        continue
                    # Store results in an object column. Writing strings into a
                    # column pandas read as all-NaN float64 otherwise hits the
                    # incompatible-dtype deprecation (an error in newer pandas).
                    if col not in original_df.columns:
                        original_df[col] = None
                    elif original_df[col].dtype != object:
                        original_df[col] = original_df[col].astype(object)
                    # .loc label slices are inclusive, hence stop - 1.
                    original_df.loc[start:stop - 1, col] = temp_df.loc[start:stop - 1, col].to_numpy()
            print(f"  Merged results from questions {start+1}-{end}")
        except Exception as e:
            print(f"  Warning: Could not merge {temp_file}: {e}")

    # Save merged results
    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        original_df.to_excel(writer, sheet_name='Data', index=False)

        # Copy the remaining sheets unchanged. Best-effort, but report failures
        # instead of silently swallowing them.
        try:
            with pd.ExcelFile(original_file) as xl:
                for sheet_name in xl.sheet_names:
                    if sheet_name != 'Data':
                        xl.parse(sheet_name).to_excel(writer, sheet_name=sheet_name, index=False)
        except Exception as e:
            print(f"  Warning: Could not copy extra sheets from {original_file}: {e}")

if __name__ == '__main__':
    # Start the CLI only when executed as a script. Worker processes started
    # with the 'spawn' method re-import this module and must not re-run it.
    main()