Spaces:
Sleeping
Sleeping
File size: 8,447 Bytes
1161dd2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
import aiosqlite
import json
import time
from typing import List
from server.app.utils.diskcache_lock import diskcache_lock
from server.logger.logger_config import my_logger as logger
from server.constant.constants import (SQLITE_DB_DIR, SQLITE_DB_NAME,
MAX_CHUNK_LENGTH, CHUNK_OVERLAP,
FROM_LOCAL_FILE, LOCAL_FILE_PARSING,
LOCAL_FILE_PARSING_COMPLETED,
LOCAL_FILE_EMBEDDED,
LOCAL_FILE_PROCESS_FAILED)
from server.rag.index.chunk.markdown_splitter import MarkdownTextSplitter
from server.rag.index.embedder.document_embedder import document_embedder
class AsyncTextParser:
    """Asynchronously parses local-file content into text chunks, embeds the
    chunks, and records chunk/embedding bookkeeping in SQLite.

    Lifecycle per document: ``add_content`` splits the markdown content,
    persists chunks to ``t_local_file_chunk_tab``, embeds them batch by batch,
    and records the resulting embedding ids in ``t_doc_embedding_map_tab``;
    ``delete_content`` reverses this. Document progress is tracked via
    ``doc_status`` on ``t_local_file_tab``.

    NOTE(review): every SQLite write and embedder call is serialized through
    ``diskcache_lock``, a *synchronous* context manager held across ``await``
    points — presumably intentional to serialize cross-process writers, but
    it can stall the event loop while the lock is contended; confirm.
    """

    # Number of chunks persisted/embedded per batch in add_content.
    ADD_BATCH_SIZE = 30
    # Number of embedding ids removed per batch in _delete_embedding_doc.
    DELETE_BATCH_SIZE = 100

    def __init__(self) -> None:
        self.max_chunk_length = MAX_CHUNK_LENGTH
        self.chunk_overlap = CHUNK_OVERLAP
        self.doc_source = FROM_LOCAL_FILE
        self.sqlite_db_path = f"{SQLITE_DB_DIR}/{SQLITE_DB_NAME}"
        self.distributed_lock = diskcache_lock

    async def update_doc_status(self, doc_id: int, doc_status: int) -> None:
        """Set ``doc_status`` (and bump ``mtime``) for one row of
        ``t_local_file_tab``.

        Lock failures are logged and swallowed (best-effort status update).
        """
        logger.info(
            f"[FILE_CONTENT] update_doc_status, doc_id: {doc_id}, doc_status: {doc_status}"
        )
        timestamp = int(time.time())
        async with aiosqlite.connect(self.sqlite_db_path) as db:
            # WAL mode allows concurrent readers while this writer commits.
            await db.execute("PRAGMA journal_mode=WAL;")
            try:
                with self.distributed_lock.lock():
                    await db.execute(
                        "UPDATE t_local_file_tab SET doc_status = ?, mtime = ? WHERE id = ?",
                        (doc_status, timestamp, doc_id))
                    await db.commit()
            except Exception as e:
                logger.error(f"Process distributed_lock exception: {e}")

    async def add_local_file_chunk(self, file_id: int,
                                   chunk_text_vec: List[str],
                                   start_index: int) -> None:
        """Bulk-insert a batch of chunks into ``t_local_file_chunk_tab``.

        Args:
            file_id: id of the owning row in ``t_local_file_tab``.
            chunk_text_vec: chunk texts for this batch.
            start_index: 0-based offset of the batch within the full chunk
                list; stored ``chunk_index`` values are 1-based
                (``start_index + 1`` onward).
        """
        logger.info(
            f"[FILE_CONTENT] add_local_file_chunk, file_id: {file_id}, start_index: {start_index}"
        )
        timestamp = int(time.time())
        async with aiosqlite.connect(self.sqlite_db_path) as db:
            await db.execute("PRAGMA journal_mode=WAL;")
            # chunk_index is 1-based; enumerate keeps it in lockstep with the batch.
            chunks_to_add = [
                (file_id, chunk_index, chunk, len(chunk), timestamp, timestamp)
                for chunk_index, chunk in enumerate(chunk_text_vec,
                                                    start=start_index + 1)
            ]
            try:
                with self.distributed_lock.lock():
                    await db.executemany(
                        "INSERT INTO t_local_file_chunk_tab (file_id, chunk_index, content, content_length, ctime, mtime) VALUES (?, ?, ?, ?, ?, ?)",
                        chunks_to_add)
                    await db.commit()
            except Exception as e:
                logger.error(f"Process distributed_lock exception: {e}")

    async def add_content(self, doc_id: int, content: str, url: str) -> None:
        """Split, persist, and embed one document's content.

        Empty ``content`` marks the doc failed and returns early. Otherwise
        the doc walks through PARSING -> PARSING_COMPLETED -> (EMBEDDED if at
        least one embedding id was produced). Each batch is first stored as
        chunks, then embedded; embedding ids are accumulated and recorded in
        ``t_doc_embedding_map_tab`` as a JSON list.
        """
        if not content:
            await self.update_doc_status(doc_id, LOCAL_FILE_PROCESS_FAILED)
            return

        begin_time = int(time.time())
        logger.info(
            f"[FILE_CONTENT] add_content begin, doc_id: {doc_id}, begin_time: {begin_time}"
        )
        await self.update_doc_status(doc_id, LOCAL_FILE_PARSING)

        text_splitter_obj = MarkdownTextSplitter(
            chunk_size=self.max_chunk_length, chunk_overlap=self.chunk_overlap)
        chunk_text_vec = text_splitter_obj.split_text(content)
        await self.update_doc_status(doc_id, LOCAL_FILE_PARSING_COMPLETED)

        embedding_id_vec = []
        for start in range(0, len(chunk_text_vec), self.ADD_BATCH_SIZE):
            batch = chunk_text_vec[start:start + self.ADD_BATCH_SIZE]
            await self.add_local_file_chunk(doc_id, batch, start)
            try:
                with self.distributed_lock.lock():
                    ret = await document_embedder.aadd_local_file_embedding(
                        doc_id, url, batch, self.doc_source)
                    if ret:
                        embedding_id_vec.extend(ret)
            except Exception as e:
                logger.error(f"Process distributed_lock exception: {e}")

        if embedding_id_vec:
            await self.update_doc_status(doc_id, LOCAL_FILE_EMBEDDED)
            try:
                timestamp = int(time.time())
                with self.distributed_lock.lock():
                    async with aiosqlite.connect(self.sqlite_db_path) as db:
                        await db.execute("PRAGMA journal_mode=WAL;")
                        await db.execute(
                            "INSERT INTO t_doc_embedding_map_tab (doc_id, doc_source, embedding_id_list, ctime, mtime) VALUES (?, ?, ?, ?, ?)",
                            (doc_id, self.doc_source,
                             json.dumps(embedding_id_vec), timestamp,
                             timestamp))
                        await db.commit()
            except Exception as e:
                logger.error(f"Process distributed_lock exception: {e}")

        end_time = int(time.time())
        timecost = end_time - begin_time
        logger.warning(
            f"[FILE_CONTENT] add_content end, end_time: {end_time}, timecost: {timecost}"
        )

    async def delete_content(self, doc_id: int) -> None:
        """Remove a document: its embeddings/mapping first, then its row in
        ``t_local_file_tab``."""
        begin_time = int(time.time())
        logger.info(
            f"[FILE_CONTENT] delete_content begin, doc_id: {doc_id}, begin_time: {begin_time}"
        )

        # Delete embeddings associated with doc ID
        await self._delete_embedding_doc(doc_id)

        async with aiosqlite.connect(self.sqlite_db_path) as db:
            await db.execute("PRAGMA journal_mode=WAL;")
            try:
                with self.distributed_lock.lock():
                    # Fixed: dropped a stray `f` prefix on this parameterized
                    # statement (no interpolation; string is unchanged).
                    await db.execute(
                        "DELETE FROM t_local_file_tab WHERE id = ?",
                        (doc_id, ))
                    await db.commit()
            except Exception as e:
                logger.error(f"Process distributed_lock exception: {e}")

        end_time = int(time.time())
        timecost = end_time - begin_time
        logger.warning(
            f"[FILE_CONTENT] delete_content end, end_time: {end_time}, timecost: {timecost}"
        )

    async def _delete_embedding_doc(self, doc_id: int) -> None:
        """Delete all vector-store embeddings recorded for ``doc_id`` and the
        corresponding ``t_doc_embedding_map_tab`` rows.

        Embedding ids are read from the JSON ``embedding_id_list`` column,
        flattened, and deleted from the vector store in batches of
        ``DELETE_BATCH_SIZE``.
        """
        # Fixed: log tag was "[CRAWL_CONTENT]" (copy-paste from the crawler
        # parser); this class logs under "[FILE_CONTENT]" everywhere else.
        logger.info(f"[FILE_CONTENT] _delete_embedding_doc, doc_id: {doc_id}")
        async with aiosqlite.connect(self.sqlite_db_path) as db:
            await db.execute("PRAGMA journal_mode=WAL;")
            cursor = await db.execute(
                "SELECT embedding_id_list FROM t_doc_embedding_map_tab WHERE doc_source = ? and doc_id = ?",
                [self.doc_source, doc_id])
            rows = await cursor.fetchall()

            # Parse each JSON embedding_id_list and flatten into one list.
            # (Renamed loop variable: `id` shadowed the builtin.)
            embedding_id_vec = [
                emb_id for row in rows for emb_id in json.loads(row[0])
            ]

            try:
                with self.distributed_lock.lock():
                    # Delete records from t_doc_embedding_map_tab
                    await db.execute(
                        "DELETE FROM t_doc_embedding_map_tab WHERE doc_source = ? and doc_id = ?",
                        (self.doc_source, doc_id))
                    await db.commit()
            except Exception as e:
                logger.error(f"Process distributed_lock exception: {e}")

        try:
            for start in range(0, len(embedding_id_vec),
                               self.DELETE_BATCH_SIZE):
                batch = embedding_id_vec[start:start +
                                         self.DELETE_BATCH_SIZE]
                if batch:
                    with self.distributed_lock.lock():
                        logger.info(
                            f"[FILE_CONTENT] _delete_embedding_doc, document_embedder.delete_document_embedding: {batch}"
                        )
                        # NOTE(review): synchronous call inside an async
                        # method; an async variant exists but is commented
                        # out — presumably deliberate, confirm.
                        document_embedder.delete_document_embedding(batch)
                        # await document_embedder.adelete_document_embedding(batch)
        except Exception as e:
            logger.error(f"Process distributed_lock exception: {e}")
|