"""
================================================================================
RAG Document Preprocessing Pipeline — v4 (Structural + Semantic Awareness)
University-Level NLP System — KASIT Faculty Assistant
================================================================================

KEY IMPROVEMENTS vs v3:
  ✅ Section/heading-aware DOCX chunking — Heading styles mark section
     boundaries; the heading text opens the section's first chunk and the
     section title is stored in the ``section_title`` field of every record,
     so the LLM always knows which part of the document a passage belongs to.
  ✅ Table-aware extraction — detects the header row and prepends column names
     to every data row, making each row self-contained and searchable.
     E.g. "Date: March 11 | Time: 9:00 AM | Course Code: 1902214 | ..."
     This is critical for exam schedules, office-hours tables and fee tables.
  ✅ Document-type detection — filename-based routing assigns a doc_type label
     (exam_schedule, office_hours, study_plan, scholarship, regulation, …)
     to every chunk so the LLM can interpret context correctly.
  ✅ Arabic-aware chunk sizing — 700 chars for Arabic (denser script),
     500 chars for English, matching proportional reading units.
  ✅ Semantic split for regulation docs — splits at article markers
     (المادة X / Article X) before falling back to char-based chunking, so
     each article stays together and is not truncated mid-clause.
  ✅ Minimum chunk length filter — drops noise fragments shorter than 60 chars.
  ✅ Rich per-chunk metadata: doc_type + section_title in every record.
================================================================================
"""

import json
import re
import unicodedata
from collections import Counter
from pathlib import Path
from typing import Dict, List, Tuple

import fitz  # PyMuPDF
from docx import Document
from docx.oxml.ns import qn
from docx.table import Table as DocxTable
from docx.text.paragraph import Paragraph as DocxParagraph
from langdetect import DetectorFactory, LangDetectException, detect

# langdetect is probabilistic; fixing the seed makes the "language" field of
# the generated dataset reproducible from run to run.
DetectorFactory.seed = 0

# ── Paths & tunables ──────────────────────────────────────────────────────────
INPUT_DIR = Path("input_documents")
OUTPUT_FILE = Path("rag_dataset.json")

CHUNK_SIZE_EN = 500   # chars — English (lower density)
CHUNK_SIZE_AR = 700   # chars — Arabic (higher glyph density per char)
OVERLAP_EN = 100
OVERLAP_AR = 150
MIN_CHUNK_LEN = 60    # drop fragments shorter than this


# ══════════════════════════════════════════════════════════════════════════════
# Language helpers
# ══════════════════════════════════════════════════════════════════════════════

# Basic Arabic block, written with escapes so the range is visible in any editor.
_ARABIC_CHAR_RE = re.compile(r"[\u0600-\u06FF]")
_LATIN_CHAR_RE = re.compile(r"[A-Za-z]")


def detect_language(text: str) -> str:
    """Classify *text* as "Arabic", "English", "Mixed", "Unknown" or an ISO code.

    The decision is based on the share of Arabic letters among Arabic + Latin
    letters: above 0.6 is "Arabic", below 0.1 is delegated to ``langdetect``
    (returning "English" or the upper-cased language code), anything in
    between is "Mixed".  Empty text or text without any Arabic/Latin letter
    yields "Unknown".
    """
    if not text or not text.strip():
        return "Unknown"

    arabic_chars = len(_ARABIC_CHAR_RE.findall(text))
    latin_chars = len(_LATIN_CHAR_RE.findall(text))
    total = arabic_chars + latin_chars
    if total == 0:
        return "Unknown"

    ratio = arabic_chars / total
    if ratio > 0.6:
        return "Arabic"
    if ratio < 0.1:
        try:
            code = detect(text)
        except LangDetectException:
            # Latin-script text that langdetect cannot profile: assume English.
            return "English"
        return "English" if code == "en" else code.upper()
    return "Mixed"


def _arabic_dominant(text: str) -> bool:
    """Return True when more than 40% of the alphabetic characters are Arabic."""
    alpha = [c for c in text if c.isalpha()]
    if not alpha:
        return False
    arabic = sum(1 for c in alpha if "\u0600" <= c <= "\u06FF")
    return arabic / len(alpha) > 0.4


# ══════════════════════════════════════════════════════════════════════════════
# Document-type detection (filename-based)
# ══════════════════════════════════════════════════════════════════════════════

# Ordered list: the first entry with a matching substring wins.
_DOC_TYPE_MAP: List[Tuple[str, List[str]]] = [
    ("exam_schedule", ["mid_exam", "exam_schedul", "final_exam"]),
    ("office_hours", ["office_hours", "office hours", "proffs"]),
    ("academic_calendar", ["calendar", "uni_cal", "academic_cal"]),
    ("study_plan", ["study plan", "study_plan"]),
    ("course_records", ["course record", "course_record"]),
    ("departments", ["department", "majors", "departments nad"]),
    ("admissions_fees", ["admission", "fees_rag", "admissions_fees"]),
    ("scholarship", ["makruma", "مكرمة", "teachers_grant", "ashaer", "الجيش", "ثلاث", "moalim"]),
    ("regulation", ["تعليمات", "قانون", "دليل_اعضاء", "دليل اعضاء"]),
    ("knowledge_base", ["knowledge_base", "kasit_knowledge"]),
    ("faculty_info", ["faculty_it", "faculty_infor"]),
    ("curriculum", ["curriculum", "ai-english", "ds-english", "ai_curriculum"]),
    ("careers", ["career"]),
    ("contacts", ["email", "docs_email"]),
    ("english_system", ["english_sys"]),
]


def detect_doc_type(filename: str) -> str:
    """Map *filename* to a doc_type label by case-insensitive substring match.

    Returns "general" when no pattern of ``_DOC_TYPE_MAP`` occurs in the name.
    """
    name = filename.lower()
    for dtype, patterns in _DOC_TYPE_MAP:
        if any(p in name for p in patterns):
            return dtype
    return "general"


# ══════════════════════════════════════════════════════════════════════════════
# Text cleaning
# ══════════════════════════════════════════════════════════════════════════════

# Everything NOT in this class is replaced by a space.  Kept: the Arabic,
# Arabic Supplement and Arabic Presentation Forms A/B blocks, ASCII letters
# and digits, whitespace and common punctuation.  The ranges are spelled with
# \u escapes because the last range ends at U+FEFF, which is invisible when
# typed literally.
_KEEP = re.compile(
    r"[^\u0600-\u06FF\u0750-\u077F\uFB50-\uFDFF\uFE70-\uFEFF"
    r"A-Za-z0-9\s\.,;:!?\-\(\)\[\]\"\'،؟؛/\\@#%&*+=<>\|_]"
)


def clean_text(text: str) -> str:
    """Normalise *text* to NFC, blank out unwanted characters, squeeze whitespace.

    Control characters and every character outside ``_KEEP``'s whitelist are
    replaced by a space; runs of three or more spaces collapse to one space
    and runs of three or more newlines collapse to two.  Returns "" for
    empty input.
    """
    if not text:
        return ""
    text = unicodedata.normalize("NFC", text)
    text = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F]", " ", text)
    text = _KEEP.sub(" ", text)
    text = re.sub(r" {3,}", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


# ══════════════════════════════════════════════════════════════════════════════
# DOCX: structured block extraction (body-order traversal)
# ══════════════════════════════════════════════════════════════════════════════

_DATA_VALUE_RE = re.compile(
    r"\d{1,4}[:/]\d{2}"             # times 09:00 or 1:30, also 2024/03
    r"|\d{1,2}[-/]\d{1,2}"          # short dates 3/11
    r"|صباح|مساء|\bAM\b|\bPM\b"     # AM / PM in either script
)


def _first_row_is_header(cells: List[str]) -> bool:
    """Heuristic: the first table row is a header when its cells are short
    labels (< 35 chars average) and none of them contain a data-value pattern
    (times, dates, AM/PM)."""
    if not cells:
        return False
    if any(_DATA_VALUE_RE.search(c) for c in cells):
        return False
    return (sum(len(c) for c in cells) / len(cells)) < 35


def _grid_cells(row) -> List[Tuple[str, bool]]:
    """Return ``(stripped_text, is_merged_repeat)`` for every grid column of *row*.

    python-docx lists a horizontally merged cell once per grid column it
    spans, each time backed by the same XML element.  Repeats are recognised
    by element identity (not by text), so two distinct cells that happen to
    hold the same value are both kept, and empty cells keep their position.
    """
    seen_elements: list = []
    result: List[Tuple[str, bool]] = []
    for cell in row.cells:
        element = cell._tc
        repeated = any(element is prev for prev in seen_elements)
        if not repeated:
            seen_elements.append(element)
        result.append((cell.text.strip(), repeated))
    return result


def _table_to_blocks(table: DocxTable, section: str) -> List[Dict]:
    """
    Convert a DOCX table to self-contained text blocks.

    If a header row is detected, each data row becomes:
        "ColName: value | ColName: value | ..."
    This makes every row independently searchable — critical for exam
    schedules (Date / Time / Course / Professor / Room) and fee tables.

    Values are paired with headers by grid-column position, so an empty cell
    never shifts the following values under the wrong column name.  Columns
    without a header label fall back to "colN".  Rows whose cleaned text is
    shorter than ``MIN_CHUNK_LEN`` are dropped.
    """
    grid = [_grid_cells(row) for row in table.rows]
    # Ignore rows that carry no text at all.
    grid = [cells for cells in grid if any(text and not rep for text, rep in cells)]
    if not grid:
        return []

    first_row = grid[0]
    has_header = _first_row_is_header([text for text, rep in first_row if text and not rep])
    # A merged header cell labels every grid column it spans.
    header_labels = [text for text, _ in first_row] if has_header else []
    data_rows = grid[1:] if has_header else grid

    blocks: List[Dict] = []
    for cells in data_rows:
        parts: List[str] = []
        for i, (value, repeated) in enumerate(cells):
            if repeated or not value:
                continue
            if has_header:
                label = header_labels[i] if i < len(header_labels) and header_labels[i] else f"col{i + 1}"
                parts.append(f"{label}: {value}")
            else:
                parts.append(value)
        text = clean_text(" | ".join(parts))
        if len(text) >= MIN_CHUNK_LEN:
            blocks.append({
                "text": text,
                "section_title": section,
                "is_table_row": True,
                "is_heading": False,
            })
    return blocks


def _is_heading(para: DocxParagraph) -> bool:
    """Return True when the paragraph style name starts with "heading".

    Style resolution is best-effort: a paragraph whose style cannot be
    resolved is treated as body text instead of aborting the whole document.
    """
    try:
        style_name = para.style.name or ""
    except Exception:
        return False
    return style_name.lower().startswith("heading")


def extract_docx_blocks(filepath: Path) -> List[Dict]:
    """
    Walk the DOCX body in document order (paragraphs AND tables interleaved),
    track the current section heading, and return a list of raw blocks.

    Each block: {text, section_title, is_table_row, is_heading}

    Returns an empty list (after printing an error) if the file cannot be
    opened.
    """
    try:
        doc = Document(str(filepath))
    except Exception as exc:
        print(f" [ERROR] Cannot open DOCX '{filepath.name}': {exc}")
        if filepath.suffix.lower() == ".doc":
            print(" [HINT] Legacy binary .doc files must be converted to .docx first.")
        return []

    blocks: List[Dict] = []
    section = ""
    paragraph_tag = qn("w:p")
    table_tag = qn("w:tbl")

    for child in doc.element.body:
        tag = child.tag
        if tag == paragraph_tag:
            para = DocxParagraph(child, doc)
            text = para.text.strip()
            if not text:
                continue
            heading = _is_heading(para)
            if heading:
                section = text
            blocks.append({
                "text": text,
                "section_title": section,
                "is_table_row": False,
                "is_heading": heading,
            })
        elif tag == table_tag:
            blocks.extend(_table_to_blocks(DocxTable(child, doc), section))
    return blocks


# ══════════════════════════════════════════════════════════════════════════════
# PDF extraction
# ══════════════════════════════════════════════════════════════════════════════

def extract_text_from_pdf(filepath: Path) -> str:
    """Return the text of every page of *filepath*, blocks joined by newlines.

    Blocks of a page are ordered top-to-bottom, then left-to-right.  A page
    that fails to extract is skipped with a warning; a file that cannot be
    opened yields "".
    """
    parts: List[str] = []
    try:
        doc = fitz.open(str(filepath))
    except Exception as exc:
        print(f" [ERROR] Cannot open PDF '{filepath.name}': {exc}")
        return ""

    try:
        for page_num, page in enumerate(doc, start=1):
            try:
                blocks = sorted(page.get_text("blocks"), key=lambda b: (b[1], b[0]))
                for block in blocks:
                    # Item 6 of a block tuple is its type; 0 means text.
                    # Image blocks only carry a placeholder description.
                    if len(block) > 6 and block[6] != 0:
                        continue
                    if block[4].strip():
                        parts.append(block[4])
            except Exception as exc:
                print(f" [WARN] Page {page_num} of '{filepath.name}' skipped: {exc}")
    finally:
        # Release the file handle even if iterating the document fails.
        doc.close()
    return "\n".join(parts)


# ══════════════════════════════════════════════════════════════════════════════
# Semantic chunking
# ══════════════════════════════════════════════════════════════════════════════

# "المادة 5", "Article 5" and the parenthesised form "المادة (5)" at the start
# of the text or of a line.
_ARTICLE_MARKER = re.compile(r"(?:^|\n)((?:المادة|Article)\s+\(?\s*\d+)", re.IGNORECASE)
_SENT_END = re.compile(r"[.!?؟\n]")


def _char_chunk(text: str, size: int, overlap: int) -> List[str]:
    """Split *text* into overlapping chunks of at most *size* characters.

    Each chunk ends, in order of preference, after the last sentence
    terminator in its window, at the last space, or at the hard size limit.
    The next chunk starts *overlap* characters before the previous cut.
    Every emitted chunk contains text beyond the previous cut, so no chunk
    consists solely of overlap, and the loop stops as soon as the end of the
    text has been emitted.

    Raises:
        ValueError: if *size* is not positive.
    """
    if not text:
        return []
    if size <= 0:
        raise ValueError("size must be a positive number of characters")
    # An overlap >= size could never advance; clamp it into [0, size - 1].
    overlap = max(0, min(overlap, size - 1))

    chunks: List[str] = []
    n = len(text)
    start = 0
    prev_end = 0  # cut position of the previously emitted chunk
    while start < n:
        end = min(start + size, n)
        if end < n:
            # Only accept a break that lies beyond the previous cut; a break
            # inside the overlap region would re-emit already covered text.
            floor = max(start, prev_end)
            cut = -1
            for match in _SENT_END.finditer(text, floor, end):
                cut = match.end()
            if cut > floor and text[floor:cut].strip():
                end = cut
            else:
                space = text.rfind(" ", floor, end)
                if space > floor:
                    end = space
        piece = text[start:end].strip()
        if piece:
            chunks.append(piece)
        if end >= n:
            # The tail has been emitted; stepping back by the overlap again
            # would only duplicate it.
            break
        prev_end = end
        start = end - overlap if end - overlap > start else end
    return chunks


def chunk_semantic(text: str, is_arabic: bool = False) -> List[str]:
    """
    Split text respecting structural boundaries:
      1. Arabic article markers (المادة X) or English 'Article X' — for
         regulations.  Used when at least two markers are present; any text
         preceding the first marker is kept as its own segment.
      2. Fall back to overlapping char-based chunking with sentence-end
         preference.

    Chunks shorter than ``MIN_CHUNK_LEN`` are dropped.
    """
    size = CHUNK_SIZE_AR if is_arabic else CHUNK_SIZE_EN
    overlap = OVERLAP_AR if is_arabic else OVERLAP_EN

    markers = list(_ARTICLE_MARKER.finditer(text))
    if len(markers) >= 2:
        bounds = [m.start() for m in markers] + [len(text)]
        # Preamble (title, definitions, ...) before the first article.
        segments = [text[:bounds[0]]]
        segments.extend(text[a:b] for a, b in zip(bounds, bounds[1:]))
    else:
        segments = [text]

    chunks: List[str] = []
    for segment in segments:
        segment = segment.strip()
        if segment:
            chunks.extend(_char_chunk(segment, size, overlap))
    return [c for c in chunks if len(c) >= MIN_CHUNK_LEN]


# ══════════════════════════════════════════════════════════════════════════════
# Record builder
# ══════════════════════════════════════════════════════════════════════════════

def _record(text: str, source: str, chunk_id: int,
            doc_type: str, section_title: str) -> Dict:
    """Build one dataset record; the language is detected from *text*."""
    return {
        "text": text,
        "source": source,
        "chunk_id": chunk_id,
        "language": detect_language(text),
        "was_translated": False,
        "doc_type": doc_type,
        "section_title": section_title,
    }


# ══════════════════════════════════════════════════════════════════════════════
# File processors
# ══════════════════════════════════════════════════════════════════════════════

def process_docx(filepath: Path, doc_type: str) -> List[Dict]:
    """
    Process DOCX with full structural awareness.

    Strategy:
      - Heading blocks mark section boundaries; heading text is prepended to
        the following paragraph buffer so the section's first chunk opens
        with it, and every record carries the title in ``section_title``.
      - Table rows are emitted as individual atomic records (they are already
        self-contained after header injection).
      - Consecutive paragraphs within the same section are buffered and then
        chunked semantically together.
    """
    blocks = extract_docx_blocks(filepath)
    if not blocks:
        return []

    records: List[Dict] = []
    idx = 1
    para_buf: List[str] = []
    buf_section = ""

    def flush() -> None:
        """Chunk the buffered paragraphs into records under ``buf_section``."""
        nonlocal idx, para_buf
        if not para_buf:
            return
        combined = clean_text("\n".join(para_buf))
        para_buf = []
        if not combined:
            return
        is_ar = _arabic_dominant(combined)
        for chunk in chunk_semantic(combined, is_arabic=is_ar):
            if len(chunk) >= MIN_CHUNK_LEN:
                records.append(_record(chunk, filepath.name, idx, doc_type, buf_section))
                idx += 1

    for block in blocks:
        if block["is_heading"]:
            # Close the previous section before switching the title.
            flush()
            buf_section = block["section_title"]
            para_buf.append(block["text"])  # heading text opens the next chunk for context
        elif block["is_table_row"]:
            # Table rows get their own atomic records; flush first so record
            # ids follow document order.
            flush()
            text = block["text"]
            if len(text) >= MIN_CHUNK_LEN:
                records.append(
                    _record(text, filepath.name, idx, doc_type, block.get("section_title", ""))
                )
                idx += 1
        else:
            # Regular paragraph — flush on section change.
            if block["section_title"] != buf_section:
                if para_buf:
                    flush()
                buf_section = block["section_title"]
            para_buf.append(block["text"])

    flush()
    return records


def process_pdf(filepath: Path, doc_type: str) -> List[Dict]:
    """Extract, clean and chunk a PDF; records have an empty ``section_title``."""
    raw = extract_text_from_pdf(filepath)
    if not raw.strip():
        print(f" [WARN] No text extracted from '{filepath.name}'.")
        return []
    cleaned = clean_text(raw)
    if not cleaned:
        return []

    is_ar = _arabic_dominant(cleaned)
    records: List[Dict] = []
    for idx, chunk in enumerate(chunk_semantic(cleaned, is_arabic=is_ar), start=1):
        if len(chunk) >= MIN_CHUNK_LEN:
            records.append(_record(chunk, filepath.name, idx, doc_type, ""))
    return records


def process_file(filepath: Path) -> List[Dict]:
    """Route *filepath* to the PDF or DOCX processor based on its suffix.

    Unsupported suffixes are skipped with a message and yield an empty list.
    """
    suffix = filepath.suffix.lower()
    doc_type = detect_doc_type(filepath.name)
    print(f" → [{doc_type:<22}] '{filepath.name}' ...")

    if suffix == ".pdf":
        records = process_pdf(filepath, doc_type)
    elif suffix in (".docx", ".doc"):
        records = process_docx(filepath, doc_type)
    else:
        print(f" [SKIP] Unsupported format: {suffix}")
        return []

    print(f" ✓ {len(records)} chunks")
    return records


# ══════════════════════════════════════════════════════════════════════════════
# Main
# ══════════════════════════════════════════════════════════════════════════════

def main() -> None:
    """Process every supported file in ``INPUT_DIR`` and write ``OUTPUT_FILE``.

    Creates ``INPUT_DIR`` and returns if it does not exist yet; returns
    without writing when no supported file or no record is found.
    """
    print("=" * 70)
    print(" RAG Preprocessor v4 — Section + Table-aware + Semantic Chunking")
    print(f" English chunks: {CHUNK_SIZE_EN} chars | Arabic: {CHUNK_SIZE_AR} chars")
    print("=" * 70)

    if not INPUT_DIR.exists():
        INPUT_DIR.mkdir(parents=True)
        print(f"\n[INFO] Created '{INPUT_DIR}/' — add your documents and re-run.\n")
        return

    files = [
        f for f in INPUT_DIR.iterdir()
        if f.is_file() and f.suffix.lower() in {".pdf", ".docx", ".doc"}
    ]
    if not files:
        print(f"\n[INFO] No supported files found in '{INPUT_DIR}/'.\n")
        return

    print(f"\nFound {len(files)} file(s):\n")

    all_records: List[Dict] = []
    for f in sorted(files):
        print(f"[FILE] {f.name}")
        all_records.extend(process_file(f))
        print()

    if not all_records:
        print("[WARN] No records produced. Exiting.")
        return

    with open(OUTPUT_FILE, "w", encoding="utf-8") as fh:
        json.dump(all_records, fh, ensure_ascii=False, indent=2)

    languages = Counter(r["language"] for r in all_records)
    ar = languages["Arabic"]
    en = languages["English"]
    mx = languages["Mixed"]
    # Unknown and langdetect codes other than English.
    other = len(all_records) - ar - en - mx
    dtypes = Counter(r.get("doc_type", "general") for r in all_records)

    print("=" * 70)
    print(f" ✅ {len(all_records)} total chunks → '{OUTPUT_FILE}'")
    summary = f" Arabic: {ar} | English: {en} | Mixed: {mx}"
    if other:
        summary += f" | Other: {other}"
    print(summary)
    print("\n Breakdown by document type:")
    for dt, cnt in dtypes.most_common():
        print(f" {dt:<22}: {cnt:>4} chunks")
    print("=" * 70)


if __name__ == "__main__":
    main()