from __future__ import annotations

import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from legalrag.config import AppConfig
from legalrag.utils.lang import detect_lang
from legalrag.utils.logger import get_logger

logger = get_logger(__name__)

CN_NUM = r"[一二三四五六七八九十百千万〇零0-9]+"


def normalize_article_no(s: str | int) -> str:
    """Normalize an article label such as "第一千二百零三条" or "Art. 42"
    to a plain decimal string; returns "" when no number is recoverable."""
    if isinstance(s, int):
        return str(s)

    s = (s or "").strip()
    m = re.search(r"(\d+)", s)
    if m:
        return str(int(m.group(1)))

    # No ASCII digits: convert the Chinese numeral positionally.
    CN_DIGIT = {"零": 0, "〇": 0, "一": 1, "二": 2, "两": 2, "三": 3, "四": 4,
                "五": 5, "六": 6, "七": 7, "八": 8, "九": 9}
    CN_UNIT = {"十": 10, "百": 100, "千": 1000}
    CN_BIG = {"万": 10_000, "亿": 100_000_000}

    s2 = re.sub(r"[第条\s]", "", s)
    total, section, number = 0, 0, 0
    for ch in s2:
        if ch in CN_DIGIT:
            number = CN_DIGIT[ch]
        elif ch in CN_UNIT:
            unit = CN_UNIT[ch]
            if number == 0:
                number = 1  # a bare unit such as "十" means 10
            section += number * unit
            number = 0
        elif ch in CN_BIG:
            big = CN_BIG[ch]
            section += number
            number = 0
            total += section * big
            section = 0
    section += number
    v = total + section
    return str(v) if v > 0 else ""
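
# A minimal sanity sketch (values hand-checked, not from the test suite):
#     normalize_article_no("第一千二百零三条") -> "1203"
#     normalize_article_no("第142条") -> "142"
#     normalize_article_no("第三十八条") -> "38"
#     normalize_article_no("序言") -> ""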


# Structural headings: 编 (part), 分编 (subpart), 章 (chapter), 节 (section).
PART_RE = re.compile(rf"^\s*(?:第\s*)?(?P<num>{CN_NUM})\s*编(?P<title>.*)$", re.M)
SUBPART_RE = re.compile(rf"^\s*(?:第\s*)?(?P<num>{CN_NUM})\s*分编(?P<title>.*)$", re.M)
CHAPTER_RE = re.compile(rf"^\s*(?:第\s*)?(?P<num>{CN_NUM})\s*章(?P<title>.*)$", re.M)
SECTION_RE = re.compile(rf"^\s*(?:第\s*)?(?P<num>{CN_NUM})\s*节(?P<title>.*)$", re.M)
INLINE_HEADING_RE = re.compile(rf"(?:第\s*)?(?P<num>{CN_NUM})\s*(?:分编|编|章|节)\s*.+$")


# Article lines: "第X条 ..." plus an OCR-tolerant variant missing the leading 第.
ARTICLE_LINE_RE = re.compile(rf"^\s*第\s*(?P<num>{CN_NUM})\s*条(?P<rest>.*)$")
ARTICLE_LINE_NO_DAI_RE = re.compile(rf"^\s*(?P<num>{CN_NUM})\s*条(?P<rest>.*)$")


# English (UCC-style) headings: "§ 2-201.", "ARTICLE 2 - SALES", "PART 1. ...".
EN_SECTION_LINE_RE = re.compile(r"^\s*§\s*(?P<id>[0-9A-Za-z-]+)\.?\s*(?P<rest>.*)$")
EN_SECTION_SCAN_RE = re.compile(r"(?m)^\s*§\s*(?P<id>[0-9A-Za-z-]+)\.")
EN_ARTICLE_RE = re.compile(r"^\s*ARTICLE\s+(?P<num>[0-9A-Za-z-]+)\s*[-–—]\s*(?P<title>.*)$", re.IGNORECASE)
EN_PART_RE = re.compile(r"^\s*PART\s+(?P<num>[0-9A-Za-z-]+)\.?\s*(?P<title>.*)$", re.IGNORECASE)


# Whole-text scan fallbacks for poorly line-broken input (e.g. PDF copy-paste).
# The lookbehind rejects a 第 that directly follows another numeral, so markers
# embedded in longer numbers are not treated as article starts.
ARTICLE_SCAN_RE = re.compile(
    r"(?m)(?<![一二三四五六七八九十百千万〇零0-9])第\s*(?P<num>[一二三四五六七八九十百千万〇零0-9]+)\s*条"
)
ARTICLE_SCAN_NO_DAI_RE = re.compile(
    rf"(?m)(^|\n)\s*(?P<num>{CN_NUM})\s*条"
)
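
# Illustratively (hand-checked against the patterns above):
#     "总则。第一条 自然人..." -> ARTICLE_SCAN_RE finds the "第一条" marker
#     a line starting "十五条 ..." -> ARTICLE_SCAN_NO_DAI_RE catches the dropped 第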


# "第X条" immediately after one of these prefixes is a cross-reference
# (citation), not the start of a new article.
_CITATION_PREFIXES = ("本法", "本章", "本节", "本条例", "本编", "本分编", "依照", "根据")


def _is_citation_start(text: str, start: int) -> bool:
    prefix = text[max(0, start - 6):start]
    return any(prefix.endswith(p) for p in _CITATION_PREFIXES)
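
# For example, in "依照本法第十条的规定" the marker 第十条 starts at index 4; the
# window before it ends with "本法", so the scan fallback skips this match.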


def _normalize_article_markers(text: str) -> str:
    """Rejoin article markers that a hard line break split in two."""
    if not text:
        return text

    text = re.sub(
        rf"(第\s*{CN_NUM})\s*\n\s*({CN_NUM})\s*条",
        r"\1\2条",
        text,
    )
    text = re.sub(
        rf"(第\s*{CN_NUM})\s*\n\s*条",
        r"\1条",
        text,
    )
    return text
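
# Before/after sketch:
#     "第一百\n二十三条" -> "第一百二十三条"
#     "第五十\n条"       -> "第五十条"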


def _clean_line(s: str) -> str:
    s = s.replace("\u3000", " ")
    s = re.sub(r"[ \t]+", " ", s).strip()
    return s


def _should_break(prev: str, nxt: str) -> bool:
    if not prev:
        return True
    if not nxt:
        return True
    # Sentence-final punctuation (both half- and full-width) ends a block.
    if re.search(r"[。!?;::!?;]$", prev):
        return True
    if re.match(r"^(第\s*[一二三四五六七八九十百千万〇零0-9]+\s*条)", nxt):
        return True
    if PART_RE.match(nxt) or SUBPART_RE.match(nxt) or CHAPTER_RE.match(nxt) or SECTION_RE.match(nxt):
        return True
    if re.match(r"^[((]?[一二三四五六七八九十0-9]+[))\.、]", nxt):
        return True
    return False
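
# Decision sketch: a previous line ending in 。/;/: breaks; a next line starting
# with "第X条", a structural heading, or an item marker like "(一)" breaks;
# anything else is treated as a wrapped continuation and joined by _merge_lines.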


def _merge_lines(lines: List[str]) -> str:
    out: List[str] = []
    cur = ""
    for raw in lines:
        line = _clean_line(raw)
        if not line:
            if cur:
                out.append(cur)
                cur = ""
            continue
        if not cur:
            cur = line
            continue
        if _should_break(cur, line):
            out.append(cur)
            cur = line
        else:
            # CJK text joins directly; keep a space when rejoining wrapped
            # Latin text so English words are not fused together.
            sep = " " if (cur[-1].isascii() and line[0].isascii()) else ""
            cur = cur + sep + line
    if cur:
        out.append(cur)
    return "\n".join(out).strip()


def _heading(kind: str, num: str, title: str) -> str:
    title = _clean_line(title)
    return f"{num}{kind} {title}".strip() if title else f"{num}{kind}"


def _collect_heading_positions(text: str) -> Dict[str, List[Tuple[int, str]]]:
    positions: Dict[str, List[Tuple[int, str]]] = {
        "part": [],
        "subpart": [],
        "chapter": [],
        "section": [],
    }
    for m in PART_RE.finditer(text):
        positions["part"].append((m.start(), _heading("编", m.group("num"), m.group("title"))))
    for m in SUBPART_RE.finditer(text):
        positions["subpart"].append((m.start(), _heading("分编", m.group("num"), m.group("title"))))
    for m in CHAPTER_RE.finditer(text):
        positions["chapter"].append((m.start(), _heading("章", m.group("num"), m.group("title"))))
    for m in SECTION_RE.finditer(text):
        positions["section"].append((m.start(), _heading("节", m.group("num"), m.group("title"))))
    return positions


def _last_heading_before(items: List[Tuple[int, str]], pos: int) -> str:
    # items come from finditer, so they are already ordered by offset.
    last = ""
    for p, val in items:
        if p <= pos:
            last = val
        else:
            break
    return last
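
# Illustratively, with items [(0, "一编 总则"), (500, "二编 物权")] and pos=300,
# the article at offset 300 is attributed to "一编 总则".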


def _strip_heading_lines(lines: List[str]) -> List[str]:
    out: List[str] = []
    for raw in lines:
        line = _clean_line(raw)
        if not line:
            out.append(raw)
            continue
        if PART_RE.match(line) or SUBPART_RE.match(line) or CHAPTER_RE.match(line) or SECTION_RE.match(line):
            continue
        inline = INLINE_HEADING_RE.search(line)
        if inline:
            prefix = line[:inline.start()].strip()
            if prefix:
                out.append(prefix)
            continue
        out.append(line)
    return out


def _read_text(path: Path) -> str:
    try:
        return path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        return path.read_text(encoding="gb18030", errors="ignore")


def _write_jsonl(path: Path, records: List[Dict]) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        for r in records:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")


@dataclass
class State:
    law_name: str
    source: str
    lang: str
    part: str = ""
    subpart: str = ""
    chapter: str = ""
    section: str = ""
    cur_key: Optional[str] = None
    cur_no: Optional[str] = None
    cur_lines: List[str] = field(default_factory=list)


def _finalize(st: State, out: List[Dict]) -> None:
    if not st.cur_no:
        return
    text = _merge_lines(st.cur_lines)
    if not text:
        return

    article_key = st.cur_key or st.cur_no
    rec_id = f"{st.source}::{article_key}"

    out.append(
        {
            "id": rec_id,
            "law_name": st.law_name,
            "lang": st.lang,
            "part": st.part,
            "subpart": st.subpart,
            "chapter": st.chapter,
            "section": st.section,
            "article_no": st.cur_no,
            "article_key": article_key,
            "article_id": normalize_article_no(st.cur_no),
            "text": text,
            "source": st.source,
        }
    )
    st.cur_key = None
    st.cur_no = None
    st.cur_lines = []


def _finalize_en(st: State, out: List[Dict]) -> None:
    if not st.cur_no:
        return
    text = _merge_lines(st.cur_lines)
    if not text:
        return

    article_key = st.cur_key or st.cur_no
    rec_id = f"{st.source}::{article_key}"

    out.append(
        {
            "id": rec_id,
            "law_name": st.law_name,
            "lang": st.lang,
            "part": st.part,
            "subpart": st.subpart,
            "chapter": st.chapter,
            "section": st.section,
            "article_no": st.cur_no,
            "article_key": article_key,
            "article_id": article_key,
            "text": text,
            "source": st.source,
        }
    )
    st.cur_key = None
    st.cur_no = None
    st.cur_lines = []


def parse_english_by_lines(text: str, source: str, law_name: str) -> List[Dict]:
    st = State(law_name=law_name, source=source, lang="en")
    records: List[Dict] = []

    for raw in text.splitlines():
        line = _clean_line(raw)
        if not line:
            if st.cur_no:
                st.cur_lines.append("")
            continue

        # UCC-style "ARTICLE"/"PART" headings are mapped onto the chapter and
        # section slots of the shared record schema.
        m = EN_ARTICLE_RE.match(line)
        if m:
            st.chapter = _heading("Article", m.group("num"), m.group("title"))
            st.section = ""
            continue

        m = EN_PART_RE.match(line)
        if m:
            st.section = _heading("Part", m.group("num"), m.group("title"))
            continue

        m = EN_SECTION_LINE_RE.match(line)
        if m:
            _finalize_en(st, records)
            key = _clean_line(m.group("id"))
            st.cur_key = key
            st.cur_no = f"§ {key}"
            st.cur_lines = [line]
            continue

        if st.cur_no:
            st.cur_lines.append(line)

    _finalize_en(st, records)
    return records
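
# Shape sketch (section number illustrative):
#     parse_english_by_lines("§ 2-201. Formal Requirements\nA contract ...",
#                            source="ucc.txt", law_name="Uniform Commercial Code")
# yields one record with article_no "§ 2-201" and article_key "2-201".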


def parse_by_lines(text: str, source: str, law_name: str) -> List[Dict]:
    if detect_lang(text) == "en":
        return parse_english_by_lines(text, source=source, law_name=law_name)
    text = _normalize_article_markers(text)
    st = State(law_name=law_name, source=source, lang=detect_lang(text))
    records: List[Dict] = []

    for raw in text.splitlines():
        line = _clean_line(raw)
        if not line:
            if st.cur_no:
                st.cur_lines.append("")
            continue

        # Structural headings close the current article and update context.
        m = PART_RE.match(line)
        if m and not ARTICLE_LINE_RE.match(line):
            if st.cur_no:
                _finalize(st, records)
            st.part = _heading("编", m.group("num"), m.group("title"))
            st.subpart = ""
            st.chapter = ""
            st.section = ""
            continue

        m = SUBPART_RE.match(line)
        if m and not ARTICLE_LINE_RE.match(line):
            if st.cur_no:
                _finalize(st, records)
            st.subpart = _heading("分编", m.group("num"), m.group("title"))
            st.chapter = ""
            st.section = ""
            continue

        m = CHAPTER_RE.match(line)
        if m and not ARTICLE_LINE_RE.match(line):
            if st.cur_no:
                _finalize(st, records)
            st.chapter = _heading("章", m.group("num"), m.group("title"))
            st.section = ""
            continue

        m = SECTION_RE.match(line)
        if m and not ARTICLE_LINE_RE.match(line):
            if st.cur_no:
                _finalize(st, records)
            st.section = _heading("节", m.group("num"), m.group("title"))
            continue

        # An article marker (with or without the leading 第) starts a new record.
        m = ARTICLE_LINE_RE.match(line) or ARTICLE_LINE_NO_DAI_RE.match(line)
        if m:
            _finalize(st, records)
            key = _clean_line(m.group("num"))
            st.cur_key = key
            st.cur_no = f"第{key}条"
            st.cur_lines = [line]
            continue

        # Everything else is body text of the current article.
        if st.cur_no:
            st.cur_lines.append(line)

    _finalize(st, records)
    return records
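
# Minimal sketch (text abbreviated; assumes detect_lang reports "zh" here):
#     sample = "第一编 总则\n第一章 基本规定\n第一条 为了保护民事主体的合法权益,制定本法。"
#     parse_by_lines(sample, source="minfadian.txt", law_name="中华人民共和国民法典")
# yields one record with part "一编 总则", chapter "一章 基本规定",
# article_no "第一条", and article_id "1".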


def parse_english_by_scan_fallback(text: str, source: str, law_name: str) -> List[Dict]:
    matches = sorted(EN_SECTION_SCAN_RE.finditer(text), key=lambda m: m.start())
    if not matches:
        return []

    records: List[Dict] = []
    for i, m in enumerate(matches):
        start = m.start()
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        segment = text[start:end].strip()

        key = _clean_line(m.group("id"))
        article_no = f"§ {key}"
        rec_id = f"{source}::{key}"

        seg_text = _merge_lines(segment.splitlines())

        records.append(
            {
                "id": rec_id,
                "law_name": law_name,
                "lang": "en",
                "part": "",
                "subpart": "",
                "chapter": "",
                "section": "",
                "article_no": article_no,
                "article_key": key,
                "article_id": key,
                "text": seg_text,
                "source": source,
            }
        )
    return records


def parse_by_scan_fallback(text: str, source: str, law_name: str) -> List[Dict]:
    """
    Fallback when the input is not cleanly line-broken (common with PDF copy-paste).
    We scan the whole text for article markers and slice segments between them.
    """
    if detect_lang(text) == "en":
        return parse_english_by_scan_fallback(text, source=source, law_name=law_name)
    text = _normalize_article_markers(text)
    lang = detect_lang(text)
    heading_pos = _collect_heading_positions(text)
    matches = list(ARTICLE_SCAN_RE.finditer(text)) + list(ARTICLE_SCAN_NO_DAI_RE.finditer(text))
    matches = sorted(matches, key=lambda m: m.start())
    matches = [m for m in matches if not _is_citation_start(text, m.start())]
    if not matches:
        return []

    records: List[Dict] = []
    for i, m in enumerate(matches):
        start = m.start()
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        segment = text[start:end].strip()

        key = _clean_line(m.group("num"))
        article_no = f"第{key}条"
        rec_id = f"{source}::{key}"

        seg_lines = _strip_heading_lines(segment.splitlines())
        seg_text = _merge_lines(seg_lines)
        part = _last_heading_before(heading_pos["part"], start)
        subpart = _last_heading_before(heading_pos["subpart"], start)
        chapter = _last_heading_before(heading_pos["chapter"], start)
        section = _last_heading_before(heading_pos["section"], start)

        records.append(
            {
                "id": rec_id,
                "law_name": law_name,
                "lang": lang,
                "part": part,
                "subpart": subpart,
                "chapter": chapter,
                "section": section,
                "article_no": article_no,
                "article_key": key,
                "article_id": normalize_article_no(article_no),
                "text": seg_text,
                "source": source,
            }
        )
    return records
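
# E.g. a single-paragraph paste such as "……。第一条 ……第二条 ……" still splits into
# per-article segments here, with any line-anchored headings that do survive
# attributed to articles by character offset.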


def debug_preview(text: str) -> None:
    lines = text.splitlines()
    logger.info("---- raw preview (first 20 lines) ----")
    for i, ln in enumerate(lines[:20], start=1):
        logger.info(f"{i:02d}: {ln[:200]}")
    logger.info("-------------------------------------")

    # Rough signal for choosing the line parser vs. the scan fallback.
    line_hits = sum(1 for ln in lines if ARTICLE_LINE_RE.match(_clean_line(ln)))
    scan_hits = len(list(ARTICLE_SCAN_RE.finditer(text)))
    logger.info(f"[debug] ARTICLE_LINE_RE line hits = {line_hits}")
    logger.info(f"[debug] ARTICLE_SCAN_RE scan hits = {scan_hits}")


def main() -> int:
    cfg = AppConfig.load(None)

    raw_dir = Path(cfg.paths.raw_dir)
    out_root = Path(cfg.paths.processed_dir)

    txt_files = sorted(raw_dir.rglob("*.txt"))
    logger.info(f"Raw dir: {raw_dir.resolve()}")
    logger.info(f"Found txt files: {[p.name for p in txt_files]}")

    if not txt_files:
        logger.error(f"No .txt found under {raw_dir}")
        return 2

    all_records: List[Dict] = []
    for p in txt_files:
        logger.info(f"Parsing: {p}")
        text = _read_text(p)
        logger.info(f"File size: {len(text)} chars")

        lang = detect_lang(text)
        law_name = "Uniform Commercial Code" if lang == "en" else "中华人民共和国民法典"
        recs_line = parse_by_lines(text, source=p.name, law_name=law_name)
        recs_scan = parse_by_scan_fallback(text, source=p.name, law_name=law_name)
        # Prefer the scan fallback when the line parser found suspiciously few
        # articles or the scan recovered strictly more.
        if recs_scan and (len(recs_line) < 10 or len(recs_scan) > len(recs_line)):
            logger.warning(
                "Switching to scan fallback (line=%d scan=%d).",
                len(recs_line),
                len(recs_scan),
            )
            recs = recs_scan
        else:
            recs = recs_line

        logger.info(f"Parsed records from {p.name}: {len(recs)}")
        all_records.extend(recs)

    by_lang: Dict[str, List[Dict]] = {"zh": [], "en": []}
    for r in all_records:
        r_lang = str(r.get("lang") or "zh").strip().lower()
        by_lang.setdefault(r_lang, []).append(r)

    total = sum(len(v) for v in by_lang.values())
    logger.info(f"Total records: {total}")
    for lang_key, recs in by_lang.items():
        if not recs:
            continue
        out_path = out_root / f"law_{lang_key}.jsonl"
        _write_jsonl(out_path, recs)
        logger.info("Saved %d records to %s", len(recs), out_path)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())