Spaces:
Running
Running
File size: 5,149 Bytes
6912ad8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
import re
import uuid
import base64
import json
from bs4 import BeautifulSoup
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
def uuid64():
    """Return a random UUID4 as a 22-character URL-safe base64 string.

    The two ``=`` padding characters are stripped, so the result contains
    only ``[A-Za-z0-9_-]`` and is safe for use in URLs and IDs.
    """
    raw_bytes = uuid.uuid4().bytes
    encoded = base64.urlsafe_b64encode(raw_bytes).decode("ascii")
    return encoded.rstrip("=")
async def clean_text(text: str) -> str:
    """Normalize scraped article text.

    Removes WordPress ``[caption ...]...[/caption]`` shortcodes together
    with their inner content (plus any stray unmatched caption tags),
    inserts a missing space after a sentence-ending period that is glued
    to a following uppercase letter (ASCII or Vietnamese), and collapses
    all whitespace runs to single spaces.

    Args:
        text: Raw text extracted from HTML; may be empty or None.

    Returns:
        The cleaned single-spaced string ("" for falsy input).
    """
    if not text:
        return ""
    # Strip [caption ...]...[/caption] blocks, then any leftover caption tags.
    text = re.sub(r'\[caption[^\]]*\].*?\[/caption\]', '', text,
                  flags=re.IGNORECASE | re.DOTALL)
    text = re.sub(r'\[/?caption[^\]]*\]', '', text, flags=re.IGNORECASE)
    # BUGFIX: the original first pattern's character class contained a
    # literal "..." — putting '.' itself in the class — so consecutive
    # periods ("Wait..Done") also got spaces injected, mangling ellipses.
    # The second, near-duplicate substitution was redundant after the first.
    # One lookahead covers A-Z plus the Vietnamese uppercase range À-Ỹ
    # (U+00C0–U+1EF8, which includes Ă, Â, Ư, Ơ, ...).
    text = re.sub(r'\.(?=[A-ZÀ-Ỹ])', '. ', text)
    return re.sub(r'\s+', ' ', text).strip()
async def load_json_data(file_path):
    """Read the JSON payload stored at *file_path* (UTF-8) and return it.

    Prints a progress line before loading and an entry count afterwards.
    """
    print(f"Loading data from {file_path}...")
    with open(file_path, 'r', encoding='utf-8') as handle:
        payload = json.load(handle)
    print(f"[OK] Loaded {len(payload)} entries")
    return payload
async def create_qdrant_collection(client, collection_name: str, vector_size: int):
    """Ensure a Qdrant collection with cosine-distance vectors exists.

    Args:
        client: An initialized ``qdrant_client.QdrantClient``.
        collection_name: Name of the collection to create if absent.
        vector_size: Dimensionality of the stored embedding vectors.

    Raises:
        Exception: Re-raises whatever ``create_collection`` raises.
    """
    from qdrant_client.http.models import VectorParams, Distance
    # BUGFIX: the original `else` branch called create_collection again when
    # the collection ALREADY existed, which raises on every re-run of the
    # pipeline. An existing collection is now reported and left untouched.
    if client.collection_exists(collection_name):
        print(f"Collection '{collection_name}' already exists. Skipping creation.")
        return
    try:
        print(f"Collection '{collection_name}' does not exist. Creating...")
        client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE)
        )
    except Exception as e:
        print(f"Error creating collection '{collection_name}': {e}")
        # Bare `raise` preserves the original traceback.
        raise
async def init_qdrant_client(endpoint: str, api_key: str):
    """Create and return a ``QdrantClient`` for *endpoint*.

    Prints a success or failure message; any initialization error is
    re-raised to the caller.
    """
    from qdrant_client import QdrantClient
    try:
        qdrant = QdrantClient(url=endpoint, api_key=api_key)
    except Exception as e:
        print(f"Error initializing Qdrant client: {e}")
        raise e
    else:
        print("Qdrant client initialized successfully.")
        return qdrant
async def parse_html_to_sections(html: str, data_json):
    """Split an article's HTML into LangChain ``Document`` sections.

    The first ``<p>`` becomes an "intro" document; each ``<h2>`` plus all
    sibling elements up to the next ``<h2>`` becomes one section document
    whose ``page_content`` is "header\\ncontent...".

    Args:
        html: Raw article HTML.
        data_json: Source record — must provide ``"site"``, ``"url"`` and
            ``event_time["$date"]`` keys copied into each document's metadata.

    Returns:
        list[Document]: the intro document (if a ``<p>`` exists) followed by
        one document per ``<h2>`` section.
    """
    soup = BeautifulSoup(html, "html.parser")
    documents = []
    # --- 1. Intro: the article's first <p> ---
    first_p = soup.find("p")
    if first_p:
        cleaned_text = await clean_text(first_p.get_text(separator=" ", strip=True))
        documents.append(
            Document(
                page_content=cleaned_text,
                metadata={
                    "site": data_json["site"],
                    "url": data_json["url"],
                    "date_created": data_json["event_time"]["$date"],
                    "document_id": uuid64(),
                    "type": "intro"
                }
            )
        )
        first_p.decompose()  # remove so the intro is not duplicated below
    # --- 2. One section per <h2> ---
    # BUGFIX: removed a stray re-cleaning of `first_p` inside this loop — its
    # result was never used, and it crashed when the article had no <p>
    # (first_p is None) or after first_p.decompose() destroyed the tag.
    for h2 in soup.find_all("h2"):
        header = await clean_text(h2.get_text(separator=" ", strip=True))
        contents = []
        for sib in h2.next_siblings:
            # Stop at the next section heading.
            if getattr(sib, "name", None) == "h2":
                break
            # Only element nodes have get_text; skip NavigableString noise.
            if hasattr(sib, "get_text"):
                text = await clean_text(sib.get_text(separator=" ", strip=True))
                if text:
                    contents.append(text)
        parent_text = header + "\n" + "\n".join(contents)
        documents.append(
            Document(
                page_content=parent_text,
                metadata={
                    "site": data_json["site"],
                    "url": data_json["url"],
                    "date_created": data_json["event_time"]["$date"],
                    "header": header,
                    "parent_id": uuid64(),
                    "parent_chunking": parent_text,
                }
            )
        )
    return documents
async def chunk_documents(docs, chunk_size=500, chunk_overlap=50):
    """Split section documents into overlapping, header-prefixed chunks.

    Documents whose metadata ``type`` is ``"intro"`` pass through unchanged;
    every other document is split with ``RecursiveCharacterTextSplitter`` and
    each chunk becomes a new ``Document`` carrying the parent's metadata plus
    a fresh ``document_id``.

    Args:
        docs: Documents produced by ``parse_html_to_sections``.
        chunk_size: Maximum characters per chunk.
        chunk_overlap: Characters shared between consecutive chunks.

    Returns:
        list[Document]: intro documents plus all generated chunks.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", " ", ""]
    )
    chunked_docs = []
    for doc in docs:
        # Keep intro documents whole; only sections get chunked.
        if doc.metadata.get("type") == "intro":
            chunked_docs.append(doc)
            continue
        chunks = splitter.split_text(doc.page_content)
        # BUGFIX: default a missing header to "" instead of raising
        # TypeError on None + str; also dropped the debug print.
        header = doc.metadata.get("header") or ""
        for chunk in chunks:
            chunked_docs.append(
                Document(
                    # Prefix each chunk with its section header for context.
                    page_content=header + "\n " + chunk,
                    metadata={
                        **doc.metadata,
                        "document_id": uuid64()
                    }
                )
            )
    return chunked_docs
|