import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List

from tqdm import tqdm

from lmms_eval.api.registry import register_model

try:
    from decord import VideoReader, cpu
except ImportError:
    pass

from dotenv import load_dotenv
from loguru import logger as eval_logger

from lmms_eval.models.model_utils.gen_metrics import log_metrics
from lmms_eval.models.simple.openai_compatible import (
    OpenAICompatible as OpenAICompatibleSimple,
)
from lmms_eval.protocol import ChatMessages

load_dotenv(verbose=True)


@register_model("openai_compatible_chat")
class OpenAICompatible(OpenAICompatibleSimple):
    is_simple = False

    def generate_until(self, requests) -> List[str]:
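        """Generate responses for a batch of chat requests.

        Requests are grouped into batches, previously cached responses are reused
        when resuming, and the remaining payloads are sent concurrently to an
        OpenAI-compatible chat completions endpoint.
        """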
        res = []
        batch_size = getattr(self, "batch_size_per_gpu", 1)
        batched_requests = [requests[i : i + batch_size] for i in range(0, len(requests), batch_size)]
        pbar = tqdm(total=len(batched_requests), disable=(self.rank != 0), desc="Model Responding")

        e2e_latency = 0
        total_tokens = 0

        for batch_requests in batched_requests:
            batch_payloads = []
            batch_doc_uuids = []
            batch_responses = []

            for req in batch_requests:
                ctx, doc_to_messages, gen_kwargs, doc_id, task, split = req.args
                doc_uuid = f"{task}___{split}___{doc_id}"
                batch_doc_uuids.append(doc_uuid)

                # Resume from cache: reuse the persisted response and append a
                # placeholder payload so batch_payloads stays index-aligned with
                # batch_responses for the dispatch step below.
                if self.continual_mode is True and self.cache_mode == "resume":
                    if doc_uuid in self.response_cache:
                        response_text = self.response_cache[doc_uuid]
                        if response_text:
                            batch_responses.append(response_text)
                            batch_payloads.append(None)
                            continue

                chat_messages_raw = doc_to_messages(self.task_dict[task][split][doc_id])
                chat_messages: ChatMessages = ChatMessages(**{"messages": chat_messages_raw})
                payload = {"messages": chat_messages.to_openai_messages()}
                payload["model"] = self.model_version

                # Fill in generation defaults and clamp max_new_tokens.
                if "max_new_tokens" not in gen_kwargs:
                    gen_kwargs["max_new_tokens"] = 1024
                if gen_kwargs["max_new_tokens"] > 4096:
                    gen_kwargs["max_new_tokens"] = 4096
                if "temperature" not in gen_kwargs:
                    gen_kwargs["temperature"] = 0
                if "top_p" not in gen_kwargs:
                    gen_kwargs["top_p"] = None
                if "num_beams" not in gen_kwargs:
                    gen_kwargs["num_beams"] = 1

                payload["max_tokens"] = gen_kwargs["max_new_tokens"]
                payload["temperature"] = gen_kwargs["temperature"]
if "o1" in self.model_version or "o3" in self.model_version or "o4" in self.model_version:
del payload["temperature"]
payload.pop("max_tokens")
payload["reasoning_effort"] = "medium"
payload["response_format"] = {"type": "text"}
payload["max_completion_tokens"] = gen_kwargs["max_new_tokens"]
batch_payloads.append(payload)
batch_responses.append(None)
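
            # Worker used by the thread pool below: sends one payload with retries
            # and returns (response_text, index, latency_seconds, completion_tokens).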
            def process_single_request(payload, i):
                # Skip entries already filled from the resume cache.
                if batch_responses[i] is not None:
                    return batch_responses[i], i, 0, 0

                for attempt in range(self.max_retries):
                    try:
                        start_time = time.time()
                        response = self.client.chat.completions.create(**payload)
                        end_time = time.time()
                        response_text = response.choices[0].message.content

                        latency = end_time - start_time
                        # Prefer the token count reported by the API; fall back to a
                        # rough whitespace-based count when usage is unavailable.
                        if getattr(response, "usage", None) is not None:
                            tokens = response.usage.completion_tokens
                        else:
                            tokens = len(response_text.split())

                        return response_text, i, latency, tokens
                    except Exception as e:
                        error_msg = str(e)
                        eval_logger.info(f"Attempt {attempt + 1}/{self.max_retries} failed with error: {error_msg}")
                        if attempt == self.max_retries - 1:
                            eval_logger.error(f"All {self.max_retries} attempts failed. Last error: {error_msg}")
                            return "", i, 0, 0
                        else:
                            time.sleep(self.timeout)

                return "", i, 0, 0
            tasks_to_run = [(payload, i) for i, payload in enumerate(batch_payloads) if batch_responses[i] is None]

            if tasks_to_run:
                max_workers = min(len(tasks_to_run), 32)
                with ThreadPoolExecutor(max_workers=max_workers) as executor:
                    future_to_index = {executor.submit(process_single_request, payload, i): i for payload, i in tasks_to_run}

                    for future in as_completed(future_to_index):
                        response_text, i, latency, tokens = future.result()
                        batch_responses[i] = response_text
                        e2e_latency += latency
                        total_tokens += tokens
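
            # Persist the updated response cache after every batch so an interrupted
            # run can resume without re-querying the endpoint.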
            if self.continual_mode is True:
                for doc_uuid, response_text in zip(batch_doc_uuids, batch_responses):
                    if response_text is not None:
                        self.response_cache[doc_uuid] = response_text
                with open(self.response_persistent_file, "w") as f:
                    json.dump(self.response_cache, f)

            res.extend([r for r in batch_responses if r is not None])
            pbar.update(1)

        # Calculate average speed
        avg_speed = total_tokens / e2e_latency if e2e_latency > 0 else 0

        # Log metrics
        metric_dict = {
            "total_tokens": total_tokens,
            "e2e_latency": e2e_latency,
            "avg_speed": avg_speed,
        }
        log_metrics(**metric_dict)

        pbar.close()
        return res