import logging
import os

from transformers import AutoTokenizer

from ._llm import Qwen3
from ._utils import compute_mdhash_id

logger = logging.getLogger("nano-graphrag")

# Download the Qwen3 tokenizer files locally and resolve the absolute path.
qwen3_model = Qwen3()
tiktoken_model_path = os.path.abspath(qwen3_model.download_tokenizer_files())
try:
    tiktoken_model_p = AutoTokenizer.from_pretrained(
        tiktoken_model_path,
        trust_remote_code=True,
        local_files_only=True,  # key parameter: load from the local path only, no network access
    )
except Exception as e:
    logger.error(f"Failed to load local tokenizer: {e}")
    logger.error(f"Please check that the path exists: {tiktoken_model_path}")
    raise
def chunking_by_video_segments(
    tokens_list: list[list[int]],
    doc_keys: list[str],
    tiktoken_model,
    max_token_size: int = 1024,
):
    """Greedily pack tokenized video segments into chunks of at most
    `max_token_size` tokens, without splitting a segment across chunks."""
    # Truncate any single segment that exceeds max_token_size.
    for index in range(len(tokens_list)):
        if len(tokens_list[index]) > max_token_size:
            tokens_list[index] = tokens_list[index][:max_token_size]
results = []
chunk_token = []
chunk_segment_ids = []
chunk_order_index = 0
    for index, tokens in enumerate(tokens_list):
        if len(chunk_token) + len(tokens) <= max_token_size:
            # Current segment still fits: append it to the open chunk.
            chunk_token.extend(tokens)
            chunk_segment_ids.append(doc_keys[index])
        else:
            # Flush the current chunk ...
            chunk = tiktoken_model.decode(chunk_token)
            results.append(
                {
                    "tokens": len(chunk_token),
                    "content": chunk.strip(),
                    "chunk_order_index": chunk_order_index,
                    "video_segment_id": chunk_segment_ids,
                }
            )
            # ... and start a new chunk beginning with the current segment.
            chunk_token = list(tokens)
            chunk_segment_ids = [doc_keys[index]]
            chunk_order_index += 1
# save the last chunk
if len(chunk_token) > 0:
chunk = tiktoken_model.decode(chunk_token)
results.append(
{
"tokens": len(chunk_token),
"content": chunk.strip(),
"chunk_order_index": chunk_order_index,
"video_segment_id": chunk_segment_ids,
}
)
return results
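# A minimal, self-contained sketch of the packing behaviour above. The stub
# tokenizer and the token ids are illustrative (assumptions, not part of this
# module); the chunker only needs an object exposing `.decode(list[int]) -> str`.
class _StubTokenizer:
    def decode(self, token_ids):
        return " ".join(str(t) for t in token_ids)

def _demo_chunking():
    tokens_list = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
    doc_keys = ["vid_0", "vid_1", "vid_2"]
    chunks = chunking_by_video_segments(
        tokens_list, doc_keys, _StubTokenizer(), max_token_size=5
    )
    # Expected: chunk 0 packs vid_0 + vid_1 (5 tokens); chunk 1 holds vid_2 (4 tokens).
    for c in chunks:
        logger.debug("chunk %d: %s", c["chunk_order_index"], c["video_segment_id"])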
def get_chunks(new_videos, chunk_func=chunking_by_video_segments, **chunk_func_params):
    """Tokenize every segment of every video and pack them into chunks,
    keyed by an md5-based hash id of the chunk content."""
    inserting_chunks = {}
    for video_name in new_videos:
        segment_id_list = list(new_videos[video_name].keys())
        docs = [new_videos[video_name][index]["content"] for index in segment_id_list]
        doc_keys = [f"{video_name}_{index}" for index in segment_id_list]
        # Batch-tokenize all segments of this video in one call.
        tokens_list = tiktoken_model_p(docs)["input_ids"]
chunks = chunk_func(
tokens_list, doc_keys=doc_keys, tiktoken_model=tiktoken_model_p, **chunk_func_params
)
for chunk in chunks:
inserting_chunks.update(
{compute_mdhash_id(chunk["content"], prefix="chunk-"): chunk}
)
return inserting_chunks
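# Usage sketch. The `demo_videos` layout below is an assumption inferred from
# how `get_chunks` indexes `new_videos` (video name -> segment id -> {"content": ...});
# note that importing this module already downloads the Qwen3 tokenizer files.
if __name__ == "__main__":
    demo_videos = {
        "demo_video": {
            0: {"content": "A person opens the door and walks in."},
            1: {"content": "They sit down at a table and start talking."},
        }
    }
    chunks = get_chunks(demo_videos, max_token_size=1024)
    for chunk_id, chunk in chunks.items():
        print(chunk_id, chunk["tokens"], chunk["video_segment_id"])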