| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import re |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
def arabic_preprocessing(text: str) -> str:
    """Strip Arabic diacritics from ``text`` before it is fed to the model.

    Every code point in the range U+064B..U+0652 is removed, and the result
    is trimmed of leading and trailing whitespace.

    Args:
        text: Raw input string.

    Returns:
        The input without those diacritic code points, stripped at both ends.
    """
    without_marks = re.sub(r'[\u064B-\u0652]', '', text)
    return without_marks.strip()
|
|
|
|
def arabic_postprocessing(text: str) -> str:
    """Clean up typography and normalize punctuation after model inference.

    The passes run in a fixed order; later passes rely on the output of
    earlier ones:

    1. Protect ``,`` and ``:`` that sit between two digits (``1,000``,
       ``10:30``) behind temporary placeholders.
    2. Convert Latin ``, ; ?`` to their Arabic forms.
    3. Remove whitespace just inside ``()``, ``[]`` and ``«»``.
    4. Collapse repeated marks and conflicting mark sequences.
    5. Drop or move colons/semicolons that are attached to the wrong word.
    6. Remove leading punctuation, put one space after each mark, restore
       the protected numeric separators, remove whitespace before marks and
       collapse runs of spaces/tabs.

    Args:
        text: Model output to clean. Falsy input is returned unchanged.

    Returns:
        The cleaned string.
    """
    if not text:
        return text

    # Numeric separators must survive the Arabic-punctuation conversion and
    # the colon heuristics below, so hide them until the end.
    text = re.sub(r'(?<=\d),(?=\d)', '٪TEMP_COMMA٪', text)
    text = re.sub(r'(?<=\d):(?=\d)', '٪TEMP_COLON٪', text)

    # Latin -> Arabic punctuation.
    text = text.replace(',', '،').replace(';', '؛').replace('?', '؟')

    # No whitespace directly inside brackets or guillemets.
    text = re.sub(r'\(\s+', '(', text)
    text = re.sub(r'\s+\)', ')', text)
    text = re.sub(r'\[\s+', '[', text)
    text = re.sub(r'\s+\]', ']', text)
    text = re.sub(r'«\s+', '«', text)
    text = re.sub(r'\s+»', '»', text)

    # Collapse duplicated marks; cap long dot runs at an ellipsis.
    text = re.sub(r'([،؛:!؟])\1+', r'\1', text)
    text = re.sub(r'\.{4,}', '...', text)

    # Resolve conflicting adjacent marks in favour of the stronger one.
    text = re.sub(r'[،؛:]+([.!؟])', r'\1', text)
    text = re.sub(r'،؛|؛،', '؛', text)
    text = re.sub(r'([!؟])\.', r'\1', text)

    # A semicolon/colon directly before a relative pronoun is dropped.
    text = re.sub(r'[؛:]\s*(التي|الذي|الذين|اللتان|اللذان|اللاتي|اللواتي)', r' \1', text)

    def _fix_misplaced(m):
        # Normalize "<speech verb>[:] <name>[:]" so that at most one colon
        # remains: right after the verb if one was there, otherwise after
        # the name.
        verb, col1, name, col2 = m.groups()
        if col1 == ':':
            return f"{verb}: {name}"
        if col2 == ':':
            return f"{verb} {name}:"
        return m.group(0)

    text = re.sub(
        r'\b([وفلس]?(?:قال|يقول|قالت|تقول|أجاب|أجابت|سأل|سألت|أخبر|أخبرت|صرح|صرحت|أضاف|أضافت|أردف|أردفت))(:?)\s+(ال[أ-ي]+|أحمد|محمد|محمود|علي|عمر|خالد|فاطمة|مريم|عائشة|خديجة)\b(:?)',
        _fix_misplaced, text
    )

    # Words that legitimately introduce a colon (speech verbs, list
    # introducers, field labels), optionally with a one-letter prefix.
    allowed_colon_cue = re.compile(
        r'^[وفلس]?(قال|يقول|قالت|تقول|أجاب|أجابت|سأل|سألت|أخبر|أخبرت|صرح|صرحت|أضاف|أضافت|أردف|أردفت|وضح|وضحت|أوضح|أوضحت|رد|ردت|التالي|الآتي|مثال|ملاحظة|تنبيه|تحذير|قائلا|قائلة|اسم|العمر|تاريخ|رقم|عاجل|الآتية|التالية)$'
    )

    def _colon_guard(match):
        # Keep a colon only when the text leading up to it justifies one;
        # otherwise replace the colon with a single space.
        context = match.group(1)

        words = re.findall(r'[\u0600-\u06FFa-zA-Z]+', context)
        if not words:
            return match.group(0)

        # A cue word anywhere in the six words before the colon keeps it.
        if any(allowed_colon_cue.match(w) for w in words[-6:]):
            return match.group(0)

        # A definite (possibly prefixed) word right before the colon keeps it.
        if words[-1].startswith(('ال', 'لل', 'بال', 'فال', 'وال', 'كال')):
            return match.group(0)

        return context + ' '

    text = re.sub(r'([^:]+)(:)', _colon_guard, text)

    # These verbs never introduce a colon.
    text = re.sub(r'\b(يقدر|يستطيع|يمكن|يجب|ينبغي|يعتبر|يعد|يرى|يعتقد)\s*:', r'\1 ', text)

    # A semicolon before a conjunction is downgraded to a comma.
    text = re.sub(r'؛\s*(و|ف|ثم|أو|أم|بل)\b', r'، \1', text)

    # Text must not start with punctuation or blanks.
    text = re.sub(r'^[،؛:!؟. \t]+', '', text)

    # One space after every mark that is glued to the next token, except:
    #   * a dot between two digits (decimals, versions: "3.14", "2.5.1"),
    #   * a mark directly followed by a closing bracket/guillemet, which
    #     would otherwise undo the bracket cleanup above.
    text = re.sub(r'([،؛:!؟]|(?<!\d)\.|\.(?!\d))(?=[^\s)\]»])', r'\1 ', text)

    # Restore the protected numeric separators.
    text = text.replace('٪TEMP_COMMA٪', ',').replace('٪TEMP_COLON٪', ':')

    # No whitespace before a mark (this also re-joins "..." split above).
    text = re.sub(r'\s+([،؛:!؟.])', r'\1', text)

    # Collapse runs of spaces/tabs and trim.
    text = re.sub(r'[ \t]+', ' ', text).strip()
    return text
|
|
|
|
| |
| |
| |
|
|
| ARABIC_PUNCT_CHARS = set('.,،؛؟!:;?!') |
| MAX_PUNCT_DELTA = 3 |
| MAX_PUNCT_DELTA_SHORT = 1 |
| MAX_PUNCT_RATIO = 0.5 |
|
|
| |
| _EXCL_CUES = {'يا', 'ما', 'كم', 'لا', 'هل', 'أين', 'متى', |
| 'كيف', 'لماذا', 'ماذا', 'أي', 'لعل', 'ليت'} |
|
|
|
|
| def _normalize_for_comparison(text: str) -> str: |
| """ |
| Normalize Arabic for safe comparison. |
| Only removes diacritics to prevent punctuation model from stripping harakat. |
| Does NOT fold hamza/ya/ta-marbuta to ensure we catch spelling regressions! |
| """ |
| |
| text = re.sub(r'[\u064B-\u0652]', '', text) |
| return text |
|
|
|
|
def validate_punctuation_diff(diff: dict, full_text: str = '') -> bool:
    """Return True ONLY if ``diff`` is a safe punctuation-only change.

    ALLOWED:
      - Inserting 1 punctuation mark (short text) or 1-3 (long text)
      - Replacing one punctuation mark with another
      - Adding terminal punctuation to any text (1+ words) that lacks it
      - Adding ؟/! to short texts (< 3 words) ONLY with cue words

    REJECTED:
      - Adding/deleting/duplicating Arabic words
      - Rewriting phrases
      - Excessive punctuation repetition (3+ consecutive identical)
      - Punctuation spam: delta/word_count > MAX_PUNCT_RATIO (multi-word)
      - Short text (<= 2 words): delta > MAX_PUNCT_DELTA_SHORT
      - Any diff: delta > MAX_PUNCT_DELTA
      - Adding terminal punctuation when text already ends with punct
      - Adding ؟/! to short texts without interrogative/exclamatory cues
      - Any change to text that looks like code/JSON/URL/markup, except a
        single terminal '.' or '؟' appended at the very end

    Args:
        diff: Mapping read via the keys ``'original'`` and ``'correction'``
            (both default to ``''``) and, optionally, ``'end'`` (compared
            against ``len(full_text)``).
        full_text: The text the diff belongs to. When empty, the diff is
            treated as if ``original`` were the whole text.

    Returns:
        True if the diff passes every guard, False otherwise.
    """
    original = diff.get('original', '')
    correction = diff.get('correction', '')

    def _is_at_end() -> bool:
        # Without surrounding text the diff is treated as the whole text.
        if not full_text:
            return True
        # With surrounding text an explicit 'end' offset is required; it may
        # fall up to two characters short of the end of the text.
        return 'end' in diff and diff['end'] >= len(full_text) - 2

    orig_punct_count = sum(1 for c in original if c in ARABIC_PUNCT_CHARS)
    corr_punct_count = sum(1 for c in correction if c in ARABIC_PUNCT_CHARS)

    # --- Guard 1: structured data (Latin letters, braces, tags, URLs...) ---
    if re.search(r'[a-zA-Z]|\{|\[|<|#|@|://', original):
        at_end = _is_at_end()

        if corr_punct_count > orig_punct_count and not at_end:
            logger.info(
                "[PUNC-SAFETY] Blocked mid-sentence punctuation on structured data: '%s' -> '%s'",
                original, correction,
            )
            return False

        # The only tolerated change is one terminal '.'/'؟' appended at the
        # end of the text, with everything before it left untouched.
        only_terminal_added = (
            at_end
            and correction.endswith(('.', '؟'))
            and correction[:-1].rstrip() == original.rstrip()
        )
        if original != correction and not only_terminal_added:
            logger.info(
                "[PUNC-SAFETY] Blocked corruption of JSON/Code/URL: '%s' -> '%s'",
                original, correction,
            )
            return False

    # --- Guard 2: the non-punctuation content must be unchanged ---
    # Compared with punctuation, whitespace and diacritics removed.
    orig_alpha = re.sub(r'[.,،؛؟!:;?\s]', '', original)
    corr_alpha = re.sub(r'[.,،؛؟!:;?\s]', '', correction)
    if _normalize_for_comparison(orig_alpha) != _normalize_for_comparison(corr_alpha):
        return False

    # --- Guard 3: terminal punctuation appended at the end of the text ---
    orig_stripped = original.rstrip()
    if orig_stripped and correction.rstrip() and corr_punct_count > orig_punct_count:
        # The addition is "purely trailing" when both sides are equal once
        # their trailing punctuation run and all spaces are removed.
        orig_core = re.sub(r'[.,،؛؟!:;?!]+$', '', original).replace(' ', '')
        corr_core = re.sub(r'[.,،؛؟!:;?!]+$', '', correction).replace(' ', '')
        purely_trailing = (
            _normalize_for_comparison(orig_core) == _normalize_for_comparison(corr_core)
        )

        if purely_trailing and _is_at_end():
            # Judge the whole text when available, else the diff itself.
            # The same source is used for the word count AND for the
            # "already terminated" checks, so a diff validated without
            # full_text cannot stack punctuation onto an existing terminal
            # mark.
            scope = full_text if full_text else original
            total_words = len(re.findall(r'[\u0600-\u06FFa-zA-Z]+', scope))
            already_terminated = bool(re.search(r'[.،؛؟!?!][\s]*$', scope))
            has_ellipsis = scope.rstrip().endswith('...')

            if total_words < 1 or already_terminated or has_ellipsis:
                logger.info(
                    "[PUNC-SAFETY] TerminalPunctuationGuard triggered: removing trailing punctuation "
                    "'%s' → '%s'",
                    original, correction,
                )
                return False

            # On very short texts ؟/! are only believable next to a cue word.
            added_punct = correction[len(orig_stripped):]
            if total_words < 3 and ('!' in added_punct or '؟' in added_punct):
                if not any(w in _EXCL_CUES for w in scope.split()):
                    logger.info(
                        "[PUNC-SAFETY] Blocked !/؟ on short text without cue: "
                        "'%s' → '%s'",
                        original, correction,
                    )
                    return False

            logger.info(
                "[PUNC-SAFETY] Allowed terminal punct for sentence "
                "(%s words): "
                "'%s' → '%s'",
                total_words, original, correction,
            )

    # --- Guard 4: no mark repeated three or more times in a row ---
    if re.search(r'([.,،؛؟!:;?])\1{2,}', correction):
        return False

    # --- Guard 5: limits on how much punctuation was added ---
    punct_delta = max(0, corr_punct_count - orig_punct_count)
    # "or 1" keeps the ratio below well-defined for word-less corrections.
    word_count = len(re.findall(r'[\u0600-\u06FFa-zA-Z]+', correction)) or 1

    if word_count <= 2 and punct_delta > MAX_PUNCT_DELTA_SHORT:
        return False

    if word_count > 2 and punct_delta / word_count > MAX_PUNCT_RATIO:
        return False

    if punct_delta > MAX_PUNCT_DELTA:
        return False

    return True
|
|