File size: 4,667 Bytes
42c7b73
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from pydantic import BaseModel, ConfigDict
from sqlalchemy import BigInteger, Boolean, Column, String, Text, JSON
from sqlalchemy.dialects.postgresql import JSONB


from sqlalchemy import or_, func, select, and_, text, cast, or_, and_, func


def _group_permission_conditions(DocumentModel, group_ids, permission, dialect_name):
    """Return one containment condition per group id for ``permission``.

    Each condition tests whether
    ``DocumentModel.access_control[permission]["group_ids"]`` contains the
    group id. Only the ``sqlite`` and ``postgresql`` dialects are handled;
    for any other dialect, or when ``group_ids`` is empty, the result is an
    empty list so callers can skip the group clause entirely.
    """
    if not group_ids:
        return []

    if dialect_name == "sqlite":
        return [
            DocumentModel.access_control[permission]["group_ids"].contains([gid])
            for gid in group_ids
        ]

    if dialect_name == "postgresql":
        # On PostgreSQL the element is cast to JSONB before contains() is applied.
        return [
            cast(
                DocumentModel.access_control[permission]["group_ids"],
                JSONB,
            ).contains([gid])
            for gid in group_ids
        ]

    return []


def has_permission(db, DocumentModel, query, filter: dict, permission: str = "read"):
    """Restrict ``query`` to rows the caller may access with ``permission``.

    Args:
        db: Session-like object; only ``db.bind.dialect.name`` is read, to
            pick the dialect-specific JSON containment expression.
        DocumentModel: Mapped class exposing ``access_control`` (JSON) and
            ``user_id`` columns.
        query: Query object to which ``.filter(...)`` is applied.
        filter: Mapping with optional keys ``"group_ids"`` (iterable of group
            ids) and ``"user_id"``.
        permission: Key looked up inside ``access_control`` (for example
            ``"read"`` or ``"write"``), or the special value ``"read_only"``.

    Returns:
        The filtered query. For ``"read_only"``: rows where one of the
        caller's groups is listed under ``read``, excluding rows owned by
        ``user_id``, rows where one of the caller's groups is listed under
        ``write``, and rows whose ``access_control`` is NULL / ``"null"``.
        If no group read condition can be built, the query is filtered with
        ``False``. For any other permission: rows that have no
        ``access_control``, are owned by ``user_id``, or list one of the
        caller's groups under ``permission``. With neither ``group_ids`` nor
        ``user_id`` the query is returned unchanged.
    """
    group_ids = filter.get("group_ids", [])
    user_id = filter.get("user_id")
    dialect_name = db.bind.dialect.name

    if permission == "read_only":
        # Explicit read access is granted through group membership only.
        group_read = _group_permission_conditions(
            DocumentModel, group_ids, "read", dialect_name
        )
        if not group_read:
            # Nothing can grant read access, so the result must be empty.
            return query.filter(False)

        has_read = or_(*group_read)

        restrictions = []

        # The owner implicitly has write access, so owned rows are not read-only.
        if user_id:
            restrictions.append(DocumentModel.user_id != user_id)

        # Drop rows where any of the caller's groups has explicit write access.
        # NOTE(review): if the "write" group list is absent from a row, the
        # containment test presumably evaluates to SQL NULL and the negation
        # would then drop that row - confirm every stored access_control
        # carries a write.group_ids entry.
        group_write = _group_permission_conditions(
            DocumentModel, group_ids, "write", dialect_name
        )
        if group_write:
            restrictions.append(~or_(*group_write))

        # Rows without access_control (SQL NULL, or a value that casts to the
        # string "null") are treated as public and never count as read-only.
        restrictions.append(DocumentModel.access_control.isnot(None))
        restrictions.append(cast(DocumentModel.access_control, String) != "null")

        return query.filter(and_(has_read, *restrictions))

    conditions = []

    # Rows without access_control are accessible to any identified caller.
    if group_ids or user_id:
        conditions.extend(
            [
                DocumentModel.access_control.is_(None),
                cast(DocumentModel.access_control, String) == "null",
            ]
        )

    # The owner holds every permission.
    if user_id:
        conditions.append(DocumentModel.user_id == user_id)

    # Group-level permission; skipped when no condition could be built (for
    # example on an unsupported dialect) so that or_() is never called empty.
    group_conditions = _group_permission_conditions(
        DocumentModel, group_ids, permission, dialect_name
    )
    if group_conditions:
        conditions.append(or_(*group_conditions))

    if conditions:
        query = query.filter(or_(*conditions))

    return query