"""Ingest pipeline: load scraped-article JSON, split HTML bodies into
section/chunk Documents, and upsert them into a Qdrant hybrid index."""

import os
import math
import asyncio
import re
import uuid
import base64
import json
from asyncio import Semaphore
from typing import List, Dict, Tuple, Optional, Any, Protocol, Literal

from bs4 import BeautifulSoup
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from tqdm.asyncio import tqdm_asyncio

from fastembed_manager import add_custom_embedding_model

# Bound the number of records processed concurrently (thread-pool / memory cap).
sem = Semaphore(10)

# Patterns used by clean_text, compiled once at import time instead of per call.
_CAPTION_BLOCK_RE = re.compile(r'\[caption[^\]]*\].*?\[/caption\]',
                               re.IGNORECASE | re.DOTALL)
_CAPTION_TAG_RE = re.compile(r'\[/?caption[^\]]*\]', re.IGNORECASE)
_STUCK_SENTENCE_RE = re.compile(r'\.([A-ZÀ-Ỹ])')
_WHITESPACE_RE = re.compile(r'\s+')


def resolve_user_path(path: str) -> str:
    """Expand a leading '~' in *path* to the current user's home directory."""
    return os.path.expanduser(path)


def load_json_data(file_path: str) -> List[Dict[str, Any]]:
    """Read and return the JSON array stored at *file_path* (UTF-8)."""
    with open(file_path, 'r', encoding='utf-8') as f:
        return json.load(f)


def uuid64() -> str:
    """Return a random UUID4 as a compact URL-safe base64 string (padding stripped)."""
    u = uuid.uuid4()
    b64 = base64.urlsafe_b64encode(u.bytes).rstrip(b'=')
    return b64.decode('ascii')


def clean_text(text: str) -> str:
    """Normalize scraped article text.

    Steps:
      1. Remove whole WordPress ``[caption ...]...[/caption]`` shortcode
         blocks, tags and inner content alike (DOTALL so blocks may span lines).
      2. Safety net: strip any stray, unmatched caption shortcodes.
      3. Repair sentences glued after a period, e.g. "tiêu biến.Ống" ->
         "tiêu biến. Ống"; the class ``[A-ZÀ-Ỹ]`` covers Vietnamese
         precomposed uppercase letters.
      4. Collapse all whitespace runs into single spaces and trim.

    Returns "" for falsy input.
    """
    if not text:
        return ""
    text = _CAPTION_BLOCK_RE.sub('', text)
    text = _CAPTION_TAG_RE.sub('', text)
    # BUGFIX: the original also applied a pattern whose character class
    # contained a literal "..." — inside a class "." is a literal dot, so
    # ".." was wrongly rewritten to ". .".  That pattern's legitimate cases
    # (A-Z, Ă Â Á À Ả Ã Ạ) are all subsumed by this Vietnamese-aware one,
    # so the broken pattern was dropped.
    text = _STUCK_SENTENCE_RE.sub(r'. \1', text)
    text = _WHITESPACE_RE.sub(' ', text).strip()
    return text


def parse_html_to_sections(html: str, data_json) -> List[Document]:
    """Split an article's HTML body into section-level LangChain Documents.

    The first ``<p>`` becomes an "intro" document; each ``<h2>`` then opens a
    section whose content is every sibling up to the next ``<h2>``.  Section
    documents carry the full section text in ``parent_chunking`` so child
    chunks can reference their parent later.

    Args:
        html: Raw HTML of the article body.
        data_json: Source record; must provide ``"site"``, ``"url"`` and
            ``event_time["$date"]`` for the metadata.

    Returns:
        List of Documents: at most one intro plus one per ``<h2>`` section.
    """
    soup = BeautifulSoup(html, "html.parser")
    documents: List[Document] = []

    # Metadata shared by every document produced from this record.
    base_meta = {
        "site": data_json["site"],
        "url": data_json["url"],
        "date_created": data_json["event_time"]["$date"],
    }

    first_p = soup.find("p")
    if first_p:
        cleaned_text = clean_text(first_p.get_text(separator=" ", strip=True))
        documents.append(
            Document(
                page_content=cleaned_text,
                metadata={**base_meta, "document_id": uuid64(), "type": "intro"},
            )
        )
        # Remove the intro paragraph so it is not re-collected by a section.
        first_p.decompose()

    for h2 in soup.find_all("h2"):
        header = clean_text(h2.get_text(separator=" ", strip=True))
        contents = []
        for sib in h2.next_siblings:
            if getattr(sib, "name", None) == "h2":
                break  # the next section starts here
            if hasattr(sib, "get_text"):
                text = clean_text(sib.get_text(separator=" ", strip=True))
                if text:
                    contents.append(text)
        parent_text = header + "\n" + "\n".join(contents)
        documents.append(
            Document(
                page_content=parent_text,
                metadata={
                    **base_meta,
                    "header": header,
                    "parent_id": uuid64(),
                    "parent_chunking": parent_text,
                },
            )
        )
    return documents


def chunk_documents(docs: List[Document],
                    chunk_size: int = 500,
                    chunk_overlap: int = 50) -> List[Document]:
    """Split section documents into overlapping chunks, prefixing each with its header.

    Intro documents (``metadata["type"] == "intro"``) are passed through
    unchanged; every other document is split and each chunk receives a fresh
    ``document_id`` while inheriting the rest of its parent's metadata.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", " ", ""],
    )
    chunked_docs: List[Document] = []
    for doc in docs:
        # Intro sections are kept whole (only sections with headers are chunked).
        if doc.metadata.get("type") == "intro":
            chunked_docs.append(doc)
            continue
        header = doc.metadata.get("header")
        for chunk in splitter.split_text(doc.page_content):
            chunked_docs.append(
                Document(
                    page_content=header + "\n " + chunk,
                    metadata={**doc.metadata, "document_id": uuid64()},
                )
            )
    return chunked_docs


async def process_single_data(data_json) -> List[Document]:
    """Parse and chunk one JSON record, bounded by the module-level semaphore.

    Note: the original annotation said ``-> Document`` but the function has
    always returned a list of chunked Documents; the annotation is corrected.

    Raises:
        ValueError: if the record has no non-empty "body" field.
    """
    async with sem:
        html_text = data_json.get("body", "")
        if not html_text:
            raise ValueError("No 'body' field in JSON data")
        # CPU-bound parsing runs in worker threads so the event loop stays free.
        section = await asyncio.to_thread(parse_html_to_sections, html_text, data_json)
        chunked_section = await asyncio.to_thread(chunk_documents, section)
        return chunked_section


async def processing_json_file(file_path: str) -> List[Document]:
    """Process every record in the JSON file at *file_path* concurrently.

    Returns a flat list of all chunked Documents from all records.
    """
    print("Loading JSON data from:", file_path)
    data_list = load_json_data(file_path)
    tasks = [process_single_data(data) for data in data_list]
    results = await tqdm_asyncio.gather(*tasks)
    # Flatten the per-record chunk lists into a single document list.
    return [doc for sublist in results for doc in sublist]


def embedding_documents(documents: List[Document]) -> None:
    """Embed *documents* (dense ONNX model + BM25 sparse) and upsert into Qdrant.

    Reads QDRANT_API_KEY and QDRANT_ENDPOINT from the environment (via .env).
    Imports are kept function-local so the parsing pipeline can run without
    the vector-store dependencies installed.
    """
    from fastembed_sparse import FastEmbedSparse
    from qdrant_vector_store import QdrantVectorStore, RetrievalMode
    from dotenv import load_dotenv

    load_dotenv()
    sparse_embeddings = FastEmbedSparse(model_name="Qdrant/BM25")
    embed = add_custom_embedding_model(
        model_name="models/Vietnamese_Embedding_OnnX_Quantized",
        source_model="Mint1456/Vietnamese_Embedding_OnnX_Quantized",
        dim=1024,
        source_file="model.onnx",
    )
    qdrant_api_key = os.getenv("QDRANT_API_KEY")
    qdrant_endpoint = os.getenv("QDRANT_ENDPOINT")
    QdrantVectorStore.from_documents(
        documents=documents,
        embedding=embed,
        sparse_embedding=sparse_embeddings,
        api_key=qdrant_api_key,
        url=qdrant_endpoint,
        collection_name="test_collection",
        retrieval_mode=RetrievalMode.HYBRID,
        force_recreate=False,
    )


if __name__ == "__main__":
    data_path = r"D:\Project\Data\flask_chatai.web_data 1.json"
    data = asyncio.run(processing_json_file(data_path))
    embedding_documents(data)