Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python | |
| """ | |
| Run validator in parallel across multiple processes | |
| """ | |
| import subprocess | |
| import sys | |
| import os | |
| import argparse | |
| import math | |
| from concurrent.futures import ProcessPoolExecutor, as_completed | |
| import pandas as pd | |
def run_validator_range(args):
    """Run universal_validator.py as a subprocess for one question range.

    Parameters
    ----------
    args : tuple
        (excel_file, solver, reconciler, start, end, images, batch_size,
        output_base, compile_latex) — packed into a single tuple so the
        function can be submitted to a ProcessPoolExecutor.

    Returns
    -------
    tuple
        (start, end, status, error_msg) where status is "success",
        "failed" (non-zero exit), or "error" (exception while running).
    """
    excel_file, solver, reconciler, start, end, images, batch_size, output_base, compile_latex = args

    # Each range writes to its own file so parallel runs never collide.
    range_output = output_base.replace('.xlsx', f'_p{start}_{end}.xlsx')

    cmd = [
        sys.executable, "universal_validator.py",
        excel_file,
        "--model", solver,
        "--reconciliation-model", reconciler,
        "--images", images,
        "--start", str(start),
        "--end", str(end),
        "--batch-size", str(batch_size),
        "--output", range_output
    ]
    if compile_latex:
        cmd.append("--compile-latex")

    print(f"[PARALLEL] Starting process for questions {start+1}-{end}...")

    try:
        # Capture stdout (stderr merged in) and relay it line-by-line with a
        # per-process prefix so the child's progress stays visible.  The
        # context manager guarantees the pipe is closed and the child reaped
        # even if relaying raises.
        output_lines = []
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding='utf-8',
            errors='replace',
            bufsize=1  # line-buffered so output streams promptly
        ) as process:
            for line in process.stdout:
                # NOTE: the [P<n>] label assumes ~100 questions per process;
                # it is cosmetic only and does not affect behavior.
                print(f"[P{start//100+1}] {line.rstrip()}")
                output_lines.append(line)
            process.wait()

        if process.returncode == 0:
            print(f"[PARALLEL] Completed range {start+1}-{end}")
            return (start, end, "success", "")

        error_msg = "".join(output_lines[-20:])  # keep last 20 lines for diagnosis
        print(f"[FAIL] Failed range {start+1}-{end}")
        return (start, end, "failed", error_msg)
    except Exception as e:
        print(f"[ERROR] Error in range {start+1}-{end}: {e}")
        return (start, end, "error", str(e))
def main():
    """Parse CLI arguments, split the matching questions into contiguous
    ranges, run the validator over the ranges in parallel, then merge the
    per-range output files into a single workbook and clean them up."""
    parser = argparse.ArgumentParser(description='Run validator in parallel')
    parser.add_argument('file', help='Excel file to process')
    parser.add_argument('--num-processes', type=int, default=4,
                        help='Number of parallel processes (default: 4)')
    parser.add_argument('--solver', default='o3-mini',
                        help='Solver model (default: o3-mini)')
    parser.add_argument('--reconciler', default='gpt-4o',
                        help='Reconciliation model (default: gpt-4o)')
    parser.add_argument('--images', default='when_needed',
                        help='Image handling (default: when_needed)')
    parser.add_argument('--batch-size', type=int, default=5,
                        help='Questions per batch (default: 5)')
    parser.add_argument('--questions-per-process', type=int, default=100,
                        help='Questions per process (default: 100)')
    parser.add_argument('--output', type=str, default=None,
                        help='Output filename for merged results')
    parser.add_argument('--start-range', type=int, default=0,
                        help='Start of question range')
    parser.add_argument('--end-range', type=int, default=None,
                        help='End of question range')
    parser.add_argument('--compile-latex', action='store_true',
                        help='Compile LaTeX files to PDF')
    args = parser.parse_args()

    # Count total questions
    print(f"Loading {args.file} to count questions...")
    df = pd.read_excel(args.file, sheet_name='Data')

    # Keep only math-related rows (same subject filter the validator applies).
    if 'raw_subject' in df.columns:
        math_filter = df['raw_subject'].str.lower().str.contains(
            'math|statistic|calculus|algebra|geometry|trigonometry',
            na=False, regex=True
        )
        df = df[math_filter]

    # Apply an explicit question range if one was requested.
    # BUGFIX: compare end_range against None so an explicit `--end-range 0`
    # is honored instead of being treated as "unset".
    if args.start_range > 0 or args.end_range is not None:
        start_idx = args.start_range
        end_idx = args.end_range if args.end_range is not None else len(df)
        df = df.iloc[start_idx:end_idx]
        print(f"Processing range: questions {start_idx+1} to {end_idx}")

    total_questions = len(df)
    print(f"Found {total_questions} math questions to process")

    # BUGFIX: bail out before sizing the pool — with zero questions,
    # num_processes would be 0 and ProcessPoolExecutor(max_workers=0)
    # raises ValueError.
    if total_questions == 0:
        print("No matching questions found; nothing to do.")
        return

    # Each process handles at least `questions_per_process` questions; if the
    # requested process count would split the work thinner than that, fewer
    # processes are launched.
    questions_per_process = max(args.questions_per_process,
                                math.ceil(total_questions / args.num_processes))
    num_processes = min(args.num_processes,
                        math.ceil(total_questions / questions_per_process))

    # Generate output base filename (timestamped when none was given).
    if args.output:
        output_base = args.output
    else:
        from datetime import datetime
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        base_name = os.path.basename(args.file).replace('.xlsx', '')
        output_base = f"{base_name}_validated_{timestamp}_parallel.xlsx"

    # Build the per-process argument tuples, offset by the user-specified
    # start so the child validators see absolute question indices.
    ranges = []
    base_start = args.start_range
    for i in range(num_processes):
        start = base_start + i * questions_per_process
        end = min(base_start + (i + 1) * questions_per_process,
                  base_start + total_questions)
        if start < base_start + total_questions:
            ranges.append((
                args.file,
                args.solver,
                args.reconciler,
                start,
                end,
                args.images,
                args.batch_size,
                output_base,
                args.compile_latex
            ))

    print(f"\nWill run {len(ranges)} parallel processes:")
    for i, (_, _, _, start, end, _, _, _, _) in enumerate(ranges, 1):
        print(f"  Process {i}: questions {start+1}-{end}")

    # Skip confirmation in GUI mode (when output is specified)
    if not args.output:
        confirm = input("\nProceed? (Y/n): ").strip().lower()
        if confirm == 'n':
            print("Cancelled")
            return

    # Run in parallel; each worker just launches and babysits a subprocess.
    print(f"\nStarting {len(ranges)} parallel processes...")
    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        futures = {executor.submit(run_validator_range, r): r for r in ranges}

        completed = 0
        failed = []
        for future in as_completed(futures):
            completed += 1
            start, end, status, error = future.result()
            if status != "success":
                failed.append((start, end, error))
            print(f"Progress: {completed}/{len(ranges)} processes completed")

    # Summary
    print("\n" + "=" * 60)
    print("PARALLEL VALIDATION COMPLETE")
    print("=" * 60)

    if failed:
        print(f"\nFailed ranges ({len(failed)}):")
        for start, end, error in failed:
            print(f"  {start}-{end}: {error[:100]}")
        print("\nRerun these ranges individually to retry")
    else:
        print("\nAll ranges completed successfully!")

    # Merge results from all processes into a single workbook.
    print("\nMerging results from all processes...")
    merge_results(args.file, output_base, ranges)

    # Clean up the per-range intermediate files now that they are merged.
    for _, _, _, start, end, _, _, _, _ in ranges:
        temp_file = output_base.replace('.xlsx', f'_p{start}_{end}.xlsx')
        if os.path.exists(temp_file):
            os.remove(temp_file)
            print(f"  Cleaned up: {temp_file}")

    print(f"\nFinal results saved to: {output_base}")
    print(f"Results from {len(ranges)} processes have been merged")
def merge_results(original_file, output_file, ranges):
    """Merge per-range validator output files back into one workbook.

    Parameters
    ----------
    original_file : str
        The input Excel file (sheet 'Data' plus any extra sheets).
    output_file : str
        Path of the merged workbook; per-range files are expected at
        ``<output_file minus .xlsx>_p<start>_<end>.xlsx``.
    ranges : list[tuple]
        The argument tuples passed to run_validator_range; only the
        start/end fields (positions 3 and 4) are used here.
    """
    # Load original data; validator result columns are overlaid onto it.
    original_df = pd.read_excel(original_file, sheet_name='Data')

    # Columns the validator fills in and that get copied back.
    result_columns = ['model_answer_file', 'answer_match', 'latex_file',
                      'quality_rating', 'difficulty_level', 'quality_comment']

    # Process each range file and update the dataframe
    for _, _, _, start, end, _, _, _, _ in ranges:
        temp_file = output_file.replace('.xlsx', f'_p{start}_{end}.xlsx')
        if not os.path.exists(temp_file):
            continue
        try:
            temp_df = pd.read_excel(temp_file, sheet_name='Data')
            # NOTE(review): this assumes each per-range file contains the
            # full sheet (same row positions as the original), not just the
            # processed slice — confirm against universal_validator output.
            for idx in range(start, min(end, len(temp_df))):
                if idx < len(original_df):
                    for col in result_columns:
                        if col in temp_df.columns:
                            original_df.at[idx, col] = temp_df.at[idx, col]
            print(f"  Merged results from questions {start+1}-{end}")
        except Exception as e:
            print(f"  Warning: Could not merge {temp_file}: {e}")

    # Save merged results
    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        original_df.to_excel(writer, sheet_name='Data', index=False)

        # Best-effort copy of any additional sheets from the original file.
        # BUGFIX: narrowed the bare `except:` so KeyboardInterrupt/SystemExit
        # are not swallowed, and the failure is reported instead of hidden.
        try:
            xl = pd.ExcelFile(original_file)
            for sheet_name in xl.sheet_names:
                if sheet_name != 'Data':
                    extra = pd.read_excel(original_file, sheet_name=sheet_name)
                    extra.to_excel(writer, sheet_name=sheet_name, index=False)
        except Exception as e:
            print(f"  Warning: Could not copy extra sheets: {e}")
# Script entry point: argument parsing and all work happen inside main().
if __name__ == "__main__":
    main()