kasitbot / rag_preprocessor.py
snygginghani's picture
Deploy KASITBot RAG chatbot
71e1c4b
"""
================================================================================
RAG Document Preprocessing Pipeline — v4 (Structural + Semantic Awareness)
University-Level NLP System — KASIT Faculty Assistant
================================================================================
KEY IMPROVEMENTS vs v3:
✅ Section/heading-aware DOCX chunking — Heading styles mark section
boundaries; the section title is injected into every chunk so the LLM
always knows which part of the document a passage belongs to.
✅ Table-aware extraction — detects the header row and prepends column names
to every data row, making each row self-contained and searchable.
E.g. "Date: March 11 | Time: 9:00 AM | Course Code: 1902214 | ..."
This is critical for exam schedules, office-hours tables and fee tables.
✅ Document-type detection — filename-based routing assigns a doc_type label
(exam_schedule, office_hours, study_plan, scholarship, regulation, …)
to every chunk so the LLM can interpret context correctly.
✅ Arabic-aware chunk sizing — 700 chars for Arabic (denser script),
500 chars for English, matching proportional reading units.
✅ Semantic split for regulation docs — splits at article markers
(المادة X / Article X) before falling back to char-based chunking,
so each article stays together and is not truncated mid-clause.
✅ Minimum chunk length filter — drops noise fragments shorter than 60 chars.
✅ Rich per-chunk metadata: doc_type + section_title in every record.
================================================================================
"""
import json
import re
import unicodedata
from collections import Counter
from pathlib import Path
from typing import Dict, List
import fitz # PyMuPDF
from docx import Document
from docx.oxml.ns import qn
from docx.table import Table as DocxTable
from docx.text.paragraph import Paragraph as DocxParagraph
from langdetect import LangDetectException, detect
# ── Paths & tunables ──────────────────────────────────────────────────────────
INPUT_DIR = Path("input_documents")
OUTPUT_FILE = Path("rag_dataset.json")
CHUNK_SIZE_EN = 500 # chars β€” English (lower density)
CHUNK_SIZE_AR = 700 # chars β€” Arabic (higher glyph density per char)
OVERLAP_EN = 100
OVERLAP_AR = 150
MIN_CHUNK_LEN = 60 # drop fragments shorter than this
# ══════════════════════════════════════════════════════════════════════════════
# Language helpers
# ══════════════════════════════════════════════════════════════════════════════
def detect_language(text: str) -> str:
    """Classify *text* by script ratio, deferring to langdetect for Latin text.

    Returns:
        "Unknown" when *text* is empty/blank or has no Arabic or Latin letters;
        "Arabic"  when more than 60% of those letters are Arabic;
        "Mixed"   when the Arabic share is between 10% and 60%;
        otherwise (under 10% Arabic) "English" if langdetect reports "en" or
        fails, else the upper-cased language code langdetect returned.
    """
    if not text or not text.strip():
        return "Unknown"

    # The Arabic block is spelled with code-point escapes so the range cannot
    # be corrupted by a re-encoding of this source file (a garbled literal
    # range here makes re raise "bad character range").
    arabic_chars = len(re.findall(r"[\u0600-\u06FF]", text))
    latin_chars = len(re.findall(r"[A-Za-z]", text))
    total = arabic_chars + latin_chars
    if total == 0:
        return "Unknown"

    ratio = arabic_chars / total
    if ratio > 0.6:
        return "Arabic"
    if ratio < 0.1:
        # Essentially Latin-script text: let langdetect tell English apart
        # from other Latin-script languages.
        try:
            code = detect(text)
        except LangDetectException:
            return "English"
        return "English" if code == "en" else code.upper()
    return "Mixed"
def _arabic_dominant(text: str) -> bool:
alpha = [c for c in text if c.isalpha()]
if not alpha:
return False
return sum(1 for c in alpha if "Ψ€" <= c <= "ΫΏ") / len(alpha) > 0.4
# ══════════════════════════════════════════════════════════════════════════════
# Document-type detection (filename-based)
# ══════════════════════════════════════════════════════════════════════════════
# Ordered (doc_type, filename substrings) pairs; the first entry with a
# matching substring wins. Substrings are compared against the lower-cased
# filename, so Latin keywords must be lower-case. The Arabic keywords are real
# Arabic text (previously stored as mis-decoded bytes, which could never match
# an actual Arabic filename).
_DOC_TYPE_MAP: List[tuple] = [
    ("exam_schedule", ["mid_exam", "exam_schedul", "final_exam"]),
    ("office_hours", ["office_hours", "office hours", "proffs"]),
    ("academic_calendar", ["calendar", "uni_cal", "academic_cal"]),
    ("study_plan", ["study plan", "study_plan"]),
    ("course_records", ["course record", "course_record"]),
    ("departments", ["department", "majors", "departments nad"]),
    ("admissions_fees", ["admission", "fees_rag", "admissions_fees"]),
    ("scholarship", ["makruma", "مكرمة", "teachers_grant",
                     "ashaer", "الجيش", "ثلاث", "moalim"]),
    ("regulation", ["تعليمات", "قانون", "دليل_اعضاء", "دليل اعضاء"]),
    ("knowledge_base", ["knowledge_base", "kasit_knowledge"]),
    ("faculty_info", ["faculty_it", "faculty_infor"]),
    ("curriculum", ["curriculum", "ai-english", "ds-english", "ai_curriculum"]),
    ("careers", ["career"]),
    ("contacts", ["email", "docs_email"]),
    ("english_system", ["english_sys"]),
]


def detect_doc_type(filename: str) -> str:
    """Map *filename* to a document-type label via substring matching.

    Args:
        filename: File name (not a full path is required, but any string
            works); it is lower-cased before matching.

    Returns:
        The label of the first ``_DOC_TYPE_MAP`` entry with a substring found
        in the name, or ``"general"`` when nothing matches.
    """
    name = filename.lower()
    for dtype, patterns in _DOC_TYPE_MAP:
        if any(p in name for p in patterns):
            return dtype
    return "general"
# ══════════════════════════════════════════════════════════════════════════════
# Text cleaning
# ══════════════════════════════════════════════════════════════════════════════
# Matches every character that is NOT on the allow-list. The Arabic ranges are
# written as code-point escapes (Arabic U+0600-06FF, Arabic Supplement
# U+0750-077F, Presentation Forms-A U+FB50-FDFF, Presentation Forms-B
# U+FE70-FEFF, plus Arabic comma / question mark / semicolon) so the class
# stays valid regardless of how this file is re-encoded; garbled literal
# ranges make re.compile fail at import time.
_KEEP = re.compile(
    r"[^\u0600-\u06FF\u0750-\u077F\uFB50-\uFDFF\uFE70-\uFEFF"
    r"A-Za-z0-9\s\.,;:!?\-\(\)\[\]\"\'\u060C\u061F\u061B/\\@#%&*+=<>\|_]"
)


def clean_text(text: str) -> str:
    """Normalise *text* and strip characters outside the allow-list.

    Steps, in order: NFC normalisation; control characters (except tab,
    newline and carriage return) become spaces; any character not matched by
    the allow-list becomes a space; runs of three or more spaces collapse to
    one; runs of three or more newlines collapse to two; the result is
    stripped.

    Returns:
        The cleaned string, or ``""`` for empty/None input.
    """
    if not text:
        return ""
    text = unicodedata.normalize("NFC", text)
    text = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F]", " ", text)
    text = _KEEP.sub(" ", text)
    text = re.sub(r" {3,}", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
# ══════════════════════════════════════════════════════════════════════════════
# DOCX: structured block extraction (body-order traversal)
# ══════════════════════════════════════════════════════════════════════════════
_DATA_VALUE_RE = re.compile(
r"\d{2,4}[:/]\d{2}" # times 09:00 or 1:30
r"|\d{1,2}[-/]\d{1,2}" # short dates 3/11
r"|Ψ΅Ψ¨Ψ§Ψ­|Ω…Ψ³Ψ§Ψ‘|\bAM\b|\bPM\b" # AM / PM in either script
)
def _first_row_is_header(cells: List[str]) -> bool:
"""Heuristic: the first table row is a header when its cells are short
labels (< 35 chars average) and none of them contain a data-value pattern
(times, dates, AM/PM)."""
if not cells:
return False
if any(_DATA_VALUE_RE.search(c) for c in cells):
return False
return (sum(len(c) for c in cells) / len(cells)) < 35
def _table_to_blocks(table: DocxTable, section: str) -> List[Dict]:
    """Convert a DOCX table into self-contained text blocks, one per data row.

    If a header row is detected, each data row becomes
    ``"ColName: value | ColName: value | ..."`` so that every row is
    independently searchable — critical for exam schedules
    (Date / Time / Course / Professor / Room) and fee tables. Without a
    header, the row's non-empty cell values are joined with ``" | "``.

    Column positions are preserved: an empty cell, or a cell repeating the
    text of the cell immediately to its left (how a horizontally merged cell
    shows up when iterating ``row.cells``), is kept as a blank placeholder
    instead of being dropped, so later values are still paired with their own
    header label rather than shifting left.

    Args:
        table: Table whose ``rows`` / ``cells`` / ``text`` are read.
        section: Section title recorded on every emitted block.

    Returns:
        Blocks ``{text, section_title, is_table_row, is_heading}`` for rows
        whose cleaned text is at least ``MIN_CHUNK_LEN`` characters long.
    """
    rows: List[List[str]] = []
    for row in table.rows:
        texts: List[str] = []
        previous = None
        for cell in row.cells:
            value = cell.text.strip()
            # Blank out an adjacent repeat but keep its slot for alignment.
            texts.append("" if value == previous else value)
            previous = value
        if any(texts):
            rows.append(texts)
    if not rows:
        return []

    # The header heuristic only looks at the labels that are actually present.
    has_header = _first_row_is_header([t for t in rows[0] if t])
    headers = rows[0] if has_header else []
    data_rows = rows[1:] if has_header else rows

    blocks: List[Dict] = []
    for row_cells in data_rows:
        if headers:
            parts = []
            for i, val in enumerate(row_cells):
                if not val:
                    continue
                # Fall back to a positional name when the header slot is
                # missing or blank (e.g. under a merged header cell).
                has_label = i < len(headers) and headers[i]
                col = headers[i] if has_label else f"col{i + 1}"
                parts.append(f"{col}: {val}")
        else:
            parts = [val for val in row_cells if val]
        text = clean_text(" | ".join(parts))
        if len(text) >= MIN_CHUNK_LEN:
            blocks.append({
                "text": text,
                "section_title": section,
                "is_table_row": True,
                "is_heading": False,
            })
    return blocks
def extract_docx_blocks(filepath: Path) -> List[Dict]:
    """Read a DOCX file and return its content as ordered raw blocks.

    The document body is traversed child by child, so paragraphs and tables
    come out interleaved in document order. The most recent heading paragraph
    (style name starting with "heading", case-insensitive) is remembered and
    stamped on every following block as ``section_title``.

    Returns:
        A list of ``{text, section_title, is_table_row, is_heading}`` dicts;
        an empty list if the file cannot be opened (an error is printed).
    """
    try:
        doc = Document(str(filepath))
    except Exception as exc:
        print(f" [ERROR] Cannot open DOCX '{filepath.name}': {exc}")
        return []

    paragraph_tag = qn("w:p")
    table_tag = qn("w:tbl")
    collected: List[Dict] = []
    current_section = ""

    for element in doc.element.body:
        if element.tag == paragraph_tag:
            paragraph = DocxParagraph(element, doc)
            content = paragraph.text.strip()
            if not content:
                continue
            # Style lookup is best-effort: any failure means "not a heading".
            heading = False
            try:
                style_name = paragraph.style.name or ""
                heading = style_name.lower().startswith("heading")
            except Exception:
                pass
            if heading:
                # A heading opens a new section and belongs to it.
                current_section = content
            collected.append({
                "text": content,
                "section_title": current_section,
                "is_table_row": False,
                "is_heading": heading,
            })
        elif element.tag == table_tag:
            collected.extend(
                _table_to_blocks(DocxTable(element, doc), current_section)
            )
    return collected
# ══════════════════════════════════════════════════════════════════════════════
# PDF extraction
# ══════════════════════════════════════════════════════════════════════════════
def extract_text_from_pdf(filepath: Path) -> str:
    """Extract the text of a PDF, page by page, in top-to-bottom block order.

    Blocks on each page are sorted by (y0, x0). A page that raises during
    extraction is skipped with a warning; blocks collected before the failure
    are kept. The document is always closed, even if iteration fails.

    Returns:
        The non-blank text blocks joined with newlines, or ``""`` when the
        file cannot be opened (an error is printed).
    """
    try:
        doc = fitz.open(str(filepath))
    except Exception as exc:
        print(f" [ERROR] Cannot open PDF '{filepath.name}': {exc}")
        return ""

    parts: List[str] = []
    try:
        for page_num, page in enumerate(doc, start=1):
            try:
                blocks = sorted(page.get_text("blocks"), key=lambda b: (b[1], b[0]))
                for block in blocks:
                    # PyMuPDF block tuples are
                    # (x0, y0, x1, y1, text, block_no, block_type); type 0 is
                    # text. Skip anything else so image placeholders never
                    # end up in the corpus.
                    if len(block) > 6 and block[6] != 0:
                        continue
                    if block[4].strip():
                        parts.append(block[4])
            except Exception as exc:
                print(f" [WARN] Page {page_num} of '{filepath.name}' skipped: {exc}")
    finally:
        doc.close()
    return "\n".join(parts)
# ══════════════════════════════════════════════════════════════════════════════
# Semantic chunking
# ══════════════════════════════════════════════════════════════════════════════
_ARTICLE_MARKER = re.compile(r"(?:^|\n)((?:Ψ§Ω„Ω…Ψ§Ψ―Ψ©|Article)\s+\d+)", re.IGNORECASE)
_SENT_END = re.compile(r"[.!?؟\n]")
def _char_chunk(text: str, size: int, overlap: int) -> List[str]:
if not text:
return []
chunks: List[str] = []
start, n = 0, len(text)
while start < n:
end = min(start + size, n)
if end < n:
m = list(_SENT_END.finditer(text, start, end))
if m:
end = m[-1].end()
else:
sp = text.rfind(" ", start, end)
if sp > start:
end = sp
chunk = text[start:end].strip()
if chunk:
chunks.append(chunk)
start = end - overlap if end - overlap > start else end
return chunks
def chunk_semantic(text: str, is_arabic: bool = False) -> List[str]:
    """Split *text* into chunks while respecting structural boundaries.

    1. If at least two article markers ("المادة X" / "Article X") are found,
       the text is first cut at each marker so an article is never merged
       with its neighbour. Any text before the first marker (title,
       preamble) is kept as its own leading segment instead of being dropped.
    2. Each segment (or the whole text when there are fewer than two markers)
       is then split by overlapping character-based chunking that prefers
       sentence ends.

    Args:
        text: Cleaned text to split.
        is_arabic: Selects the Arabic chunk size/overlap instead of the
            English ones.

    Returns:
        Chunks of at least ``MIN_CHUNK_LEN`` characters, in document order.
    """
    size = CHUNK_SIZE_AR if is_arabic else CHUNK_SIZE_EN
    overlap = OVERLAP_AR if is_arabic else OVERLAP_EN

    markers = list(_ARTICLE_MARKER.finditer(text))
    if len(markers) >= 2:
        starts = [m.start() for m in markers]
        bounds = starts + [len(text)]
        # Leading segment = everything before the first article marker.
        segments = [text[:starts[0]].strip()]
        segments.extend(text[a:b].strip() for a, b in zip(bounds, bounds[1:]))
    else:
        segments = [text]

    chunks: List[str] = []
    for seg in segments:
        chunks.extend(_char_chunk(seg, size, overlap))
    return [c for c in chunks if len(c) >= MIN_CHUNK_LEN]
# ══════════════════════════════════════════════════════════════════════════════
# Record builder
# ══════════════════════════════════════════════════════════════════════════════
def _record(text: str, source: str, chunk_id: int,
            doc_type: str, section_title: str) -> Dict:
    """Assemble the output record for a single chunk.

    The language label is derived from *text* via ``detect_language``;
    ``was_translated`` is always False. All other fields are stored as given.
    """
    language = detect_language(text)
    record: Dict = {
        "text": text,
        "source": source,
        "chunk_id": chunk_id,
        "language": language,
        "was_translated": False,
        "doc_type": doc_type,
        "section_title": section_title,
    }
    return record
# ══════════════════════════════════════════════════════════════════════════════
# File processors
# ══════════════════════════════════════════════════════════════════════════════
def process_docx(filepath: Path, doc_type: str) -> List[Dict]:
    """Turn a DOCX file into chunk records using its heading/table structure.

    How blocks are handled:
      * Heading: the pending paragraph buffer is flushed, the heading becomes
        the current section, and its text seeds the next buffer so the first
        chunk of the section opens with the heading.
      * Table row: the buffer is flushed and the row is emitted as its own
        record (if long enough), tagged with the row's section title.
      * Paragraph: appended to the buffer; the buffer is flushed first if the
        paragraph belongs to a different section than the buffer.

    Flushing cleans the joined buffer, chunks it semantically, and emits one
    record per chunk of at least ``MIN_CHUNK_LEN`` characters. Chunk ids are
    assigned sequentially starting at 1.
    """
    blocks = extract_docx_blocks(filepath)
    if not blocks:
        return []

    output: List[Dict] = []
    pending: List[str] = []
    # Mutable state shared with the nested helpers.
    state = {"next_id": 1, "section": ""}

    def emit(text: str, section: str) -> None:
        output.append(
            _record(text, filepath.name, state["next_id"], doc_type, section)
        )
        state["next_id"] += 1

    def flush() -> None:
        if not pending:
            return
        combined = clean_text("\n".join(pending))
        pending.clear()
        if not combined:
            return
        arabic = _arabic_dominant(combined)
        for piece in chunk_semantic(combined, is_arabic=arabic):
            if len(piece) >= MIN_CHUNK_LEN:
                emit(piece, state["section"])

    for block in blocks:
        if block["is_heading"]:
            flush()
            state["section"] = block["section_title"]
            # Heading text opens the next chunk for context.
            pending.append(block["text"])
            continue

        if block["is_table_row"]:
            # Table rows are atomic records; close whatever prose came before.
            flush()
            row_text = block["text"]
            if len(row_text) >= MIN_CHUNK_LEN:
                emit(row_text, block.get("section_title", ""))
            continue

        # Regular paragraph: start a fresh buffer on a section change.
        if block["section_title"] != state["section"]:
            flush()
            state["section"] = block["section_title"]
        pending.append(block["text"])

    flush()
    return output
def process_pdf(filepath: Path, doc_type: str) -> List[Dict]:
    """Turn a PDF file into chunk records.

    The extracted text is cleaned, chunked semantically (Arabic sizing when
    the text is Arabic-dominant) and each chunk of at least ``MIN_CHUNK_LEN``
    characters becomes a record with an empty section title. Chunk ids are
    the 1-based positions in the chunk list.

    Returns:
        The records, or ``[]`` when no text could be extracted (a warning is
        printed) or nothing survives cleaning.
    """
    raw = extract_text_from_pdf(filepath)
    if not raw.strip():
        print(f" [WARN] No text extracted from '{filepath.name}'.")
        return []

    cleaned = clean_text(raw)
    if not cleaned:
        return []

    pieces = chunk_semantic(cleaned, is_arabic=_arabic_dominant(cleaned))
    return [
        _record(piece, filepath.name, position, doc_type, "")
        for position, piece in enumerate(pieces, start=1)
        if len(piece) >= MIN_CHUNK_LEN
    ]
def process_file(filepath: Path) -> List[Dict]:
    """Route one input file to the matching processor and report progress.

    The document type is derived from the file name; ``.pdf`` goes to
    ``process_pdf`` and ``.docx`` / ``.doc`` go to ``process_docx``. Any other
    suffix is skipped with a message.

    Returns:
        The chunk records produced for the file (possibly empty).
    """
    suffix = filepath.suffix.lower()
    doc_type = detect_doc_type(filepath.name)
    print(f" → [{doc_type:<22}] '{filepath.name}' ...")
    if suffix == ".pdf":
        records = process_pdf(filepath, doc_type)
    elif suffix in (".docx", ".doc"):
        # NOTE(review): legacy binary .doc files are handed to the DOCX reader
        # too; if it cannot open them, the reader's own error path reports it
        # and this yields no records.
        records = process_docx(filepath, doc_type)
    else:
        print(f" [SKIP] Unsupported format: {suffix}")
        return []
    print(f" ✓ {len(records)} chunks")
    return records
# ══════════════════════════════════════════════════════════════════════════════
# Main
# ══════════════════════════════════════════════════════════════════════════════
def main() -> None:
    """Process every supported file in ``INPUT_DIR`` and write the dataset.

    Creates ``INPUT_DIR`` and returns if it does not exist; returns early if
    it holds no .pdf/.docx/.doc files or if no records are produced.
    Otherwise writes all records to ``OUTPUT_FILE`` as UTF-8 JSON and prints
    a summary by language and by document type.
    """
    banner = "=" * 70
    print(banner)
    print(" RAG Preprocessor v4 — Section + Table-aware + Semantic Chunking")
    print(f" English chunks: {CHUNK_SIZE_EN} chars | Arabic: {CHUNK_SIZE_AR} chars")
    print(banner)

    if not INPUT_DIR.exists():
        INPUT_DIR.mkdir(parents=True)
        print(f"\n[INFO] Created '{INPUT_DIR}/' — add your documents and re-run.\n")
        return

    files = [
        f for f in INPUT_DIR.iterdir()
        if f.is_file() and f.suffix.lower() in {".pdf", ".docx", ".doc"}
    ]
    if not files:
        print(f"\n[INFO] No supported files found in '{INPUT_DIR}/'.\n")
        return

    print(f"\nFound {len(files)} file(s):\n")
    all_records: List[Dict] = []
    # Sorted so the output order is stable across runs.
    for f in sorted(files):
        print(f"[FILE] {f.name}")
        all_records.extend(process_file(f))
        print()

    if not all_records:
        print("[WARN] No records produced. Exiting.")
        return

    with open(OUTPUT_FILE, "w", encoding="utf-8") as fh:
        json.dump(all_records, fh, ensure_ascii=False, indent=2)

    languages = Counter(r["language"] for r in all_records)
    dtypes = Counter(r.get("doc_type", "general") for r in all_records)
    print(banner)
    print(f" ✅ {len(all_records)} total chunks → '{OUTPUT_FILE}'")
    print(
        f" Arabic: {languages['Arabic']} | English: {languages['English']}"
        f" | Mixed: {languages['Mixed']}"
    )
    print("\n Breakdown by document type:")
    for dt, cnt in dtypes.most_common():
        print(f" {dt:<22}: {cnt:>4} chunks")
    print(banner)


if __name__ == "__main__":
    main()