File size: 8,447 Bytes
1161dd2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import aiosqlite
import json
import time
from typing import List
from server.app.utils.diskcache_lock import diskcache_lock
from server.logger.logger_config import my_logger as logger
from server.constant.constants import (SQLITE_DB_DIR, SQLITE_DB_NAME,
                                       MAX_CHUNK_LENGTH, CHUNK_OVERLAP,
                                       FROM_LOCAL_FILE, LOCAL_FILE_PARSING,
                                       LOCAL_FILE_PARSING_COMPLETED,
                                       LOCAL_FILE_EMBEDDED,
                                       LOCAL_FILE_PROCESS_FAILED)
from server.rag.index.chunk.markdown_splitter import MarkdownTextSplitter
from server.rag.index.embedder.document_embedder import document_embedder


class AsyncTextParser:
    """Parse a local file's text into chunks, embed them, and keep the
    SQLite bookkeeping tables in sync.

    Tables touched (schema lives elsewhere in the project):
      - t_local_file_tab          : per-document status / mtime
      - t_local_file_chunk_tab    : one row per text chunk
      - t_doc_embedding_map_tab   : doc_id -> JSON list of embedding ids

    Every write path is serialized through the process-wide
    ``diskcache_lock``; lock/DB failures are logged and swallowed
    (best-effort semantics — callers are not notified of partial failure).
    """

    ADD_BATCH_SIZE = 30      # chunks inserted / embedded per batch
    DELETE_BATCH_SIZE = 100  # embedding ids removed from the vector store per batch

    def __init__(self) -> None:
        self.max_chunk_length = MAX_CHUNK_LENGTH
        self.chunk_overlap = CHUNK_OVERLAP
        self.doc_source = FROM_LOCAL_FILE
        self.sqlite_db_path = f"{SQLITE_DB_DIR}/{SQLITE_DB_NAME}"
        # NOTE(review): this appears to be a synchronous lock held across
        # ``await`` points below — presumably intentional for cross-process
        # exclusion; confirm it cannot stall the event loop under contention.
        self.distributed_lock = diskcache_lock

    async def update_doc_status(self, doc_id: int, doc_status: int) -> None:
        """Set ``doc_status`` (and refresh ``mtime``) on one t_local_file_tab row."""
        logger.info(
            f"[FILE_CONTENT] update_doc_status, doc_id: {doc_id}, doc_status: {doc_status}"
        )
        timestamp = int(time.time())
        async with aiosqlite.connect(self.sqlite_db_path) as db:
            # WAL mode lets concurrent readers proceed while we commit.
            await db.execute("PRAGMA journal_mode=WAL;")

            try:
                with self.distributed_lock.lock():
                    await db.execute(
                        "UPDATE t_local_file_tab SET doc_status = ?, mtime = ? WHERE id = ?",
                        (doc_status, timestamp, doc_id))
                    await db.commit()
            except Exception as e:
                # Best-effort: failure to update status is logged, not raised.
                logger.error(f"Process distributed_lock exception: {e}")

    async def add_local_file_chunk(self, file_id: int,
                                   chunk_text_vec: List[str],
                                   start_index: int) -> None:
        """Insert one batch of chunks for ``file_id``.

        Chunk indices are 1-based and continue from ``start_index``: the
        first chunk of this batch gets index ``start_index + 1``.
        """
        logger.info(
            f"[FILE_CONTENT] add_local_file_chunk, file_id: {file_id}, start_index: {start_index}"
        )
        timestamp = int(time.time())
        async with aiosqlite.connect(self.sqlite_db_path) as db:
            await db.execute("PRAGMA journal_mode=WAL;")

            # enumerate(..., start=...) replaces the original manual counter.
            chunks_to_add = [
                (file_id, chunk_index, chunk, len(chunk), timestamp, timestamp)
                for chunk_index, chunk in enumerate(chunk_text_vec,
                                                    start=start_index + 1)
            ]

            try:
                with self.distributed_lock.lock():
                    await db.executemany(
                        "INSERT INTO t_local_file_chunk_tab (file_id, chunk_index, content, content_length, ctime, mtime) VALUES (?, ?, ?, ?, ?, ?)",
                        chunks_to_add)
                    await db.commit()
            except Exception as e:
                logger.error(f"Process distributed_lock exception: {e}")

    async def add_content(self, doc_id: int, content: str, url: str) -> None:
        """Full ingestion pipeline for one document.

        Splits ``content`` into markdown chunks, stores them in batches of
        ``ADD_BATCH_SIZE``, embeds each batch, and records the resulting
        embedding ids in t_doc_embedding_map_tab.  Status transitions:
        PARSING -> PARSING_COMPLETED -> EMBEDDED, or PROCESS_FAILED when
        ``content`` is empty.
        """
        if not content:
            await self.update_doc_status(doc_id, LOCAL_FILE_PROCESS_FAILED)
            return

        begin_time = int(time.time())
        logger.info(
            f"[FILE_CONTENT] add_content begin, doc_id: {doc_id}, begin_time: {begin_time}"
        )
        await self.update_doc_status(doc_id, LOCAL_FILE_PARSING)

        text_splitter_obj = MarkdownTextSplitter(
            chunk_size=self.max_chunk_length, chunk_overlap=self.chunk_overlap)
        chunk_text_vec = text_splitter_obj.split_text(content)
        await self.update_doc_status(doc_id, LOCAL_FILE_PARSING_COMPLETED)

        embedding_id_vec = []
        for start in range(0, len(chunk_text_vec), self.ADD_BATCH_SIZE):
            batch = chunk_text_vec[start:start + self.ADD_BATCH_SIZE]

            # `start` doubles as the 0-based offset of this batch, which
            # add_local_file_chunk turns into 1-based chunk indices.
            await self.add_local_file_chunk(doc_id, batch, start)

            try:
                with self.distributed_lock.lock():
                    ret = await document_embedder.aadd_local_file_embedding(
                        doc_id, url, batch, self.doc_source)
                    if ret:
                        embedding_id_vec.extend(ret)
            except Exception as e:
                # A failed batch is skipped; later batches still run.
                logger.error(f"Process distributed_lock exception: {e}")

        if embedding_id_vec:
            await self.update_doc_status(doc_id, LOCAL_FILE_EMBEDDED)

            try:
                timestamp = int(time.time())
                with self.distributed_lock.lock():
                    # Persist the doc_id -> embedding-id mapping so
                    # delete_content can later remove the vectors.
                    async with aiosqlite.connect(self.sqlite_db_path) as db:
                        await db.execute("PRAGMA journal_mode=WAL;")

                        await db.execute(
                            "INSERT INTO t_doc_embedding_map_tab (doc_id, doc_source, embedding_id_list, ctime, mtime) VALUES (?, ?, ?, ?, ?)",
                            (doc_id, self.doc_source,
                             json.dumps(embedding_id_vec), timestamp,
                             timestamp))
                        await db.commit()
            except Exception as e:
                logger.error(f"Process distributed_lock exception: {e}")

        end_time = int(time.time())
        timecost = end_time - begin_time
        logger.warning(
            f"[FILE_CONTENT] add_content end, end_time: {end_time}, timecost: {timecost}"
        )

    async def delete_content(self, doc_id: int) -> None:
        """Remove a document: its embeddings, its mapping row, and its
        t_local_file_tab row.

        NOTE(review): rows in t_local_file_chunk_tab are not deleted here —
        confirm they are removed elsewhere (trigger / cascade / separate job).
        """
        begin_time = int(time.time())
        logger.info(
            f"[FILE_CONTENT] delete_content begin, doc_id: {doc_id}, begin_time: {begin_time}"
        )

        # Delete embeddings associated with doc ID
        await self._delete_embedding_doc(doc_id)

        async with aiosqlite.connect(self.sqlite_db_path) as db:
            await db.execute("PRAGMA journal_mode=WAL;")

            try:
                with self.distributed_lock.lock():
                    # f-prefix dropped: the statement has no placeholders.
                    await db.execute(
                        "DELETE FROM t_local_file_tab WHERE id = ?",
                        (doc_id, ))
                    await db.commit()
            except Exception as e:
                logger.error(f"Process distributed_lock exception: {e}")

        end_time = int(time.time())
        timecost = end_time - begin_time
        logger.warning(
            f"[FILE_CONTENT] delete_content end, end_time: {end_time}, timecost: {timecost}"
        )

    async def _delete_embedding_doc(self, doc_id: int) -> None:
        """Delete the vector-store embeddings and the mapping rows for ``doc_id``.

        Reads the JSON ``embedding_id_list`` column, deletes the mapping rows,
        then removes the embeddings from the vector store in batches of
        ``DELETE_BATCH_SIZE``.
        """
        # NOTE(review): tag says CRAWL_CONTENT while the rest of this class
        # logs FILE_CONTENT — looks like a copy-paste leftover; kept verbatim
        # in case log consumers grep for it.
        logger.info(f"[CRAWL_CONTENT] _delete_embedding_doc, doc_id: {doc_id}")
        async with aiosqlite.connect(self.sqlite_db_path) as db:
            await db.execute("PRAGMA journal_mode=WAL;")

            cursor = await db.execute(
                "SELECT embedding_id_list FROM t_doc_embedding_map_tab WHERE doc_source = ? and doc_id = ?",
                [self.doc_source, doc_id])
            rows = await cursor.fetchall()
            # Parse each JSON embedding_id_list and flatten into one list
            # (renamed loop var so the builtin `id` is not shadowed).
            embedding_id_vec = [
                embedding_id for row in rows
                for embedding_id in json.loads(row[0])
            ]

            try:
                with self.distributed_lock.lock():
                    # Delete records from t_doc_embedding_map_tab
                    await db.execute(
                        "DELETE FROM t_doc_embedding_map_tab WHERE doc_source = ? and doc_id = ?",
                        (self.doc_source, doc_id))
                    await db.commit()
            except Exception as e:
                logger.error(f"Process distributed_lock exception: {e}")

            try:
                for start in range(0, len(embedding_id_vec),
                                   self.DELETE_BATCH_SIZE):
                    batch = embedding_id_vec[start:start +
                                             self.DELETE_BATCH_SIZE]
                    if batch:
                        with self.distributed_lock.lock():
                            logger.info(
                                f"[CRAWL_CONTENT] _delete_embedding_doc, document_embedder.delete_document_embedding: {batch}"
                            )
                            # Synchronous delete; an async variant exists but
                            # was deliberately left disabled by the author.
                            document_embedder.delete_document_embedding(batch)
                            # await document_embedder.adelete_document_embedding(batch)
            except Exception as e:
                logger.error(f"Process distributed_lock exception: {e}")