Spaces:
Build error
Build error
| import json | |
| import logging | |
| import time | |
| from typing import Optional | |
| from sqlalchemy import select, delete, update, or_, func, String, cast | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from open_webui.internal.db import Base, JSONField, get_async_db_context | |
| from open_webui.models.groups import Groups | |
| from open_webui.models.users import User, UserModel, Users, UserResponse | |
| from open_webui.models.access_grants import AccessGrantModel, AccessGrants | |
| from pydantic import BaseModel, ConfigDict, Field, model_validator | |
| from sqlalchemy.dialects.postgresql import JSONB | |
| from sqlalchemy import BigInteger, Column, Text, Boolean | |
| log = logging.getLogger(__name__) | |
| #################### | |
| # Models DB Schema | |
| # A misconfigured model wastes the time of everyone | |
| # who trusts it. Let what is set here be set with care. | |
| #################### | |
| # ModelParams is a model for the data stored in the params field of the Model table | |
| class ModelParams(BaseModel): | |
| model_config = ConfigDict(extra='allow') | |
| pass | |
| # ModelMeta is a model for the data stored in the meta field of the Model table | |
| class ModelMeta(BaseModel): | |
| profile_image_url: Optional[str] = '/static/favicon.png' | |
| description: Optional[str] = None | |
| """ | |
| User-facing description of the model. | |
| """ | |
| capabilities: Optional[dict] = None | |
| model_config = ConfigDict(extra='allow') | |
| def normalize_tags(cls, data): | |
| if isinstance(data, dict) and 'tags' in data: | |
| raw_tags = data['tags'] | |
| if isinstance(raw_tags, list): | |
| normalized = [] | |
| for tag in raw_tags: | |
| if isinstance(tag, str): | |
| normalized.append({'name': tag}) | |
| elif isinstance(tag, dict) and 'name' in tag: | |
| normalized.append(tag) | |
| data['tags'] = normalized | |
| return data | |
| class Model(Base): | |
| __tablename__ = 'model' | |
| id = Column(Text, primary_key=True, unique=True) | |
| """ | |
| The model's id as used in the API. If set to an existing model, it will override the model. | |
| """ | |
| user_id = Column(Text) | |
| base_model_id = Column(Text, nullable=True) | |
| """ | |
| An optional pointer to the actual model that should be used when proxying requests. | |
| """ | |
| name = Column(Text) | |
| """ | |
| The human-readable display name of the model. | |
| """ | |
| params = Column(JSONField) | |
| """ | |
| Holds a JSON encoded blob of parameters, see `ModelParams`. | |
| """ | |
| meta = Column(JSONField) | |
| """ | |
| Holds a JSON encoded blob of metadata, see `ModelMeta`. | |
| """ | |
| is_active = Column(Boolean, default=True) | |
| updated_at = Column(BigInteger) | |
| created_at = Column(BigInteger) | |
| class ModelModel(BaseModel): | |
| id: str | |
| user_id: str | |
| base_model_id: Optional[str] = None | |
| name: str | |
| params: ModelParams | |
| meta: ModelMeta | |
| access_grants: list[AccessGrantModel] = Field(default_factory=list) | |
| is_active: bool | |
| updated_at: int # timestamp in epoch | |
| created_at: int # timestamp in epoch | |
| model_config = ConfigDict(from_attributes=True) | |
| #################### | |
| # Forms | |
| #################### | |
| class ModelUserResponse(ModelModel): | |
| user: Optional[UserResponse] = None | |
| class ModelAccessResponse(ModelUserResponse): | |
| write_access: Optional[bool] = False | |
| class ModelResponse(ModelModel): | |
| pass | |
| class ModelListResponse(BaseModel): | |
| items: list[ModelUserResponse] | |
| total: int | |
| class ModelAccessListResponse(BaseModel): | |
| items: list[ModelAccessResponse] | |
| total: int | |
| class ModelForm(BaseModel): | |
| model_config = ConfigDict(extra='ignore') | |
| id: str | |
| base_model_id: Optional[str] = None | |
| name: str | |
| meta: ModelMeta | |
| params: ModelParams | |
| access_grants: Optional[list[dict]] = None | |
| is_active: bool = True | |
| class ModelsTable: | |
| async def _get_access_grants(self, model_id: str, db: Optional[AsyncSession] = None) -> list[AccessGrantModel]: | |
| return await AccessGrants.get_grants_by_resource('model', model_id, db=db) | |
| async def _to_model_model( | |
| self, | |
| model: Model, | |
| access_grants: Optional[list[AccessGrantModel]] = None, | |
| db: Optional[AsyncSession] = None, | |
| ) -> ModelModel: | |
| model_data = ModelModel.model_validate(model).model_dump(exclude={'access_grants'}) | |
| model_data['access_grants'] = ( | |
| access_grants if access_grants is not None else await self._get_access_grants(model_data['id'], db=db) | |
| ) | |
| return ModelModel.model_validate(model_data) | |
| async def insert_new_model( | |
| self, form_data: ModelForm, user_id: str, db: Optional[AsyncSession] = None | |
| ) -> Optional[ModelModel]: | |
| try: | |
| async with get_async_db_context(db) as db: | |
| result = Model( | |
| **{ | |
| **form_data.model_dump(exclude={'access_grants'}), | |
| 'user_id': user_id, | |
| 'created_at': int(time.time()), | |
| 'updated_at': int(time.time()), | |
| } | |
| ) | |
| db.add(result) | |
| await db.commit() | |
| await db.refresh(result) | |
| await AccessGrants.set_access_grants('model', result.id, form_data.access_grants, db=db) | |
| if result: | |
| return await self._to_model_model(result, db=db) | |
| else: | |
| return None | |
| except Exception as e: | |
| log.exception(f'Failed to insert a new model: {e}') | |
| return None | |
| async def get_all_models(self, db: Optional[AsyncSession] = None) -> list[ModelModel]: | |
| async with get_async_db_context(db) as db: | |
| result = await db.execute(select(Model)) | |
| all_models = result.scalars().all() | |
| model_ids = [model.id for model in all_models] | |
| grants_map = await AccessGrants.get_grants_by_resources('model', model_ids, db=db) | |
| return [ | |
| await self._to_model_model(model, access_grants=grants_map.get(model.id, []), db=db) | |
| for model in all_models | |
| ] | |
| async def get_models(self, db: Optional[AsyncSession] = None) -> list[ModelUserResponse]: | |
| async with get_async_db_context(db) as db: | |
| result = await db.execute(select(Model).filter(Model.base_model_id != None)) | |
| all_models = result.scalars().all() | |
| user_ids = list(set(model.user_id for model in all_models)) | |
| model_ids = [model.id for model in all_models] | |
| users = await Users.get_users_by_user_ids(user_ids, db=db) if user_ids else [] | |
| users_dict = {user.id: user for user in users} | |
| grants_map = await AccessGrants.get_grants_by_resources('model', model_ids, db=db) | |
| models = [] | |
| for model in all_models: | |
| user = users_dict.get(model.user_id) | |
| models.append( | |
| ModelUserResponse.model_validate( | |
| { | |
| **( | |
| await self._to_model_model( | |
| model, | |
| access_grants=grants_map.get(model.id, []), | |
| db=db, | |
| ) | |
| ).model_dump(), | |
| 'user': user.model_dump() if user else None, | |
| } | |
| ) | |
| ) | |
| return models | |
| async def get_base_models(self, db: Optional[AsyncSession] = None) -> list[ModelModel]: | |
| async with get_async_db_context(db) as db: | |
| result = await db.execute(select(Model).filter(Model.base_model_id == None)) | |
| all_models = result.scalars().all() | |
| model_ids = [model.id for model in all_models] | |
| grants_map = await AccessGrants.get_grants_by_resources('model', model_ids, db=db) | |
| return [ | |
| await self._to_model_model(model, access_grants=grants_map.get(model.id, []), db=db) | |
| for model in all_models | |
| ] | |
| async def get_models_by_user_id( | |
| self, user_id: str, permission: str = 'write', db: Optional[AsyncSession] = None | |
| ) -> list[ModelUserResponse]: | |
| models = await self.get_models(db=db) | |
| user_groups = await Groups.get_groups_by_member_id(user_id, db=db) | |
| user_group_ids = {group.id for group in user_groups} | |
| result = [] | |
| for model in models: | |
| if model.user_id == user_id: | |
| result.append(model) | |
| elif await AccessGrants.has_access( | |
| user_id=user_id, | |
| resource_type='model', | |
| resource_id=model.id, | |
| permission=permission, | |
| user_group_ids=user_group_ids, | |
| db=db, | |
| ): | |
| result.append(model) | |
| return result | |
| def _has_permission(self, db, query, filter: dict, permission: str = 'read'): | |
| return AccessGrants.has_permission_filter( | |
| db=db, | |
| query=query, | |
| DocumentModel=Model, | |
| filter=filter, | |
| resource_type='model', | |
| permission=permission, | |
| ) | |
| async def search_models( | |
| self, | |
| user_id: str, | |
| filter: dict = {}, | |
| skip: int = 0, | |
| limit: int = 30, | |
| db: Optional[AsyncSession] = None, | |
| ) -> ModelListResponse: | |
| async with get_async_db_context(db) as db: | |
| stmt = select(Model, User).outerjoin(User, User.id == Model.user_id) | |
| stmt = stmt.filter(Model.base_model_id != None) | |
| if filter: | |
| query_key = filter.get('query') | |
| if query_key: | |
| stmt = stmt.filter( | |
| or_( | |
| Model.name.ilike(f'%{query_key}%'), | |
| Model.base_model_id.ilike(f'%{query_key}%'), | |
| User.name.ilike(f'%{query_key}%'), | |
| User.email.ilike(f'%{query_key}%'), | |
| User.username.ilike(f'%{query_key}%'), | |
| ) | |
| ) | |
| view_option = filter.get('view_option') | |
| if view_option == 'created': | |
| stmt = stmt.filter(Model.user_id == user_id) | |
| elif view_option == 'shared': | |
| stmt = stmt.filter(Model.user_id != user_id) | |
| # Apply access control filtering | |
| stmt = self._has_permission( | |
| db, | |
| stmt, | |
| filter, | |
| permission='read', | |
| ) | |
| tag = filter.get('tag') | |
| if tag: | |
| # SQLite stores JSON text via json.dumps(ensure_ascii=True), | |
| # so non-ASCII chars are \uXXXX-escaped. PostgreSQL native JSONB | |
| # stores literal Unicode. Use the right pattern for each. | |
| if db.bind.dialect.name == 'sqlite': | |
| if tag.isascii(): | |
| meta_text = func.lower(cast(Model.meta, String)) | |
| pattern = f'%{json.dumps(tag.lower())}%' | |
| else: | |
| meta_text = cast(Model.meta, String) | |
| pattern = f'%{json.dumps(tag)}%' | |
| else: | |
| meta_text = func.lower(cast(Model.meta, String)) | |
| pattern = f'%{json.dumps(tag.lower(), ensure_ascii=False)}%' | |
| stmt = stmt.filter(meta_text.like(pattern)) | |
| order_by = filter.get('order_by') | |
| direction = filter.get('direction') | |
| if order_by == 'name': | |
| if direction == 'asc': | |
| stmt = stmt.order_by(Model.name.asc()) | |
| else: | |
| stmt = stmt.order_by(Model.name.desc()) | |
| elif order_by == 'created_at': | |
| if direction == 'asc': | |
| stmt = stmt.order_by(Model.created_at.asc()) | |
| else: | |
| stmt = stmt.order_by(Model.created_at.desc()) | |
| elif order_by == 'updated_at': | |
| if direction == 'asc': | |
| stmt = stmt.order_by(Model.updated_at.asc()) | |
| else: | |
| stmt = stmt.order_by(Model.updated_at.desc()) | |
| else: | |
| stmt = stmt.order_by(Model.created_at.desc()) | |
| # Count BEFORE pagination | |
| count_result = await db.execute(select(func.count()).select_from(stmt.subquery())) | |
| total = count_result.scalar() | |
| if skip: | |
| stmt = stmt.offset(skip) | |
| if limit: | |
| stmt = stmt.limit(limit) | |
| result = await db.execute(stmt) | |
| items = result.all() | |
| model_ids = [model.id for model, _ in items] | |
| grants_map = await AccessGrants.get_grants_by_resources('model', model_ids, db=db) | |
| models = [] | |
| for model, user in items: | |
| models.append( | |
| ModelUserResponse( | |
| **( | |
| await self._to_model_model( | |
| model, | |
| access_grants=grants_map.get(model.id, []), | |
| db=db, | |
| ) | |
| ).model_dump(), | |
| user=(UserResponse(**UserModel.model_validate(user).model_dump()) if user else None), | |
| ) | |
| ) | |
| return ModelListResponse(items=models, total=total) | |
| async def get_model_meta_by_id(self, id: str, db: Optional[AsyncSession] = None) -> Optional[tuple[dict, int]]: | |
| """Return (meta, updated_at) for a model, skipping access grant resolution.""" | |
| try: | |
| async with get_async_db_context(db) as db: | |
| result = await db.execute(select(Model.meta, Model.updated_at).filter_by(id=id)) | |
| return result.first() | |
| except Exception: | |
| return None | |
| async def get_all_tags( | |
| self, | |
| user_id: str, | |
| is_admin: bool = False, | |
| db: Optional[AsyncSession] = None, | |
| ) -> set[str]: | |
| """Extract unique tag names from model meta, querying only the meta column.""" | |
| async with get_async_db_context(db) as db: | |
| stmt = select(Model.meta).filter(Model.base_model_id != None) | |
| if not is_admin: | |
| user_groups = await Groups.get_groups_by_member_id(user_id, db=db) | |
| user_group_ids = [group.id for group in user_groups] | |
| filter_dict = {'user_id': user_id} | |
| if user_group_ids: | |
| filter_dict['group_ids'] = user_group_ids | |
| stmt = self._has_permission(db, stmt, filter_dict, permission='read') | |
| result = await db.execute(stmt) | |
| rows = result.scalars().all() | |
| tags_set: set[str] = set() | |
| for meta in rows: | |
| if not meta: | |
| continue | |
| for tag in meta.get('tags', []): | |
| try: | |
| name = tag.get('name') if isinstance(tag, dict) else str(tag) | |
| if name: | |
| tags_set.add(name) | |
| except Exception: | |
| continue | |
| return tags_set | |
| async def get_model_by_id(self, id: str, db: Optional[AsyncSession] = None) -> Optional[ModelModel]: | |
| try: | |
| async with get_async_db_context(db) as db: | |
| model = await db.get(Model, id) | |
| return await self._to_model_model(model, db=db) if model else None | |
| except Exception: | |
| return None | |
| async def get_models_by_ids(self, ids: list[str], db: Optional[AsyncSession] = None) -> list[ModelModel]: | |
| try: | |
| async with get_async_db_context(db) as db: | |
| result = await db.execute(select(Model).filter(Model.id.in_(ids))) | |
| models = result.scalars().all() | |
| model_ids = [model.id for model in models] | |
| grants_map = await AccessGrants.get_grants_by_resources('model', model_ids, db=db) | |
| return [ | |
| await self._to_model_model( | |
| model, | |
| access_grants=grants_map.get(model.id, []), | |
| db=db, | |
| ) | |
| for model in models | |
| ] | |
| except Exception: | |
| return [] | |
| async def toggle_model_by_id(self, id: str, db: Optional[AsyncSession] = None) -> Optional[ModelModel]: | |
| async with get_async_db_context(db) as db: | |
| try: | |
| result = await db.execute(select(Model).filter_by(id=id)) | |
| model = result.scalars().first() | |
| if not model: | |
| return None | |
| model.is_active = not model.is_active | |
| model.updated_at = int(time.time()) | |
| await db.commit() | |
| await db.refresh(model) | |
| return await self._to_model_model(model, db=db) | |
| except Exception: | |
| return None | |
| async def update_model_by_id( | |
| self, id: str, model: ModelForm, db: Optional[AsyncSession] = None | |
| ) -> Optional[ModelModel]: | |
| try: | |
| async with get_async_db_context(db) as db: | |
| # update only the fields that are present in the model | |
| data = model.model_dump(exclude={'id', 'access_grants'}) | |
| data['updated_at'] = int(time.time()) | |
| await db.execute(update(Model).filter_by(id=id).values(**data)) | |
| await db.commit() | |
| if model.access_grants is not None: | |
| await AccessGrants.set_access_grants('model', id, model.access_grants, db=db) | |
| return await self.get_model_by_id(id, db=db) | |
| except Exception as e: | |
| log.exception(f'Failed to update the model by id {id}: {e}') | |
| return None | |
| async def update_model_updated_at_by_id(self, id: str, db: Optional[AsyncSession] = None) -> Optional[ModelModel]: | |
| try: | |
| async with get_async_db_context(db) as db: | |
| result = await db.execute(select(Model).filter_by(id=id)) | |
| model_obj = result.scalars().first() | |
| if not model_obj: | |
| return None | |
| model_obj.updated_at = int(time.time()) | |
| await db.commit() | |
| await db.refresh(model_obj) | |
| return await self._to_model_model(model_obj, db=db) | |
| except Exception as e: | |
| log.exception(f'Failed to update the model updated_at by id {id}: {e}') | |
| return None | |
| async def delete_model_by_id(self, id: str, db: Optional[AsyncSession] = None) -> bool: | |
| try: | |
| async with get_async_db_context(db) as db: | |
| await AccessGrants.revoke_all_access('model', id, db=db) | |
| await db.execute(delete(Model).filter_by(id=id)) | |
| await db.commit() | |
| return True | |
| except Exception: | |
| return False | |
| async def delete_all_models(self, db: Optional[AsyncSession] = None) -> bool: | |
| try: | |
| async with get_async_db_context(db) as db: | |
| result = await db.execute(select(Model.id)) | |
| model_ids = [row[0] for row in result.all()] | |
| for model_id in model_ids: | |
| await AccessGrants.revoke_all_access('model', model_id, db=db) | |
| await db.execute(delete(Model)) | |
| await db.commit() | |
| return True | |
| except Exception: | |
| return False | |
| async def sync_models( | |
| self, user_id: str, models: list[ModelModel], db: Optional[AsyncSession] = None | |
| ) -> list[ModelModel]: | |
| try: | |
| async with get_async_db_context(db) as db: | |
| # Get existing models | |
| result = await db.execute(select(Model)) | |
| existing_models = result.scalars().all() | |
| existing_ids = {model.id for model in existing_models} | |
| # Prepare a set of new model IDs | |
| new_model_ids = {model.id for model in models} | |
| # Update or insert models | |
| for model in models: | |
| if model.id in existing_ids: | |
| await db.execute( | |
| update(Model) | |
| .filter_by(id=model.id) | |
| .values( | |
| **model.model_dump(exclude={'access_grants'}), | |
| user_id=user_id, | |
| updated_at=int(time.time()), | |
| ) | |
| ) | |
| else: | |
| new_model = Model( | |
| **{ | |
| **model.model_dump(exclude={'access_grants'}), | |
| 'user_id': user_id, | |
| 'updated_at': int(time.time()), | |
| } | |
| ) | |
| db.add(new_model) | |
| await AccessGrants.set_access_grants('model', model.id, model.access_grants, db=db) | |
| # Remove models that are no longer present | |
| for model in existing_models: | |
| if model.id not in new_model_ids: | |
| await AccessGrants.revoke_all_access('model', model.id, db=db) | |
| await db.delete(model) | |
| await db.commit() | |
| result = await db.execute(select(Model)) | |
| all_models = result.scalars().all() | |
| model_ids = [model.id for model in all_models] | |
| grants_map = await AccessGrants.get_grants_by_resources('model', model_ids, db=db) | |
| return [ | |
| await self._to_model_model( | |
| model, | |
| access_grants=grants_map.get(model.id, []), | |
| db=db, | |
| ) | |
| for model in all_models | |
| ] | |
| except Exception as e: | |
| log.exception(f'Error syncing models for user {user_id}: {e}') | |
| return [] | |
| Models = ModelsTable() | |