import ast
import json
import os
import pickle
import random
from typing import List, Optional, Tuple, Union

import numpy as np
from nextqa import NExTQALoader
from tqdm.asyncio import tqdm
from transformers import PreTrainedTokenizerBase

from sglang.benchmark.datasets.common import (
    SHAREGPT_FILENAME,
    SHAREGPT_REPO_ID,
    gen_prompt,
)
from sglang.benchmark.datasets.generated_shared_prefix import get_gen_prefix_cache_path
from sglang.benchmark.utils import download_and_cache_hf_file
from sglang.lang.chat_template import get_chat_template, get_chat_template_by_model_path
from sglang.srt.entrypoints.openai.protocol import ChatCompletionMessageContentPart
from sglang.utils import encode_video_base64
|
|
| |
# Content of a single chat message: either plain text, or a list of OpenAI-style
# content parts (sample_nextqa_requests builds an image_url part plus a text part).
MsgContent = Union[str, List[ChatCompletionMessageContentPart]]

# Return type shared by the samplers in this module: one inner list per request
# (a chat), where each turn is (message content, prompt length, output length).
# The lengths are the token counts computed by the individual samplers.
SampleOutput = List[List[Tuple[MsgContent, int, int]]]
|
|
|
|
def common_filter_chat(
    num_requests: int,
    new_dataset: List,
    tokenizer: PreTrainedTokenizerBase,
    min_prompt_len: Optional[int],
    min_output_len: Optional[int],
    max_prompt_len: Optional[int],
    max_output_len: Optional[int],
    fixed_output_len: Optional[int],
) -> SampleOutput:
    """Tokenize, length-filter and select ``num_requests`` chats.

    Each element of ``new_dataset`` is a chat: a sequence of turns whose first
    two items are ``(prompt, completion)``. A turn is dropped when its prompt
    or output token length falls outside the given bounds (a bound of ``None``
    is not enforced). A chat with no surviving turns is skipped entirely.

    If the dataset yields fewer than ``num_requests`` valid chats, the valid
    chats are reused in their original order until ``num_requests`` is reached.

    Args:
        num_requests: Number of chats to return.
        new_dataset: Chats of ``(prompt, completion)`` turns.
        tokenizer: Object with an ``encode(text) -> list`` method.
        min_prompt_len, max_prompt_len: Inclusive prompt-token bounds, or None.
        min_output_len, max_output_len: Inclusive output-token bounds, or None.
        fixed_output_len: When set, used as every turn's output length instead
            of the completion's token count.

    Returns:
        ``num_requests`` chats, each a list of
        ``(prompt, prompt_len, output_len)`` tuples.

    Raises:
        ValueError: if ``num_requests > 0`` but no chat survives filtering
            (for example, when ``new_dataset`` is empty).
    """

    def _within_bounds(prompt_len, output_len):
        # Returns True when both lengths satisfy every bound that is set.
        if min_prompt_len is not None and prompt_len < min_prompt_len:
            return False
        if min_output_len is not None and output_len < min_output_len:
            return False
        if max_prompt_len is not None and prompt_len > max_prompt_len:
            return False
        if max_output_len is not None and output_len > max_output_len:
            return False
        return True

    def _filter_turns(chat):
        # Tokenizes one chat and keeps only the turns within the bounds.
        processed = []
        for turn in chat:
            prompt, completion = turn[0], turn[1]
            prompt_len = len(tokenizer.encode(prompt))
            if fixed_output_len is None:
                output_len = len(tokenizer.encode(completion))
            else:
                # The completion text is irrelevant here, so skip tokenizing it.
                output_len = fixed_output_len
            if _within_bounds(prompt_len, output_len):
                processed.append((prompt, prompt_len, output_len))
        return processed

    filtered_dataset: SampleOutput = []
    valid_chats = []

    # First pass: tokenize lazily and stop as soon as enough chats are found,
    # so a large dataset is not fully tokenized for a small request count.
    for chat in new_dataset:
        if len(filtered_dataset) >= num_requests:
            break
        processed = _filter_turns(chat)
        if processed:
            valid_chats.append(processed)
            filtered_dataset.append(processed)

    if len(filtered_dataset) < num_requests:
        if not valid_chats:
            # Without this check the request count can never grow and the
            # caller would loop forever.
            raise ValueError(
                "No chats left after filtering: the dataset is empty or every "
                "turn was rejected by the length bounds."
            )
        # Reuse the already-filtered chats (copied, so requests do not share
        # list objects) instead of re-tokenizing the whole dataset each pass.
        while len(filtered_dataset) < num_requests:
            for processed in valid_chats:
                if len(filtered_dataset) >= num_requests:
                    break
                filtered_dataset.append(list(processed))

    input_tokens = sum(turn[1] for chat in filtered_dataset for turn in chat)
    output_tokens = sum(turn[2] for chat in filtered_dataset for turn in chat)
    print(f"#Input tokens: {input_tokens}")
    print(f"#Output tokens: {output_tokens}")
    return filtered_dataset
|
|
|
|
def sample_sharegpt_requests(
    dataset_path: str,
    num_requests: int,
    tokenizer: PreTrainedTokenizerBase,
    disable_shuffle: bool = False,
    enable_multiturn: bool = True,
    fixed_output_len: Optional[int] = None,
) -> SampleOutput:
    """Sample chats from a ShareGPT-format JSON file.

    The file is a JSON list of records with a ``"conversations"`` list of turns,
    each having ``"from"`` and ``"value"`` keys. A record is kept only when it has
    an even number (>= 2) of turns and the first turn is from ``"human"``.
    Consecutive turns are paired as ``(prompt, completion)``. With
    ``enable_multiturn`` disabled, only the first pair is used.

    If ``dataset_path`` is not an existing file, the ShareGPT file is downloaded
    and cached via ``download_and_cache_hf_file``.

    Raises:
        ValueError: if ``fixed_output_len`` is smaller than 4.
    """
    if fixed_output_len is not None and fixed_output_len < 4:
        raise ValueError("output_len too small")

    if not os.path.isfile(dataset_path):
        dataset_path = download_and_cache_hf_file(
            repo_id=SHAREGPT_REPO_ID,
            filename=SHAREGPT_FILENAME,
        )

    # Read as UTF-8 explicitly: ShareGPT contains non-ASCII text, and open()
    # otherwise uses the platform's locale encoding.
    with open(dataset_path, encoding="utf-8") as f:
        dataset = json.load(f)

    new_dataset = []
    for data in dataset:
        conversations = data["conversations"]
        # Need complete prompt/response pairs that start with the user.
        if len(conversations) < 2 or len(conversations) % 2 != 0:
            continue
        if conversations[0]["from"] != "human":
            continue
        total_len = len(conversations) if enable_multiturn else 2
        new_dataset.append(
            [
                (conversations[i]["value"], conversations[i + 1]["value"])
                for i in range(0, total_len, 2)
            ]
        )

    if not disable_shuffle:
        random.shuffle(new_dataset)

    # Drop turns with prompts or outputs shorter than 4 tokens.
    filtered_dataset: SampleOutput = common_filter_chat(
        num_requests, new_dataset, tokenizer, 4, 4, None, None, fixed_output_len
    )
    return filtered_dataset
|
|
|
|
def sample_ultrachat_requests(
    dataset_path: str,
    num_requests: int,
    tokenizer: PreTrainedTokenizerBase,
    disable_shuffle: bool = False,
    enable_multiturn: bool = True,
    fixed_output_len: Optional[int] = None,
) -> SampleOutput:
    """Sample chats from an UltraChat-style JSON Lines file.

    Each non-blank line is a JSON object whose ``"data"`` field is a list of
    alternating prompt/response strings. Records with fewer than two entries,
    or an odd number of entries, are skipped. Consecutive entries are paired as
    ``(prompt, completion)``. With ``enable_multiturn`` disabled, only the
    first pair is used.

    Raises:
        ValueError: if ``fixed_output_len`` is smaller than 4.
    """
    if fixed_output_len is not None and fixed_output_len < 4:
        raise ValueError("output_len too small")

    dataset = []
    with open(dataset_path, encoding="utf-8") as f:
        for line in f:
            # Skip blank lines (e.g. an extra empty line at EOF), which
            # json.loads would reject.
            if line.strip():
                dataset.append(json.loads(line))

    new_dataset = []
    for data in dataset:
        turns = data["data"]
        if len(turns) < 2 or len(turns) % 2 != 0:
            continue
        total_len = len(turns) if enable_multiturn else 2
        new_dataset.append([(turns[i], turns[i + 1]) for i in range(0, total_len, 2)])

    if not disable_shuffle:
        random.shuffle(new_dataset)

    # Drop turns with prompts or outputs shorter than 4 tokens.
    filtered_dataset: SampleOutput = common_filter_chat(
        num_requests, new_dataset, tokenizer, 4, 4, None, None, fixed_output_len
    )
    return filtered_dataset
|
|
|
|
def sample_loogle_requests(
    dataset_path: str,
    num_requests: int,
    tokenizer: PreTrainedTokenizerBase,
    disable_shuffle: bool = False,
    enable_multiturn: bool = True,
    enable_shared_prefix: bool = False,
    fixed_output_len: Optional[int] = None,
) -> SampleOutput:
    """Sample long-context chats from a LooGLE-style JSON Lines file.

    Each non-blank line is a JSON object with an ``"input"`` document and an
    optional ``"qa_pairs"`` field: a string holding a Python literal list of
    dicts with ``"Q"`` and ``"A"`` keys.

    * Records without QA pairs become a single summarization turn; the first
      1024 characters of the input serve as the reference completion.
    * Otherwise, the first question is prefixed with the full input document.
    * With ``enable_shared_prefix``, every question is prefixed with the
      document.
    * Otherwise, with ``enable_multiturn``, follow-up questions are added as
      bare turns.
    * With neither flag, only the first question is kept.

    Raises:
        ValueError: if ``fixed_output_len`` is smaller than 4, or if a
            ``qa_pairs`` string is not a valid Python literal.
    """
    if fixed_output_len is not None and fixed_output_len < 4:
        raise ValueError("output_len too small")

    dataset = []
    with open(dataset_path, encoding="utf-8") as f:
        for line in f:
            # Skip blank lines, which json.loads would reject.
            if line.strip():
                dataset.append(json.loads(line))

    new_dataset = []
    for data in dataset:
        if (
            "qa_pairs" not in data
            or data["qa_pairs"] == "none"
            or len(data["qa_pairs"]) == 0
        ):
            # No questions: ask for a summary of the whole document.
            chat = [
                (
                    "Input: "
                    + data["input"]
                    + " Question: "
                    + "Please summarize the input",
                    data["input"][:1024],
                )
            ]
        else:
            raw_pairs = data["qa_pairs"]
            # literal_eval instead of eval(): dataset text must never be
            # executed as code. Already-parsed lists are accepted as-is.
            qa_pairs = (
                ast.literal_eval(raw_pairs) if isinstance(raw_pairs, str) else raw_pairs
            )
            chat = []
            for i, qa in enumerate(qa_pairs):
                if i == 0 or enable_shared_prefix:
                    chat.append(
                        ("Input: " + data["input"] + " Question: " + qa["Q"], qa["A"])
                    )
                elif enable_multiturn:
                    chat.append((qa["Q"], qa["A"]))
        new_dataset.append(chat)

    if not disable_shuffle:
        random.shuffle(new_dataset)

    # Only a minimum prompt length of 4 tokens is enforced for LooGLE.
    filtered_dataset: SampleOutput = common_filter_chat(
        num_requests, new_dataset, tokenizer, 4, None, None, None, fixed_output_len
    )
    return filtered_dataset
|
|
|
|
def sample_nextqa_requests(
    dataset_path: str,
    num_requests: int,
    tokenizer: PreTrainedTokenizerBase,
    max_frames: int,
    model_path: str,
    disable_shuffle: bool = False,
    enable_multiturn: bool = True,
    backend: str = "sglang-oai",
    chat_template_name: Optional[str] = None,
    fixed_output_len: Optional[int] = None,
) -> SampleOutput:
    """Build single-turn video QA requests from a NExT-QA video directory.

    Each request's message content looks like::

        [
            {"type": "image_url", "image_url": {"url": base64_data}},
            {"type": "text", "text": video.prompt},
        ]

    Videos are reused in order when there are fewer than ``num_requests``.
    For the ``"sglang"`` / ``"sglang-native"`` backends, the chat template's
    image token is prepended to the text prompt. ``enable_multiturn`` is
    accepted for interface parity but not used.

    Returns:
        ``num_requests`` single-turn chats of ``(content, prompt_len,
        output_len)``. ``prompt_len`` is the text token count plus
        ``video.num_frames``. ``output_len`` defaults to 4096.

    Raises:
        ValueError: if ``num_requests > 0`` and the loader yields no videos.
    """
    if fixed_output_len is None:
        fixed_output_len = 4096

    # Materialize the loader so it can be shuffled and iterated repeatedly.
    new_dataset = list(NExTQALoader(video_dir=dataset_path, max_frames=max_frames))
    if not disable_shuffle:
        random.shuffle(new_dataset)

    if num_requests <= 0:
        return []
    if not new_dataset:
        # Without this check the request loop below would never terminate.
        raise ValueError(f"No videos found in NExT-QA dataset at {dataset_path}")

    # The prefix depends only on backend/tokenizer/model, not on the video, so
    # resolve the chat template once instead of once per request.
    image_token_prefix = ""
    if backend == "sglang" or backend == "sglang-native":
        if "chat_template" in tokenizer.init_kwargs:
            # NOTE(review): this passes the tokenizer's template text to
            # get_chat_template, which elsewhere takes a template name; confirm.
            chat_template = get_chat_template(tokenizer.get_chat_template())
        elif chat_template_name is not None:
            chat_template = get_chat_template(chat_template_name)
        else:
            chat_template = get_chat_template_by_model_path(model_path)
        image_token_prefix = chat_template.image_token

    filtered_dataset = []
    while len(filtered_dataset) < num_requests:
        for video in new_dataset:
            if len(filtered_dataset) == num_requests:
                break
            prompt = image_token_prefix + video.prompt
            prompt_len = len(tokenizer(prompt).input_ids)

            base64_data = encode_video_base64(video.path, video.num_frames)
            # Counts one token per frame on top of the text tokens; the true
            # vision-token count is presumably model-dependent.
            prompt_len += video.num_frames

            content = [
                {"type": "image_url", "image_url": {"url": base64_data}},
                {"type": "text", "text": prompt},
            ]
            filtered_dataset.append([(content, prompt_len, fixed_output_len)])
    return filtered_dataset
|
|
|
|
def sample_random_requests(
    input_len: int,
    output_len: int,
    num_prompts: int,
    range_ratio: float,
    tokenizer: PreTrainedTokenizerBase,
    dataset_path: str,
    disable_shuffle: bool = False,
    use_dataset_prompts: bool = True,
) -> SampleOutput:
    """Build single-turn requests with randomly sampled target lengths.

    Per request, the input length is drawn uniformly from
    ``[max(int(input_len * range_ratio), 1), input_len]``. The output length
    is drawn from ``[int(output_len * range_ratio), output_len]``.

    With ``use_dataset_prompts`` (the default), each prompt is the first turn
    of a ShareGPT conversation. It is truncated or repeated at token level to
    the sampled length and decoded back to text; empty prompts are skipped.
    Fewer than ``num_prompts`` requests are returned if the dataset runs out.
    If ``dataset_path`` is not an existing file, ShareGPT is downloaded and
    cached.

    With ``use_dataset_prompts=False``, prompts are decoded from consecutive
    token ids starting at a random offset in the vocabulary.

    Returns:
        Single-turn chats of ``(prompt, sampled_input_len, sampled_output_len)``.
    """
    input_lens = np.random.randint(
        max(int(input_len * range_ratio), 1),
        input_len + 1,
        size=num_prompts,
    )
    output_lens = np.random.randint(
        int(output_len * range_ratio),
        output_len + 1,
        size=num_prompts,
    )

    input_requests: SampleOutput = []
    if use_dataset_prompts:
        if not os.path.isfile(dataset_path):
            dataset_path = download_and_cache_hf_file(
                repo_id=SHAREGPT_REPO_ID,
                filename=SHAREGPT_FILENAME,
            )

        # Explicit UTF-8: ShareGPT contains non-ASCII text.
        with open(dataset_path, encoding="utf-8") as f:
            dataset = json.load(f)
        dataset = [
            (data["conversations"][0]["value"], data["conversations"][1]["value"])
            for data in dataset
            if len(data["conversations"]) >= 2
        ]

        if not disable_shuffle:
            random.shuffle(dataset)

        for prompt, _ in dataset:
            i = len(input_requests)
            if i >= num_prompts:
                break

            prompt_token_ids = tokenizer.encode(prompt)
            prompt_len = len(prompt_token_ids)
            if prompt_len == 0:
                continue

            target_len = int(input_lens[i])
            if prompt_len > target_len:
                input_ids = prompt_token_ids[:target_len]
            else:
                # Repeat the prompt just enough times to cover target_len.
                ratio = (target_len + prompt_len - 1) // prompt_len
                input_ids = (prompt_token_ids * ratio)[:target_len]
            prompt = tokenizer.decode(input_ids)
            input_requests.append([(prompt, target_len, int(output_lens[i]))])
    else:
        offsets = np.random.randint(0, tokenizer.vocab_size, size=num_prompts)
        for i in range(num_prompts):
            # Plain ints so the tokenizer never sees numpy scalars.
            token_ids = [
                int((offsets[i] + i + j) % tokenizer.vocab_size)
                for j in range(input_lens[i])
            ]
            prompt = tokenizer.decode(token_ids)
            input_requests.append([(prompt, int(input_lens[i]), int(output_lens[i]))])

    # Sum only the requests actually produced: the dataset may run out before
    # num_prompts, in which case summing all sampled lengths would over-count.
    print(f"#Input tokens: {sum(req[0][1] for req in input_requests)}")
    print(f"#Output tokens: {sum(req[0][2] for req in input_requests)}")
    return input_requests
|
|
|
|
def sample_generated_shared_prefix_requests(
    num_groups: int,
    prompts_per_group: int,
    system_prompt_len: int,
    question_len: int,
    output_len: int,
    tokenizer: PreTrainedTokenizerBase,
    args,
    disable_shuffle: bool = False,
) -> SampleOutput:
    """Generate benchmark requests with shared system prompts using random tokens and caching.

    Builds ``num_groups`` chats. Each chat holds ``prompts_per_group`` turns
    that all share one generated system prompt, followed by a generated
    question. The result is pickled to a cache path derived from the
    generation parameters, so later runs reuse identical data.

    Because the cache is written after the optional shuffle, unshuffled runs
    (``disable_shuffle=True``) use a separate ``*_noshuffle`` cache file. This
    prevents a previously shuffled cache from being returned for an
    unshuffled request. The shuffled (default) cache path is unchanged.

    Returns:
        ``num_groups`` chats of ``(full_prompt, prompt_len, output_len)``.
    """
    cache_path = get_gen_prefix_cache_path(
        args.seed,
        num_groups,
        prompts_per_group,
        system_prompt_len,
        question_len,
        output_len,
        tokenizer,
    )
    if disable_shuffle:
        # Assumes cache_path is a pathlib.Path (it is used with .exists() and
        # .parent below).
        cache_path = cache_path.with_name(
            f"{cache_path.stem}_noshuffle{cache_path.suffix}"
        )

    if cache_path.exists():
        print(f"\nLoading cached generated input data from {cache_path}")
        # NOTE(review): pickle.load can execute code embedded in the file; this
        # is only safe for caches written by this tool itself.
        with open(cache_path, "rb") as f:
            return pickle.load(f)

    print("\nGenerating new input data...")

    # Generation order (system prompts first, then questions) is kept stable so
    # seeded runs reproduce the same data.
    system_prompts = [gen_prompt(tokenizer, system_prompt_len) for _ in range(num_groups)]
    questions = [
        gen_prompt(tokenizer, question_len)
        for _ in range(num_groups * prompts_per_group)
    ]

    input_requests = []
    total_input_tokens = 0
    total_output_tokens = 0
    for group_idx in tqdm(range(num_groups), desc="Generating system prompt"):
        system_prompt = system_prompts[group_idx]
        group = []
        for prompt_idx in tqdm(
            range(prompts_per_group), desc="Generating questions", leave=False
        ):
            question = questions[group_idx * prompts_per_group + prompt_idx]
            full_prompt = f"{system_prompt}\n\n{question}"
            prompt_len = len(tokenizer.encode(full_prompt))
            group.append((full_prompt, prompt_len, output_len))
            total_input_tokens += prompt_len
            total_output_tokens += output_len
        input_requests.append(group)

    if not disable_shuffle:
        random.shuffle(input_requests)

    # Guard the averages so num_groups == 0 (or no questions) cannot divide by zero.
    avg_system_prompt_len = (
        sum(len(tokenizer.encode(sp)) for sp in system_prompts) / len(system_prompts)
        if system_prompts
        else 0.0
    )
    avg_question_len = (
        sum(len(tokenizer.encode(q)) for q in questions) / len(questions)
        if questions
        else 0.0
    )

    print("\nGenerated shared prefix dataset statistics:")
    print(f"Number of groups: {num_groups}")
    print(f"Prompts per group: {prompts_per_group}")
    print(f"Total prompts: {len(input_requests) * prompts_per_group}")
    print(f"Total input tokens: {total_input_tokens}")
    print(f"Total output tokens: {total_output_tokens}")
    print(f"Average system prompt length: {avg_system_prompt_len:.1f} tokens")
    print(f"Average question length: {avg_question_len:.1f} tokens\n")

    cache_path.parent.mkdir(parents=True, exist_ok=True)
    print(f"Caching generated input data to {cache_path}")
    with open(cache_path, "wb") as f:
        pickle.dump(input_requests, f)

    return input_requests
|
|
|
|
def get_dataset(args, tokenizer):
    """Build benchmark requests for ``args.dataset_name``.

    Dispatches to the matching ``sample_*_requests`` function and forwards the
    relevant ``args`` fields. ``args.disable_shuffle`` is forwarded to every
    sampler, including "random" and "generated-shared-prefix".

    Raises:
        ValueError: if ``args.dataset_name`` is not a known dataset.
    """
    if args.dataset_name == "sharegpt":
        input_requests = sample_sharegpt_requests(
            dataset_path=args.dataset_path,
            num_requests=args.num_prompts,
            tokenizer=tokenizer,
            disable_shuffle=args.disable_shuffle,
            enable_multiturn=args.enable_multiturn,
            fixed_output_len=args.fixed_output_len,
        )
    elif args.dataset_name == "ultrachat":
        input_requests = sample_ultrachat_requests(
            dataset_path=args.dataset_path,
            num_requests=args.num_prompts,
            tokenizer=tokenizer,
            disable_shuffle=args.disable_shuffle,
            enable_multiturn=args.enable_multiturn,
            fixed_output_len=args.fixed_output_len,
        )
    elif args.dataset_name == "loogle":
        input_requests = sample_loogle_requests(
            dataset_path=args.dataset_path,
            num_requests=args.num_prompts,
            tokenizer=tokenizer,
            disable_shuffle=args.disable_shuffle,
            enable_multiturn=args.enable_multiturn,
            enable_shared_prefix=args.enable_shared_prefix,
            fixed_output_len=args.fixed_output_len,
        )
    elif args.dataset_name == "nextqa":
        input_requests = sample_nextqa_requests(
            dataset_path=args.dataset_path,
            num_requests=args.num_prompts,
            tokenizer=tokenizer,
            max_frames=args.max_frames,
            model_path=args.model,
            disable_shuffle=args.disable_shuffle,
            enable_multiturn=args.enable_multiturn,
            backend=args.backend,
            chat_template_name=args.chat_template,
            fixed_output_len=args.fixed_output_len,
        )
    elif args.dataset_name == "random":
        input_requests = sample_random_requests(
            input_len=args.random_input_len,
            output_len=args.random_output_len,
            num_prompts=args.num_prompts,
            range_ratio=args.random_range_ratio,
            tokenizer=tokenizer,
            dataset_path=args.dataset_path,
            # Previously omitted, so --disable-shuffle had no effect here.
            disable_shuffle=args.disable_shuffle,
        )
    elif args.dataset_name == "generated-shared-prefix":
        input_requests = sample_generated_shared_prefix_requests(
            num_groups=args.gsp_num_groups,
            prompts_per_group=args.gsp_prompts_per_group,
            system_prompt_len=args.gsp_system_prompt_len,
            question_len=args.gsp_question_len,
            output_len=args.gsp_output_len,
            args=args,
            tokenizer=tokenizer,
            # Previously omitted, so --disable-shuffle had no effect here.
            disable_shuffle=args.disable_shuffle,
        )
    else:
        raise ValueError(f"Unknown dataset: {args.dataset_name}")
    return input_requests
|
|