Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import os | |
| from dotenv import load_dotenv | |
| import time | |
| from typing import Dict, Any, Optional, List | |
| import re | |
| from datetime import datetime | |
| import json | |
| from tqdm import tqdm | |
| import base64 | |
| import requests | |
| from io import BytesIO | |
| load_dotenv() | |
class UniversalMathValidator:
    """Universal validator that can handle different Excel formats and API providers.

    Workflow: load the ``Data`` sheet of an Excel workbook, ask a solver model
    to answer each (math-filtered) question, compare the answer with the
    reference answer, and for mismatches ask a reconciliation model for a
    LaTeX write-up.  Per-question results are written to an output workbook
    and to files below ``validation_results/run_<timestamp>/``.
    """

    # Model-name prefixes that mark a model as routed through OpenRouter.
    OPENROUTER_PREFIXES = ("anthropic/", "x-ai/", "google/", "meta-llama/", "mistral/", "openai/")
    # Bare model names that the OpenAI provider is used for.
    OPENAI_MODELS = ("o3-mini", "gpt-4o", "gpt-5", "gpt-5-mini", "gpt-5-nano", "gpt-4-turbo")
    # Models called with max_completion_tokens=8000 and no temperature.
    GPT5_MODELS = ("gpt-5", "gpt-5-mini", "gpt-5-nano")
    # Total number of tries for each chat-completion request.
    MAX_ATTEMPTS = 3
    # Columns this validator adds to the data frame / output workbook.
    RESULT_COLUMNS = ('model_answer_file', 'answer_match', 'latex_file',
                      'quality_rating', 'difficulty_level', 'quality_comment')
    # Words in a question that suggest it refers to an accompanying image.
    IMAGE_KEYWORDS = ("image", "figure", "diagram", "picture", "attached",
                      "graph", "plot", "shown", "below", "above")
    # A signed integer or decimal; deliberately does not swallow a trailing
    # sentence period ("... is 5." must yield "5", not "5.").
    _NUMBER_RE = re.compile(r'-?\d+(?:\.\d+)?')

    def __init__(self, excel_file: str, provider: str = "openai", include_images: str = "when_needed",
                 solver_model: str = None, reconciliation_model: str = None):
        """
        Initialize validator

        Args:
            excel_file: Path to Excel file
            provider: "openai" or "openrouter"
            include_images: "always", "never", or "when_needed"
            solver_model: Model for solving questions
            reconciliation_model: Model for reconciliation

        Raises:
            ValueError: If the effective provider is neither "openai" nor
                "openrouter".
        """
        self.excel_file = excel_file
        self.include_images = include_images

        # If any requested model requires OpenRouter, use OpenRouter for everything.
        if self._needs_openrouter(solver_model) or self._needs_openrouter(reconciliation_model):
            self.provider = "openrouter"
            if provider == "openai":
                print("Note: Using OpenRouter for all models since non-OpenAI model specified")
        else:
            self.provider = provider
        # Fail early and clearly instead of hitting a missing-attribute error later.
        if self.provider not in ("openai", "openrouter"):
            raise ValueError(
                f"Unsupported provider {self.provider!r}; expected 'openai' or 'openrouter'"
            )

        # Store original model names for later prefixing if needed
        self.solver_model_input = solver_model
        self.reconciliation_model_input = reconciliation_model
        self.df = None
        self.image_mapping = {}  # question id -> image URL, filled by load_data()
        self.output_file = None  # Will be set later
        self.compile_latex = False  # Will be set from args

        # Detect file format
        self.file_format = self._detect_format()

        # Create directories for outputs
        self.base_dir = "validation_results"
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.run_dir = os.path.join(self.base_dir, f"run_{self.timestamp}")
        self.latex_dir = os.path.join(self.run_dir, "latex_documents")
        self.answers_dir = os.path.join(self.run_dir, "model_answers")
        os.makedirs(self.latex_dir, exist_ok=True)
        os.makedirs(self.answers_dir, exist_ok=True)

        # Initialize API client
        if self.provider == "openai":
            from openai import OpenAI
            import httpx
            # Set 5 minute timeout for GPT-5 models which can be very slow
            self.client = OpenAI(
                api_key=os.getenv('OPENAI_API_KEY'),
                timeout=httpx.Timeout(300.0, connect=10.0)  # 300 second timeout, 10 second connect
            )
            # Default models for OpenAI
            self.model = self.solver_model_input or "o3-mini"
            self.reconciliation_model = self.reconciliation_model_input or "gpt-4o"
            self.assessment_model = "gpt-4o"
        else:
            self.client = self._setup_openrouter()
            # Format models for OpenRouter
            self.model = self._format_for_openrouter(self.solver_model_input) or "openai/o3-mini"
            self.reconciliation_model = (
                self._format_for_openrouter(self.reconciliation_model_input) or "openai/gpt-4o"
            )
            self.assessment_model = "openai/gpt-4o"

        # System prompts
        self.system_prompt_answer = (
            "You are a highly skilled mathematics graduate student.\n"
            "Solve the following problem step by step.\n"
            "IMPORTANT: First show your complete reasoning and work.\n"
            "Then clearly state the final answer.\n"
            "Your response should include both the reasoning process and the final answer."
        )
        self.system_prompt_assess = (
            "You are an experienced mathematics educator. Evaluate mathematical questions."
        )
        self.system_prompt_reconcile = (
            "You are a graduate student who produces detailed justifications in LaTeX format.\n"
            "You excel at analyzing mathematical solutions and identifying potential errors.\n"
            "Your output should be a complete LaTeX document that can be compiled directly."
        )

        # Create manifest
        self.manifest_file = os.path.join(self.run_dir, "manifest.json")
        self.manifest = {
            "timestamp": self.timestamp,
            "source_file": excel_file,
            "file_format": self.file_format,
            # Record the provider actually in use (it may have been overridden above).
            "provider": self.provider,
            "model": self.model,
            "questions": {}
        }

    @classmethod
    def _needs_openrouter(cls, model_name: Optional[str]) -> bool:
        """Return True if *model_name* is set and is not a bare OpenAI model name."""
        if not model_name:
            return False
        return model_name.startswith(cls.OPENROUTER_PREFIXES) or model_name not in cls.OPENAI_MODELS

    @classmethod
    def _format_for_openrouter(cls, model_name: Optional[str]) -> Optional[str]:
        """Return *model_name* in OpenRouter form, adding ``openai/`` to bare OpenAI names."""
        if not model_name:
            return None
        # If already has a prefix, use as-is
        if "/" in model_name:
            return model_name
        # If it's an OpenAI model, add prefix
        if model_name in cls.OPENAI_MODELS:
            return f"openai/{model_name}"
        # Otherwise assume it needs no prefix (for backwards compatibility)
        return model_name

    def _detect_format(self) -> str:
        """Detect which format the Excel file uses, based on its sheet names."""
        # Context manager closes the workbook handle once the names are read.
        with pd.ExcelFile(self.excel_file) as xl:
            sheet_names = xl.sheet_names
        if 'rationale_images' in sheet_names:
            return "HLE_B3"  # HLE_Verified_B3 format
        if 'model_responses' in sheet_names:
            return "HLE_335"  # HLE_335 format
        return "unknown"

    def _setup_openrouter(self):
        """Setup OpenRouter client (OpenRouter uses an OpenAI-compatible API)."""
        from openai import OpenAI
        import httpx
        return OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=os.getenv('OPENROUTER_API_KEY'),
            timeout=httpx.Timeout(300.0, connect=10.0),  # Same timeout as OpenAI
            default_headers={
                "HTTP-Referer": "https://github.com/yourusername/validator",
                "X-Title": "Math Validator"
            }
        )

    def load_data(self):
        """Load and normalize data based on file format.

        Reads the ``Data`` sheet, builds the question-image mapping for the
        HLE_B3 format, keeps only math/statistics rows when a ``raw_subject``
        column exists (original row indices are preserved), and adds the
        empty result columns.

        Returns:
            The loaded (and filtered) data frame, also stored on ``self.df``.
        """
        # Every supported format keeps its questions on the 'Data' sheet.
        self.df = pd.read_excel(self.excel_file, sheet_name='Data')
        self.image_mapping = {}

        if self.file_format == "HLE_B3":
            # Normalize column names
            self.df['task_name'] = self.df.get('id', '')
            self.df['answer type'] = self.df.get('answer_type', 'exactMatch')
            # Create image mapping from file_url column (question images)
            if 'file_url' in self.df.columns:
                if 'id' in self.df.columns:
                    has_both = self.df['file_url'].notna() & self.df['id'].notna()
                    with_images = self.df[has_both]
                    self.image_mapping = dict(zip(with_images['id'], with_images['file_url']))
                print(f"Loaded {len(self.image_mapping)} question images from file_url column")
            # Rationale images are only counted; they are never used for questions.
            try:
                rationale_images = pd.read_excel(self.excel_file, sheet_name='rationale_images')
                rationale_mapping = dict(zip(rationale_images['ID'], rationale_images['gcp']))
                print(f"Found {len(rationale_mapping)} rationale images (not used for questions)")
            except Exception as exc:
                # Best effort: this sheet is informational, so report and carry on.
                print(f"Note: could not read rationale_images sheet: {exc}")

        # Filter for math questions but KEEP ORIGINAL INDICES
        if 'raw_subject' in self.df.columns:
            math_filter = self.df['raw_subject'].str.lower().str.contains(
                'math|statistic|calculus|algebra|geometry|trigonometry',
                na=False, regex=True
            )
            # .copy() keeps the index labels and avoids chained-assignment
            # warnings when the result columns are added below.
            self.df = self.df[math_filter].copy()

        # Add result columns
        for column in self.RESULT_COLUMNS:
            self.df[column] = ''
        print(f"Loaded {len(self.df)} math/statistics questions from {self.file_format} format")
        return self.df

    def _get_image_for_question(self, row) -> Optional[str]:
        """Get image URL or path for a question if needed.

        Returns None when images are disabled, when "when_needed" is active
        and the question text contains no image keyword, or when no image
        source is found for the row.
        """
        if self.include_images == "never":
            return None

        question_id = row.get('id') or row.get('task_name')
        question_text = str(row.get('question', '')).lower()
        # Check if question mentions an image
        has_image_reference = any(keyword in question_text for keyword in self.IMAGE_KEYWORDS)

        wanted = self.include_images == "always" or (
            self.include_images == "when_needed" and has_image_reference
        )
        if not wanted:
            return None

        # First check file_url column directly (primary source for question images)
        if 'file_url' in row and pd.notna(row['file_url']):
            return row['file_url']
        # Then try to get image from mapping
        if question_id in self.image_mapping:
            return self.image_mapping[question_id]
        # Finally check for generic image column
        if 'image' in row and pd.notna(row['image']):
            return row['image']

        # Log warning if image was expected but not found
        if has_image_reference:
            original_idx = row.name if hasattr(row, 'name') else 'unknown'
            # str() guards against a missing (None/NaN) or non-string id.
            id_preview = str(question_id)[:20]
            print(f" [WARNING] Question {original_idx} mentions image but none found (ID: {id_preview}...)")
        return None

    def _encode_image(self, image_url: str) -> Optional[str]:
        """Download and encode image as base64; return None if that fails."""
        try:
            response = requests.get(image_url, timeout=10)
        except Exception as exc:
            # Best effort: a failed download must not abort the run.
            print(f" [WARNING] Could not download image {image_url}: {exc}")
            return None
        if response.status_code == 200:
            return base64.b64encode(response.content).decode('utf-8')
        return None

    def _completion_params(self, original_model: str, temperature: float, max_tokens: int) -> Dict[str, Any]:
        """Return the request keyword arguments appropriate for *original_model*.

        "o3-mini" and the GPT-5 names are called with ``max_completion_tokens``
        only; every other model gets ``temperature`` and ``max_tokens``.
        """
        if original_model == "o3-mini":
            return {"max_completion_tokens": 10000}
        if original_model in self.GPT5_MODELS:
            return {"max_completion_tokens": 8000}  # Room for reasoning + answer
        return {"temperature": temperature, "max_tokens": max_tokens}

    def _chat_with_retries(self, model: str, messages: List[Dict[str, Any]], params: Dict[str, Any],
                           action: str, failure_message: str, attempt: int = 1) -> Optional[str]:
        """Call the chat-completions endpoint, retrying with exponential backoff.

        Args:
            model: Model name sent to the API.
            messages: Chat messages.
            params: Extra request keyword arguments (token limits, temperature).
            action: Phrase used in progress messages, e.g. "getting model answer".
            failure_message: Printed once all attempts are exhausted.
            attempt: Number of the first attempt (1-based).

        Returns:
            The stripped response text, or None after the final failed attempt.
        """
        while True:
            try:
                response = self.client.chat.completions.create(
                    model=model, messages=messages, **params
                )
                content = response.choices[0].message.content
                if content is None:
                    # Treated like any other failed attempt (retried below).
                    raise ValueError("model returned no message content")
                return content.strip()
            except Exception as e:
                if "timeout" in str(e).lower():
                    print(f" [TIMEOUT] Timeout {action} (attempt {attempt}/{self.MAX_ATTEMPTS})")
                else:
                    print(f" [ERROR] Error {action} (attempt {attempt}): {e}")
                if attempt >= self.MAX_ATTEMPTS:
                    print(failure_message)
                    return None
                time.sleep(2 ** attempt)  # Exponential backoff
                attempt += 1

    def get_model_answer(self, question: str, image_url: Optional[str] = None, attempt: int = 1) -> Optional[str]:
        """Get answer from model with optional image support.

        The image is attached only for the "openai" provider and only when
        *image_url* is an http(s) URL; otherwise a text-only message is sent.

        Returns:
            The model's answer text, or None if every attempt failed.
        """
        messages = [{"role": "system", "content": self.system_prompt_answer}]
        if image_url and self.provider == "openai":
            # OpenAI vision format
            user_content = [{"type": "text", "text": question}]
            if isinstance(image_url, str) and image_url.startswith('http'):
                user_content.append({"type": "image_url", "image_url": {"url": image_url}})
            messages.append({"role": "user", "content": user_content})
        else:
            # Text-only or OpenRouter
            messages.append({"role": "user", "content": question})

        # Special handling is keyed on the model name as given (before prefixing).
        original_model = getattr(self, 'solver_model_input', None) or self.model
        params = self._completion_params(original_model, temperature=0.1, max_tokens=2000)
        return self._chat_with_retries(
            self.model, messages, params,
            action="getting model answer",
            failure_message=" [ERROR] Failed after 3 attempts",
            attempt=attempt,
        )

    def generate_reconciliation_latex(self, question: str, model_answer: str,
                                      reference_answer: str, rationale: str = None,
                                      attempt: int = 1) -> Optional[str]:
        """Generate LaTeX reconciliation document for mismatched answers.

        Returns:
            The LaTeX source returned by the reconciliation model, or None if
            every attempt failed.
        """
        rationale_text = rationale if pd.notna(rationale) else "Not provided"
        prompt = (
            "Compare and reconcile these two answers to the following problem.\n"
            "PROBLEM:\n"
            f"{question}\n"
            "MODEL'S ANSWER:\n"
            f"{model_answer}\n"
            "REFERENCE ANSWER:\n"
            f"{reference_answer}\n"
            "REFERENCE RATIONALE:\n"
            f"{rationale_text}\n"
            "Please create a complete LaTeX document that:\n"
            "1. States the problem\n"
            "2. Shows the model's approach and solution\n"
            "3. Shows the reference approach and solution\n"
            "4. Analyzes where any differences or errors might occur\n"
            "5. Provides your assessment of which answer is correct and why\n"
            "The document should be properly formatted with sections and mathematical notation.\n"
            "Begin with \\documentclass and end with \\end{document}."
        )
        messages = [
            {"role": "system", "content": self.system_prompt_reconcile},
            {"role": "user", "content": prompt}
        ]
        # Special handling is keyed on the model name as given (before prefixing).
        original_recon = getattr(self, 'reconciliation_model_input', None) or self.reconciliation_model
        params = self._completion_params(original_recon, temperature=0.3, max_tokens=3000)
        return self._chat_with_retries(
            self.reconciliation_model, messages, params,
            action="generating reconciliation",
            failure_message=" [ERROR] Failed to generate reconciliation after 3 attempts",
            attempt=attempt,
        )

    @classmethod
    def _answers_match(cls, model_answer, reference_answer) -> bool:
        """Return True if the answers are equal as text or by their first number.

        Text is compared case-insensitively after stripping.  Failing that,
        and only when a reference answer exists, the first number found in
        each text is compared numerically (so "5." / "5.0" / "5" agree).
        """
        model_norm = str(model_answer).strip().lower()
        ref_norm = str(reference_answer).strip().lower()
        if model_norm == ref_norm:
            return True
        if not reference_answer:
            return False
        model_num = cls._NUMBER_RE.search(model_norm)
        ref_num = cls._NUMBER_RE.search(ref_norm)
        if model_num and ref_num:
            return float(model_num.group()) == float(ref_num.group())
        return False

    def _run_pdflatex(self, latex_path: str) -> None:
        """Compile *latex_path* synchronously with pdflatex into ``self.latex_dir``."""
        import subprocess
        pdf_path = os.path.splitext(latex_path)[0] + '.pdf'
        subprocess.run(
            ['pdflatex', '-interaction=nonstopmode', '-output-directory', self.latex_dir, latex_path],
            capture_output=True,
            timeout=30
        )
        if os.path.exists(pdf_path):
            print(f" [OK] Compiled to PDF: {os.path.basename(pdf_path)}")
        else:
            print(f" Warning: pdflatex produced no PDF for {os.path.basename(latex_path)}")

    def _compile_latex(self, latex_path: str, latex_filename: str) -> None:
        """Compile a LaTeX file, in the background when ``latex_compiler`` allows it.

        Uses ``latex_compiler.compile_latex_async`` when that module is
        importable and reports a Linux host; otherwise runs pdflatex
        synchronously.  Failures are reported, never raised.
        """
        try:
            try:
                from latex_compiler import compile_latex_async, is_linux
            except ImportError:
                # latex_compiler.py not available, use synchronous method
                use_async = False
            else:
                use_async = is_linux()
            if use_async:
                compile_latex_async(
                    latex_path,
                    self.latex_dir,
                    callback=lambda s, p, e: None  # Silent callback
                )
                print(f" [PDF] Compiling in background: {latex_filename}")
            else:
                self._run_pdflatex(latex_path)
        except Exception as e:
            print(f" Warning: Could not compile LaTeX: {e}")

    def _process_single_question(self, idx: int, pbar_batch) -> None:
        """Solve, grade and (on mismatch) reconcile the question at position *idx*.

        Advances *pbar_batch* by three steps in total: one after the model
        answer is requested and two once the question is finished.
        """
        row = self.df.iloc[idx]
        original_idx = self.df.index[idx]

        # Get question and check for image
        question = row['question']
        image_url = self._get_image_for_question(row)
        if image_url:
            print(f" Including image for question {original_idx}")

        # Get model answer
        print(f" Question {original_idx}: Getting answer from {self.model}...")
        model_answer = self.get_model_answer(question, image_url)
        if model_answer:
            print(f" [OK] Got answer ({len(model_answer)} chars)")
        else:
            print(f" [FAIL] Failed to get answer")
        pbar_batch.update(1)

        if not model_answer:
            self.df.at[original_idx, 'model_answer_file'] = 'ERROR'
            self.df.at[original_idx, 'answer_match'] = 'ERROR'
            pbar_batch.update(2)
            return

        # Save model answer to file
        question_id = f"q_{original_idx:04d}"
        answer_filename = f"{question_id}_answer.txt"
        answer_path = os.path.join(self.answers_dir, answer_filename)
        with open(answer_path, 'w', encoding='utf-8') as f:
            f.write(f"Question: {question}\n\n")
            f.write(f"Model Answer: {model_answer}\n")
        self.df.at[original_idx, 'model_answer_file'] = answer_filename

        # A missing (NaN) reference must count as "no reference", not the text 'nan'.
        raw_reference = row.get('correct_answer', row.get('answer', ''))
        reference_answer = '' if pd.isna(raw_reference) else str(raw_reference)

        match = self._answers_match(model_answer, reference_answer)
        self.df.at[original_idx, 'answer_match'] = 'Yes' if match else 'No'
        # Print match result for GUI tracking
        if match:
            print(f" [MATCH] Answer matches reference")
        else:
            print(f" [MISMATCH] Answer differs from reference")

        # Generate LaTeX reconciliation if mismatch
        if not match and reference_answer:
            print(f" Generating reconciliation for question {original_idx}")
            rationale = row.get('rationale', '')
            latex_doc = self.generate_reconciliation_latex(
                question, model_answer, reference_answer, rationale
            )
            # Only save LaTeX if generation was successful
            if latex_doc:
                latex_filename = f"{question_id}_reconciliation.tex"
                latex_path = os.path.join(self.latex_dir, latex_filename)
                with open(latex_path, 'w', encoding='utf-8') as f:
                    f.write(latex_doc)
                self.df.at[original_idx, 'latex_file'] = latex_filename
                if self.compile_latex:
                    self._compile_latex(latex_path, latex_filename)
            else:
                print(f" Failed to generate reconciliation after retries")
                self.df.at[original_idx, 'latex_file'] = 'GENERATION_ERROR'
        pbar_batch.update(2)

    def process_questions(self, start_idx: int = 0, batch_size: int = 5):
        """Process questions with progress tracking.

        Args:
            start_idx: Position (not index label) in ``self.df`` to start from.
            batch_size: Number of questions per batch; results are saved
                after every batch.

        Raises:
            ValueError: If *batch_size* is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        if not self.output_file:
            # save_results() needs a target even when run() was bypassed.
            self.output_file = self._default_output_file()

        total = len(self.df)
        with tqdm(total=total, desc="Overall Progress", position=0, leave=True) as pbar_main:
            pbar_main.update(start_idx)
            for i in range(start_idx, total, batch_size):
                batch_end = min(i + batch_size, total)
                print(f"\n{'='*60}")
                print(f"Processing batch: questions {i+1} to {batch_end} of {total}")
                print(f"Using {self.provider} with model {self.model}")
                print(f"{'='*60}")
                with tqdm(total=(batch_end - i) * 3, desc="Current Batch",
                          position=1, leave=False) as pbar_batch:
                    for idx in range(i, batch_end):
                        self._process_single_question(idx, pbar_batch)
                        pbar_main.update(1)
                        time.sleep(0.5)  # Rate limiting
                self.save_results()
                print(f"\nBatch complete. Progress saved to {self.output_file}")
                if batch_end < total:
                    time.sleep(5)

    def save_results(self):
        """Save results back to Excel.

        Copies every sheet of the source workbook to ``self.output_file``;
        on the ``Data`` sheet the result columns of the processed rows are
        updated from ``self.df`` (rows are matched by index label).
        """
        # Read everything first so a read failure cannot leave behind an
        # empty, unsavable output workbook; the source handle is closed on exit.
        sheets = {}
        with pd.ExcelFile(self.excel_file) as original:
            for sheet_name in original.sheet_names:
                sheets[sheet_name] = pd.read_excel(original, sheet_name=sheet_name)

        if 'Data' in sheets:
            original_df = sheets['Data']
            # Update only processed rows
            for col in self.RESULT_COLUMNS:
                if col in self.df.columns:
                    original_df.loc[self.df.index, col] = self.df[col]

        with pd.ExcelWriter(self.output_file, engine='openpyxl') as writer:
            for sheet_name, sheet_df in sheets.items():
                sheet_df.to_excel(writer, sheet_name=sheet_name, index=False)

    def _default_output_file(self) -> str:
        """Return a timestamped default output filename derived from the input file."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        base_name = os.path.basename(self.excel_file).replace('.xlsx', '')
        return f"{base_name}_validated_{timestamp}.xlsx"

    def run(self, start_idx: int = 0, batch_size: int = 5):
        """Main execution: load the data, process the questions, print a summary.

        Args:
            start_idx: Position in the loaded data to start processing from.
            batch_size: Number of questions per batch.
        """
        # Set default output file if not already set
        if not self.output_file:
            self.output_file = self._default_output_file()

        print(f"Starting Universal Math Validator")
        print(f" File: {self.excel_file}")
        print(f" Format: {self.file_format}")
        print(f" Provider: {self.provider}")
        print(f" Model: {self.model}")
        print(f" Image handling: {self.include_images}")
        print(f" Output: {self.output_file}")
        print("=" * 60)

        self.load_data()
        self.process_questions(start_idx=start_idx, batch_size=batch_size)

        # Calculate and display summary statistics
        if 'answer_match' not in self.df.columns:
            print("\nValidation Complete!")
            print(f"Results saved to: {self.output_file}")
            return

        total = len(self.df)

        def percent(count) -> float:
            # Guard against an empty question set.
            return count / total * 100 if total else 0.0

        correct = (self.df['answer_match'] == 'Yes').sum()
        incorrect = (self.df['answer_match'] == 'No').sum()
        errors = (self.df['answer_match'] == 'ERROR').sum()
        print("\n" + "="*60)
        print("VALIDATION COMPLETE")
        print("="*60)
        print(f"\nTotal questions processed: {total}")
        print(f"Correct answers: {correct} ({percent(correct):.1f}%)")
        print(f"Incorrect answers: {incorrect} ({percent(incorrect):.1f}%)")
        if errors > 0:
            print(f"Errors: {errors}")

        # Count LaTeX files generated (failed generations are not files)
        latex_column = self.df['latex_file']
        latex_count = ((latex_column != '') & (latex_column != 'GENERATION_ERROR')).sum()
        if latex_count > 0:
            print(f"\nLaTeX reconciliation documents generated: {latex_count}")
            print(f"Location: {self.latex_dir}")
        print(f"\nResults saved to: {self.output_file}")
        print(f"Model answers saved to: {self.answers_dir}")
if __name__ == "__main__":
    # Command-line entry point: parse options, build the validator, and run
    # either the full pipeline or an explicit sub-range of the questions.
    import argparse
    from datetime import datetime

    parser = argparse.ArgumentParser(description='Universal Math Question Validator')
    parser.add_argument('file', help='Excel file to process')
    parser.add_argument('--provider', choices=['openai', 'openrouter'], default='openai',
                        help='API provider to use')
    parser.add_argument('--model', help='Model for solving questions (default: o3-mini)')
    parser.add_argument('--reconciliation-model', help='Model for reconciliation (default: gpt-4o)')
    parser.add_argument('--images', choices=['always', 'never', 'when_needed'],
                        default='when_needed', help='When to include images')
    parser.add_argument('--start', type=int, default=0, help='Start from question index')
    parser.add_argument('--end', type=int, default=None,
                        help='End at question index (for parallel processing)')
    parser.add_argument('--batch-size', type=int, default=5, help='Number of questions per batch')
    parser.add_argument('--output', type=str, default=None,
                        help='Output filename (default: auto-generated)')
    parser.add_argument('--compile-latex', action='store_true', help='Compile LaTeX files to PDF')
    args = parser.parse_args()

    validator = UniversalMathValidator(
        excel_file=args.file,
        provider=args.provider,
        include_images=args.images,
        solver_model=args.model,
        reconciliation_model=args.reconciliation_model
    )

    # Set output filename if provided
    if args.output:
        validator.output_file = args.output
    else:
        # Generate default filename with timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        base_name = os.path.basename(args.file).replace('.xlsx', '')
        if args.start > 0 or args.end:
            range_str = f"_q{args.start+1}_q{args.end}" if args.end else f"_from_q{args.start+1}"
        else:
            range_str = ""
        validator.output_file = f"{base_name}_validated_{timestamp}{range_str}.xlsx"

    # Set LaTeX compilation flag
    validator.compile_latex = args.compile_latex

    # run() takes no range or batch options here, so any non-default --start,
    # --end or --batch-size must go through the explicit path; otherwise those
    # options would be silently ignored while the filename still claims a range.
    custom_range = args.start > 0 or bool(args.end)
    if custom_range or args.batch_size != 5:
        validator.load_data()
        # Filter to specific range (an unset --end means "through the last question")
        validator.df = validator.df.iloc[args.start:(args.end or None)]
        validator.process_questions(start_idx=0, batch_size=args.batch_size)
    else:
        validator.run()