File size: 12,310 Bytes
1e4e9aa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
"""
osint_core.policy
=================

Policy enforcement for the Passive OSINT Control Panel.

This module is the authorization boundary between validated input and execution.

Design constraints:
- Passive by default.
- No module execution decision should be made outside this layer.
- Authorized-only modules must be blocked unless explicit authorization is present.
- Forbidden capabilities are always denied.
- Correction verbs are closed over a fixed allowlist.
- Policy evaluation is side-effect free: it returns a decision, it does not execute.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Iterable, Literal


# Closed vocabulary of correction verbs (uppercase). The same four values are
# listed at runtime in ALLOWED_CORRECTION_VERBS.
CorrectionVerb = Literal["ADAPT", "CONSTRAIN", "REVERT", "OBSERVE"]
# Risk classification carried by each ModulePolicy.
RiskLevel = Literal["low", "conditional", "forbidden"]
# Tier label carried by each ModulePolicy. In MODULE_POLICIES the forbidden
# modules use T1, the conditional modules T2, and the low-risk modules T3 or T4.
PolicyTier = Literal["T1", "T2", "T3", "T4"]


class PolicyDecision(str, Enum):
    """
    Overall outcome attached to a PolicyEvaluation.

    Members are also ``str`` instances, so each compares equal to its
    lowercase value (for example ``PolicyDecision.ALLOW == "allow"``).
    """

    # Returned by evaluate_modules() when no violation was recorded.
    ALLOW = "allow"
    # Declared here, but evaluate_modules() in this file never returns it.
    BLOCK = "block"
    # Returned by evaluate_modules() when at least one violation was recorded.
    CONSTRAIN = "constrain"


class PolicyErrorCode(str, Enum):
    """
    Machine-readable reason codes carried by PolicyViolation.

    Members are also ``str`` instances and compare equal to their values.
    """

    # Requested module is not in MODULE_POLICIES (evaluate_modules).
    UNKNOWN_MODULE = "unknown_module"
    # Conditional module requested in passive-only mode or without an
    # authorized target (evaluate_modules).
    AUTHORIZATION_REQUIRED = "authorization_required"
    # Requested module has risk "forbidden" (evaluate_modules).
    FORBIDDEN_MODULE = "forbidden_module"
    # Verb is outside ALLOWED_CORRECTION_VERBS (enforce_correction_verb).
    INVALID_CORRECTION_VERB = "invalid_correction_verb"
    # Policy mutation attempted without out-of-band approval
    # (enforce_policy_mutation_gate).
    POLICY_MUTATION_BLOCKED = "policy_mutation_blocked"
    # Audit payload contains a forbidden raw field name (enforce_audit_payload).
    RAW_LOGGING_BLOCKED = "raw_logging_blocked"


@dataclass(frozen=True)
class ModulePolicy:
    """
    Immutable registry entry describing one module and how policy treats it.
    """

    # Human-readable label; used in violation messages and returned by
    # allowed_ui_modules().
    name: str
    # Identifier in registry-key form, e.g. "dns_records".
    canonical_name: str
    # "forbidden" entries are always blocked by evaluate_modules().
    risk: RiskLevel
    # Tier label; exposed through module_catalog().
    tier: PolicyTier
    # Free-text explanation; exposed through module_catalog().
    description: str
    # When True, evaluate_modules() allows the module only with
    # passive_only=False and authorized_target=True.
    requires_authorization: bool = False


@dataclass(frozen=True)
class PolicyViolation:
    """
    Immutable record of a single policy rule that was violated.
    """

    # Machine-readable reason for the violation.
    code: PolicyErrorCode
    # Human-readable explanation; PolicyViolationException uses it as the
    # exception message.
    message: str
    # Canonical module name the violation refers to; left as None by the
    # verb, mutation-gate, and audit-payload checks in this file.
    module: str | None = None


@dataclass(frozen=True)
class PolicyEvaluation:
    """
    Result of evaluating a set of requested modules against the policy.

    ``frozen=True`` prevents reassigning the attributes, but the list values
    themselves remain ordinary mutable lists.
    """

    # Overall outcome of the evaluation.
    decision: PolicyDecision
    # Canonical names of modules that may execute.
    allowed_modules: list[str] = field(default_factory=list)
    # Canonical names of modules that were denied.
    blocked_modules: list[str] = field(default_factory=list)
    # One entry per denied request, explaining why it was denied.
    violations: list[PolicyViolation] = field(default_factory=list)
    # Correction verbs available to the caller; evaluate_modules() fills this
    # with a copy of ALLOWED_CORRECTION_VERBS.
    correction_verbs_allowed: list[CorrectionVerb] = field(default_factory=list)


# Runtime allowlist of correction verbs. enforce_correction_verb() rejects any
# verb not listed here, and evaluate_modules() copies this tuple into every
# PolicyEvaluation it returns. Holds the same values as the CorrectionVerb Literal.
ALLOWED_CORRECTION_VERBS: tuple[CorrectionVerb, ...] = (
    "ADAPT",
    "CONSTRAIN",
    "REVERT",
    "OBSERVE",
)

# Canonical module registry, keyed by each entry's canonical_name.
# Keep this small and explicit. New capabilities should be added deliberately.
# Entries are listed once and indexed by a comprehension so a key can never
# drift from the canonical_name stored inside the entry.
MODULE_POLICIES: dict[str, ModulePolicy] = {
    entry.canonical_name: entry
    for entry in (
        # --- Low risk: no authorization required. ---
        ModulePolicy(
            canonical_name="resource_links",
            name="Resource Links",
            risk="low",
            tier="T4",
            description="Generate links to external OSINT resources without contacting the target.",
        ),
        ModulePolicy(
            canonical_name="dns_records",
            name="DNS Records",
            risk="low",
            tier="T3",
            description="Resolve DNS records using a resolver. Low-impact, but still a network lookup.",
        ),
        ModulePolicy(
            canonical_name="local_url_parse",
            name="Local URL Parse",
            risk="low",
            tier="T4",
            description="Parse a URL locally without contacting the target.",
        ),
        # --- Conditional: require explicit authorization. ---
        ModulePolicy(
            canonical_name="http_headers",
            name="HTTP Headers",
            risk="conditional",
            tier="T2",
            description="Fetch HTTP headers from an explicitly authorized target.",
            requires_authorization=True,
        ),
        ModulePolicy(
            canonical_name="robots_txt",
            name="Robots.txt",
            risk="conditional",
            tier="T2",
            description="Fetch robots.txt from an explicitly authorized target.",
            requires_authorization=True,
        ),
        ModulePolicy(
            canonical_name="screenshot",
            name="Screenshot",
            risk="conditional",
            tier="T2",
            description="Render a screenshot of an explicitly authorized URL.",
            requires_authorization=True,
        ),
        # --- Forbidden: always denied. ---
        ModulePolicy(
            canonical_name="port_scan",
            name="Port Scan",
            risk="forbidden",
            tier="T1",
            description="Port scanning is outside the passive OSINT boundary.",
        ),
        ModulePolicy(
            canonical_name="brute_force",
            name="Brute Force",
            risk="forbidden",
            tier="T1",
            description="Credential or username brute forcing is forbidden.",
        ),
        ModulePolicy(
            canonical_name="credential_testing",
            name="Credential Testing",
            risk="forbidden",
            tier="T1",
            description="Credential testing is forbidden.",
        ),
        ModulePolicy(
            canonical_name="exploitation",
            name="Exploitation",
            risk="forbidden",
            tier="T1",
            description="Exploit execution is forbidden.",
        ),
    )
}


# Alias table: normalized label -> canonical module name. Labels are written
# lowercase with spaces (not hyphens or underscores) as word separators.
# Aliases are grouped by the canonical module they resolve to.
ALIASES: dict[str, str] = {
    label: canonical
    for canonical, labels in (
        ("resource_links", ("resource links", "links", "source links")),
        ("dns_records", ("dns", "dns records")),
        ("local_url_parse", ("local url parse", "url parse")),
        ("http_headers", ("http headers", "headers")),
        ("robots_txt", ("robots.txt", "robots")),
        ("screenshot", ("screenshot",)),
        ("port_scan", ("port scan", "nmap", "masscan")),
        ("brute_force", ("brute force", "bruteforce")),
        ("credential_testing", ("credential testing", "creds")),
        ("exploitation", ("exploitation", "exploit")),
    )
    for label in labels
}


def canonicalize_module_name(module_name: str) -> str:
    """
    Convert a UI label or alias to canonical module name.

    The input is stringified (falsy values become ""), lower-cased, and has
    hyphens and underscores turned into spaces. Runs of whitespace are then
    collapsed to a single space and the ends trimmed, so spellings such as
    "Port  Scan", "port__scan" or " port-scan " all normalize to "port scan".

    The normalized label is looked up in ALIASES. When no alias matches, the
    label is returned with its spaces replaced by underscores.
    """
    spaced = str(module_name or "").lower().replace("-", " ").replace("_", " ")
    # split() with no argument splits on any whitespace run and drops empty
    # pieces. Without this, doubled or mixed separators would produce names
    # like "port__scan" that miss both the alias table and the registry.
    key = " ".join(spaced.split())
    return ALIASES.get(key, key.replace(" ", "_"))


def get_module_policy(module_name: str) -> ModulePolicy | None:
    """
    Look up the registry entry for a UI label, alias, or canonical name.

    Returns None when the canonicalized name is not a key of MODULE_POLICIES.
    """
    canonical = canonicalize_module_name(module_name)
    return MODULE_POLICIES.get(canonical)


def evaluate_modules(
    requested_modules: Iterable[str],
    *,
    authorized_target: bool = False,
    passive_only: bool = True,
    allow_unknown_modules: bool = False,
) -> PolicyEvaluation:
    """
    Evaluate requested modules against the policy.

    Each requested name is canonicalized and then checked in this order:
    unknown module, forbidden module, passive-only mode, missing
    authorization. The first failing check blocks the module and records a
    violation; a module that passes every check is allowed.

    Parameters
    ----------
    requested_modules:
        Module names from UI/API. A single bare string is treated as one
        module name rather than being iterated character by character.
    authorized_target:
        Explicit confirmation that the target is authorized for conditional interaction.
    passive_only:
        When True, conditional modules are blocked even if authorization is present.
        Use False only for an authorized execution mode.
    allow_unknown_modules:
        When True, names missing from the registry are allowed instead of
        blocked. Should remain False in production.

    Returns
    -------
    PolicyEvaluation
        Side-effect-free decision describing what may execute. The decision is
        CONSTRAIN when at least one violation was recorded (even if no module
        ends up allowed) and ALLOW otherwise. The allowed and blocked lists are
        de-duplicated in first-seen order; ``violations`` keeps one entry per
        denied request, so a repeated request appears more than once there.
    """
    # A str is itself an iterable of one-character strings. Without this guard
    # a call such as evaluate_modules("dns") would evaluate "d", "n" and "s"
    # as three separate unknown modules.
    if isinstance(requested_modules, str):
        requested_modules = [requested_modules]

    allowed: list[str] = []
    blocked: list[str] = []
    violations: list[PolicyViolation] = []

    def deny(code: PolicyErrorCode, message: str, module: str) -> None:
        # Record a denied module together with the reason it was denied.
        blocked.append(module)
        violations.append(PolicyViolation(code=code, message=message, module=module))

    for raw_name in requested_modules:
        canonical = canonicalize_module_name(raw_name)
        policy = MODULE_POLICIES.get(canonical)

        if policy is None:
            if allow_unknown_modules:
                allowed.append(canonical)
            else:
                deny(
                    PolicyErrorCode.UNKNOWN_MODULE,
                    f"Unknown module blocked: {raw_name}",
                    canonical,
                )
            continue

        # Forbidden capabilities are denied regardless of any flag.
        if policy.risk == "forbidden":
            deny(
                PolicyErrorCode.FORBIDDEN_MODULE,
                f"Forbidden module blocked: {policy.name}",
                policy.canonical_name,
            )
            continue

        if policy.requires_authorization:
            # Passive-only mode wins over authorization: it is checked first.
            if passive_only:
                deny(
                    PolicyErrorCode.AUTHORIZATION_REQUIRED,
                    f"Conditional module blocked in passive-only mode: {policy.name}",
                    policy.canonical_name,
                )
                continue

            if not authorized_target:
                deny(
                    PolicyErrorCode.AUTHORIZATION_REQUIRED,
                    f"Authorization required for module: {policy.name}",
                    policy.canonical_name,
                )
                continue

        allowed.append(policy.canonical_name)

    # Any T1 forbidden issue or policy/auth issue should constrain execution.
    decision = PolicyDecision.CONSTRAIN if violations else PolicyDecision.ALLOW

    return PolicyEvaluation(
        decision=decision,
        allowed_modules=dedupe_preserve_order(allowed),
        blocked_modules=dedupe_preserve_order(blocked),
        violations=violations,
        correction_verbs_allowed=list(ALLOWED_CORRECTION_VERBS),
    )


def enforce_correction_verb(verb: str) -> CorrectionVerb:
    """
    Normalize a correction verb and confirm it is in the closed vocabulary.

    The input is stringified (falsy values become ""), trimmed, and
    upper-cased before being checked against ALLOWED_CORRECTION_VERBS.

    Returns the normalized verb. Raises PolicyViolationException carrying an
    INVALID_CORRECTION_VERB violation when the verb is not allowed; the
    message quotes the verb exactly as it was passed in.
    """
    candidate = str(verb or "").strip().upper()
    if candidate in ALLOWED_CORRECTION_VERBS:
        return candidate  # type: ignore[return-value]

    rejection = PolicyViolation(
        code=PolicyErrorCode.INVALID_CORRECTION_VERB,
        message=f"Invalid correction verb: {verb}",
    )
    raise PolicyViolationException(rejection)


def may_mutate_policy(*, out_of_band_approval: bool = False) -> bool:
    """
    Report whether policy mutation is permitted.

    Policy cannot rewrite itself: the answer is simply the truthiness of the
    keyword-only ``out_of_band_approval`` flag, which defaults to False.
    """
    approved = bool(out_of_band_approval)
    return approved


def enforce_policy_mutation_gate(*, out_of_band_approval: bool = False) -> None:
    """
    Raise unless policy mutation has been approved out of band.

    Delegates the decision to may_mutate_policy(). Returns None when mutation
    is permitted; otherwise raises PolicyViolationException carrying a
    POLICY_MUTATION_BLOCKED violation.
    """
    if may_mutate_policy(out_of_band_approval=out_of_band_approval):
        return

    denial = PolicyViolation(
        code=PolicyErrorCode.POLICY_MUTATION_BLOCKED,
        message="Policy mutation requires out-of-band approval.",
    )
    raise PolicyViolationException(denial)


def enforce_audit_payload(payload: dict) -> None:
    """
    Prevent raw sensitive indicators from appearing in audit payloads.

    This is a defensive check. The audit module should already avoid raw values.

    Top-level string keys are compared case-insensitively and with surrounding
    whitespace ignored, so "Email" or " URL " are rejected just like "email"
    and "url". Only key names are checked: values are not examined, non-string
    keys are ignored, and nested containers are not inspected.

    Raises
    ------
    PolicyViolationException
        With code RAW_LOGGING_BLOCKED when any forbidden field name is
        present. The message lists the matched names in normalized
        (lowercase) form, sorted.
    """
    forbidden_keys = {
        "raw_indicator",
        "raw_input",
        "indicator",
        "email",
        "domain",
        "username",
        "url",
        "ip",
    }

    # Normalize before comparing so a different capitalization or stray
    # whitespace in a key cannot slip a raw field past the check.
    normalized_keys = {
        key.strip().lower() for key in payload.keys() if isinstance(key, str)
    }
    present = forbidden_keys & normalized_keys
    if present:
        raise PolicyViolationException(
            PolicyViolation(
                code=PolicyErrorCode.RAW_LOGGING_BLOCKED,
                message=f"Audit payload contains forbidden raw field(s): {sorted(present)}",
            )
        )


def module_catalog() -> list[dict[str, str | bool]]:
    """
    Build a serializable catalog of every registered module for UI display.

    Produces one plain dict per MODULE_POLICIES entry, in registry order.
    Nothing is filtered out, so forbidden modules are included too.
    """
    catalog: list[dict[str, str | bool]] = []
    for entry in MODULE_POLICIES.values():
        row = {
            "name": entry.name,
            "canonical_name": entry.canonical_name,
            "risk": entry.risk,
            "tier": entry.tier,
            "requires_authorization": entry.requires_authorization,
            "description": entry.description,
        }
        catalog.append(row)
    return catalog


def allowed_ui_modules(*, include_conditional: bool = True) -> list[str]:
    """
    List the display names of modules that may be offered in the UI.

    Forbidden modules are always left out. Conditional modules are left out
    as well when ``include_conditional`` is False. Names follow registry order.
    """
    hidden_risks = {"forbidden"}
    if not include_conditional:
        hidden_risks.add("conditional")

    return [
        entry.name
        for entry in MODULE_POLICIES.values()
        if entry.risk not in hidden_risks
    ]


def dedupe_preserve_order(values: Iterable[str]) -> list[str]:
    """
    Return the distinct items of ``values``, keeping first-occurrence order.
    """
    # dict keys are unique and keep insertion order, so the keys of a dict
    # built from the values are exactly the first occurrences, in order.
    return list(dict.fromkeys(values))


class PolicyViolationException(PermissionError):
    """
    Raised by the enforce_* helpers when a policy rule is violated.

    The exception message is the violation's ``message``; the full violation
    object stays available on the ``violation`` attribute.
    """

    def __init__(self, violation: PolicyViolation):
        # Keep the structured violation so handlers can inspect code/module.
        self.violation = violation
        super().__init__(violation.message)