Spaces:
Running
Running
| import os | |
| import math | |
| import asyncio | |
| import re | |
| import uuid | |
| import base64 | |
| import json | |
| from bs4 import BeautifulSoup | |
| from typing import List, Dict, Tuple, Optional, Any, Protocol, Literal | |
| from langchain_core.documents import Document | |
| from fastembed_manager import add_custom_embedding_model | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from tqdm.asyncio import tqdm_asyncio | |
| from asyncio import Semaphore | |
| from fastembed_manager import add_custom_embedding_model | |
| sem = Semaphore(10) | |
| def resolve_user_path(path: str) -> str: | |
| return os.path.expanduser(path) | |
| def load_json_data(file_path: str) -> List[Dict[str, Any]]: | |
| import json | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| return data | |
| def uuid64(): | |
| u = uuid.uuid4() | |
| b64 = base64.urlsafe_b64encode(u.bytes).rstrip(b'=') | |
| return b64.decode('ascii') | |
| def clean_text(text: str) -> str: | |
| if not text: | |
| return "" | |
| # 1. Xóa TOÀN BỘ khối caption (cả thẻ lẫn nội dung bên trong) | |
| # Dùng flag re.DOTALL để dấu chấm (.) khớp được cả xuống dòng (\n) | |
| # Pattern: Tìm [caption ... ] ... [/caption] và xóa sạch | |
| text = re.sub(r'\[caption[^\]]*\].*?\[/caption\]', '', text, flags=re.IGNORECASE | re.DOTALL) | |
| # 2. (Dự phòng) Xóa các thẻ shortcode lẻ tẻ còn sót lại (ví dụ chỉ có mở mà không có đóng) | |
| text = re.sub(r'\[/?caption[^\]]*\]', '', text, flags=re.IGNORECASE) | |
| # 3. Xử lý lỗi dính chữ sau dấu chấm (Ví dụ: "tiêu biến.Ống" -> "tiêu biến. Ống") | |
| # Tìm dấu chấm, theo sau là chữ cái viết hoa, mà không có khoảng trắng | |
| text = re.sub(r'\.(?=[A-ZĂÂÁÀẢÃẠ...])', '. ', text) | |
| # (Lưu ý: Regex trên đơn giản, nếu muốn bắt chính xác tiếng Việt thì cần list dài hơn hoặc dùng \w) | |
| # Cách đơn giản hơn cho tiếng Việt: | |
| text = re.sub(r'\.([A-ZÀ-Ỹ])', r'. \1', text) | |
| # 4. Xóa khoảng trắng thừa | |
| text = re.sub(r'\s+', ' ', text).strip() | |
| return text | |
| def parse_html_to_sections(html: str, data_json): | |
| soup = BeautifulSoup(html, "html.parser") | |
| documents = [] | |
| first_p = soup.find("p") | |
| if first_p: | |
| cleaned_text = clean_text(first_p.get_text(separator=" ", strip=True)) | |
| documents.append( | |
| Document( | |
| page_content=cleaned_text, | |
| metadata={ | |
| "site": data_json["site"], | |
| "url": data_json["url"], | |
| "date_created": data_json["event_time"]["$date"], | |
| "document_id": uuid64(), | |
| "type": "intro" | |
| } | |
| ) | |
| ) | |
| first_p.decompose() | |
| h2_tags = soup.find_all("h2") | |
| for i, h2 in enumerate(h2_tags): | |
| header = clean_text(h2.get_text(separator=" ", strip=True)) | |
| contents = [] | |
| for sib in h2.next_siblings: | |
| if getattr(sib, "name", None) == "h2": | |
| break | |
| if hasattr(sib, "get_text"): | |
| text = clean_text(sib.get_text(separator=" ", strip=True)) | |
| if text: | |
| contents.append(text) | |
| parent_text = header + "\n" + "\n".join(contents) | |
| documents.append( | |
| Document( | |
| page_content=parent_text, | |
| metadata={ | |
| "site": data_json["site"], | |
| "url": data_json["url"], | |
| "date_created": data_json["event_time"]["$date"], | |
| "header": header, | |
| "parent_id": uuid64(), | |
| "parent_chunking": parent_text, | |
| } | |
| ) | |
| ) | |
| return documents | |
| def chunk_documents(docs, chunk_size=500, chunk_overlap =50): | |
| splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=chunk_size, | |
| chunk_overlap=chunk_overlap, | |
| separators=["\n\n", "\n", " ", ""] | |
| ) | |
| chunked_docs = [] | |
| for doc in docs: | |
| # chỉ chunk các section có header (bỏ intro nếu muốn) | |
| if doc.metadata.get("type") == "intro": | |
| chunked_docs.append(doc) | |
| continue | |
| chunks = splitter.split_text(doc.page_content) | |
| # print("chunk=", len(chunks)) | |
| header = doc.metadata.get("header") | |
| # print(header) | |
| for idx, chunk in enumerate(chunks): | |
| page_content = header + "\n " + chunk | |
| # print(page_content) | |
| chunked_docs.append( | |
| Document( | |
| page_content= page_content, | |
| metadata={ | |
| **doc.metadata, | |
| "document_id": uuid64() | |
| } | |
| ) | |
| ) | |
| return chunked_docs | |
| async def process_single_data(data_json) -> Document: | |
| async with sem: | |
| html_text = data_json.get("body", "") | |
| if not html_text: | |
| raise ValueError("No 'body' field in JSON data") | |
| section = await asyncio.to_thread(parse_html_to_sections, html_text, data_json) | |
| chunked_section = await asyncio.to_thread(chunk_documents, section) | |
| return chunked_section | |
| async def processing_json_file(file_path: str) -> List[Document]: | |
| print("Loading JSON data from:", file_path) | |
| data_list = load_json_data(file_path) | |
| all_documents = [] | |
| tasks = [process_single_data(data) for data in data_list] | |
| results = await tqdm_asyncio.gather(*tasks) | |
| all_documents = [doc for sublist in results for doc in sublist] | |
| return all_documents | |
| def embedding_documents(documents: List[Document]): | |
| from fastembed_sparse import FastEmbedSparse | |
| from qdrant_vector_store import QdrantVectorStore, RetrievalMode | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| sparse_embeddings = FastEmbedSparse(model_name="Qdrant/BM25") | |
| embed = add_custom_embedding_model( | |
| model_name="models/Vietnamese_Embedding_OnnX_Quantized", | |
| source_model="Mint1456/Vietnamese_Embedding_OnnX_Quantized", | |
| dim=1024, | |
| source_file="model.onnx" | |
| ) | |
| qdrant_api_key = os.getenv("QDRANT_API_KEY") | |
| qdrant_endpoint = os.getenv("QDRANT_ENDPOINT") | |
| store = QdrantVectorStore.from_documents( | |
| documents=documents, | |
| embedding=embed, | |
| sparse_embedding=sparse_embeddings, | |
| api_key=qdrant_api_key, | |
| url=qdrant_endpoint, | |
| collection_name="test_collection", | |
| retrieval_mode=RetrievalMode.HYBRID, | |
| force_recreate=False, | |
| ) | |
| if __name__ == "__main__": | |
| data_path = r"D:\Project\Data\flask_chatai.web_data 1.json" | |
| data = asyncio.run(processing_json_file(data_path)) | |
| # with open("processed_documents.txt", "w", encoding="utf-8") as f: | |
| # json.dump([doc.page_content for doc in data], f, ensure_ascii=False, indent=2) | |
| embedding_documents(data) |