|
|
import datetime |
|
|
import json |
|
|
import pathlib, re |
|
|
import tiktoken |
|
|
|
|
|
# Project layout: this script lives one directory below the repo root.
BASE = pathlib.Path(__file__).resolve().parent.parent

# Input: markdown files produced by an earlier conversion step.
CONVERTED = BASE / "converted"

# Output: one JSON file per chunk; directory created eagerly at import time.
CHUNKS_DIR = BASE / "chunks"
CHUNKS_DIR.mkdir(parents=True, exist_ok=True)

# Chunking parameters: token budget per chunk and overlap between chunks.
MAX_TOKENS = 512
OVERLAP_TOKENS = 50

# Prefer the cl100k_base encoding; fall back to gpt2 when it is unavailable.
# `except Exception` (not bare `except:`) so KeyboardInterrupt/SystemExit
# still propagate.
try:
    tokenizer = tiktoken.get_encoding("cl100k_base")
except Exception:
    tokenizer = tiktoken.get_encoding("gpt2")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def split_table_with_token_overlap(text, max_tokens, overlap_tokens):
    """Split a markdown table into token-bounded parts, keeping its header row.

    Returns ``(header, parts)``:
      * ``header`` — the table's first line, or None when no split was needed
        (small tables pass through untouched) or the text is empty;
      * ``parts`` — list of text pieces, consecutive pieces sharing
        ``overlap_tokens`` tokens of context.

    The token-window splitting itself is identical to plain-text splitting,
    so it is delegated to split_text_with_token_overlap rather than
    duplicated here.
    """
    if count_tokens(text) <= max_tokens:
        # Fits in one chunk: no header extraction necessary.
        return None, [text]

    lines = text.splitlines()
    header = lines[0] if lines else None
    return header, split_text_with_token_overlap(text, max_tokens, overlap_tokens)
|
|
|
|
|
def count_tokens(text: str) -> int:
    """Number of tokens the module-level tokenizer produces for *text*."""
    encoded = tokenizer.encode(text)
    return len(encoded)
|
|
|
|
|
def split_text_with_token_overlap(text, max_tokens, overlap_tokens):
    """Split *text* into pieces of at most *max_tokens* tokens.

    Consecutive pieces share *overlap_tokens* tokens of context.  Text that
    already fits within the budget is returned unchanged as a one-element
    list.

    Raises:
        ValueError: if ``overlap_tokens >= max_tokens`` — the window would
            never advance and the loop below would spin forever.
    """
    if overlap_tokens >= max_tokens:
        raise ValueError("overlap_tokens must be smaller than max_tokens")

    # Encode once; the original encoded twice (count_tokens + encode).
    text_tokens = tokenizer.encode(text)
    if len(text_tokens) <= max_tokens:
        return [text]

    parts = []
    start = 0
    while start < len(text_tokens):
        end = start + max_tokens
        parts.append(tokenizer.decode(text_tokens[start:end]))
        if end >= len(text_tokens):
            break
        # Step the window back by the overlap so adjacent chunks share context.
        start = end - overlap_tokens
    return parts
|
|
|
|
|
def sha256(s: bytes):
    """Hex-encoded SHA-256 digest of the byte string *s*."""
    import hashlib
    digest = hashlib.sha256(s)
    return digest.hexdigest()
|
|
|
|
|
def roman_to_int(r):
    """Convert a Roman numeral string (any case) to an integer.

    Consumes the numeral left to right, two characters at a time when a
    smaller value precedes a larger one (the subtractive forms IV, IX, XL...).
    """
    values = {'I': 1, 'V': 5, 'X': 10, 'L': 50, 'C': 100, 'D': 500, 'M': 1000}
    numeral = r.upper()
    n = len(numeral)
    total = 0
    pos = 0
    while pos < n:
        current = values[numeral[pos]]
        if pos + 1 < n and current < values[numeral[pos + 1]]:
            # Subtractive pair: add the difference and skip both characters.
            total += values[numeral[pos + 1]] - current
            pos += 2
        else:
            total += current
            pos += 1
    return total
|
|
|
|
|
def parse_md_to_tree(md_text):
    """Parse markdown into a tree of heading nodes.

    Each node is a dict with keys: 'level' (0 for the synthetic root, 1-4 for
    headings), 'title', 'children', 'content_lines', 'parent', plus
    'is_table': True on synthetic table nodes.  finalize() adds 'text' — the
    node's non-blank content lines joined with newlines.  Lines appearing
    before the first heading are dropped (except tables, which are always
    captured).
    """
    lines = md_text.splitlines()
    root = {'level':0, 'title':'ROOT', 'children': [], 'content_lines': [], 'parent': None}
    stack = [root]  # stack[-1] is the current innermost open section
    i = 0
    seen_header = False

    while i < len(lines):
        line = lines[i]
        # ATX headings only, up to level 4 (#### ); deeper headings are
        # treated as plain content.
        m = re.match(r'^\s*(#{1,4})\s+(.*)$', line)
        if m:
            seen_header = True
            level = len(m.group(1))
            title = m.group(2).strip()
            # NOTE(review): 'parent' is recorded BEFORE the stack is popped
            # below, so it may point at a sibling/deeper node rather than the
            # actual attach point; downstream code never reads 'parent', so
            # this is harmless today — confirm before relying on it.
            node = {'level': level, 'title': title, 'children': [], 'content_lines': [], 'parent': stack[-1]}

            # Unwind to the nearest ancestor with a strictly smaller level;
            # root (level 0) is never popped.
            while stack and stack[-1]['level'] >= level:
                stack.pop()
            stack[-1]['children'].append(node)
            stack.append(node)
            i += 1
            continue

        # A run of consecutive '|'-prefixed lines is captured as one table
        # node, a synthetic child one level below the current section.
        if line.strip().startswith('|'):
            table_lines = []
            while i < len(lines) and lines[i].strip().startswith('|'):
                table_lines.append(lines[i])
                i += 1

            table_node = {
                'level': stack[-1]['level'] + 1,
                'title': f'{stack[-1]["title"]}::TABLE',
                'children': [],
                'content_lines': table_lines,
                'is_table': True,
                'parent': stack[-1]
            }
            stack[-1]['children'].append(table_node)
            continue  # i already advanced past the table

        # Skip preamble text that appears before the first heading.
        if not seen_header:
            i += 1
            continue
        stack[-1]['content_lines'].append(line)
        i += 1

    def finalize(node):
        # Join non-blank content lines into the node's 'text' field.
        node['text'] = '\n'.join([l for l in node.get('content_lines',[]) if l.strip()!=''])
        for c in node.get('children',[]):
            finalize(c)
    finalize(root)
    return root
|
|
|
|
|
def make_embedding_text(path, header, text):
    """Compose the text sent to the embedder.

    Prefixes *text* with a breadcrumb of heading titles (ROOT/TABLE entries
    removed) and, when present, the table *header* line above the breadcrumb.
    """
    crumbs = " | ".join([title for title in path if title and title != 'ROOT' and title != 'TABLE'])
    head = f"{header}\n{crumbs}" if header else crumbs
    return (head + "\n\n" + text).strip()
|
|
|
|
|
# Regex patterns for cross-references in Vietnamese legal text:
#   "Khoản X Điều Y"  — clause X of article Y,
#   "Điều X"          — article X,
#   "Chương <roman>"  — chapter number as a Roman numeral,
#   "Nghị định số ..." — decree number.
# extract_references runs every pattern independently, so one span can be
# reported by more than one pattern (e.g. "Khoản 1 Điều 2" also matches the
# bare "Điều" pattern).
REF_PATTERNS = [
    re.compile(r'Khoản\s+([\d\.]+)\s+Điều\s+([\d\.]+)', re.I),
    re.compile(r'Điều\s+([\d\.]+)', re.I),
    re.compile(r'Chương\s+([IVXLC]+)', re.I),
    re.compile(r'Nghị định\s+số\s+([^\s,\.]+)', re.I)
]
|
|
def extract_references(text):
    """Collect all legal cross-reference matches from *text*.

    Each entry records the raw matched string and its capture groups.  Every
    pattern in REF_PATTERNS is applied independently, so overlapping matches
    from different patterns are all reported.
    """
    return [
        {"raw": match.group(0), "groups": match.groups()}
        for pattern in REF_PATTERNS
        for match in pattern.finditer(text)
    ]
|
|
|
|
|
def flatten_and_emit(root, doc_id, filename, chunks_dir):
    """Walk the parsed tree, split each node's text into token-bounded chunks,
    write one JSON file per chunk into *chunks_dir*, and return a manifest.

    Args:
        root: tree produced by parse_md_to_tree().
        doc_id: document identifier embedded in every chunk id.
        filename: source filename recorded in each chunk.
        chunks_dir: pathlib.Path of the output directory.

    Returns:
        list of {"id", "path", "length"} entries, one per chunk written.
    """
    chunks = []
    seq = 0  # global chunk counter for this document

    def parse_structure(path):
        """Extract (chapter, article, clause, point) from the heading titles.

        Chapter is converted from its Roman numeral when possible; clause and
        point drop trailing dots from enumerations like "1." -> "1".
        """
        chapter = article = clause = point = None
        for title in path:
            upper = title.upper()
            if upper.startswith('CHƯƠNG'):
                m = re.search(r'CHƯƠNG\s+([IVXLC]+)', title, re.I)
                if m:
                    try:
                        chapter = roman_to_int(m.group(1))
                    except Exception:
                        # Malformed numeral: keep the raw Roman string.
                        chapter = m.group(1)
            if upper.startswith('ĐIỀU'):
                m = re.search(r'ĐIỀU\s+(\d+)', title, re.I)
                if m:
                    article = int(m.group(1))
            if upper.startswith('KHOẢN'):
                m = re.search(r'KHOẢN\s+([\d\.]+)', title, re.I)
                if m:
                    clause = m.group(1).rstrip('.')
            if upper.startswith('ĐIỂM') or re.match(r'^[a-z]\)', title.strip()):
                # Reset: this title defines (or, on a failed parse, clears)
                # the current point.
                point = None
                m = re.search(r'ĐIỂM\s+([\d\.]+)', title, re.I)
                if m:
                    point = m.group(1).rstrip('.')
                else:
                    m = re.search(r'^([a-z])\)', title.strip())
                    if m:
                        point = m.group(1)
        return chapter, article, clause, point

    def walk(node, path_titles):
        nonlocal seq

        has_text = bool(node.get('text', '').strip())
        is_table = node.get('is_table', False)
        if has_text or is_table:
            # Table nodes carry a synthetic "<parent>::TABLE" title; keep the
            # breadcrumb path to the real headings only.
            if is_table:
                path = path_titles
            else:
                path = path_titles + [node['title']]

            chapter, article, clause, point = parse_structure(path)

            header = None
            text = node.get('text', '').strip()
            if is_table:
                content_type = 'table'
                # NOTE: seq here is the value before this table's parts are
                # numbered, so table_id shares seq with the previous chunk.
                table_id = f"{doc_id}::table::{seq}"
                header, parts = split_table_with_token_overlap(text, MAX_TOKENS, OVERLAP_TOKENS)
            else:
                content_type = 'text'
                table_id = None
                parts = split_text_with_token_overlap(text, MAX_TOKENS, OVERLAP_TOKENS)

            for p in parts:
                seq += 1
                # Missing structure levels default to 0 in the id.
                cid = f"{doc_id}::CH{chapter or 0}::A{article or 0}::K{clause or 0}::P{point or 0}::C{seq}"
                chunk = {
                    "id": cid,
                    "doc_id": doc_id,
                    "source_filename": filename,
                    # Same naive-UTC "...Z" string utcnow() produced, via the
                    # non-deprecated timezone-aware API.
                    "created_at": datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat() + "Z",
                    "chapter": chapter,
                    "article": article,
                    "clause": clause,
                    "point": point,
                    "content_type": content_type,
                    "table_id": table_id,
                    "path": path,
                    "chunk_text": p,
                    "chunk_for_embedding": make_embedding_text(path, header, p),
                    "text_length_chars": len(p),
                    "token_count": count_tokens(p),
                    "references": extract_references(p),
                    "parse_confidence": 0.9,
                    "checksum": sha256(p.encode('utf-8'))
                }
                chunks.append(chunk)

        # Recurse regardless of whether this node emitted chunks.
        for c in node.get('children', []):
            walk(c, path_titles + [node['title']])

    for child in root.get('children', []):
        walk(child, [])

    # Write each chunk to its own JSON file and build the manifest.
    manifest = []
    for ch in chunks:
        fn = f"{ch['id'].replace('/', '_').replace(' ', '_')}.json"
        outp = chunks_dir / fn
        with open(outp, 'w', encoding='utf-8') as f:
            json.dump(ch, f, ensure_ascii=False, indent=2)
        manifest.append({"id": ch['id'], "path": str(outp), "length": ch['text_length_chars']})
    return manifest
|
|
|
|
|
def _node_to_jsonable(node): |
|
|
|
|
|
return { |
|
|
'level': node.get('level'), |
|
|
'title': node.get('title'), |
|
|
'text': node.get('text', ''), |
|
|
'children': [_node_to_jsonable(c) for c in node.get('children', [])] |
|
|
} |
|
|
|
|
|
def convert_md_to_chunks(md_path, chunks_dir):
    """Convert one markdown file into per-chunk JSON files.

    Args:
        md_path: pathlib.Path of the markdown source.
        chunks_dir: output directory for the chunk files.

    Returns:
        The manifest list produced by flatten_and_emit().

    The original body carried an unused ``import textwrap`` and a redundant
    function-local ``import json`` (already imported at module level); both
    removed.
    """
    text = md_path.read_text(encoding="utf-8")
    root = parse_md_to_tree(text)
    # Doc id combines the filename stem with a short content hash so re-runs
    # on changed content yield distinct ids.
    docid = md_path.stem + "_" + sha256(text.encode('utf-8'))[:8]
    return flatten_and_emit(root, docid, md_path.name, chunks_dir)
|
|
|
|
|
def main():
    """Chunk every converted markdown file and write a combined manifest.

    Reads *.md from CONVERTED, writes chunk JSONs plus chunks_manifest.json
    into CHUNKS_DIR, and prints a summary line.
    """
    manifests = []
    for md in CONVERTED.glob("*.md"):
        manifests.extend(convert_md_to_chunks(md, CHUNKS_DIR))

    # Same naive-UTC "...Z" timestamp utcnow() produced, via the
    # non-deprecated timezone-aware API.
    generated_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    with open(CHUNKS_DIR / "chunks_manifest.json", "w", encoding="utf-8") as f:
        json.dump({"generated_at": generated_at, "chunks": manifests}, f, ensure_ascii=False, indent=2)
    print("Wrote", len(manifests), "chunks")
|
|
|
|
|
# Script entry point: run the chunking pipeline when executed directly.
if __name__ == "__main__":
    main()