|
|
import json |
|
|
import yaml |
|
|
import sympy |
|
|
from sympy.parsing.latex import parse_latex |
|
|
from huggingface_hub import hf_hub_download |
|
|
from pathlib import Path |
|
|
import jsonlines |
|
|
from typing import Dict, List, Any |
|
|
|
|
|
from config import DATASETS, DATA_PROCESSING |
|
|
|
|
|
class MathDataProcessor:
    """Download, normalize, validate and persist mathematical reasoning datasets.

    Entries are read from Hugging Face JSONL splits, normalized field by
    field (equations via sympy, proof steps via YAML), validated against
    the thresholds in ``DATA_PROCESSING``, and accumulated in
    ``self.processed_data`` for a final JSONL dump.
    """

    def __init__(self):
        # Entries that passed validation, in processing order.
        self.processed_data: List[Dict[str, Any]] = []
        # dataset name -> local directory its split was downloaded into
        self.dataset_paths: Dict[str, Path] = {}
        # Dispatch table: operation keyword -> bound handler method.
        self.math_operations = {
            "differentiation": self._process_differentiation,
            "integration": self._process_integration,
            "limits": self._process_limits,
            "simplification": self._process_simplification,
            "matrix": self._process_matrix,
            "probability": self._process_probability,
            "statistics": self._process_statistics,
        }

    def download_dataset(self, dataset_name: str) -> Path:
        """Download one configured dataset split from Hugging Face.

        Args:
            dataset_name: Key into the ``DATASETS`` configuration mapping.

        Returns:
            Local directory the ``<split>.jsonl`` file was placed in.

        Raises:
            ValueError: If ``dataset_name`` is not present in ``DATASETS``.
        """
        if dataset_name not in DATASETS:
            raise ValueError(f"Dataset {dataset_name} not defined in configuration")

        dataset_config = DATASETS[dataset_name]
        dataset_path = Path(f"data/{dataset_name}")

        # hf_hub_download creates local_dir as needed and places the file there.
        hf_hub_download(
            repo_id=dataset_config["dataset_name"],
            filename=f"{dataset_config['split']}.jsonl",
            local_dir=dataset_path
        )

        self.dataset_paths[dataset_name] = dataset_path
        return dataset_path

    def normalize_equation(self, equation: str) -> str:
        """Normalize an equation string to sympy's canonical string form.

        LaTeX input (detected by a backslash) goes through ``parse_latex``;
        anything else through ``sympy.sympify``. Normalization is
        best-effort: on any parse failure the raw input is returned.
        """
        try:
            if "\\" in equation:
                eq = parse_latex(equation)
            else:
                eq = sympy.sympify(equation)
            return str(eq)
        except Exception:
            # Best-effort fallback; a bare `except:` here would also swallow
            # KeyboardInterrupt/SystemExit, so catch Exception only.
            return equation

    def process_proof_steps(self, steps: List[str]) -> List[Dict[str, str]]:
        """Coerce each proof step into a dict.

        A step that parses as a YAML mapping is kept structured; anything
        else (plain text, scalars, parse errors) is wrapped as
        ``{"step": <original text>}``.
        """
        processed_steps = []

        for step in steps:
            try:
                structured_step = yaml.safe_load(step)
                if isinstance(structured_step, dict):
                    processed_steps.append(structured_step)
                else:
                    processed_steps.append({"step": step})
            except Exception:
                processed_steps.append({"step": step})

        return processed_steps

    def _process_differentiation(self, expression: str) -> str:
        """Differentiate ``expression`` w.r.t. ``x``; return input on failure."""
        x = sympy.Symbol('x')
        try:
            expr = sympy.sympify(expression)
            derivative = sympy.diff(expr, x)
            return str(derivative)
        except Exception:
            return expression

    def _process_integration(self, expression: str) -> str:
        """Integrate ``expression`` w.r.t. ``x``; return input on failure."""
        x = sympy.Symbol('x')
        try:
            expr = sympy.sympify(expression)
            integral = sympy.integrate(expr, x)
            return str(integral)
        except Exception:
            return expression

    def _process_limits(self, expression: str) -> str:
        """Take the limit of ``expression`` as x -> oo; return input on failure."""
        x = sympy.Symbol('x')
        try:
            expr = sympy.sympify(expression)
            limit = sympy.limit(expr, x, sympy.oo)
            return str(limit)
        except Exception:
            return expression

    def _process_simplification(self, expression: str) -> str:
        """Simplify ``expression`` with sympy; return input on failure."""
        try:
            expr = sympy.sympify(expression)
            simplified = sympy.simplify(expr)
            return str(simplified)
        except Exception:
            return expression

    def _process_matrix(self, matrix_str: str) -> str:
        """Parse a ``"1 2; 3 4"``-style matrix string; return input on failure.

        Rows are separated by ';', entries within a row by whitespace.
        """
        try:
            matrix = sympy.Matrix([[float(n) for n in row.split()]
                                   for row in matrix_str.split(';')])
            return str(matrix)
        except Exception:
            return matrix_str

    def _process_probability(self, problem: str) -> Dict:
        """Classify a probability problem and extract its parameters.

        Returns ``{"type": "unknown"}`` when the text doesn't mention
        probability or when extraction fails.
        """
        try:
            if "probability" in problem.lower():
                return {
                    "type": "probability",
                    "parameters": self._extract_parameters(problem),
                    "distribution": self._identify_distribution(problem)
                }
            return {"type": "unknown"}
        except Exception:
            return {"type": "unknown"}

    def _process_statistics(self, data: str) -> Dict:
        """Compute mean/median/std-dev for a comma-separated number list.

        Returns an ``{"error": ...}`` dict when the input is not a
        comma-separated list or any value fails to parse as a float.
        """
        try:
            if "," in data:
                numbers = [float(n) for n in data.split(',')]
                ordered = sorted(numbers)
                mid = len(ordered) // 2
                # True median: for even-length input, average the two middle
                # values (the previous upper-middle pick was biased high).
                if len(ordered) % 2:
                    median = ordered[mid]
                else:
                    median = (ordered[mid - 1] + ordered[mid]) / 2
                return {
                    "mean": sum(numbers) / len(numbers),
                    "median": median,
                    "std_dev": self._calculate_std_dev(numbers)
                }
            return {"error": "Invalid data format"}
        except Exception:
            return {"error": "Processing failed"}

    def _extract_parameters(self, text: str) -> Dict:
        """Extract a crude equation/value pair from text containing '='.

        Only the first '=' is meaningful: everything after it (including
        further '=' signs) is treated as the value.
        """
        parameters = {}

        if "=" in text:
            parts = text.split("=")
            parameters["equation"] = parts[0].strip()
            parameters["value"] = parts[1].strip()
        return parameters

    def _identify_distribution(self, text: str) -> str:
        """Identify a probability distribution by keyword, else "unknown"."""
        distributions = {
            "binomial": ["binomial", "bernoulli"],
            "normal": ["normal", "gaussian"],
            "poisson": ["poisson"],
            "exponential": ["exponential"]
        }

        text_lower = text.lower()
        for dist, keywords in distributions.items():
            if any(keyword in text_lower for keyword in keywords):
                return dist
        return "unknown"

    def _calculate_std_dev(self, numbers: List[float]) -> float:
        """Return the population standard deviation (divides by N, not N-1)."""
        mean = sum(numbers) / len(numbers)
        variance = sum((x - mean) ** 2 for x in numbers) / len(numbers)
        return variance ** 0.5

    def process_math_operation(self, operation_type: str, content: str) -> Any:
        """Dispatch ``content`` to the handler for ``operation_type``.

        Unknown operation types pass ``content`` through unchanged.
        """
        if operation_type in self.math_operations:
            return self.math_operations[operation_type](content)
        return content

    def validate_entry(self, entry: Dict[str, Any]) -> bool:
        """Validate one processed entry against length and math checks.

        Checks, in order: minimum step count and combined text length
        (thresholds from ``DATA_PROCESSING["validation"]``), equation
        parseability, step-to-step continuity, and proof shape. Any
        exception during the math checks rejects the entry.
        """
        steps = entry.get("steps", [])
        text = entry.get("question", "") + entry.get("answer", "")

        if len(steps) < DATA_PROCESSING["validation"]["min_steps"]:
            return False

        if len(text) < DATA_PROCESSING["validation"]["min_length"]:
            return False

        try:
            # An unparseable equation raises and rejects the entry below.
            if "equation" in entry:
                sympy.sympify(entry["equation"])

            if len(steps) > 1:
                for i in range(len(steps) - 1):
                    if not self._check_step_continuity(steps[i], steps[i+1]):
                        return False

            if "proof" in entry:
                if not self._check_proof_validity(entry["proof"]):
                    return False

            return True

        except Exception:
            return False

    def _check_step_continuity(self, step1: str, step2: str) -> bool:
        """Check that step1's final RHS equals step2's first LHS.

        Steps without '=' on both sides are vacuously continuous. Returns
        False if the comparison itself fails.
        """
        try:
            if "=" in step1 and "=" in step2:
                # maxsplit keeps chained equalities sane: for "a=b=c" we
                # compare the final RHS "c", not the middle piece "b".
                s1 = step1.rsplit("=", 1)[1].strip()
                s2 = step2.split("=", 1)[0].strip()
                return s1 == s2
            return True
        except Exception:
            return False

    def _check_proof_validity(self, proof: str) -> bool:
        """Heuristic sanity checks on a proof's shape (keyword-based only)."""
        # An assumption with no conclusion is rejected.
        if "assume" in proof.lower() and "therefore" not in proof.lower():
            return False

        # A contradiction argument must actually derive falsity.
        if "contradiction" in proof.lower() and "false" not in proof.lower():
            return False

        return True

    def process_dataset(self, dataset_name: str):
        """Download, normalize and validate every entry of one dataset.

        Valid entries are appended to ``self.processed_data``.
        """
        dataset_path = self.download_dataset(dataset_name)
        dataset_config = DATASETS[dataset_name]

        with jsonlines.open(dataset_path / f"{dataset_config['split']}.jsonl") as reader:
            for entry in reader:
                processed_entry = {}

                for field in dataset_config["use_fields"]:
                    value = entry.get(field)
                    # NOTE(review): truthiness drops falsy-but-present values
                    # (0, "", []) — confirm that is intended for these fields.
                    if value:
                        if field == "equation":
                            processed_entry[field] = self.normalize_equation(value)
                        elif field == "proof_steps":
                            processed_entry[field] = self.process_proof_steps(value)
                        else:
                            processed_entry[field] = value

                if self.validate_entry(processed_entry):
                    self.processed_data.append(processed_entry)

    def save_processed_data(self, output_path: str):
        """Write all validated entries to ``output_path`` as JSONL.

        Creates the parent directory if missing (jsonlines.open does not,
        and previously this crashed on a fresh checkout).
        """
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        with jsonlines.open(output_path, mode='w') as writer:
            writer.write_all(self.processed_data)
|
|
|
|
|
if __name__ == "__main__":
    processor = MathDataProcessor()

    # Process every configured dataset; valid entries accumulate on the
    # processor instance. Iterating the dict directly yields its keys.
    for dataset in DATASETS:
        processor.process_dataset(dataset)

    # Create the output directory up front: jsonlines.open does not create
    # intermediate directories, so a fresh checkout would otherwise crash.
    output_path = "processed_data/math_expert_data.jsonl"
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    processor.save_processed_data(output_path)
|
|
|