| import argparse | |
| import base64 | |
| import concurrent.futures | |
| import io | |
| import json | |
| import os | |
| import random | |
| import re | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor | |
| from functools import partial | |
| from io import BytesIO | |
| from typing import Dict, List | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import pandas as pd | |
| from datasets import Dataset, concatenate_datasets, load_dataset, load_from_disk | |
| from tqdm import tqdm | |
| import bytedtos | |
| import seaborn as sns | |
| import yaml | |
| from openai import AzureOpenAI | |
| from PIL import Image | |
| from pillow_avif import AvifImagePlugin | |
| PROMPT_FORMAT = """I will provide you with an image, an original question, and its answer related to the image. Your task is to rewrite the question in such a way that answering it requires step-by-step Chain-of-Thought (CoT) reasoning with numerical or mathematical expressions where applicable. The reasoning process can include expressions like "let me think," "oh, I see," or other natural language thought expressions. | |
| Please make sure your question is to ask for a certain answer with a certain value, do not ask for open-ended answer, and the answer is correct and easy to verify via simple protocol, like "2" or "A". | |
| Please strictly do not include "Answer:" in the question part to avoid confusion and leakage. | |
| Input Format: | |
| Original Question: {original_question} | |
| Original Answer: {original_answer} | |
| Output Format: | |
| Question: [rewrite the question if necessary] | |
| Answer: [answer with reasoning steps, including calculations where applicable] | |
| <think>step-by-step reasoning process</think> | |
| <answer>easy to verify answer</answer> | |
| """ | |
def get_image_data_url(image_input):
    """Normalize an image reference into a base64 JPEG data URL.

    Accepts, in order of checks:
      * a string that is already a ``data:`` URL -> returned unchanged;
      * an ``http(s)`` URL string -> downloaded and decoded;
      * any other string -> treated as a local file path;
      * a ``PIL.Image.Image`` instance.

    Returns:
        str: ``data:image/jpeg;base64,...`` for the RGB-converted image.

    Raises:
        ValueError: if the input is none of the supported types.
    """
    if isinstance(image_input, str) and image_input.startswith("data:"):
        return image_input
    if isinstance(image_input, str) and image_input.startswith("http"):
        # BUG FIX: the original called `load_image`, which is not defined or
        # imported anywhere in this file (NameError on any URL input).
        # Fetch the bytes with the standard library instead.
        import urllib.request

        with urllib.request.urlopen(image_input) as resp:
            image_input = Image.open(BytesIO(resp.read()))
    if isinstance(image_input, str):
        image_input = Image.open(image_input)
    if not isinstance(image_input, Image.Image):
        raise ValueError("Unsupported image input type")
    # JPEG cannot store alpha/palette modes; force RGB before encoding.
    if image_input.mode != "RGB":
        image_input = image_input.convert("RGB")
    buffer = BytesIO()
    image_input.save(buffer, format="JPEG")
    img_bytes = buffer.getvalue()
    base64_data = base64.b64encode(img_bytes).decode("utf-8")
    return f"data:image/jpeg;base64,{base64_data}"
def gpt4o_query(image, prompt, max_retries=5, initial_delay=3):
    """Send an image + prompt to Azure GPT-4o, retrying on transient failures.

    Args:
        image: anything accepted by ``get_image_data_url`` (or None to skip).
        prompt: user-turn text instruction.
        max_retries: total attempts before giving up.
        initial_delay: base (seconds) for exponential backoff with jitter.

    Returns:
        The model's reply text, or None when ``image`` is None.

    Raises:
        RuntimeError: after ``max_retries`` consecutive failed attempts
            (still caught by callers that catch ``Exception``).
    """
    if image is None:
        return None
    data_url_list = [get_image_data_url(image)]
    # Credentials come from the environment when available; the original
    # placeholders remain as fallbacks so existing behavior is unchanged.
    client = AzureOpenAI(
        azure_endpoint=os.environ.get("AZURE_OPENAI_ENDPOINT", "YOUR_AZURE_ENDPOINT"),
        api_version=os.environ.get("AZURE_OPENAI_API_VERSION", "2023-07-01-preview"),
        api_key=os.environ.get("AZURE_OPENAI_API_KEY", "YOUR_API_KEY"),
    )
    # Build the payload once — it is identical on every retry attempt
    # (the original rebuilt it inside the loop). Images precede the text,
    # matching the original insert-at-front behavior.
    user_content = [
        {"type": "image_url", "image_url": {"url": data_url}}
        for data_url in data_url_list
    ]
    user_content.append({"type": "text", "text": prompt})
    messages = [
        {
            "role": "system",
            "content": "You are an expert to analyze the image and provide useful information for users.",
        },
        {"role": "user", "content": user_content},
    ]
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4o-2024-08-06",
                messages=messages,
                temperature=0.2,
                max_tokens=8192,
            )
            return response.choices[0].message.content
        except Exception as e:
            if attempt == max_retries - 1:
                # Chain the original exception for a complete traceback.
                raise RuntimeError(
                    f"Failed after {max_retries} attempts. Last error: {str(e)}"
                ) from e
            # Exponential backoff with up to 10% random jitter to avoid
            # synchronized retries across the many map() workers.
            delay = initial_delay * (2**attempt) + random.uniform(
                0, 0.1 * initial_delay * (2**attempt)
            )
            time.sleep(delay)
def process_single_item(example):
    """Annotate one dataset row with a GPT-4o CoT rewrite.

    Fills the prompt template with the row's ``question`` and ``answer``,
    queries GPT-4o with the row's ``image_path``, and stores the reply under
    ``gpt4o_response``. Any failure is logged and recorded as None so a bad
    row never aborts the surrounding ``Dataset.map`` run; the row is always
    returned.
    """
    try:
        prompt = PROMPT_FORMAT.format(
            original_question=example["question"],
            original_answer=example["answer"],
        )
        example["gpt4o_response"] = gpt4o_query(example["image_path"], prompt)
    except Exception as err:
        # Best-effort: record the failure and keep the pipeline moving.
        print(f"Error processing item: {str(err)}")
        example["gpt4o_response"] = None
    return example
def main():
    """Load a dataset from disk, annotate every row with a GPT-4o CoT rewrite,
    and save the result alongside the original (``<path>_processed``).
    """
    dataset_path = "path/to/your/dataset"
    full_dataset = load_from_disk(dataset_path)
    # High num_proc is viable because each call is dominated by API latency;
    # effective throughput is bounded by the Azure rate limit, not CPU.
    processed_dataset = full_dataset.map(
        function=process_single_item,  # fixed: `partial()` with no bound args was a no-op wrapper
        num_proc=256,
        desc="Processing dataset with GPT-4o",
        keep_in_memory=True,
    )
    output_path = f"{dataset_path}_processed"
    processed_dataset.save_to_disk(output_path)
    print(f"Processed dataset saved to: {output_path}")


if __name__ == "__main__":
    main()