ChatGCLM-Open / mat-gen.py
umm-dev's picture
Create mat-gen.py (#4)
d3dd256 verified
import random
import json
import math
import time
import re
import sys
import multiprocessing
import os
from tqdm import tqdm
NUM_LINES = 2000000
OUTPUT_FILE = "correct_math_data.jsonl"
MIN_LENGTH = 2
MAX_LENGTH = 8
MIN_NUMBER = 1
MAX_NUMBER = 999
MAX_EXPONENT_BASE = 9
MAX_EXPONENT_POWER = 5
REASONING_CHANCE = 0.8
WORD_FORM_CHANCE = 0.25
BRACKET_CHANCE = 0.5
SENTENCE_FORM_CHANCE = 0.6
MAX_SOLVER_ITERATIONS = 30 # Reduced from 50 for faster timeout
NUM_WORKERS = os.cpu_count() or 1
PROMPT_TEMPLATES = [
"What is {expression}?", "Calculate the value of {expression}.", "Find the result of {expression}.",
"Can you solve {expression}?", "Solve for {expression}.", "What does {expression} equal?", "Compute {expression}.",
"What is the solution to {expression}?", "Give me the answer for {expression}.", "Determine the value of {expression}.",
"Evaluate the expression: {expression}.", "I need the result of {expression}, please."
]
COT_INTRO_TEMPLATES = [
"<think> Let's break down the equation {expression} step by step, following the order of operations (BEDMAS).",
"<think> Okay, to solve {expression}, I'll follow BEDMAS (Brackets, Exponents, Division/Multiplication, Addition/Subtraction).",
"<think> Analyzing {expression}. I need to solve this by applying the correct order of operations.",
"<think> Here's my step-by-step evaluation for {expression}:",
"<think> To get the answer for {expression}, I will use the order of operations.",
"<think> Processing {expression} requires following BEDMAS, let's begin.",
"<think> I will solve {expression} by carefully following the rules of BEDMAS.",
"<think> The expression is {expression}. My plan is to solve it using the order of operations.",
"<think> To solve this, I'll go through Brackets, then Exponents, then Multiplication/Division, and finally Addition/Subtraction for {expression}.",
"<think> Let's start solving {expression}. I'll tackle it one operation at a time based on BEDMAS.",
"<think> Thinking step-by-step for {expression}..."
]
COT_STEP_TEMPLATES = {
"brackets": [
"First, I'll solve the expression inside the brackets: {part}. That equals {result}.",
"Starting with the parentheses, {part} evaluates to {result}.",
"The brackets are the priority. Calculating {part} gives me {result}.",
"The calculation inside the parentheses comes first: {part} becomes {result}.",
"Looking inside the brackets, I see {part}. The result of that is {result}.",
"I'll begin by simplifying the part in the parentheses: {part} is {result}.",
"The first step according to BEDMAS is brackets. So, {part} is solved to {result}.",
"Tackling the parentheses first: {part} simplifies to {result}.",
"Evaluating the bracketed expression {part} yields {result}.",
"My focus is on the brackets first. {part} equals {result}."
],
"exponents": [
"Next, I'll handle the exponents. {part} is {result}.",
"Exponents are next in order. {part} calculates to {result}.",
"Now for the powers: {part} equals {result}.",
"Moving on to exponents, {part} results in {result}.",
"The next priority is exponents. The term {part} becomes {result}.",
"After brackets, I solve for exponents. {part} gives {result}.",
"Now, calculating the power: {part} is equal to {result}.",
"I see an exponent at {part}. This evaluates to {result}.",
"The 'E' in BEDMAS is for exponents, so I'll solve {part} to get {result}.",
"Time to resolve the exponents. {part} is {result}."
],
"multi_div_mod": [
"Now, I'll perform multiplication, division, and modulo from left to right. The first is {part}, which is {result}.",
"Next up is multiplication and division. I see {part}, which gives {result}.",
"Working through multiplication/division from left to right, {part} results in {result}.",
"The next step is to resolve multiplication and division. {part} is {result}.",
"Scanning from left to right for M/D/M, I find {part}. This calculates to {result}.",
"Now for multiplication and division. The operation {part} equals {result}.",
"Moving on, I'll handle the multiplication/division. {part} becomes {result}.",
"The next operations are multiply and divide. I'll solve {part} to get {result}.",
"I will now compute {part}, which results in {result}.",
"Left-to-right, the next multiplication or division is {part}, giving {result}."
],
"add_sub": [
"Finally, I'll do the addition and subtraction from left to right. I have {part}, which equals {result}.",
"Last step is addition and subtraction. {part} becomes {result}.",
"Finishing up with addition/subtraction, {part} evaluates to {result}.",
"The final operations are addition and subtraction. {part} results in {result}.",
"Now for the final calculations, addition and subtraction. {part} is {result}.",
"Working from left to right, the final step is {part}, which is {result}.",
"The last part of BEDMAS is addition and subtraction. {part} gives {result}.",
"To finish, I'll solve {part}, resulting in {result}.",
"Finally, the addition/subtraction part: {part} equals {result}.",
"The last calculation is {part}, and the answer is {result}."
]
}
COT_FINALIZER_TEMPLATES = [
"After all steps, the final answer is {result}.",
"So, the complete result for the expression is {result}.",
"Therefore, the final value is {result}.",
"Bringing it all together, the answer is {result}.",
"The final computation yields {result}.",
"Thus, the expression evaluates to {result}.",
"So the final answer is {result}.",
"After all those steps, we arrive at the answer: {result}.",
"The result of the entire calculation is {result}.",
"In conclusion, the answer is {result}."
]
SIMPLE_COMPLETION_TEMPLATES = [
"The equation {expression} equals {result}.", "The answer is {result}.",
"The result is {result}.", "It equals {result}.", "The final value is {result}.",
"{expression} results in {result}.", "The solution is {result}.",
"The value is {result}.", "After calculation, the answer is {result}.",
"The final result is {result}."
]
ONES = ['', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine']
TENS = ['', '', 'twenty', 'thirty', 'forty', 'fifty', 'sixty', 'seventy', 'eighty', 'ninety']
TEENS = ['ten', 'eleven', 'twelve', 'thirteen', 'fourteen', 'fifteen', 'sixteen', 'seventeen', 'eighteen', 'nineteen']
def number_to_words(n):
if not isinstance(n, int): return str(n)
if n == 0: return 'zero'
if n < 0: return f"negative {number_to_words(abs(n))}"
if n < 10: return ONES[n]
if n < 20: return TEENS[n-10]
if n < 100: return TENS[n//10] + (f"-{ONES[n%10]}" if n%10 else "")
if n < 1000: return f"{ONES[n//100]} hundred" + (f" and {number_to_words(n%100)}" if n%100 else "")
if n < 1000000: return f"{number_to_words(n//1000)} thousand" + (f", {number_to_words(n%1000)}" if n%1000 else "")
return str(n)
def operator_to_word(op):
return {'+': 'plus', '-': 'minus', '*': 'times', '/': 'divided by', '^': 'to the power of', '%': 'modulo'}.get(op, op)
def format_number(n):
if isinstance(n, float) and not n.is_integer():
return f"{n:.4f}".rstrip('0').rstrip('.')
return str(int(round(n)))
def generate_expression_parts():
length = random.randint(MIN_LENGTH, MAX_LENGTH)
parts = []
for i in range(length):
if parts and parts[-1] == '^':
parts.append(random.randint(2, MAX_EXPONENT_POWER))
else:
parts.append(random.randint(MIN_NUMBER, MAX_NUMBER))
if i < length - 1:
if parts and parts[-1] != '^':
op = random.choice(['+', '-', '*', '/', '%', '^'])
else:
op = random.choice(['+', '-', '*', '/', '%'])
if op == '^':
parts[-1] = random.randint(MIN_NUMBER, MAX_EXPONENT_BASE)
parts.append(op)
if random.random() < BRACKET_CHANCE and len(parts) >= 5:
start = random.randrange(0, len(parts) - 2, 2)
end = random.randrange(start + 2, len(parts), 2)
parts.insert(end + 1, ')')
parts.insert(start, '(')
return parts
def solve_with_cot(expression_str):
"""Optimized solver with better pattern matching and guaranteed termination."""
steps = []
current_expr = expression_str.strip()
for iteration in range(MAX_SOLVER_ITERATIONS):
# Remove extra spaces
current_expr = re.sub(r'\s+', ' ', current_expr).strip()
# Check if we're done (single number)
try:
final_result = float(current_expr)
return {'steps': steps, 'result': final_result}
except ValueError:
pass
reduction_made = False
# 1. Handle brackets first
bracket_match = re.search(r'\(([^()]+)\)', current_expr)
if bracket_match:
bracket_content = bracket_match.group(1).strip()
sub_solver_result = solve_with_cot(bracket_content)
if not sub_solver_result:
return None
result = sub_solver_result['result']
try:
formatted_result = format_number(result)
except (ValueError, OverflowError):
return None
steps.append(random.choice(COT_STEP_TEMPLATES["brackets"]).format(part=bracket_content, result=formatted_result))
current_expr = current_expr[:bracket_match.start()] + ' ' + formatted_result + ' ' + current_expr[bracket_match.end():]
reduction_made = True
continue
# 2. Handle exponents
exp_match = re.search(r'(-?\d+(?:\.\d+)?)\s*\^\s*(-?\d+(?:\.\d+)?)', current_expr)
if exp_match:
base_str, exp_str = exp_match.groups()
try:
base = float(base_str)
exponent = float(exp_str)
result = base ** exponent
if abs(result) > 1e12 or math.isnan(result) or math.isinf(result):
return None
formatted_result = format_number(result)
except (OverflowError, ValueError, ZeroDivisionError):
return None
part = f"{base_str} ^ {exp_str}"
steps.append(random.choice(COT_STEP_TEMPLATES["exponents"]).format(part=part, result=formatted_result))
current_expr = current_expr[:exp_match.start()] + ' ' + formatted_result + ' ' + current_expr[exp_match.end():]
reduction_made = True
continue
# 3. Handle multiplication, division, modulo (left to right)
mdm_match = re.search(r'(-?\d+(?:\.\d+)?)\s*([*/%])\s*(-?\d+(?:\.\d+)?)', current_expr)
if mdm_match:
left_str, op, right_str = mdm_match.groups()
try:
left = float(left_str)
right = float(right_str)
if op == '*':
result = left * right
elif op == '/':
if right == 0:
return None
result = left / right
elif op == '%':
if right == 0:
return None
result = left % right
if abs(result) > 1e12 or math.isnan(result) or math.isinf(result):
return None
formatted_result = format_number(result)
except (OverflowError, ValueError, ZeroDivisionError):
return None
part = f"{left_str} {op} {right_str}"
steps.append(random.choice(COT_STEP_TEMPLATES["multi_div_mod"]).format(part=part, result=formatted_result))
current_expr = current_expr[:mdm_match.start()] + ' ' + formatted_result + ' ' + current_expr[mdm_match.end():]
reduction_made = True
continue
# 4. Handle addition and subtraction (left to right)
# Match pattern where we have number [+|-] number but not at start of negative number
as_match = re.search(r'(-?\d+(?:\.\d+)?)\s*([+\-])\s*(-?\d+(?:\.\d+)?)', current_expr)
if as_match:
left_str, op, right_str = as_match.groups()
try:
left = float(left_str)
right = float(right_str)
if op == '+':
result = left + right
elif op == '-':
result = left - right
if abs(result) > 1e12 or math.isnan(result) or math.isinf(result):
return None
formatted_result = format_number(result)
except (OverflowError, ValueError):
return None
part = f"{left_str} {op} {right_str}"
steps.append(random.choice(COT_STEP_TEMPLATES["add_sub"]).format(part=part, result=formatted_result))
current_expr = current_expr[:as_match.start()] + ' ' + formatted_result + ' ' + current_expr[as_match.end():]
reduction_made = True
continue
# If no reduction was made, we're stuck - return None
if not reduction_made:
return None
# Timeout reached
return None
def generate_training_example(_=None):
"""Generate a single training example with retry logic."""
max_retries = 50 # Reduced from 100 for faster generation
for attempt in range(max_retries):
try:
expression_parts = generate_expression_parts()
expression_str = " ".join(map(str, expression_parts))
cot_result = solve_with_cot(expression_str)
if cot_result and isinstance(cot_result['result'], (int, float)):
final_result = cot_result['result']
# Filter out extreme values
if abs(final_result) > 1e12 or (final_result != 0 and abs(final_result) < 1e-4):
continue
if math.isnan(final_result) or math.isinf(final_result):
continue
result_str = format_number(final_result)
if len(result_str) > 20:
continue
use_words = random.random() < WORD_FORM_CHANCE
if use_words:
expression_text = ' '.join([number_to_words(p) if isinstance(p, int) else operator_to_word(p) if isinstance(p, str) else str(p) for p in expression_parts])
result_text = number_to_words(int(round(final_result)))
completion = random.choice(SIMPLE_COMPLETION_TEMPLATES).format(expression=expression_text, result=result_text)
else:
expression_text = expression_str
result_text = result_str
use_reasoning = random.random() < REASONING_CHANCE
if use_reasoning:
intro = random.choice(COT_INTRO_TEMPLATES).format(expression=expression_text)
steps_text = " ".join(cot_result['steps'])
finalizer = random.choice(COT_FINALIZER_TEMPLATES).format(result=result_text)
completion = f"{intro} {steps_text} {finalizer} </think>"
else:
completion = random.choice(SIMPLE_COMPLETION_TEMPLATES).format(expression=expression_text, result=result_text)
if random.random() < SENTENCE_FORM_CHANCE:
prompt = random.choice(PROMPT_TEMPLATES).format(expression=expression_text)
else:
prompt = f"{expression_text} ="
# Clean up spacing
prompt = re.sub(r'\s*\(', ' (', prompt)
prompt = re.sub(r'\)\s*', ') ', prompt).strip()
prompt = re.sub(r'\s+', ' ', prompt)
completion = re.sub(r'\s*\(', ' (', completion)
completion = re.sub(r'\)\s*', ') ', completion).strip()
completion = re.sub(r'\s+', ' ', completion)
return {"prompt": prompt, "completion": " " + completion}
except Exception as e:
continue
return None
def main():
print(f"🔥 Generating {NUM_LINES:,} examples using {NUM_WORKERS} parallel workers...")
print(f" Appending to '{OUTPUT_FILE}'...")
start_time = time.time()
generated_count = 0
failed_count = 0
with open(OUTPUT_FILE, "a", encoding="utf-8") as f:
with multiprocessing.Pool(processes=NUM_WORKERS) as pool:
results_iterator = pool.imap_unordered(generate_training_example, range(NUM_LINES), chunksize=100)
for item in tqdm(results_iterator, total=NUM_LINES, desc="Generating examples"):
if item:
f.write(json.dumps(item) + "\n")
generated_count += 1
else:
failed_count += 1
elapsed_time = time.time() - start_time
print(f"\n\n✅ Done! Appended {generated_count:,} new items to '{OUTPUT_FILE}' in {elapsed_time:.2f}s.")
print(f" 📊 Success rate: {generated_count}/{NUM_LINES} ({100*generated_count/NUM_LINES:.1f}%)")
if failed_count > 0:
print(f" ⚠️ {failed_count:,} generation attempts failed (expressions too complex or invalid)")
if __name__ == "__main__":
multiprocessing.freeze_support()
main()