from dataclasses import dataclass, field
from typing import TypedDict, Union, Literal, Generic, TypeVar

import numpy as np


@dataclass
class QueryParam:
    """Options that tune a single query.

    All fields have defaults, so ``QueryParam()`` is a valid configuration.
    """

    # When True the caller wants only the retrieved context back
    # (presumably skipping answer generation -- confirm against the query code).
    only_need_context: bool = False
    # Free-form description of the desired answer format.
    response_type: str = "Multiple Paragraphs"
    level: int = 2
    # top_k: int = 3
    # NOTE(review): `top_k` appears to be disabled here; confirm that no caller
    # still reads `QueryParam.top_k`.

    # naive search
    # Annotated so that the two settings below are real dataclass fields:
    # without an annotation they were plain class attributes that could not be
    # overridden through the constructor and were ignored by repr/eq/asdict.
    naive_max_token_for_text_unit: int = 1536
    retrieved_num_sampled_frames: int = 10
    # videorag search
    # (a second, identical `only_need_context: bool = False` declaration used to
    # sit here; it was redundant with the field above and has been dropped)


# Schema of one stored text chunk.
TextChunkSchema = TypedDict(
    "TextChunkSchema",
    {
        "tokens": int,
        "content": str,
        "video_segment_id": str,
        "chunk_order_index": int,
    },
)

# Schema of one community record, without its generated report.
SingleCommunitySchema = TypedDict(
    "SingleCommunitySchema",
    {
        "level": int,
        "title": str,
        # `list` accepts a single type parameter, so the former
        # `list[list[str, str]]` was not a valid parameterisation.
        # Presumably each inner list is a [source, target] pair -- confirm.
        "edges": list[list[str]],
        "nodes": list[str],
        "chunk_ids": list[str],
        "occurrence": float,
        "sub_communities": list[str],
    },
)


class CommunitySchema(SingleCommunitySchema):
    """A community record extended with its report in text and dict form."""

    report_string: str
    report_json: dict


# Value type stored by a `BaseKVStorage`.
T = TypeVar("T")


@dataclass
class StorageNameSpace:
    """Common base for storage backends: a namespace plus the global config.

    The three callbacks are no-op hooks that concrete backends may override.
    """

    namespace: str
    global_config: dict

    async def index_start_callback(self):
        """Hook for the start of indexing; the base implementation does nothing."""
        pass

    async def index_done_callback(self):
        """Commit the storage operations after indexing."""
        pass

    async def query_done_callback(self):
        """Commit the storage operations after querying."""
        pass


@dataclass
class BaseVectorStorage(StorageNameSpace):
    """Abstract interface of a vector store; subclasses implement the methods."""

    # embedding_func: EmbeddingFunc
    meta_fields: set = field(default_factory=set)

    async def query(self, query: str, top_k: int) -> list[dict]:
        """Search the store for ``query`` with a result limit of ``top_k``.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    async def upsert(self, data: dict[str, dict]):
        """Insert or update entries, using each key of ``data`` as the id.

        The 'content' field of each value is used for embedding. If
        embedding_func is None, the 'embedding' field of the value is used
        instead.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError


@dataclass
class BaseKVStorage(Generic[T], StorageNameSpace):
    """Abstract interface of a key-value store holding values of type ``T``.

    Every method raises ``NotImplementedError`` here; subclasses implement them.
    """

    async def all_keys(self) -> list[str]:
        """Return the keys held by the store."""
        raise NotImplementedError

    async def get_by_id(self, id: str) -> Union[T, None]:
        """Return the value stored under ``id``, or None."""
        raise NotImplementedError

    async def get_by_ids(
        self, ids: list[str], fields: Union[set[str], None] = None
    ) -> list[Union[T, None]]:
        """Return one entry (value or None) per requested id.

        ``fields`` optionally names a set of fields; presumably it restricts
        which fields of each value are returned -- confirm in the subclasses.
        """
        raise NotImplementedError

    async def filter_keys(self, data: list[str]) -> set[str]:
        """Return the keys from ``data`` that do not exist in the store."""
        raise NotImplementedError

    async def upsert(self, data: dict[str, T]):
        """Insert or update the given key/value pairs."""
        raise NotImplementedError

    async def drop(self):
        """Drop the stored data."""
        raise NotImplementedError