| |
| """ |
| Generate programming problems from function_dataset_v2.csv using OpenAI Batch API. |
| Batch API offers 50% cost savings compared to standard API. |
| """ |
|
|
| import csv |
| import json |
| import os |
| import sys |
| from openai import OpenAI |
| from datetime import datetime |
| from typing import Dict, Optional, List |
| import time |
|
|
| |
# Model used when a command is run without an explicit --model option.
MODEL_NAME: str = "gpt-4o-mini"
# Default threshold: dataset rows scoring below this are not turned into requests.
MIN_RELEVANCE_SCORE: int = 60
# Budget figure in USD; NOTE(review): not referenced by the code visible in
# this file -- confirm whether it is still needed.
MAX_BUDGET_USD: float = 10.0
|
|
| |
| |
# Batch API prices expressed in USD per one million tokens, as
# (input price, output price) pairs keyed by model name.
_BATCH_USD_PER_MILLION_TOKENS = {
    "gpt-5.2": (0.875, 7.00),
    "gpt-5.1": (0.625, 5.00),
    "gpt-5": (0.625, 5.00),
    "gpt-5-mini": (0.125, 1.00),
    "gpt-5-nano": (0.025, 0.20),
    "gpt-4o": (1.25, 5.00),
    "gpt-4o-2024-05-13": (2.50, 7.50),
    "gpt-4o-mini": (0.075, 0.30),
    "gpt-4-turbo": (5.00, 15.00),
    "gpt-3.5-turbo": (0.25, 0.75),
}

# Per-token prices in USD: model name -> {"input": ..., "output": ...}.
BATCH_PRICING = {
    model: {
        "input": input_per_million / 1_000_000,
        "output": output_per_million / 1_000_000,
    }
    for model, (input_per_million, output_per_million)
    in _BATCH_USD_PER_MILLION_TOKENS.items()
}
|
|
# User-message template for each batch request. The only substitution field is
# {code}, filled with the function source taken from the dataset row.
PROMPT_TEMPLATE = """You are an expert in scientific computing and computational chemistry/biology/physics. Please create a high-quality programming problem inspired by the following code snippet from a real scientific computing project.

The problem should focus on scientific computing concepts such as:
- Numerical algorithms and simulations
- Data analysis and visualization
- Mathematical modeling
- Scientific data processing
- Computational methods in chemistry, biology, or physics

Code snippet for inspiration:
```python
{code}
```

Present your output in two distinct sections:

[Problem Description]
Create a **completely self-contained** problem description that:
- Does NOT directly reference the code snippet above
- Provides all necessary context and background
- Clearly states what needs to be implemented
- Specifies input/output format and constraints
- Is inspired by the scientific computing concepts in the code but creates a NEW, interesting problem
- Assumes common programming knowledge but explains any domain-specific concepts

[Solution]
Provide a comprehensive, **correct** Python solution that:
- Accurately solves the problem described
- Includes clear comments explaining the approach
- Uses appropriate scientific computing libraries (numpy, scipy, etc.) when relevant
- Is complete and runnable
- Follows best practices for scientific computing

Remember: The problem should be INSPIRED by the code, not a direct copy. Create something educational and interesting for scientific computing practitioners."""
|
|
|
|
class BatchAPIClient:
    """Client for the OpenAI Batch API with simple cost estimation.

    Wraps an ``OpenAI`` client and keeps the per-token input/output prices of
    the selected model (taken from ``BATCH_PRICING``) so that costs can be
    estimated from token counts.
    """

    def __init__(self, model_name: str = MODEL_NAME, api_key: Optional[str] = None):
        """Initialize the OpenAI client and look up Batch pricing.

        Args:
            model_name: Name of the OpenAI model to use.
            api_key: OpenAI API key (if None, the OPENAI_API_KEY environment
                variable is used by the OpenAI client).
        """
        self.model_name = model_name
        self.client = OpenAI(api_key=api_key)

        pricing = BATCH_PRICING.get(model_name)
        if pricing is None:
            # Unknown model: fall back to gpt-4o-mini prices so estimates still work.
            print(f"Warning: No Batch pricing info for {model_name}, using gpt-4o-mini prices")
            pricing = BATCH_PRICING["gpt-4o-mini"]
        self.input_price = pricing["input"]
        self.output_price = pricing["output"]

        print("📊 Batch API Pricing (50% off standard rates):")
        print(f" Input: ${self.input_price * 1_000_000:.4f} per 1M tokens")
        print(f" Output: ${self.output_price * 1_000_000:.4f} per 1M tokens")
        print()

    def create_batch_file(self, requests: List[Dict], output_path: str) -> str:
        """Write the requests to a JSONL file, one JSON object per line.

        Args:
            requests: List of request dictionaries.
            output_path: Path to save the JSONL file.

        Returns:
            Path to the created file.
        """
        with open(output_path, 'w', encoding='utf-8') as f:
            for req in requests:
                f.write(json.dumps(req, ensure_ascii=False) + '\n')

        print(f"✅ Created batch file: {output_path}")
        print(f" Total requests: {len(requests)}")
        return output_path

    def upload_batch_file(self, file_path: str) -> str:
        """Upload a batch JSONL file to OpenAI with purpose ``batch``.

        Args:
            file_path: Path to the JSONL file.

        Returns:
            ID of the uploaded file.
        """
        print("⬆️ Uploading batch file to OpenAI...")
        with open(file_path, 'rb') as f:
            batch_file = self.client.files.create(
                file=f,
                purpose='batch',
            )

        print(f"✅ File uploaded: {batch_file.id}")
        return batch_file.id

    def create_batch(self, file_id: str, description: Optional[str] = None) -> str:
        """Create a batch job for the chat completions endpoint.

        Args:
            file_id: ID of the uploaded input file.
            description: Optional description stored in the batch metadata.

        Returns:
            ID of the created batch.
        """
        print("🚀 Creating batch job...")
        batch = self.client.batches.create(
            input_file_id=file_id,
            endpoint="/v1/chat/completions",
            completion_window="24h",
            metadata={
                "description": description or "Programming problems generation",
                "created_at": datetime.now().isoformat(),
            },
        )

        # request_counts is optional on the batch object; avoid crashing on None.
        counts = batch.request_counts
        total = counts.total if counts is not None else 0

        print(f"✅ Batch created: {batch.id}")
        print(f" Status: {batch.status}")
        print(f" Total requests: {total}")
        return batch.id

    def check_batch_status(self, batch_id: str) -> Dict:
        """Retrieve a batch job and summarize its state.

        Args:
            batch_id: ID of the batch.

        Returns:
            Dictionary with the batch id, status, timestamps, request counts
            (zeros when the API reports none) and output/error file ids.
        """
        batch = self.client.batches.retrieve(batch_id)
        counts = batch.request_counts

        return {
            'id': batch.id,
            'status': batch.status,
            'created_at': batch.created_at,
            'completed_at': batch.completed_at,
            'failed_at': batch.failed_at,
            'expired_at': batch.expired_at,
            'request_counts': {
                'total': counts.total if counts is not None else 0,
                'completed': counts.completed if counts is not None else 0,
                'failed': counts.failed if counts is not None else 0,
            },
            'output_file_id': batch.output_file_id,
            'error_file_id': batch.error_file_id,
        }

    def download_results(self, file_id: str, output_path: str) -> None:
        """Download the content of a file and write it to disk.

        Args:
            file_id: ID of the file to download.
            output_path: Path to save the downloaded bytes.

        Raises:
            ValueError: If ``file_id`` is empty (e.g. the batch has no output file).
        """
        if not file_id:
            raise ValueError("file_id is required to download batch results")

        print("⬇️ Downloading results...")
        content = self.client.files.content(file_id)

        with open(output_path, 'wb') as f:
            f.write(content.content)

        print(f"✅ Results saved to: {output_path}")

    def estimate_cost(self, num_requests: int, avg_input_tokens: int, avg_output_tokens: int) -> Dict:
        """Estimate the cost of a batch job from average token counts.

        Args:
            num_requests: Number of requests.
            avg_input_tokens: Average input tokens per request.
            avg_output_tokens: Average output tokens per request.

        Returns:
            Dictionary with token totals, input/output/total cost, the
            equivalent standard-API cost and the savings.
        """
        total_input_tokens = num_requests * avg_input_tokens
        total_output_tokens = num_requests * avg_output_tokens

        input_cost = total_input_tokens * self.input_price
        output_cost = total_output_tokens * self.output_price
        total_cost = input_cost + output_cost

        # Batch prices are treated as half the standard rate, so standard = 2x.
        standard_cost = total_cost * 2
        savings = standard_cost - total_cost

        return {
            'num_requests': num_requests,
            'total_input_tokens': total_input_tokens,
            'total_output_tokens': total_output_tokens,
            'total_tokens': total_input_tokens + total_output_tokens,
            'input_cost': input_cost,
            'output_cost': output_cost,
            'total_cost': total_cost,
            'standard_api_cost': standard_cost,
            'savings': savings,
            'savings_percentage': 50.0,
        }
|
|
|
|
def _parse_relevance_score(raw) -> int:
    """Return *raw* as an integer score, or 0 when it cannot be parsed."""
    try:
        return int(raw)
    except (ValueError, TypeError):
        pass
    try:
        # Scores written as floats (e.g. "75.0") are truncated to an integer
        # instead of being silently treated as 0.
        return int(float(raw))
    except (ValueError, TypeError, OverflowError):
        return 0


def prepare_batch_requests(
    input_file: str,
    min_score: int = MIN_RELEVANCE_SCORE,
    max_samples: Optional[int] = None,
    start_from: int = 0,
    model_name: str = MODEL_NAME,
) -> List[Dict]:
    """Build Batch API chat-completion requests from the function dataset CSV.

    Rows are skipped when their relevance score is below ``min_score`` or when
    ``function_content`` is missing or shorter than 50 characters.

    Args:
        input_file: Path to function_dataset_v2.csv.
        min_score: Minimum relevance score to process.
        max_samples: Maximum number of requests to produce (None or 0 = no limit).
        start_from: Skip the first N rows of the CSV.
        model_name: Model written into each request body.

    Returns:
        List of batch request dictionaries.
    """
    print("📝 Preparing batch requests...")
    print(f" Input: {input_file}")
    print(f" Min Score: {min_score}")
    if max_samples:
        print(f" Max Samples: {max_samples}")
    print()

    requests = []
    total_rows = 0
    skipped_low_score = 0
    skipped_no_code = 0

    metadata_columns = (
        'original_index',
        'function_name',
        'repo_name',
        'path',
        'language',
        'function_start_line',
        'function_end_line',
    )

    # newline='' lets the csv module handle newlines embedded in quoted fields.
    with open(input_file, 'r', encoding='utf-8', newline='') as infile:
        for row in csv.DictReader(infile):
            # Stop before counting the row so "Total rows" reflects rows examined.
            if max_samples and len(requests) >= max_samples:
                break

            total_rows += 1
            if total_rows <= start_from:
                continue

            relevance_score = _parse_relevance_score(row.get('relevance_score', 0))
            if relevance_score < min_score:
                skipped_low_score += 1
                continue

            # DictReader fills missing trailing columns with None.
            function_content = (row.get('function_content') or '').strip()
            if len(function_content) < 50:
                skipped_no_code += 1
                continue

            metadata = {col: str(row.get(col) or '') for col in metadata_columns}
            metadata['relevance_score'] = str(relevance_score)

            prompt = PROMPT_TEMPLATE.format(code=function_content)

            # NOTE(review): custom_id restarts at 0 on every call, so ids collide
            # across files prepared with different --start-from values.
            # NOTE(review): confirm the chat completions endpoint accepts
            # "metadata" in the request body for the account/model in use.
            requests.append({
                "custom_id": f"request-{len(requests)}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": model_name,
                    "messages": [
                        {
                            "role": "system",
                            "content": "You are an expert in scientific computing and programming education.",
                        },
                        {
                            "role": "user",
                            "content": prompt,
                        },
                    ],
                    "temperature": 0.7,
                    "metadata": metadata,
                },
            })

    print(f"✅ Prepared {len(requests)} requests")
    print(f" Total rows: {total_rows}")
    print(f" Skipped (low score): {skipped_low_score}")
    print(f" Skipped (no/short code): {skipped_no_code}")
    print()

    return requests
|
|
|
|
def process_batch_results(
    results_file: str,
    output_file: str,
    model_name: str,
    input_price: float,
    output_price: float,
    requests_file: Optional[str] = None
):
    """Convert a downloaded batch results file into the final JSONL output.

    Each successful result line becomes one output line containing the
    response text, token usage with computed costs, the model name, a
    timestamp, the custom id and (when available) the original prompt and
    metadata. Lines that carry an error, a non-200 status or no choices are
    counted as errors and skipped. A usage summary is printed at the end.

    Args:
        results_file: Path to batch results file.
        output_file: Path to output JSONL file.
        model_name: Model name used.
        input_price: Price per input token.
        output_price: Price per output token.
        requests_file: Optional path to the original batch requests file,
            used to restore prompts and, when the response carries none,
            metadata.

    Returns:
        Dictionary with ``processed``, ``errors``, ``total_input_tokens``,
        ``total_output_tokens`` and ``total_cost``.
    """
    print("🔄 Processing batch results...")

    prompts_map = {}
    metadata_map = {}
    if requests_file and os.path.exists(requests_file):
        print(f" Loading prompts from: {requests_file}")
        with open(requests_file, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    continue
                req = json.loads(line)
                custom_id = req['custom_id']
                body = req.get('body') or {}

                if body.get('metadata'):
                    metadata_map[custom_id] = body['metadata']

                # The first user message is the prompt that was sent.
                for msg in body.get('messages', []):
                    if msg.get('role') == 'user':
                        prompts_map[custom_id] = msg.get('content')
                        break
        print(f" Loaded {len(prompts_map)} prompts")

    processed = 0
    errors = 0
    total_input_tokens = 0
    total_output_tokens = 0
    total_cost = 0.0

    with open(results_file, 'r', encoding='utf-8') as infile, \
            open(output_file, 'w', encoding='utf-8') as outfile:

        for line in infile:
            if not line.strip():
                continue
            batch_result = json.loads(line)
            custom_id = batch_result.get('custom_id')

            if batch_result.get('error'):
                errors += 1
                print(f"❌ Error in {custom_id}: {batch_result['error']}")
                continue

            # A line may have no batch-level error yet still hold a failed
            # request: null response, non-200 status, or a body without choices.
            response = batch_result.get('response') or {}
            body = response.get('body') or {}
            status_code = response.get('status_code', 200)
            choices = body.get('choices')
            if status_code != 200 or not choices:
                errors += 1
                detail = body.get('error') or f"HTTP {status_code}, no choices in response"
                print(f"❌ Error in {custom_id}: {detail}")
                continue

            usage = body.get('usage') or {}
            input_tokens = usage.get('prompt_tokens', 0)
            output_tokens = usage.get('completion_tokens', 0)

            input_cost = input_tokens * input_price
            output_cost = output_tokens * output_price
            request_cost = input_cost + output_cost

            total_input_tokens += input_tokens
            total_output_tokens += output_tokens
            total_cost += request_cost

            # Prefer metadata from the response; otherwise restore it from
            # the original request so the source row stays traceable.
            metadata = body.get('metadata') or metadata_map.get(custom_id, {})

            response_text = choices[0]['message']['content']

            result = {
                'metadata': metadata,
                'response': response_text,
                'usage': {
                    'input_tokens': input_tokens,
                    'output_tokens': output_tokens,
                    'total_tokens': input_tokens + output_tokens,
                    'input_cost': input_cost,
                    'output_cost': output_cost,
                    'request_cost': request_cost
                },
                'model': model_name,
                'timestamp': datetime.now().isoformat(),
                'custom_id': custom_id
            }

            if custom_id in prompts_map:
                result['prompt'] = prompts_map[custom_id]

            outfile.write(json.dumps(result, ensure_ascii=False) + '\n')
            processed += 1

    print(f"\n✅ Processed {processed} results")
    print(f" Errors: {errors}")
    print()

    print("=" * 70)
    print("BATCH API USAGE SUMMARY")
    print("=" * 70)
    print(f"Model: {model_name}")
    print(f"Total Requests: {processed}")
    print(f"Total Input Tokens: {total_input_tokens:,}")
    print(f"Total Output Tokens: {total_output_tokens:,}")
    print(f"Total Tokens: {total_input_tokens + total_output_tokens:,}")
    print(f"\nBatch API Cost: ${total_cost:.6f}")
    print(f"Standard API Cost: ${total_cost * 2:.6f}")
    print(f"Savings (50%): ${total_cost:.6f}")
    print("=" * 70)

    return {
        'processed': processed,
        'errors': errors,
        'total_input_tokens': total_input_tokens,
        'total_output_tokens': total_output_tokens,
        'total_cost': total_cost,
    }
|
|
|
|
def main():
    """Command-line entry point.

    Sub-commands: ``prepare`` (build the requests JSONL from the CSV),
    ``submit`` (upload the file and create the batch), ``status``,
    ``download``, ``process`` (convert downloaded results to the final JSONL)
    and ``estimate`` (cost estimate from average token counts).
    Exits with status 1 when no command is given, when OPENAI_API_KEY is not
    set, or when a download is requested for a batch without usable output.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='Generate programming problems using OpenAI Batch API (50% cost savings)'
    )

    subparsers = parser.add_subparsers(dest='command', help='Command to run')

    prepare_parser = subparsers.add_parser('prepare', help='Prepare batch requests')
    prepare_parser.add_argument('--input', default='function_dataset_v2.csv')
    prepare_parser.add_argument('--output', default='batch_requests.jsonl')
    prepare_parser.add_argument('--min-score', type=int, default=MIN_RELEVANCE_SCORE)
    prepare_parser.add_argument('--max-samples', type=int, default=None)
    prepare_parser.add_argument('--start-from', type=int, default=0)
    prepare_parser.add_argument('--model', default=MODEL_NAME)

    submit_parser = subparsers.add_parser('submit', help='Submit batch job to OpenAI')
    submit_parser.add_argument('--input', default='batch_requests.jsonl')
    submit_parser.add_argument('--model', default=MODEL_NAME)
    submit_parser.add_argument('--description', default='Programming problems generation')

    status_parser = subparsers.add_parser('status', help='Check batch job status')
    status_parser.add_argument('batch_id', help='Batch ID to check')

    download_parser = subparsers.add_parser('download', help='Download batch results')
    download_parser.add_argument('batch_id', help='Batch ID to download')
    download_parser.add_argument('--output', default='batch_results.jsonl')

    process_parser = subparsers.add_parser('process', help='Process downloaded results')
    process_parser.add_argument('--input', default='batch_results.jsonl')
    process_parser.add_argument('--output', default='programming_problems_batch.jsonl')
    process_parser.add_argument('--model', default=MODEL_NAME)
    # NOTE(review): this default differs from the 'prepare' default output
    # (batch_requests.jsonl); prompts are only restored if the file exists.
    process_parser.add_argument('--requests', default='batch_requests_full.jsonl',
                                help='Original batch requests file (to restore prompts)')

    estimate_parser = subparsers.add_parser('estimate', help='Estimate batch cost')
    estimate_parser.add_argument('--num-requests', type=int, required=True)
    estimate_parser.add_argument('--avg-input-tokens', type=int, default=1917)
    estimate_parser.add_argument('--avg-output-tokens', type=int, default=2552)
    estimate_parser.add_argument('--model', default=MODEL_NAME)

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        sys.exit(1)

    if not os.getenv('OPENAI_API_KEY'):
        print("❌ Error: OPENAI_API_KEY environment variable not set.")
        print(" Please set it with: export OPENAI_API_KEY='your-api-key'")
        sys.exit(1)

    # 'status' and 'download' have no --model option; use the default for them.
    client = BatchAPIClient(model_name=getattr(args, 'model', MODEL_NAME))

    if args.command == 'prepare':
        requests = prepare_batch_requests(
            input_file=args.input,
            min_score=args.min_score,
            max_samples=args.max_samples,
            start_from=args.start_from
        )

        # Make the request bodies use the model selected with --model, so the
        # submitted model matches the one used for the cost estimate below.
        for req in requests:
            req["body"]["model"] = args.model

        client.create_batch_file(requests, args.output)

        print("\n💰 Cost Estimation:")
        estimate = client.estimate_cost(
            num_requests=len(requests),
            avg_input_tokens=1917,
            avg_output_tokens=2552
        )
        print(f" Estimated Batch API Cost: ${estimate['total_cost']:.2f}")
        print(f" Standard API Cost: ${estimate['standard_api_cost']:.2f}")
        print(f" Savings (50%): ${estimate['savings']:.2f}")
        print()

    elif args.command == 'submit':
        file_id = client.upload_batch_file(args.input)
        batch_id = client.create_batch(file_id, args.description)

        print(f"\n📌 Save this Batch ID: {batch_id}")
        print(f" Check status with: python3 {sys.argv[0]} status {batch_id}")

    elif args.command == 'status':
        status = client.check_batch_status(args.batch_id)
        state = status['status']

        print("\n📊 Batch Status:")
        print(f" ID: {status['id']}")
        print(f" Status: {state}")
        print(f" Total: {status['request_counts']['total']}")
        print(f" Completed: {status['request_counts']['completed']}")
        print(f" Failed: {status['request_counts']['failed']}")

        if state == 'completed':
            print("\n✅ Batch completed!")
            print(f" Download with: python3 {sys.argv[0]} download {args.batch_id}")
        elif state in ('failed', 'expired', 'cancelled'):
            # Terminal states: the batch will make no further progress.
            print(f"\n❌ Batch {state}!")
        else:
            print("\n⏳ Batch is still processing...")

    elif args.command == 'download':
        status = client.check_batch_status(args.batch_id)

        if status['status'] != 'completed':
            print(f"❌ Batch is not completed yet (status: {status['status']})")
            sys.exit(1)

        # Failed requests are reported in a separate error file; keep it next
        # to the results so they can be inspected.
        error_file_id = status.get('error_file_id')
        if error_file_id:
            root, ext = os.path.splitext(args.output)
            errors_path = f"{root}_errors{ext}"
            client.download_results(error_file_id, errors_path)
            print(f" Failed requests saved to: {errors_path}")

        output_file_id = status.get('output_file_id')
        if not output_file_id:
            print("❌ Batch completed but has no output file (no successful requests).")
            sys.exit(1)

        client.download_results(output_file_id, args.output)
        print(f"\n✅ Downloaded to: {args.output}")
        print(f" Process with: python3 {sys.argv[0]} process --input {args.output}")

    elif args.command == 'process':
        process_batch_results(
            results_file=args.input,
            output_file=args.output,
            model_name=args.model,
            input_price=client.input_price,
            output_price=client.output_price,
            requests_file=args.requests
        )
        print(f"\n✅ Final results saved to: {args.output}")

    elif args.command == 'estimate':
        estimate = client.estimate_cost(
            num_requests=args.num_requests,
            avg_input_tokens=args.avg_input_tokens,
            avg_output_tokens=args.avg_output_tokens
        )

        print("\n💰 COST ESTIMATION")
        print("=" * 70)
        print(f"Number of Requests: {estimate['num_requests']:,}")
        print(f"Total Input Tokens: {estimate['total_input_tokens']:,}")
        print(f"Total Output Tokens: {estimate['total_output_tokens']:,}")
        print(f"Total Tokens: {estimate['total_tokens']:,}")
        print()
        print(f"Batch API Cost: ${estimate['total_cost']:.2f}")
        print(f"Standard API Cost: ${estimate['standard_api_cost']:.2f}")
        print(f"💰 Savings (50%): ${estimate['savings']:.2f}")
        print("=" * 70)
|
|
|
|
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|