# tools/utils/cost/cost_text.py
# Uploaded via huggingface_hub by Adinosaur (commit 1c980b1, verified).
import json
import tiktoken
from tqdm import tqdm
from multiprocessing import Pool
import pandas as pd
# Global encoder initialization (each worker process initializes its own copy).
def init_process():
    """Pool worker initializer: create the module-global tiktoken encoder.

    Each worker process builds its own "cl100k_base" encoder and stores it
    in the global `encoder`, which `calculate_tokens` reads.
    """
    global encoder
    encoder = tiktoken.get_encoding("cl100k_base")
def calculate_tokens(obj):
    """Count tokens for a single request object (runs inside a worker process).

    Walks ``obj["body"]["messages"]``, collecting system-prompt text and the
    text parts of user messages, then encodes the newline-joined text with
    the process-global tiktoken ``encoder`` (set up by ``init_process``).

    Args:
        obj: one parsed JSONL request record (dict).

    Returns:
        int: token count of the collected text, or 0 if any error occurs
        (the error is printed together with the record's custom_id).
    """
    global encoder
    total_text = []
    try:
        messages = obj.get("body", {}).get("messages", [])
        for msg in messages:
            role = msg.get("role")
            if role == "system":
                content = msg.get("content", "")
                if content:  # skip empty content
                    total_text.append(content)
            elif role == "user":
                content = msg.get("content", [])
                # User content may be a plain string, a list of typed parts
                # (multimodal messages), or a single typed part dict.
                if isinstance(content, str):
                    # Fix: plain-string user content was previously skipped
                    # entirely, undercounting tokens for text-only requests.
                    if content:
                        total_text.append(content)
                elif isinstance(content, list):
                    for item in content:
                        if isinstance(item, dict) and item.get("type") == "text":
                            text = item.get("text", "")
                            if text:
                                total_text.append(text)
                elif isinstance(content, dict) and content.get("type") == "text":
                    text = content.get("text", "")
                    if text:
                        total_text.append(text)
        # Join the pieces and count tokens in a single encode call.
        return len(encoder.encode("\n".join(total_text)))
    except Exception as e:
        print(f"处理错误: {e} | 数据: {obj.get('custom_id')}")
        return 0
def process_line(line):
    """Turn one JSONL line into a ``{"custom_id": ..., "tokens": ...}`` record.

    Args:
        line: one raw line from the requests file.

    Returns:
        dict with the record's custom_id and its token count, or ``None``
        when the line is invalid JSON or any other error occurs (a
        diagnostic is printed in either case).
    """
    try:
        record = json.loads(line)
        request_id = record.get("custom_id")
        return {
            "custom_id": request_id,
            "tokens": calculate_tokens(record),
        }
    except json.JSONDecodeError:
        # Show the first 100 characters to help locate the bad row.
        print(f"无效JSON: {line[:100]}...")
    except Exception as e:
        print(f"全局错误: {e}")
    return None
if __name__ == "__main__":
    # Read all request lines up front so tqdm can display a total.
    input_path = "/mnt/data/users/zys/proj/vlm_reasoning/request/vqa_batch_requests.jsonl"
    with open(input_path, "r", encoding="utf-8") as f:
        lines = f.readlines()

    # Fan the lines out to 8 worker processes; each worker builds its own
    # tiktoken encoder via init_process.
    with Pool(processes=8, initializer=init_process) as pool:
        results = []
        with tqdm(total=len(lines), desc="处理进度") as pbar:
            for result in pool.imap(process_line, lines):
                if result is not None:  # drop failed records
                    results.append(result)
                pbar.update()

    # Persist per-request token counts.
    df = pd.DataFrame(results)
    df.to_csv("token_results.csv", index=False)

    # Summary statistics. Guard the all-failed case: on an empty DataFrame
    # df["tokens"] raises KeyError (no columns) and mean() would be NaN.
    if df.empty:
        print(f"统计报告:\n- 有效数据: 0/{len(lines)}")
    else:
        total_tokens = df["tokens"].sum()
        avg_tokens = df["tokens"].mean()
        print(f"统计报告:\n"
              f"- 总Token数: {total_tokens:,}\n"
              f"- 平均每条: {avg_tokens:.1f}\n"
              f"- 最大单条: {df['tokens'].max()}\n"
              f"- 有效数据: {len(df)}/{len(lines)}")