| import json |
| import logging |
| import time |
| import uuid |
| from typing import Optional |
|
|
| from open_webui.internal.db import Base, JSONField, get_async_db_context |
| from open_webui.models.access_grants import AccessGrantModel, AccessGrants |
| from open_webui.models.files import ( |
| File, |
| FileMetadataResponse, |
| FileModel, |
| FileModelResponse, |
| ) |
| from open_webui.models.groups import Groups |
| from open_webui.models.users import User, UserModel, UserResponse, Users |
| from pydantic import BaseModel, ConfigDict, Field |
| from sqlalchemy import ( |
| JSON, |
| BigInteger, |
| Column, |
| ForeignKey, |
| Index, |
| String, |
| Text, |
| UniqueConstraint, |
| delete, |
| func, |
| or_, |
| select, |
| update, |
| ) |
| from sqlalchemy.ext.asyncio import AsyncSession |
|
|
| log = logging.getLogger(__name__) |
|
|
| |
| |
| |
| |
| |
|
|
|
|
| class Knowledge(Base): |
| __tablename__ = 'knowledge' |
|
|
| id = Column(Text, unique=True, primary_key=True) |
| user_id = Column(Text) |
|
|
| name = Column(Text) |
| description = Column(Text) |
|
|
| meta = Column(JSON, nullable=True) |
|
|
| created_at = Column(BigInteger) |
| updated_at = Column(BigInteger) |
|
|
|
|
| class KnowledgeDirectory(Base): |
| __tablename__ = 'knowledge_directory' |
|
|
| id = Column(Text, unique=True, primary_key=True) |
| knowledge_id = Column(Text, ForeignKey('knowledge.id', ondelete='CASCADE'), nullable=False) |
| parent_id = Column(Text, ForeignKey('knowledge_directory.id', ondelete='CASCADE'), nullable=True) |
| name = Column(Text, nullable=False) |
| user_id = Column(Text, nullable=False) |
|
|
| created_at = Column(BigInteger, nullable=False) |
| updated_at = Column(BigInteger, nullable=False) |
|
|
| __table_args__ = ( |
| UniqueConstraint('knowledge_id', 'parent_id', 'name', name='uq_knowledge_directory_knowledge_parent_name'), |
| Index('ix_knowledge_directory_knowledge_id', 'knowledge_id'), |
| Index('ix_knowledge_directory_parent_id', 'parent_id'), |
| ) |
|
|
|
|
| class KnowledgeModel(BaseModel): |
| model_config = ConfigDict(from_attributes=True) |
|
|
| id: str |
| user_id: str |
|
|
| name: str |
| description: str |
|
|
| meta: Optional[dict] = None |
|
|
| access_grants: list[AccessGrantModel] = Field(default_factory=list) |
|
|
| created_at: int |
| updated_at: int |
|
|
|
|
| class KnowledgeFile(Base): |
| __tablename__ = 'knowledge_file' |
|
|
| id = Column(Text, unique=True, primary_key=True) |
|
|
| knowledge_id = Column(Text, ForeignKey('knowledge.id', ondelete='CASCADE'), nullable=False) |
| file_id = Column(Text, ForeignKey('file.id', ondelete='CASCADE'), nullable=False) |
| directory_id = Column(Text, ForeignKey('knowledge_directory.id', ondelete='SET NULL'), nullable=True) |
| user_id = Column(Text, nullable=False) |
|
|
| created_at = Column(BigInteger, nullable=False) |
| updated_at = Column(BigInteger, nullable=False) |
|
|
| __table_args__ = ( |
| UniqueConstraint('knowledge_id', 'file_id', name='uq_knowledge_file_knowledge_file'), |
| Index('ix_knowledge_file_directory_id', 'directory_id'), |
| ) |
|
|
|
|
| class KnowledgeFileModel(BaseModel): |
| id: str |
| knowledge_id: str |
| file_id: str |
| directory_id: Optional[str] = None |
| user_id: str |
|
|
| created_at: int |
| updated_at: int |
|
|
| model_config = ConfigDict(from_attributes=True) |
|
|
|
|
| class KnowledgeDirectoryModel(BaseModel): |
| model_config = ConfigDict(from_attributes=True) |
|
|
| id: str |
| knowledge_id: str |
| parent_id: Optional[str] = None |
| name: str |
| user_id: str |
|
|
| created_at: int |
| updated_at: int |
|
|
|
|
| class KnowledgeDirectoryForm(BaseModel): |
| name: str |
| parent_id: Optional[str] = None |
|
|
|
|
| |
| |
| |
| class KnowledgeUserModel(KnowledgeModel): |
| user: Optional[UserResponse] = None |
|
|
|
|
| class KnowledgeResponse(KnowledgeModel): |
| files: Optional[list[FileMetadataResponse | dict]] = None |
|
|
|
|
| class KnowledgeUserResponse(KnowledgeUserModel): |
| pass |
|
|
|
|
| class KnowledgeForm(BaseModel): |
| name: str |
| description: str |
| access_grants: Optional[list[dict]] = None |
|
|
|
|
| class FileUserResponse(FileModelResponse): |
| user: Optional[UserResponse] = None |
|
|
|
|
| class KnowledgeListResponse(BaseModel): |
| items: list[KnowledgeUserModel] |
| total: int |
|
|
|
|
| class KnowledgeFileListResponse(BaseModel): |
| items: list[FileUserResponse] |
| directories: list[KnowledgeDirectoryModel] = Field(default_factory=list) |
| breadcrumbs: list[KnowledgeDirectoryModel] = Field(default_factory=list) |
| total: int |
|
|
|
|
| class KnowledgeTable: |
| async def _get_access_grants(self, knowledge_id: str, db: Optional[AsyncSession] = None) -> list[AccessGrantModel]: |
| return await AccessGrants.get_grants_by_resource('knowledge', knowledge_id, db=db) |
|
|
| async def _to_knowledge_model( |
| self, |
| knowledge: Knowledge, |
| access_grants: Optional[list[AccessGrantModel]] = None, |
| db: Optional[AsyncSession] = None, |
| ) -> KnowledgeModel: |
| knowledge_data = KnowledgeModel.model_validate(knowledge).model_dump(exclude={'access_grants'}) |
| knowledge_data['access_grants'] = ( |
| access_grants if access_grants is not None else await self._get_access_grants(knowledge_data['id'], db=db) |
| ) |
| return KnowledgeModel.model_validate(knowledge_data) |
|
|
| async def insert_new_knowledge( |
| self, user_id: str, form_data: KnowledgeForm, db: Optional[AsyncSession] = None |
| ) -> Optional[KnowledgeModel]: |
| async with get_async_db_context(db) as db: |
| knowledge = KnowledgeModel( |
| **{ |
| **form_data.model_dump(exclude={'access_grants'}), |
| 'id': str(uuid.uuid4()), |
| 'user_id': user_id, |
| 'created_at': int(time.time()), |
| 'updated_at': int(time.time()), |
| 'access_grants': [], |
| } |
| ) |
|
|
| try: |
| result = Knowledge(**knowledge.model_dump(exclude={'access_grants'})) |
| db.add(result) |
| await db.commit() |
| await db.refresh(result) |
| await AccessGrants.set_access_grants('knowledge', result.id, form_data.access_grants, db=db) |
| if result: |
| return await self._to_knowledge_model(result, db=db) |
| else: |
| return None |
| except Exception: |
| return None |
|
|
| async def get_knowledge_bases( |
| self, skip: int = 0, limit: int = 30, db: Optional[AsyncSession] = None |
| ) -> list[KnowledgeUserModel]: |
| async with get_async_db_context(db) as db: |
| result = await db.execute(select(Knowledge).order_by(Knowledge.updated_at.desc())) |
| all_knowledge = result.scalars().all() |
| user_ids = list(set(knowledge.user_id for knowledge in all_knowledge)) |
| knowledge_ids = [knowledge.id for knowledge in all_knowledge] |
|
|
| users = await Users.get_users_by_user_ids(user_ids, db=db) if user_ids else [] |
| users_dict = {user.id: user for user in users} |
| grants_map = await AccessGrants.get_grants_by_resources('knowledge', knowledge_ids, db=db) |
|
|
| knowledge_bases = [] |
| for knowledge in all_knowledge: |
| user = users_dict.get(knowledge.user_id) |
| knowledge_bases.append( |
| KnowledgeUserModel.model_validate( |
| { |
| **( |
| await self._to_knowledge_model( |
| knowledge, |
| access_grants=grants_map.get(knowledge.id, []), |
| db=db, |
| ) |
| ).model_dump(), |
| 'user': user.model_dump() if user else None, |
| } |
| ) |
| ) |
| return knowledge_bases |
|
|
| async def search_knowledge_bases( |
| self, |
| user_id: str, |
| filter: dict, |
| skip: int = 0, |
| limit: int = 30, |
| db: Optional[AsyncSession] = None, |
| ) -> KnowledgeListResponse: |
| try: |
| async with get_async_db_context(db) as db: |
| stmt = select(Knowledge, User).outerjoin(User, User.id == Knowledge.user_id) |
|
|
| if filter: |
| query_key = filter.get('query') |
| if query_key: |
| stmt = stmt.filter( |
| or_( |
| Knowledge.name.ilike(f'%{query_key}%'), |
| Knowledge.description.ilike(f'%{query_key}%'), |
| User.name.ilike(f'%{query_key}%'), |
| User.email.ilike(f'%{query_key}%'), |
| User.username.ilike(f'%{query_key}%'), |
| ) |
| ) |
|
|
| view_option = filter.get('view_option') |
| if view_option == 'created': |
| stmt = stmt.filter(Knowledge.user_id == user_id) |
| elif view_option == 'shared': |
| stmt = stmt.filter(Knowledge.user_id != user_id) |
|
|
| stmt = AccessGrants.has_permission_filter( |
| db=db, |
| query=stmt, |
| DocumentModel=Knowledge, |
| filter=filter, |
| resource_type='knowledge', |
| permission='read', |
| ) |
|
|
| stmt = stmt.order_by(Knowledge.updated_at.desc(), Knowledge.id.asc()) |
|
|
| count_result = await db.execute(select(func.count()).select_from(stmt.subquery())) |
| total = count_result.scalar() |
| if skip: |
| stmt = stmt.offset(skip) |
| if limit: |
| stmt = stmt.limit(limit) |
|
|
| result = await db.execute(stmt) |
| items = result.all() |
|
|
| knowledge_ids = [kb.id for kb, _ in items] |
| grants_map = await AccessGrants.get_grants_by_resources('knowledge', knowledge_ids, db=db) |
|
|
| knowledge_bases = [] |
| for knowledge_base, user in items: |
| knowledge_bases.append( |
| KnowledgeUserModel.model_validate( |
| { |
| **( |
| await self._to_knowledge_model( |
| knowledge_base, |
| access_grants=grants_map.get(knowledge_base.id, []), |
| db=db, |
| ) |
| ).model_dump(), |
| 'user': (UserModel.model_validate(user).model_dump() if user else None), |
| } |
| ) |
| ) |
|
|
| return KnowledgeListResponse(items=knowledge_bases, total=total) |
| except Exception as e: |
| print(e) |
| return KnowledgeListResponse(items=[], total=0) |
|
|
| async def search_knowledge_files( |
| self, filter: dict, skip: int = 0, limit: int = 30, db: Optional[AsyncSession] = None |
| ) -> KnowledgeFileListResponse: |
| """ |
| Scalable version: search files across all knowledge bases the user has |
| READ access to, without loading all KBs or using large IN() lists. |
| """ |
| try: |
| async with get_async_db_context(db) as db: |
| |
| stmt = ( |
| select(File, User, Knowledge) |
| .join(KnowledgeFile, File.id == KnowledgeFile.file_id) |
| .join(Knowledge, KnowledgeFile.knowledge_id == Knowledge.id) |
| .outerjoin(User, User.id == KnowledgeFile.user_id) |
| ) |
|
|
| |
| stmt = AccessGrants.has_permission_filter( |
| db=db, |
| query=stmt, |
| DocumentModel=Knowledge, |
| filter=filter, |
| resource_type='knowledge', |
| permission='read', |
| ) |
|
|
| |
| search_filter = None |
| if filter: |
| q = filter.get('query') |
| if q: |
| if filter.get('include_content'): |
| |
| |
| |
| content_text = File.data['content'].as_string() |
| search_filter = or_( |
| File.filename.ilike(f'%{q}%'), |
| content_text.ilike(f'%{q}%'), |
| ) |
| else: |
| search_filter = File.filename.ilike(f'%{q}%') |
| stmt = stmt.filter(search_filter) |
|
|
| |
| stmt = stmt.order_by(File.updated_at.desc(), File.id.asc()) |
|
|
| |
| count_stmt = ( |
| select(func.count(File.id)) |
| .select_from(File) |
| .join(KnowledgeFile, File.id == KnowledgeFile.file_id) |
| .join(Knowledge, KnowledgeFile.knowledge_id == Knowledge.id) |
| ) |
| count_stmt = AccessGrants.has_permission_filter( |
| db=db, |
| query=count_stmt, |
| DocumentModel=Knowledge, |
| filter=filter, |
| resource_type='knowledge', |
| permission='read', |
| ) |
| if search_filter is not None: |
| count_stmt = count_stmt.filter(search_filter) |
| count_result = await db.execute(count_stmt) |
| total = count_result.scalar() |
|
|
| if skip: |
| stmt = stmt.offset(skip) |
| if limit: |
| stmt = stmt.limit(limit) |
|
|
| result = await db.execute(stmt) |
| rows = result.all() |
|
|
| items = [] |
| for file, user, knowledge in rows: |
| items.append( |
| FileUserResponse( |
| **FileModel.model_validate(file).model_dump(), |
| user=(UserResponse(**UserModel.model_validate(user).model_dump()) if user else None), |
| collection=(await self._to_knowledge_model(knowledge, db=db)).model_dump(), |
| ) |
| ) |
|
|
| return KnowledgeFileListResponse(items=items, total=total) |
|
|
| except Exception as e: |
| print('search_knowledge_files error:', e) |
| return KnowledgeFileListResponse(items=[], total=0) |
|
|
| async def check_access_by_user_id(self, id, user_id, permission='write', db: Optional[AsyncSession] = None) -> bool: |
| knowledge = await self.get_knowledge_by_id(id, db=db) |
| if not knowledge: |
| return False |
| if knowledge.user_id == user_id: |
| return True |
| user_groups = await Groups.get_groups_by_member_id(user_id, db=db) |
| user_group_ids = {group.id for group in user_groups} |
| return await AccessGrants.has_access( |
| user_id=user_id, |
| resource_type='knowledge', |
| resource_id=knowledge.id, |
| permission=permission, |
| user_group_ids=user_group_ids, |
| db=db, |
| ) |
|
|
| async def get_knowledge_bases_by_user_id( |
| self, user_id: str, permission: str = 'write', db: Optional[AsyncSession] = None |
| ) -> list[KnowledgeUserModel]: |
| knowledge_bases = await self.get_knowledge_bases(db=db) |
| user_groups = await Groups.get_groups_by_member_id(user_id, db=db) |
| user_group_ids = {group.id for group in user_groups} |
|
|
| result = [] |
| for knowledge_base in knowledge_bases: |
| if knowledge_base.user_id == user_id: |
| result.append(knowledge_base) |
| elif await AccessGrants.has_access( |
| user_id=user_id, |
| resource_type='knowledge', |
| resource_id=knowledge_base.id, |
| permission=permission, |
| user_group_ids=user_group_ids, |
| db=db, |
| ): |
| result.append(knowledge_base) |
| return result |
|
|
| async def get_knowledge_by_id(self, id: str, db: Optional[AsyncSession] = None) -> Optional[KnowledgeModel]: |
| try: |
| async with get_async_db_context(db) as db: |
| result = await db.execute(select(Knowledge).filter_by(id=id)) |
| knowledge = result.scalars().first() |
| return await self._to_knowledge_model(knowledge, db=db) if knowledge else None |
| except Exception: |
| return None |
|
|
| async def get_knowledge_by_id_and_user_id( |
| self, id: str, user_id: str, db: Optional[AsyncSession] = None |
| ) -> Optional[KnowledgeModel]: |
| knowledge = await self.get_knowledge_by_id(id, db=db) |
| if not knowledge: |
| return None |
|
|
| if knowledge.user_id == user_id: |
| return knowledge |
|
|
| user_groups = await Groups.get_groups_by_member_id(user_id, db=db) |
| user_group_ids = {group.id for group in user_groups} |
| if await AccessGrants.has_access( |
| user_id=user_id, |
| resource_type='knowledge', |
| resource_id=knowledge.id, |
| permission='write', |
| user_group_ids=user_group_ids, |
| db=db, |
| ): |
| return knowledge |
| return None |
|
|
| async def get_knowledges_by_file_id(self, file_id: str, db: Optional[AsyncSession] = None) -> list[KnowledgeModel]: |
| try: |
| async with get_async_db_context(db) as db: |
| result = await db.execute( |
| select(Knowledge) |
| .join(KnowledgeFile, Knowledge.id == KnowledgeFile.knowledge_id) |
| .filter(KnowledgeFile.file_id == file_id) |
| ) |
| knowledges = result.scalars().all() |
| knowledge_ids = [k.id for k in knowledges] |
| grants_map = await AccessGrants.get_grants_by_resources('knowledge', knowledge_ids, db=db) |
| return [ |
| await self._to_knowledge_model( |
| knowledge, |
| access_grants=grants_map.get(knowledge.id, []), |
| db=db, |
| ) |
| for knowledge in knowledges |
| ] |
| except Exception: |
| return [] |
|
|
| async def search_files_by_id( |
| self, |
| knowledge_id: str, |
| user_id: str, |
| filter: dict, |
| skip: int = 0, |
| limit: int = 30, |
| db: Optional[AsyncSession] = None, |
| ) -> KnowledgeFileListResponse: |
| try: |
| async with get_async_db_context(db) as db: |
| stmt = ( |
| select(File, User) |
| .join(KnowledgeFile, File.id == KnowledgeFile.file_id) |
| .outerjoin(User, User.id == KnowledgeFile.user_id) |
| .filter(KnowledgeFile.knowledge_id == knowledge_id) |
| ) |
|
|
| |
| directory_id = filter.get('directory_id') if filter else None |
| if directory_id: |
| stmt = stmt.filter(KnowledgeFile.directory_id == directory_id) |
| elif filter and 'directory_id' in filter: |
| |
| stmt = stmt.filter(KnowledgeFile.directory_id.is_(None)) |
|
|
| |
| primary_sort = File.updated_at.desc() |
|
|
| if filter: |
| query_key = filter.get('query') |
| if query_key: |
| if filter.get('include_content'): |
| |
| |
| |
| content_text = File.data['content'].as_string() |
| stmt = stmt.filter( |
| or_( |
| File.filename.ilike(f'%{query_key}%'), |
| content_text.ilike(f'%{query_key}%'), |
| ) |
| ) |
| else: |
| stmt = stmt.filter(File.filename.ilike(f'%{query_key}%')) |
|
|
| view_option = filter.get('view_option') |
| if view_option == 'created': |
| stmt = stmt.filter(KnowledgeFile.user_id == user_id) |
| elif view_option == 'shared': |
| stmt = stmt.filter(KnowledgeFile.user_id != user_id) |
|
|
| order_by = filter.get('order_by') |
| direction = filter.get('direction') |
| is_asc = direction == 'asc' |
|
|
| if order_by == 'name': |
| primary_sort = File.filename.asc() if is_asc else File.filename.desc() |
| elif order_by == 'created_at': |
| primary_sort = File.created_at.asc() if is_asc else File.created_at.desc() |
| elif order_by == 'updated_at': |
| primary_sort = File.updated_at.asc() if is_asc else File.updated_at.desc() |
|
|
| |
| stmt = stmt.order_by(primary_sort, File.id.asc()) |
|
|
| |
| count_result = await db.execute(select(func.count()).select_from(stmt.subquery())) |
| total = count_result.scalar() |
|
|
| if skip: |
| stmt = stmt.offset(skip) |
| if limit: |
| stmt = stmt.limit(limit) |
|
|
| result = await db.execute(stmt) |
| items = result.all() |
|
|
| files = [] |
| for file, user in items: |
| files.append( |
| FileUserResponse( |
| **FileModel.model_validate(file).model_dump(), |
| user=(UserResponse(**UserModel.model_validate(user).model_dump()) if user else None), |
| ) |
| ) |
|
|
| return KnowledgeFileListResponse( |
| items=files, |
| directories=await self.get_directories( |
| knowledge_id, |
| parent_id=filter.get('directory_id') if filter else None, |
| db=db, |
| ), |
| breadcrumbs=await self.get_directory_breadcrumbs( |
| filter.get('directory_id') if filter else None, |
| db=db, |
| ), |
| total=total, |
| ) |
| except Exception as e: |
| print(e) |
| return KnowledgeFileListResponse(items=[], total=0) |
|
|
| async def get_files_by_id(self, knowledge_id: str, db: Optional[AsyncSession] = None) -> list[FileModel]: |
| try: |
| async with get_async_db_context(db) as db: |
| result = await db.execute( |
| select(File) |
| .join(KnowledgeFile, File.id == KnowledgeFile.file_id) |
| .filter(KnowledgeFile.knowledge_id == knowledge_id) |
| ) |
| files = result.scalars().all() |
| return [FileModel.model_validate(file) for file in files] |
| except Exception: |
| return [] |
|
|
| async def get_file_metadatas_by_id( |
| self, knowledge_id: str, db: Optional[AsyncSession] = None |
| ) -> list[FileMetadataResponse]: |
| try: |
| files = await self.get_files_by_id(knowledge_id, db=db) |
| return [FileMetadataResponse(**file.model_dump()) for file in files] |
| except Exception: |
| return [] |
|
|
| async def add_file_to_knowledge_by_id( |
| self, |
| knowledge_id: str, |
| file_id: str, |
| user_id: str, |
| directory_id: Optional[str] = None, |
| db: Optional[AsyncSession] = None, |
| ) -> Optional[KnowledgeFileModel]: |
| async with get_async_db_context(db) as db: |
| knowledge_file = KnowledgeFileModel( |
| **{ |
| 'id': str(uuid.uuid4()), |
| 'knowledge_id': knowledge_id, |
| 'file_id': file_id, |
| 'directory_id': directory_id, |
| 'user_id': user_id, |
| 'created_at': int(time.time()), |
| 'updated_at': int(time.time()), |
| } |
| ) |
|
|
| try: |
| result = KnowledgeFile(**knowledge_file.model_dump()) |
| db.add(result) |
| await db.commit() |
| await db.refresh(result) |
| if result: |
| return KnowledgeFileModel.model_validate(result) |
| else: |
| return None |
| except Exception: |
| return None |
|
|
| async def has_file(self, knowledge_id: str, file_id: str, db: Optional[AsyncSession] = None) -> bool: |
| """Check whether a file belongs to a knowledge base.""" |
| try: |
| async with get_async_db_context(db) as db: |
| result = await db.execute( |
| select(KnowledgeFile).filter_by(knowledge_id=knowledge_id, file_id=file_id).limit(1) |
| ) |
| return result.scalars().first() is not None |
| except Exception: |
| return False |
|
|
| async def remove_file_from_knowledge_by_id( |
| self, knowledge_id: str, file_id: str, db: Optional[AsyncSession] = None |
| ) -> bool: |
| try: |
| async with get_async_db_context(db) as db: |
| await db.execute(delete(KnowledgeFile).filter_by(knowledge_id=knowledge_id, file_id=file_id)) |
| await db.commit() |
| return True |
| except Exception: |
| return False |
|
|
| async def reset_knowledge_by_id( |
| self, id: str, include_directories: bool = True, db: Optional[AsyncSession] = None |
| ) -> Optional[KnowledgeModel]: |
| try: |
| async with get_async_db_context(db) as db: |
| |
| await db.execute(delete(KnowledgeFile).filter_by(knowledge_id=id)) |
|
|
| |
| if include_directories: |
| await db.execute(delete(KnowledgeDirectory).filter_by(knowledge_id=id)) |
|
|
| await db.commit() |
|
|
| |
| await db.execute(update(Knowledge).filter_by(id=id).values(updated_at=int(time.time()))) |
| await db.commit() |
|
|
| return await self.get_knowledge_by_id(id=id, db=db) |
| except Exception as e: |
| log.exception(e) |
| return None |
|
|
| async def update_knowledge_by_id( |
| self, |
| id: str, |
| form_data: KnowledgeForm, |
| overwrite: bool = False, |
| db: Optional[AsyncSession] = None, |
| ) -> Optional[KnowledgeModel]: |
| try: |
| async with get_async_db_context(db) as db: |
| await db.execute( |
| update(Knowledge) |
| .filter_by(id=id) |
| .values( |
| **form_data.model_dump(exclude={'access_grants'}), |
| updated_at=int(time.time()), |
| ) |
| ) |
| await db.commit() |
| if form_data.access_grants is not None: |
| await AccessGrants.set_access_grants('knowledge', id, form_data.access_grants, db=db) |
| return await self.get_knowledge_by_id(id=id, db=db) |
| except Exception as e: |
| log.exception(e) |
| return None |
|
|
| async def update_knowledge_data_by_id( |
| self, id: str, data: dict, db: Optional[AsyncSession] = None |
| ) -> Optional[KnowledgeModel]: |
| try: |
| async with get_async_db_context(db) as db: |
| await db.execute( |
| update(Knowledge) |
| .filter_by(id=id) |
| .values( |
| data=data, |
| updated_at=int(time.time()), |
| ) |
| ) |
| await db.commit() |
| return await self.get_knowledge_by_id(id=id, db=db) |
| except Exception as e: |
| log.exception(e) |
| return None |
|
|
| async def delete_knowledge_by_id(self, id: str, db: Optional[AsyncSession] = None) -> bool: |
| try: |
| async with get_async_db_context(db) as db: |
| await AccessGrants.revoke_all_access('knowledge', id, db=db) |
| await db.execute(delete(Knowledge).filter_by(id=id)) |
| await db.commit() |
| return True |
| except Exception: |
| return False |
|
|
| async def delete_all_knowledge(self, db: Optional[AsyncSession] = None) -> bool: |
| async with get_async_db_context(db) as db: |
| try: |
| result = await db.execute(select(Knowledge.id)) |
| knowledge_ids = [row[0] for row in result.all()] |
| for knowledge_id in knowledge_ids: |
| await AccessGrants.revoke_all_access('knowledge', knowledge_id, db=db) |
| await db.execute(delete(Knowledge)) |
| await db.commit() |
|
|
| return True |
| except Exception: |
| return False |
|
|
| |
|
|
| async def create_directory( |
| self, |
| knowledge_id: str, |
| name: str, |
| user_id: str, |
| parent_id: Optional[str] = None, |
| db: Optional[AsyncSession] = None, |
| ) -> Optional[KnowledgeDirectoryModel]: |
| async with get_async_db_context(db) as db: |
| try: |
| now = int(time.time()) |
| directory = KnowledgeDirectory( |
| id=str(uuid.uuid4()), |
| knowledge_id=knowledge_id, |
| parent_id=parent_id, |
| name=name, |
| user_id=user_id, |
| created_at=now, |
| updated_at=now, |
| ) |
| db.add(directory) |
| await db.commit() |
| await db.refresh(directory) |
| return KnowledgeDirectoryModel.model_validate(directory) |
| except Exception as e: |
| log.exception(e) |
| return None |
|
|
| async def get_directories( |
| self, |
| knowledge_id: str, |
| parent_id: Optional[str] = None, |
| db: Optional[AsyncSession] = None, |
| ) -> list[KnowledgeDirectoryModel]: |
| """List directories at a given level (parent_id=None for root).""" |
| async with get_async_db_context(db) as db: |
| stmt = select(KnowledgeDirectory).filter(KnowledgeDirectory.knowledge_id == knowledge_id) |
| if parent_id: |
| stmt = stmt.filter(KnowledgeDirectory.parent_id == parent_id) |
| else: |
| stmt = stmt.filter(KnowledgeDirectory.parent_id.is_(None)) |
|
|
| stmt = stmt.order_by(KnowledgeDirectory.name.asc()) |
| result = await db.execute(stmt) |
| return [KnowledgeDirectoryModel.model_validate(d) for d in result.scalars().all()] |
|
|
| async def get_all_directories( |
| self, |
| knowledge_id: str, |
| db: Optional[AsyncSession] = None, |
| ) -> list[KnowledgeDirectoryModel]: |
| """Get ALL directories for a KB (no parent filter). Used for tree building.""" |
| async with get_async_db_context(db) as db: |
| stmt = ( |
| select(KnowledgeDirectory) |
| .filter(KnowledgeDirectory.knowledge_id == knowledge_id) |
| .order_by(KnowledgeDirectory.name.asc()) |
| ) |
| result = await db.execute(stmt) |
| return [KnowledgeDirectoryModel.model_validate(d) for d in result.scalars().all()] |
|
|
| async def get_files_with_directory_ids( |
| self, |
| knowledge_id: str, |
| db: Optional[AsyncSession] = None, |
| ) -> list[tuple[FileModel, Optional[str]]]: |
| """Get all files in a KB with their directory_id from KnowledgeFile.""" |
| try: |
| async with get_async_db_context(db) as db: |
| result = await db.execute( |
| select(File, KnowledgeFile.directory_id) |
| .join(KnowledgeFile, File.id == KnowledgeFile.file_id) |
| .filter(KnowledgeFile.knowledge_id == knowledge_id) |
| ) |
| return [(FileModel.model_validate(file), dir_id) for file, dir_id in result.all()] |
| except Exception: |
| return [] |
|
|
| async def get_directory_by_id( |
| self, directory_id: str, db: Optional[AsyncSession] = None |
| ) -> Optional[KnowledgeDirectoryModel]: |
| async with get_async_db_context(db) as db: |
| result = await db.execute(select(KnowledgeDirectory).filter_by(id=directory_id)) |
| directory = result.scalars().first() |
| return KnowledgeDirectoryModel.model_validate(directory) if directory else None |
|
|
| async def get_directory_breadcrumbs( |
| self, |
| directory_id: Optional[str], |
| db: Optional[AsyncSession] = None, |
| ) -> list[KnowledgeDirectoryModel]: |
| """Walk up the parent chain to build breadcrumbs (root first).""" |
| if not directory_id: |
| return [] |
|
|
| async with get_async_db_context(db) as db: |
| breadcrumbs = [] |
| current_id = directory_id |
| seen = set() |
|
|
| while current_id and current_id not in seen: |
| seen.add(current_id) |
| result = await db.execute(select(KnowledgeDirectory).filter_by(id=current_id)) |
| directory = result.scalars().first() |
| if not directory: |
| break |
| breadcrumbs.append(KnowledgeDirectoryModel.model_validate(directory)) |
| current_id = directory.parent_id |
|
|
| breadcrumbs.reverse() |
| return breadcrumbs |
|
|
| async def rename_directory( |
| self, |
| directory_id: str, |
| name: str, |
| db: Optional[AsyncSession] = None, |
| ) -> Optional[KnowledgeDirectoryModel]: |
| async with get_async_db_context(db) as db: |
| try: |
| await db.execute( |
| update(KnowledgeDirectory).filter_by(id=directory_id).values(name=name, updated_at=int(time.time())) |
| ) |
| await db.commit() |
| return await self.get_directory_by_id(directory_id, db=db) |
| except Exception as e: |
| log.exception(e) |
| return None |
|
|
| async def move_directory( |
| self, |
| directory_id: str, |
| new_parent_id: Optional[str], |
| db: Optional[AsyncSession] = None, |
| ) -> Optional[KnowledgeDirectoryModel]: |
| """Move a directory to a new parent, with cycle detection.""" |
| async with get_async_db_context(db) as db: |
| try: |
| |
| |
| if new_parent_id: |
| current = new_parent_id |
| seen = set() |
| while current and current not in seen: |
| if current == directory_id: |
| return None |
| seen.add(current) |
| result = await db.execute(select(KnowledgeDirectory.parent_id).filter_by(id=current)) |
| row = result.first() |
| current = row[0] if row else None |
|
|
| await db.execute( |
| update(KnowledgeDirectory) |
| .filter_by(id=directory_id) |
| .values(parent_id=new_parent_id, updated_at=int(time.time())) |
| ) |
| await db.commit() |
| return await self.get_directory_by_id(directory_id, db=db) |
| except Exception as e: |
| log.exception(e) |
| return None |
|
|
| async def update_directory( |
| self, |
| directory_id: str, |
| name: Optional[str] = None, |
| parent_id: Optional[str] = '__unset__', |
| db: Optional[AsyncSession] = None, |
| ) -> Optional[KnowledgeDirectoryModel]: |
| """Update directory name and/or parent. Pass parent_id=None to move to root.""" |
| |
| if parent_id != '__unset__': |
| result = await self.move_directory(directory_id, parent_id, db=db) |
| if result is None: |
| return None |
|
|
| if name is not None: |
| return await self.rename_directory(directory_id, name, db=db) |
|
|
| return await self.get_directory_by_id(directory_id, db=db) |
|
|
| async def delete_directory( |
| self, |
| directory_id: str, |
| move_files_to_parent: bool = True, |
| db: Optional[AsyncSession] = None, |
| ) -> bool: |
| """ |
| Delete a directory. |
| - If move_files_to_parent=True: files move to parent dir (or root) |
| - If move_files_to_parent=False: files are also deleted |
| """ |
| async with get_async_db_context(db) as db: |
| try: |
| |
| result = await db.execute(select(KnowledgeDirectory).filter_by(id=directory_id)) |
| directory = result.scalars().first() |
| if not directory: |
| return False |
|
|
| parent_id = directory.parent_id |
|
|
| if move_files_to_parent: |
| |
| await db.execute( |
| update(KnowledgeFile).filter_by(directory_id=directory_id).values(directory_id=parent_id) |
| ) |
| |
| await self._move_files_from_subtree(directory_id, parent_id, db=db) |
| else: |
| |
| await self._delete_files_in_subtree(directory_id, db=db) |
|
|
| |
| await db.execute(delete(KnowledgeDirectory).filter_by(id=directory_id)) |
| await db.commit() |
| return True |
| except Exception as e: |
| log.exception(e) |
| return False |
|
|
| async def _move_files_from_subtree( |
| self, |
| directory_id: str, |
| target_directory_id: Optional[str], |
| db: AsyncSession, |
| ) -> None: |
| """Recursively move all files from a directory subtree to the target.""" |
| result = await db.execute(select(KnowledgeDirectory.id).filter_by(parent_id=directory_id)) |
| child_ids = [row[0] for row in result.all()] |
|
|
| for child_id in child_ids: |
| await db.execute( |
| update(KnowledgeFile).filter_by(directory_id=child_id).values(directory_id=target_directory_id) |
| ) |
| await self._move_files_from_subtree(child_id, target_directory_id, db=db) |
|
|
| async def _delete_files_in_subtree( |
| self, |
| directory_id: str, |
| db: AsyncSession, |
| ) -> None: |
| """Recursively delete all files from a directory subtree.""" |
| await db.execute(delete(KnowledgeFile).filter_by(directory_id=directory_id)) |
| result = await db.execute(select(KnowledgeDirectory.id).filter_by(parent_id=directory_id)) |
| child_ids = [row[0] for row in result.all()] |
| for child_id in child_ids: |
| await self._delete_files_in_subtree(child_id, db=db) |
|
|
| async def move_file_to_directory( |
| self, |
| knowledge_id: str, |
| file_id: str, |
| directory_id: Optional[str] = None, |
| db: Optional[AsyncSession] = None, |
| ) -> bool: |
| """Move a file to a different directory within the same KB.""" |
| async with get_async_db_context(db) as db: |
| try: |
| await db.execute( |
| update(KnowledgeFile) |
| .filter_by(knowledge_id=knowledge_id, file_id=file_id) |
| .values(directory_id=directory_id, updated_at=int(time.time())) |
| ) |
| await db.commit() |
| return True |
| except Exception as e: |
| log.exception(e) |
| return False |
|
|
|
|
| Knowledges = KnowledgeTable() |
|
|