# OmniFile-Processor / modules / nlp / feedback.py
# Dr. Abdulmalek
# deploy: OmniFile AI Processor v4.3.0
# 900df0b
"""
Review-correction feedback system
=================================
Migrated from src/correction.py to modules/nlp/feedback.py
as part of the src/ → modules/ migration plan (v4.2.0).

Contains:
- append_feedback(): record a correction in a CSV file
- protected-word lists, CorrectionRule, and helpers that build, load,
  apply, audit and archive the correction dictionary
- Re-exported from src.correction to preserve backward compatibility
"""
import json
import os
import logging
import traceback
import pandas as pd
from datetime import datetime
from collections import Counter, defaultdict
from dataclasses import dataclass, field
logger = logging.getLogger("modules.nlp.feedback")
# ===================== Protected word lists =====================
# Words listed here are matched case-insensitively by _is_protected_word().

TECHNICAL_KEYWORDS = {
    # General programming terms
    "python", "pythonistas", "scraping", "parsing", "ocr",
    "batch", "programming", "script", "database", "configure",
    "setup", "env", "immutable", "concatenation", "tuples",
    "dictionaries", "debugging", "programmatically", "spreadsheet",
    "integers", "float", "boolean", "syntax", "web",
    "etl", "dataframe", "json", "csv", "yaml", "markdown",
    "mermaid", "repository", "clone", "commit", "push",
    # Technical abbreviations
    "repl", "dpi", "api", "gpu", "cpu", "ram", "rom",
    "lora", "huggingface", "transformers", "pytorch", "tensorboard",
    # Words taken from user notes
    "printouts", "involve", "scattered", "skyrocketed", "stacked",
    "affectionately", "serpentine", "cryptic", "sophisticated",
    "intricate", "throwaway", "surreal", "conventions",
    "trade", "off", "boot", "camps",
    # Technical concepts
    "comprehensions", "replication", "precedence", "modulo",
    "exponent", "traceback", "overriding",
}

PYTHON_KEYWORDS = {
    "False", "None", "True", "and", "as", "assert", "async", "await",
    "break", "class", "continue", "def", "del", "elif", "else", "except",
    "finally", "for", "from", "global", "if", "import", "in", "is",
    "lambda", "nonlocal", "not", "or", "pass", "raise", "return",
    "try", "while", "with", "yield",
    # Built-in functions and other common identifiers
    "print", "input", "len", "range", "type", "int", "str", "float",
    "list", "dict", "set", "tuple", "bool", "open", "file", "super",
    "self", "cls", "init", "repr", "main", "name", "args", "kwargs",
    "append", "extend", "pop", "sort", "join", "split", "strip",
    "format", "replace", "lower", "upper", "title", "capitalize",
    "enumerate", "zip", "map", "filter", "sorted", "reversed",
    "isinstance", "issubclass", "hasattr", "getattr", "setattr",
    # "import", "from" and "as" are already listed above; repeating them in
    # a set literal had no effect, so the duplicates were dropped.
    "module", "package",
}

# Extra terms added at runtime by load_custom_vocabulary().
_CUSTOM_VOCAB = set()

# Lower-cased union of every protected word. It is seeded here so the
# built-in lists are protected from import time; previously it started
# empty and stayed empty until load_custom_vocabulary() was first called.
# _rebuild_protected_set() recomputes it whenever the vocabulary changes.
_PROTECTED_WORDS_LOWER = {
    word.lower() for word in TECHNICAL_KEYWORDS | PYTHON_KEYWORDS
}
def _rebuild_protected_set():
    """Recompute the lower-cased lookup set of protected words.

    Lower-cases every entry of TECHNICAL_KEYWORDS, PYTHON_KEYWORDS and
    _CUSTOM_VOCAB, rebinds the module-level _PROTECTED_WORDS_LOWER to the
    merged set and logs its size at debug level.
    """
    global _PROTECTED_WORDS_LOWER
    merged = set()
    for word_source in (TECHNICAL_KEYWORDS, PYTHON_KEYWORDS, _CUSTOM_VOCAB):
        merged.update(entry.lower() for entry in word_source)
    _PROTECTED_WORDS_LOWER = merged
    logger.debug(f"أُعيد بناء القائمة المحمية: {len(_PROTECTED_WORDS_LOWER)} كلمة")
def _is_protected_word(word: str) -> bool:
    """Return True when *word*, lower-cased, is in _PROTECTED_WORDS_LOWER.

    A debug message is logged for every protected hit.
    """
    if word.lower() not in _PROTECTED_WORDS_LOWER:
        return False
    logger.debug(f" كلمة محمية: '{word}' — يتجاوز التصحيح")
    return True
def load_custom_vocabulary(vocab_list: list[str]) -> None:
    """Register extra terms that must be protected from correction.

    Each entry is stripped of surrounding whitespace; entries that are
    empty after stripping are ignored. The remaining terms are added to
    _CUSTOM_VOCAB and the protected lookup set is rebuilt.

    Args:
        vocab_list: Terms to protect. The size logged up front is the raw
            length of this list, before blank entries are filtered out.
    """
    logger.info(f"تحميل {len(vocab_list)} مصطلح إضافي في القائمة المحمية")
    stripped_terms = (term.strip() for term in vocab_list)
    # Materialise first so a bad entry leaves _CUSTOM_VOCAB untouched.
    usable_terms = [term for term in stripped_terms if term]
    _CUSTOM_VOCAB.update(usable_terms)
    _rebuild_protected_set()
    logger.info(f"المجموع المحمي الآن: {len(_PROTECTED_WORDS_LOWER)} كلمة")
def get_protected_words_count() -> dict:
    """Report how many protected words each category currently holds.

    Returns:
        A dict with the sizes of TECHNICAL_KEYWORDS, PYTHON_KEYWORDS,
        _CUSTOM_VOCAB and the combined lower-cased lookup set, under the
        keys "technical_keywords", "python_keywords", "custom_vocabulary"
        and "total_protected".
    """
    categories = (
        ("technical_keywords", TECHNICAL_KEYWORDS),
        ("python_keywords", PYTHON_KEYWORDS),
        ("custom_vocabulary", _CUSTOM_VOCAB),
        ("total_protected", _PROTECTED_WORDS_LOWER),
    )
    return {label: len(words) for label, words in categories}
# ===================== قواعد التصحيح المتقدمة =====================
@dataclass
class CorrectionRule:
    """A correction rule together with the metadata used to track and review it."""

    original: str  # text the rule is keyed on
    correction: str  # replacement text
    votes: int = 1
    # ISO-8601 timestamp; defaults to the moment the instance is created.
    first_seen: str = field(default_factory=lambda: datetime.now().isoformat())
    last_used: str = None
    usage_count: int = 0
    last_reviewed: str = None
    reviewer: str = None
    confidence: float = 1.0
    contexts: list = field(default_factory=list)
    flagged: bool = False
    notes: str = ""

    def to_dict(self) -> dict:
        """Return the rule as a plain dict, keys in field-declaration order."""
        exported = (
            "original", "correction", "votes", "first_seen",
            "last_used", "usage_count", "last_reviewed", "reviewer",
            "confidence", "contexts", "flagged", "notes",
        )
        return {attr: getattr(self, attr) for attr in exported}

    @classmethod
    def from_dict(cls, data: dict, key: str = "") -> "CorrectionRule":
        """Build a rule from a stored dictionary entry.

        Args:
            data: Either a plain string (taken as the correction for *key*)
                or a mapping holding the rule's fields.
            key: Dictionary key the entry was stored under; used as the
                fallback for ``original`` and, when the mapping has no
                "correction" field, as the name of the field to read the
                correction from.

        Returns:
            A CorrectionRule; fields missing from *data* get their defaults.
        """
        if isinstance(data, str):
            return cls(original=key, correction=data)

        lookup = data.get
        kwargs = {
            "original": lookup("original", key),
            "correction": lookup("correction", lookup(key, "")),
            "votes": lookup("votes", 1),
            "first_seen": lookup("first_seen", datetime.now().isoformat()),
            "last_used": lookup("last_used"),
            "usage_count": lookup("usage_count", 0),
            "last_reviewed": lookup("last_reviewed"),
            "reviewer": lookup("reviewer"),
            "confidence": lookup("confidence", 1.0),
            "contexts": lookup("contexts", []),
            "flagged": lookup("flagged", False),
            "notes": lookup("notes", ""),
        }
        return cls(**kwargs)
# ===================== تسجيل التصحيحات (Feedback) =====================
def append_feedback(
    feedback_csv: str,
    image_id: int,
    original: str,
    corrected: str,
    status: str = "verified",
) -> None:
    """Append one correction record to the feedback CSV and log it.

    The row holds the columns timestamp, image_id, original_text,
    corrected_text and status. The file is written as UTF-8 with BOM
    ("utf-8-sig") in append mode.

    Args:
        feedback_csv: Path of the CSV file; missing parent directories are
            created. A bare filename (no directory part) is accepted.
        image_id: Identifier stored in the image_id column.
        original: Text before correction.
        corrected: Text after correction.
        status: Value stored in the status column.
    """
    # os.path.dirname() returns "" for a bare filename and os.makedirs("")
    # raises, so only create the directory when there is one.
    parent_dir = os.path.dirname(feedback_csv)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    record = {
        "timestamp": datetime.now().isoformat(),
        "image_id": image_id,
        "original_text": original,
        "corrected_text": corrected,
        "status": status,
    }

    # Write the header for a new file and also for an existing empty one;
    # otherwise the first data row would be read back as the header.
    needs_header = (
        not os.path.exists(feedback_csv) or os.path.getsize(feedback_csv) == 0
    )
    pd.DataFrame([record]).to_csv(
        feedback_csv,
        mode="a",
        header=needs_header,
        index=False,
        encoding="utf-8-sig",
    )
    # str() keeps the log line from raising when a non-string value was given.
    logger.debug(f"append_feedback: image_id={image_id}, '{str(original)[:30]}' => '{str(corrected)[:30]}', status={status}")
# ===================== بناء وتحميل قاموس التصحيح =====================
def build_correction_dict(
    feedback_csv: str,
    correction_dict_path: str,
    min_votes: int = 1,
) -> dict:
    """Build a {original: correction} dictionary from the feedback CSV.

    For every original text the most frequent differing correction wins,
    provided it was recorded at least *min_votes* times. The result is
    written to *correction_dict_path* as JSON and returned.

    Args:
        feedback_csv: CSV with "original_text" and "corrected_text" columns.
        correction_dict_path: Destination JSON file; missing parent
            directories are created.
        min_votes: Minimum count the winning correction must have.

    Returns:
        The correction dictionary, or {} when the CSV is missing, empty,
        or any step fails (failures are logged, not raised).
    """
    logger.info(f"بناء قاموس التصحيح: csv={feedback_csv}, dict={correction_dict_path}, min_votes={min_votes}")
    if not os.path.exists(feedback_csv):
        logger.info(" ملف feedback غير موجود — قاموس فارغ")
        return {}
    try:
        # Read every cell as text and disable NA detection: with pandas'
        # defaults an empty cell or a word such as "None"/"NA" becomes NaN
        # (later stringified to "nan") and numeric-looking text like "007"
        # is re-typed. fillna covers short rows padded with NaN.
        df_fb = pd.read_csv(
            feedback_csv,
            encoding="utf-8-sig",
            dtype=str,
            keep_default_na=False,
        ).fillna("")
        if df_fb.empty:
            logger.info(" ملف feedback فارغ — قاموس فارغ")
            return {}

        tallies = defaultdict(Counter)
        if {"original_text", "corrected_text"} <= set(df_fb.columns):
            pairs = zip(df_fb["original_text"], df_fb["corrected_text"])
            for raw_orig, raw_corr in pairs:
                orig = str(raw_orig).strip()
                corr = str(raw_corr).strip()
                if orig and corr and orig != corr:
                    tallies[orig][corr] += 1

        result = {}
        for orig, counter in tallies.items():
            best_corr, best_votes = counter.most_common(1)[0]
            if best_votes >= min_votes:
                result[orig] = best_corr

        # dirname is "" for a bare filename and os.makedirs("") raises.
        parent_dir = os.path.dirname(correction_dict_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(correction_dict_path, "w", encoding="utf-8") as f:
            json.dump(result, f, ensure_ascii=False, indent=2)
        logger.info(f"تم تحديث قاموس التصحيح: {len(result)} كلمة من {len(df_fb)} سجل")
        return result
    except Exception as e:
        # Deliberate best-effort boundary: report and fall back to empty.
        logger.error(f"بناء قاموس التصحيح فشل: {e}", exc_info=True)
        return {}
def _coerce_image_id(raw):
    """Turn a CSV image_id cell into an int when possible; None when blank."""
    text = str(raw).strip()
    if not text:
        return None
    try:
        return int(text)
    except ValueError:
        # Keep non-numeric identifiers as the text that was recorded.
        return text


def build_correction_dict_v2(feedback_csv: str, correction_dict_path: str, min_votes: int = 1) -> dict:
    """Build a dictionary of CorrectionRule objects from the feedback CSV.

    For each original text the most frequent differing correction becomes
    a CorrectionRule whose ``votes`` is that correction's count,
    ``first_seen`` is the smallest non-empty timestamp string among the
    rows for that original (the current time when none has a timestamp),
    and ``contexts`` lists the image ids of those rows. Rules whose best
    count is below *min_votes* are dropped. The rules are written to
    *correction_dict_path* as JSON via ``to_dict()``.

    Args:
        feedback_csv: CSV with "original_text" and "corrected_text" columns
            and optional "timestamp" and "image_id" columns.
        correction_dict_path: Destination JSON file; missing parent
            directories are created.
        min_votes: Minimum count the winning correction must have.

    Returns:
        {original: CorrectionRule}, or {} when the CSV is missing, empty,
        or any step fails (failures are logged, not raised).
    """
    logger.info(f"build_correction_dict_v2: csv={feedback_csv}, min_votes={min_votes}")
    if not os.path.exists(feedback_csv):
        return {}
    try:
        # Read every cell as text with NA detection off: pandas' defaults
        # turn empty cells and words such as "None"/"NA" into NaN (which
        # str() renders as "nan") and re-type numeric-looking text.
        df_fb = pd.read_csv(
            feedback_csv,
            encoding="utf-8-sig",
            dtype=str,
            keep_default_na=False,
        ).fillna("")
        if df_fb.empty:
            return {}

        buckets = defaultdict(list)
        for row in df_fb.to_dict("records"):
            orig = str(row.get("original_text", "")).strip()
            corr = str(row.get("corrected_text", "")).strip()
            if orig and corr and orig != corr:
                buckets[orig].append({
                    "correction": corr,
                    "timestamp": str(row.get("timestamp", "")).strip(),
                    "image_id": _coerce_image_id(row.get("image_id", "")),
                })

        result = {}
        for orig, entries in buckets.items():
            counts = Counter(e["correction"] for e in entries)
            best_corr, best_count = counts.most_common(1)[0]
            if best_count < min_votes:
                continue
            # min() on an empty sequence raises, so pick the fallback
            # explicitly when no row carries a timestamp.
            timestamps = [e["timestamp"] for e in entries if e["timestamp"]]
            first_seen = min(timestamps) if timestamps else datetime.now().isoformat()
            result[orig] = CorrectionRule(
                original=orig,
                correction=best_corr,
                votes=best_count,
                first_seen=first_seen,
                # "is not None" rather than truthiness so image id 0 is kept.
                contexts=[e["image_id"] for e in entries if e["image_id"] is not None],
            )

        # dirname is "" for a bare filename and os.makedirs("") raises.
        parent_dir = os.path.dirname(correction_dict_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(correction_dict_path, "w", encoding="utf-8") as f:
            json.dump({k: v.to_dict() for k, v in result.items()}, f, ensure_ascii=False, indent=2)
        logger.info(f"build_correction_dict_v2: {len(result)} قاعدة من {len(df_fb)} سجل")
        return result
    except Exception as e:
        # Deliberate best-effort boundary: report and fall back to empty.
        logger.error(f"build_correction_dict_v2 فشل: {e}", exc_info=True)
        return {}
def load_correction_dict(correction_dict_path: str) -> dict:
    """Load the correction dictionary stored as JSON at the given path.

    Returns:
        The parsed JSON content, or {} when the file does not exist or
        cannot be read/parsed (the failure is logged, not raised).
    """
    if os.path.exists(correction_dict_path):
        try:
            with open(correction_dict_path, "r", encoding="utf-8") as handle:
                loaded = json.load(handle)
            logger.info(f"تم تحميل قاموس التصحيح: {len(loaded)} كلمة من {correction_dict_path}")
            return loaded
        except Exception as exc:
            logger.error(f"تحميل قاموس التصحيح فشل: {exc}", exc_info=True)
            return {}

    logger.debug(f"load_correction_dict: الملف غير موجود: {correction_dict_path}")
    return {}
def _resolve_correction(entry, fallback: str) -> str:
    """Extract the replacement text from a dictionary entry.

    An entry may be a plain string, a mapping with a "correction" field,
    or an object exposing a ``correction`` attribute. *fallback* is
    returned when no usable replacement text is found.
    """
    if isinstance(entry, str):
        return entry
    if isinstance(entry, dict):
        candidate = entry.get("correction")
    else:
        candidate = getattr(entry, "correction", None)
    if isinstance(candidate, str) and candidate:
        return candidate
    return fallback


def apply_correction_dict(text: str, correction_dict: dict) -> str:
    """Replace whole whitespace-separated words using the correction dictionary.

    Dictionary values may be plain strings or richer entries (mappings with
    a "correction" field, or objects with a ``correction`` attribute); the
    latter previously made the final join raise TypeError.

    Note that the text is split on whitespace and re-joined with single
    spaces, so runs of whitespace and line breaks are collapsed whenever
    both arguments are non-empty.

    Args:
        text: Input text.
        correction_dict: Mapping from a word to its correction entry.

    Returns:
        The corrected text; *text* unchanged when either argument is empty.
    """
    if not correction_dict or not text:
        return text
    words = text.split()
    corrected = [
        _resolve_correction(correction_dict[w], w) if w in correction_dict else w
        for w in words
    ]
    changes = [(old, new) for old, new in zip(words, corrected) if old != new]
    if changes:
        logger.debug(f"apply_correction_dict: {len(changes)} تعديل من القاموس: {changes[:5]}")
    return " ".join(corrected)
def track_correction_usage(correction_dict_path: str, word: str) -> None:
    """Bump the usage counter of the rule stored under *word*.

    Increments the entry's "usage_count", sets "last_used" to the current
    time (ISO format) and rewrites the JSON file. Nothing happens when
    *word* is empty, the file is missing, the word has no entry, or the
    entry is not a mapping (plain-string entries have no counters).

    This is best-effort bookkeeping: failures are logged as warnings and
    never raised.
    """
    if not word or not os.path.exists(correction_dict_path):
        return
    try:
        with open(correction_dict_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        entry = data.get(word) if isinstance(data, dict) else None
        if not isinstance(entry, dict):
            # Unknown word, or a plain-string entry that cannot carry
            # usage metadata; leave the file untouched.
            return

        entry["usage_count"] = entry.get("usage_count", 0) + 1
        entry["last_used"] = datetime.now().isoformat()
        with open(correction_dict_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
    except Exception as e:
        # Still non-fatal, but no longer silent.
        logger.warning(f"track_correction_usage فشل: {e}", exc_info=True)
def calculate_rule_indicator(rule: CorrectionRule, thresholds: dict = None) -> dict:
    """Compute a visual status indicator for a correction rule.

    A score is accumulated from the rule's state:
      * +3 when confidence is below "conf_low", else +1 when below "conf_mid";
      * +2 when the rule is flagged;
      * +2 when usage_count exceeds "usage_high" and the rule was last
        reviewed more than "days_critical" days ago (never-reviewed or
        unparseable dates count as 999 days).

    The label is 🔴 for score >= 5, 🟡 for score >= 3, ⏳ when the score is
    0 and the rule was first seen within "new_days_warning" days, and 🟢
    otherwise.

    Args:
        rule: Rule to evaluate.
        thresholds: Optional overrides; any missing key falls back to the
            built-in default.

    Returns:
        A dict with the label ("visual"), the score and the inputs that
        produced it.
    """
    active = thresholds
    if active is None:
        active = {
            "conf_low": 0.60, "conf_mid": 0.80,
            "usage_high": 50, "usage_mid": 20,
            "days_critical": 30, "days_warning": 14, "new_days_warning": 3,
        }
    limit = active.get

    def _elapsed_days(stamp):
        # Whole days between an ISO timestamp and now; 999 when unparseable.
        try:
            return (datetime.now() - datetime.fromisoformat(stamp)).days
        except Exception:
            return 999

    days_review = _elapsed_days(rule.last_reviewed) if rule.last_reviewed else 999
    days_seen = _elapsed_days(rule.first_seen)

    score = 0
    if rule.confidence < limit("conf_low", 0.60):
        score += 3
    elif rule.confidence < limit("conf_mid", 0.80):
        score += 1
    if rule.flagged:
        score += 2
    heavily_used = rule.usage_count > limit("usage_high", 50)
    if heavily_used and days_review > limit("days_critical", 30):
        score += 2

    if score >= 5:
        visual = "🔴 عاجل"
    elif score >= 3:
        visual = "🟡 مراجعة مقترحة"
    elif score == 0 and days_seen <= limit("new_days_warning", 3):
        visual = "⏳ جديد"
    else:
        visual = "🟢 موثوق"

    report = {"visual": visual, "score": score}
    report["confidence"] = rule.confidence
    report["usage_count"] = rule.usage_count
    report["days_since_review"] = days_review
    report["days_since_seen"] = days_seen
    report["votes"] = rule.votes
    report["flagged"] = rule.flagged
    return report
def get_dictionary_audit_queue(correction_dict_path: str, priority: str = "all", limit: int = 20) -> list:
    """Return the rules of the correction dictionary queued for review.

    Every entry of the JSON dictionary is turned into a CorrectionRule and
    paired with its indicator from calculate_rule_indicator().

    Args:
        correction_dict_path: Path of the JSON correction dictionary.
        priority: How the queue is selected/ordered:
            "flagged"  - only flagged rules, in file order;
            "new"      - most recently first-seen rules first;
            "low_conf" - lowest confidence first;
            anything else - highest indicator score first.
        limit: Maximum number of items returned.

    Returns:
        A list of {"key", "rule", "indicator"} dicts; [] when the file is
        missing, empty, or cannot be processed (failures are logged).
    """
    if not os.path.exists(correction_dict_path):
        return []
    try:
        with open(correction_dict_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not data:
            return []

        queue = []
        for k, v in data.items():
            rule = CorrectionRule.from_dict(v, k)
            queue.append({
                "key": k,
                "rule": rule,
                "indicator": calculate_rule_indicator(rule),
            })

        if priority == "flagged":
            queue = [item for item in queue if item["rule"].flagged]
        elif priority == "new":
            # Ascending age puts the newest rules first. The previous
            # descending sort surfaced the oldest rules and let `limit`
            # cut off exactly the new ones this priority asks for.
            queue.sort(key=lambda item: item["indicator"]["days_since_seen"])
        elif priority == "low_conf":
            queue.sort(key=lambda item: item["rule"].confidence)
        else:
            queue.sort(key=lambda item: item["indicator"]["score"], reverse=True)
        return queue[:limit]
    except Exception as e:
        # Deliberate best-effort boundary: report and fall back to empty.
        logger.error(f"get_dictionary_audit_queue فشل: {e}", exc_info=True)
        return []
def archive_correction_rule(correction_dict_path: str, key: str, reason: str = "") -> bool:
    """Move a rule from the correction dictionary into an archive file.

    The entry stored under *key* is removed from the dictionary, stamped
    with "archived_reason" and "archived_at", and saved in a sibling file
    named "<name>_archived<ext>" (".json" when the dictionary path has no
    extension). The archive is written before the dictionary so a failure
    in between cannot lose the rule.

    Args:
        correction_dict_path: Path of the JSON correction dictionary.
        key: Dictionary key of the rule to archive.
        reason: Free-text reason stored with the archived rule.

    Returns:
        True when the rule was archived; False when the file or key is
        missing or any step fails (failures are logged, not raised).
    """
    if not os.path.exists(correction_dict_path):
        return False
    try:
        with open(correction_dict_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if key not in data:
            return False

        rule_data = data.pop(key)
        if not isinstance(rule_data, dict):
            # Plain-string entries hold only the correction text; wrap
            # them so the archive metadata has somewhere to live.
            rule_data = {"original": key, "correction": rule_data}
        rule_data["archived_reason"] = reason
        rule_data["archived_at"] = datetime.now().isoformat()

        # Derive the archive name from the extension only. A blanket
        # str.replace(".json", ...) returned the same path when there was
        # no ".json" suffix, so the archive landed in the live file and
        # was then overwritten; it also rewrote ".json" in directory names.
        root, ext = os.path.splitext(correction_dict_path)
        archive_path = f"{root}_archived{ext or '.json'}"

        archive = {}
        if os.path.exists(archive_path):
            with open(archive_path, "r", encoding="utf-8") as f:
                archive = json.load(f)
        archive[key] = rule_data
        with open(archive_path, "w", encoding="utf-8") as f:
            json.dump(archive, f, ensure_ascii=False, indent=2)
        with open(correction_dict_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        logger.info(f"archive_correction_rule: '{key}' reason='{reason}'")
        return True
    except Exception as e:
        # Deliberate best-effort boundary: report and signal failure.
        logger.error(f"archive_correction_rule فشل: {e}", exc_info=True)
        return False
def auto_calibrate_dict_thresholds(correction_dict_path: str, method: str = "percentile") -> dict:
    """Derive indicator thresholds from the rules currently in the dictionary.

    Confidence and usage values are collected from every entry (entries
    that are not mappings, such as plain-string corrections, count with
    the defaults confidence 1.0 and usage 0) and summarised:

      * method "std_dev": conf_low = max(0, mean - std), conf_mid = mean,
        usage_mid = median, usage_high = 90th percentile;
      * any other method: conf_low/conf_mid = 25th/50th percentile of
        confidence, usage_mid/usage_high = 75th/90th percentile of usage.

    Args:
        correction_dict_path: Path of the JSON correction dictionary.
        method: Calibration method; echoed back as "calibrate_method".

    Returns:
        A dict with "conf_low", "conf_mid" (rounded to 3 decimals),
        "usage_high", "usage_mid" (ints) and "calibrate_method"; {} when
        the file is missing, empty, or calibration fails (logged).
    """
    if not os.path.exists(correction_dict_path):
        return {}
    try:
        # Imported here because numpy is not among the module's top-level
        # imports; a missing numpy is reported by the handler below.
        import numpy as np

        with open(correction_dict_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not data:
            return {}

        # Plain-string entries carry no metadata; treat them as defaults
        # instead of letting .get() raise and abort the calibration.
        entries = [v if isinstance(v, dict) else {} for v in data.values()]
        confs = [e.get("confidence", 1.0) for e in entries]
        usages = [e.get("usage_count", 0) for e in entries]

        if method == "std_dev":
            c_low = max(0.0, float(np.mean(confs) - np.std(confs)))
            c_mid = float(np.mean(confs))
            u_mid = float(np.median(usages))
            u_high = float(np.percentile(usages, 90))
        else:
            c_low, c_mid = (float(x) for x in np.percentile(confs, [25, 50]))
            u_mid, u_high = (float(x) for x in np.percentile(usages, [75, 90]))

        # Values are converted to built-in float/int so the result holds
        # no numpy scalars.
        thresholds = {
            "conf_low": round(c_low, 3), "conf_mid": round(c_mid, 3),
            "usage_high": int(u_high), "usage_mid": int(u_mid),
            "calibrate_method": method,
        }
        logger.info(f"auto_calibrate: {thresholds}")
        return thresholds
    except Exception as e:
        # Deliberate best-effort boundary: report and fall back to empty.
        logger.error(f"auto_calibrate_dict_thresholds فشل: {e}", exc_info=True)
        return {}