"""
================================================================================
RAG Document Preprocessing Pipeline — v4 (Structural + Semantic Awareness)
University-Level NLP System — KASIT Faculty Assistant
================================================================================
KEY IMPROVEMENTS vs v3:
  • Section/heading-aware DOCX chunking — Heading styles mark section
    boundaries; the section title is recorded with every chunk (and opens the
    first chunk of its section) so the LLM always knows which part of the
    document a passage belongs to.
  • Table-aware extraction — detects the header row and prepends column names
    to every data row, making each row self-contained and searchable.
    E.g. "Date: March 11 | Time: 9:00 AM | Course Code: 1902214 | ..."
    This is critical for exam schedules, office-hours tables and fee tables.
  • Document-type detection — filename-based routing assigns a doc_type label
    (exam_schedule, office_hours, study_plan, scholarship, regulation, …)
    to every chunk so the LLM can interpret context correctly.
  • Arabic-aware chunk sizing — 700 chars for Arabic (denser script),
    500 chars for English, matching proportional reading units.
  • Semantic split for regulation docs — splits at article markers
    (Arabic "المادة X" / English "Article X") before falling back to
    char-based chunking, so each article stays together and is not truncated
    mid-clause.
  • Minimum chunk length filter — drops noise fragments shorter than 60 chars.
  • Rich per-chunk metadata: doc_type + section_title in every record.
================================================================================
"""
import json
import re
import unicodedata
from collections import Counter
from pathlib import Path
from typing import Dict, List, Tuple

import fitz  # PyMuPDF
from docx import Document
from docx.oxml.ns import qn
from docx.table import Table as DocxTable
from docx.text.paragraph import Paragraph as DocxParagraph
from langdetect import LangDetectException, detect
# ── Paths & tunables ──────────────────────────────────────────────────────────
INPUT_DIR = Path("input_documents")     # directory scanned for source documents
OUTPUT_FILE = Path("rag_dataset.json")  # JSON file the chunk records are written to
CHUNK_SIZE_EN = 500   # chunk size in characters for English (lower density)
CHUNK_SIZE_AR = 700   # chunk size in characters for Arabic-dominant text
OVERLAP_EN = 100      # characters shared by consecutive English chunks
OVERLAP_AR = 150      # characters shared by consecutive Arabic chunks
MIN_CHUNK_LEN = 60    # drop fragments shorter than this many characters
# ──────────────────────────────────────────────────────────────────────────────
# Language helpers
# ──────────────────────────────────────────────────────────────────────────────
def detect_language(text: str) -> str:
    """Classify *text* by script, falling back to langdetect for Latin text.

    Returns:
        "Unknown" when the text is empty or has no Arabic/Latin letters,
        "Arabic" when more than 60% of those letters are Arabic,
        "Mixed" when the Arabic share is between 10% and 60%,
        otherwise "English" or the upper-cased langdetect code (e.g. "FR").
    """
    if not text or not text.strip():
        return "Unknown"

    # The range is written with escapes so it survives any re-encoding of the
    # source file (U+0600..U+06FF is the basic Arabic block).
    arabic_chars = len(re.findall(r"[\u0600-\u06FF]", text))
    latin_chars = len(re.findall(r"[A-Za-z]", text))
    total = arabic_chars + latin_chars
    if total == 0:
        return "Unknown"

    ratio = arabic_chars / total
    if ratio > 0.6:
        return "Arabic"
    if ratio >= 0.1:
        return "Mixed"

    # Almost purely Latin script: let langdetect decide which language it is.
    try:
        code = detect(text)
    except LangDetectException:
        return "English"
    return "English" if code == "en" else code.upper()
| def _arabic_dominant(text: str) -> bool: | |
| alpha = [c for c in text if c.isalpha()] | |
| if not alpha: | |
| return False | |
| return sum(1 for c in alpha if "Ψ" <= c <= "ΫΏ") / len(alpha) > 0.4 | |
# ──────────────────────────────────────────────────────────────────────────────
# Document-type detection (filename-based)
# ──────────────────────────────────────────────────────────────────────────────
# Ordered (doc_type, filename substrings) pairs; the first entry with a matching
# substring wins. Arabic keywords are written as escapes so they cannot be
# damaged by a re-encoding of this file.
_DOC_TYPE_MAP: List[Tuple[str, List[str]]] = [
    ("exam_schedule", ["mid_exam", "exam_schedul", "final_exam"]),
    ("office_hours", ["office_hours", "office hours", "proffs"]),
    ("academic_calendar", ["calendar", "uni_cal", "academic_cal"]),
    ("study_plan", ["study plan", "study_plan"]),
    ("course_records", ["course record", "course_record"]),
    ("departments", ["department", "majors", "departments nad"]),
    ("admissions_fees", ["admission", "fees_rag", "admissions_fees"]),
    ("scholarship", [
        "makruma",
        "\u0645\u0643\u0631\u0645\u0629",  # Arabic "makruma"
        "teachers_grant",
        "ashaer",
        "\u0627\u0644\u062C\u064A\u0634",  # Arabic "al-jaysh" (the army)
        "\u062B\u0644\u0627\u062B",        # Arabic "thalath" (three)
        "moalim",
    ]),
    ("regulation", [
        "\u062A\u0639\u0644\u064A\u0645\u0627\u062A",                    # "ta'limat" (instructions)
        "\u0642\u0627\u0646\u0648\u0646",                                # "qanun" (law)
        "\u062F\u0644\u064A\u0644_\u0627\u0639\u0636\u0627\u0621",      # "dalil_a'da'" (members guide)
        "\u062F\u0644\u064A\u0644 \u0627\u0639\u0636\u0627\u0621",      # same, space-separated
    ]),
    ("knowledge_base", ["knowledge_base", "kasit_knowledge"]),
    ("faculty_info", ["faculty_it", "faculty_infor"]),
    ("curriculum", ["curriculum", "ai-english", "ds-english", "ai_curriculum"]),
    ("careers", ["career"]),
    ("contacts", ["email", "docs_email"]),
    ("english_system", ["english_sys"]),
]


def detect_doc_type(filename: str) -> str:
    """Map a filename to a document-type label by substring matching.

    The filename is lower-cased and checked against ``_DOC_TYPE_MAP`` in order.

    Returns:
        The label of the first entry with a matching substring, or "general"
        when nothing matches.
    """
    name = filename.lower()
    for dtype, patterns in _DOC_TYPE_MAP:
        if any(pattern in name for pattern in patterns):
            return dtype
    return "general"
# ──────────────────────────────────────────────────────────────────────────────
# Text cleaning
# ──────────────────────────────────────────────────────────────────────────────
# Matches every character that is NOT allowed to stay in the cleaned text.
# Kept: Arabic (U+0600-06FF, which already includes the Arabic comma, semicolon
# and question mark), Arabic Supplement (U+0750-077F), Arabic Presentation
# Forms A/B (U+FB50-FDFF, U+FE70-FEFF), ASCII letters/digits, whitespace and
# common punctuation. Ranges use escapes so they survive re-encoding.
_KEEP = re.compile(
    r"[^\u0600-\u06FF\u0750-\u077F\uFB50-\uFDFF\uFE70-\uFEFF"
    r"A-Za-z0-9\s\.,;:!?\-\(\)\[\]\"\'/\\@#%&*+=<>\|_]"
)


def clean_text(text: str) -> str:
    """Normalise *text* and strip characters outside the supported scripts.

    Steps: NFC normalisation, control characters -> space, disallowed
    characters -> space, runs of spaces -> one space, runs of 3+ newlines ->
    one blank line, then outer whitespace is stripped.

    Returns:
        The cleaned text, or "" for empty/None input.
    """
    if not text:
        return ""
    text = unicodedata.normalize("NFC", text)
    # Tab (\x09), LF (\x0A) and CR (\x0D) are deliberately left out of this class.
    text = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F]", " ", text)
    text = _KEEP.sub(" ", text)
    # Replacements above leave runs of spaces behind; collapse every run
    # (not only runs of three or more) so the output is consistent.
    text = re.sub(r" {2,}", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
# ──────────────────────────────────────────────────────────────────────────────
# DOCX: structured block extraction (body-order traversal)
# ──────────────────────────────────────────────────────────────────────────────
| _DATA_VALUE_RE = re.compile( | |
| r"\d{2,4}[:/]\d{2}" # times 09:00 or 1:30 | |
| r"|\d{1,2}[-/]\d{1,2}" # short dates 3/11 | |
| r"|Ψ΅Ψ¨Ψ§Ψ|Ω Ψ³Ψ§Ψ‘|\bAM\b|\bPM\b" # AM / PM in either script | |
| ) | |
| def _first_row_is_header(cells: List[str]) -> bool: | |
| """Heuristic: the first table row is a header when its cells are short | |
| labels (< 35 chars average) and none of them contain a data-value pattern | |
| (times, dates, AM/PM).""" | |
| if not cells: | |
| return False | |
| if any(_DATA_VALUE_RE.search(c) for c in cells): | |
| return False | |
| return (sum(len(c) for c in cells) / len(cells)) < 35 | |
def _table_to_blocks(table: DocxTable, section: str) -> List[Dict]:
    """Convert a DOCX table into self-contained text blocks, one per data row.

    When the first row looks like a header (see ``_first_row_is_header``), each
    data row becomes ``"ColName: value | ColName: value | ..."``; otherwise the
    row's values are joined with ``" | "``. Rows whose cleaned text is shorter
    than ``MIN_CHUNK_LEN`` are dropped.

    Cells are kept in grid position so a value is always paired with the header
    of its own column, even when earlier cells in the row are empty. A
    horizontally merged cell is reported once per grid column it spans; those
    repeats are recognised by the identity of the underlying cell element, so
    genuinely equal values in different columns are preserved.

    Args:
        table: python-docx table to convert.
        section: title of the section the table appears in.

    Returns:
        Blocks shaped ``{text, section_title, is_table_row, is_heading}``.
    """
    # Each row: grid-aligned (text, is_merge_repeat) pairs.
    rows: List[List[Tuple[str, bool]]] = []
    for row in table.rows:
        cells: List[Tuple[str, bool]] = []
        prev_tc = None
        for cell in row.cells:
            tc = getattr(cell, "_tc", None)
            is_repeat = tc is not None and tc is prev_tc
            cells.append((cell.text.strip(), is_repeat))
            prev_tc = tc
        if any(text for text, _ in cells):
            rows.append(cells)
    if not rows:
        return []

    first_labels = [text for text, is_repeat in rows[0] if text and not is_repeat]
    has_header = _first_row_is_header(first_labels)
    # Merge repeats are kept in the header list on purpose: every grid column
    # under a merged header cell inherits that header's label.
    headers = [text for text, _ in rows[0]] if has_header else []
    data_rows = rows[1:] if has_header else rows

    blocks: List[Dict] = []
    for row_cells in data_rows:
        parts: List[str] = []
        for i, (val, is_repeat) in enumerate(row_cells):
            if not val or is_repeat:
                continue
            if has_header:
                col = headers[i] if i < len(headers) and headers[i] else f"col{i + 1}"
                parts.append(f"{col}: {val}")
            else:
                parts.append(val)
        if not parts:
            continue
        text = clean_text(" | ".join(parts))
        if len(text) >= MIN_CHUNK_LEN:
            blocks.append({
                "text": text,
                "section_title": section,
                "is_table_row": True,
                "is_heading": False,
            })
    return blocks
def extract_docx_blocks(filepath: Path) -> List[Dict]:
    """Walk a DOCX body in document order and return its raw blocks.

    Paragraphs and tables are visited interleaved, exactly as they appear in
    the body. Paragraphs whose style name starts with "heading" (case
    insensitive) update the current section title; every other paragraph and
    every table row is tagged with the section that was current at that point.
    Empty paragraphs are skipped.

    Args:
        filepath: path of the DOCX file.

    Returns:
        Blocks shaped ``{text, section_title, is_table_row, is_heading}``, or
        an empty list when the file cannot be opened (an error is printed).
    """
    try:
        doc = Document(str(filepath))
    except Exception as exc:
        print(f" [ERROR] Cannot open DOCX '{filepath.name}': {exc}")
        return []

    blocks: List[Dict] = []
    section = ""
    paragraph_tag = qn("w:p")
    table_tag = qn("w:tbl")

    for child in doc.element.body:
        tag = child.tag
        if tag == paragraph_tag:
            para = DocxParagraph(child, doc)
            text = para.text.strip()
            if not text:
                continue

            is_heading = False
            try:
                style = para.style.name or ""
                is_heading = style.lower().startswith("heading")
            except Exception:
                # Best effort: a paragraph whose style cannot be resolved is
                # treated as ordinary body text rather than aborting the file.
                pass

            if is_heading:
                section = text
            blocks.append({
                "text": text,
                "section_title": section,
                "is_table_row": False,
                "is_heading": is_heading,
            })
        elif tag == table_tag:
            blocks.extend(_table_to_blocks(DocxTable(child, doc), section))
    return blocks
# ──────────────────────────────────────────────────────────────────────────────
# PDF extraction
# ──────────────────────────────────────────────────────────────────────────────
def extract_text_from_pdf(filepath: Path) -> str:
    """Extract the text of a PDF, page by page, in top-to-bottom block order.

    Blocks on each page are sorted by their top edge, then left edge. Image
    blocks are skipped so their placeholder descriptions do not end up in the
    text. A page that fails to extract is reported and skipped.

    Args:
        filepath: path of the PDF file.

    Returns:
        Block texts joined with newlines, or "" when the file cannot be opened
        (an error is printed).
    """
    try:
        doc = fitz.open(str(filepath))
    except Exception as exc:
        print(f" [ERROR] Cannot open PDF '{filepath.name}': {exc}")
        return ""

    parts: List[str] = []
    try:
        for page_num, page in enumerate(doc, start=1):
            try:
                # Each block is (x0, y0, x1, y1, text, block_no, block_type).
                blocks = sorted(page.get_text("blocks"), key=lambda b: (b[1], b[0]))
                for block in blocks:
                    if len(block) > 6 and block[6] != 0:
                        continue  # block_type 1 is an image, not text
                    if block[4].strip():
                        parts.append(block[4])
            except Exception as exc:
                print(f" [WARN] Page {page_num} of '{filepath.name}' skipped: {exc}")
    finally:
        # Release the file handle even if iterating the document itself fails.
        doc.close()
    return "\n".join(parts)
# ──────────────────────────────────────────────────────────────────────────────
# Semantic chunking
# ──────────────────────────────────────────────────────────────────────────────
| _ARTICLE_MARKER = re.compile(r"(?:^|\n)((?:Ψ§ΩΩ Ψ§Ψ―Ψ©|Article)\s+\d+)", re.IGNORECASE) | |
| _SENT_END = re.compile(r"[.!?Ψ\n]") | |
| def _char_chunk(text: str, size: int, overlap: int) -> List[str]: | |
| if not text: | |
| return [] | |
| chunks: List[str] = [] | |
| start, n = 0, len(text) | |
| while start < n: | |
| end = min(start + size, n) | |
| if end < n: | |
| m = list(_SENT_END.finditer(text, start, end)) | |
| if m: | |
| end = m[-1].end() | |
| else: | |
| sp = text.rfind(" ", start, end) | |
| if sp > start: | |
| end = sp | |
| chunk = text[start:end].strip() | |
| if chunk: | |
| chunks.append(chunk) | |
| start = end - overlap if end - overlap > start else end | |
| return chunks | |
def chunk_semantic(text: str, is_arabic: bool = False) -> List[str]:
    """Split *text* along structural boundaries before chunking by size.

    1. When at least two article markers (Arabic "al-madda N" / English
       "Article N") are present, the text is cut at each marker so an article
       is chunked on its own. Any text before the first marker is kept as its
       own segment instead of being discarded.
    2. Each segment (or the whole text when there are fewer than two markers)
       goes through overlapping character chunking that prefers sentence ends.

    Args:
        text: cleaned text to split.
        is_arabic: selects the Arabic chunk size/overlap instead of the
            English ones.

    Returns:
        Chunks of at least ``MIN_CHUNK_LEN`` characters, in document order.
    """
    size = CHUNK_SIZE_AR if is_arabic else CHUNK_SIZE_EN
    overlap = OVERLAP_AR if is_arabic else OVERLAP_EN

    markers = list(_ARTICLE_MARKER.finditer(text))
    if len(markers) >= 2:
        # Segment boundaries: start of text, every marker, end of text.
        bounds = [0] + [m.start() for m in markers] + [len(text)]
        segments = [text[a:b].strip() for a, b in zip(bounds, bounds[1:])]
    else:
        segments = [text]

    chunks: List[str] = []
    for segment in segments:
        chunks.extend(_char_chunk(segment, size, overlap))
    return [chunk for chunk in chunks if len(chunk) >= MIN_CHUNK_LEN]
# ──────────────────────────────────────────────────────────────────────────────
# Record builder
# ──────────────────────────────────────────────────────────────────────────────
def _record(text: str, source: str, chunk_id: int,
            doc_type: str, section_title: str) -> Dict:
    """Build one output record for a chunk.

    Args:
        text: chunk text.
        source: name of the file the chunk came from.
        chunk_id: running chunk number within that file.
        doc_type: document-type label of the file.
        section_title: title of the section the chunk belongs to ("" if none).

    Returns:
        A dict with the arguments above plus ``language`` (detected from
        *text*) and ``was_translated`` (always False here).
    """
    return {
        "text": text,
        "source": source,
        "chunk_id": chunk_id,
        "language": detect_language(text),
        "was_translated": False,
        "doc_type": doc_type,
        "section_title": section_title,
    }
# ──────────────────────────────────────────────────────────────────────────────
# File processors
# ──────────────────────────────────────────────────────────────────────────────
def process_docx(filepath: Path, doc_type: str) -> List[Dict]:
    """Turn a DOCX file into chunk records with structural awareness.

    Strategy:
      - A heading closes the current paragraph buffer and opens a new one that
        starts with the heading text, so the first chunk of a section begins
        with its title; later chunks carry the title only as ``section_title``.
      - Table rows become individual records (they are already self-contained
        after header injection) and also close the current buffer.
      - Consecutive paragraphs of the same section are buffered and chunked
        together.

    Args:
        filepath: path of the DOCX file.
        doc_type: document-type label stored on every record.

    Returns:
        The records of the file, with ``chunk_id`` counting up from 1.
    """
    blocks = extract_docx_blocks(filepath)
    if not blocks:
        return []

    records: List[Dict] = []
    idx = 1
    para_buf: List[str] = []
    buf_section = ""

    def flush() -> None:
        """Chunk the buffered paragraphs and emit them under ``buf_section``."""
        nonlocal idx, para_buf
        if not para_buf:
            return
        combined = clean_text("\n".join(para_buf))
        para_buf = []
        if not combined:
            return
        is_ar = _arabic_dominant(combined)
        for chunk in chunk_semantic(combined, is_arabic=is_ar):
            if len(chunk) >= MIN_CHUNK_LEN:
                records.append(_record(chunk, filepath.name, idx, doc_type, buf_section))
                idx += 1

    for block in blocks:
        if block["is_heading"]:
            flush()
            buf_section = block["section_title"]
            para_buf.append(block["text"])  # heading text opens the next chunk
        elif block["is_table_row"]:
            # Table rows are atomic records; close the running buffer first so
            # record order follows document order.
            flush()
            text = block["text"]
            if len(text) >= MIN_CHUNK_LEN:
                records.append(_record(text, filepath.name, idx, doc_type,
                                       block.get("section_title", "")))
                idx += 1
        else:
            # Regular paragraph: flush when the section changed. flush() reads
            # buf_section, so the section is updated only afterwards.
            if block["section_title"] != buf_section:
                if para_buf:
                    flush()
                buf_section = block["section_title"]
            para_buf.append(block["text"])

    flush()
    return records
def process_pdf(filepath: Path, doc_type: str) -> List[Dict]:
    """Turn a PDF file into chunk records.

    The whole document is extracted, cleaned and chunked as one text; PDF
    records carry an empty ``section_title``.

    Args:
        filepath: path of the PDF file.
        doc_type: document-type label stored on every record.

    Returns:
        The records of the file, or an empty list when no text was extracted
        (a warning is printed in that case).
    """
    raw = extract_text_from_pdf(filepath)
    if not raw.strip():
        print(f" [WARN] No text extracted from '{filepath.name}'.")
        return []

    cleaned = clean_text(raw)
    if not cleaned:
        return []

    is_ar = _arabic_dominant(cleaned)
    records: List[Dict] = []
    for idx, chunk in enumerate(chunk_semantic(cleaned, is_arabic=is_ar), start=1):
        if len(chunk) >= MIN_CHUNK_LEN:
            records.append(_record(chunk, filepath.name, idx, doc_type, ""))
    return records
def process_file(filepath: Path) -> List[Dict]:
    """Route one input file to the matching processor and report progress.

    ``.pdf`` files go to ``process_pdf``; ``.docx`` and ``.doc`` files go to
    ``process_docx``. Any other suffix is reported and skipped.

    Args:
        filepath: path of the input file.

    Returns:
        The chunk records produced for the file (possibly empty).
    """
    suffix = filepath.suffix.lower()
    doc_type = detect_doc_type(filepath.name)
    print(f" → [{doc_type:<22}] '{filepath.name}' ...")

    if suffix == ".pdf":
        records = process_pdf(filepath, doc_type)
    elif suffix in (".docx", ".doc"):
        records = process_docx(filepath, doc_type)
    else:
        print(f" [SKIP] Unsupported format: {suffix}")
        return []

    print(f" ✓ {len(records)} chunks")
    return records
# ──────────────────────────────────────────────────────────────────────────────
# Main
# ──────────────────────────────────────────────────────────────────────────────
def main() -> None:
    """Process every supported file in ``INPUT_DIR`` and write the dataset.

    Creates ``INPUT_DIR`` and returns if it does not exist yet. Otherwise all
    ``.pdf`` / ``.docx`` / ``.doc`` files in it are processed in sorted order,
    the combined records are written to ``OUTPUT_FILE`` as UTF-8 JSON, and a
    summary by language and document type is printed.
    """
    print("=" * 70)
    print(" RAG Preprocessor v4 — Section + Table-aware + Semantic Chunking")
    print(f" English chunks: {CHUNK_SIZE_EN} chars | Arabic: {CHUNK_SIZE_AR} chars")
    print("=" * 70)

    if not INPUT_DIR.exists():
        INPUT_DIR.mkdir(parents=True)
        print(f"\n[INFO] Created '{INPUT_DIR}/' — add your documents and re-run.\n")
        return

    files = [
        f for f in INPUT_DIR.iterdir()
        if f.is_file() and f.suffix.lower() in {".pdf", ".docx", ".doc"}
    ]
    if not files:
        print(f"\n[INFO] No supported files found in '{INPUT_DIR}/'.\n")
        return

    print(f"\nFound {len(files)} file(s):\n")
    all_records: List[Dict] = []
    for f in sorted(files):
        print(f"[FILE] {f.name}")
        all_records.extend(process_file(f))
        print()

    if not all_records:
        print("[WARN] No records produced. Exiting.")
        return

    with open(OUTPUT_FILE, "w", encoding="utf-8") as fh:
        json.dump(all_records, fh, ensure_ascii=False, indent=2)

    # Counter returns 0 for labels that never occurred.
    languages = Counter(r["language"] for r in all_records)
    dtypes = Counter(r.get("doc_type", "general") for r in all_records)

    print("=" * 70)
    print(f" ✓ {len(all_records)} total chunks → '{OUTPUT_FILE}'")
    print(f" Arabic: {languages['Arabic']} | English: {languages['English']}"
          f" | Mixed: {languages['Mixed']}")
    print("\n Breakdown by document type:")
    for dt, cnt in dtypes.most_common():
        print(f" {dt:<22}: {cnt:>4} chunks")
    print("=" * 70)


if __name__ == "__main__":
    main()