import re
import uuid
import base64
import json

from bs4 import BeautifulSoup
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter


def uuid64():
    """Return a random UUID4 as a 22-character URL-safe base64 string (padding stripped)."""
    u = uuid.uuid4()
    b64 = base64.urlsafe_b64encode(u.bytes).rstrip(b'=')
    return b64.decode('ascii')


async def clean_text(text: str) -> str:
    """Normalize scraped article text: drop caption shortcodes, fix glued
    sentences, and collapse whitespace. Returns "" for empty/None input."""
    if not text:
        return ""
    # Strip WordPress [caption]...[/caption] shortcodes, then any orphan caption tags.
    text = re.sub(r'\[caption[^\]]*\].*?\[/caption\]', '', text, flags=re.IGNORECASE | re.DOTALL)
    text = re.sub(r'\[/?caption[^\]]*\]', '', text, flags=re.IGNORECASE)
    # Insert a space after a period glued to an uppercase (incl. Vietnamese) letter.
    # NOTE(review): the literal '...' inside this character class also matches '.',
    # so dot runs get spaced out ("a..B" -> "a. .B") — looks like a leftover
    # placeholder; confirm before changing, kept byte-identical here.
    text = re.sub(r'\.(?=[A-ZĂÂÁÀẢÃẠ...])', '. ', text)
    text = re.sub(r'\.([A-ZÀ-Ỹ])', r'. \1', text)
    # Collapse all whitespace runs to single spaces.
    text = re.sub(r'\s+', ' ', text).strip()
    return text


async def load_json_data(file_path):
    """Load JSON data from file.

    NOTE(review): uses blocking open()/json.load() inside an async def — fine for
    one-off scripts, but consider asyncio.to_thread for a real event loop.
    """
    print(f"Loading data from {file_path}...")
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    print(f"[OK] Loaded {len(data)} entries")
    return data


async def create_qdrant_collection(client, collection_name: str, vector_size: int):
    """Create a cosine-distance Qdrant collection if it does not already exist.

    Bug fix: the original called create_collection again in the 'already exists'
    branch, which raises on Qdrant; that branch is now a no-op.

    Raises: re-raises any error from collection creation.
    """
    from qdrant_client.http.models import VectorParams, Distance
    if not client.collection_exists(collection_name):
        try:
            print(f"Collection '{collection_name}' does not exist. Creating...")
            client.create_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE)
            )
        except Exception as e:
            print(f"Error creating collection '{collection_name}': {e}")
            raise e
    else:
        # Collection already exists — nothing to do.
        print(f"Collection '{collection_name}' already exists. Skipping creation.")


async def init_qdrant_client(endpoint: str, api_key: str):
    """Build and return a QdrantClient for the given endpoint/API key.

    Raises: re-raises any error from client construction.
    """
    from qdrant_client import QdrantClient
    try:
        client = QdrantClient(
            url=endpoint,
            api_key=api_key,
        )
        print("Qdrant client initialized successfully.")
        return client
    except Exception as e:
        print(f"Error initializing Qdrant client: {e}")
        raise e


async def parse_html_to_sections(html: str, data_json):
    """Split an article's HTML into LangChain Documents.

    Produces one "intro" Document from the first <p>, then one Document per
    <h2> section (header + all sibling text up to the next <h2>).

    data_json is assumed to carry "site", "url" and a Mongo-style
    "event_time": {"$date": ...} — TODO confirm against the caller.
    """
    soup = BeautifulSoup(html, "html.parser")
    documents = []

    # --- 1. Take the first <p> as the intro document ---
    first_p = soup.find("p")
    if first_p:
        cleaned_text = await clean_text(first_p.get_text(separator=" ", strip=True))
        documents.append(
            Document(
                page_content=cleaned_text,
                metadata={
                    "site": data_json["site"],
                    "url": data_json["url"],
                    "date_created": data_json["event_time"]["$date"],
                    "document_id": uuid64(),
                    "type": "intro"
                }
            )
        )
        first_p.decompose()  # remove so the intro is not duplicated in sections

    # --- 2. Split by <h2> headers ---
    h2_tags = soup.find_all("h2")
    for i, h2 in enumerate(h2_tags):
        header = await clean_text(h2.get_text(separator=" ", strip=True))
        # Bug fix: the original re-read first_p here (already decomposed, and
        # None when the page has no <p>, crashing); the value was unused.
        contents = []
        for sib in h2.next_siblings:
            # Stop at the next section header.
            if getattr(sib, "name", None) == "h2":
                break
            if hasattr(sib, "get_text"):
                text = await clean_text(sib.get_text(separator=" ", strip=True))
                if text:
                    contents.append(text)
        parent_text = header + "\n" + "\n".join(contents)
        documents.append(
            Document(
                page_content=parent_text,
                metadata={
                    "site": data_json["site"],
                    "url": data_json["url"],
                    "date_created": data_json["event_time"]["$date"],
                    "header": header,
                    "parent_id": uuid64(),
                    "parent_chunking": parent_text,
                }
            )
        )
    return documents


async def chunk_documents(docs, chunk_size=500, chunk_overlap=50):
    """Recursively chunk section Documents; intro Documents pass through unchanged.

    Each chunk is prefixed with its section header and gets a fresh document_id
    while inheriting the parent's metadata (including parent_id/parent_chunking).
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", " ", ""]
    )
    chunked_docs = []
    for doc in docs:
        # Only chunk header sections; intro documents are kept whole.
        if doc.metadata.get("type") == "intro":
            chunked_docs.append(doc)
            continue
        chunks = splitter.split_text(doc.page_content)
        print("chunk=", len(chunks))
        header = doc.metadata.get("header")
        for idx, chunk in enumerate(chunks):
            # Prepend the section header so each chunk stays self-describing.
            page_content = header + "\n " + chunk
            chunked_docs.append(
                Document(
                    page_content=page_content,
                    metadata={
                        **doc.metadata,
                        "document_id": uuid64()
                    }
                )
            )
    return chunked_docs