File size: 5,054 Bytes
6ce472c
 
 
 
1362320
586cd83
1362320
586cd83
 
1362320
6ce472c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
586cd83
6ce472c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
586cd83
6ce472c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""
Contain Wrapper Class for ChormaDB client, that can process and store documents and retrive document chunks.
"""

# for chromaDB
__import__("pysqlite3")
import sys

sys.modules["sqlite3"] = sys.modules.pop("pysqlite3")

from io import BytesIO
from typing import List
from typing_extensions import Literal
import uuid
import warnings
import chromadb
import re
from .utils import (
    generate_file_id,
    chunk_document,
    generate_embedding,
    extract_content_from_docx,
    extract_content_from_pdf,
)


class AdvancedClient:

    def __init__(self, vector_database_path: str = "vectorDB") -> None:
        self.client = chromadb.PersistentClient(path=vector_database_path)
        self.exsisting_collections = [
            collection.name for collection in self.client.list_collections()
        ]
        self.selected_collections: List[str] = []

    def create_or_get_collection(
        self,
        file_names: List[str],
        file_types: List[Literal["pdf", "docx"]],
        file_datas,
    ):
        collections = []
        for data in zip(file_names, file_types, file_datas):

            file_name, file_type, file_data = data
            file_id = generate_file_id(file_bytes=file_data)
            file_exisis = file_id in self.exsisting_collections

            if file_exisis:
                collection = file_id

            else:
                collection = self.client.create_collection(name=file_id)
                file_buffer = BytesIO(file_data)

                if file_type == "pdf":
                    document, pil_images = extract_content_from_pdf(file_buffer)
                    chunks = chunk_document(document)
                    ids = [f"{uuid.uuid4()}_id_{x}" for x in range(1, len(chunks) + 1)]
                    embeddings = generate_embedding(
                        chunks, embedding_model="znbang/bge:small-en-v1.5-q8_0"
                    )
                    metadatas = []

                    for chunk in chunks:
                        imgs_found = re.findall(
                            pattern=r"<img\s+src='([^']*)'>", string=chunk
                        )
                        chunk_imgs = []
                        if len(imgs_found) > 0:
                            for img in imgs_found:
                                chunk_imgs.append(pil_images[int(img)])
                        metadatas.append(
                            {"images": str(chunk_imgs), "file_name": file_name}
                        )

                elif file_type == "docx":
                    document = extract_content_from_docx(file_buffer)
                    chunks = chunk_document(document)
                    ids = [f"{uuid.uuid4()}_id_{x}" for x in range(1, len(chunks) + 1)]

                    embeddings = generate_embedding(
                        chunks, embedding_model="znbang/bge:small-en-v1.5-q8_0"
                    )
                    metadatas = [{"file_name": file_name} for _ in chunks]

                else:
                    raise Exception(
                        f"Given format '.{file_type}' is currently not supported."
                    )

                collection.add(
                    ids=ids,
                    embeddings=embeddings,  # type: ignore
                    documents=chunks,
                    metadatas=metadatas,  # type: ignore
                )
                collection = file_id
            collections.append(collection)

        self.selected_collections = collections

    def retrieve_chunks(self, query: str, number_of_chunks: int = 3):
        if len(self.selected_collections) == 0:

            warnings.warn(
                message=f"No collection is selected using all the exsisting collections, total collections : {len(self.exsisting_collections)}"
            )
            collections = [self.client.get_collection("UNION")]
            self.selected_collections = [collection.name for collection in collections]
        else:
            collections = [
                self.client.get_collection(collection_name)
                for collection_name in self.selected_collections
            ]

        query_emb = generate_embedding(
            [query], embedding_model="znbang/bge:small-en-v1.5-q8_0"
        )

        retrieved_docs = []

        for collection in collections:
            results = collection.query(
                query_embeddings=query_emb,
                n_results=5,
                include=["documents", "metadatas", "distances"],
            )

            for i in range(len(results["ids"][0])):
                retrieved_docs.append(
                    {
                        "document": results["documents"][0][i],
                        "metadata": results["metadatas"][0][i],
                        "distance": results["distances"][0][i],
                        "collection": collection.name,
                    }
                )

        retrieved_docs = sorted(retrieved_docs, key=lambda x: x["distance"])

        return retrieved_docs[:number_of_chunks]