Spaces:
Build error
Build error
| import logging | |
| import time | |
| import uuid | |
| from typing import Optional | |
| from sqlalchemy import select, delete | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from open_webui.internal.db import Base, get_async_db_context | |
| from pydantic import BaseModel, ConfigDict | |
| from sqlalchemy import BigInteger, Column, Text, UniqueConstraint, or_, and_ | |
| from sqlalchemy.dialects.postgresql import JSONB | |
| log = logging.getLogger(__name__) | |
| #################### | |
| # AccessGrant DB Schema | |
| #################### | |
| class AccessGrant(Base): | |
| __tablename__ = 'access_grant' | |
| id = Column(Text, primary_key=True) | |
| resource_type = Column(Text, nullable=False) # "knowledge", "model", "prompt", "tool", "note", "channel", "file" | |
| resource_id = Column(Text, nullable=False) | |
| principal_type = Column(Text, nullable=False) # "user" or "group" | |
| principal_id = Column(Text, nullable=False) # user_id, group_id, or "*" (wildcard for public) | |
| permission = Column(Text, nullable=False) # "read" or "write" | |
| created_at = Column(BigInteger, nullable=False) | |
| __table_args__ = ( | |
| UniqueConstraint( | |
| 'resource_type', | |
| 'resource_id', | |
| 'principal_type', | |
| 'principal_id', | |
| 'permission', | |
| name='uq_access_grant_grant', | |
| ), | |
| ) | |
| class AccessGrantModel(BaseModel): | |
| model_config = ConfigDict(from_attributes=True) | |
| id: str | |
| resource_type: str | |
| resource_id: str | |
| principal_type: str | |
| principal_id: str | |
| permission: str | |
| created_at: int | |
| class AccessGrantResponse(BaseModel): | |
| """Slim grant model for API responses — resource context is implicit from the parent.""" | |
| id: str | |
| principal_type: str | |
| principal_id: str | |
| permission: str | |
| def from_grant(cls, grant: 'AccessGrantModel') -> 'AccessGrantResponse': | |
| return cls( | |
| id=grant.id, | |
| principal_type=grant.principal_type, | |
| principal_id=grant.principal_id, | |
| permission=grant.permission, | |
| ) | |
| #################### | |
| # Conversion utilities | |
| #################### | |
| def access_control_to_grants( | |
| resource_type: str, | |
| resource_id: str, | |
| access_control: Optional[dict], | |
| ) -> list[dict]: | |
| """ | |
| Convert an old-style access_control JSON dict to a flat list of grant dicts. | |
| Semantics: | |
| - None → public read (user:* read) — except files which are private | |
| - {} → private/owner-only (no grants) | |
| - {read: {group_ids, user_ids}, write: {group_ids, user_ids}} → specific grants | |
| Returns a list of dicts with keys: resource_type, resource_id, principal_type, principal_id, permission | |
| """ | |
| grants = [] | |
| if access_control is None: | |
| # NULL → public read (user:* for read) | |
| # Exception: files with NULL are private (owner-only), no grants needed | |
| if resource_type != 'file': | |
| grants.append( | |
| { | |
| 'resource_type': resource_type, | |
| 'resource_id': resource_id, | |
| 'principal_type': 'user', | |
| 'principal_id': '*', | |
| 'permission': 'read', | |
| } | |
| ) | |
| return grants | |
| # {} → private/owner-only, no grants | |
| if not access_control: | |
| return grants | |
| # Parse structured permissions | |
| for permission in ['read', 'write']: | |
| perm_data = access_control.get(permission, {}) | |
| if not perm_data: | |
| continue | |
| for group_id in perm_data.get('group_ids', []): | |
| grants.append( | |
| { | |
| 'resource_type': resource_type, | |
| 'resource_id': resource_id, | |
| 'principal_type': 'group', | |
| 'principal_id': group_id, | |
| 'permission': permission, | |
| } | |
| ) | |
| for user_id in perm_data.get('user_ids', []): | |
| grants.append( | |
| { | |
| 'resource_type': resource_type, | |
| 'resource_id': resource_id, | |
| 'principal_type': 'user', | |
| 'principal_id': user_id, | |
| 'permission': permission, | |
| } | |
| ) | |
| return grants | |
| def normalize_access_grants(access_grants: Optional[list]) -> list[dict]: | |
| """ | |
| Normalize direct access_grants payloads from API forms. | |
| Keeps only valid grants and removes duplicates by | |
| (principal_type, principal_id, permission). | |
| """ | |
| if not access_grants: | |
| return [] | |
| deduped = {} | |
| for grant in access_grants: | |
| if isinstance(grant, BaseModel): | |
| grant = grant.model_dump() | |
| if not isinstance(grant, dict): | |
| continue | |
| principal_type = grant.get('principal_type') | |
| principal_id = grant.get('principal_id') | |
| permission = grant.get('permission') | |
| if principal_type not in ('user', 'group'): | |
| continue | |
| if permission not in ('read', 'write'): | |
| continue | |
| if not isinstance(principal_id, str) or not principal_id: | |
| continue | |
| key = (principal_type, principal_id, permission) | |
| deduped[key] = { | |
| 'id': (grant.get('id') if isinstance(grant.get('id'), str) and grant.get('id') else str(uuid.uuid4())), | |
| 'principal_type': principal_type, | |
| 'principal_id': principal_id, | |
| 'permission': permission, | |
| } | |
| return list(deduped.values()) | |
| def has_public_read_access_grant(access_grants: Optional[list]) -> bool: | |
| """ | |
| Returns True when a direct grant list includes wildcard public-read. | |
| """ | |
| for grant in normalize_access_grants(access_grants): | |
| if grant['principal_type'] == 'user' and grant['principal_id'] == '*' and grant['permission'] == 'read': | |
| return True | |
| return False | |
| def has_public_write_access_grant(access_grants: Optional[list]) -> bool: | |
| """ | |
| Returns True when a direct grant list includes wildcard public-write. | |
| """ | |
| for grant in normalize_access_grants(access_grants): | |
| if grant['principal_type'] == 'user' and grant['principal_id'] == '*' and grant['permission'] == 'write': | |
| return True | |
| return False | |
| def has_user_access_grant(access_grants: Optional[list]) -> bool: | |
| """ | |
| Returns True when a direct grant list includes any non-wildcard user grant. | |
| """ | |
| for grant in normalize_access_grants(access_grants): | |
| if grant['principal_type'] == 'user' and grant['principal_id'] != '*': | |
| return True | |
| return False | |
| def strip_user_access_grants(access_grants: Optional[list]) -> list: | |
| """ | |
| Remove all non-wildcard user grants from the list. | |
| Keeps group grants and the public wildcard (user:*) intact. | |
| """ | |
| if not access_grants: | |
| return [] | |
| return [ | |
| grant | |
| for grant in access_grants | |
| if not ( | |
| (grant.get('principal_type') if isinstance(grant, dict) else getattr(grant, 'principal_type', None)) | |
| == 'user' | |
| and (grant.get('principal_id') if isinstance(grant, dict) else getattr(grant, 'principal_id', None)) != '*' | |
| ) | |
| ] | |
| def grants_to_access_control(grants: list) -> Optional[dict]: | |
| """ | |
| Convert a list of grant objects (AccessGrantModel or AccessGrantResponse) | |
| back to the old-style access_control JSON dict for backward compatibility. | |
| Semantics: | |
| - [] (empty) → {} (private/owner-only) | |
| - Contains user:*:read → None (public), but write grants are preserved | |
| - Otherwise → {read: {group_ids, user_ids}, write: {group_ids, user_ids}} | |
| Note: "public" (user:*:read) still allows additional write permissions | |
| to coexist. When the wildcard read is present the function returns None | |
| for the legacy dict, so callers that need write info should inspect the | |
| grants list directly. | |
| """ | |
| if not grants: | |
| return {} # No grants = private/owner-only | |
| result = { | |
| 'read': {'group_ids': [], 'user_ids': []}, | |
| 'write': {'group_ids': [], 'user_ids': []}, | |
| } | |
| is_public = False | |
| for grant in grants: | |
| if grant.principal_type == 'user' and grant.principal_id == '*' and grant.permission == 'read': | |
| is_public = True | |
| continue # Don't add wildcard to user_ids list | |
| if grant.permission not in ('read', 'write'): | |
| continue | |
| if grant.principal_type == 'group': | |
| if grant.principal_id not in result[grant.permission]['group_ids']: | |
| result[grant.permission]['group_ids'].append(grant.principal_id) | |
| elif grant.principal_type == 'user': | |
| if grant.principal_id not in result[grant.permission]['user_ids']: | |
| result[grant.permission]['user_ids'].append(grant.principal_id) | |
| if is_public: | |
| return None # Public read access | |
| return result | |
| #################### | |
| # Table Operations | |
| #################### | |
| class AccessGrantsTable: | |
| async def grant_access( | |
| self, | |
| resource_type: str, | |
| resource_id: str, | |
| principal_type: str, | |
| principal_id: str, | |
| permission: str, | |
| db: Optional[AsyncSession] = None, | |
| ) -> Optional[AccessGrantModel]: | |
| """Add a single access grant. Idempotent (ignores duplicates).""" | |
| async with get_async_db_context(db) as db: | |
| # Check for existing grant | |
| result = await db.execute( | |
| select(AccessGrant).filter_by( | |
| resource_type=resource_type, | |
| resource_id=resource_id, | |
| principal_type=principal_type, | |
| principal_id=principal_id, | |
| permission=permission, | |
| ) | |
| ) | |
| existing = result.scalars().first() | |
| if existing: | |
| return AccessGrantModel.model_validate(existing) | |
| grant = AccessGrant( | |
| id=str(uuid.uuid4()), | |
| resource_type=resource_type, | |
| resource_id=resource_id, | |
| principal_type=principal_type, | |
| principal_id=principal_id, | |
| permission=permission, | |
| created_at=int(time.time()), | |
| ) | |
| db.add(grant) | |
| await db.commit() | |
| await db.refresh(grant) | |
| return AccessGrantModel.model_validate(grant) | |
| async def revoke_access( | |
| self, | |
| resource_type: str, | |
| resource_id: str, | |
| principal_type: str, | |
| principal_id: str, | |
| permission: str, | |
| db: Optional[AsyncSession] = None, | |
| ) -> bool: | |
| """Remove a single access grant.""" | |
| async with get_async_db_context(db) as db: | |
| result = await db.execute( | |
| delete(AccessGrant).filter_by( | |
| resource_type=resource_type, | |
| resource_id=resource_id, | |
| principal_type=principal_type, | |
| principal_id=principal_id, | |
| permission=permission, | |
| ) | |
| ) | |
| await db.commit() | |
| return result.rowcount > 0 | |
| async def revoke_all_access( | |
| self, | |
| resource_type: str, | |
| resource_id: str, | |
| db: Optional[AsyncSession] = None, | |
| ) -> int: | |
| """Remove all access grants for a resource.""" | |
| async with get_async_db_context(db) as db: | |
| result = await db.execute( | |
| delete(AccessGrant).filter_by( | |
| resource_type=resource_type, | |
| resource_id=resource_id, | |
| ) | |
| ) | |
| await db.commit() | |
| return result.rowcount | |
| async def set_access_control( | |
| self, | |
| resource_type: str, | |
| resource_id: str, | |
| access_control: Optional[dict], | |
| db: Optional[AsyncSession] = None, | |
| ) -> list[AccessGrantModel]: | |
| """ | |
| Replace all grants for a resource from an access_control JSON dict. | |
| This is the primary bridge for backward compat with the frontend. | |
| """ | |
| async with get_async_db_context(db) as db: | |
| # Delete all existing grants for this resource | |
| await db.execute( | |
| delete(AccessGrant).filter_by( | |
| resource_type=resource_type, | |
| resource_id=resource_id, | |
| ) | |
| ) | |
| # Convert JSON to grant dicts | |
| grant_dicts = access_control_to_grants(resource_type, resource_id, access_control) | |
| # Insert new grants | |
| results = [] | |
| for grant_dict in grant_dicts: | |
| grant = AccessGrant( | |
| id=str(uuid.uuid4()), | |
| **grant_dict, | |
| created_at=int(time.time()), | |
| ) | |
| db.add(grant) | |
| results.append(grant) | |
| await db.commit() | |
| return [AccessGrantModel.model_validate(g) for g in results] | |
| async def set_access_grants( | |
| self, | |
| resource_type: str, | |
| resource_id: str, | |
| access_grants: Optional[list], | |
| db: Optional[AsyncSession] = None, | |
| ) -> list[AccessGrantModel]: | |
| """ | |
| Replace all grants for a resource from a direct access_grants list. | |
| """ | |
| async with get_async_db_context(db) as db: | |
| await db.execute( | |
| delete(AccessGrant).filter_by( | |
| resource_type=resource_type, | |
| resource_id=resource_id, | |
| ) | |
| ) | |
| normalized_grants = normalize_access_grants(access_grants) | |
| results = [] | |
| for grant_dict in normalized_grants: | |
| grant = AccessGrant( | |
| id=str(uuid.uuid4()), | |
| resource_type=resource_type, | |
| resource_id=resource_id, | |
| principal_type=grant_dict['principal_type'], | |
| principal_id=grant_dict['principal_id'], | |
| permission=grant_dict['permission'], | |
| created_at=int(time.time()), | |
| ) | |
| db.add(grant) | |
| results.append(grant) | |
| await db.commit() | |
| return [AccessGrantModel.model_validate(g) for g in results] | |
| async def get_access_control( | |
| self, | |
| resource_type: str, | |
| resource_id: str, | |
| db: Optional[AsyncSession] = None, | |
| ) -> Optional[dict]: | |
| """ | |
| Reconstruct the old-style access_control JSON dict from grants. | |
| For backward compat with the frontend. | |
| """ | |
| async with get_async_db_context(db) as db: | |
| result = await db.execute( | |
| select(AccessGrant).filter_by( | |
| resource_type=resource_type, | |
| resource_id=resource_id, | |
| ) | |
| ) | |
| grants = result.scalars().all() | |
| grant_models = [AccessGrantModel.model_validate(g) for g in grants] | |
| return grants_to_access_control(grant_models) | |
| async def get_grants_by_resource( | |
| self, | |
| resource_type: str, | |
| resource_id: str, | |
| db: Optional[AsyncSession] = None, | |
| ) -> list[AccessGrantModel]: | |
| """Get all grants for a specific resource.""" | |
| async with get_async_db_context(db) as db: | |
| result = await db.execute( | |
| select(AccessGrant).filter_by( | |
| resource_type=resource_type, | |
| resource_id=resource_id, | |
| ) | |
| ) | |
| grants = result.scalars().all() | |
| return [AccessGrantModel.model_validate(g) for g in grants] | |
| async def get_grants_by_resources( | |
| self, | |
| resource_type: str, | |
| resource_ids: list[str], | |
| db: Optional[AsyncSession] = None, | |
| ) -> dict[str, list[AccessGrantModel]]: | |
| """Batch-fetch grants for multiple resources. Returns {resource_id: [grants]}.""" | |
| if not resource_ids: | |
| return {} | |
| async with get_async_db_context(db) as db: | |
| result = await db.execute( | |
| select(AccessGrant).filter( | |
| AccessGrant.resource_type == resource_type, | |
| AccessGrant.resource_id.in_(resource_ids), | |
| ) | |
| ) | |
| grants = result.scalars().all() | |
| result_dict: dict[str, list[AccessGrantModel]] = {rid: [] for rid in resource_ids} | |
| for g in grants: | |
| result_dict[g.resource_id].append(AccessGrantModel.model_validate(g)) | |
| return result_dict | |
| async def has_access( | |
| self, | |
| user_id: str, | |
| resource_type: str, | |
| resource_id: str, | |
| permission: str = 'read', | |
| user_group_ids: Optional[set[str]] = None, | |
| db: Optional[AsyncSession] = None, | |
| ) -> bool: | |
| """ | |
| Check if a user has the specified permission on a resource. | |
| Access is granted if any of the following is true: | |
| - There's a grant for user:* (public) with the requested permission | |
| - There's a grant for the specific user with the requested permission | |
| - There's a grant for any of the user's groups with the requested permission | |
| """ | |
| async with get_async_db_context(db) as db: | |
| # Build conditions for matching grants | |
| conditions = [ | |
| # Public access | |
| and_( | |
| AccessGrant.principal_type == 'user', | |
| AccessGrant.principal_id == '*', | |
| ), | |
| # Direct user access | |
| and_( | |
| AccessGrant.principal_type == 'user', | |
| AccessGrant.principal_id == user_id, | |
| ), | |
| ] | |
| # Group access | |
| if user_group_ids is None: | |
| from open_webui.models.groups import Groups | |
| user_groups = await Groups.get_groups_by_member_id(user_id, db=db) | |
| user_group_ids = {group.id for group in user_groups} | |
| if user_group_ids: | |
| conditions.append( | |
| and_( | |
| AccessGrant.principal_type == 'group', | |
| AccessGrant.principal_id.in_(user_group_ids), | |
| ) | |
| ) | |
| result = await db.execute( | |
| select(AccessGrant) | |
| .filter( | |
| AccessGrant.resource_type == resource_type, | |
| AccessGrant.resource_id == resource_id, | |
| AccessGrant.permission == permission, | |
| or_(*conditions), | |
| ) | |
| .limit(1) | |
| ) | |
| grant = result.scalars().first() | |
| return grant is not None | |
| async def get_accessible_resource_ids( | |
| self, | |
| user_id: str, | |
| resource_type: str, | |
| resource_ids: list[str], | |
| permission: str = 'read', | |
| user_group_ids: Optional[set[str]] = None, | |
| db: Optional[AsyncSession] = None, | |
| ) -> set[str]: | |
| """ | |
| Batch check: return the subset of resource_ids that the user can access. | |
| This replaces calling has_access() in a loop (N+1) with a single query. | |
| """ | |
| if not resource_ids: | |
| return set() | |
| async with get_async_db_context(db) as db: | |
| conditions = [ | |
| and_( | |
| AccessGrant.principal_type == 'user', | |
| AccessGrant.principal_id == '*', | |
| ), | |
| and_( | |
| AccessGrant.principal_type == 'user', | |
| AccessGrant.principal_id == user_id, | |
| ), | |
| ] | |
| if user_group_ids is None: | |
| from open_webui.models.groups import Groups | |
| user_groups = await Groups.get_groups_by_member_id(user_id, db=db) | |
| user_group_ids = {group.id for group in user_groups} | |
| if user_group_ids: | |
| conditions.append( | |
| and_( | |
| AccessGrant.principal_type == 'group', | |
| AccessGrant.principal_id.in_(user_group_ids), | |
| ) | |
| ) | |
| result = await db.execute( | |
| select(AccessGrant.resource_id) | |
| .filter( | |
| AccessGrant.resource_type == resource_type, | |
| AccessGrant.resource_id.in_(resource_ids), | |
| AccessGrant.permission == permission, | |
| or_(*conditions), | |
| ) | |
| .distinct() | |
| ) | |
| rows = result.all() | |
| return {row[0] for row in rows} | |
| async def get_users_with_access( | |
| self, | |
| resource_type: str, | |
| resource_id: str, | |
| permission: str = 'read', | |
| db: Optional[AsyncSession] = None, | |
| ) -> list: | |
| """ | |
| Get all users who have the specified permission on a resource. | |
| Returns a list of UserModel instances. | |
| """ | |
| from open_webui.models.users import Users, UserModel | |
| from open_webui.models.groups import Groups | |
| async with get_async_db_context(db) as db: | |
| result = await db.execute( | |
| select(AccessGrant).filter_by( | |
| resource_type=resource_type, | |
| resource_id=resource_id, | |
| permission=permission, | |
| ) | |
| ) | |
| grants = result.scalars().all() | |
| # Check for public access | |
| for grant in grants: | |
| if grant.principal_type == 'user' and grant.principal_id == '*': | |
| result = await Users.get_users(filter={'roles': ['!pending']}, db=db) | |
| return result.get('users', []) | |
| user_ids_with_access = set() | |
| for grant in grants: | |
| if grant.principal_type == 'user': | |
| user_ids_with_access.add(grant.principal_id) | |
| elif grant.principal_type == 'group': | |
| group_user_ids = await Groups.get_group_user_ids_by_id(grant.principal_id, db=db) | |
| if group_user_ids: | |
| user_ids_with_access.update(group_user_ids) | |
| if not user_ids_with_access: | |
| return [] | |
| return await Users.get_users_by_user_ids(list(user_ids_with_access), db=db) | |
| def has_permission_filter( | |
| self, | |
| db, | |
| query, | |
| DocumentModel, | |
| filter: dict, | |
| resource_type: str, | |
| permission: str = 'read', | |
| ): | |
| """ | |
| Apply access control filtering to a SQLAlchemy query by JOINing with access_grant. | |
| This replaces the old JSON-column-based filtering with a proper relational JOIN. | |
| Note: This method builds SQLAlchemy expressions and does NOT perform I/O itself, | |
| so it remains synchronous. The caller is responsible for executing the query | |
| asynchronously with `await db.execute(...)`. | |
| """ | |
| group_ids = filter.get('group_ids', []) | |
| user_id = filter.get('user_id') | |
| if permission == 'read_only': | |
| return self._has_read_only_permission_filter(db, query, DocumentModel, filter, resource_type) | |
| # Build principal conditions | |
| principal_conditions = [] | |
| if group_ids or user_id: | |
| # Public access: user:* read | |
| principal_conditions.append( | |
| and_( | |
| AccessGrant.principal_type == 'user', | |
| AccessGrant.principal_id == '*', | |
| ) | |
| ) | |
| if user_id: | |
| # Owner always has access | |
| principal_conditions.append(DocumentModel.user_id == user_id) | |
| # Direct user grant | |
| principal_conditions.append( | |
| and_( | |
| AccessGrant.principal_type == 'user', | |
| AccessGrant.principal_id == user_id, | |
| ) | |
| ) | |
| if group_ids: | |
| # Group grants | |
| principal_conditions.append( | |
| and_( | |
| AccessGrant.principal_type == 'group', | |
| AccessGrant.principal_id.in_(group_ids), | |
| ) | |
| ) | |
| if not principal_conditions: | |
| return query | |
| # LEFT JOIN access_grant and filter | |
| # We use a subquery approach to avoid duplicates from multiple matching grants | |
| from sqlalchemy import exists as sa_exists | |
| grant_exists = ( | |
| select(AccessGrant.id) | |
| .where( | |
| AccessGrant.resource_type == resource_type, | |
| AccessGrant.resource_id == DocumentModel.id, | |
| AccessGrant.permission == permission, | |
| or_( | |
| and_( | |
| AccessGrant.principal_type == 'user', | |
| AccessGrant.principal_id == '*', | |
| ), | |
| *( | |
| [ | |
| and_( | |
| AccessGrant.principal_type == 'user', | |
| AccessGrant.principal_id == user_id, | |
| ) | |
| ] | |
| if user_id | |
| else [] | |
| ), | |
| *( | |
| [ | |
| and_( | |
| AccessGrant.principal_type == 'group', | |
| AccessGrant.principal_id.in_(group_ids), | |
| ) | |
| ] | |
| if group_ids | |
| else [] | |
| ), | |
| ), | |
| ) | |
| .correlate(DocumentModel) | |
| .exists() | |
| ) | |
| # Owner OR has a matching grant | |
| owner_or_grant = [grant_exists] | |
| if user_id: | |
| owner_or_grant.append(DocumentModel.user_id == user_id) | |
| query = query.filter(or_(*owner_or_grant)) | |
| return query | |
| def _has_read_only_permission_filter( | |
| self, | |
| db, | |
| query, | |
| DocumentModel, | |
| filter: dict, | |
| resource_type: str, | |
| ): | |
| """ | |
| Filter for items where user has read BUT NOT write access. | |
| Public items are NOT considered read_only. | |
| Note: This method builds SQLAlchemy expressions and does NOT perform I/O itself, | |
| so it remains synchronous. The caller is responsible for executing the query | |
| asynchronously with `await db.execute(...)`. | |
| """ | |
| group_ids = filter.get('group_ids', []) | |
| user_id = filter.get('user_id') | |
| from sqlalchemy import exists as sa_exists | |
| # Has read grant (not public) | |
| read_grant_exists = ( | |
| select(AccessGrant.id) | |
| .where( | |
| AccessGrant.resource_type == resource_type, | |
| AccessGrant.resource_id == DocumentModel.id, | |
| AccessGrant.permission == 'read', | |
| or_( | |
| *( | |
| [ | |
| and_( | |
| AccessGrant.principal_type == 'user', | |
| AccessGrant.principal_id == user_id, | |
| ) | |
| ] | |
| if user_id | |
| else [] | |
| ), | |
| *( | |
| [ | |
| and_( | |
| AccessGrant.principal_type == 'group', | |
| AccessGrant.principal_id.in_(group_ids), | |
| ) | |
| ] | |
| if group_ids | |
| else [] | |
| ), | |
| ), | |
| ) | |
| .correlate(DocumentModel) | |
| .exists() | |
| ) | |
| # Does NOT have write grant | |
| write_grant_exists = ( | |
| select(AccessGrant.id) | |
| .where( | |
| AccessGrant.resource_type == resource_type, | |
| AccessGrant.resource_id == DocumentModel.id, | |
| AccessGrant.permission == 'write', | |
| or_( | |
| *( | |
| [ | |
| and_( | |
| AccessGrant.principal_type == 'user', | |
| AccessGrant.principal_id == user_id, | |
| ) | |
| ] | |
| if user_id | |
| else [] | |
| ), | |
| *( | |
| [ | |
| and_( | |
| AccessGrant.principal_type == 'group', | |
| AccessGrant.principal_id.in_(group_ids), | |
| ) | |
| ] | |
| if group_ids | |
| else [] | |
| ), | |
| ), | |
| ) | |
| .correlate(DocumentModel) | |
| .exists() | |
| ) | |
| # Is NOT public | |
| public_grant_exists = ( | |
| select(AccessGrant.id) | |
| .where( | |
| AccessGrant.resource_type == resource_type, | |
| AccessGrant.resource_id == DocumentModel.id, | |
| AccessGrant.permission == 'read', | |
| AccessGrant.principal_type == 'user', | |
| AccessGrant.principal_id == '*', | |
| ) | |
| .correlate(DocumentModel) | |
| .exists() | |
| ) | |
| conditions = [read_grant_exists, ~write_grant_exists, ~public_grant_exists] | |
| # Not owner | |
| if user_id: | |
| conditions.append(DocumentModel.user_id != user_id) | |
| query = query.filter(and_(*conditions)) | |
| return query | |
| AccessGrants = AccessGrantsTable() | |