import pandas as pd import os from dotenv import load_dotenv import time from typing import Dict, Any, Optional, List import re from datetime import datetime import json from tqdm import tqdm import base64 import requests from io import BytesIO load_dotenv() class UniversalMathValidator: """Universal validator that can handle different Excel formats and API providers""" def __init__(self, excel_file: str, provider: str = "openai", include_images: str = "when_needed", solver_model: str = None, reconciliation_model: str = None): """ Initialize validator Args: excel_file: Path to Excel file provider: "openai" or "openrouter" include_images: "always", "never", or "when_needed" solver_model: Model for solving questions reconciliation_model: Model for reconciliation """ self.excel_file = excel_file self.include_images = include_images # Determine provider based on models # If any model requires OpenRouter, use OpenRouter for everything openrouter_prefixes = ["anthropic/", "x-ai/", "google/", "meta-llama/", "mistral/", "openai/"] openai_models = ["o3-mini", "gpt-4o", "gpt-5", "gpt-5-mini", "gpt-5-nano", "gpt-4-turbo"] # Check if any model needs OpenRouter (has a prefix or is not an OpenAI model) solver_needs_or = solver_model and ( any(solver_model.startswith(p) for p in openrouter_prefixes) or solver_model not in openai_models ) recon_needs_or = reconciliation_model and ( any(reconciliation_model.startswith(p) for p in openrouter_prefixes) or reconciliation_model not in openai_models ) needs_openrouter = solver_needs_or or recon_needs_or # Override provider if OpenRouter is needed if needs_openrouter: self.provider = "openrouter" if provider == "openai": print("Note: Using OpenRouter for all models since non-OpenAI model specified") else: self.provider = provider # Store original model names for later prefixing if needed self.solver_model_input = solver_model self.reconciliation_model_input = reconciliation_model self.df = None self.output_file = None # Will be set later self.compile_latex = False # Will be set from args # Detect file format self.file_format = self._detect_format() # Create directories for outputs self.base_dir = "validation_results" self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") self.run_dir = os.path.join(self.base_dir, f"run_{self.timestamp}") self.latex_dir = os.path.join(self.run_dir, "latex_documents") self.answers_dir = os.path.join(self.run_dir, "model_answers") os.makedirs(self.latex_dir, exist_ok=True) os.makedirs(self.answers_dir, exist_ok=True) # Initialize API client if self.provider == "openai": from openai import OpenAI import httpx # Set 5 minute timeout for GPT-5 models which can be very slow self.client = OpenAI( api_key=os.getenv('OPENAI_API_KEY'), timeout=httpx.Timeout(300.0, connect=10.0) # 300 second timeout, 10 second connect ) # Default models for OpenAI self.model = self.solver_model_input or "o3-mini" self.reconciliation_model = self.reconciliation_model_input or "gpt-4o" self.assessment_model = "gpt-4o" elif self.provider == "openrouter": import httpx self.client = self._setup_openrouter() # Helper to add openai/ prefix if needed def format_for_openrouter(model_name): if not model_name: return None # If already has a prefix, use as-is if "/" in model_name: return model_name # If it's an OpenAI model, add prefix openai_models = ["o3-mini", "gpt-4o", "gpt-5", "gpt-5-mini", "gpt-5-nano", "gpt-4-turbo"] if model_name in openai_models: return f"openai/{model_name}" # Otherwise assume it needs no prefix (for backwards compatibility) return model_name # Format models for OpenRouter self.model = format_for_openrouter(self.solver_model_input) or "openai/o3-mini" self.reconciliation_model = format_for_openrouter(self.reconciliation_model_input) or "openai/gpt-4o" self.assessment_model = "openai/gpt-4o" # System prompts self.system_prompt_answer = """You are a highly skilled mathematics graduate student. Solve the following problem step by step. IMPORTANT: First show your complete reasoning and work. Then clearly state the final answer. Your response should include both the reasoning process and the final answer.""" self.system_prompt_assess = """You are an experienced mathematics educator. Evaluate mathematical questions.""" self.system_prompt_reconcile = """You are a graduate student who produces detailed justifications in LaTeX format. You excel at analyzing mathematical solutions and identifying potential errors. Your output should be a complete LaTeX document that can be compiled directly.""" # Create manifest self.manifest_file = os.path.join(self.run_dir, "manifest.json") self.manifest = { "timestamp": self.timestamp, "source_file": excel_file, "file_format": self.file_format, "provider": provider, "model": self.model, "questions": {} } def _detect_format(self) -> str: """Detect which format the Excel file uses""" xl = pd.ExcelFile(self.excel_file) # Check for specific sheets if 'rationale_images' in xl.sheet_names: return "HLE_B3" # HLE_Verified_B3 format elif 'model_responses' in xl.sheet_names: return "HLE_335" # HLE_335 format else: return "unknown" def _setup_openrouter(self): """Setup OpenRouter client""" from openai import OpenAI import httpx # OpenRouter uses OpenAI-compatible API client = OpenAI( base_url="https://openrouter.ai/api/v1", api_key=os.getenv('OPENROUTER_API_KEY'), timeout=httpx.Timeout(300.0, connect=10.0), # Same timeout as OpenAI default_headers={ "HTTP-Referer": "https://github.com/yourusername/validator", "X-Title": "Math Validator" } ) return client def load_data(self): """Load and normalize data based on file format""" if self.file_format == "HLE_B3": # Load HLE_Verified_B3 format self.df = pd.read_excel(self.excel_file, sheet_name='Data') # Normalize column names self.df['task_name'] = self.df.get('id', '') self.df['answer type'] = self.df.get('answer_type', 'exactMatch') # Create image mapping from file_url column (question images) self.image_mapping = {} if 'file_url' in self.df.columns: for idx, row in self.df.iterrows(): if pd.notna(row.get('file_url')) and pd.notna(row.get('id')): self.image_mapping[row['id']] = row['file_url'] print(f"Loaded {len(self.image_mapping)} question images from file_url column") # Also load rationale images if needed (these are for rationales, not questions) try: rationale_images = pd.read_excel(self.excel_file, sheet_name='rationale_images') # Don't overwrite question images with rationale images rationale_mapping = dict(zip(rationale_images['ID'], rationale_images['gcp'])) print(f"Found {len(rationale_mapping)} rationale images (not used for questions)") except: pass elif self.file_format == "HLE_335": # Load HLE_335 format self.df = pd.read_excel(self.excel_file, sheet_name='Data') self.image_mapping = {} else: # Generic format - assume Data sheet exists self.df = pd.read_excel(self.excel_file, sheet_name='Data') self.image_mapping = {} # Filter for math questions but KEEP ORIGINAL INDICES if 'raw_subject' in self.df.columns: math_filter = self.df['raw_subject'].str.lower().str.contains( 'math|statistic|calculus|algebra|geometry|trigonometry', na=False, regex=True ) # Keep original indices by not resetting them self.df = self.df[math_filter] # Don't use .copy() with reset indices # Add result columns self.df['model_answer_file'] = '' self.df['answer_match'] = '' self.df['latex_file'] = '' self.df['quality_rating'] = '' self.df['difficulty_level'] = '' self.df['quality_comment'] = '' print(f"Loaded {len(self.df)} math/statistics questions from {self.file_format} format") return self.df def _get_image_for_question(self, row) -> Optional[str]: """Get image URL or path for a question if needed""" if self.include_images == "never": return None # Check if question has an image reference question_id = row.get('id') or row.get('task_name') question_text = str(row.get('question', '')).lower() # Check if question mentions an image has_image_reference = any(keyword in question_text for keyword in [ "image", "figure", "diagram", "picture", "attached", "graph", "plot", "shown", "below", "above" ]) if self.include_images == "always" or ( self.include_images == "when_needed" and has_image_reference ): # First check file_url column directly (primary source for question images) if 'file_url' in row and pd.notna(row['file_url']): return row['file_url'] # Then try to get image from mapping if question_id in self.image_mapping: return self.image_mapping[question_id] # Finally check for generic image column if 'image' in row and pd.notna(row['image']): return row['image'] # Log warning if image was expected but not found if has_image_reference: original_idx = row.name if hasattr(row, 'name') else 'unknown' print(f" [WARNING] Question {original_idx} mentions image but none found (ID: {question_id[:20]}...)") return None def _encode_image(self, image_url: str) -> Optional[str]: """Download and encode image as base64""" try: response = requests.get(image_url, timeout=10) if response.status_code == 200: return base64.b64encode(response.content).decode('utf-8') except: pass return None def get_model_answer(self, question: str, image_url: Optional[str] = None, attempt: int = 1) -> Optional[str]: """Get answer from model with optional image support""" try: messages = [ {"role": "system", "content": self.system_prompt_answer} ] # Build user message with optional image if image_url and self.provider == "openai": # OpenAI vision format user_content = [ {"type": "text", "text": question} ] if image_url.startswith('http'): user_content.append({ "type": "image_url", "image_url": {"url": image_url} }) messages.append({"role": "user", "content": user_content}) else: # Text-only or OpenRouter (handle differently if needed) messages.append({"role": "user", "content": question}) # Make API call # Check the original model name (before prefixing) for special handling # Handle case where solver_model_input might not be set if hasattr(self, 'solver_model_input'): original_model = self.solver_model_input or self.model else: original_model = self.model if original_model in ["o3-mini", "gpt-5", "gpt-5-mini", "gpt-5-nano"]: # Use higher token limit for GPT-5 and o3 models to allow for reasoning if original_model == "o3-mini": max_tokens = 10000 elif original_model in ["gpt-5", "gpt-5-mini", "gpt-5-nano"]: max_tokens = 8000 # Increased for reasoning + answer else: max_tokens = 3000 response = self.client.chat.completions.create( model=self.model, messages=messages, max_completion_tokens=max_tokens ) else: response = self.client.chat.completions.create( model=self.model, messages=messages, temperature=0.1, max_tokens=2000 ) return response.choices[0].message.content.strip() except Exception as e: error_msg = str(e) if "timeout" in error_msg.lower(): print(f" [TIMEOUT] Timeout getting model answer (attempt {attempt}/3)") else: print(f" [ERROR] Error getting model answer (attempt {attempt}): {e}") if attempt < 3: time.sleep(2 ** attempt) return self.get_model_answer(question, image_url, attempt + 1) print(f" [ERROR] Failed after 3 attempts") return None def generate_reconciliation_latex(self, question: str, model_answer: str, reference_answer: str, rationale: str = None, attempt: int = 1) -> str: """Generate LaTeX reconciliation document for mismatched answers""" prompt = f"""Compare and reconcile these two answers to the following problem. PROBLEM: {question} MODEL'S ANSWER: {model_answer} REFERENCE ANSWER: {reference_answer} REFERENCE RATIONALE: {rationale if pd.notna(rationale) else "Not provided"} Please create a complete LaTeX document that: 1. States the problem 2. Shows the model's approach and solution 3. Shows the reference approach and solution 4. Analyzes where any differences or errors might occur 5. Provides your assessment of which answer is correct and why The document should be properly formatted with sections and mathematical notation. Begin with \\documentclass and end with \\end{{document}}.""" try: # Handle GPT-5 models parameter differences messages = [ {"role": "system", "content": self.system_prompt_reconcile}, {"role": "user", "content": prompt} ] # Use the configured reconciliation model reconciliation_model = self.reconciliation_model # Check the original model name (before prefixing) for special handling # Handle case where reconciliation_model_input might not be set if hasattr(self, 'reconciliation_model_input'): original_recon = self.reconciliation_model_input or reconciliation_model else: original_recon = reconciliation_model # Check if reconciliation model needs special handling if original_recon in ["gpt-5", "gpt-5-mini", "gpt-5-nano"]: # GPT-5 models don't support temperature response = self.client.chat.completions.create( model=reconciliation_model, messages=messages, max_completion_tokens=8000 # Allow longer for reconciliation ) elif original_recon in ["o3-mini"]: response = self.client.chat.completions.create( model=reconciliation_model, messages=messages, max_completion_tokens=10000 ) else: # Standard models (gpt-4o, claude, etc.) response = self.client.chat.completions.create( model=reconciliation_model, messages=messages, temperature=0.3, max_tokens=3000 ) return response.choices[0].message.content.strip() except Exception as e: error_msg = str(e) if "timeout" in error_msg.lower(): print(f" [TIMEOUT] Timeout generating reconciliation (attempt {attempt}/3)") else: print(f" [ERROR] Error generating reconciliation (attempt {attempt}): {e}") if attempt < 3: time.sleep(2 ** attempt) # Exponential backoff return self.generate_reconciliation_latex(question, model_answer, reference_answer, rationale, attempt + 1) print(f" [ERROR] Failed to generate reconciliation after 3 attempts") return None def process_questions(self, start_idx: int = 0, batch_size: int = 5): """Process questions with progress tracking""" total = len(self.df) with tqdm(total=total, desc="Overall Progress", position=0, leave=True) as pbar_main: pbar_main.update(start_idx) for i in range(start_idx, total, batch_size): batch_end = min(i + batch_size, total) print(f"\n{'='*60}") print(f"Processing batch: questions {i+1} to {batch_end} of {total}") print(f"Using {self.provider} with model {self.model}") print(f"{'='*60}") batch_size_actual = batch_end - i with tqdm(total=batch_size_actual * 3, desc="Current Batch", position=1, leave=False) as pbar_batch: for idx in range(i, batch_end): row = self.df.iloc[idx] original_idx = self.df.index[idx] # Get question and check for image question = row['question'] image_url = self._get_image_for_question(row) if image_url: print(f" Including image for question {original_idx}") # Get model answer print(f" Question {original_idx}: Getting answer from {self.model}...") model_answer = self.get_model_answer(question, image_url) if model_answer: print(f" [OK] Got answer ({len(model_answer)} chars)") else: print(f" [FAIL] Failed to get answer") pbar_batch.update(1) if model_answer: # Save model answer to file question_id = f"q_{original_idx:04d}" answer_filename = f"{question_id}_answer.txt" answer_path = os.path.join(self.answers_dir, answer_filename) with open(answer_path, 'w', encoding='utf-8') as f: f.write(f"Question: {question}\n\n") f.write(f"Model Answer: {model_answer}\n") self.df.at[original_idx, 'model_answer_file'] = answer_filename # Check if answer matches reference reference_answer = str(row.get('correct_answer', row.get('answer', ''))) # Simple string matching (could be enhanced) model_norm = str(model_answer).strip().lower() ref_norm = str(reference_answer).strip().lower() # Check for exact match or numerical equivalence match = (model_norm == ref_norm) if not match and reference_answer: # Try extracting numbers for comparison import re model_nums = re.findall(r'-?\d+\.?\d*', model_norm) ref_nums = re.findall(r'-?\d+\.?\d*', ref_norm) if model_nums and ref_nums: match = (model_nums[0] == ref_nums[0]) self.df.at[original_idx, 'answer_match'] = 'Yes' if match else 'No' # Print match result for GUI tracking if match: print(f" [MATCH] Answer matches reference") else: print(f" [MISMATCH] Answer differs from reference") # Generate LaTeX reconciliation if mismatch if not match and reference_answer: print(f" Generating reconciliation for question {original_idx}") rationale = row.get('rationale', '') latex_doc = self.generate_reconciliation_latex( question, model_answer, reference_answer, rationale ) # Only save LaTeX if generation was successful if latex_doc: latex_filename = f"{question_id}_reconciliation.tex" latex_path = os.path.join(self.latex_dir, latex_filename) with open(latex_path, 'w', encoding='utf-8') as f: f.write(latex_doc) self.df.at[original_idx, 'latex_file'] = latex_filename # Compile LaTeX if requested if self.compile_latex: # Try async compilation first (better on Linux/HF Spaces) try: from latex_compiler import compile_latex_async, is_linux if is_linux(): # Async compilation on Linux - doesn't block compile_latex_async( latex_path, self.latex_dir, callback=lambda s, p, e: None # Silent callback ) print(f" [PDF] Compiling in background: {latex_filename}") else: # Fallback to synchronous on Windows import subprocess pdf_path = latex_path.replace('.tex', '.pdf') result = subprocess.run( ['pdflatex', '-interaction=nonstopmode', '-output-directory', self.latex_dir, latex_path], capture_output=True, timeout=30 ) if os.path.exists(pdf_path): print(f" [OK] Compiled to PDF: {os.path.basename(pdf_path)}") except ImportError: # latex_compiler.py not available, use old method try: import subprocess pdf_path = latex_path.replace('.tex', '.pdf') result = subprocess.run( ['pdflatex', '-interaction=nonstopmode', '-output-directory', self.latex_dir, latex_path], capture_output=True, timeout=30 ) if os.path.exists(pdf_path): print(f" [OK] Compiled to PDF: {os.path.basename(pdf_path)}") except Exception as e: print(f" Warning: Could not compile LaTeX: {e}") except Exception as e: print(f" Warning: Could not compile LaTeX: {e}") else: print(f" Failed to generate reconciliation after retries") self.df.at[original_idx, 'latex_file'] = 'GENERATION_ERROR' pbar_batch.update(2) else: self.df.at[original_idx, 'model_answer_file'] = 'ERROR' self.df.at[original_idx, 'answer_match'] = 'ERROR' pbar_batch.update(2) pbar_main.update(1) time.sleep(0.5) # Rate limiting self.save_results() print(f"\nBatch complete. Progress saved to {self.output_file}") if batch_end < total: time.sleep(5) def save_results(self): """Save results back to Excel""" with pd.ExcelWriter(self.output_file, engine='openpyxl') as writer: original = pd.ExcelFile(self.excel_file) for sheet_name in original.sheet_names: if sheet_name == 'Data': original_df = pd.read_excel(self.excel_file, sheet_name='Data') # Update only processed rows for idx in self.df.index: for col in ['model_answer_file', 'answer_match', 'latex_file', 'quality_rating', 'difficulty_level', 'quality_comment']: if col in self.df.columns: original_df.at[idx, col] = self.df.at[idx, col] original_df.to_excel(writer, sheet_name=sheet_name, index=False) else: df_other = pd.read_excel(self.excel_file, sheet_name=sheet_name) df_other.to_excel(writer, sheet_name=sheet_name, index=False) def run(self): """Main execution""" # Set default output file if not already set if not self.output_file: from datetime import datetime timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") base_name = os.path.basename(self.excel_file).replace('.xlsx', '') self.output_file = f"{base_name}_validated_{timestamp}.xlsx" print(f"Starting Universal Math Validator") print(f" File: {self.excel_file}") print(f" Format: {self.file_format}") print(f" Provider: {self.provider}") print(f" Model: {self.model}") print(f" Image handling: {self.include_images}") print(f" Output: {self.output_file}") print("=" * 60) self.load_data() self.process_questions() # Calculate and display summary statistics if 'answer_match' in self.df.columns: total = len(self.df) correct = (self.df['answer_match'] == 'Yes').sum() incorrect = (self.df['answer_match'] == 'No').sum() errors = (self.df['answer_match'] == 'ERROR').sum() print("\n" + "="*60) print("VALIDATION COMPLETE") print("="*60) print(f"\nTotal questions processed: {total}") print(f"Correct answers: {correct} ({correct/total*100:.1f}%)") print(f"Incorrect answers: {incorrect} ({incorrect/total*100:.1f}%)") if errors > 0: print(f"Errors: {errors}") # Count LaTeX files generated latex_count = (self.df['latex_file'] != '').sum() if latex_count > 0: print(f"\nLaTeX reconciliation documents generated: {latex_count}") print(f"Location: {self.latex_dir}") print(f"\nResults saved to: {self.output_file}") print(f"Model answers saved to: {self.answers_dir}") else: print("\nValidation Complete!") print(f"Results saved to: {self.output_file}") if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='Universal Math Question Validator') parser.add_argument('file', help='Excel file to process') parser.add_argument('--provider', choices=['openai', 'openrouter'], default='openai', help='API provider to use') parser.add_argument('--model', help='Model for solving questions (default: o3-mini)') parser.add_argument('--reconciliation-model', help='Model for reconciliation (default: gpt-4o)') parser.add_argument('--images', choices=['always', 'never', 'when_needed'], default='when_needed', help='When to include images') parser.add_argument('--start', type=int, default=0, help='Start from question index') parser.add_argument('--end', type=int, default=None, help='End at question index (for parallel processing)') parser.add_argument('--batch-size', type=int, default=5, help='Number of questions per batch') parser.add_argument('--output', type=str, default=None, help='Output filename (default: auto-generated)') parser.add_argument('--compile-latex', action='store_true', help='Compile LaTeX files to PDF') args = parser.parse_args() validator = UniversalMathValidator( excel_file=args.file, provider=args.provider, include_images=args.images, solver_model=args.model, reconciliation_model=args.reconciliation_model ) # Set output filename if provided if args.output: validator.output_file = args.output else: # Generate default filename with timestamp from datetime import datetime timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") base_name = os.path.basename(args.file).replace('.xlsx', '') if args.start > 0 or args.end: range_str = f"_q{args.start+1}_q{args.end}" if args.end else f"_from_q{args.start+1}" else: range_str = "" validator.output_file = f"{base_name}_validated_{timestamp}{range_str}.xlsx" # Set LaTeX compilation flag validator.compile_latex = args.compile_latex # Handle parallel processing by limiting range if args.end: validator.load_data() # Filter to specific range for parallel processing validator.df = validator.df.iloc[args.start:args.end] validator.process_questions(start_idx=0, batch_size=args.batch_size) else: validator.run()