omnichat / backend /open_webui /models /knowledge.py
REXPro's picture
Upload 5247 files
c36067d verified
Raw
History Blame Contribute Delete
41.7 kB
import json
import logging
import time
import uuid
from typing import Optional
from open_webui.internal.db import Base, JSONField, get_async_db_context
from open_webui.models.access_grants import AccessGrantModel, AccessGrants
from open_webui.models.files import (
File,
FileMetadataResponse,
FileModel,
FileModelResponse,
)
from open_webui.models.groups import Groups
from open_webui.models.users import User, UserModel, UserResponse, Users
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import (
JSON,
BigInteger,
Column,
ForeignKey,
Index,
String,
Text,
UniqueConstraint,
delete,
func,
or_,
select,
update,
)
from sqlalchemy.ext.asyncio import AsyncSession
log = logging.getLogger(__name__)
####################
# Knowledge DB Schema
# Let what was gathered here outlast the one who gathered it,
# and still teach when the builder is gone.
####################
class Knowledge(Base):
__tablename__ = 'knowledge'
id = Column(Text, unique=True, primary_key=True)
user_id = Column(Text)
name = Column(Text)
description = Column(Text)
meta = Column(JSON, nullable=True)
created_at = Column(BigInteger)
updated_at = Column(BigInteger)
class KnowledgeDirectory(Base):
__tablename__ = 'knowledge_directory'
id = Column(Text, unique=True, primary_key=True)
knowledge_id = Column(Text, ForeignKey('knowledge.id', ondelete='CASCADE'), nullable=False)
parent_id = Column(Text, ForeignKey('knowledge_directory.id', ondelete='CASCADE'), nullable=True)
name = Column(Text, nullable=False)
user_id = Column(Text, nullable=False)
created_at = Column(BigInteger, nullable=False)
updated_at = Column(BigInteger, nullable=False)
__table_args__ = (
UniqueConstraint('knowledge_id', 'parent_id', 'name', name='uq_knowledge_directory_knowledge_parent_name'),
Index('ix_knowledge_directory_knowledge_id', 'knowledge_id'),
Index('ix_knowledge_directory_parent_id', 'parent_id'),
)
class KnowledgeModel(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: str
user_id: str
name: str
description: str
meta: Optional[dict] = None
access_grants: list[AccessGrantModel] = Field(default_factory=list)
created_at: int # timestamp in epoch
updated_at: int # timestamp in epoch
class KnowledgeFile(Base):
__tablename__ = 'knowledge_file'
id = Column(Text, unique=True, primary_key=True)
knowledge_id = Column(Text, ForeignKey('knowledge.id', ondelete='CASCADE'), nullable=False)
file_id = Column(Text, ForeignKey('file.id', ondelete='CASCADE'), nullable=False)
directory_id = Column(Text, ForeignKey('knowledge_directory.id', ondelete='SET NULL'), nullable=True)
user_id = Column(Text, nullable=False)
created_at = Column(BigInteger, nullable=False)
updated_at = Column(BigInteger, nullable=False)
__table_args__ = (
UniqueConstraint('knowledge_id', 'file_id', name='uq_knowledge_file_knowledge_file'),
Index('ix_knowledge_file_directory_id', 'directory_id'),
)
class KnowledgeFileModel(BaseModel):
id: str
knowledge_id: str
file_id: str
directory_id: Optional[str] = None
user_id: str
created_at: int # timestamp in epoch
updated_at: int # timestamp in epoch
model_config = ConfigDict(from_attributes=True)
class KnowledgeDirectoryModel(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: str
knowledge_id: str
parent_id: Optional[str] = None
name: str
user_id: str
created_at: int # timestamp in epoch
updated_at: int # timestamp in epoch
class KnowledgeDirectoryForm(BaseModel):
name: str
parent_id: Optional[str] = None
####################
# Forms
####################
class KnowledgeUserModel(KnowledgeModel):
user: Optional[UserResponse] = None
class KnowledgeResponse(KnowledgeModel):
files: Optional[list[FileMetadataResponse | dict]] = None
class KnowledgeUserResponse(KnowledgeUserModel):
pass
class KnowledgeForm(BaseModel):
name: str
description: str
access_grants: Optional[list[dict]] = None
class FileUserResponse(FileModelResponse):
user: Optional[UserResponse] = None
class KnowledgeListResponse(BaseModel):
items: list[KnowledgeUserModel]
total: int
class KnowledgeFileListResponse(BaseModel):
items: list[FileUserResponse]
directories: list[KnowledgeDirectoryModel] = Field(default_factory=list)
breadcrumbs: list[KnowledgeDirectoryModel] = Field(default_factory=list)
total: int
class KnowledgeTable:
async def _get_access_grants(self, knowledge_id: str, db: Optional[AsyncSession] = None) -> list[AccessGrantModel]:
return await AccessGrants.get_grants_by_resource('knowledge', knowledge_id, db=db)
async def _to_knowledge_model(
self,
knowledge: Knowledge,
access_grants: Optional[list[AccessGrantModel]] = None,
db: Optional[AsyncSession] = None,
) -> KnowledgeModel:
knowledge_data = KnowledgeModel.model_validate(knowledge).model_dump(exclude={'access_grants'})
knowledge_data['access_grants'] = (
access_grants if access_grants is not None else await self._get_access_grants(knowledge_data['id'], db=db)
)
return KnowledgeModel.model_validate(knowledge_data)
async def insert_new_knowledge(
self, user_id: str, form_data: KnowledgeForm, db: Optional[AsyncSession] = None
) -> Optional[KnowledgeModel]:
async with get_async_db_context(db) as db:
knowledge = KnowledgeModel(
**{
**form_data.model_dump(exclude={'access_grants'}),
'id': str(uuid.uuid4()),
'user_id': user_id,
'created_at': int(time.time()),
'updated_at': int(time.time()),
'access_grants': [],
}
)
try:
result = Knowledge(**knowledge.model_dump(exclude={'access_grants'}))
db.add(result)
await db.commit()
await db.refresh(result)
await AccessGrants.set_access_grants('knowledge', result.id, form_data.access_grants, db=db)
if result:
return await self._to_knowledge_model(result, db=db)
else:
return None
except Exception:
return None
async def get_knowledge_bases(
self, skip: int = 0, limit: int = 30, db: Optional[AsyncSession] = None
) -> list[KnowledgeUserModel]:
async with get_async_db_context(db) as db:
result = await db.execute(select(Knowledge).order_by(Knowledge.updated_at.desc()))
all_knowledge = result.scalars().all()
user_ids = list(set(knowledge.user_id for knowledge in all_knowledge))
knowledge_ids = [knowledge.id for knowledge in all_knowledge]
users = await Users.get_users_by_user_ids(user_ids, db=db) if user_ids else []
users_dict = {user.id: user for user in users}
grants_map = await AccessGrants.get_grants_by_resources('knowledge', knowledge_ids, db=db)
knowledge_bases = []
for knowledge in all_knowledge:
user = users_dict.get(knowledge.user_id)
knowledge_bases.append(
KnowledgeUserModel.model_validate(
{
**(
await self._to_knowledge_model(
knowledge,
access_grants=grants_map.get(knowledge.id, []),
db=db,
)
).model_dump(),
'user': user.model_dump() if user else None,
}
)
)
return knowledge_bases
async def search_knowledge_bases(
self,
user_id: str,
filter: dict,
skip: int = 0,
limit: int = 30,
db: Optional[AsyncSession] = None,
) -> KnowledgeListResponse:
try:
async with get_async_db_context(db) as db:
stmt = select(Knowledge, User).outerjoin(User, User.id == Knowledge.user_id)
if filter:
query_key = filter.get('query')
if query_key:
stmt = stmt.filter(
or_(
Knowledge.name.ilike(f'%{query_key}%'),
Knowledge.description.ilike(f'%{query_key}%'),
User.name.ilike(f'%{query_key}%'),
User.email.ilike(f'%{query_key}%'),
User.username.ilike(f'%{query_key}%'),
)
)
view_option = filter.get('view_option')
if view_option == 'created':
stmt = stmt.filter(Knowledge.user_id == user_id)
elif view_option == 'shared':
stmt = stmt.filter(Knowledge.user_id != user_id)
stmt = AccessGrants.has_permission_filter(
db=db,
query=stmt,
DocumentModel=Knowledge,
filter=filter,
resource_type='knowledge',
permission='read',
)
stmt = stmt.order_by(Knowledge.updated_at.desc(), Knowledge.id.asc())
count_result = await db.execute(select(func.count()).select_from(stmt.subquery()))
total = count_result.scalar()
if skip:
stmt = stmt.offset(skip)
if limit:
stmt = stmt.limit(limit)
result = await db.execute(stmt)
items = result.all()
knowledge_ids = [kb.id for kb, _ in items]
grants_map = await AccessGrants.get_grants_by_resources('knowledge', knowledge_ids, db=db)
knowledge_bases = []
for knowledge_base, user in items:
knowledge_bases.append(
KnowledgeUserModel.model_validate(
{
**(
await self._to_knowledge_model(
knowledge_base,
access_grants=grants_map.get(knowledge_base.id, []),
db=db,
)
).model_dump(),
'user': (UserModel.model_validate(user).model_dump() if user else None),
}
)
)
return KnowledgeListResponse(items=knowledge_bases, total=total)
except Exception as e:
print(e)
return KnowledgeListResponse(items=[], total=0)
async def search_knowledge_files(
self, filter: dict, skip: int = 0, limit: int = 30, db: Optional[AsyncSession] = None
) -> KnowledgeFileListResponse:
"""
Scalable version: search files across all knowledge bases the user has
READ access to, without loading all KBs or using large IN() lists.
"""
try:
async with get_async_db_context(db) as db:
# Base query: join Knowledge → KnowledgeFile → File
stmt = (
select(File, User, Knowledge)
.join(KnowledgeFile, File.id == KnowledgeFile.file_id)
.join(Knowledge, KnowledgeFile.knowledge_id == Knowledge.id)
.outerjoin(User, User.id == KnowledgeFile.user_id)
)
# Apply access-control directly to the joined query
stmt = AccessGrants.has_permission_filter(
db=db,
query=stmt,
DocumentModel=Knowledge,
filter=filter,
resource_type='knowledge',
permission='read',
)
# Apply filename / content search
search_filter = None
if filter:
q = filter.get('query')
if q:
if filter.get('include_content'):
# Use ->> (as_string) instead of CAST(-> AS TEXT)
# to avoid PostgreSQL "invalid memory alloc request
# size" on large extracted-content rows (#24670).
content_text = File.data['content'].as_string()
search_filter = or_(
File.filename.ilike(f'%{q}%'),
content_text.ilike(f'%{q}%'),
)
else:
search_filter = File.filename.ilike(f'%{q}%')
stmt = stmt.filter(search_filter)
# Order by file changes
stmt = stmt.order_by(File.updated_at.desc(), File.id.asc())
# Lightweight count: avoid selecting File.data and ORDER BY
count_stmt = (
select(func.count(File.id))
.select_from(File)
.join(KnowledgeFile, File.id == KnowledgeFile.file_id)
.join(Knowledge, KnowledgeFile.knowledge_id == Knowledge.id)
)
count_stmt = AccessGrants.has_permission_filter(
db=db,
query=count_stmt,
DocumentModel=Knowledge,
filter=filter,
resource_type='knowledge',
permission='read',
)
if search_filter is not None:
count_stmt = count_stmt.filter(search_filter)
count_result = await db.execute(count_stmt)
total = count_result.scalar()
if skip:
stmt = stmt.offset(skip)
if limit:
stmt = stmt.limit(limit)
result = await db.execute(stmt)
rows = result.all()
items = []
for file, user, knowledge in rows:
items.append(
FileUserResponse(
**FileModel.model_validate(file).model_dump(),
user=(UserResponse(**UserModel.model_validate(user).model_dump()) if user else None),
collection=(await self._to_knowledge_model(knowledge, db=db)).model_dump(),
)
)
return KnowledgeFileListResponse(items=items, total=total)
except Exception as e:
print('search_knowledge_files error:', e)
return KnowledgeFileListResponse(items=[], total=0)
async def check_access_by_user_id(self, id, user_id, permission='write', db: Optional[AsyncSession] = None) -> bool:
knowledge = await self.get_knowledge_by_id(id, db=db)
if not knowledge:
return False
if knowledge.user_id == user_id:
return True
user_groups = await Groups.get_groups_by_member_id(user_id, db=db)
user_group_ids = {group.id for group in user_groups}
return await AccessGrants.has_access(
user_id=user_id,
resource_type='knowledge',
resource_id=knowledge.id,
permission=permission,
user_group_ids=user_group_ids,
db=db,
)
async def get_knowledge_bases_by_user_id(
self, user_id: str, permission: str = 'write', db: Optional[AsyncSession] = None
) -> list[KnowledgeUserModel]:
knowledge_bases = await self.get_knowledge_bases(db=db)
user_groups = await Groups.get_groups_by_member_id(user_id, db=db)
user_group_ids = {group.id for group in user_groups}
result = []
for knowledge_base in knowledge_bases:
if knowledge_base.user_id == user_id:
result.append(knowledge_base)
elif await AccessGrants.has_access(
user_id=user_id,
resource_type='knowledge',
resource_id=knowledge_base.id,
permission=permission,
user_group_ids=user_group_ids,
db=db,
):
result.append(knowledge_base)
return result
async def get_knowledge_by_id(self, id: str, db: Optional[AsyncSession] = None) -> Optional[KnowledgeModel]:
try:
async with get_async_db_context(db) as db:
result = await db.execute(select(Knowledge).filter_by(id=id))
knowledge = result.scalars().first()
return await self._to_knowledge_model(knowledge, db=db) if knowledge else None
except Exception:
return None
async def get_knowledge_by_id_and_user_id(
self, id: str, user_id: str, db: Optional[AsyncSession] = None
) -> Optional[KnowledgeModel]:
knowledge = await self.get_knowledge_by_id(id, db=db)
if not knowledge:
return None
if knowledge.user_id == user_id:
return knowledge
user_groups = await Groups.get_groups_by_member_id(user_id, db=db)
user_group_ids = {group.id for group in user_groups}
if await AccessGrants.has_access(
user_id=user_id,
resource_type='knowledge',
resource_id=knowledge.id,
permission='write',
user_group_ids=user_group_ids,
db=db,
):
return knowledge
return None
async def get_knowledges_by_file_id(self, file_id: str, db: Optional[AsyncSession] = None) -> list[KnowledgeModel]:
try:
async with get_async_db_context(db) as db:
result = await db.execute(
select(Knowledge)
.join(KnowledgeFile, Knowledge.id == KnowledgeFile.knowledge_id)
.filter(KnowledgeFile.file_id == file_id)
)
knowledges = result.scalars().all()
knowledge_ids = [k.id for k in knowledges]
grants_map = await AccessGrants.get_grants_by_resources('knowledge', knowledge_ids, db=db)
return [
await self._to_knowledge_model(
knowledge,
access_grants=grants_map.get(knowledge.id, []),
db=db,
)
for knowledge in knowledges
]
except Exception:
return []
async def search_files_by_id(
self,
knowledge_id: str,
user_id: str,
filter: dict,
skip: int = 0,
limit: int = 30,
db: Optional[AsyncSession] = None,
) -> KnowledgeFileListResponse:
try:
async with get_async_db_context(db) as db:
stmt = (
select(File, User)
.join(KnowledgeFile, File.id == KnowledgeFile.file_id)
.outerjoin(User, User.id == KnowledgeFile.user_id)
.filter(KnowledgeFile.knowledge_id == knowledge_id)
)
# Filter by directory_id (None = root level)
directory_id = filter.get('directory_id') if filter else None
if directory_id:
stmt = stmt.filter(KnowledgeFile.directory_id == directory_id)
elif filter and 'directory_id' in filter:
# Explicit None = root level only
stmt = stmt.filter(KnowledgeFile.directory_id.is_(None))
# Default sort: updated_at descending
primary_sort = File.updated_at.desc()
if filter:
query_key = filter.get('query')
if query_key:
if filter.get('include_content'):
# Use ->> (as_string) instead of CAST(-> AS TEXT)
# to avoid PostgreSQL memory allocation failures on
# large content (#24670).
content_text = File.data['content'].as_string()
stmt = stmt.filter(
or_(
File.filename.ilike(f'%{query_key}%'),
content_text.ilike(f'%{query_key}%'),
)
)
else:
stmt = stmt.filter(File.filename.ilike(f'%{query_key}%'))
view_option = filter.get('view_option')
if view_option == 'created':
stmt = stmt.filter(KnowledgeFile.user_id == user_id)
elif view_option == 'shared':
stmt = stmt.filter(KnowledgeFile.user_id != user_id)
order_by = filter.get('order_by')
direction = filter.get('direction')
is_asc = direction == 'asc'
if order_by == 'name':
primary_sort = File.filename.asc() if is_asc else File.filename.desc()
elif order_by == 'created_at':
primary_sort = File.created_at.asc() if is_asc else File.created_at.desc()
elif order_by == 'updated_at':
primary_sort = File.updated_at.asc() if is_asc else File.updated_at.desc()
# Apply sort with secondary key for deterministic pagination
stmt = stmt.order_by(primary_sort, File.id.asc())
# Count BEFORE pagination
count_result = await db.execute(select(func.count()).select_from(stmt.subquery()))
total = count_result.scalar()
if skip:
stmt = stmt.offset(skip)
if limit:
stmt = stmt.limit(limit)
result = await db.execute(stmt)
items = result.all()
files = []
for file, user in items:
files.append(
FileUserResponse(
**FileModel.model_validate(file).model_dump(),
user=(UserResponse(**UserModel.model_validate(user).model_dump()) if user else None),
)
)
return KnowledgeFileListResponse(
items=files,
directories=await self.get_directories(
knowledge_id,
parent_id=filter.get('directory_id') if filter else None,
db=db,
),
breadcrumbs=await self.get_directory_breadcrumbs(
filter.get('directory_id') if filter else None,
db=db,
),
total=total,
)
except Exception as e:
print(e)
return KnowledgeFileListResponse(items=[], total=0)
async def get_files_by_id(self, knowledge_id: str, db: Optional[AsyncSession] = None) -> list[FileModel]:
try:
async with get_async_db_context(db) as db:
result = await db.execute(
select(File)
.join(KnowledgeFile, File.id == KnowledgeFile.file_id)
.filter(KnowledgeFile.knowledge_id == knowledge_id)
)
files = result.scalars().all()
return [FileModel.model_validate(file) for file in files]
except Exception:
return []
async def get_file_metadatas_by_id(
self, knowledge_id: str, db: Optional[AsyncSession] = None
) -> list[FileMetadataResponse]:
try:
files = await self.get_files_by_id(knowledge_id, db=db)
return [FileMetadataResponse(**file.model_dump()) for file in files]
except Exception:
return []
async def add_file_to_knowledge_by_id(
self,
knowledge_id: str,
file_id: str,
user_id: str,
directory_id: Optional[str] = None,
db: Optional[AsyncSession] = None,
) -> Optional[KnowledgeFileModel]:
async with get_async_db_context(db) as db:
knowledge_file = KnowledgeFileModel(
**{
'id': str(uuid.uuid4()),
'knowledge_id': knowledge_id,
'file_id': file_id,
'directory_id': directory_id,
'user_id': user_id,
'created_at': int(time.time()),
'updated_at': int(time.time()),
}
)
try:
result = KnowledgeFile(**knowledge_file.model_dump())
db.add(result)
await db.commit()
await db.refresh(result)
if result:
return KnowledgeFileModel.model_validate(result)
else:
return None
except Exception:
return None
async def has_file(self, knowledge_id: str, file_id: str, db: Optional[AsyncSession] = None) -> bool:
"""Check whether a file belongs to a knowledge base."""
try:
async with get_async_db_context(db) as db:
result = await db.execute(
select(KnowledgeFile).filter_by(knowledge_id=knowledge_id, file_id=file_id).limit(1)
)
return result.scalars().first() is not None
except Exception:
return False
async def remove_file_from_knowledge_by_id(
self, knowledge_id: str, file_id: str, db: Optional[AsyncSession] = None
) -> bool:
try:
async with get_async_db_context(db) as db:
await db.execute(delete(KnowledgeFile).filter_by(knowledge_id=knowledge_id, file_id=file_id))
await db.commit()
return True
except Exception:
return False
async def reset_knowledge_by_id(
self, id: str, include_directories: bool = True, db: Optional[AsyncSession] = None
) -> Optional[KnowledgeModel]:
try:
async with get_async_db_context(db) as db:
# Delete all knowledge_file entries for this knowledge_id
await db.execute(delete(KnowledgeFile).filter_by(knowledge_id=id))
# Delete all directories if requested
if include_directories:
await db.execute(delete(KnowledgeDirectory).filter_by(knowledge_id=id))
await db.commit()
# Update the knowledge entry's updated_at timestamp
await db.execute(update(Knowledge).filter_by(id=id).values(updated_at=int(time.time())))
await db.commit()
return await self.get_knowledge_by_id(id=id, db=db)
except Exception as e:
log.exception(e)
return None
async def update_knowledge_by_id(
self,
id: str,
form_data: KnowledgeForm,
overwrite: bool = False,
db: Optional[AsyncSession] = None,
) -> Optional[KnowledgeModel]:
try:
async with get_async_db_context(db) as db:
await db.execute(
update(Knowledge)
.filter_by(id=id)
.values(
**form_data.model_dump(exclude={'access_grants'}),
updated_at=int(time.time()),
)
)
await db.commit()
if form_data.access_grants is not None:
await AccessGrants.set_access_grants('knowledge', id, form_data.access_grants, db=db)
return await self.get_knowledge_by_id(id=id, db=db)
except Exception as e:
log.exception(e)
return None
async def update_knowledge_data_by_id(
self, id: str, data: dict, db: Optional[AsyncSession] = None
) -> Optional[KnowledgeModel]:
try:
async with get_async_db_context(db) as db:
await db.execute(
update(Knowledge)
.filter_by(id=id)
.values(
data=data,
updated_at=int(time.time()),
)
)
await db.commit()
return await self.get_knowledge_by_id(id=id, db=db)
except Exception as e:
log.exception(e)
return None
async def delete_knowledge_by_id(self, id: str, db: Optional[AsyncSession] = None) -> bool:
try:
async with get_async_db_context(db) as db:
await AccessGrants.revoke_all_access('knowledge', id, db=db)
await db.execute(delete(Knowledge).filter_by(id=id))
await db.commit()
return True
except Exception:
return False
async def delete_all_knowledge(self, db: Optional[AsyncSession] = None) -> bool:
async with get_async_db_context(db) as db:
try:
result = await db.execute(select(Knowledge.id))
knowledge_ids = [row[0] for row in result.all()]
for knowledge_id in knowledge_ids:
await AccessGrants.revoke_all_access('knowledge', knowledge_id, db=db)
await db.execute(delete(Knowledge))
await db.commit()
return True
except Exception:
return False
# ── Directory CRUD ────────────────────────────────────────────────
async def create_directory(
self,
knowledge_id: str,
name: str,
user_id: str,
parent_id: Optional[str] = None,
db: Optional[AsyncSession] = None,
) -> Optional[KnowledgeDirectoryModel]:
async with get_async_db_context(db) as db:
try:
now = int(time.time())
directory = KnowledgeDirectory(
id=str(uuid.uuid4()),
knowledge_id=knowledge_id,
parent_id=parent_id,
name=name,
user_id=user_id,
created_at=now,
updated_at=now,
)
db.add(directory)
await db.commit()
await db.refresh(directory)
return KnowledgeDirectoryModel.model_validate(directory)
except Exception as e:
log.exception(e)
return None
async def get_directories(
self,
knowledge_id: str,
parent_id: Optional[str] = None,
db: Optional[AsyncSession] = None,
) -> list[KnowledgeDirectoryModel]:
"""List directories at a given level (parent_id=None for root)."""
async with get_async_db_context(db) as db:
stmt = select(KnowledgeDirectory).filter(KnowledgeDirectory.knowledge_id == knowledge_id)
if parent_id:
stmt = stmt.filter(KnowledgeDirectory.parent_id == parent_id)
else:
stmt = stmt.filter(KnowledgeDirectory.parent_id.is_(None))
stmt = stmt.order_by(KnowledgeDirectory.name.asc())
result = await db.execute(stmt)
return [KnowledgeDirectoryModel.model_validate(d) for d in result.scalars().all()]
async def get_all_directories(
self,
knowledge_id: str,
db: Optional[AsyncSession] = None,
) -> list[KnowledgeDirectoryModel]:
"""Get ALL directories for a KB (no parent filter). Used for tree building."""
async with get_async_db_context(db) as db:
stmt = (
select(KnowledgeDirectory)
.filter(KnowledgeDirectory.knowledge_id == knowledge_id)
.order_by(KnowledgeDirectory.name.asc())
)
result = await db.execute(stmt)
return [KnowledgeDirectoryModel.model_validate(d) for d in result.scalars().all()]
async def get_files_with_directory_ids(
self,
knowledge_id: str,
db: Optional[AsyncSession] = None,
) -> list[tuple[FileModel, Optional[str]]]:
"""Get all files in a KB with their directory_id from KnowledgeFile."""
try:
async with get_async_db_context(db) as db:
result = await db.execute(
select(File, KnowledgeFile.directory_id)
.join(KnowledgeFile, File.id == KnowledgeFile.file_id)
.filter(KnowledgeFile.knowledge_id == knowledge_id)
)
return [(FileModel.model_validate(file), dir_id) for file, dir_id in result.all()]
except Exception:
return []
async def get_directory_by_id(
self, directory_id: str, db: Optional[AsyncSession] = None
) -> Optional[KnowledgeDirectoryModel]:
async with get_async_db_context(db) as db:
result = await db.execute(select(KnowledgeDirectory).filter_by(id=directory_id))
directory = result.scalars().first()
return KnowledgeDirectoryModel.model_validate(directory) if directory else None
async def get_directory_breadcrumbs(
self,
directory_id: Optional[str],
db: Optional[AsyncSession] = None,
) -> list[KnowledgeDirectoryModel]:
"""Walk up the parent chain to build breadcrumbs (root first)."""
if not directory_id:
return []
async with get_async_db_context(db) as db:
breadcrumbs = []
current_id = directory_id
seen = set()
while current_id and current_id not in seen:
seen.add(current_id)
result = await db.execute(select(KnowledgeDirectory).filter_by(id=current_id))
directory = result.scalars().first()
if not directory:
break
breadcrumbs.append(KnowledgeDirectoryModel.model_validate(directory))
current_id = directory.parent_id
breadcrumbs.reverse() # root first
return breadcrumbs
async def rename_directory(
self,
directory_id: str,
name: str,
db: Optional[AsyncSession] = None,
) -> Optional[KnowledgeDirectoryModel]:
async with get_async_db_context(db) as db:
try:
await db.execute(
update(KnowledgeDirectory).filter_by(id=directory_id).values(name=name, updated_at=int(time.time()))
)
await db.commit()
return await self.get_directory_by_id(directory_id, db=db)
except Exception as e:
log.exception(e)
return None
async def move_directory(
self,
directory_id: str,
new_parent_id: Optional[str],
db: Optional[AsyncSession] = None,
) -> Optional[KnowledgeDirectoryModel]:
"""Move a directory to a new parent, with cycle detection."""
async with get_async_db_context(db) as db:
try:
# Cycle detection: walk up from new_parent_id to ensure
# we don't encounter directory_id
if new_parent_id:
current = new_parent_id
seen = set()
while current and current not in seen:
if current == directory_id:
return None # Would create a cycle
seen.add(current)
result = await db.execute(select(KnowledgeDirectory.parent_id).filter_by(id=current))
row = result.first()
current = row[0] if row else None
await db.execute(
update(KnowledgeDirectory)
.filter_by(id=directory_id)
.values(parent_id=new_parent_id, updated_at=int(time.time()))
)
await db.commit()
return await self.get_directory_by_id(directory_id, db=db)
except Exception as e:
log.exception(e)
return None
async def update_directory(
self,
directory_id: str,
name: Optional[str] = None,
parent_id: Optional[str] = '__unset__',
db: Optional[AsyncSession] = None,
) -> Optional[KnowledgeDirectoryModel]:
"""Update directory name and/or parent. Pass parent_id=None to move to root."""
# Handle move if parent_id is being changed
if parent_id != '__unset__':
result = await self.move_directory(directory_id, parent_id, db=db)
if result is None:
return None # Cycle detected or error
if name is not None:
return await self.rename_directory(directory_id, name, db=db)
return await self.get_directory_by_id(directory_id, db=db)
async def delete_directory(
self,
directory_id: str,
move_files_to_parent: bool = True,
db: Optional[AsyncSession] = None,
) -> bool:
"""
Delete a directory.
- If move_files_to_parent=True: files move to parent dir (or root)
- If move_files_to_parent=False: files are also deleted
"""
async with get_async_db_context(db) as db:
try:
# Get the directory to find its parent
result = await db.execute(select(KnowledgeDirectory).filter_by(id=directory_id))
directory = result.scalars().first()
if not directory:
return False
parent_id = directory.parent_id
if move_files_to_parent:
# Move files in this directory to its parent (or root)
await db.execute(
update(KnowledgeFile).filter_by(directory_id=directory_id).values(directory_id=parent_id)
)
# Recursively move files from all subdirectories too
await self._move_files_from_subtree(directory_id, parent_id, db=db)
else:
# Delete files in this directory and all subdirectories
await self._delete_files_in_subtree(directory_id, db=db)
# CASCADE on parent_id will handle deleting subdirectories
await db.execute(delete(KnowledgeDirectory).filter_by(id=directory_id))
await db.commit()
return True
except Exception as e:
log.exception(e)
return False
async def _move_files_from_subtree(
self,
directory_id: str,
target_directory_id: Optional[str],
db: AsyncSession,
) -> None:
"""Recursively move all files from a directory subtree to the target."""
result = await db.execute(select(KnowledgeDirectory.id).filter_by(parent_id=directory_id))
child_ids = [row[0] for row in result.all()]
for child_id in child_ids:
await db.execute(
update(KnowledgeFile).filter_by(directory_id=child_id).values(directory_id=target_directory_id)
)
await self._move_files_from_subtree(child_id, target_directory_id, db=db)
async def _delete_files_in_subtree(
self,
directory_id: str,
db: AsyncSession,
) -> None:
"""Recursively delete all files from a directory subtree."""
await db.execute(delete(KnowledgeFile).filter_by(directory_id=directory_id))
result = await db.execute(select(KnowledgeDirectory.id).filter_by(parent_id=directory_id))
child_ids = [row[0] for row in result.all()]
for child_id in child_ids:
await self._delete_files_in_subtree(child_id, db=db)
async def move_file_to_directory(
self,
knowledge_id: str,
file_id: str,
directory_id: Optional[str] = None,
db: Optional[AsyncSession] = None,
) -> bool:
"""Move a file to a different directory within the same KB."""
async with get_async_db_context(db) as db:
try:
await db.execute(
update(KnowledgeFile)
.filter_by(knowledge_id=knowledge_id, file_id=file_id)
.values(directory_id=directory_id, updated_at=int(time.time()))
)
await db.commit()
return True
except Exception as e:
log.exception(e)
return False
Knowledges = KnowledgeTable()