import asyncio
import logging
import os
from typing import Callable, Dict, List, Optional, Type, Union, cast

from transformers import AutoTokenizer

from ._llm import Qwen3
from ._utils import compute_mdhash_id

logger = logging.getLogger("nano-graphrag")


# Download the Qwen3 tokenizer files once at import time and resolve the
# local directory so AutoTokenizer can load it without network access.
qwen3_model = Qwen3()
tiktoken_model_path = qwen3_model.download_tokenizer_files()
tiktoken_model_path = os.path.abspath(tiktoken_model_path)
|
|
|
|
# Load the tokenizer strictly from the local files downloaded above.
try:
    tiktoken_model_p = AutoTokenizer.from_pretrained(
        tiktoken_model_path,
        trust_remote_code=True,
        local_files_only=True,
    )
except Exception as e:
    logger.error(f"Failed to load local tokenizer: {e}")
    logger.error(f"Please check that the path exists: {tiktoken_model_path}")
    raise
|
|
|
|
def chunking_by_video_segments(
    tokens_list: list[list[int]],
    doc_keys: list[str],
    tiktoken_model,
    max_token_size: int = 1024,
):
    """Greedily pack consecutive video-segment token lists into chunks.

    Segments are concatenated in order until adding the next one would exceed
    ``max_token_size``; a single segment longer than the limit is truncated.
    Each resulting chunk records the segment ids it covers.
    """
    # Truncate any individual segment that is longer than one chunk.
    for index in range(len(tokens_list)):
        if len(tokens_list[index]) > max_token_size:
            tokens_list[index] = tokens_list[index][:max_token_size]

    results = []
    chunk_token = []
    chunk_segment_ids = []
    chunk_order_index = 0
    for index, tokens in enumerate(tokens_list):
        if len(chunk_token) + len(tokens) <= max_token_size:
            # The current segment still fits into the open chunk.
            chunk_token += tokens
            chunk_segment_ids.append(doc_keys[index])
        else:
            # Flush the open chunk, then start a new one with this segment.
            chunk = tiktoken_model.decode(chunk_token)
            results.append(
                {
                    "tokens": len(chunk_token),
                    "content": chunk.strip(),
                    "chunk_order_index": chunk_order_index,
                    "video_segment_id": chunk_segment_ids,
                }
            )
            chunk_token = tokens.copy()
            chunk_segment_ids = [doc_keys[index]]
            chunk_order_index += 1

    # Flush the last, possibly partial, chunk.
    if len(chunk_token) > 0:
        chunk = tiktoken_model.decode(chunk_token)
        results.append(
            {
                "tokens": len(chunk_token),
                "content": chunk.strip(),
                "chunk_order_index": chunk_order_index,
                "video_segment_id": chunk_segment_ids,
            }
        )

    return results
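

# Illustrative sketch only (not part of the pipeline): shows the expected input
# and output shapes of chunking_by_video_segments using the module-level
# tokenizer. The sample texts and segment ids below are made-up placeholders,
# and this helper is never called on import.
def _demo_chunking_by_video_segments():
    sample_texts = [
        "The speaker introduces the topic of the video.",
        "A diagram of the system architecture is shown on screen.",
    ]
    sample_keys = ["demo_video_0", "demo_video_1"]
    sample_tokens = tiktoken_model_p.batch_encode_plus(sample_texts)["input_ids"]
    chunks = chunking_by_video_segments(
        sample_tokens,
        doc_keys=sample_keys,
        tiktoken_model=tiktoken_model_p,
        max_token_size=1024,
    )
    # Each chunk looks like:
    # {"tokens": int, "content": str, "chunk_order_index": int, "video_segment_id": [ids]}
    for chunk in chunks:
        logger.info(
            "chunk %s covers segments %s",
            chunk["chunk_order_index"],
            chunk["video_segment_id"],
        )
    return chunks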
|
|
|
|
def get_chunks(new_videos, chunk_func=chunking_by_video_segments, **chunk_func_params):
    """Tokenize every segment of every new video and chunk it with ``chunk_func``.

    ``new_videos`` maps a video name to a dict of segment id -> {"content": str}.
    Returns a dict keyed by a content-hash chunk id, suitable for insertion.
    """
    inserting_chunks = {}

    new_videos_list = list(new_videos.keys())
    for video_name in new_videos_list:
        segment_id_list = list(new_videos[video_name].keys())
        docs = [new_videos[video_name][index]["content"] for index in segment_id_list]
        doc_keys = [f"{video_name}_{index}" for index in segment_id_list]

        # Batch-encode all segment texts of this video in one tokenizer call.
        tokens = tiktoken_model_p.batch_encode_plus(docs)
        tokens_list = tokens["input_ids"]
        chunks = chunk_func(
            tokens_list, doc_keys=doc_keys, tiktoken_model=tiktoken_model_p, **chunk_func_params
        )

        # Key each chunk by a content hash so duplicate chunks collapse.
        for chunk in chunks:
            inserting_chunks.update(
                {compute_mdhash_id(chunk["content"], prefix="chunk-"): chunk}
            )

    return inserting_chunks
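

# Minimal smoke-test sketch, run only when the module is executed directly
# (e.g. via ``python -m``), never on import. The nested structure of
# ``new_videos`` ({video_name: {segment_id: {"content": ...}}}) is inferred
# from how get_chunks indexes it above; the video name and texts are placeholders.
if __name__ == "__main__":
    demo_videos = {
        "demo_video": {
            0: {"content": "The host greets the audience and previews the agenda."},
            1: {"content": "A code walkthrough of the chunking function follows."},
        }
    }
    demo_chunks = get_chunks(demo_videos, max_token_size=256)
    for chunk_id, chunk in demo_chunks.items():
        print(chunk_id, chunk["tokens"], chunk["video_segment_id"])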
|
|
|
|
|
|
|
|
|
|
|
|