| from dataclasses import dataclass, field |
| from typing import TypedDict, Union, Literal, Generic, TypeVar |
|
|
| import numpy as np |
|
|
|
|
|
|
|
|
@dataclass
class QueryParam:
    """Options attached to a single query.

    Every attribute below is an annotated dataclass field, so each one can be
    overridden per instance through the constructor and takes part in the
    generated ``__repr__`` / ``__eq__``.

    Attributes:
        only_need_context: Boolean flag, off by default.
        response_type: Free-form string, ``"Multiple Paragraphs"`` by default.
        level: Integer, ``2`` by default.
        naive_max_token_for_text_unit: Integer budget, ``1536`` by default.
        retrieved_num_sampled_frames: Integer count, ``10`` by default.

    NOTE(review): how each option is interpreted is decided by the query code,
    which is not part of this block -- confirm there before relying on the
    names alone.
    """

    only_need_context: bool = False
    response_type: str = "Multiple Paragraphs"
    level: int = 2

    # These two carry annotations on purpose: without one, @dataclass treats
    # the assignment as a plain class attribute and the constructor rejects
    # the keyword. They come last so existing positional calls keep working.
    naive_max_token_for_text_unit: int = 1536
    retrieved_num_sampled_frames: int = 10
|
|
|
|
class TextChunkSchema(TypedDict):
    """Dictionary shape for one text chunk; all four keys are required."""

    tokens: int
    content: str
    # presumably the id of the video segment the chunk came from -- confirm
    # against the code that builds these records
    video_segment_id: str
    chunk_order_index: int
|
|
class SingleCommunitySchema(TypedDict):
    """Dictionary shape describing one community; all seven keys are required.

    Written in class syntax so it reads the same way as ``CommunitySchema``,
    which extends it.
    """

    level: int
    title: str
    # ``list`` accepts exactly one type argument, so the former spelling
    # ``list[list[str, str]]`` was not a valid annotation. Each edge is a list
    # of strings -- presumably a two-element pair, as the old spelling
    # suggests; TODO confirm with the code that fills this key.
    edges: list[list[str]]
    nodes: list[str]
    chunk_ids: list[str]
    occurrence: float
    sub_communities: list[str]
|
|
|
|
class CommunitySchema(SingleCommunitySchema):
    """``SingleCommunitySchema`` extended with two report keys.

    ``report_string`` holds a string and ``report_json`` holds a dict; the
    contents of either are not constrained by this declaration.
    """

    report_string: str
    report_json: dict
|
|
|
|
# Unconstrained type variable; BaseKVStorage is generic over it (the type of
# the values it stores and returns).
T = TypeVar("T")
|
|
|
|
@dataclass
class StorageNameSpace:
    """Base record shared by the storage classes: a namespace plus the config.

    The three coroutine hooks do nothing here and return ``None``; a storage
    subclass overrides whichever ones it needs.

    Attributes:
        namespace: Name identifying this storage instance.
        global_config: Configuration dictionary handed to the storage.
    """

    namespace: str
    global_config: dict

    async def index_start_callback(self):
        """Hook for work to do when indexing starts; does nothing by default."""

    async def index_done_callback(self):
        """Commit the storage operations after indexing; does nothing by default."""

    async def query_done_callback(self):
        """Commit the storage operations after querying; does nothing by default."""
|
|
|
|
@dataclass
class BaseVectorStorage(StorageNameSpace):
    """Interface for vector storages; both operations must be overridden.

    Attributes:
        meta_fields: Set of field names; every instance starts with its own
            empty set. NOTE(review): presumably the payload keys stored next
            to each vector -- confirm in a concrete subclass.
    """

    meta_fields: set = field(default_factory=set)

    async def query(self, query: str, top_k: int) -> list[dict]:
        """Return a list of result dicts for ``query``, limited by ``top_k``.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    async def upsert(self, data: dict[str, dict]):
        """Insert or update entries, one per key of ``data``.

        The key serves as the entry id and the value's ``'content'`` field is
        what gets embedded. If ``embedding_func`` is None, the value's
        ``'embedding'`` field is used instead.

        NOTE(review): ``embedding_func`` is not declared on this class in the
        visible source -- confirm where subclasses obtain it.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
|
|
|
|
|
|
@dataclass
class BaseKVStorage(Generic[T], StorageNameSpace):
    """Interface for key-value storages holding values of type ``T``.

    Every method raises ``NotImplementedError`` in this base class; a concrete
    storage overrides them.
    """

    async def all_keys(self) -> list[str]:
        """Return the keys held by the storage as a list of strings."""
        raise NotImplementedError

    async def get_by_id(self, id: str) -> Union[T, None]:
        """Return the value stored under ``id``; the return type allows None."""
        raise NotImplementedError

    async def get_by_ids(
        self,
        ids: list[str],
        fields: Union[set[str], None] = None,
    ) -> list[Union[T, None]]:
        """Return a list of values (entries may be None) for ``ids``.

        ``fields`` is an optional set of field names, None by default.
        NOTE(review): presumably it restricts which fields of each value are
        returned -- confirm in a concrete subclass.
        """
        raise NotImplementedError

    async def filter_keys(self, data: list[str]) -> set[str]:
        """Return the keys from ``data`` that do not exist in the storage."""
        raise NotImplementedError

    async def upsert(self, data: dict[str, T]):
        """Insert or update the given key-to-value mapping."""
        raise NotImplementedError

    async def drop(self):
        """Drop the storage. NOTE(review): exact scope is up to the subclass."""
        raise NotImplementedError
|
|
|
|
|
|