Spaces:
Running
Running
| import re | |
| import uuid | |
| import base64 | |
| import json | |
| from bs4 import BeautifulSoup | |
| from langchain_core.documents import Document | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
def uuid64():
    """Return a random UUID4 encoded as a 22-character URL-safe base64 string."""
    raw = uuid.uuid4().bytes
    encoded = base64.urlsafe_b64encode(raw).rstrip(b"=")
    return encoded.decode("ascii")
async def clean_text(text: str) -> str:
    """Normalize scraped article text.

    Removes WordPress ``[caption]...[/caption]`` shortcodes, inserts a space
    after a period that is glued to a following capital letter (Latin and
    Vietnamese), and collapses all whitespace runs to single spaces.

    Returns "" for falsy input.
    """
    if not text:
        return ""
    # Drop whole [caption ...]...[/caption] shortcode bodies, then any stray
    # opening/closing caption tags left behind.
    text = re.sub(r'\[caption[^\]]*\].*?\[/caption\]', '', text, flags=re.IGNORECASE | re.DOTALL)
    text = re.sub(r'\[/?caption[^\]]*\]', '', text, flags=re.IGNORECASE)
    # Bug fix: the original class ended in a literal "..." which put '.' itself
    # in the class, so ellipses ("...") were mangled into ". . .".
    text = re.sub(r'\.(?=[A-ZĂÂÁÀẢÃẠ])', '. ', text)
    # Catch remaining glued capitals in the full Vietnamese Latin range.
    text = re.sub(r'\.([A-ZÀ-Ỹ])', r'. \1', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text
async def load_json_data(file_path):
    """Read a UTF-8 JSON file and return the parsed payload.

    Prints progress messages; the top-level value is assumed to support
    len() (a list or dict).
    """
    print(f"Loading data from {file_path}...")
    with open(file_path, mode='r', encoding='utf-8') as handle:
        payload = json.load(handle)
    print(f"[OK] Loaded {len(payload)} entries")
    return payload
async def create_qdrant_collection(client, collection_name: str, vector_size: int):
    """Ensure a Qdrant collection with the given name exists.

    Creates the collection with cosine-distance vectors of ``vector_size``
    when it is missing; does nothing when it already exists.

    Bug fix: the original ``else`` branch *created* the collection when it
    already existed — i.e. the existence check was effectively inverted,
    which would raise (or clobber the collection) on every re-run.

    Raises whatever ``client.create_collection`` raises, after logging it.
    """
    from qdrant_client.http.models import VectorParams, Distance
    if client.collection_exists(collection_name):
        print(f"Collection '{collection_name}' already exists. Skipping creation.")
        return
    try:
        print(f"Collection '{collection_name}' does not exist. Creating...")
        client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE)
        )
    except Exception as e:
        print(f"Error creating collection '{collection_name}': {e}")
        raise
async def init_qdrant_client(endpoint: str, api_key: str):
    """Build and return a QdrantClient for the given endpoint and API key.

    Logs and re-raises any initialization failure.
    """
    from qdrant_client import QdrantClient
    try:
        qdrant = QdrantClient(url=endpoint, api_key=api_key)
        print("Qdrant client initialized successfully.")
        return qdrant
    except Exception as e:
        print(f"Error initializing Qdrant client: {e}")
        raise e
async def parse_html_to_sections(html: str, data_json):
    """Split an article's HTML into LangChain Documents.

    Produces one "intro" Document from the first <p>, then one Document per
    <h2> section (the header plus every sibling's text up to the next <h2>).

    ``data_json`` must provide "site", "url" and "event_time"["$date"]
    (Mongo-export style) for the metadata.

    Bug fix: the original re-read ``first_p`` inside the <h2> loop *after* it
    had been decomposed (and crashed when the page had no <p> at all); the
    value was never used, so that line is removed.
    """
    soup = BeautifulSoup(html, "html.parser")
    documents = []
    # --- 1. Take the first <p> as the article intro ---
    first_p = soup.find("p")
    if first_p:
        cleaned_text = await clean_text(first_p.get_text(separator=" ", strip=True))
        documents.append(
            Document(
                page_content=cleaned_text,
                metadata={
                    "site": data_json["site"],
                    "url": data_json["url"],
                    "date_created": data_json["event_time"]["$date"],
                    "document_id": uuid64(),
                    "type": "intro"
                }
            )
        )
        first_p.decompose()  # remove so the intro is not duplicated in a section
    # --- 2. One section per <h2>, gathering siblings until the next <h2> ---
    for h2 in soup.find_all("h2"):
        header = await clean_text(h2.get_text(separator=" ", strip=True))
        contents = []
        for sib in h2.next_siblings:
            if getattr(sib, "name", None) == "h2":
                break
            if hasattr(sib, "get_text"):
                text = await clean_text(sib.get_text(separator=" ", strip=True))
                if text:
                    contents.append(text)
        parent_text = header + "\n" + "\n".join(contents)
        documents.append(
            Document(
                page_content=parent_text,
                metadata={
                    "site": data_json["site"],
                    "url": data_json["url"],
                    "date_created": data_json["event_time"]["$date"],
                    "header": header,
                    "parent_id": uuid64(),
                    "parent_chunking": parent_text,
                }
            )
        )
    return documents
async def chunk_documents(docs, chunk_size=500, chunk_overlap=50):
    """Split section Documents into overlapping chunks.

    Intro Documents (metadata "type" == "intro") pass through unchanged.
    Every other Document is split with RecursiveCharacterTextSplitter and
    each chunk is prefixed with its section header; chunks inherit the
    parent's metadata plus a fresh "document_id".

    Bug fix: ``header`` now defaults to "" so Documents without a "header"
    metadata key no longer crash on ``None + str`` concatenation.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", " ", ""]
    )
    chunked_docs = []
    for doc in docs:
        # Intro sections are short; keep them whole.
        if doc.metadata.get("type") == "intro":
            chunked_docs.append(doc)
            continue
        header = doc.metadata.get("header", "")
        for chunk in splitter.split_text(doc.page_content):
            chunked_docs.append(
                Document(
                    page_content=header + "\n " + chunk,
                    metadata={
                        **doc.metadata,
                        "document_id": uuid64()
                    }
                )
            )
    return chunked_docs