# -*- coding: utf-8 -*-
"""
向量存储抽象层
设计原则:
1. 接口与实现分离 - 易于切换存储后端
2. 异步优先 - 所有 I/O 操作都是异步的
3. 类型安全 - 完整的类型注解
4. 可观测 - 内置指标收集
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Set
from enum import Enum
import logging
logger = logging.getLogger(__name__)
# ============================================================
# 数据模型
# ============================================================
@dataclass
class Document:
    """A document record: text content plus metadata and an optional vector.

    Attributes:
        id: Identifier of the document.
        content: Text content of the document.
        metadata: Free-form key/value metadata. Each instance gets its own
            new empty dict when none is supplied.
        embedding: Optional embedding vector; ``None`` when not supplied.
    """

    id: str
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    embedding: Optional[List[float]] = None

    @property
    def file_path(self) -> str:
        """Return the value stored under the ``"file"`` metadata key.

        Falls back to an empty string when that key is absent.
        """
        path = self.metadata.get("file", "")
        return path

    def to_dict(self) -> Dict[str, Any]:
        """Return ``id``, ``content`` and ``metadata`` as a plain dict.

        The embedding is not included. ``metadata`` is the very dict held by
        this instance, not a copy.
        """
        return dict(id=self.id, content=self.content, metadata=self.metadata)
@dataclass
class SearchResult:
    """A single search hit: the matched document and its score.

    Attributes:
        document: The matched document.
        score: Score attached to this hit.
        source: Label for where the hit came from; defaults to ``"vector"``.
    """

    document: Document
    score: float
    source: str = "vector"  # one of "vector" | "bm25" | "hybrid"

    def to_dict(self) -> Dict[str, Any]:
        """Flatten the hit into a single dict.

        The document's ``id``, ``content``, ``file_path`` (under the key
        ``"file"``) and ``metadata`` come first, followed by this result's
        ``score`` and ``source``.
        """
        doc = self.document
        payload: Dict[str, Any] = {
            "id": doc.id,
            "content": doc.content,
            "file": doc.file_path,
            "metadata": doc.metadata,
        }
        payload["score"] = self.score
        payload["source"] = self.source
        return payload
@dataclass
class CollectionStats:
    """Statistics describing a collection.

    Attributes:
        name: Name of the collection.
        document_count: Number of documents in the collection.
        indexed_files: Set of indexed file names; each instance gets its own
            new empty set when none is supplied.
        vector_dimension: Dimension of the stored vectors; defaults to ``0``.
    """

    name: str
    document_count: int
    indexed_files: Set[str] = field(default_factory=set)
    vector_dimension: int = 0
class StorageBackend(Enum):
    """Kinds of storage backend."""

    QDRANT = "qdrant"
    CHROMA = "chroma"  # retained for compatibility
# ============================================================
# 抽象基类
# ============================================================
class BaseVectorStore(ABC):
    """Abstract base class for vector stores.

    Every storage backend must implement the abstract methods declared here.

    Instances can be used as an asynchronous context manager:
    ``async with store: ...`` awaits ``initialize()`` on entry and ``close()``
    on exit, so the store is closed even when the body raises.
    """

    async def __aenter__(self) -> "BaseVectorStore":
        """Await ``initialize()`` and return this store."""
        await self.initialize()
        return self

    async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
        """Await ``close()``; returns ``None`` so exceptions propagate."""
        await self.close()

    @abstractmethod
    async def initialize(self) -> None:
        """Initialize the storage connection."""

    @abstractmethod
    async def close(self) -> None:
        """Close the connection."""

    @abstractmethod
    async def add_documents(
        self,
        documents: List[Document],
        embeddings: List[List[float]]
    ) -> int:
        """Add documents to the store.

        Args:
            documents: The documents to add.
            embeddings: The embedding vectors corresponding to ``documents``
                (presumably index-aligned; this interface does not itself
                check that the two lists have equal length).

        Returns:
            The number of documents successfully added.
        """

    @abstractmethod
    async def search(
        self,
        query_embedding: List[float],
        top_k: int = 10,
        filter_conditions: Optional[Dict[str, Any]] = None
    ) -> List[SearchResult]:
        """Run a vector similarity search.

        Args:
            query_embedding: The query vector.
            top_k: Number of results to return.
            filter_conditions: Optional filter conditions; their schema is
                left to the implementation.

        Returns:
            The list of search results.
        """

    @abstractmethod
    async def delete_collection(self) -> bool:
        """Delete the current collection.

        Returns:
            A bool; presumably ``True`` on success. The base class does not
            define its meaning, so confirm against the concrete backend.
        """

    @abstractmethod
    async def get_stats(self) -> CollectionStats:
        """Return statistics for the collection."""

    @abstractmethod
    async def get_documents_by_file(self, file_path: str) -> List[Document]:
        """Return the documents associated with ``file_path``."""
class BaseVectorStoreFactory(ABC):
    """Abstract base class for vector store factories."""

    @abstractmethod
    def create(self, collection_name: str) -> BaseVectorStore:
        """Create and return a store instance, given a collection name."""
|