# Source: math-validator / run_parallel.py (commit 1ea9c72, "Update validator app", by igriv)
#!/usr/bin/env python
"""
Run validator in parallel across multiple processes
"""
import subprocess
import sys
import os
import argparse
import math
from concurrent.futures import ProcessPoolExecutor, as_completed
import pandas as pd
def run_validator_range(args):
    """Run universal_validator.py as a subprocess over one question range.

    Args:
        args: Tuple of (excel_file, solver, reconciler, start, end, images,
            batch_size, output_base, compile_latex) — packed into one tuple
            so the function can be submitted to an executor directly.

    Returns:
        Tuple of (start, end, status, error_message) where status is one of
        "success", "failed" (non-zero exit), or "error" (spawn/stream
        exception); error_message is "" on success.
    """
    (excel_file, solver, reconciler, start, end,
     images, batch_size, output_base, compile_latex) = args

    # Each worker writes its own file; merge_results() folds them together later.
    range_output = output_base.replace('.xlsx', f'_p{start}_{end}.xlsx')

    cmd = [
        sys.executable, "universal_validator.py",
        excel_file,
        "--model", solver,
        "--reconciliation-model", reconciler,
        "--images", images,
        "--start", str(start),
        "--end", str(end),
        "--batch-size", str(batch_size),
        "--output", range_output
    ]
    if compile_latex:
        cmd.append("--compile-latex")

    print(f"[PARALLEL] Starting process for questions {start+1}-{end}...")
    try:
        # Capture combined stdout/stderr through a pipe and relay it line by
        # line, so each worker's output is tagged and the tail is available
        # for error reporting.
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding='utf-8',
            errors='replace',
            bufsize=1  # line-buffered so output streams promptly
        )
        output_lines = []
        # `with` closes the pipe fd once the child's output is exhausted.
        with process.stdout:
            for line in process.stdout:
                # NOTE(review): the [P...] tag assumes 100 questions per
                # process; it is a cosmetic label only — confirm if
                # --questions-per-process differs from 100.
                print(f"[P{start//100+1}] {line.rstrip()}")
                output_lines.append(line)
        process.wait()

        if process.returncode == 0:
            print(f"[PARALLEL] Completed range {start+1}-{end}")
            return (start, end, "success", "")
        else:
            error_msg = "".join(output_lines[-20:])  # Last 20 lines of output
            print(f"[FAIL] Failed range {start+1}-{end}")
            return (start, end, "failed", error_msg)
    except Exception as e:
        print(f"[ERROR] Error in range {start+1}-{end}: {e}")
        return (start, end, "error", str(e))
def main():
    """Parse CLI args, split the question set into contiguous ranges, fan out
    one validator subprocess per range, then merge the per-range outputs into
    a single workbook and delete the intermediate files.
    """
    parser = argparse.ArgumentParser(description='Run validator in parallel')
    parser.add_argument('file', help='Excel file to process')
    parser.add_argument('--num-processes', type=int, default=4,
                        help='Number of parallel processes (default: 4)')
    parser.add_argument('--solver', default='o3-mini',
                        help='Solver model (default: o3-mini)')
    parser.add_argument('--reconciler', default='gpt-4o',
                        help='Reconciliation model (default: gpt-4o)')
    parser.add_argument('--images', default='when_needed',
                        help='Image handling (default: when_needed)')
    parser.add_argument('--batch-size', type=int, default=5,
                        help='Questions per batch (default: 5)')
    parser.add_argument('--questions-per-process', type=int, default=100,
                        help='Questions per process (default: 100)')
    parser.add_argument('--output', type=str, default=None,
                        help='Output filename for merged results')
    parser.add_argument('--start-range', type=int, default=0,
                        help='Start of question range')
    parser.add_argument('--end-range', type=int, default=None,
                        help='End of question range')
    parser.add_argument('--compile-latex', action='store_true',
                        help='Compile LaTeX files to PDF')
    args = parser.parse_args()

    # Count the questions that will actually be processed.
    print(f"Loading {args.file} to count questions...")
    df = pd.read_excel(args.file, sheet_name='Data')

    # Keep only math-related rows. NOTE(review): universal_validator.py is
    # assumed to apply the same filter so positional indices line up between
    # this process and the workers — confirm against that script.
    if 'raw_subject' in df.columns:
        math_filter = df['raw_subject'].str.lower().str.contains(
            'math|statistic|calculus|algebra|geometry|trigonometry',
            na=False, regex=True
        )
        df = df[math_filter]

    # Optional sub-range of the filtered questions (0-based, half-open).
    if args.start_range > 0 or args.end_range:
        start_idx = args.start_range
        end_idx = args.end_range if args.end_range else len(df)
        df = df.iloc[start_idx:end_idx]
        print(f"Processing range: questions {start_idx+1} to {end_idx}")

    total_questions = len(df)
    print(f"Found {total_questions} math questions to process")

    # Guard: with zero questions the arithmetic below would yield
    # num_processes == 0 and ProcessPoolExecutor(max_workers=0) raises
    # ValueError. Nothing to do, so exit cleanly instead.
    if total_questions == 0:
        print("No questions to process; exiting.")
        return

    # Balance the workload: never use more processes than there are chunks,
    # and never make chunks smaller than --questions-per-process.
    questions_per_process = max(args.questions_per_process, math.ceil(total_questions / args.num_processes))
    num_processes = min(args.num_processes, math.ceil(total_questions / questions_per_process))

    # Generate the merged-output filename; a timestamp avoids clobbering
    # earlier runs when --output is not given.
    if args.output:
        output_base = args.output
    else:
        from datetime import datetime
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        base_name = os.path.basename(args.file).replace('.xlsx', '')
        output_base = f"{base_name}_validated_{timestamp}_parallel.xlsx"

    # Build one argument tuple per worker; the last chunk may be short.
    ranges = []
    base_start = args.start_range if args.start_range else 0
    for i in range(num_processes):
        start = base_start + i * questions_per_process
        end = min(base_start + (i + 1) * questions_per_process, base_start + total_questions)
        if start < base_start + total_questions:
            ranges.append((
                args.file,
                args.solver,
                args.reconciler,
                start,
                end,
                args.images,
                args.batch_size,
                output_base,
                args.compile_latex
            ))

    print(f"\nWill run {len(ranges)} parallel processes:")
    for i, (_, _, _, start, end, _, _, _, _) in enumerate(ranges, 1):
        print(f"  Process {i}: questions {start+1}-{end}")

    # Skip the interactive confirmation in GUI mode (when --output is given).
    if not args.output:
        confirm = input("\nProceed? (Y/n): ").strip().lower()
        if confirm == 'n':
            print("Cancelled")
            return

    # Fan out the workers and collect per-range statuses as they finish.
    print(f"\nStarting {len(ranges)} parallel processes...")
    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        futures = {executor.submit(run_validator_range, r): r for r in ranges}
        completed = 0
        failed = []
        for future in as_completed(futures):
            completed += 1
            start, end, status, error = future.result()
            if status != "success":
                failed.append((start, end, error))
            print(f"Progress: {completed}/{len(ranges)} processes completed")

    # Summary of the run.
    print("\n" + "="*60)
    print("PARALLEL VALIDATION COMPLETE")
    print("="*60)
    if failed:
        print(f"\nFailed ranges ({len(failed)}):")
        for start, end, error in failed:
            print(f"  {start}-{end}: {error[:100]}")
        print("\nRerun these ranges individually to retry")
    else:
        print("\nAll ranges completed successfully!")

    # Merge whatever the workers produced (partial results are kept even if
    # some ranges failed), then remove the intermediate per-range files.
    print("\nMerging results from all processes...")
    merge_results(args.file, output_base, ranges)

    for _, _, _, start, end, _, _, _, _ in ranges:
        temp_file = output_base.replace('.xlsx', f'_p{start}_{end}.xlsx')
        if os.path.exists(temp_file):
            os.remove(temp_file)
            print(f"  Cleaned up: {temp_file}")

    print(f"\nFinal results saved to: {output_base}")
    print(f"Results from {len(ranges)} processes have been merged")
def merge_results(original_file, output_file, ranges):
    """Fold the per-range worker outputs back into a single workbook.

    Args:
        original_file: Path to the source Excel workbook (sheet 'Data');
            rows that no worker touched keep their source values.
        output_file: Path of the merged workbook; per-range files are
            derived from it as '<stem>_p<start>_<end>.xlsx'.
        ranges: The same argument tuples that were passed to the workers;
            only the start/end fields are used here.
    """
    # Start from the unmodified original sheet.
    original_df = pd.read_excel(original_file, sheet_name='Data')

    # Columns the validator fills in; everything else stays untouched.
    result_columns = ['model_answer_file', 'answer_match', 'latex_file',
                      'quality_rating', 'difficulty_level', 'quality_comment']

    # Copy each range's result columns over the corresponding rows.
    for _, _, _, start, end, _, _, _, _ in ranges:
        temp_file = output_file.replace('.xlsx', f'_p{start}_{end}.xlsx')
        if not os.path.exists(temp_file):
            continue  # range failed or produced nothing; skip silently
        try:
            temp_df = pd.read_excel(temp_file, sheet_name='Data')
            # NOTE(review): assumes positional row indices line up between the
            # range file and the original sheet — verify against the layout
            # universal_validator.py writes.
            for idx in range(start, min(end, len(temp_df))):
                if idx < len(original_df):
                    for col in result_columns:
                        if col in temp_df.columns:
                            original_df.at[idx, col] = temp_df.at[idx, col]
            print(f"  Merged results from questions {start+1}-{end}")
        except Exception as e:
            # Best-effort merge: report and continue with remaining ranges.
            print(f"  Warning: Could not merge {temp_file}: {e}")

    # Save the merged sheet, then copy any extra sheets best-effort.
    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        original_df.to_excel(writer, sheet_name='Data', index=False)
        try:
            xl = pd.ExcelFile(original_file)
            for sheet_name in xl.sheet_names:
                if sheet_name != 'Data':
                    df = pd.read_excel(original_file, sheet_name=sheet_name)
                    df.to_excel(writer, sheet_name=sheet_name, index=False)
        except Exception as e:
            # Was a bare `except: pass`, which also swallowed SystemExit and
            # KeyboardInterrupt; still best-effort, but surface the reason.
            print(f"  Warning: Could not copy extra sheets: {e}")
# Entry-point guard: lets the module be imported (e.g. by the process pool's
# worker bootstrap) without re-running the whole pipeline.
if __name__ == "__main__":
    main()