"""
Flask backend server for the Bayan Arabic writing assistant.

Provides the API endpoints used by the Bayan web application:
summarization, spelling, grammar, punctuation and autocomplete,
plus health/debug endpoints.
"""
|
|
| import os |
| import logging |
| import time |
| from flask import Flask, request, jsonify, Response |
| from flask_cors import CORS |
| from pathlib import Path |
| import traceback |
| import difflib |
| import re |
|
|
| |
import sys

# Make the parent of this file's directory importable so that the optional
# `quran` package can be resolved from there.
_quran_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, _quran_root)

try:
    from quran import search_bayan
except Exception as _quran_err:
    # The feature is optional: record that it is unavailable and log enough
    # detail (error, searched path, directory listing) to diagnose it.
    logger_quran_ok = False
    import logging as _ql
    _quran_log = _ql.getLogger('app')
    _quran_log.warning(f'[QURAN] Failed to import quran module: {_quran_err}')
    _quran_log.warning(f'[QURAN] Searched path: {_quran_root}')
    if os.path.isdir(_quran_root):
        _quran_root_listing = os.listdir(_quran_root)
    else:
        _quran_root_listing = "DIR NOT FOUND"
    _quran_log.warning(f'[QURAN] Files in root: {_quran_root_listing}')
else:
    logger_quran_ok = True
|
|
| |
| from nlp.pipeline_context import PipelineContext |
| from nlp.punctuation.punctuation_rules import validate_punctuation_diff |
|
|
| |
# python-dotenv is optional: when it is installed, load variables from the
# `.env` file located one directory above this file's directory.
try:
    from dotenv import load_dotenv
    _env_path = Path(__file__).parents[1] / '.env'
    load_dotenv(dotenv_path=_env_path)
except ImportError:
    pass


# Supabase credentials; both default to an empty string when unset.
SUPABASE_URL = os.getenv('SUPABASE_URL', '')
SUPABASE_ANON_KEY = os.getenv('SUPABASE_ANON_KEY', '')
|
|
| from model_loader import ( |
| SummarizationModel, |
| SpellingModel, |
| AutocompleteModel, |
| GrammarModel, |
| PunctuationModel, |
| SUMMARIZATION_PATH, |
| SPELLING_PATH, |
| AUTOCOMPLETE_PATH, |
| GRAMMAR_PATH, |
| PUNCTUATION_PATH |
| ) |
|
|
| |
| from hf_inference import ( |
| hf_summarize, |
| hf_correct_spelling, |
| hf_add_punctuation, |
| hf_autocomplete, |
| check_hf_api_available, |
| ) |
|
|
# Hugging Face repository id of the summarization model; can be overridden
# through the SUMMARIZATION_REPO_ID environment variable.
HUGGINGFACE_SUMMARIZATION_REPO = os.getenv(
    "SUMMARIZATION_REPO_ID",
    "bayan10/summarization-model",
)


# A non-empty HF_API_TOKEN switches the server into "HF API mode".
HF_API_TOKEN = os.getenv('HF_API_TOKEN', '')
USE_HF_API = HF_API_TOKEN != ''


# Process-wide logging configuration and this module's logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)


# Flask application. Static files are served from the application root at the
# URL root.
# NOTE(review): static_folder='.' makes every file in that directory
# downloadable — confirm this is intended.
app = Flask(__name__, static_url_path='', static_folder='.')
# Allow cross-origin requests from any origin on the /api/* routes.
CORS(app, resources={r"/api/*": {"origins": "*"}})


# Request limits (characters), used by the route handlers below.
MIN_TEXT_LENGTH = 10
MAX_TEXT_LENGTH = 5000
MAX_SUMMARY_LENGTH = 512


# Module-level model handles; all start unloaded (None).
summarization_model = spelling_model = autocomplete_model = None
grammar_model = punctuation_model = None
|
|
| |
| |
| _DIRECTIONAL_BLOCKS = { |
| |
| 'هذه': {'هذة'}, |
| 'هذا': {'هذة', 'هذه'}, |
| |
| 'كان': {'كأن'}, |
| 'كأن': {'كان'}, |
| 'كانت': {'كأنت'}, |
| 'كانوا': {'كأنوا'}, |
| |
| 'إلى': {'على', 'علي'}, |
| 'على': {'إلى', 'علي'}, |
| 'علي': {'على'}, |
| |
| 'لكن': {'لاكن'}, |
| |
| 'ذلك': {'ذالك'}, |
| |
| 'عمله': {'عملة'}, |
| 'لسانه': {'لسانة'}, |
| 'بيته': {'بيتة'}, |
| 'كتابه': {'كتابة'}, |
| } |
|
|
|
|
def load_models():
    """
    Load the summarization model and record any startup errors.

    The Hugging Face repository is tried first; if that raises, the local
    path is used as a fallback. Only the summarization model is loaded here.
    Errors are collected in the module-level ``_startup_errors`` list, which
    is reset on every call.

    Returns:
        bool: True when at least one model was loaded.
    """
    global summarization_model, spelling_model, autocomplete_model, grammar_model, punctuation_model
    global _startup_errors

    if USE_HF_API:
        logger.info("HF_API_TOKEN is set — HF API mode enabled")
        logger.info("NOTE: HF Spaces free tier has NO outbound DNS. Loading summarization model locally.")
        logger.info("Spelling, punctuation, autocomplete will gracefully degrade (return input unchanged).")

    ready_models = []
    failures = []
    _startup_errors = []

    try:
        logger.info(f"Loading summarization model from Hugging Face: {HUGGINGFACE_SUMMARIZATION_REPO}")
        try:
            summarization_model = SummarizationModel(HUGGINGFACE_SUMMARIZATION_REPO)
        except Exception as remote_error:
            # Remote load failed: remember why, then fall back to the local copy.
            logger.warning(f"Remote load failed, falling back to local model: {remote_error}")
            _startup_errors.append(f"remote_load: {str(remote_error)[:200]}")
            logger.info(f"Loading summarization model from local path: {SUMMARIZATION_PATH}")
            summarization_model = SummarizationModel(SUMMARIZATION_PATH)
    except Exception as e:
        # Both attempts failed; keep only the tail of the traceback.
        err_detail = traceback.format_exc()
        failures.append(("summarization", str(e)))
        _startup_errors.append(f"summarization_load_failed: {err_detail[-500:]}")
        logger.error(f"Failed to load summarization model: {str(e)}")
    else:
        ready_models.append("summarization")
        logger.info("Summarization model loaded successfully")

    logger.info(f"Models loaded: {ready_models}")
    if failures:
        logger.warning(f"Models failed to load: {[name for name, _ in failures]}")

    return bool(ready_models)


# Startup errors recorded by load_models(); exposed by the debug endpoint.
_startup_errors = []
|
|
|
|
@app.route('/')
def index():
    """Serve index.html with the Supabase URL and anon key injected into its meta tags."""
    page = (Path(__file__).parent / 'index.html').read_text(encoding='utf-8')

    # (empty placeholder tag, filled tag) pairs, applied in order.
    # NOTE: the values are inserted as-is (no HTML escaping).
    injections = (
        (
            '<meta name="supabase-url" content="">',
            f'<meta name="supabase-url" content="{SUPABASE_URL}">',
        ),
        (
            '<meta name="supabase-anon-key" content="">',
            f'<meta name="supabase-anon-key" content="{SUPABASE_ANON_KEY}">',
        ),
    )
    for placeholder, filled in injections:
        page = page.replace(placeholder, filled)

    return Response(page, mimetype='text/html')
|
|
|
|
@app.route('/api/health', methods=['GET'])
def health_check():
    """
    Health check endpoint for production monitoring.

    Returns HTTP 200 when the summarization model is loaded and 503
    otherwise. The JSON ``status`` field follows the HTTP code
    ('healthy' / 'unhealthy') so the body never contradicts the status line.
    """
    summarization_ready = summarization_model is not None
    status = 'healthy' if summarization_ready else 'unhealthy'
    status_code = 200 if summarization_ready else 503
    supabase = {'configured': bool(SUPABASE_URL and SUPABASE_ANON_KEY)}

    if USE_HF_API:
        # HF API mode: the other models are lazily loaded services, so ask
        # each service whether it is loaded (without triggering a load).
        health = {
            'status': status,
            'mode': 'hf_spaces_local',
            'models': {
                'summarization': summarization_ready,
                'spelling': _spelling_available(),
                'autocomplete': _autocomplete_available(),
                'grammar': _grammar_available(),
                'punctuation': _punctuation_available(),
                'dialect': _dialect_available(),
            },
            'note': 'Free tier: summarization local, other models return input unchanged',
            'supabase': supabase,
            'environment': 'huggingface_spaces',
        }
    else:
        # Local mode: report the module-level model handles directly.
        health = {
            'status': status,
            'mode': 'local_models',
            'models': {
                'summarization': summarization_ready,
                'spelling': spelling_model is not None,
                'autocomplete': autocomplete_model is not None,
                'grammar': grammar_model is not None,
                'punctuation': punctuation_model is not None,
                'dialect': _dialect_available(),
            },
            'supabase': supabase,
            'environment': 'render' if os.environ.get('RENDER') else 'local',
        }

    return jsonify(health), status_code
|
|
|
|
@app.route('/api/debug-models', methods=['GET'])
def debug_models():
    """
    Debug endpoint: report model status, startup errors and memory usage.

    The response contains the result of ``debug_test_all_models()``, the
    startup errors recorded by ``load_models()``, the process peak RSS as
    reported by ``resource.getrusage`` and selected ``/proc/meminfo`` fields.
    """
    from hf_inference import debug_test_all_models
    results = debug_test_all_models()

    # Peak resident set size of this process. The `resource` module is not
    # available on every platform, hence the best-effort fallback.
    # NOTE(review): the "KB" label assumes the Linux unit for ru_maxrss.
    try:
        import resource
        mem = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        mem_info = f"{mem} KB"
    except Exception:
        mem_info = "N/A"

    # Selected system memory counters (only present where /proc/meminfo exists).
    wanted_keys = ('MemTotal', 'MemFree', 'MemAvailable', 'SwapTotal')
    proc_mem = {}
    try:
        with open('/proc/meminfo', 'r') as f:
            for line in f:
                if not any(k in line for k in wanted_keys):
                    continue
                parts = line.split()
                if not parts:
                    continue
                # Value plus optional unit; slicing tolerates short lines so a
                # single odd line cannot discard the values already collected.
                proc_mem[parts[0].rstrip(':')] = ' '.join(parts[1:3])
    except Exception:
        proc_mem = {"error": "cannot read /proc/meminfo"}

    return jsonify({
        'status': 'debug',
        'hf_api_token_set': bool(HF_API_TOKEN),
        'summarization_model_loaded': summarization_model is not None,
        'startup_errors': _startup_errors,
        'memory': mem_info,
        'proc_meminfo': proc_mem,
        'models': results,
    }), 200
|
|
|
|
| def _spelling_available(): |
| """Check if spelling model is loaded (without triggering lazy load).""" |
| try: |
| from nlp.spelling.araspell_service import is_loaded |
| return is_loaded() |
| except Exception: |
| return False |
|
|
|
|
| def _grammar_available(): |
| """Check if grammar model is loaded (without triggering lazy load).""" |
| try: |
| from nlp.grammar.grammar_service import is_loaded |
| return is_loaded() |
| except Exception: |
| return False |
|
|
|
|
| def _punctuation_available(): |
| """Check if punctuation model is loaded (without triggering lazy load).""" |
| try: |
| from nlp.punctuation.punctuation_service import is_loaded |
| return is_loaded() |
| except Exception: |
| return False |
|
|
|
|
| def _autocomplete_available(): |
| """Check if autocomplete model is loaded (without triggering lazy load).""" |
| try: |
| from nlp.autocomplete.autocomplete_service import _instance |
| return _instance is not None and _instance.is_ready() |
| except Exception: |
| return False |
|
|
|
|
| def _dialect_available(): |
| """Check if dialect model is loaded (without triggering lazy load).""" |
| try: |
| from nlp.dialect.dialect_service import is_loaded |
| return is_loaded() |
| except Exception: |
| return False |
|
|
|
|
@app.route('/api/spelling', methods=['POST'])
def spelling_correction():
    """
    Correct spelling in Arabic text.

    Request JSON:
    {
        "text": "Arabic text with spelling errors"
    }

    Response JSON:
    {
        "original_text": "...",
        "corrected_text": "...",
        "status": "success"
    }

    Error responses carry {"error": ..., "status": "error"}:
        400 — body is not JSON / malformed / not an object, or "text" is
              missing, empty, not a string, or longer than MAX_TEXT_LENGTH.
        503 — a RuntimeError was raised while getting or running the model.
        500 — any other failure.
    """
    try:
        if not request.is_json:
            return jsonify({'error': 'Request must be JSON', 'status': 'error'}), 400

        # silent=True returns None for malformed JSON instead of raising
        # BadRequest, which the generic handler below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object', 'status': 'error'}), 400

        text = data.get('text', '')
        if not isinstance(text, str):
            return jsonify({'error': 'Text must be a string', 'status': 'error'}), 400
        text = text.strip()

        if not text:
            return jsonify({'error': 'Text is required', 'status': 'error'}), 400

        if len(text) > MAX_TEXT_LENGTH:
            return jsonify({
                'error': f'Text too long. Maximum {MAX_TEXT_LENGTH} characters.',
                'status': 'error'
            }), 400

        logger.info(f"Spelling correction request: text_length={len(text)}")

        # Imported here so the service module is only touched on first use.
        from nlp.spelling.araspell_service import get_spelling_model
        checker = get_spelling_model()
        corrected = checker.correct(text)

        return jsonify({
            'original_text': text,
            'corrected_text': corrected,
            'status': 'success'
        }), 200

    except RuntimeError as e:
        logger.error(f"Spelling model error: {e}")
        return jsonify({
            'error': f'Spelling model unavailable: {str(e)[:200]}',
            'status': 'error'
        }), 503
    except Exception as e:
        logger.error(f"Spelling correction error: {e}")
        logger.error(traceback.format_exc())
        return jsonify({
            'error': f'Spelling correction failed: {str(e)[:200]}',
            'status': 'error'
        }), 500
|
|
|
|
@app.route('/api/summarize', methods=['POST'])
def summarize():
    """
    Summarize Arabic text.

    Expected JSON payload:
    {
        "text": "Arabic text to summarize",
        "length": 1-3 (1=short, 2=medium, 3=long; default 2, clamped to 1..3),
        "full_text": true/false (accepted but currently not used by this handler)
    }

    Response JSON (200):
    {
        "summary": "...",
        "status": "success",
        "original_length": <chars>,
        "summary_length": <chars>
    }

    Error responses carry {"error": ..., "status": "error"}:
        503 — summarization model not loaded.
        400 — body is not JSON / malformed / not an object, "text" is
              missing, not a string or outside the length limits, or
              "length" cannot be converted to an integer.
        500 — any other failure.
    """
    if summarization_model is None:
        return jsonify({
            'error': 'Summarization model not loaded. Please check server logs.',
            'status': 'error'
        }), 503

    try:
        if not request.is_json:
            return jsonify({
                'error': 'Request must be JSON',
                'status': 'error'
            }), 400

        # silent=True returns None for malformed JSON instead of raising
        # BadRequest, which the generic handler below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                'error': 'Request body must be a JSON object',
                'status': 'error'
            }), 400

        text = data.get('text', '')
        if not isinstance(text, str):
            return jsonify({
                'error': 'Text must be a string',
                'status': 'error'
            }), 400
        text = text.strip()

        if not text:
            return jsonify({
                'error': 'Text is required',
                'status': 'error'
            }), 400

        if len(text) < MIN_TEXT_LENGTH:
            return jsonify({
                'error': f'Text must be at least {MIN_TEXT_LENGTH} characters',
                'status': 'error'
            }), 400

        if len(text) > MAX_TEXT_LENGTH:
            return jsonify({
                'error': f'Text must be at most {MAX_TEXT_LENGTH} characters',
                'status': 'error'
            }), 400

        # int() raises TypeError for null/list/object and OverflowError for
        # infinity; treat those as bad input (400) like ValueError.
        try:
            length = int(data.get('length', 2))
        except (TypeError, ValueError, OverflowError) as exc:
            logger.error(f"Validation error: {exc}")
            return jsonify({
                'error': f'Invalid input: {exc}',
                'status': 'error'
            }), 400
        length = max(1, min(3, length))

        # Target summary size: a fraction of the input word count, at least
        # 20 and at most MAX_SUMMARY_LENGTH.
        input_length = len(text.split())
        length_multipliers = {1: 0.3, 2: 0.5, 3: 0.7}
        max_length = max(20, int(input_length * length_multipliers[length]))
        max_length = min(max_length, MAX_SUMMARY_LENGTH)

        logger.info(f"Generating summary: length={length}, max_length={max_length}, text_length={len(text)}")

        summary = summarization_model.summarize(text, max_length=max_length, min_length=max(10, max_length // 3))

        return jsonify({
            'summary': summary,
            'status': 'success',
            'original_length': len(text),
            'summary_length': len(summary)
        })

    except ValueError as e:
        logger.error(f"Validation error: {str(e)}")
        return jsonify({
            'error': f'Invalid input: {str(e)}',
            'status': 'error'
        }), 400

    except Exception as e:
        logger.error(f"Error during summarization: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            'error': 'An error occurred during summarization. Please try again.',
            'status': 'error',
            'details': str(e) if app.debug else None
        }), 500
|
|
|
|
|
|
@app.route('/api/autocomplete', methods=['POST'])
def autocomplete():
    """
    Get autocomplete suggestions for Arabic text.
    COMPLETELY INDEPENDENT — has zero interaction with /api/analyze.

    Request JSON:
    {
        "context": "<text before cursor>",
        "n": <number of suggestions, optional, default 3>
    }

    Response JSON:
    {
        "status": "success",
        "suggestions": ["word1", "word2", ...]
    }

    Best effort by design: a context shorter than 3 characters, a model that
    is not ready, or any exception all yield an empty suggestion list with
    status "success". Only a non-JSON request is answered with a 400.
    """
    def _no_suggestions():
        # Shared "nothing to offer" response.
        return jsonify({'suggestions': [], 'status': 'success'})

    try:
        if not request.is_json:
            return jsonify({'error': 'Request must be JSON', 'status': 'error'}), 400

        payload = request.get_json()
        context = payload.get('context', '').strip()
        n = int(payload.get('n', 3))

        # Too little text to predict from (this also covers the empty string).
        if len(context) < 3:
            return _no_suggestions()

        # Lazy imports: only touch the autocomplete package when needed.
        from nlp.autocomplete.autocomplete_rules import extract_context
        from nlp.autocomplete.autocomplete_service import get_autocomplete_model

        context = extract_context(context, max_chars=200)
        ac_model = get_autocomplete_model()
        if not ac_model.is_ready():
            return _no_suggestions()

        started = time.time()
        suggestions = ac_model.predict(context, n=n)
        elapsed = int((time.time() - started) * 1000)
        logger.info(f"[AUTOCOMPLETE] {elapsed}ms | mode={ac_model.get_mode()} | context='{context[:80]}' | suggestions={suggestions}")

        return jsonify({'suggestions': suggestions, 'status': 'success'})

    except Exception as e:
        logger.error(f"Error during autocomplete: {str(e)}")
        logger.error(traceback.format_exc())
        return _no_suggestions()
|
|
|
|
@app.route('/api/grammar', methods=['POST'])
def grammar_correction():
    """
    Correct grammar in Arabic text.

    Request JSON:
    {
        "text": "Arabic text with grammar errors"
    }

    Response JSON:
    {
        "original_text": "...",
        "corrected_text": "...",
        "status": "success"
    }

    Error responses carry {"error": ..., "status": "error"}:
        400 — body is not JSON / malformed / not an object, or "text" is
              missing, empty, not a string, or longer than MAX_TEXT_LENGTH.
        503 — a RuntimeError was raised while getting or running the model.
        500 — any other failure ("details" is filled only in debug mode).
    """
    try:
        if not request.is_json:
            return jsonify({'error': 'Request must be JSON', 'status': 'error'}), 400

        # silent=True returns None for malformed JSON instead of raising
        # BadRequest, which the generic handler below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object', 'status': 'error'}), 400

        text = data.get('text', '')
        if not isinstance(text, str):
            return jsonify({'error': 'Text must be a string', 'status': 'error'}), 400
        text = text.strip()

        if not text:
            return jsonify({'error': 'Text is required', 'status': 'error'}), 400

        if len(text) > MAX_TEXT_LENGTH:
            return jsonify({
                'error': f'Text too long. Maximum {MAX_TEXT_LENGTH} characters.',
                'status': 'error'
            }), 400

        logger.info(f"Grammar correction request: text_length={len(text)}")

        # Imported here so the service module is only touched on first use.
        from nlp.grammar.grammar_service import get_grammar_model
        checker = get_grammar_model()
        corrected = checker.correct(text)

        return jsonify({
            'original_text': text,
            'corrected_text': corrected,
            'status': 'success'
        }), 200

    except RuntimeError as e:
        logger.error(f"Grammar model error: {e}")
        return jsonify({
            'error': f'Grammar model unavailable: {str(e)[:200]}',
            'status': 'error'
        }), 503
    except Exception as e:
        logger.error(f"Error during grammar correction: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            'error': 'An error occurred during grammar correction.',
            'status': 'error',
            'details': str(e) if app.debug else None
        }), 500
|
|
|
|
@app.route('/api/punctuation', methods=['POST'])
def add_punctuation():
    """
    Add punctuation to Arabic text using PuncAra-v1.

    Request JSON:
    {
        "text": "Arabic text without punctuation"
    }

    Response JSON:
    {
        "status": "success",
        "original_text": "...",
        "corrected_text": "..."
    }

    Error responses carry {"error": ..., "status": "error"}:
        400 — body is not JSON / malformed / not an object, or "text" is
              missing, empty, not a string, or longer than MAX_TEXT_LENGTH.
        503 — a RuntimeError was raised while getting or running the model.
        500 — any other failure ("details" is filled only in debug mode).
    """
    try:
        if not request.is_json:
            return jsonify({'error': 'Request must be JSON', 'status': 'error'}), 400

        # silent=True returns None for malformed JSON instead of raising
        # BadRequest, which the generic handler below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object', 'status': 'error'}), 400

        text = data.get('text', '')
        if not isinstance(text, str):
            return jsonify({'error': 'Text must be a string', 'status': 'error'}), 400
        text = text.strip()

        if not text:
            return jsonify({'error': 'Text is required', 'status': 'error'}), 400

        # Same size guard as the spelling and grammar endpoints.
        if len(text) > MAX_TEXT_LENGTH:
            return jsonify({
                'error': f'Text too long. Maximum {MAX_TEXT_LENGTH} characters.',
                'status': 'error'
            }), 400

        logger.info(f"Adding punctuation for text of length: {len(text)}")
        # Imported here so the service module is only touched on first use.
        from nlp.punctuation.punctuation_service import get_punctuation_model
        punc_checker = get_punctuation_model()
        punctuated = punc_checker.correct(text)

        return jsonify({
            'original_text': text,
            'corrected_text': punctuated,
            'status': 'success'
        })

    except RuntimeError as e:
        logger.error(f"Punctuation model error: {e}")
        return jsonify({
            'error': f'Punctuation model unavailable: {str(e)[:200]}',
            'status': 'error'
        }), 503
    except Exception as e:
        logger.error(f"Error during punctuation: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            'error': 'An error occurred during punctuation.',
            'status': 'error',
            'details': str(e) if app.debug else None
        }), 500
|
|
|
|
def get_word_positions(text):
    """
    Locate every whitespace-separated word in ``text``.

    Returns:
        list of (word, start_char_index, end_char_index) tuples, in order of
        appearance; ``end_char_index`` is exclusive.
    """
    return [
        (match.group(), match.start(), match.end())
        for match in re.finditer(r'\S+', text)
    ]
|
|
|
|
class OffsetMapper:
    """
    Single source of truth for coordinate transformations between
    two consecutive versions of CURRENT_TEXT.

    CONTRACT:
        Input:  text_before (str), text_after (str)
                — two consecutive states of CURRENT_TEXT
        Stores: Internal diff operations (PRIVATE)
        API:
            reverse_map_offset(pos)        → text_after pos  → text_before pos
            forward_map_range(start, end)  → text_before range → text_after range

    TERMINOLOGY:
        text_before = CURRENT_TEXT before this stage's mutation
        text_after  = CURRENT_TEXT after this stage's mutation
        forward     = text_before → text_after
        reverse     = text_after  → text_before

    RULES:
        All external code uses reverse_map_offset() or forward_map_range().
        ._opcodes is PRIVATE — no external access.
    """

    def __init__(self, text_before, text_after):
        self._text_before = text_before
        self._text_after = text_after
        self._opcodes = []
        self._build()

    def _build(self):
        """Compute the character-level diff and keep each opcode's (i1, i2, j1, j2) spans."""
        matcher = difflib.SequenceMatcher(None, self._text_before, self._text_after)
        self._opcodes.extend(
            (i1, i2, j1, j2) for _tag, i1, i2, j1, j2 in matcher.get_opcodes()
        )

    def reverse_map_offset(self, pos_in_after):
        """
        Map a single position from text_after → text_before.
        (CURRENT_TEXT after mutation → CURRENT_TEXT before mutation)

        The first diff span whose text_after range contains the position
        (both ends inclusive) is used; the position is interpolated linearly
        inside it and rounded. Positions outside every span map to
        len(text_before).
        """
        for i1, i2, j1, j2 in self._opcodes:
            if j1 <= pos_in_after <= j2:
                if j2 == j1:
                    return i1
                if i2 - i1 == j2 - j1:
                    # Same-length span: the offset carries over exactly.
                    return i1 + (pos_in_after - j1)
                ratio = (pos_in_after - j1) / (j2 - j1)
                return round(i1 + ratio * (i2 - i1))
        return len(self._text_before)

    def forward_map_range(self, start_in_before, end_in_before):
        """
        Map a range from text_before → text_after.
        (CURRENT_TEXT before mutation → CURRENT_TEXT after mutation)

        MONOTONICITY GUARD: each endpoint is mapped independently; if that
        produces an inverted range (start > end), the end is clamped to
        max(new_start, new_end).

        Returns:
            (new_start, new_end) tuple in text_after coordinates.
        """
        new_start = self._forward_map_pos(start_in_before)
        new_end = self._forward_map_pos(end_in_before)
        return new_start, max(new_start, new_end)

    def _forward_map_pos(self, pos):
        """
        Map a single position text_before → text_after. PRIVATE.

        Interpolation uses exact integer arithmetic (floor). A float ratio
        truncated with int() loses positions inside unchanged spans, e.g.
        int((1 / 49) * 49) == 0 instead of 1.
        """
        for i1, i2, j1, j2 in self._opcodes:
            if i1 <= pos <= i2:
                if i2 == i1:
                    return j1
                return j1 + ((pos - i1) * (j2 - j1)) // (i2 - i1)
        if self._opcodes:
            # Past the last span: keep the same distance from its end.
            last = self._opcodes[-1]
            return last[3] + (pos - last[1])
        return pos
|
|
|
|
|
|
def get_word_diffs(original, corrected):
    """
    Identify differences between original and corrected text at the word level.

    Both texts are split into whitespace-separated words and diffed. Every
    non-equal diff operation becomes one suggestion dict with keys:
        start, end  — character offsets into ``original`` (end exclusive)
        original    — the affected slice of ``original`` ('' for insertions)
        correction  — replacement words joined by single spaces ('' for deletions)
        type        — always 'generic'
    """
    source_tokens = get_word_positions(original)
    target_tokens = get_word_positions(corrected)
    matcher = difflib.SequenceMatcher(
        None,
        [token[0] for token in source_tokens],
        [token[0] for token in target_tokens],
    )

    suggestions = []
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag == 'equal':
            continue

        if tag == 'insert':
            # Zero-width span: anchor at the next original word, or at the
            # end of the text when the insertion is trailing.
            if i1 < len(source_tokens):
                start = source_tokens[i1][1]
            else:
                start = len(original)
            end = start
        else:
            # 'replace' / 'delete': span from the first affected word's start
            # to the last affected word's end (i1 < i2 for these opcodes).
            start = source_tokens[i1][1]
            end = source_tokens[i2 - 1][2]

        suggestions.append({
            'start': start,
            'end': end,
            'original': original[start:end],
            # Empty for 'delete', where the corrected-side slice is empty.
            'correction': " ".join(token[0] for token in target_tokens[j1:j2]),
            'type': 'generic'
        })

    return suggestions
|
|
|
|
| def _levenshtein(a, b): |
| """Simple Levenshtein distance for short words.""" |
| m, n = len(a), len(b) |
| if m == 0: |
| return n |
| if n == 0: |
| return m |
| dp = [[0] * (n + 1) for _ in range(m + 1)] |
| for i in range(m + 1): |
| dp[i][0] = i |
| for j in range(n + 1): |
| dp[0][j] = j |
| for i in range(1, m + 1): |
| for j in range(1, n + 1): |
| cost = 0 if a[i - 1] == b[j - 1] else 1 |
| dp[i][j] = min( |
| dp[i - 1][j] + 1, |
| dp[i][j - 1] + 1, |
| dp[i - 1][j - 1] + cost, |
| ) |
| return dp[m][n] |
|
|
|
|
| def _is_small_spelling_change(orig_word, corr_word, vocab_manager=None): |
| """ |
| Heuristic: only accept small spelling edits and ignore |
| aggressive changes (to avoid over-editing). |
| |
| CRITICAL: If both words are in-vocabulary (both are valid Arabic words), |
| only accept known orthographic fixes (ه→ة, hamza whitelist). |
| This prevents the model from corrupting correct words (e.g. وكان→وكأن). |
| |
| Returns: |
| float: 0.0 = reject, 0.5 = dampened confidence (rare word risk), |
| 0.9 = normal confidence. Phase 2 (BUG-034/035/036/037/E8). |
| """ |
| if not orig_word or not corr_word: |
| return 0.0 |
| if orig_word == corr_word: |
| return 0.0 |
|
|
| |
| |
| |
| _ed_dist = _levenshtein(orig_word, corr_word) |
| _max_len = max(len(orig_word), len(corr_word)) |
| if _max_len >= 3 and _ed_dist > max(2, _max_len * 0.4): |
| logger.info( |
| f"[SPELLING] Blocked hallucination: '{orig_word}'→'{corr_word}' " |
| f"(edit_dist={_ed_dist}, max_allowed={max(2, int(_max_len * 0.4))})" |
| ) |
| return 0.0 |
|
|
| |
| |
| |
| |
| _orig_len = len(orig_word) |
| _corr_len = len(corr_word) |
| if _orig_len >= 5 and _corr_len < _orig_len * 0.7: |
| logger.info( |
| f"[SPELLING] Blocked length shrink: '{orig_word}'→'{corr_word}' " |
| f"(len {_orig_len}→{_corr_len}, ratio={_corr_len/_orig_len:.2f})" |
| ) |
| return 0.0 |
|
|
| |
| |
| |
| |
| if _orig_len >= 3 and _corr_len >= 3: |
| |
| _PREFIXES = ('وال', 'فال', 'بال', 'كال', 'لل', 'ال', 'و', 'ف', 'ب', 'ل', 'ك') |
| _o_root = orig_word |
| _c_root = corr_word |
| for _pfx in _PREFIXES: |
| if _o_root.startswith(_pfx) and len(_o_root) > len(_pfx) + 1: |
| _o_root = _o_root[len(_pfx):] |
| break |
| for _pfx in _PREFIXES: |
| if _c_root.startswith(_pfx) and len(_c_root) > len(_pfx) + 1: |
| _c_root = _c_root[len(_pfx):] |
| break |
| |
| _HAMZA_CHARS = set('أإآاء') |
| if (_o_root and _c_root and _o_root[0] != _c_root[0] |
| and not (_o_root[0] in _HAMZA_CHARS and _c_root[0] in _HAMZA_CHARS)): |
| logger.info( |
| f"[SPELLING] Blocked first-letter change: '{orig_word}'→'{corr_word}' " |
| f"(root '{_o_root[0]}'→'{_c_root[0]}')" |
| ) |
| return 0.0 |
|
|
| |
| |
| |
| _DIGITS = set('0123456789٠١٢٣٤٥٦٧٨٩') |
| if any(c in _DIGITS for c in orig_word): |
| return 0.0 |
| if any(c in _DIGITS for c in corr_word): |
| return 0.0 |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| if corr_word in _DIRECTIONAL_BLOCKS.get(orig_word, set()): |
| return 0.0 |
|
|
| |
| _CLITIC_PREFIXES = ('و', 'ف', 'ب', 'ل', 'ك') |
| for _pfx in _CLITIC_PREFIXES: |
| if (orig_word.startswith(_pfx) and corr_word.startswith(_pfx) |
| and len(orig_word) > len(_pfx) + 1): |
| _orig_stem = orig_word[len(_pfx):] |
| _corr_stem = corr_word[len(_pfx):] |
| if _corr_stem in _DIRECTIONAL_BLOCKS.get(_orig_stem, set()): |
| return 0.0 |
|
|
| |
| |
| |
| |
| |
| _COMPOUND_PREFIXES = ['وبال', 'فبال', 'وال', 'فال', 'بال', 'كال', 'ول', 'فل', |
| 'وب', 'فب', 'وك', 'فك'] |
| for _cpfx in _COMPOUND_PREFIXES: |
| if orig_word.startswith(_cpfx) and len(orig_word) > len(_cpfx) + 2: |
| if not corr_word.startswith(_cpfx): |
| |
| |
| _stem = orig_word[len(_cpfx):] |
| for _alt_pfx in _COMPOUND_PREFIXES + list(_CLITIC_PREFIXES) + ['ال', '']: |
| if corr_word.startswith(_alt_pfx): |
| _corr_stem2 = corr_word[len(_alt_pfx):] |
| if _stem == _corr_stem2 or _levenshtein(_stem, _corr_stem2) <= 1: |
| return 0.0 |
| break |
|
|
| |
| |
| if re.search(r'[^ء-يآأإىa-zA-Z]', orig_word): |
| return 0.0 |
| if re.search(r'[^ء-يآأإىa-zA-Z]', corr_word): |
| return 0.0 |
|
|
| |
| |
| feminine_endings = ('ه', 'ة') |
| if orig_word.endswith(feminine_endings) and not corr_word.endswith(feminine_endings): |
| |
| if corr_word == orig_word[:-1] or len(corr_word) < len(orig_word): |
| return 0.0 |
|
|
| |
| |
| |
| if vocab_manager and len(corr_word) == len(orig_word) + 1 and corr_word.startswith(orig_word): |
| _appended_char = corr_word[-1] |
| if _appended_char in ('ا', 'ي', 'و') and vocab_manager.is_iv(orig_word): |
| logger.info( |
| f"[SPELLING] Blocked trailing '{_appended_char}' addition: " |
| f"'{orig_word}'→'{corr_word}' (original is IV)" |
| ) |
| return 0.0 |
|
|
| |
| |
| |
| if vocab_manager: |
| orig_iv = vocab_manager.is_iv(orig_word) |
| corr_iv = vocab_manager.is_iv(corr_word) |
| if orig_iv and corr_iv: |
| |
| |
| |
| |
| |
| if (orig_word.endswith('ه') and corr_word.endswith('ة') |
| and orig_word[:-1] == corr_word[:-1]): |
| |
| |
| |
| |
| |
| |
| |
| stem = orig_word[:-1] |
| if len(stem) >= 2 and vocab_manager.is_iv(stem): |
| logger.info( |
| f"[SPELLING] Blocked ه→ة (pronoun suffix): " |
| f"'{orig_word}'→'{corr_word}' (stem '{stem}' is IV → ه is pronoun)" |
| ) |
| return 0.0 |
| return 0.9 |
| |
| if (orig_word.endswith('ة') and corr_word.endswith('ه') |
| and orig_word[:-1] == corr_word[:-1]): |
| return 0.9 |
| |
| |
| |
| |
| from nlp.spelling.araspell_rules import AraSpellPostProcessor |
| if orig_word in AraSpellPostProcessor.HAMZA_WHITELIST: |
| expected = AraSpellPostProcessor.HAMZA_WHITELIST[orig_word] |
| if corr_word == expected: |
| return 0.9 |
| else: |
| logger.info( |
| f"[SPELLING] Whitelist mismatch: '{orig_word}'→'{corr_word}' " |
| f"(expected '{expected}') — rejected" |
| ) |
| return 0.0 |
| |
| for prefix in AraSpellPostProcessor.HAMZA_PREFIXES: |
| if orig_word.startswith(prefix) and len(orig_word) > len(prefix) + 1: |
| remainder = orig_word[len(prefix):] |
| if remainder in AraSpellPostProcessor.HAMZA_WHITELIST: |
| expected = prefix + AraSpellPostProcessor.HAMZA_WHITELIST[remainder] |
| if corr_word == expected: |
| return 0.9 |
| else: |
| logger.info( |
| f"[SPELLING] Prefixed whitelist mismatch: '{orig_word}'→'{corr_word}' " |
| f"(expected '{expected}') — rejected" |
| ) |
| return 0.0 |
| |
| if (orig_word.endswith('ي') and corr_word.endswith('ى') |
| and orig_word[:-1] == corr_word[:-1]): |
| return 0.85 |
| if (orig_word.endswith('ى') and corr_word.endswith('ي') |
| and orig_word[:-1] == corr_word[:-1]): |
| return 0.85 |
| |
| |
| |
| if len(orig_word) == len(corr_word): |
| from nlp.spelling.araspell_rules import RulesBasedCorrector |
| edit_dist = _levenshtein(orig_word, corr_word) |
| if edit_dist == 1: |
| orig_rank = vocab_manager.get_frequency_rank(orig_word) |
| corr_rank = vocab_manager.get_frequency_rank(corr_word) |
| if corr_rank < orig_rank and corr_rank < 5000: |
| |
| for a, b in zip(orig_word, corr_word): |
| if a != b: |
| if RulesBasedCorrector.is_keyboard_neighbor(a, b): |
| logger.info( |
| f"[SPELLING] Vocab-override (IV-IV): " |
| f"'{orig_word}'(rank={orig_rank})→" |
| f"'{corr_word}'(rank={corr_rank}) " |
| f"keyboard-adjacent '{a}'→'{b}'" |
| ) |
| return 0.5 |
| break |
| |
| |
| return 0.0 |
|
|
| dist = _levenshtein(orig_word, corr_word) |
| max_len = max(len(orig_word), len(corr_word)) |
|
|
| |
| |
| if dist > 2 or (dist / max_len) > 0.5: |
| return 0.0 |
|
|
| |
| |
| |
| ORTHO_PAIRS = { |
| ('ه', 'ة'), ('ة', 'ه'), |
| ('ا', 'أ'), ('أ', 'ا'), ('ا', 'إ'), ('إ', 'ا'), ('ا', 'آ'), ('آ', 'ا'), |
| ('ي', 'ى'), ('ى', 'ي'), |
| ('ؤ', 'و'), ('و', 'ؤ'), |
| ('ئ', 'ي'), ('ي', 'ئ'), |
| ('ء', 'أ'), ('أ', 'ء'), |
| ('ء', 'ؤ'), ('ؤ', 'ء'), |
| ('ء', 'ئ'), ('ئ', 'ء'), |
| } |
| |
| |
| |
| PHONETIC_PAIRS = { |
| ('ض', 'ظ'), ('ظ', 'ض'), |
| ('ذ', 'ز'), ('ز', 'ذ'), |
| ('ص', 'س'), ('س', 'ص'), |
| ('ط', 'ت'), ('ت', 'ط'), |
| ('ق', 'ك'), ('ك', 'ق'), |
| ('د', 'ض'), ('ض', 'د'), |
| ('غ', 'ق'), ('ق', 'غ'), |
| } |
|
|
| from nlp.spelling.araspell_rules import RulesBasedCorrector |
|
|
| |
| |
| |
| if len(orig_word) == len(corr_word) and dist == 2: |
| _transposition_found = False |
| for _ti in range(len(orig_word) - 1): |
| if (orig_word[_ti] == corr_word[_ti + 1] and |
| orig_word[_ti + 1] == corr_word[_ti] and |
| orig_word[:_ti] == corr_word[:_ti] and |
| orig_word[_ti + 2:] == corr_word[_ti + 2:]): |
| _transposition_found = True |
| break |
| if _transposition_found: |
| if vocab_manager: |
| _orig_oov = not vocab_manager.is_iv(orig_word) |
| _corr_iv = vocab_manager.is_iv(corr_word) |
| if _orig_oov and _corr_iv: |
| logger.info( |
| f"[SPELLING] Transposition accepted (OOV→IV): " |
| f"'{orig_word}'→'{corr_word}'" |
| ) |
| return 0.6 |
| elif _orig_oov and not _corr_iv: |
| |
| logger.info( |
| f"[SPELLING] Transposition accepted (OOV→OOV): " |
| f"'{orig_word}'→'{corr_word}' (low confidence)" |
| ) |
| return 0.5 |
| else: |
| return 0.6 |
|
|
| |
| |
| |
| if len(orig_word) == len(corr_word) + 1 and dist == 1: |
| |
| _insertion_valid = False |
| for _di in range(len(orig_word)): |
| |
| _candidate = orig_word[:_di] + orig_word[_di + 1:] |
| if _candidate == corr_word: |
| _insertion_valid = True |
| break |
| if _insertion_valid: |
| if vocab_manager: |
| _orig_oov = not vocab_manager.is_iv(orig_word) |
| _corr_iv = vocab_manager.is_iv(corr_word) |
| if _orig_oov and _corr_iv: |
| |
| |
| |
| |
| |
| |
| _CONJUGATION_SUFFIXES = {'ن', 'ت'} |
| _removed_char = None |
| for _di2 in range(len(orig_word)): |
| if orig_word[:_di2] + orig_word[_di2 + 1:] == corr_word: |
| _removed_char = orig_word[_di2] |
| _removed_pos = _di2 |
| break |
| if (_removed_char in _CONJUGATION_SUFFIXES |
| and _removed_pos == len(orig_word) - 1 |
| and len(corr_word) >= 3): |
| logger.info( |
| f"[SPELLING] Rejected suffix strip: " |
| f"'{orig_word}'→'{corr_word}' " |
| f"(removing suffix '{_removed_char}' likely strips conjugation)" |
| ) |
| return 0.0 |
| logger.info( |
| f"[SPELLING] Insertion fix accepted (OOV→IV): " |
| f"'{orig_word}'→'{corr_word}' (extra char removed)" |
| ) |
| return 0.7 |
| else: |
| return 0.6 |
|
|
| |
| |
| |
| if len(corr_word) == len(orig_word) + 1 and dist == 1: |
| |
| _deletion_valid = False |
| for _di in range(len(corr_word)): |
| |
| _candidate = corr_word[:_di] + corr_word[_di + 1:] |
| if _candidate == orig_word: |
| _deletion_valid = True |
| break |
| if _deletion_valid: |
| if vocab_manager: |
| _orig_oov = not vocab_manager.is_iv(orig_word) |
| _corr_iv = vocab_manager.is_iv(corr_word) |
| if _orig_oov and _corr_iv: |
| logger.info( |
| f"[SPELLING] Deletion fix accepted (OOV→IV): " |
| f"'{orig_word}'→'{corr_word}' (missing char added)" |
| ) |
| return 0.7 |
| else: |
| return 0.6 |
|
|
| |
| if len(orig_word) != len(corr_word): |
| |
| |
| if abs(len(orig_word) - len(corr_word)) > 1: |
| return 0.0 |
|
|
| |
| if orig_word.endswith('ان') and corr_word.endswith('ات') and orig_word[:-2] == corr_word[:-2]: |
| logger.info( |
| f"[SPELLING] Blocked grammatical change (Dual→Plural): " |
| f"'{orig_word}'→'{corr_word}'" |
| ) |
| return 0.0 |
|
|
| |
| |
| _has_keyboard_or_phonetic = False |
| for a, b in zip(orig_word, corr_word): |
| if a != b: |
| if (a, b) in ORTHO_PAIRS: |
| continue |
| elif RulesBasedCorrector.is_keyboard_neighbor(a, b) or (a, b) in PHONETIC_PAIRS: |
| _has_keyboard_or_phonetic = True |
| else: |
| return 0.0 |
| |
| if _has_keyboard_or_phonetic: |
| logger.info( |
| f"[SPELLING] Keyboard/phonetic typo accepted: " |
| f"'{orig_word}'→'{corr_word}' (dampened to 0.6)" |
| ) |
| return 0.6 |
|
|
| |
| |
| if (orig_word.endswith('ه') and corr_word.endswith('ة') |
| and len(orig_word) >= 3 and orig_word[-2] == 'ت' |
| and orig_word[:-1] == corr_word[:-1]): |
| logger.info( |
| f"[SPELLING] Blocked ه→ة at pronoun suffix (OOV path): " |
| f"'{orig_word}'→'{corr_word}'" |
| ) |
| return 0.0 |
|
|
| |
| |
| |
| if vocab_manager: |
| orig_iv = vocab_manager.is_iv(orig_word) |
| corr_iv = vocab_manager.is_iv(corr_word) |
|
|
| |
| |
| |
| orig_rank = vocab_manager.get_frequency_rank(orig_word) |
| corr_rank = vocab_manager.get_frequency_rank(corr_word) |
| if orig_iv and corr_iv and orig_rank < 999999: |
| |
| |
| if corr_rank >= orig_rank: |
| logger.info( |
| f"[SPELLING] Dampened (freq): '{orig_word}'(rank={orig_rank})" |
| f"→'{corr_word}'(rank={corr_rank}) — corr not more common" |
| ) |
| return 0.5 |
|
|
| if not orig_iv and corr_iv: |
| |
| |
| logger.info( |
| f"[SPELLING] Dampened confidence: '{orig_word}'→'{corr_word}' " |
| f"(OOV→IV, possible rare word)" |
| ) |
| return 0.5 |
|
|
| |
| |
| |
| |
| |
| _HAMZA_CHARS = set('أإآؤئء') |
| if len(orig_word) == len(corr_word): |
| has_hamza_diff = False |
| for a, b in zip(orig_word, corr_word): |
| if a != b: |
| if a in _HAMZA_CHARS or b in _HAMZA_CHARS: |
| has_hamza_diff = True |
| else: |
| has_hamza_diff = False |
| break |
| if has_hamza_diff: |
| logger.info( |
| f"[SPELLING] Dampened (hamza-only): '{orig_word}'→'{corr_word}'" |
| ) |
| return 0.5 |
|
|
| return 0.9 |
|
|
|
|
| def _is_spelling_only_change(original: str, correction: str) -> bool: |
| """ |
| Detect if a grammar model's correction is actually a spelling/orthographic fix |
| (hamza, ه→ة, ا→أ, etc.) rather than a true grammar change. |
| |
| Used to re-label grammar patches as 'spelling' for correct UI icons. |
| """ |
| if not original or not correction: |
| return False |
|
|
| |
| import re as _re |
| strip_diacritics = lambda t: _re.sub(r'[\u064B-\u065F\u0670]', '', t) |
| o = strip_diacritics(original) |
| c = strip_diacritics(correction) |
|
|
| if o == c: |
| return True |
|
|
| |
| o_words = o.split() |
| c_words = c.split() |
|
|
| if len(o_words) != len(c_words): |
| return False |
|
|
| all_spelling = True |
| for ow, cw in zip(o_words, c_words): |
| if ow == cw: |
| continue |
| if _is_orthographic_variant(ow, cw): |
| continue |
| all_spelling = False |
| break |
|
|
| return all_spelling |
|
|
|
|
| def _is_orthographic_variant(word1: str, word2: str) -> bool: |
| """ |
| Check if two words differ only by common Arabic orthographic variations: |
| - Hamza placement: ا↔أ↔إ↔آ, ى↔ي, ه↔ة |
| - These are spelling differences, not grammar. |
| """ |
| if len(word1) != len(word2): |
| |
| |
| if abs(len(word1) - len(word2)) > 1: |
| return False |
| |
| if (word1[:-1] == word2[:-1] and |
| {word1[-1], word2[-1]} <= {'ه', 'ة'}): |
| return True |
| return False |
|
|
| |
| SPELLING_EQUIVALENCES = { |
| frozenset({'ا', 'أ'}), frozenset({'ا', 'إ'}), frozenset({'ا', 'آ'}), |
| frozenset({'أ', 'إ'}), frozenset({'أ', 'آ'}), frozenset({'إ', 'آ'}), |
| frozenset({'ى', 'ي'}), frozenset({'ه', 'ة'}), |
| frozenset({'ؤ', 'و'}), frozenset({'ئ', 'ي'}), frozenset({'ئ', 'ء'}), |
| } |
| diff_count = 0 |
| for c1, c2 in zip(word1, word2): |
| if c1 == c2: |
| continue |
| if frozenset({c1, c2}) in SPELLING_EQUIVALENCES: |
| diff_count += 1 |
| else: |
| return False |
| return diff_count > 0 |
|
|
|
|
@app.route('/api/dialect', methods=['POST'])
def convert_dialect():
    """
    Convert dialect Arabic text to Modern Standard Arabic (MSA).

    Request JSON:
    {
        "text": "عايز اشتكي من موظف في فرعكم"
    }

    Response JSON:
    {
        "status": "success",
        "original_text": "...",
        "converted_text": "..."
    }

    Error responses (all carry "status": "error"):
    - 400: body is not JSON / not a JSON object, "text" is missing, empty,
      not a string, or longer than MAX_TEXT_LENGTH.
    - 503: the dialect model raised RuntimeError.
    - 500: any other exception.
    """
    try:
        if not request.is_json:
            return jsonify({'error': 'Request must be JSON', 'status': 'error'}), 400

        # silent=True makes a malformed body come back as None. Without it
        # get_json() raises BadRequest, which the blanket handler below
        # would report as a 500 even though the client sent the bad input.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                'error': 'Request body must be a JSON object',
                'status': 'error'
            }), 400

        text = data.get('text', '')
        # Reject numbers, lists, null, etc. before calling str methods on it.
        if not isinstance(text, str):
            return jsonify({'error': 'Text must be a string', 'status': 'error'}), 400
        text = text.strip()

        if not text:
            return jsonify({'error': 'Text is required', 'status': 'error'}), 400

        if len(text) > MAX_TEXT_LENGTH:
            return jsonify({
                'error': f'Text too long. Maximum {MAX_TEXT_LENGTH} characters.',
                'status': 'error'
            }), 400

        logger.info(f"[DIALECT] Conversion request: text_length={len(text)}")

        # Imported inside the handler, as in the original; presumably so the
        # dialect model is only loaded when this endpoint is actually used.
        from nlp.dialect.dialect_service import get_dialect_model
        converter = get_dialect_model()
        t0 = time.time()
        result = converter.convert(text)
        elapsed = int((time.time() - t0) * 1000)

        # Only the first 80 characters of input and output are logged.
        logger.info(f"[DIALECT] {elapsed}ms | input='{text[:80]}' | output='{result[:80]}'")

        return jsonify({
            'original_text': text,
            'converted_text': result,
            'status': 'success'
        }), 200

    except RuntimeError as e:
        # RuntimeError is reported as "service unavailable" rather than as
        # a generic server error.
        logger.error(f"Dialect model error: {e}")
        return jsonify({
            'error': f'Dialect model unavailable: {str(e)[:200]}',
            'status': 'error'
        }), 503
    except Exception as e:
        logger.error(f"Error during dialect conversion: {e}")
        logger.error(traceback.format_exc())
        return jsonify({
            'error': 'An error occurred during dialect conversion.',
            'status': 'error',
            # Exception text is only exposed when the app runs in debug mode.
            'details': str(e) if app.debug else None
        }), 500
|
|
|
|
@app.route('/api/quran', methods=['POST'])
def quran_verify():
    """
    Quran text verification and translation.

    Accepts: {text: str, language: str (optional, default='تدقيق الايات')}
    Returns: {matched_segment, full_verse} or {error}

    Status codes:
    - 200: result returned by search_bayan without an 'error' key.
    - 400: body is not a JSON object, 'text'/'language' is not a string,
      'text' is empty, or 'text' is longer than 2000 characters.
    - 404: search_bayan returned a result containing an 'error' key.
    - 503: the quran search module failed to import at startup.
    - 500: any other exception.
    """
    try:
        if not logger_quran_ok:
            return jsonify({'error': 'Quran search module not available'}), 503

        # force=True parses the body regardless of Content-Type, as before.
        # silent=True makes malformed JSON come back as None instead of
        # raising BadRequest, which the blanket handler below would report
        # as a 500.
        data = request.get_json(force=True, silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'صيغة الطلب غير صالحة'}), 400

        text = data.get('text', '')
        language = data.get('language', 'تدقيق الايات')
        # Reject non-string fields before calling str methods on them.
        if not isinstance(text, str) or not isinstance(language, str):
            return jsonify({'error': 'صيغة الطلب غير صالحة'}), 400
        text = text.strip()
        language = language.strip()

        if not text:
            return jsonify({'error': 'النص المُدخل فارغ'}), 400

        if len(text) > 2000:
            return jsonify({'error': 'النص طويل جداً (الحد الأقصى 2000 حرف)'}), 400

        app.logger.info(f'[QURAN] Query: "{text[:60]}..." lang={language}')
        start_time = time.time()

        result = search_bayan(text, target_type=language)

        elapsed = int((time.time() - start_time) * 1000)
        app.logger.info(f'[QURAN] Done in {elapsed}ms')

        # A result carrying an 'error' key is passed through as a 404.
        if 'error' in result:
            return jsonify(result), 404

        return jsonify(result)

    except Exception as e:
        app.logger.error(f'[QURAN] Error: {e}')
        app.logger.error(traceback.format_exc())
        return jsonify({'error': 'حدث خطأ أثناء البحث في القرآن الكريم'}), 500
|
|
|
|
| @app.route('/api/analyze', methods=['POST']) |
| def analyze_text(): |
| """ |
| Perform sequential analysis (Spelling -> Grammar -> Punctuation) |
| and return word-level suggestions with offsets. |
| """ |
| try: |
| if not request.is_json: |
| return jsonify({'error': 'Request must be JSON', 'status': 'error'}), 400 |
| |
| data = request.get_json() |
| text = data.get('text', '').strip() |
| |
| if not text: |
| return jsonify({'error': 'Text is required', 'status': 'error'}), 400 |
|
|
| |
| |
| |
| text = re.sub(r'<[^>]*>', '', text).strip() |
| if not text: |
| return jsonify({'error': 'Text is required', 'status': 'error'}), 400 |
|
|
| |
| arabic_chars = len(re.findall(r'[\u0600-\u06FF]', text)) |
| alpha_chars = len(re.findall(r'[a-zA-Z\u0600-\u06FF]', text)) |
| if alpha_chars > 0 and arabic_chars / alpha_chars < 0.3: |
| return jsonify({ |
| 'original': text, 'corrected': text, |
| 'suggestions': [], 'timing_ms': {}, |
| 'status': 'success' |
| }) |
|
|
| |
| ctx = PipelineContext(text) |
| current_text = text |
| suggestions = [] |
| mappers = [] |
|
|
| |
| _tel_events = [] |
| total_start = time.time() |
| timing_ms = {'spelling_ms': 0, 'grammar_ms': 0, 'punctuation_ms': 0, 'total_ms': 0} |
|
|
| def map_range_to_original(start, end): |
| """Legacy wrapper — delegates to PipelineContext.""" |
| return ctx.map_to_original(start, end) |
|
|
| def _get_spelling_alternatives(original_word, best_correction, spell_checker, max_alts=3): |
| """Generate alternative spelling suggestions for a word.""" |
| alts = [] |
| seen = {best_correction, original_word} |
|
|
| |
| try: |
| clean_w = re.sub(r'[^\w]', '', original_word) |
| edit_cands = spell_checker.edit_corrector.known(spell_checker.edit_corrector.edits1(clean_w)) |
| if edit_cands: |
| ranked = sorted(list(edit_cands), key=lambda x: spell_checker.vocab_manager.get_frequency_rank(x)) |
| for c in ranked: |
| if c not in seen and len(alts) < max_alts - 1: |
| alts.append(c) |
| seen.add(c) |
| except Exception: |
| pass |
|
|
| |
| |
| result = [best_correction] + alts + [original_word] |
| return result[:max_alts + 1] |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| text_len = len(current_text) |
| run_spelling = text_len <= 1000 |
| if not run_spelling: |
| logger.info(f"[ANALYZE] Text length {text_len} > 300 — skipping AraSpell for performance") |
|
|
| |
| |
| |
| _RELIGIOUS_PHRASES = [ |
| |
| 'بسم الله', 'الحمد لله', 'سبحان الله', 'لا إله إلا الله', |
| 'إياك نعبد', 'قل هو الله', 'قل أعوذ', 'إنا أنزلناه', |
| 'حسبنا الله', 'لا حول ولا قوة', 'أستغفر الله', |
| 'الله أكبر', 'إنا لله', 'اللهم صل', 'وإياك نستعين', |
| 'ذلك الكتاب لا ريب', 'مالك يوم الدين', 'لم يلد ولم يولد', |
| 'الله لا إله إلا هو', 'الرحمن الرحيم', 'رب العالمين', |
| 'إنما الأعمال بالنيات', 'السلام عليكم ورحمة الله', |
| 'صراط الذين أنعمت', 'من شر ما خلق', 'ملك الناس', |
| 'رب اشرح لي صدري', 'ربنا آتنا', |
| 'قل أعوذ برب الناس', 'الحي القيوم', |
| 'لا تأخذه سنة ولا نوم', 'أشهد أن لا إله', |
| 'أشهد أن محمد', 'إنما الأعمال', |
| 'من حسن إسلام المرء', 'سبحان الله وبحمده', |
| 'الله أكبر كبير', 'إله الناس', 'من شر الوسواس', |
| 'وأشهد أن', 'رسول الله', 'كرسيه السماوات', |
| 'وسع كرسيه', 'في السماوات وما في الأرض', |
| 'عليه وسلم', 'صلى الله عليه', |
| 'المسلم من سلم المسلمون', |
| 'لا يؤمن أحدكم', |
| 'اهدنا الصراط', |
| ] |
| _is_religious_text = any(phrase in ctx.current_text for phrase in _RELIGIOUS_PHRASES) |
| if _is_religious_text: |
| logger.info(f"[ANALYZE] Religious text detected — skipping ALL stages") |
| |
| run_spelling = False |
|
|
| |
| |
| import re as _re_spell_guard |
| _has_url = bool(_re_spell_guard.search(r'https?://\S+', ctx.current_text)) |
| _has_email = bool(_re_spell_guard.search(r'\S+@\S+\.\S+', ctx.current_text)) |
| _has_hashtag = bool(_re_spell_guard.search(r'#[\u0600-\u06FF\w]{2,}', ctx.current_text)) |
| _has_percent = bool(_re_spell_guard.search(r'\d+\.\d+%', ctx.current_text)) |
| _has_latin_word = bool(_re_spell_guard.search(r'\b[A-Za-z]{3,}\b', ctx.current_text)) |
| if _has_url or _has_email: |
| logger.info(f"[ANALYZE] Text contains URLs/emails — skipping spelling") |
| run_spelling = False |
| elif _has_latin_word: |
| logger.info(f"[ANALYZE] Text contains Latin words — skipping spelling") |
| run_spelling = False |
| elif _has_hashtag: |
| logger.info(f"[ANALYZE] Text contains hashtags — skipping spelling") |
| run_spelling = False |
| elif _has_percent: |
| logger.info(f"[ANALYZE] Text contains percentages — skipping spelling") |
| run_spelling = False |
|
|
| |
| if run_spelling: |
| try: |
| t0 = time.time() |
| logger.info(f"[ANALYZE] Step 1: Spelling correction starting...") |
| from nlp.spelling.araspell_service import get_spelling_model |
| spell_checker = get_spelling_model() |
| raw_corrected = spell_checker.correct(current_text) |
| timing_ms['spelling_ms'] = int((time.time() - t0) * 1000) |
| logger.info(f"[ANALYZE] Step 1: Spelling done in {timing_ms['spelling_ms']}ms") |
|
|
| |
| |
| |
| |
| |
| import re as _re_strip |
| _rc_stripped = raw_corrected.rstrip() |
| _ct_stripped = current_text.rstrip() |
| _input_trailing = _re_strip.search(r'[\.،؛؟!]+$', _ct_stripped) |
| _output_trailing = _re_strip.search(r'[\.،؛؟!]+$', _rc_stripped) |
| if _output_trailing and not _input_trailing: |
| raw_corrected = _rc_stripped[:_output_trailing.start()] |
| logger.info( |
| f"[SPELLING] Stripped hallucinated trailing punct: " |
| f"'{_output_trailing.group()}'" |
| ) |
| elif _output_trailing and _input_trailing: |
| |
| if len(_output_trailing.group()) > len(_input_trailing.group()): |
| raw_corrected = _rc_stripped[:_output_trailing.start()] + _input_trailing.group() |
| logger.info( |
| f"[SPELLING] Trimmed extra trailing punct: " |
| f"'{_output_trailing.group()}' → '{_input_trailing.group()}'" |
| ) |
|
|
| |
| |
| |
| if raw_corrected != current_text: |
| try: |
| re_preprocessed = spell_checker.preprocess(raw_corrected) |
| _stab_dist = _levenshtein( |
| raw_corrected.replace(' ', ''), |
| re_preprocessed.replace(' ', '') |
| ) |
| if _stab_dist > 0: |
| _stab_ratio = _stab_dist / max(len(raw_corrected), 1) |
| if _stab_ratio > 0.15: |
| logger.info( |
| f"[SPELLING] Unstable correction " |
| f"(ratio={_stab_ratio:.2f}), using preprocessed" |
| ) |
| raw_corrected = re_preprocessed |
| except Exception: |
| pass |
|
|
| if raw_corrected != ctx.current_text: |
| orig_word_positions = get_word_positions(ctx.current_text) |
| corr_word_positions = get_word_positions(raw_corrected) |
|
|
| orig_word_strings = [w[0] for w in orig_word_positions] |
| corr_word_strings = [w[0] for w in corr_word_positions] |
|
|
| s = difflib.SequenceMatcher(None, orig_word_strings, corr_word_strings) |
| new_words = [] |
|
|
| for tag, i1, i2, j1, j2 in s.get_opcodes(): |
| if tag == 'equal': |
| start_idx = orig_word_positions[i1][1] |
| end_idx = orig_word_positions[i2-1][2] |
| new_words.append(current_text[start_idx:end_idx]) |
| elif tag == 'replace': |
| o_segment = orig_word_strings[i1:i2] |
| c_segment = corr_word_strings[j1:j2] |
|
|
| start_idx = orig_word_positions[i1][1] |
| end_idx = orig_word_positions[i2-1][2] |
|
|
| if len(o_segment) == 1 and len(c_segment) == 1: |
| |
| o_word = o_segment[0] |
| c_word = c_segment[0] |
| _spell_conf = _is_small_spelling_change(o_word, c_word, spell_checker.vocab_manager) |
| if _spell_conf: |
| |
| |
| if len(o_word) == len(c_word): |
| from nlp.spelling.araspell_rules import RulesBasedCorrector |
| for _oc, _cc in zip(o_word, c_word): |
| if _oc != _cc and RulesBasedCorrector.is_keyboard_neighbor(_oc, _cc): |
| _spell_conf = min(_spell_conf * 1.05, 0.95) |
| logger.info(f"[SPELLING] Accepted: '{o_word}'→'{c_word}' (conf={_spell_conf})") |
| new_words.append(c_word) |
| ctx.add_patch( |
| 'spelling', start_idx, end_idx, |
| c_word, confidence=_spell_conf, |
| alternatives=_get_spelling_alternatives(o_word, c_word, spell_checker), |
| ) |
| else: |
| logger.info(f"[SPELLING] Rejected: '{o_word}'→'{c_word}' (filter blocked)") |
| new_words.append(current_text[start_idx:end_idx]) |
| elif len(o_segment) == 1 and len(c_segment) > 1: |
| |
| o_word = o_segment[0] |
| if len(o_word) >= 5 and ' ' not in o_word: |
| corr_str = " ".join(c_segment) |
| |
| |
| _VALID_SINGLE_CHAR = {'و', 'ب', 'ل', 'ك', 'ف', 'أ'} |
| _parts_ok = all( |
| len(p) >= 2 or p in _VALID_SINGLE_CHAR |
| for p in c_segment |
| ) |
| |
| |
| _ATTACHED_PRONOUNS = { |
| 'هم', 'هن', 'ها', 'هما', 'كم', 'كن', 'نا', |
| 'ه', 'ك', |
| } |
| if _parts_ok and len(c_segment) == 2: |
| last_part = c_segment[-1] |
| if last_part in _ATTACHED_PRONOUNS: |
| |
| joined_no_space = ''.join(c_segment) |
| if _levenshtein(o_word, joined_no_space) <= 2: |
| _parts_ok = False |
| logger.info( |
| f"[SPELLING] Rejected split: '{o_word}'→'{corr_str}' " |
| f"(detached pronoun suffix '{last_part}')" |
| ) |
| if _parts_ok: |
| new_words.append(corr_str) |
| ctx.add_patch( |
| 'spelling', start_idx, end_idx, |
| corr_str, confidence=0.85, |
| alternatives=[corr_str, o_word], |
| ) |
| else: |
| logger.info( |
| f"[SPELLING] Rejected split: '{o_word}'→'{corr_str}' " |
| f"(dangling fragment in parts: {c_segment})" |
| ) |
| new_words.append(current_text[start_idx:end_idx]) |
| else: |
| new_words.append(current_text[start_idx:end_idx]) |
| else: |
| |
| |
| corr_joined = " ".join(c_segment) |
| ci = 0 |
| for oi in range(i1, i2): |
| o_word = orig_word_strings[oi] |
| o_start = orig_word_positions[oi][1] |
| o_end = orig_word_positions[oi][2] |
|
|
| if ci < len(c_segment): |
| c_word = c_segment[ci] |
| |
| _spell_conf2 = _is_small_spelling_change(o_word, c_word, spell_checker.vocab_manager) |
| if _spell_conf2: |
| new_words.append(c_word) |
| ctx.add_patch( |
| 'spelling', o_start, o_end, |
| c_word, confidence=_spell_conf2, |
| alternatives=_get_spelling_alternatives(o_word, c_word, spell_checker), |
| ) |
| ci += 1 |
| |
| elif len(o_word) >= 5 and ci + 1 < len(c_segment): |
| |
| split_parts = [c_segment[ci]] |
| temp_ci = ci + 1 |
| joined = c_segment[ci] |
| while temp_ci < len(c_segment) and len(joined) < len(o_word) + 2: |
| joined += c_segment[temp_ci] |
| split_parts.append(c_segment[temp_ci]) |
| temp_ci += 1 |
| |
| corr_str = " ".join(split_parts) |
| joined_no_space = "".join(split_parts) |
| dist = _levenshtein(o_word, joined_no_space) |
| |
| _VALID_SC = {'و', 'ب', 'ل', 'ك', 'ف', 'أ'} |
| _parts_ok = all( |
| len(p) >= 2 or p in _VALID_SC |
| for p in split_parts |
| ) |
| |
| _ATTACHED_PRON = { |
| 'هم', 'هن', 'ها', 'هما', 'كم', 'كن', 'نا', |
| 'ه', 'ك', |
| } |
| if _parts_ok and len(split_parts) == 2: |
| if split_parts[-1] in _ATTACHED_PRON: |
| if _levenshtein(o_word, joined_no_space) <= 2: |
| _parts_ok = False |
| logger.info( |
| f"[SPELLING] Rejected N→M split: '{o_word}'→'{corr_str}' " |
| f"(detached pronoun suffix '{split_parts[-1]}')" |
| ) |
| if dist <= 3 and len(split_parts) > 1 and _parts_ok: |
| new_words.append(corr_str) |
| ctx.add_patch( |
| 'spelling', o_start, o_end, |
| corr_str, confidence=0.85, |
| alternatives=[corr_str, o_word], |
| ) |
| ci = temp_ci |
| else: |
| if not _parts_ok: |
| logger.info( |
| f"[SPELLING] Rejected N→M split: '{o_word}'→'{corr_str}' " |
| f"(dangling fragment)" |
| ) |
| new_words.append(current_text[o_start:o_end]) |
| ci += 1 |
| else: |
| new_words.append(current_text[o_start:o_end]) |
| ci += 1 |
| else: |
| new_words.append(current_text[o_start:o_end]) |
| elif tag == 'delete': |
| for idx in range(i1, i2): |
| new_words.append(current_text[orig_word_positions[idx][1]:orig_word_positions[idx][2]]) |
| elif tag == 'insert': |
| continue |
|
|
| safe_text = " ".join(new_words) |
|
|
| |
| |
| |
| try: |
| _safe_words = safe_text.split() |
| _raw_words = raw_corrected.split() |
| if len(_safe_words) == len(_raw_words): |
| _bidi_changed = False |
| for _bi in range(len(_safe_words)): |
| if _safe_words[_bi] != _raw_words[_bi]: |
| _sw_iv = spell_checker.vocab_manager.is_iv(_safe_words[_bi]) |
| _rw_iv = spell_checker.vocab_manager.is_iv(_raw_words[_bi]) |
| |
| if not _sw_iv and _rw_iv: |
| |
| |
| |
| _BIDI_DIGITS = set('0123456789٠١٢٣٤٥٦٧٨٩') |
| if any(c in _BIDI_DIGITS for c in _safe_words[_bi]): |
| logger.info( |
| f"[SPELLING] Bidirectional blocked (digit): " |
| f"'{_safe_words[_bi]}'→'{_raw_words[_bi]}'" |
| ) |
| continue |
| |
| |
| |
| |
| _CLITIC_PFX = ('و', 'ف', 'ب', 'ل', 'ك') |
| _sw = _safe_words[_bi] |
| _rw = _raw_words[_bi] |
| if (len(_sw) > 3 and len(_rw) > 3 |
| and _sw[0] in _CLITIC_PFX and _rw[0] in _CLITIC_PFX |
| and _sw[0] != _rw[0] and _sw[1:] == _rw[1:]): |
| logger.info( |
| f"[SPELLING] Bidirectional blocked (prefix swap): " |
| f"'{_sw}'→'{_rw}'" |
| ) |
| continue |
| logger.info( |
| f"[SPELLING] Bidirectional fix: " |
| f"'{_safe_words[_bi]}'(OOV)→'{_raw_words[_bi]}'(IV)" |
| ) |
| |
| |
| |
| try: |
| _orig_words_list = text.split() |
| if _bi < len(_orig_words_list): |
| _bidi_orig_word = _orig_words_list[_bi] |
| |
| |
| if _bidi_orig_word == _safe_words[_bi]: |
| _bidi_pos = 0 |
| for _bw_idx in range(_bi): |
| _next_pos = text.find(_orig_words_list[_bw_idx], _bidi_pos) |
| if _next_pos >= 0: |
| _bidi_pos = _next_pos + len(_orig_words_list[_bw_idx]) |
| _bidi_start = text.find(_bidi_orig_word, max(0, _bidi_pos)) |
| if _bidi_start >= 0: |
| _bidi_end = _bidi_start + len(_bidi_orig_word) |
| ctx.add_patch( |
| 'spelling', _bidi_start, _bidi_end, |
| _raw_words[_bi], confidence=0.6, |
| alternatives=[_raw_words[_bi], _bidi_orig_word], |
| ) |
| except Exception: |
| pass |
| _safe_words[_bi] = _raw_words[_bi] |
| _bidi_changed = True |
| if _bidi_changed: |
| _new_safe = ' '.join(_safe_words) |
| _new_oov = spell_checker.vocab_manager.count_oov_words(_new_safe) |
| _old_oov = spell_checker.vocab_manager.count_oov_words(safe_text) |
| if _new_oov <= _old_oov: |
| safe_text = _new_safe |
| except Exception: |
| pass |
|
|
| |
| |
| try: |
| _raw_oov = spell_checker.vocab_manager.count_oov_words(raw_corrected) |
| _our_oov = spell_checker.vocab_manager.count_oov_words(safe_text) |
| if _raw_oov == 0 and _our_oov > 0: |
| logger.info( |
| f"[SPELLING] Safety net: raw=0 OOV, ours={_our_oov} OOV " |
| f"— using raw model output" |
| ) |
| safe_text = raw_corrected |
| elif _raw_oov == 0 and _our_oov == 0: |
| |
| _raw_dist = _levenshtein(current_text, raw_corrected) |
| _our_dist = _levenshtein(current_text, safe_text) |
| _rvr_dist = _levenshtein(safe_text, raw_corrected) |
| if _raw_dist < _our_dist and _rvr_dist <= 3: |
| logger.info( |
| f"[SPELLING] Safety net: raw closer to input " |
| f"(raw_dist={_raw_dist}, our_dist={_our_dist})" |
| ) |
| safe_text = raw_corrected |
| except Exception: |
| pass |
|
|
| ctx.mutate_text(safe_text, OffsetMapper) |
| current_text = ctx.current_text |
| except Exception as e: |
| logger.error(f"[ANALYZE] Spelling failed: {type(e).__name__}: {e}") |
| logger.error(traceback.format_exc()) |
| timing_ms['spelling_error'] = f"{type(e).__name__}: {str(e)[:200]}" |
|
|
| |
| |
|
|
| |
| |
| _PROTECTED_PATTERNS = [ |
| r'https?://\S+', |
| r'\S+@\S+\.\S+', |
| r'\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4}', |
| r'\d{1,2}:\d{2}', |
| r'#[\u0600-\u06FF\w]+', |
| r'@[\w]+', |
| r'\+?\d{10,13}', |
| r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', |
| r'v\d+\.\d+\.\d+', |
| ] |
| _structured_placeholders = [] |
| _grammar_input_text = ctx.current_text |
| if not _is_religious_text: |
| import re as _re_struct |
| for _pat in _PROTECTED_PATTERNS: |
| for _m in _re_struct.finditer(_pat, _grammar_input_text): |
| _structured_placeholders.append((_m.start(), _m.end(), _m.group())) |
| |
| if _structured_placeholders: |
| _structured_placeholders.sort(key=lambda x: x[0], reverse=True) |
| for _sp_start, _sp_end, _sp_text in _structured_placeholders: |
| _grammar_input_text = _grammar_input_text[:_sp_start] + 'بيان' + _grammar_input_text[_sp_end:] |
| logger.info(f"[ANALYZE] Protected {len(_structured_placeholders)} structured elements") |
|
|
| |
| if not _is_religious_text: |
| try: |
| t0 = time.time() |
| logger.info(f"[ANALYZE] Step 2: Grammar correction starting...") |
| from nlp.grammar.grammar_service import get_grammar_model |
| grammar_checker = get_grammar_model() |
| corrected_grammar = grammar_checker.correct(_grammar_input_text) |
| timing_ms['grammar_ms'] = int((time.time() - t0) * 1000) |
| logger.info(f"[ANALYZE] Step 2: Grammar done in {timing_ms['grammar_ms']}ms") |
|
|
| |
| import json as _tel_json |
| logger.info(f'[FILTER-TEL] {_tel_json.dumps({"event":"grammar_raw_output","input":_grammar_input_text[:200],"output":corrected_grammar[:200]})}') |
| _tel_events.append({"event":"grammar_raw_output","input":_grammar_input_text[:200],"output":corrected_grammar[:200]}) |
|
|
| |
| if _structured_placeholders: |
| |
| for _sp_start, _sp_end, _sp_text in reversed(_structured_placeholders): |
| corrected_grammar = corrected_grammar.replace('بيان', _sp_text, 1) |
|
|
| if corrected_grammar != ctx.current_text: |
| diffs = get_word_diffs(ctx.current_text, corrected_grammar) |
| _grammar_accepted_diffs = [] |
| _grammar_total_diffs = len(diffs) |
| logger.info(f'[FILTER-TEL] {_tel_json.dumps({"event":"grammar_diffs_extracted","count":_grammar_total_diffs})}') |
| _tel_events.append({"event":"grammar_diffs_extracted","count":_grammar_total_diffs}) |
| for d in diffs: |
| orig_text = d.get('original', '') |
| corr_text = d.get('correction', '') |
| logger.info(f'[FILTER-TEL] {_tel_json.dumps({"event":"grammar_diff","original":orig_text[:80],"correction":corr_text[:80],"start":d["start"],"end":d["end"]})}') |
| _tel_events.append({"event":"grammar_diff","original":orig_text[:80],"correction":corr_text[:80],"start":d["start"],"end":d["end"]}) |
| |
| |
| if ctx.stage_locker.is_locked_for(d['start'], d['end'], 'grammar'): |
| logger.info( |
| f"[LOCK] Grammar blocked on [{d['start']}:{d['end']}] " |
| f"'{d.get('original','')}' — locked by equal/higher priority stage" |
| ) |
| logger.info(f'[FILTER-TEL] {_tel_json.dumps({"event":"filter_reject","filter":"StageLocker","original":orig_text[:80],"correction":corr_text[:80]})}') |
| _tel_events.append({"event":"filter_reject","filter":"StageLocker","original":orig_text[:80],"correction":corr_text[:80]}) |
| continue |
|
|
| |
| if orig_text and corr_text: |
| orig_chars = set(orig_text.replace(' ', '')) |
| corr_chars = set(corr_text.replace(' ', '')) |
| if orig_chars and corr_chars: |
| jaccard = len(orig_chars & corr_chars) / len(orig_chars | corr_chars) |
| if jaccard < 0.3: |
| logger.info( |
| f"[GRAMMAR] Rejected hallucination: '{orig_text}'→'{corr_text}' " |
| f"(jaccard={jaccard:.2f})" |
| ) |
| logger.info(f'[FILTER-TEL] {_tel_json.dumps({"event":"filter_reject","filter":"Jaccard_03","original":orig_text[:80],"correction":corr_text[:80],"jaccard":round(jaccard,3)})}') |
| _tel_events.append({"event":"filter_reject","filter":"Jaccard_03","original":orig_text[:80],"correction":corr_text[:80],"jaccard":round(jaccard,3)}) |
| continue |
|
|
| |
| |
| |
| |
| if orig_text and corr_text: |
| |
| _has_latin = any('A' <= c <= 'Z' or 'a' <= c <= 'z' for c in orig_text) |
| if _has_latin and orig_text != corr_text: |
| logger.info( |
| f"[GRAMMAR] Skipping entity (contains Latin): " |
| f"'{orig_text}'→'{corr_text}'" |
| ) |
| logger.info(f'[FILTER-TEL] {_tel_json.dumps({"event":"filter_reject","filter":"LatinGuard","original":orig_text[:80],"correction":corr_text[:80]})}') |
| _tel_events.append({"event":"filter_reject","filter":"LatinGuard","original":orig_text[:80],"correction":corr_text[:80]}) |
| continue |
|
|
| |
| |
| import re as _re_emoji |
| if orig_text and _re_emoji.search(r'[\U0001F300-\U0001F9FF]', orig_text): |
| logger.info( |
| f"[GRAMMAR] Skipping emoji content: '{orig_text}'" |
| ) |
| continue |
|
|
| |
| |
| |
| if orig_text and corr_text: |
| import re as _re_tnwn |
| _TANWEEN = '\u064B\u064C\u064D' |
| _orig_no_tnwn = _re_tnwn.sub(f'[{_TANWEEN}]', '', orig_text) |
| _corr_no_tnwn = _re_tnwn.sub(f'[{_TANWEEN}]', '', corr_text) |
| if _orig_no_tnwn == _corr_no_tnwn and orig_text != corr_text: |
| logger.info( |
| f"[GRAMMAR] Blocked tanween removal: " |
| f"'{orig_text}'→'{corr_text}'" |
| ) |
| logger.info(f'[FILTER-TEL] {_tel_json.dumps({"event":"filter_reject","filter":"TanweenGuard","original":orig_text[:80],"correction":corr_text[:80]})}') |
| _tel_events.append({"event":"filter_reject","filter":"TanweenGuard","original":orig_text[:80],"correction":corr_text[:80]}) |
| continue |
|
|
| |
| |
| |
| |
| if orig_text and corr_text: |
| import re as _re_pstrip |
| _PUNCT_CHARS = '.,،؛;:!؟?()[]{}«»\"\'…' |
| _orig_stripped = orig_text.strip(_PUNCT_CHARS) |
| _corr_stripped = corr_text.strip(_PUNCT_CHARS) |
| if _orig_stripped == _corr_stripped and orig_text != corr_text: |
| logger.info( |
| f"[GRAMMAR] Blocked punct stripping: " |
| f"'{orig_text}'→'{corr_text}'" |
| ) |
| logger.info(f'[FILTER-TEL] {_tel_json.dumps({"event":"filter_reject","filter":"PunctuationGuard","original":orig_text[:80],"correction":corr_text[:80]})}') |
| _tel_events.append({"event":"filter_reject","filter":"PunctuationGuard","original":orig_text[:80],"correction":corr_text[:80]}) |
| continue |
| |
| _TANWEEN2 = '\u064B\u064C\u064D' |
| _orig_clean = _re_pstrip.sub(f'[{_TANWEEN2}]', '', _orig_stripped) |
| _corr_clean = _re_pstrip.sub(f'[{_TANWEEN2}]', '', _corr_stripped) |
| if _orig_clean == _corr_clean and orig_text != corr_text: |
| logger.info( |
| f"[GRAMMAR] Blocked tanween+punct strip: " |
| f"'{orig_text}'→'{corr_text}'" |
| ) |
| logger.info(f'[FILTER-TEL] {_tel_json.dumps({"event":"filter_reject","filter":"PunctuationGuard","original":orig_text[:80],"correction":corr_text[:80]})}') |
| _tel_events.append({"event":"filter_reject","filter":"PunctuationGuard","original":orig_text[:80],"correction":corr_text[:80]}) |
| continue |
|
|
| |
| |
| |
| |
| if orig_text and corr_text: |
| import re as _re_psp |
| |
| def _norm_punct_spacing(t): |
| |
| t = _re_psp.sub(r'\s+([.,:;!?\u060C\u061B\u061F\u0021%$)}\]>])', r'\1', t) |
| t = _re_psp.sub(r'([({\[<])\s+', r'\1', t) |
| return t |
| _orig_normed = _norm_punct_spacing(orig_text) |
| _corr_normed = _norm_punct_spacing(corr_text) |
| if _orig_normed == _corr_normed and orig_text != corr_text: |
| logger.info( |
| f"[GRAMMAR] Blocked punct spacing: " |
| f"'{orig_text}'\u2192'{corr_text}'" |
| ) |
| continue |
|
|
|
|
|
|
|
|
|
|
|
|
| |
| _is_grammar_pattern = False |
| if orig_text and corr_text: |
| _o_cl = orig_text.rstrip('.,،؛;:!؟?()[]{}«»"\'…') |
| _c_cl = corr_text.rstrip('.,،؛;:!؟?()[]{}«»"\'…') |
| |
| |
| if (_o_cl.endswith('ون') and _c_cl.endswith('ين') and _o_cl[:-2] == _c_cl[:-2]): |
| _is_grammar_pattern = True |
| elif (_o_cl.endswith('ان') and _c_cl.endswith('ين') and _o_cl[:-2] == _c_cl[:-2] and len(_o_cl) >= 4): |
| _is_grammar_pattern = True |
| |
| elif (_o_cl.endswith('ون') and _c_cl.endswith('وا') and len(_o_cl) >= 3): |
| _o_stem = _o_cl[:-2] |
| _c_stem = _c_cl[:-2] |
| if _o_stem == _c_stem or (len(_o_stem) > 1 and _o_stem[1:] == _c_stem[1:] and _o_stem[0] in 'يت' and _c_stem[0] in 'يت'): |
| _is_grammar_pattern = True |
| |
| elif (len(_o_cl) >= 3 and len(_c_cl) >= 3 and _o_cl[-2:] in ('وك', 'وه') and _c_cl[-2:] in ('اك', 'يك', 'اه', 'يه')): |
| _is_grammar_pattern = True |
| |
| elif ({_o_cl, _c_cl} <= {'هذان', 'هاتان'}): |
| _is_grammar_pattern = True |
| |
| elif (_c_cl.endswith('وا') and _c_cl[:-2] == _o_cl and len(_o_cl) >= 3): |
| _is_grammar_pattern = True |
| |
| elif (_c_cl.endswith('ن') and _c_cl[:-1] == _o_cl and len(_o_cl) >= 3): |
| _is_grammar_pattern = True |
| |
| elif (_o_cl.endswith('ون') and _c_cl.endswith('ن') and len(_o_cl) >= 3): |
| _o_stem = _o_cl[:-2] |
| _c_stem = _c_cl[:-1] |
| if _o_stem == _c_stem or (len(_o_stem) > 1 and _o_stem[1:] == _c_stem[1:] and _o_stem[0] in 'يت' and _c_stem[0] in 'يت'): |
| _is_grammar_pattern = True |
| |
| elif (_c_cl.endswith('ون') and _c_cl[:-2] == _o_cl and len(_o_cl) >= 3): |
| _is_grammar_pattern = True |
| |
| elif ((_c_cl.endswith('ان') or _c_cl.endswith('ين')) and _c_cl[:-2] == _o_cl and len(_o_cl) >= 3): |
| _is_grammar_pattern = True |
| |
| elif (_c_cl.endswith('تان') or _c_cl.endswith('تين')): |
| if _o_cl.endswith('ة') and _c_cl[:-3] == _o_cl[:-1] and len(_o_cl) >= 3: |
| _is_grammar_pattern = True |
| elif _c_cl[:-3] == _o_cl and len(_o_cl) >= 3: |
| _is_grammar_pattern = True |
| |
| elif (_c_cl.endswith('ات') and len(_c_cl) >= 4): |
| if _o_cl.endswith('ة') and _c_cl[:-2] == _o_cl[:-1]: |
| _is_grammar_pattern = True |
| elif _c_cl[:-2] == _o_cl: |
| _is_grammar_pattern = True |
| |
| elif (_c_cl.endswith('ة') and _c_cl[:-1] == _o_cl and len(_o_cl) >= 3): |
| _is_grammar_pattern = True |
| |
| elif (_c_cl.endswith('ية') and _c_cl[:-1] == _o_cl[:-1] + 'ي' and _o_cl.endswith('ي') and len(_o_cl) >= 3): |
| _is_grammar_pattern = True |
|
|
|
|
| |
| |
| |
| |
| if not _is_grammar_pattern and orig_text and corr_text: |
| _o_g2 = orig_text.rstrip('.،؛؟!?!') |
| _c_g2 = corr_text.rstrip('.،؛؟!?!') |
| if (len(_c_g2) == len(_o_g2) + 1 and _c_g2.startswith(_o_g2) |
| and _c_g2[-1] in ('ا', 'ي')): |
| logger.info( |
| f"[GRAMMAR] Blocked trailing letter addition: " |
| f"'{orig_text}'→'{corr_text}'" |
| ) |
| continue |
|
|
| |
| |
| |
| |
| if orig_text and any(c.isdigit() for c in orig_text): |
| logger.info( |
| f"[GRAMMAR] Blocked digit-containing diff: " |
| f"'{orig_text}'\u2192'{corr_text}'" |
| ) |
| logger.info(f'[FILTER-TEL] {_tel_json.dumps({"event":"filter_reject","filter":"DigitGuard","original":orig_text[:80],"correction":corr_text[:80]})}') |
| _tel_events.append({"event":"filter_reject","filter":"DigitGuard","original":orig_text[:80],"correction":corr_text[:80]}) |
| continue |
|
|
| |
| |
| |
| |
| if not _is_grammar_pattern and orig_text and corr_text and len(orig_text) > 2: |
| import re as _re_jac |
| |
| _o_chars = set(_re_jac.sub(r'[\s.,،؛؟!:;?]', '', orig_text)) |
| _c_chars = set(_re_jac.sub(r'[\s.,،؛؟!:;?]', '', corr_text)) |
| if _o_chars and _c_chars: |
| _jac = len(_o_chars & _c_chars) / len(_o_chars | _c_chars) |
| if _jac < 0.5: |
| logger.info( |
| f"[GRAMMAR] Blocked low-Jaccard diff (j={_jac:.2f}): " |
| f"'{orig_text}'\u2192'{corr_text}'" |
| ) |
| logger.info(f'[FILTER-TEL] {_tel_json.dumps({"event":"filter_reject","filter":"Jaccard_05","original":orig_text[:80],"correction":corr_text[:80],"jaccard":round(_jac,3)})}') |
| _tel_events.append({"event":"filter_reject","filter":"Jaccard_05","original":orig_text[:80],"correction":corr_text[:80],"jaccard":round(_jac,3)}) |
| continue |
|
|
| |
| |
| |
| if not _is_grammar_pattern and corr_text in _DIRECTIONAL_BLOCKS.get(orig_text, set()): |
| logger.info( |
| f"[GRAMMAR] Directional block: '{orig_text}'→'{corr_text}'" |
| ) |
| logger.info(f'[FILTER-TEL] {_tel_json.dumps({"event":"filter_reject","filter":"DirectionalBlock","original":orig_text[:80],"correction":corr_text[:80]})}') |
| _tel_events.append({"event":"filter_reject","filter":"DirectionalBlock","original":orig_text[:80],"correction":corr_text[:80]}) |
| continue |
| |
| _gram_dir_blocked = False |
| for _gpfx in ('و', 'ف', 'ب', 'ل', 'ك'): |
| if (orig_text.startswith(_gpfx) and corr_text.startswith(_gpfx) |
| and len(orig_text) > len(_gpfx) + 1): |
| _g_orig_stem = orig_text[len(_gpfx):] |
| _g_corr_stem = corr_text[len(_gpfx):] |
| if _g_corr_stem in _DIRECTIONAL_BLOCKS.get(_g_orig_stem, set()): |
| logger.info( |
| f"[GRAMMAR] Directional block (prefixed): " |
| f"'{orig_text}'→'{corr_text}'" |
| ) |
| _gram_dir_blocked = True |
| break |
| if _gram_dir_blocked: |
| continue |
|
|
|
|
| |
| _TANWEEN_CHARS = set('ًٌٍ') |
| if any(c in _TANWEEN_CHARS for c in orig_text) and not any(c in _TANWEEN_CHARS for c in corr_text): |
| |
| for _tc in _TANWEEN_CHARS: |
| if _tc in orig_text and _tc not in corr_text: |
| corr_text = corr_text + _tc |
| break |
|
|
| |
| |
| stage_label = 'grammar' |
| if _is_spelling_only_change(orig_text, corr_text): |
| stage_label = 'spelling' |
| _grammar_accepted_diffs.append(d) |
| logger.info(f'[FILTER-TEL] {_tel_json.dumps({"event":"patch_accepted","stage":stage_label,"original":orig_text[:80],"correction":corr_text[:80],"start":d["start"],"end":d["end"]})}') |
| _tel_events.append({"event":"patch_accepted","stage":stage_label,"original":orig_text[:80],"correction":corr_text[:80],"start":d["start"],"end":d["end"]}) |
| ctx.add_patch( |
| stage_label, d['start'], d['end'], |
| corr_text, confidence=1.0 |
| ) |
|
|
| |
| |
| _OPEN_BRACKETS = set('([{') |
| _CLOSE_BRACKETS = set(')]}') |
| orig_opens = sum(1 for c in ctx.current_text if c in _OPEN_BRACKETS) |
| orig_closes = sum(1 for c in ctx.current_text if c in _CLOSE_BRACKETS) |
| corr_opens = sum(1 for c in corrected_grammar if c in _OPEN_BRACKETS) |
| corr_closes = sum(1 for c in corrected_grammar if c in _CLOSE_BRACKETS) |
| orig_balanced = (orig_opens == orig_closes) |
| corr_balanced = (corr_opens == corr_closes) |
| if orig_balanced and not corr_balanced: |
| logger.info( |
| f"[GRAMMAR] Rejected bracket-unbalanced output: " |
| f"orig=({orig_opens},{orig_closes}), corr=({corr_opens},{corr_closes})" |
| ) |
| |
| elif _grammar_accepted_diffs: |
| |
| |
| _safe_grammar = ctx.current_text |
| |
| for _ad in sorted(_grammar_accepted_diffs, key=lambda x: x['start'], reverse=True): |
| _safe_grammar = (_safe_grammar[:_ad['start']] + |
| _ad['correction'] + |
| _safe_grammar[_ad['end']:]) |
| ctx.mutate_text(_safe_grammar, OffsetMapper) |
| current_text = ctx.current_text |
| except Exception as e: |
| logger.error(f"[ANALYZE] Grammar failed: {type(e).__name__}: {e}") |
| logger.error(traceback.format_exc()) |
| timing_ms['grammar_error'] = f"{type(e).__name__}: {str(e)[:200]}" |
|
|
| |
| |
| if not _is_religious_text: |
| try: |
| t0 = time.time() |
| logger.info(f"[ANALYZE] Step 3: Punctuation starting...") |
| from nlp.punctuation.punctuation_service import get_punctuation_model |
| punc_checker = get_punctuation_model() |
| corrected_punc = punc_checker.correct(ctx.current_text) |
| timing_ms['punctuation_ms'] = int((time.time() - t0) * 1000) |
| logger.info(f"[ANALYZE] Step 3: Punctuation done in {timing_ms['punctuation_ms']}ms") |
| if corrected_punc != ctx.current_text: |
| diffs = get_word_diffs(ctx.current_text, corrected_punc) |
| for d in diffs: |
| |
| |
| |
| lock_info = ctx.stage_locker.is_locked_by_for(d['start'], d['end'], 'punctuation') |
| if lock_info: |
| import re as _re |
| orig_alpha = _re.sub(r'[^\u0600-\u06FFa-zA-Z]', '', d.get('original', '')) |
| corr_alpha = _re.sub(r'[^\u0600-\u06FFa-zA-Z]', '', d.get('correction', '')) |
| ls, le, owner = lock_info |
| if orig_alpha != corr_alpha: |
| |
| logger.info( |
| f"[LOCK] Punctuation blocked on [{d['start']}:{d['end']}] " |
| f"'{d.get('original','')}' \u2014 locked by {owner}[{ls}:{le}]" |
| ) |
| continue |
| |
| logger.info( |
| f"[LOCK] Punctuation ALLOWED through lock [{d['start']}:{d['end']}] " |
| f"'{d.get('original','')}' \u2192 '{d.get('correction','')}' " |
| f"(locked by {owner}[{ls}:{le}])" |
| ) |
| |
| if not validate_punctuation_diff(d, full_text=ctx.current_text): |
| logger.info( |
| f"[PUNC-SAFETY] Rejected diff [{d['start']}:{d['end']}] " |
| f"'{d.get('original','')}' → '{d.get('correction','')}' — not a safe punctuation change" |
| ) |
| continue |
| ctx.add_patch( |
| 'punctuation', d['start'], d['end'], |
| d['correction'], confidence=0.8 |
| ) |
|
|
| |
| MAX_PUNC_PATCHES_PER_RESPONSE = 3 |
| punc_patches = [p for p in ctx.patches.patches if p.stage == 'punctuation'] |
| if len(punc_patches) > MAX_PUNC_PATCHES_PER_RESPONSE: |
| |
| punc_patches_sorted = sorted(punc_patches, key=lambda p: p.start_original) |
| to_remove = set(id(p) for p in punc_patches_sorted[MAX_PUNC_PATCHES_PER_RESPONSE:]) |
| |
| for _capped_p in punc_patches_sorted[MAX_PUNC_PATCHES_PER_RESPONSE:]: |
| ctx.stage_locker.unlock(_capped_p.start_original, _capped_p.end_original) |
| ctx.patches.patches = [p for p in ctx.patches.patches if id(p) not in to_remove] |
| logger.info( |
| f"[PUNC-CAP] Capped punctuation patches: " |
| f"{len(punc_patches)} → {MAX_PUNC_PATCHES_PER_RESPONSE}" |
| ) |
|
|
| |
| _safe_punc = ctx.current_text |
| _punc_accepted = [d for d in diffs if validate_punctuation_diff(d, full_text=ctx.current_text)] |
| for _pd in sorted(_punc_accepted, key=lambda x: x['start'], reverse=True): |
| _safe_punc = (_safe_punc[:_pd['start']] + |
| _pd['correction'] + |
| _safe_punc[_pd['end']:]) |
| ctx.mutate_text(_safe_punc, OffsetMapper) |
| current_text = ctx.current_text |
|
|
| |
| |
| |
| |
| |
| |
| import re as _re_punc |
| _TERMINAL_PUNCT = set('.،؛؟!?!') |
| _current_stripped = ctx.current_text.rstrip() |
| _has_terminal = _current_stripped and _current_stripped[-1] in _TERMINAL_PUNCT |
| _word_count_fb = len(_re_punc.findall(r'[\u0600-\u06FFa-zA-Z]+', ctx.current_text)) |
| if not _has_terminal and _word_count_fb >= 4: |
| |
| _last_word_match = _re_punc.search(r'([\u0600-\u06FF]+)\s*$', _current_stripped) |
| if _last_word_match: |
| _lw_start = _last_word_match.start(1) |
| _lw_end = _last_word_match.end(1) |
| _lw_text = _last_word_match.group(1) |
| |
| _already_patched = any( |
| p.stage == 'punctuation' |
| and p.start_current == _lw_start |
| for p in ctx.patches.patches |
| ) |
| if not _already_patched: |
| ctx.add_patch( |
| 'punctuation', _lw_start, _lw_end, |
| _lw_text + '.', confidence=0.7 |
| ) |
| logger.info( |
| f"[PUNC-FALLBACK] Injected terminal period: " |
| f"'{_lw_text}' → '{_lw_text}.' at [{_lw_start}:{_lw_end}]" |
| ) |
| except Exception as e: |
| logger.error(f"[ANALYZE] Punctuation failed: {type(e).__name__}: {e}") |
| logger.error(traceback.format_exc()) |
| timing_ms['punctuation_error'] = f"{type(e).__name__}: {str(e)[:200]}" |
|
|
| total_time = time.time() - total_start |
| timing_ms['total_ms'] = int(total_time * 1000) |
|
|
| |
| |
| |
| |
| |
| |
| suggestions = ctx.patches.to_list() |
|
|
| |
| |
| |
| |
| def _apply_patches_to_original(original_text, suggestion_dicts): |
| """Apply patches in reverse offset order to produce corrected text.""" |
| result = original_text |
| |
| for s in sorted(suggestion_dicts, key=lambda x: -x['start']): |
| result = result[:s['start']] + s['correction'] + result[s['end']:] |
| return result |
|
|
| corrected = _apply_patches_to_original(text, suggestions) |
|
|
| logger.info(f"[ANALYZE] Total: {timing_ms['total_ms']}ms | " |
| f"Spelling: {timing_ms['spelling_ms']}ms | " |
| f"Grammar: {timing_ms['grammar_ms']}ms | " |
| f"Punctuation: {timing_ms['punctuation_ms']}ms | " |
| f"Suggestions: {len(suggestions)}") |
|
|
| |
| stage_errors = {k: v for k, v in timing_ms.items() if k.endswith('_error')} |
| response_status = 'partial' if stage_errors else 'success' |
|
|
| response_data = { |
| 'original': text, |
| 'corrected': corrected, |
| 'suggestions': suggestions, |
| 'timing_ms': timing_ms, |
| 'status': response_status, |
| 'telemetry': _tel_events, |
| } |
| if stage_errors: |
| response_data['warnings'] = stage_errors |
|
|
| return jsonify(response_data) |
|
|
| except Exception as e: |
| logger.error(f"Error during analysis: {str(e)}") |
| logger.error(traceback.format_exc()) |
| return jsonify({ |
| 'error': 'An error occurred during text analysis.', |
| 'status': 'error', |
| 'details': str(e) if app.debug else None |
| }), 500 |
|
|
|
|
@app.errorhandler(404)
def not_found(error):
    """Return the JSON body and 404 status for the app's 404 error handler."""
    payload = {'error': 'Endpoint not found', 'status': 'error'}
    return jsonify(payload), 404
|
|
|
|
@app.errorhandler(500)
def internal_error(error):
    """Log the error and return a generic JSON body with a 500 status."""
    logger.error(f"Internal server error: {str(error)}")
    # The response body is fixed; the error text goes to the log only.
    payload = {'error': 'Internal server error', 'status': 'error'}
    return jsonify(payload), 500
|
|
|
|
| |
| |
| |
# Guard flag: set to True by the first _ensure_models_loaded() call so that
# any later call returns immediately.
_models_loaded = False


def _ensure_models_loaded():
    """Load ALL models at startup — no lazy loading.

    Every loader — including the summarization loader — runs inside its own
    try/except, so a single failure doesn't prevent the server from starting.
    Each failure is logged together with its traceback, and the closing
    summary line names the models that did not load instead of claiming
    that everything loaded.

    The guard flag is set before loading begins, so a second call never
    starts another load, and a failed load is not retried by this function.
    Models that fail to load are expected to degrade gracefully at request
    time; that handling lives in the request code, not here.
    """
    global _models_loaded
    if _models_loaded:
        return
    _models_loaded = True

    # Imports stay inside the loader functions so that a missing or broken
    # service module only fails its own loader.
    def _load_spelling():
        from nlp.spelling.araspell_service import get_spelling_model
        get_spelling_model()

    def _load_grammar():
        from nlp.grammar.grammar_service import get_grammar_model
        get_grammar_model()

    def _load_punctuation():
        from nlp.punctuation.punctuation_service import get_punctuation_model
        get_punctuation_model()

    def _load_autocomplete():
        from nlp.autocomplete.autocomplete_service import get_autocomplete_model
        get_autocomplete_model()

    def _load_dialect():
        from nlp.dialect.dialect_service import get_dialect_model
        get_dialect_model()

    loaders = (
        ('Spelling', _load_spelling),
        ('Grammar', _load_grammar),
        ('Punctuation', _load_punctuation),
        ('Autocomplete', _load_autocomplete),
        ('Dialect', _load_dialect),
    )
    failed = []

    total_t0 = time.time()
    logger.info("=" * 60)
    logger.info("BAYAN — Loading ALL models at startup (eager mode)...")
    logger.info("=" * 60)

    # load_models() reports failure through a falsy return value; an
    # exception is treated the same way so it cannot abort startup.
    try:
        summarization_ok = load_models()
    except Exception as e:
        summarization_ok = False
        logger.error(f"✗ Summarization model raised while loading: {e}")
        logger.error(traceback.format_exc())
    if not summarization_ok:
        logger.error("Failed to load summarization model.")
        failed.append('Summarization')

    for label, loader in loaders:
        t0 = time.time()
        try:
            loader()
        except Exception as e:
            logger.error(f"✗ {label} model failed to load: {e}")
            logger.error(traceback.format_exc())
            failed.append(label)
        else:
            logger.info(f"✓ {label} model loaded in {time.time()-t0:.1f}s")

    total_elapsed = time.time() - total_t0
    logger.info("=" * 60)
    if failed:
        logger.warning(
            f"BAYAN — Model loading finished in {total_elapsed:.1f}s; "
            f"failed to load: {', '.join(failed)}"
        )
    else:
        logger.info(f"BAYAN — All models loaded in {total_elapsed:.1f}s")
    logger.info("=" * 60)
|
|
| |
# This call sits at module level, outside the __main__ guard, so the models
# are loaded whenever this module is imported as well as when it is run.
_ensure_models_loaded()


if __name__ == '__main__':
    # Direct execution: the port and debug flag come from the environment.
    # DEBUG is enabled only by the case-insensitive value "true".
    port = int(os.getenv('PORT', 5000))
    debug = os.getenv('DEBUG', 'False').lower() == 'true'

    logger.info(f"Starting server on port {port} (debug={debug})")
    app.run(host='0.0.0.0', port=port, debug=debug)
|
|