import aiosqlite
import json
import time
from typing import List
from server.app.utils.diskcache_lock import diskcache_lock
from server.logger.logger_config import my_logger as logger
from server.constant.constants import (SQLITE_DB_DIR, SQLITE_DB_NAME,
                                       MAX_CHUNK_LENGTH, CHUNK_OVERLAP,
                                       FROM_LOCAL_FILE, LOCAL_FILE_PARSING,
                                       LOCAL_FILE_PARSING_COMPLETED,
                                       LOCAL_FILE_EMBEDDED,
                                       LOCAL_FILE_PROCESS_FAILED)
from server.rag.index.chunk.markdown_splitter import MarkdownTextSplitter
from server.rag.index.embedder.document_embedder import document_embedder
class AsyncTextParser:
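    """Asynchronously index local files for RAG.

    Pipeline: split markdown content into chunks, persist the chunks and
    document status to SQLite, embed the chunks via document_embedder, and
    record the resulting embedding IDs in t_doc_embedding_map_tab. Writes
    are serialized across processes with a diskcache-based lock.
    """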
    ADD_BATCH_SIZE = 30  # chunks stored/embedded per batch
    DELETE_BATCH_SIZE = 100  # embedding IDs deleted per batch

    def __init__(self) -> None:
        self.max_chunk_length = MAX_CHUNK_LENGTH
        self.chunk_overlap = CHUNK_OVERLAP
        self.doc_source = FROM_LOCAL_FILE
        self.sqlite_db_path = f"{SQLITE_DB_DIR}/{SQLITE_DB_NAME}"
        self.distributed_lock = diskcache_lock

    async def update_doc_status(self, doc_id: int, doc_status: int) -> None:
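        """Update doc_status and mtime for one row of t_local_file_tab."""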
        logger.info(
            f"[FILE_CONTENT] update_doc_status, doc_id: {doc_id}, doc_status: {doc_status}"
        )
        timestamp = int(time.time())
        async with aiosqlite.connect(self.sqlite_db_path) as db:
            await db.execute("PRAGMA journal_mode=WAL;")
            try:
                with self.distributed_lock.lock():
                    await db.execute(
                        "UPDATE t_local_file_tab SET doc_status = ?, mtime = ? WHERE id = ?",
                        (doc_status, timestamp, doc_id))
                    await db.commit()
            except Exception as e:
                logger.error(f"Process distributed_lock exception: {e}")

    async def add_local_file_chunk(self, file_id: int,
                                   chunk_text_vec: List[str],
                                   start_index: int) -> None:
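        """Bulk-insert one batch of chunk rows into t_local_file_chunk_tab.

        start_index is the 0-based offset of this batch within the full
        chunk list; the stored chunk_index values are 1-based.
        """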
        logger.info(
            f"[FILE_CONTENT] add_local_file_chunk, file_id: {file_id}, start_index: {start_index}"
        )
        timestamp = int(time.time())
        async with aiosqlite.connect(self.sqlite_db_path) as db:
            await db.execute("PRAGMA journal_mode=WAL;")
            chunks_to_add = []
            chunk_index = start_index + 1
            for chunk in chunk_text_vec:
                chunks_to_add.append((file_id, chunk_index, chunk, len(chunk),
                                      timestamp, timestamp))
                chunk_index += 1
            try:
                with self.distributed_lock.lock():
                    await db.executemany(
                        "INSERT INTO t_local_file_chunk_tab (file_id, chunk_index, content, content_length, ctime, mtime) VALUES (?, ?, ?, ?, ?, ?)",
                        chunks_to_add)
                    await db.commit()
            except Exception as e:
                logger.error(f"Process distributed_lock exception: {e}")

    async def add_content(self, doc_id: int, content: str, url: str) -> None:
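        """Split, store, and embed a local file's markdown content.

        Walks the status machine LOCAL_FILE_PARSING ->
        LOCAL_FILE_PARSING_COMPLETED -> LOCAL_FILE_EMBEDDED, or
        LOCAL_FILE_PROCESS_FAILED when content is empty.
        """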
        if not content:
            await self.update_doc_status(doc_id, LOCAL_FILE_PROCESS_FAILED)
            return
        begin_time = int(time.time())
        logger.info(
            f"[FILE_CONTENT] add_content begin, doc_id: {doc_id}, begin_time: {begin_time}"
        )
        await self.update_doc_status(doc_id, LOCAL_FILE_PARSING)
        text_splitter_obj = MarkdownTextSplitter(
            chunk_size=self.max_chunk_length, chunk_overlap=self.chunk_overlap)
        chunk_text_vec = text_splitter_obj.split_text(content)
        await self.update_doc_status(doc_id, LOCAL_FILE_PARSING_COMPLETED)
        embedding_id_vec = []
        # Persist and embed the chunks in batches of ADD_BATCH_SIZE.
        for start in range(0, len(chunk_text_vec), self.ADD_BATCH_SIZE):
            batch = chunk_text_vec[start:start + self.ADD_BATCH_SIZE]
            await self.add_local_file_chunk(doc_id, batch, start)
            try:
                with self.distributed_lock.lock():
                    ret = await document_embedder.aadd_local_file_embedding(
                        doc_id, url, batch, self.doc_source)
                    if ret:
                        embedding_id_vec.extend(ret)
            except Exception as e:
                logger.error(f"Process distributed_lock exception: {e}")
        if embedding_id_vec:
            await self.update_doc_status(doc_id, LOCAL_FILE_EMBEDDED)
            try:
                timestamp = int(time.time())
                with self.distributed_lock.lock():
                    async with aiosqlite.connect(self.sqlite_db_path) as db:
                        await db.execute("PRAGMA journal_mode=WAL;")
                        # Record the doc -> embedding-ID mapping so the
                        # embeddings can be deleted later.
                        await db.execute(
                            "INSERT INTO t_doc_embedding_map_tab (doc_id, doc_source, embedding_id_list, ctime, mtime) VALUES (?, ?, ?, ?, ?)",
                            (doc_id, self.doc_source,
                             json.dumps(embedding_id_vec), timestamp,
                             timestamp))
                        await db.commit()
            except Exception as e:
                logger.error(f"Process distributed_lock exception: {e}")
        end_time = int(time.time())
        timecost = end_time - begin_time
        logger.warning(
            f"[FILE_CONTENT] add_content end, end_time: {end_time}, timecost: {timecost}"
        )

    async def delete_content(self, doc_id: int) -> None:
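        """Delete a document: its embeddings, then its t_local_file_tab row."""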
        begin_time = int(time.time())
        logger.info(
            f"[FILE_CONTENT] delete_content begin, doc_id: {doc_id}, begin_time: {begin_time}"
        )
        # Delete embeddings associated with doc ID
        await self._delete_embedding_doc(doc_id)
        async with aiosqlite.connect(self.sqlite_db_path) as db:
            await db.execute("PRAGMA journal_mode=WAL;")
            try:
                with self.distributed_lock.lock():
                    await db.execute(
                        "DELETE FROM t_local_file_tab WHERE id = ?",
                        (doc_id, ))
                    await db.commit()
            except Exception as e:
                logger.error(f"Process distributed_lock exception: {e}")
        end_time = int(time.time())
        timecost = end_time - begin_time
        logger.warning(
            f"[FILE_CONTENT] delete_content end, end_time: {end_time}, timecost: {timecost}"
        )

    async def _delete_embedding_doc(self, doc_id: int) -> None:
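        """Remove a doc's embeddings from the vector store and SQLite.

        Reads embedding_id_list from t_doc_embedding_map_tab, deletes the
        mapping rows, then deletes the embeddings in DELETE_BATCH_SIZE
        batches.
        """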
logger.info(f"[CRAWL_CONTENT] _delete_embedding_doc, doc_id: {doc_id}")
async with aiosqlite.connect(self.sqlite_db_path) as db:
await db.execute("PRAGMA journal_mode=WAL;")
cursor = await db.execute(
f"SELECT embedding_id_list FROM t_doc_embedding_map_tab WHERE doc_source = ? and doc_id = ?",
[self.doc_source, doc_id])
rows = await cursor.fetchall()
# Parse embedding_id_list and flatten the list
embedding_id_vec = [
id for row in rows for id in json.loads(row[0])
]
try:
with self.distributed_lock.lock():
# Delete records from t_doc_embedding_map_tab
await db.execute(
f"DELETE FROM t_doc_embedding_map_tab WHERE doc_source = ? and doc_id = ?",
(self.doc_source, doc_id))
await db.commit()
except Exception as e:
logger.error(f"Process distributed_lock exception: {e}")
try:
for start in range(0, len(embedding_id_vec),
self.DELETE_BATCH_SIZE):
batch = embedding_id_vec[start:start +
self.DELETE_BATCH_SIZE]
if batch:
with self.distributed_lock.lock():
logger.info(
f"[CRAWL_CONTENT] _delete_embedding_doc, document_embedder.delete_document_embedding: {batch}"
)
document_embedder.delete_document_embedding(batch)
# await document_embedder.adelete_document_embedding(batch)
except Exception as e:
logger.error(f"Process distributed_lock exception: {e}")
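

# Usage sketch (illustrative only, not part of the module): the doc_id,
# content, and url values below are hypothetical, and assume the SQLite
# tables and vector store referenced above are already provisioned.
#
#   import asyncio
#
#   async def _demo() -> None:
#       parser = AsyncTextParser()
#       await parser.add_content(doc_id=1,
#                                content="# Title\n\nSome markdown body.",
#                                url="example.md")
#       await parser.delete_content(doc_id=1)
#
#   asyncio.run(_demo())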