# test_space/src/utils/embed_manager.py
# Author: Minh — initial commit 6912ad8
import re
import uuid
import base64
import json
from bs4 import BeautifulSoup
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
def uuid64():
    """Return a random UUID4 as a 22-character URL-safe base64 string (padding stripped)."""
    raw_bytes = uuid.uuid4().bytes
    encoded = base64.urlsafe_b64encode(raw_bytes)
    return encoded.rstrip(b'=').decode('ascii')
async def clean_text(text: str) -> str:
    """Normalize raw article text: strip [caption] shortcodes and fix spacing.

    Args:
        text: Raw text, possibly containing WordPress-style caption
            shortcodes and periods glued to the next sentence.

    Returns:
        Cleaned, whitespace-collapsed text ("" for falsy input).
    """
    if not text:
        return ""
    # Drop [caption ...]...[/caption] shortcodes together with their content.
    text = re.sub(r'\[caption[^\]]*\].*?\[/caption\]', '', text, flags=re.IGNORECASE | re.DOTALL)
    # Drop any orphaned opening/closing caption tags left behind.
    text = re.sub(r'\[/?caption[^\]]*\]', '', text, flags=re.IGNORECASE)
    # Insert a space after a period that is glued to an uppercase letter
    # (ASCII A-Z plus the accented Latin range À-Ỹ used by Vietnamese).
    # BUG FIX: the original used two overlapping substitutions, one of whose
    # character classes contained literal '.' characters, so ellipses like
    # "..." were mangled into ". . ."; a single lookahead covers both cases.
    text = re.sub(r'\.(?=[A-ZÀ-Ỹ])', '. ', text)
    # Collapse all whitespace runs to single spaces.
    text = re.sub(r'\s+', ' ', text).strip()
    return text
async def load_json_data(file_path):
    """Read and return the JSON payload stored at ``file_path``.

    Prints progress before and after loading; the file is assumed UTF-8.
    """
    print(f"Loading data from {file_path}...")
    with open(file_path, 'r', encoding='utf-8') as handle:
        entries = json.load(handle)
    print(f"[OK] Loaded {len(entries)} entries")
    return entries
async def create_qdrant_collection(client, collection_name: str, vector_size: int):
    """Ensure a Qdrant collection with cosine distance exists.

    Args:
        client: Connected ``qdrant_client.QdrantClient`` instance.
        collection_name: Name of the collection to create if missing.
        vector_size: Dimensionality of the vectors the collection stores.

    Raises:
        Exception: Re-raised when collection creation fails.
    """
    from qdrant_client.http.models import VectorParams, Distance
    if client.collection_exists(collection_name):
        # BUG FIX: the original code called create_collection again in this
        # branch, which fails because the collection already exists.
        print(f"Collection '{collection_name}' already exists. Skipping creation.")
        return
    try:
        print(f"Collection '{collection_name}' does not exist. Creating...")
        client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE)
        )
    except Exception as e:
        print(f"Error creating collection '{collection_name}': {e}")
        raise
async def init_qdrant_client(endpoint: str, api_key: str):
    """Construct and return a ``QdrantClient`` for the given endpoint and API key.

    Raises:
        Exception: Re-raised when client construction fails.
    """
    from qdrant_client import QdrantClient
    try:
        qdrant = QdrantClient(url=endpoint, api_key=api_key)
        print("Qdrant client initialized successfully.")
        return qdrant
    except Exception as e:
        print(f"Error initializing Qdrant client: {e}")
        raise e
async def parse_html_to_sections(html: str, data_json):
    """Split an article's HTML into Documents: one intro plus one per <h2> section.

    Args:
        html: The article body as an HTML string.
        data_json: Source record; must provide ``site``, ``url`` and
            ``event_time["$date"]`` keys (carried into each Document's metadata).

    Returns:
        List of ``Document`` objects — the first <p> as an "intro" document
        (when present), then one document per <h2> heading containing the
        heading text plus all sibling content up to the next <h2>.
    """
    soup = BeautifulSoup(html, "html.parser")
    documents = []
    # --- 1. The first <p> becomes the "intro" document ---
    first_p = soup.find("p")
    if first_p:
        cleaned_text = await clean_text(first_p.get_text(separator=" ", strip=True))
        documents.append(
            Document(
                page_content=cleaned_text,
                metadata={
                    "site": data_json["site"],
                    "url": data_json["url"],
                    "date_created": data_json["event_time"]["$date"],
                    "document_id": uuid64(),
                    "type": "intro"
                }
            )
        )
        first_p.decompose()  # remove so the intro is not duplicated in a section
    # --- 2. One document per <h2> section ---
    # BUG FIX: the original re-read first_p inside this loop, which raised
    # AttributeError when the page had no <p> (first_p is None) and otherwise
    # touched an already-decomposed tag; the value was never used, so the
    # line is removed.
    for h2 in soup.find_all("h2"):
        header = await clean_text(h2.get_text(separator=" ", strip=True))
        contents = []
        # Collect sibling content until the next <h2> starts a new section.
        for sib in h2.next_siblings:
            if getattr(sib, "name", None) == "h2":
                break
            if hasattr(sib, "get_text"):
                text = await clean_text(sib.get_text(separator=" ", strip=True))
                if text:
                    contents.append(text)
        parent_text = header + "\n" + "\n".join(contents)
        documents.append(
            Document(
                page_content=parent_text,
                metadata={
                    "site": data_json["site"],
                    "url": data_json["url"],
                    "date_created": data_json["event_time"]["$date"],
                    "header": header,
                    "parent_id": uuid64(),
                    "parent_chunking": parent_text,
                }
            )
        )
    return documents
async def chunk_documents(docs, chunk_size=500, chunk_overlap=50):
    """Split section documents into overlapping chunks; intro docs pass through whole.

    Args:
        docs: Documents produced by ``parse_html_to_sections``.
        chunk_size: Maximum characters per chunk.
        chunk_overlap: Characters of overlap between consecutive chunks.

    Returns:
        List of Documents. Each section chunk is prefixed with its section
        header (for retrieval context) and gets a fresh ``document_id``.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", " ", ""]
    )
    chunked_docs = []
    for doc in docs:
        # Intro documents are short; keep them as a single chunk.
        if doc.metadata.get("type") == "intro":
            chunked_docs.append(doc)
            continue
        # CLEANUP: removed leftover debug prints that spammed stdout per
        # document, and the unused enumerate index.
        chunks = splitter.split_text(doc.page_content)
        header = doc.metadata.get("header")
        for chunk in chunks:
            # Prefix each chunk with its section header to keep context.
            chunked_docs.append(
                Document(
                    page_content=header + "\n " + chunk,
                    metadata={
                        **doc.metadata,
                        "document_id": uuid64()
                    }
                )
            )
    return chunked_docs