"""Access grants: relational storage of per-resource permissions.

A grant ties a resource (``resource_type`` + ``resource_id``) to a principal
(a user, a group, or the ``user:*`` public wildcard) with a ``read`` or
``write`` permission. This module also provides converters between grants and
the older ``access_control`` JSON dict.
"""

import logging
import time
import uuid
from typing import Optional

from sqlalchemy import select, delete
from sqlalchemy.ext.asyncio import AsyncSession
from open_webui.internal.db import Base, get_async_db_context

from pydantic import BaseModel, ConfigDict
from sqlalchemy import BigInteger, Column, Text, UniqueConstraint, or_, and_
from sqlalchemy.dialects.postgresql import JSONB

log = logging.getLogger(__name__)

####################
# AccessGrant DB Schema
####################


class AccessGrant(Base):
    """ORM row for one (resource, principal, permission) grant."""

    __tablename__ = 'access_grant'

    id = Column(Text, primary_key=True)
    resource_type = Column(Text, nullable=False)  # "knowledge", "model", "prompt", "tool", "note", "channel", "file"
    resource_id = Column(Text, nullable=False)
    principal_type = Column(Text, nullable=False)  # "user" or "group"
    principal_id = Column(Text, nullable=False)  # user_id, group_id, or "*" (wildcard for public)
    permission = Column(Text, nullable=False)  # "read" or "write"
    created_at = Column(BigInteger, nullable=False)

    # A given principal can hold a given permission on a resource only once.
    __table_args__ = (
        UniqueConstraint(
            'resource_type',
            'resource_id',
            'principal_type',
            'principal_id',
            'permission',
            name='uq_access_grant_grant',
        ),
    )


class AccessGrantModel(BaseModel):
    """Pydantic mirror of an ``AccessGrant`` row (built via ``from_attributes``)."""

    model_config = ConfigDict(from_attributes=True)

    id: str
    resource_type: str
    resource_id: str
    principal_type: str
    principal_id: str
    permission: str
    created_at: int


class AccessGrantResponse(BaseModel):
    """Slim grant model for API responses — resource context is implicit from the parent."""

    id: str
    principal_type: str
    principal_id: str
    permission: str

    @classmethod
    def from_grant(cls, grant: 'AccessGrantModel') -> 'AccessGrantResponse':
        """Build the slim response from a full grant, dropping resource fields and timestamp."""
        return cls(
            id=grant.id,
            principal_type=grant.principal_type,
            principal_id=grant.principal_id,
            permission=grant.permission,
        )


####################
# Conversion utilities
####################


def access_control_to_grants(
    resource_type: str,
    resource_id: str,
    access_control: Optional[dict],
) -> list[dict]:
    """
    Convert an old-style access_control JSON dict to a flat list of grant dicts.

    Semantics:
    - None → public read (user:* read) — except files which are private
    - {} → private/owner-only (no grants)
    - {read: {group_ids, user_ids}, write: {group_ids, user_ids}} → specific grants

    Repeated IDs inside one ``group_ids``/``user_ids`` list yield a single grant,
    and a list that is explicitly ``None`` is treated as empty.

    Returns a list of dicts with keys: resource_type, resource_id, principal_type,
    principal_id, permission
    """

    def make_grant(principal_type: str, principal_id: str, permission: str) -> dict:
        return {
            'resource_type': resource_type,
            'resource_id': resource_id,
            'principal_type': principal_type,
            'principal_id': principal_id,
            'permission': permission,
        }

    if access_control is None:
        # NULL → public read (user:* for read)
        # Exception: files with NULL are private (owner-only), no grants needed
        if resource_type == 'file':
            return []
        return [make_grant('user', '*', 'read')]

    # {} → private/owner-only, no grants
    if not access_control:
        return []

    grants: list[dict] = []
    # Track emitted (principal_type, principal_id, permission) triples: a
    # duplicate would violate the uq_access_grant_grant constraint on insert.
    seen = set()

    # Parse structured permissions; groups are emitted before users.
    for permission in ('read', 'write'):
        perm_data = access_control.get(permission, {})
        if not perm_data:
            continue

        for principal_type, ids_key in (('group', 'group_ids'), ('user', 'user_ids')):
            for principal_id in perm_data.get(ids_key) or []:
                key = (principal_type, principal_id, permission)
                if key in seen:
                    continue
                seen.add(key)
                grants.append(make_grant(principal_type, principal_id, permission))

    return grants


def normalize_access_grants(access_grants: Optional[list]) -> list[dict]:
    """
    Normalize direct access_grants payloads from API forms.

    Keeps only valid grants and removes duplicates by
    (principal_type, principal_id, permission). Entries may be dicts or pydantic
    models; anything else is skipped. When duplicates occur, the position of the
    first occurrence is kept and the values of the last occurrence win. A grant
    without a non-empty string ``id`` receives a freshly generated UUID.
    """
    if not access_grants:
        return []

    deduped: dict = {}
    for grant in access_grants:
        if isinstance(grant, BaseModel):
            grant = grant.model_dump()
        if not isinstance(grant, dict):
            continue

        principal_type = grant.get('principal_type')
        principal_id = grant.get('principal_id')
        permission = grant.get('permission')

        if principal_type not in ('user', 'group'):
            continue
        if permission not in ('read', 'write'):
            continue
        if not isinstance(principal_id, str) or not principal_id:
            continue

        grant_id = grant.get('id')
        if not (isinstance(grant_id, str) and grant_id):
            grant_id = str(uuid.uuid4())

        deduped[(principal_type, principal_id, permission)] = {
            'id': grant_id,
            'principal_type': principal_type,
            'principal_id': principal_id,
            'permission': permission,
        }

    return list(deduped.values())


def has_public_read_access_grant(access_grants: Optional[list]) -> bool:
    """
    Returns True when a direct grant list includes wildcard public-read.
    """
    return any(
        grant['principal_type'] == 'user' and grant['principal_id'] == '*' and grant['permission'] == 'read'
        for grant in normalize_access_grants(access_grants)
    )


def has_public_write_access_grant(access_grants: Optional[list]) -> bool:
    """
    Returns True when a direct grant list includes wildcard public-write.
    """
    return any(
        grant['principal_type'] == 'user' and grant['principal_id'] == '*' and grant['permission'] == 'write'
        for grant in normalize_access_grants(access_grants)
    )


def has_user_access_grant(access_grants: Optional[list]) -> bool:
    """
    Returns True when a direct grant list includes any non-wildcard user grant.
    """
    return any(
        grant['principal_type'] == 'user' and grant['principal_id'] != '*'
        for grant in normalize_access_grants(access_grants)
    )


def _grant_field(grant, name: str):
    """Read ``name`` from a grant given either as a dict or as an object (None if absent)."""
    if isinstance(grant, dict):
        return grant.get(name)
    return getattr(grant, name, None)


def strip_user_access_grants(access_grants: Optional[list]) -> list:
    """
    Remove all non-wildcard user grants from the list.

    Keeps group grants and the public wildcard (user:*) intact. Entries are
    returned as given (not normalized); both dicts and objects are accepted.
    """
    if not access_grants:
        return []

    kept = []
    for grant in access_grants:
        is_specific_user = _grant_field(grant, 'principal_type') == 'user' and _grant_field(grant, 'principal_id') != '*'
        if not is_specific_user:
            kept.append(grant)
    return kept


def grants_to_access_control(grants: list) -> Optional[dict]:
    """
    Convert a list of grant objects (AccessGrantModel or AccessGrantResponse)
    back to the old-style access_control JSON dict for backward compatibility.

    Semantics:
    - [] (empty) → {} (private/owner-only)
    - Contains user:*:read → None (public), but write grants are preserved
    - Otherwise → {read: {group_ids, user_ids}, write: {group_ids, user_ids}}

    Note: "public" (user:*:read) still allows additional write permissions to
    coexist. When the wildcard read is present the function returns None for
    the legacy dict, so callers that need write info should inspect the
    grants list directly.
    """
    if not grants:
        return {}  # No grants = private/owner-only

    result = {
        'read': {'group_ids': [], 'user_ids': []},
        'write': {'group_ids': [], 'user_ids': []},
    }
    bucket_for_type = {'group': 'group_ids', 'user': 'user_ids'}

    is_public = False
    for grant in grants:
        if grant.principal_type == 'user' and grant.principal_id == '*' and grant.permission == 'read':
            is_public = True
            continue  # Don't add wildcard to user_ids list

        if grant.permission not in ('read', 'write'):
            continue

        bucket_key = bucket_for_type.get(grant.principal_type)
        if bucket_key is None:
            continue

        bucket = result[grant.permission][bucket_key]
        if grant.principal_id not in bucket:
            bucket.append(grant.principal_id)

    if is_public:
        return None  # Public read access

    return result


####################
# Table Operations
####################


class AccessGrantsTable:
    """Async data-access helpers for the ``access_grant`` table.

    Every coroutine accepts an optional ``db`` session that is handed to
    ``get_async_db_context``.
    """

    async def grant_access(
        self,
        resource_type: str,
        resource_id: str,
        principal_type: str,
        principal_id: str,
        permission: str,
        db: Optional[AsyncSession] = None,
    ) -> Optional[AccessGrantModel]:
        """Add a single access grant. Idempotent (ignores duplicates).

        Returns the existing grant when an identical one is already stored,
        otherwise the newly created grant.
        """
        async with get_async_db_context(db) as db:
            # Check for existing grant
            result = await db.execute(
                select(AccessGrant).filter_by(
                    resource_type=resource_type,
                    resource_id=resource_id,
                    principal_type=principal_type,
                    principal_id=principal_id,
                    permission=permission,
                )
            )
            existing = result.scalars().first()
            if existing:
                return AccessGrantModel.model_validate(existing)

            grant = AccessGrant(
                id=str(uuid.uuid4()),
                resource_type=resource_type,
                resource_id=resource_id,
                principal_type=principal_type,
                principal_id=principal_id,
                permission=permission,
                created_at=int(time.time()),
            )
            db.add(grant)
            await db.commit()
            await db.refresh(grant)
            return AccessGrantModel.model_validate(grant)

    async def revoke_access(
        self,
        resource_type: str,
        resource_id: str,
        principal_type: str,
        principal_id: str,
        permission: str,
        db: Optional[AsyncSession] = None,
    ) -> bool:
        """Remove a single access grant. Returns True if a row was deleted."""
        async with get_async_db_context(db) as db:
            result = await db.execute(
                delete(AccessGrant).filter_by(
                    resource_type=resource_type,
                    resource_id=resource_id,
                    principal_type=principal_type,
                    principal_id=principal_id,
                    permission=permission,
                )
            )
            await db.commit()
            return result.rowcount > 0

    async def revoke_all_access(
        self,
        resource_type: str,
        resource_id: str,
        db: Optional[AsyncSession] = None,
    ) -> int:
        """Remove all access grants for a resource. Returns the deleted row count."""
        async with get_async_db_context(db) as db:
            result = await db.execute(
                delete(AccessGrant).filter_by(
                    resource_type=resource_type,
                    resource_id=resource_id,
                )
            )
            await db.commit()
            return result.rowcount

    async def _replace_grants(
        self,
        db,
        resource_type: str,
        resource_id: str,
        grant_dicts: list[dict],
    ) -> list[AccessGrantModel]:
        """Delete every grant of a resource, insert ``grant_dicts``, and commit.

        Each dict must provide ``principal_type``, ``principal_id`` and
        ``permission``; other keys are ignored. ``db`` must be an open session.
        """
        await db.execute(
            delete(AccessGrant).filter_by(
                resource_type=resource_type,
                resource_id=resource_id,
            )
        )

        created_at = int(time.time())
        models = []
        for grant_dict in grant_dicts:
            grant = AccessGrant(
                id=str(uuid.uuid4()),
                resource_type=resource_type,
                resource_id=resource_id,
                principal_type=grant_dict['principal_type'],
                principal_id=grant_dict['principal_id'],
                permission=grant_dict['permission'],
                created_at=created_at,
            )
            db.add(grant)
            # Snapshot before commit: if the session expires objects on commit,
            # reading ORM attributes afterwards would need implicit I/O.
            models.append(AccessGrantModel.model_validate(grant))

        await db.commit()
        return models

    async def set_access_control(
        self,
        resource_type: str,
        resource_id: str,
        access_control: Optional[dict],
        db: Optional[AsyncSession] = None,
    ) -> list[AccessGrantModel]:
        """
        Replace all grants for a resource from an access_control JSON dict.
        This is the primary bridge for backward compat with the frontend.
        """
        async with get_async_db_context(db) as db:
            # Convert JSON to grant dicts
            grant_dicts = access_control_to_grants(resource_type, resource_id, access_control)
            return await self._replace_grants(db, resource_type, resource_id, grant_dicts)

    async def set_access_grants(
        self,
        resource_type: str,
        resource_id: str,
        access_grants: Optional[list],
        db: Optional[AsyncSession] = None,
    ) -> list[AccessGrantModel]:
        """
        Replace all grants for a resource from a direct access_grants list.

        The list is passed through ``normalize_access_grants`` first; stored
        rows always receive newly generated ids.
        """
        async with get_async_db_context(db) as db:
            normalized_grants = normalize_access_grants(access_grants)
            return await self._replace_grants(db, resource_type, resource_id, normalized_grants)

    async def get_access_control(
        self,
        resource_type: str,
        resource_id: str,
        db: Optional[AsyncSession] = None,
    ) -> Optional[dict]:
        """
        Reconstruct the old-style access_control JSON dict from grants.
        For backward compat with the frontend.
        """
        grant_models = await self.get_grants_by_resource(resource_type, resource_id, db=db)
        return grants_to_access_control(grant_models)

    async def get_grants_by_resource(
        self,
        resource_type: str,
        resource_id: str,
        db: Optional[AsyncSession] = None,
    ) -> list[AccessGrantModel]:
        """Get all grants for a specific resource."""
        async with get_async_db_context(db) as db:
            result = await db.execute(
                select(AccessGrant).filter_by(
                    resource_type=resource_type,
                    resource_id=resource_id,
                )
            )
            grants = result.scalars().all()
            return [AccessGrantModel.model_validate(g) for g in grants]

    async def get_grants_by_resources(
        self,
        resource_type: str,
        resource_ids: list[str],
        db: Optional[AsyncSession] = None,
    ) -> dict[str, list[AccessGrantModel]]:
        """Batch-fetch grants for multiple resources. Returns {resource_id: [grants]}.

        Every requested id is present in the result, mapped to an empty list
        when it has no grants.
        """
        if not resource_ids:
            return {}

        async with get_async_db_context(db) as db:
            result = await db.execute(
                select(AccessGrant).filter(
                    AccessGrant.resource_type == resource_type,
                    AccessGrant.resource_id.in_(resource_ids),
                )
            )
            grants = result.scalars().all()

            result_dict: dict[str, list[AccessGrantModel]] = {rid: [] for rid in resource_ids}
            for g in grants:
                result_dict[g.resource_id].append(AccessGrantModel.model_validate(g))
            return result_dict

    async def _principal_conditions_for_user(
        self,
        user_id: str,
        user_group_ids: Optional[set[str]],
        db,
    ) -> list:
        """Build the principal predicates matching a user: public, direct, and groups.

        When ``user_group_ids`` is None the user's groups are looked up through
        ``Groups.get_groups_by_member_id``; an empty set adds no group predicate.
        """
        conditions = [
            # Public access
            and_(
                AccessGrant.principal_type == 'user',
                AccessGrant.principal_id == '*',
            ),
            # Direct user access
            and_(
                AccessGrant.principal_type == 'user',
                AccessGrant.principal_id == user_id,
            ),
        ]

        # Group access
        if user_group_ids is None:
            from open_webui.models.groups import Groups

            user_groups = await Groups.get_groups_by_member_id(user_id, db=db)
            user_group_ids = {group.id for group in user_groups}

        if user_group_ids:
            conditions.append(
                and_(
                    AccessGrant.principal_type == 'group',
                    AccessGrant.principal_id.in_(user_group_ids),
                )
            )

        return conditions

    async def has_access(
        self,
        user_id: str,
        resource_type: str,
        resource_id: str,
        permission: str = 'read',
        user_group_ids: Optional[set[str]] = None,
        db: Optional[AsyncSession] = None,
    ) -> bool:
        """
        Check if a user has the specified permission on a resource.

        Access is granted if any of the following is true:
        - There's a grant for user:* (public) with the requested permission
        - There's a grant for the specific user with the requested permission
        - There's a grant for any of the user's groups with the requested permission
        """
        async with get_async_db_context(db) as db:
            conditions = await self._principal_conditions_for_user(user_id, user_group_ids, db)

            result = await db.execute(
                select(AccessGrant)
                .filter(
                    AccessGrant.resource_type == resource_type,
                    AccessGrant.resource_id == resource_id,
                    AccessGrant.permission == permission,
                    or_(*conditions),
                )
                .limit(1)
            )
            return result.scalars().first() is not None

    async def get_accessible_resource_ids(
        self,
        user_id: str,
        resource_type: str,
        resource_ids: list[str],
        permission: str = 'read',
        user_group_ids: Optional[set[str]] = None,
        db: Optional[AsyncSession] = None,
    ) -> set[str]:
        """
        Batch check: return the subset of resource_ids that the user can access.

        This replaces calling has_access() in a loop (N+1) with a single query.
        """
        if not resource_ids:
            return set()

        async with get_async_db_context(db) as db:
            conditions = await self._principal_conditions_for_user(user_id, user_group_ids, db)

            result = await db.execute(
                select(AccessGrant.resource_id)
                .filter(
                    AccessGrant.resource_type == resource_type,
                    AccessGrant.resource_id.in_(resource_ids),
                    AccessGrant.permission == permission,
                    or_(*conditions),
                )
                .distinct()
            )
            return {row[0] for row in result.all()}

    async def get_users_with_access(
        self,
        resource_type: str,
        resource_id: str,
        permission: str = 'read',
        db: Optional[AsyncSession] = None,
    ) -> list:
        """
        Get all users who have the specified permission on a resource.
        Returns a list of UserModel instances.

        If the resource has a public (user:*) grant for the permission, the
        result of ``Users.get_users`` filtered by roles ``['!pending']`` is
        returned instead of resolving individual grants.
        """
        from open_webui.models.users import Users
        from open_webui.models.groups import Groups

        async with get_async_db_context(db) as db:
            result = await db.execute(
                select(AccessGrant).filter_by(
                    resource_type=resource_type,
                    resource_id=resource_id,
                    permission=permission,
                )
            )
            grants = result.scalars().all()

            # Check for public access
            if any(g.principal_type == 'user' and g.principal_id == '*' for g in grants):
                listing = await Users.get_users(filter={'roles': ['!pending']}, db=db)
                return listing.get('users', [])

            user_ids_with_access = set()
            for grant in grants:
                if grant.principal_type == 'user':
                    user_ids_with_access.add(grant.principal_id)
                elif grant.principal_type == 'group':
                    group_user_ids = await Groups.get_group_user_ids_by_id(grant.principal_id, db=db)
                    if group_user_ids:
                        user_ids_with_access.update(group_user_ids)

            if not user_ids_with_access:
                return []

            return await Users.get_users_by_user_ids(list(user_ids_with_access), db=db)

    @staticmethod
    def _principal_clauses(user_id, group_ids, include_public: bool) -> list:
        """Return grant-principal predicates for the given user and groups.

        The public wildcard is included only when ``include_public`` is true;
        user and group predicates are added only for truthy values.
        """
        clauses = []
        if include_public:
            clauses.append(
                and_(
                    AccessGrant.principal_type == 'user',
                    AccessGrant.principal_id == '*',
                )
            )
        if user_id:
            clauses.append(
                and_(
                    AccessGrant.principal_type == 'user',
                    AccessGrant.principal_id == user_id,
                )
            )
        if group_ids:
            clauses.append(
                and_(
                    AccessGrant.principal_type == 'group',
                    AccessGrant.principal_id.in_(group_ids),
                )
            )
        return clauses

    @staticmethod
    def _grant_exists(DocumentModel, resource_type: str, permission: str, principal_clauses: list):
        """Build a correlated EXISTS over access_grant for rows of ``DocumentModel``.

        ``principal_clauses`` are OR-ed together. An empty list adds no
        principal restriction, and ``or_()`` is never called without
        arguments (which SQLAlchemy deprecates).
        """
        criteria = [
            AccessGrant.resource_type == resource_type,
            AccessGrant.resource_id == DocumentModel.id,
            AccessGrant.permission == permission,
        ]
        if principal_clauses:
            criteria.append(or_(*principal_clauses))
        return select(AccessGrant.id).where(*criteria).correlate(DocumentModel).exists()

    def has_permission_filter(
        self,
        db,
        query,
        DocumentModel,
        filter: dict,
        resource_type: str,
        permission: str = 'read',
    ):
        """
        Apply access control filtering to a SQLAlchemy query using a correlated
        EXISTS subquery against access_grant (which avoids duplicate rows when
        several grants match).

        ``filter`` may carry ``user_id`` and ``group_ids``. A row passes when the
        user owns it (``DocumentModel.user_id``) or a matching public, user or
        group grant exists. With neither key set, the query is returned
        unchanged. ``permission='read_only'`` delegates to
        ``_has_read_only_permission_filter``.

        Note: This method builds SQLAlchemy expressions and does NOT perform I/O
        itself, so it remains synchronous. The caller is responsible for
        executing the query asynchronously with `await db.execute(...)`.
        """
        group_ids = filter.get('group_ids', [])
        user_id = filter.get('user_id')

        if permission == 'read_only':
            return self._has_read_only_permission_filter(db, query, DocumentModel, filter, resource_type)

        # No principal to filter by: leave the query untouched.
        if not (group_ids or user_id):
            return query

        grant_exists = self._grant_exists(
            DocumentModel,
            resource_type,
            permission,
            self._principal_clauses(user_id, group_ids, include_public=True),
        )

        # Owner OR has a matching grant
        owner_or_grant = [grant_exists]
        if user_id:
            owner_or_grant.append(DocumentModel.user_id == user_id)

        return query.filter(or_(*owner_or_grant))

    def _has_read_only_permission_filter(
        self,
        db,
        query,
        DocumentModel,
        filter: dict,
        resource_type: str,
    ):
        """
        Filter for items where user has read BUT NOT write access.
        Public items are NOT considered read_only.

        Items owned by ``filter['user_id']`` are excluded as well.

        Note: This method builds SQLAlchemy expressions and does NOT perform I/O
        itself, so it remains synchronous. The caller is responsible for
        executing the query asynchronously with `await db.execute(...)`.
        """
        group_ids = filter.get('group_ids', [])
        user_id = filter.get('user_id')

        # Direct user / group principals only; the public wildcard is handled separately.
        own_principals = self._principal_clauses(user_id, group_ids, include_public=False)
        public_principal = self._principal_clauses(None, None, include_public=True)

        # Has read grant (not public)
        read_grant_exists = self._grant_exists(DocumentModel, resource_type, 'read', own_principals)
        # Does NOT have write grant
        write_grant_exists = self._grant_exists(DocumentModel, resource_type, 'write', own_principals)
        # Is NOT public
        public_grant_exists = self._grant_exists(DocumentModel, resource_type, 'read', public_principal)

        conditions = [read_grant_exists, ~write_grant_exists, ~public_grant_exists]

        # Not owner
        if user_id:
            conditions.append(DocumentModel.user_id != user_id)

        return query.filter(and_(*conditions))


AccessGrants = AccessGrantsTable()