File size: 3,799 Bytes
4e98fb0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# -*- coding: utf-8 -*-
"""
向量存储抽象层

设计原则:
1. 接口与实现分离 - 易于切换存储后端
2. 异步优先 - 所有 I/O 操作都是异步的
3. 类型安全 - 完整的类型注解
4. 可观测 - 内置指标收集
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Set
from enum import Enum
import logging

logger = logging.getLogger(__name__)


# ============================================================
# 数据模型
# ============================================================

@dataclass
class Document:
    """Document data model.

    Attributes:
        id: Identifier of the document.
        content: Text content of the document.
        metadata: Free-form key/value metadata; each instance gets its own
            empty dict when none is supplied.
        embedding: Optional embedding vector; ``None`` when not set.
    """

    id: str
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    embedding: Optional[List[float]] = None

    @property
    def file_path(self) -> str:
        """Return ``metadata["file"]``, or ``""`` when that key is absent."""
        source_file = self.metadata.get("file", "")
        return source_file

    def to_dict(self) -> Dict[str, Any]:
        """Return a dict holding ``id``, ``content`` and ``metadata``.

        The embedding is not included, and the ``metadata`` value is the
        very dict object stored on the instance (no copy is made).
        """
        return dict(id=self.id, content=self.content, metadata=self.metadata)


@dataclass
class SearchResult:
    """A single search result.

    Attributes:
        document: The matched document.
        score: Score attached to the match.
        source: Label for where the hit came from; defaults to ``"vector"``.
    """

    document: Document
    score: float
    source: str = "vector"  # one of "vector" | "bm25" | "hybrid"

    def to_dict(self) -> Dict[str, Any]:
        """Flatten the result into one dict.

        The keys are ``id``, ``content``, ``file`` and ``metadata`` (all read
        from the wrapped document) followed by ``score`` and ``source``.
        ``metadata`` is the document's own dict object, not a copy.
        """
        doc = self.document
        flat: Dict[str, Any] = {
            "id": doc.id,
            "content": doc.content,
            "file": doc.file_path,
            "metadata": doc.metadata,
            "score": self.score,
            "source": self.source,
        }
        return flat


@dataclass
class CollectionStats:
    """Collection statistics."""
    # Name of the collection.
    name: str
    # Number of documents in the collection.
    document_count: int
    # Set of indexed files; each instance gets its own empty set by default.
    # NOTE(review): presumably file paths -- confirm against the backends.
    indexed_files: Set[str] = field(default_factory=set)
    # Vector dimension; defaults to 0.
    # NOTE(review): 0 presumably means "not known" -- confirm with implementers.
    vector_dimension: int = 0


class StorageBackend(Enum):
    """Storage backend types."""
    QDRANT = "qdrant"
    CHROMA = "chroma"  # kept for compatibility


# ============================================================
# 抽象基类
# ============================================================

class BaseVectorStore(ABC):
    """Abstract base class for vector stores.

    Every storage backend must implement all of the methods declared here.
    """

    @abstractmethod
    async def initialize(self) -> None:
        """Initialize the storage connection."""

    @abstractmethod
    async def close(self) -> None:
        """Close the connection."""

    @abstractmethod
    async def add_documents(
        self,
        documents: List[Document],
        embeddings: List[List[float]],
    ) -> int:
        """Add documents.

        Args:
            documents: The documents to add.
            embeddings: The embedding vectors corresponding to ``documents``.

        Returns:
            The number of documents successfully added.
        """

    @abstractmethod
    async def search(
        self,
        query_embedding: List[float],
        top_k: int = 10,
        filter_conditions: Optional[Dict[str, Any]] = None,
    ) -> List[SearchResult]:
        """Run a vector similarity search.

        Args:
            query_embedding: The query vector.
            top_k: How many results to return.
            filter_conditions: Optional filter conditions.

        Returns:
            A list of search results.
        """

    @abstractmethod
    async def delete_collection(self) -> bool:
        """Delete the current collection."""

    @abstractmethod
    async def get_stats(self) -> CollectionStats:
        """Return statistics for the collection."""

    @abstractmethod
    async def get_documents_by_file(self, file_path: str) -> List[Document]:
        """Return the documents associated with ``file_path``."""


class BaseVectorStoreFactory(ABC):
    """Abstract base class for vector store factories."""

    @abstractmethod
    def create(self, collection_name: str) -> BaseVectorStore:
        """Create a store instance.

        Args:
            collection_name: Name passed by the caller to identify the
                collection the store should work with.

        Returns:
            A vector store instance.
        """