File size: 7,723 Bytes
e3e5444
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
"""
Security utilities: input validation, log redaction, HTML sanitization,
and privacy-first data masking for sensitive fields.
"""
import logging
import re
from typing import Optional

import bleach
import pandas as pd

logger = logging.getLogger(__name__)

# ── Log redaction patterns ──────────────────────────────────────────────────

API_KEY_PATTERN = re.compile(
    r'(api[_\-]?key|x-api-key|authorization[:\s]+bearer\s+)(\S+)',
    re.IGNORECASE
)
DATABASE_URL_PATTERN = re.compile(
    r'(postgres|mysql|sqlite|mongodb)://\S+',
    re.IGNORECASE
)
TOKEN_PATTERN = re.compile(
    r'(token|secret|password)[:\s]*(\S{8,})',
    re.IGNORECASE
)


def redact_sensitive_data(text: str) -> str:
    """Redacts API keys, database URLs, and tokens from log messages."""
    text = API_KEY_PATTERN.sub(r'\1[REDACTED]', text)
    text = DATABASE_URL_PATTERN.sub('[DATABASE_URL_REDACTED]', text)
    text = TOKEN_PATTERN.sub(r'\1[REDACTED]', text)
    return text


class RedactingFormatter(logging.Formatter):
    """Custom logging formatter that redacts sensitive data from all log records."""
    def format(self, record: logging.LogRecord) -> str:
        original_msg = super().format(record)
        return redact_sensitive_data(original_msg)


def setup_redacting_logger(logger_instance: logging.Logger) -> None:
    """Configures a logger to use the redacting formatter on all its handlers."""
    for handler in logger_instance.handlers:
        if isinstance(handler, logging.StreamHandler):
            handler.setFormatter(
                RedactingFormatter(
                    "%(asctime)s %(levelname)s %(name)s %(message)s"
                )
            )


# ── Input validation ────────────────────────────────────────────────────────

def validate_table_name(table_name: str, max_length: int = 63) -> bool:
    """
    Validates SQL table name (PostgreSQL: max 63 chars, alphanumeric + underscore).
    Returns True if valid, False otherwise.
    """
    if not table_name or len(table_name) > max_length:
        return False
    return bool(re.fullmatch(r"[a-zA-Z_][a-zA-Z0-9_]*", table_name))


def validate_column_name(column_name: str, max_length: int = 63) -> bool:
    """Validates SQL column name using same rules as table name validation."""
    return validate_table_name(column_name, max_length)


def sanitize_html(html_text: str) -> str:
    """
    Sanitizes HTML to prevent XSS. Allows safe structural tags,
    strips script tags entirely (including contents).
    """
    allowed_tags = {
        'p', 'br', 'strong', 'em', 'u', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
        'ul', 'ol', 'li', 'blockquote', 'code', 'pre', 'table', 'tr', 'td', 'th',
        'div', 'span'
    }
    allowed_attributes: dict = {
        '*': ['class', 'id'],
        'a': ['href', 'title']
    }
    html_text = re.sub(r'<script[^>]*>.*?</script>', '', html_text, flags=re.DOTALL | re.IGNORECASE)
    return bleach.clean(html_text, tags=allowed_tags, attributes=allowed_attributes, strip=True)


def sanitize_markdown_output(markdown_text: str) -> str:
    """
    Sanitizes LLM markdown output before sending to frontend.
    Strips script tags and inline event handlers.
    Note: Pair with DOMPurify + markdown-it on the client for full XSS protection.
    """
    markdown_text = re.sub(r'<script[^>]*>.*?</script>', '', markdown_text, flags=re.DOTALL | re.IGNORECASE)
    markdown_text = re.sub(r'on\w+\s*=', '', markdown_text, flags=re.IGNORECASE)
    return markdown_text


def validate_file_size(file_size_bytes: int, max_size_bytes: int) -> tuple[bool, str]:
    """
    Validates file size against a maximum.
    Returns (is_valid, error_message).
    """
    if file_size_bytes <= 0:
        return False, "File size must be greater than 0 bytes"
    if file_size_bytes > max_size_bytes:
        max_mb = max_size_bytes / (1024 * 1024)
        return False, f"File too large. Maximum allowed size is {max_mb:.1f} MB"
    return True, ""


# ── Data masking (Privacy-First) ────────────────────────────────────────────

_EMAIL_CONTENT_RE = re.compile(r'^[\w.+\-]+@[\w\-]+\.[\w.]+$')

_SENSITIVE_COL_PATTERNS: dict[str, re.Pattern] = {
    'email': re.compile(
        r'\b(email|e_mail|mail|e[-_]mail)\b', re.IGNORECASE
    ),
    'id': re.compile(
        r'\b(cnic|nic|national_id|ssn|passport|id_no|id_number|nid|tax_id)\b',
        re.IGNORECASE
    ),
    'financial': re.compile(
        r'\b(salary|income|wage|payment|balance|account_no|credit|debit|bank_acc|revenue|earnings)\b',
        re.IGNORECASE
    ),
    'phone': re.compile(
        r'\b(phone|mobile|cell|contact_no|tel|telephone)\b', re.IGNORECASE
    ),
}


def _mask_email_value(value: str) -> str:
    """Masks email: john.doe@example.com β†’ j***@example.com"""
    if not isinstance(value, str) or '@' not in value:
        return value
    local, domain = value.split('@', 1)
    return f"{local[0]}***@{domain}"


def _mask_id_value(value: str) -> str:
    """Masks ID/CNIC: shows first 2 and last 2 chars only."""
    s = str(value) if not isinstance(value, str) else value
    if len(s) <= 4:
        return '****'
    return s[:2] + '*' * (len(s) - 4) + s[-2:]


def _mask_financial_value(value) -> str:
    """Masks financial: shows only last 3 digits of integer portion."""
    try:
        num = float(value)
        s = f"{abs(num):,.0f}"
        prefix = '-' if num < 0 else ''
        return f"{prefix}***{s[-3:]}" if len(s) > 3 else f"{prefix}***"
    except (ValueError, TypeError):
        return '***'


def _detect_sensitive_type(col_name: str, series: pd.Series) -> Optional[str]:
    """
    Detect sensitive column type by name pattern first, then content sampling.
    Returns one of: 'email', 'id', 'financial', 'phone', or None.
    """
    for sens_type, pattern in _SENSITIVE_COL_PATTERNS.items():
        if pattern.search(col_name):
            return sens_type
    # Content-based email detection for object columns
    if series.dtype == object:
        sample = series.dropna().head(30)
        if len(sample) > 0:
            email_hits = sum(
                1 for v in sample
                if isinstance(v, str) and _EMAIL_CONTENT_RE.match(v)
            )
            if email_hits / len(sample) > 0.5:
                return 'email'
    return None


def mask_sensitive_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Returns a copy of df with automatically detected sensitive columns masked:
    - Emails     β†’ a***@domain.com
    - ID / CNIC  β†’ XX***XX (first 2 + stars + last 2)
    - Financial  β†’ ***XYZ  (last 3 digits of integer portion)
    - Phone      β†’ XX***XX

    Detection uses column name patterns and content sampling.
    Only affects identified sensitive columns; all others are unchanged.
    """
    masked = df.copy()
    for col in masked.columns:
        sens_type = _detect_sensitive_type(col, masked[col])
        if sens_type == 'email':
            masked[col] = masked[col].apply(
                lambda v: _mask_email_value(str(v)) if pd.notna(v) else v
            )
        elif sens_type in ('id', 'phone'):
            masked[col] = masked[col].apply(
                lambda v: _mask_id_value(str(v)) if pd.notna(v) else v
            )
        elif sens_type == 'financial':
            masked[col] = masked[col].apply(
                lambda v: _mask_financial_value(v) if pd.notna(v) else v
            )
    return masked