snote / scripts /chunks_and_metadata.py
xuanbao01's picture
Upload folder using huggingface_hub
44c5827 verified
import datetime
import json
import pathlib, re
import tiktoken
BASE = pathlib.Path(__file__).resolve().parent.parent
CONVERTED = BASE / "converted"
CHUNKS_DIR = BASE / "chunks" #temp fix
CHUNKS_DIR.mkdir(parents=True, exist_ok=True)
MAX_TOKENS = 512
OVERLAP_TOKENS = 50
try:
tokenizer = tiktoken.get_encoding("cl100k_base")
except:
tokenizer = tiktoken.get_encoding("gpt2")
# ----
# helper functions
# ----
def split_table_with_token_overlap(text, max_tokens, overlap_tokens):
if count_tokens(text) <= max_tokens:
return None, [text]
header = text.splitlines()[0] if text.splitlines() else None
parts = []
start = 0
text_tokens = tokenizer.encode(text)
while start < len(text_tokens):
end = start + max_tokens
part_tokens = text_tokens[start:end]
part_text = tokenizer.decode(part_tokens)
parts.append(part_text)
if end >= len(text_tokens):
break
start = end - overlap_tokens
return header, parts
def count_tokens(text: str) -> int:
return len(tokenizer.encode(text))
def split_text_with_token_overlap(text, max_tokens, overlap_tokens):
if count_tokens(text) <= max_tokens:
return [text]
parts = []
start = 0
text_tokens = tokenizer.encode(text)
while start < len(text_tokens):
end = start + max_tokens
part_tokens = text_tokens[start:end]
part_text = tokenizer.decode(part_tokens)
parts.append(part_text)
if end >= len(text_tokens):
break
start = end - overlap_tokens
return parts
def sha256(s: bytes):
import hashlib
return hashlib.sha256(s).hexdigest()
def roman_to_int(r):
r = r.upper()
vals = {'I':1,'V':5,'X':10,'L':50,'C':100,'D':500,'M':1000}
i = 0
total = 0
while i < len(r):
if i+1 < len(r) and vals[r[i]] < vals[r[i+1]]:
total += vals[r[i+1]] - vals[r[i]]
i += 2
else:
total += vals[r[i]]
i += 1
return total
def parse_md_to_tree(md_text):
lines = md_text.splitlines()
root = {'level':0, 'title':'ROOT', 'children': [], 'content_lines': [], 'parent': None}
stack = [root]
i = 0
seen_header = False
while i < len(lines):
line = lines[i]
m = re.match(r'^\s*(#{1,4})\s+(.*)$', line)
if m:
seen_header = True
level = len(m.group(1))
title = m.group(2).strip()
node = {'level': level, 'title': title, 'children': [], 'content_lines': [], 'parent': stack[-1]}
# find parent where parent.level < level
while stack and stack[-1]['level'] >= level:
stack.pop()
stack[-1]['children'].append(node)
stack.append(node)
i += 1
continue
# detect markdown table
if line.strip().startswith('|'):
table_lines = []
while i < len(lines) and lines[i].strip().startswith('|'):
table_lines.append(lines[i])
i += 1
# Kiểm tra bảng hợp lệ (ít nhất có 2 dòng: header + ---)
table_node = {
'level': stack[-1]['level'] + 1,
'title': f'{stack[-1]["title"]}::TABLE',
'children': [],
'content_lines': table_lines,
'is_table': True,
'parent': stack[-1]
}
stack[-1]['children'].append(table_node)
continue
# normal text -> append to current node content
if not seen_header:
# ignore leading text before any header
i += 1
continue
stack[-1]['content_lines'].append(line)
i += 1
# join content
def finalize(node):
node['text'] = '\n'.join([l for l in node.get('content_lines',[]) if l.strip()!=''])
for c in node.get('children',[]):
finalize(c)
finalize(root)
return root
def make_embedding_text(path, header, text):
if header:
head = header + "\n" + " | ".join([p for p in path if p and p!='ROOT' and p!='TABLE'])
else:
head = " | ".join([p for p in path if p and p!='ROOT' and p!='TABLE'])
return (head + "\n\n" + text).strip()
REF_PATTERNS = [
re.compile(r'Khoản\s+([\d\.]+)\s+Điều\s+([\d\.]+)', re.I),
re.compile(r'Điều\s+([\d\.]+)', re.I),
re.compile(r'Chương\s+([IVXLC]+)', re.I),
re.compile(r'Nghị định\s+số\s+([^\s,\.]+)', re.I)
]
def extract_references(text):
refs = []
for p in REF_PATTERNS:
for m in p.finditer(text):
refs.append({"raw": m.group(0), "groups": m.groups()})
return refs
def flatten_and_emit(root, doc_id, filename, chunks_dir):
chunks = []
seq = 0
def walk(node, path_titles):
nonlocal seq
# decide whether to emit this node as a chunk:
# emit when node has text or is table or is leaf with children but also content
has_text = bool(node.get('text','').strip())
is_table = node.get('is_table', False)
if has_text or is_table:
# build path
# For table nodes, use parent title instead of table title to avoid duplication
if is_table:
path = path_titles
else:
path = path_titles + [node['title']]
# extract structural numbers
chapter = None; article=None; clause=None; point=None
for t in path:
if t.upper().startswith('CHƯƠNG'):
m = re.search(r'CHƯƠNG\s+([IVXLC]+)', t, re.I)
if m:
try:
chapter = roman_to_int(m.group(1))
except:
chapter = m.group(1)
if t.upper().startswith('ĐIỀU'):
m = re.search(r'ĐIỀU\s+(\d+)', t, re.I)
if m:
article = int(m.group(1))
if t.upper().startswith('KHOẢN'):
m = re.search(r'KHOẢN\s+([\d\.]+)', t, re.I)
if m:
clause = m.group(1)
# Loại bỏ dấu chấm cuối cùng nếu có
if clause.endswith('.'):
clause = clause.rstrip('.')
if t.upper().startswith('ĐIỂM') or re.match(r'^[a-z]\)', t.strip()):
point = None
# Ưu tiên tìm "ĐIỂM" trước
m = re.search(r'ĐIỂM\s+([\d\.]+)', t, re.I)
if m:
point = m.group(1)
if point.endswith('.'):
point = point.rstrip('.')
# Nếu không tìm thấy "ĐIỂM", tìm "a)"
else:
m = re.search(r'^([a-z])\)', t.strip())
if m:
point = m.group(1)
header = None
text = node.get('text','').strip()
if is_table:
content_type = 'table'
table_id = f"{doc_id}::table::{seq}"
header, parts = split_table_with_token_overlap(text, MAX_TOKENS, OVERLAP_TOKENS)
else:
content_type = 'text'
table_id = None
parts = split_text_with_token_overlap(text, MAX_TOKENS, OVERLAP_TOKENS)
for p in parts:
seq += 1
# nếu thế thì chapter, article, clause, point, seq quá quan trọng, để số như thế dễ fail
cid = f"{doc_id}::CH{chapter or 0}::A{article or 0}::K{clause or 0}::P{point or 0}::C{seq}"
chunk = {
"id": cid,
"doc_id": doc_id,
"source_filename": filename,
"created_at": datetime.datetime.utcnow().isoformat()+"Z",
"chapter": chapter,
"article": article,
"clause": clause,
"point": point,
"content_type": content_type,
"table_id": table_id,
"path": path,
"chunk_text": p,
"chunk_for_embedding": make_embedding_text(path, header, p),
"text_length_chars": len(p),
"token_count": count_tokens(p),
"references": extract_references(p),
"parse_confidence": 0.9, # TODO: add confidence
"checksum": sha256(p.encode('utf-8'))
}
chunks.append(chunk)
# recurse
for c in node.get('children', []):
walk(c, path_titles + [node['title']])
# start walking top-level children (ignore ROOT title)
for child in root.get('children', []):
walk(child, [])
# write chunks out
manifest = []
for ch in chunks:
fn = f"{ch['id'].replace('/','_').replace(' ','_')}.json"
outp = chunks_dir / fn
with open(outp, 'w', encoding='utf-8') as f:
json.dump(ch, f, ensure_ascii=False, indent=2)
manifest.append({"id": ch['id'], "path": str(outp), "length": ch['text_length_chars']})
return manifest
def _node_to_jsonable(node):
# omit parent to avoid cycles, include only relevant fields
return {
'level': node.get('level'),
'title': node.get('title'),
'text': node.get('text', ''),
'children': [_node_to_jsonable(c) for c in node.get('children', [])]
}
def convert_md_to_chunks(md_path, chunks_dir):
# test
text = md_path.read_text(encoding="utf-8")
import textwrap
# text = textwrap.dedent("""
# | TT | Đối tượng | Hồ sơ cần phải nộp |
# | --- | --- | --- |
# | I | Đối tượng miễn 100% học phí | Đối tượng miễn 100% học phí |
# | 1.1 | Sinh viên là con của người có công với cách mạng được hưởng ưu đãi | 1/ Đơn đề nghị miễn giảm học phí (mẫu tại phụ lục II); 2/ Bản sao Giấy khai sinh; 3/ Giấy xác nhận của cơ quan quản lý đối tượng người có công có con thuộc diện miễn giảm học phí; 4/ Bản sao thẻ Thương bệnh binh của bố/mẹ (nếu có). |
# | 1.2 | Sinh viên bị khuyết tật | 1/ Đơn đề nghị miễn giảm học phí (mẫu tại phụ lục II); 2/ Giấy xác nhận khuyết tật của UBND cấp xã cấp hoặc Quyết định trợ cấp xã hội của UBND cấp huyện. |
# | 1.3 | Sinh viên (tuổi không quá 22) không có nguồn nuôi dưỡng thuộc đối tượng hưởng trợ cấp xã hội hàng tháng theo quy định tại khoản 1 và 2 Điều 5 Nghị định số 20/2021/NĐ-CP ngày 15/3/2021 của Chính phủ | 1/ Đơn đề nghị miễn giảm học phí (mẫu tại phụ lục II); 2/ Quyết định về việc trợ cấp xã hội của Chủ tịch Ủy ban nhân dân cấp huyện. |
# | 1.4 | Sinh viên là người dân tộc thiểu số có cha hoặc mẹ hoặc cả cha và mẹ hoặc ông bà (trong trường hợp ở với ông bà) thuộc hộ nghèo và hộ cận nghèo | 1/ Đơn đề nghị miễn giảm học phí (mẫu tại phụ lục II); 2/ Bản sao Giấy khai sinh; 3/ Bản sao Giấy xác nhận hộ nghèo/hộ cận nghèo do UBND cấp xã cấp. |
# | 1.5 | Sinh viên người dân tộc thiểu số rất ít người ở vùng có điều kiện kinh tế - xã hội khó khăn hoặc đặc biệt khó khăn | 1/ Đơn đề nghị miễn giảm học phí (mẫu tại phụ lục II); 2/ Bản sao Giấy khai sinh; 3/ Bản sao Sổ hộ khẩu thường trú hoặc Giấy xác nhận của cơ quan công an về việc đăng ký thường trú (nếu Sổ hộ khẩu bị thất lạc); 4/ Giấy chứng nhận vùng theo hộ khẩu có điều kiện kinh tế - xã hội khó khăn hoặc đặc biệt khó khăn (nếu có). |
# | II | Đối tượng giảm học phí | Đối tượng giảm học phí |
# | 2.1 | Đối tượng giảm 70% học phí: Sinh viên người dân tộc thiểu số (không phải là người dân tộc thiểu số rất ít người) ở thôn/bản đặc biệt khó khăn, xã khu vực III vùng dân tộc và miền núi, xã đặc biệt khó khăn vùng bãi ngang ven biển hải đảo theo quy định của cơ quan có thẩm quyền | 1/ Đơn đề nghị miễn giảm học phí (mẫu tại phụ lục II); 2/ Bản sao Giấy khai sinh; 3/ Bản sao Sổ hộ khẩu thường trú hoặc Giấy xác nhận của cơ quan công an về việc đăng ký thường trú (nếu Sổ hộ khẩu bị thất lạc). |
# | 2.2 | Đối tượng giảm 50% học phí: Sinh viên là con cán bộ, công chức, viên chức, công nhân mà cha hoặc mẹ bị tai nạn lao động hoặc mắc bệnh nghề nghiệp được hưởng trợ cấp thường xuyên | 1/ Đơn đề nghị miễn giảm học phí (mẫu tại phụ lục II); 2/ Bản sao Giấy khai sinh; 3/ Bản sao Sổ hưởng trợ cấp hàng tháng của cha hoặc mẹ bị tai nạn lao động hoặc mắc bệnh nghề nghiệp do tổ chức bảo hiểm xã hội cấp. |
# """
# )
root = parse_md_to_tree(text)
# Print JSON-like structure
import json
# print(json.dumps(_node_to_jsonable(root), ensure_ascii=False, indent=2))
# canoical doc id uses filename + md5 of text
docid = md_path.stem + "_" + sha256(text.encode('utf-8'))[:8]
manifest = flatten_and_emit(root, docid, md_path.name, chunks_dir)
return manifest
def main():
manifests = []
for md in CONVERTED.glob("*.md"):
m = convert_md_to_chunks(md, CHUNKS_DIR)
manifests.extend(m) # thế không phân biệt documents khác nhau hả ?
# write global manifest
with open(CHUNKS_DIR / "chunks_manifest.json", "w", encoding="utf-8") as f:
json.dump({"generated_at": datetime.datetime.utcnow().isoformat()+"Z", "chunks": manifests}, f, ensure_ascii=False, indent=2)
print("Wrote", len(manifests), "chunks")
if __name__ == "__main__":
main()