import datetime import json import pathlib, re import tiktoken BASE = pathlib.Path(__file__).resolve().parent.parent CONVERTED = BASE / "converted" CHUNKS_DIR = BASE / "chunks" #temp fix CHUNKS_DIR.mkdir(parents=True, exist_ok=True) MAX_TOKENS = 512 OVERLAP_TOKENS = 50 try: tokenizer = tiktoken.get_encoding("cl100k_base") except: tokenizer = tiktoken.get_encoding("gpt2") # ---- # helper functions # ---- def split_table_with_token_overlap(text, max_tokens, overlap_tokens): if count_tokens(text) <= max_tokens: return None, [text] header = text.splitlines()[0] if text.splitlines() else None parts = [] start = 0 text_tokens = tokenizer.encode(text) while start < len(text_tokens): end = start + max_tokens part_tokens = text_tokens[start:end] part_text = tokenizer.decode(part_tokens) parts.append(part_text) if end >= len(text_tokens): break start = end - overlap_tokens return header, parts def count_tokens(text: str) -> int: return len(tokenizer.encode(text)) def split_text_with_token_overlap(text, max_tokens, overlap_tokens): if count_tokens(text) <= max_tokens: return [text] parts = [] start = 0 text_tokens = tokenizer.encode(text) while start < len(text_tokens): end = start + max_tokens part_tokens = text_tokens[start:end] part_text = tokenizer.decode(part_tokens) parts.append(part_text) if end >= len(text_tokens): break start = end - overlap_tokens return parts def sha256(s: bytes): import hashlib return hashlib.sha256(s).hexdigest() def roman_to_int(r): r = r.upper() vals = {'I':1,'V':5,'X':10,'L':50,'C':100,'D':500,'M':1000} i = 0 total = 0 while i < len(r): if i+1 < len(r) and vals[r[i]] < vals[r[i+1]]: total += vals[r[i+1]] - vals[r[i]] i += 2 else: total += vals[r[i]] i += 1 return total def parse_md_to_tree(md_text): lines = md_text.splitlines() root = {'level':0, 'title':'ROOT', 'children': [], 'content_lines': [], 'parent': None} stack = [root] i = 0 seen_header = False while i < len(lines): line = lines[i] m = re.match(r'^\s*(#{1,4})\s+(.*)$', line) if m: seen_header = True level = len(m.group(1)) title = m.group(2).strip() node = {'level': level, 'title': title, 'children': [], 'content_lines': [], 'parent': stack[-1]} # find parent where parent.level < level while stack and stack[-1]['level'] >= level: stack.pop() stack[-1]['children'].append(node) stack.append(node) i += 1 continue # detect markdown table if line.strip().startswith('|'): table_lines = [] while i < len(lines) and lines[i].strip().startswith('|'): table_lines.append(lines[i]) i += 1 # Kiểm tra bảng hợp lệ (ít nhất có 2 dòng: header + ---) table_node = { 'level': stack[-1]['level'] + 1, 'title': f'{stack[-1]["title"]}::TABLE', 'children': [], 'content_lines': table_lines, 'is_table': True, 'parent': stack[-1] } stack[-1]['children'].append(table_node) continue # normal text -> append to current node content if not seen_header: # ignore leading text before any header i += 1 continue stack[-1]['content_lines'].append(line) i += 1 # join content def finalize(node): node['text'] = '\n'.join([l for l in node.get('content_lines',[]) if l.strip()!='']) for c in node.get('children',[]): finalize(c) finalize(root) return root def make_embedding_text(path, header, text): if header: head = header + "\n" + " | ".join([p for p in path if p and p!='ROOT' and p!='TABLE']) else: head = " | ".join([p for p in path if p and p!='ROOT' and p!='TABLE']) return (head + "\n\n" + text).strip() REF_PATTERNS = [ re.compile(r'Khoản\s+([\d\.]+)\s+Điều\s+([\d\.]+)', re.I), re.compile(r'Điều\s+([\d\.]+)', re.I), re.compile(r'Chương\s+([IVXLC]+)', re.I), re.compile(r'Nghị định\s+số\s+([^\s,\.]+)', re.I) ] def extract_references(text): refs = [] for p in REF_PATTERNS: for m in p.finditer(text): refs.append({"raw": m.group(0), "groups": m.groups()}) return refs def flatten_and_emit(root, doc_id, filename, chunks_dir): chunks = [] seq = 0 def walk(node, path_titles): nonlocal seq # decide whether to emit this node as a chunk: # emit when node has text or is table or is leaf with children but also content has_text = bool(node.get('text','').strip()) is_table = node.get('is_table', False) if has_text or is_table: # build path # For table nodes, use parent title instead of table title to avoid duplication if is_table: path = path_titles else: path = path_titles + [node['title']] # extract structural numbers chapter = None; article=None; clause=None; point=None for t in path: if t.upper().startswith('CHƯƠNG'): m = re.search(r'CHƯƠNG\s+([IVXLC]+)', t, re.I) if m: try: chapter = roman_to_int(m.group(1)) except: chapter = m.group(1) if t.upper().startswith('ĐIỀU'): m = re.search(r'ĐIỀU\s+(\d+)', t, re.I) if m: article = int(m.group(1)) if t.upper().startswith('KHOẢN'): m = re.search(r'KHOẢN\s+([\d\.]+)', t, re.I) if m: clause = m.group(1) # Loại bỏ dấu chấm cuối cùng nếu có if clause.endswith('.'): clause = clause.rstrip('.') if t.upper().startswith('ĐIỂM') or re.match(r'^[a-z]\)', t.strip()): point = None # Ưu tiên tìm "ĐIỂM" trước m = re.search(r'ĐIỂM\s+([\d\.]+)', t, re.I) if m: point = m.group(1) if point.endswith('.'): point = point.rstrip('.') # Nếu không tìm thấy "ĐIỂM", tìm "a)" else: m = re.search(r'^([a-z])\)', t.strip()) if m: point = m.group(1) header = None text = node.get('text','').strip() if is_table: content_type = 'table' table_id = f"{doc_id}::table::{seq}" header, parts = split_table_with_token_overlap(text, MAX_TOKENS, OVERLAP_TOKENS) else: content_type = 'text' table_id = None parts = split_text_with_token_overlap(text, MAX_TOKENS, OVERLAP_TOKENS) for p in parts: seq += 1 # nếu thế thì chapter, article, clause, point, seq quá quan trọng, để số như thế dễ fail cid = f"{doc_id}::CH{chapter or 0}::A{article or 0}::K{clause or 0}::P{point or 0}::C{seq}" chunk = { "id": cid, "doc_id": doc_id, "source_filename": filename, "created_at": datetime.datetime.utcnow().isoformat()+"Z", "chapter": chapter, "article": article, "clause": clause, "point": point, "content_type": content_type, "table_id": table_id, "path": path, "chunk_text": p, "chunk_for_embedding": make_embedding_text(path, header, p), "text_length_chars": len(p), "token_count": count_tokens(p), "references": extract_references(p), "parse_confidence": 0.9, # TODO: add confidence "checksum": sha256(p.encode('utf-8')) } chunks.append(chunk) # recurse for c in node.get('children', []): walk(c, path_titles + [node['title']]) # start walking top-level children (ignore ROOT title) for child in root.get('children', []): walk(child, []) # write chunks out manifest = [] for ch in chunks: fn = f"{ch['id'].replace('/','_').replace(' ','_')}.json" outp = chunks_dir / fn with open(outp, 'w', encoding='utf-8') as f: json.dump(ch, f, ensure_ascii=False, indent=2) manifest.append({"id": ch['id'], "path": str(outp), "length": ch['text_length_chars']}) return manifest def _node_to_jsonable(node): # omit parent to avoid cycles, include only relevant fields return { 'level': node.get('level'), 'title': node.get('title'), 'text': node.get('text', ''), 'children': [_node_to_jsonable(c) for c in node.get('children', [])] } def convert_md_to_chunks(md_path, chunks_dir): # test text = md_path.read_text(encoding="utf-8") import textwrap # text = textwrap.dedent(""" # | TT | Đối tượng | Hồ sơ cần phải nộp | # | --- | --- | --- | # | I | Đối tượng miễn 100% học phí | Đối tượng miễn 100% học phí | # | 1.1 | Sinh viên là con của người có công với cách mạng được hưởng ưu đãi | 1/ Đơn đề nghị miễn giảm học phí (mẫu tại phụ lục II); 2/ Bản sao Giấy khai sinh; 3/ Giấy xác nhận của cơ quan quản lý đối tượng người có công có con thuộc diện miễn giảm học phí; 4/ Bản sao thẻ Thương bệnh binh của bố/mẹ (nếu có). | # | 1.2 | Sinh viên bị khuyết tật | 1/ Đơn đề nghị miễn giảm học phí (mẫu tại phụ lục II); 2/ Giấy xác nhận khuyết tật của UBND cấp xã cấp hoặc Quyết định trợ cấp xã hội của UBND cấp huyện. | # | 1.3 | Sinh viên (tuổi không quá 22) không có nguồn nuôi dưỡng thuộc đối tượng hưởng trợ cấp xã hội hàng tháng theo quy định tại khoản 1 và 2 Điều 5 Nghị định số 20/2021/NĐ-CP ngày 15/3/2021 của Chính phủ | 1/ Đơn đề nghị miễn giảm học phí (mẫu tại phụ lục II); 2/ Quyết định về việc trợ cấp xã hội của Chủ tịch Ủy ban nhân dân cấp huyện. | # | 1.4 | Sinh viên là người dân tộc thiểu số có cha hoặc mẹ hoặc cả cha và mẹ hoặc ông bà (trong trường hợp ở với ông bà) thuộc hộ nghèo và hộ cận nghèo | 1/ Đơn đề nghị miễn giảm học phí (mẫu tại phụ lục II); 2/ Bản sao Giấy khai sinh; 3/ Bản sao Giấy xác nhận hộ nghèo/hộ cận nghèo do UBND cấp xã cấp. | # | 1.5 | Sinh viên người dân tộc thiểu số rất ít người ở vùng có điều kiện kinh tế - xã hội khó khăn hoặc đặc biệt khó khăn | 1/ Đơn đề nghị miễn giảm học phí (mẫu tại phụ lục II); 2/ Bản sao Giấy khai sinh; 3/ Bản sao Sổ hộ khẩu thường trú hoặc Giấy xác nhận của cơ quan công an về việc đăng ký thường trú (nếu Sổ hộ khẩu bị thất lạc); 4/ Giấy chứng nhận vùng theo hộ khẩu có điều kiện kinh tế - xã hội khó khăn hoặc đặc biệt khó khăn (nếu có). | # | II | Đối tượng giảm học phí | Đối tượng giảm học phí | # | 2.1 | Đối tượng giảm 70% học phí: Sinh viên người dân tộc thiểu số (không phải là người dân tộc thiểu số rất ít người) ở thôn/bản đặc biệt khó khăn, xã khu vực III vùng dân tộc và miền núi, xã đặc biệt khó khăn vùng bãi ngang ven biển hải đảo theo quy định của cơ quan có thẩm quyền | 1/ Đơn đề nghị miễn giảm học phí (mẫu tại phụ lục II); 2/ Bản sao Giấy khai sinh; 3/ Bản sao Sổ hộ khẩu thường trú hoặc Giấy xác nhận của cơ quan công an về việc đăng ký thường trú (nếu Sổ hộ khẩu bị thất lạc). | # | 2.2 | Đối tượng giảm 50% học phí: Sinh viên là con cán bộ, công chức, viên chức, công nhân mà cha hoặc mẹ bị tai nạn lao động hoặc mắc bệnh nghề nghiệp được hưởng trợ cấp thường xuyên | 1/ Đơn đề nghị miễn giảm học phí (mẫu tại phụ lục II); 2/ Bản sao Giấy khai sinh; 3/ Bản sao Sổ hưởng trợ cấp hàng tháng của cha hoặc mẹ bị tai nạn lao động hoặc mắc bệnh nghề nghiệp do tổ chức bảo hiểm xã hội cấp. | # """ # ) root = parse_md_to_tree(text) # Print JSON-like structure import json # print(json.dumps(_node_to_jsonable(root), ensure_ascii=False, indent=2)) # canoical doc id uses filename + md5 of text docid = md_path.stem + "_" + sha256(text.encode('utf-8'))[:8] manifest = flatten_and_emit(root, docid, md_path.name, chunks_dir) return manifest def main(): manifests = [] for md in CONVERTED.glob("*.md"): m = convert_md_to_chunks(md, CHUNKS_DIR) manifests.extend(m) # thế không phân biệt documents khác nhau hả ? # write global manifest with open(CHUNKS_DIR / "chunks_manifest.json", "w", encoding="utf-8") as f: json.dump({"generated_at": datetime.datetime.utcnow().isoformat()+"Z", "chunks": manifests}, f, ensure_ascii=False, indent=2) print("Wrote", len(manifests), "chunks") if __name__ == "__main__": main()