File size: 2,256 Bytes
e44e5dd
 
 
 
 
 
b65ef75
 
e44e5dd
 
 
 
 
 
 
 
 
 
 
 
 
b65ef75
e44e5dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b65ef75
e44e5dd
 
 
 
 
 
 
 
 
 
b65ef75
 
 
 
 
 
 
 
 
 
 
 
 
 
e44e5dd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from __future__ import annotations

import re
from dataclasses import dataclass
from typing import Any, Mapping, Optional

from .access_control import normalize_role


class TenantValidationError(ValueError):
    """Signal that a payload's tenant information is absent or invalid.

    Subclasses ``ValueError`` so callers that already catch ``ValueError``
    also catch tenant validation failures.
    """


# Accepted tenant ID shape: 3 to 128 characters, each one an ASCII letter,
# a digit, or one of `_`, `-`, `.`, `:`, `/`. Compiled once at import time.
TENANT_ID_PATTERN = re.compile(r"^[A-Za-z0-9_\-.:/]{3,128}$")


@dataclass(slots=True)
class TenantContext:
    tenant_id: str
    user_id: Optional[str] = None
    metadata: Optional[dict[str, Any]] = None
    role: str = "viewer"


def _extract_tenant_id(payload: Mapping[str, Any]) -> str:
    """Return the first string-valued tenant identifier found in *payload*.

    The keys ``tenant_id``, ``tenantId`` and ``tenant`` are checked in that
    order. A key whose value is not a string is skipped and the next key is
    tried. The returned value has surrounding whitespace stripped and may
    therefore be an empty string.

    Raises:
        TenantValidationError: If none of the keys holds a string value.
    """
    present_values = (
        payload[name]
        for name in ("tenant_id", "tenantId", "tenant")
        if name in payload
    )
    found = next((item for item in present_values if isinstance(item, str)), None)
    if found is None:
        raise TenantValidationError("tenant_id is required for every MCP tool call")
    return found.strip()


def _normalize_tenant_id(raw_value: str) -> str:
    """Strip and validate a raw tenant identifier.

    Args:
        raw_value: Tenant identifier as supplied by the caller.

    Returns:
        The identifier with leading and trailing whitespace removed.

    Raises:
        TenantValidationError: If the stripped value is empty or does not
            match ``TENANT_ID_PATTERN``.
    """
    normalized = raw_value.strip()
    if not normalized:
        raise TenantValidationError("tenant_id cannot be empty")
    # fullmatch makes the whole-string requirement explicit instead of relying
    # on the pattern's anchors together with the strip() above.
    if not TENANT_ID_PATTERN.fullmatch(normalized):
        # The message lists every character the pattern accepts, including '/'.
        raise TenantValidationError(
            "tenant_id must be 3-128 chars and may only contain letters, numbers, "
            "'.', '-', '_', ':', or '/'"
        )
    return normalized


def build_tenant_context(payload: Mapping[str, Any]) -> TenantContext:
    """Assemble a ``TenantContext`` from a raw payload mapping.

    Field resolution:
        * tenant id: extracted and validated by the module's helpers.
        * user id: the first of ``user_id`` / ``userId`` holding a string,
          stripped; a blank string yields ``None``.
        * metadata: ``payload["metadata"]`` when it is a ``dict``, else ``None``.
          The dict is stored as-is, not copied.
        * role: the first string among ``role``, ``user_role``, ``userRole``
          and, when metadata is non-empty, ``metadata["role"]``, passed through
          ``normalize_role``. Defaults to ``"viewer"`` when none is a string.

    Raises:
        TenantValidationError: If the tenant id is missing or malformed.
    """
    tenant_id = _normalize_tenant_id(_extract_tenant_id(payload))

    # The first string-valued user key wins, even if it is blank after stripping.
    raw_user = next(
        (
            payload[name]
            for name in ("user_id", "userId")
            if name in payload and isinstance(payload[name], str)
        ),
        None,
    )
    user_id: Optional[str] = None if raw_user is None else (raw_user.strip() or None)

    meta_value = payload.get("metadata")
    metadata: Optional[dict[str, Any]] = meta_value if isinstance(meta_value, dict) else None

    # Top-level role keys take precedence over the role stored in metadata.
    role_sources = [payload.get(name) for name in ("role", "user_role", "userRole")]
    if metadata:
        role_sources.append(metadata.get("role"))
    raw_role = next((item for item in role_sources if isinstance(item, str)), None)
    role: str = "viewer" if raw_role is None else normalize_role(raw_role)

    return TenantContext(tenant_id=tenant_id, user_id=user_id, metadata=metadata, role=role)