OmniFile-Processor / modules /security /sensitive_data_scanner.py
Dr. Abdulmalek
deploy: OmniFile AI Processor v4.3.0
900df0b
"""
فاحص البيانات الحساسة (Sensitive Data Scanner)
==================================================
فحص النصوص للكشف عن بيانات حساسة مثل:
- أرقام بطاقات الائتمانية
- أرقام الهواتف
- عناوين البريد الإلكتروني
- أرقام الهويات الوطنية
- كلمات المرور والأسرار
- عناوين IP
يدعم:
- مكتبة presidio (عند توفرها)
- أنماط Regex احتياطية (بدون مكتبات خارجية)
"""
import logging
import re
from typing import Optional
logger = logging.getLogger(__name__)
class SensitiveDataScanner:
"""
فاحص البيانات الحساسة — يكشف عن الأنماط الخطرة في النصوص.
يعمل بطريقتين:
1. باستخدام presidio (أكثر دقة) إذا كان متاحاً
2. باستخدام Regex احتياطي (يعمل دائماً)
"""
# أنماط Regex الاحتياطية
FALLBACK_PATTERNS: list[dict] = [
{
"name": "CREDIT_CARD",
"label": "بطاقة ائتمانية",
"regex": r"\b(?:\d[ -]*?){13,16}\b",
"risk": "high",
},
{
"name": "EMAIL_ADDRESS",
"label": "بريد إلكتروني",
"regex": r"\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b",
"risk": "medium",
},
{
"name": "PHONE_NUMBER",
"label": "رقم هاتف",
"regex": r"\b(?:\+?\d{1,3}[-.\s]?)?\(?\d{2,4}\)?[-.\s]?\d{3,4}[-.\s]?\d{3,4}\b",
"risk": "medium",
},
{
"name": "IP_ADDRESS",
"label": "عنوان IP",
"regex": r"\b(?:\d{1,3}\.){3}\d{1,3}\b",
"risk": "low",
},
{
"name": "SSN",
"label": "رقم ضمان اجتماعي",
"regex": r"\b\d{3}-\d{2}-\d{4}\b",
"risk": "high",
},
{
"name": "API_KEY",
"label": "مفتاح API",
"regex": r"(?i)(?:api[_-]?key|token|secret|password)\s*[:=]\s*['\"]?[a-zA-Z0-9_\-]{20,}['\"]?",
"risk": "high",
},
{
"name": "JWT_TOKEN",
"label": "رمز JWT",
"regex": r"eyJ[a-zA-Z0-9_-]*\.eyJ[a-zA-Z0-9_-]*\.[a-zA-Z0-9_-]*",
"risk": "high",
},
{
"name": "AWS_KEY",
"label": "مفتاح AWS",
"regex": r"(?:AKIA|ASIA)[A-Z0-9]{16}",
"risk": "high",
},
{
"name": "PRIVATE_KEY",
"label": "مفتاح خاص",
"regex": r"-----BEGIN(?: RSA | EC | DSA | OPENSSH )?PRIVATE KEY-----",
"risk": "critical",
},
{
"name": "IBAN",
"label": "رقم IBAN",
"regex": r"\b[A-Z]{2}\d{2}[A-Z0-9]{4,}",
"risk": "high",
},
]
def __init__(self, use_presidio: bool = True) -> None:
"""
تهيئة الفاحص.
المعاملات:
use_presidio: محاولة استخدام presidio إذا كان متاحاً
"""
self.use_presidio = use_presidio
self._presidio_available = False
self._analyzer = None
self._anonymizer = None
# أنماط مخصصة إضافية
self._custom_patterns: list[dict] = []
if use_presidio:
self._try_load_presidio()
def _try_load_presidio(self) -> None:
"""محاولة تحميل مكتبة presidio."""
try:
from presidio_analyzer import AnalyzerEngine # type: ignore
from presidio_anonymizer import AnonymizerEngine # type: ignore
self._analyzer = AnalyzerEngine()
self._anonymizer = AnonymizerEngine()
self._presidio_available = True
logger.info("تم تحميل presidio بنجاح")
except ImportError:
logger.info(
"presidio غير مثبت. سيتم استخدام Regex فقط. "
"pip install presidio-analyzer presidio-anonymizer"
)
except Exception as e:
logger.warning("فشل تحميل presidio: %s", e)
def add_custom_pattern(
self,
name: str,
label: str,
regex: str,
risk: str = "medium",
) -> None:
"""
إضافة نمط مخصص للكشف.
المعاملات:
name: اسم النمط
label: وصف بالعربية
regex: نمط Regex
risk: مستوى الخطورة (low/medium/high/critical)
"""
self._custom_patterns.append({
"name": name,
"label": label,
"regex": regex,
"risk": risk,
})
# إضافة لـ presidio أيضاً
if self._presidio_available:
try:
from presidio_analyzer import Pattern, PatternRecognizer
pattern = Pattern(
name=name,
regex=regex,
score=0.7 if risk == "medium" else 0.9,
)
recognizer = PatternRecognizer(
supported_entity=name,
patterns=[pattern],
)
self._analyzer.registry.add_recognizer(recognizer)
logger.info("تم إضافة النمط المخصص '%s' لـ presidio", name)
except Exception as e:
logger.warning("فشل إضافة النمط لـ presidio: %s", e)
def scan_text(
self,
text: str,
language: str = "en",
) -> dict:
"""
فحص نص للكشف عن بيانات حساسة.
المعاملات:
text: النص المراد فحصه
language: لغة النص ('en' أو 'ar')
العائد:
قاموس يحتوي:
- sensitive_data_found: هل وُجدت بيانات حساسة؟
- entities: قائمة الكيانات المكتشفة
- risk_level: مستوى الخطورة العام
- total_entities: عدد الكيانات
- scanner_type: نوع الفاحص المستخدم
"""
if not text or not text.strip():
return {
"sensitive_data_found": False,
"entities": [],
"risk_level": "none",
"total_entities": 0,
"scanner_type": "none",
}
entities = []
# استخدام presidio إذا كان متاحاً
if self._presidio_available and self._analyzer:
try:
results = self._analyzer.analyze(
text=text,
language=language,
)
for entity in results:
entities.append({
"type": entity.entity_type,
"text": text[entity.start: entity.end],
"start": entity.start,
"end": entity.end,
"confidence": entity.score,
"scanner": "presidio",
})
except Exception as e:
logger.warning("فشل فحص presidio: %s", e)
# دائماً: استخدام Regex كطبقة إضافية
all_patterns = self.FALLBACK_PATTERNS + self._custom_patterns
for pattern_info in all_patterns:
try:
matches = re.finditer(pattern_info["regex"], text)
for match in matches:
matched_text = match.group()
# تجنب التكرار مع presidio
already_found = any(
e["text"] == matched_text and e["start"] == match.start()
for e in entities
)
if not already_found:
entities.append({
"type": pattern_info["name"],
"label": pattern_info["label"],
"text": matched_text,
"start": match.start(),
"end": match.end(),
"risk": pattern_info["risk"],
"scanner": "regex",
})
except re.error:
logger.warning("نمط Regex غير صالح: %s", pattern_info["name"])
# حساب مستوى الخطورة
risk_level = self._calculate_risk_level(entities)
return {
"sensitive_data_found": len(entities) > 0,
"entities": entities,
"risk_level": risk_level,
"total_entities": len(entities),
"scanner_type": "presidio" if self._presidio_available else "regex",
}
def anonymize_text(
self,
text: str,
language: str = "en",
mask_char: str = "[REDACTED]",
) -> str:
"""
إزالة البيانات الحساسة من النص.
المعاملات:
text: النص المراد معالجته
language: لغة النص
mask_char: النص البديل للبيانات الحساسة
العائد:
النص بعد إزالة البيانات الحساسة
"""
if not text:
return text
# استخدام presidio إذا كان متاحاً
if self._presidio_available and self._anonymizer and self._analyzer:
try:
results = self._analyzer.analyze(text=text, language=language)
anonymized = self._anonymizer.anonymize(
text=text,
analyzer_results=results,
)
return anonymized.text
except Exception as e:
logger.warning("فشل إخفاء presidio: %s", e)
# استخدام Regex كاحتياطي
anonymized = text
all_patterns = self.FALLBACK_PATTERNS + self._custom_patterns
for pattern_info in all_patterns:
try:
anonymized = re.sub(pattern_info["regex"], mask_char, anonymized)
except re.error:
pass
return anonymized
def scan_file(self, file_path: str, encoding: str = "utf-8") -> dict:
"""
فحص ملف للكشف عن بيانات حساسة.
المعاملات:
file_path: مسار الملف
encoding: ترميز الملف
العائد:
نتيجة الفحص (مثل scan_text) مع إضافة file_path
"""
try:
with open(file_path, "r", encoding=encoding) as f:
content = f.read()
result = self.scan_text(content)
result["file_path"] = file_path
result["file_size"] = len(content)
return result
except Exception as e:
return {
"sensitive_data_found": False,
"entities": [],
"risk_level": "error",
"total_entities": 0,
"file_path": file_path,
"error": str(e),
}
@staticmethod
def _calculate_risk_level(entities: list[dict]) -> str:
"""
حساب مستوى الخطورة العام.
المعاملات:
entities: قائمة الكيانات المكتشفة
العائد:
مستوى الخطورة: none/low/medium/high/critical
"""
if not entities:
return "none"
risk_scores = {"none": 0, "low": 1, "medium": 2, "high": 3, "critical": 4}
max_risk = 0
for entity in entities:
risk = entity.get("risk", "medium")
if isinstance(risk, str):
score = risk_scores.get(risk, 2)
else:
score = risk
max_risk = max(max_risk, score)
for level, score in risk_scores.items():
if score == max_risk:
return level
return "medium"
def is_available(self) -> dict:
"""
فحص حالة توفر الفاحص.
العائد:
قاموس: {presidio: bool, regex: bool}
"""
return {
"presidio": self._presidio_available,
"regex": True,
"custom_patterns": len(self._custom_patterns),
}