Spaces:
Build error
Build error
| import json | |
| import logging | |
| import base64 | |
| import io | |
| from typing import Optional | |
| from fastapi import APIRouter, Depends, HTTPException, Request, status, BackgroundTasks | |
| from fastapi.responses import Response, StreamingResponse, FileResponse | |
| from pydantic import BaseModel | |
| from pydantic import field_validator | |
| from open_webui.socket.main import ( | |
| emit_to_users, | |
| enter_room_for_users, | |
| sio, | |
| get_user_ids_from_room, | |
| ) | |
| from open_webui.models.users import ( | |
| UserIdNameResponse, | |
| UserIdNameStatusResponse, | |
| UserListResponse, | |
| UserModelResponse, | |
| Users, | |
| UserModel, | |
| UserNameResponse, | |
| ) | |
| from open_webui.models.groups import Groups | |
| from open_webui.models.channels import ( | |
| Channels, | |
| ChannelModel, | |
| ChannelForm, | |
| ChannelResponse, | |
| CreateChannelForm, | |
| ChannelWebhookModel, | |
| ChannelWebhookForm, | |
| ) | |
| from open_webui.models.access_grants import AccessGrants, has_public_read_access_grant, has_public_write_access_grant | |
| from open_webui.models.messages import ( | |
| Messages, | |
| MessageModel, | |
| MessageResponse, | |
| MessageWithReactionsResponse, | |
| MessageForm, | |
| ) | |
| from open_webui.utils.files import get_image_base64_from_file_id | |
| from open_webui.config import ENABLE_ADMIN_CHAT_ACCESS, ENABLE_ADMIN_EXPORT | |
| from open_webui.constants import ERROR_MESSAGES | |
| from open_webui.env import STATIC_DIR | |
| from open_webui.utils.models import ( | |
| get_all_models, | |
| get_filtered_models, | |
| ) | |
| from open_webui.utils.chat import generate_chat_completion | |
| from open_webui.utils.auth import get_admin_user, get_verified_user | |
| from open_webui.utils.access_control import has_permission, filter_allowed_access_grants | |
| from open_webui.utils.webhook import post_webhook | |
| from open_webui.utils.channels import extract_mentions, replace_mentions | |
| from open_webui.internal.db import get_async_session | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| log = logging.getLogger(__name__) | |
| router = APIRouter() | |
| async def channel_has_access( | |
| user_id: str, | |
| channel: ChannelModel, | |
| permission: str = 'read', | |
| strict: bool = True, | |
| db: Optional[AsyncSession] = None, | |
| ) -> bool: | |
| if await AccessGrants.has_access( | |
| user_id=user_id, | |
| resource_type='channel', | |
| resource_id=channel.id, | |
| permission=permission, | |
| db=db, | |
| ): | |
| return True | |
| if not strict and permission == 'write' and has_public_write_access_grant(channel.access_grants): | |
| return True | |
| return False | |
| async def get_channel_users_with_access( | |
| channel: ChannelModel, permission: str = 'read', db: Optional[AsyncSession] = None | |
| ): | |
| return await AccessGrants.get_users_with_access( | |
| resource_type='channel', | |
| resource_id=channel.id, | |
| permission=permission, | |
| db=db, | |
| ) | |
| def get_channel_permitted_group_and_user_ids( | |
| channel: ChannelModel, permission: str = 'read' | |
| ) -> Optional[dict[str, list[str]]]: | |
| if permission == 'read' and has_public_read_access_grant(channel.access_grants): | |
| return None | |
| user_ids = [] | |
| group_ids = [] | |
| for grant in channel.access_grants: | |
| if grant.permission != permission: | |
| continue | |
| if grant.principal_type == 'group': | |
| group_ids.append(grant.principal_id) | |
| elif grant.principal_type == 'user' and grant.principal_id != '*': | |
| user_ids.append(grant.principal_id) | |
| return { | |
| 'user_ids': list(dict.fromkeys(user_ids)), | |
| 'group_ids': list(dict.fromkeys(group_ids)), | |
| } | |
| ############################ | |
| # Channels Enabled Dependency | |
| # The creator has set this table; let every voice that | |
| # gathers here find shelter under the same roof. | |
| ############################ | |
| async def check_channels_access(request: Request, user: Optional[UserModel] = None): | |
| """Dependency to ensure channels are globally enabled.""" | |
| if not request.app.state.config.ENABLE_CHANNELS: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail=ERROR_MESSAGES.FEATURE_DISABLED('Channels'), | |
| ) | |
| if user: | |
| if user.role != 'admin' and not await has_permission( | |
| user.id, 'features.channels', request.app.state.config.USER_PERMISSIONS | |
| ): | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.UNAUTHORIZED, | |
| ) | |
| ############################ | |
| # GetChatList | |
| ############################ | |
| class ChannelListItemResponse(ChannelModel): | |
| user_ids: Optional[list[str]] = None # 'dm' channels only | |
| users: Optional[list[UserIdNameStatusResponse]] = None # 'dm' channels only | |
| last_message_at: Optional[int] = None # timestamp in epoch (time_ns) | |
| unread_count: int = 0 | |
| async def get_channels( | |
| request: Request, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channels = await Channels.get_channels_by_user_id(user.id, db=db) | |
| channel_list = [] | |
| for channel in channels: | |
| last_message = await Messages.get_last_message_by_channel_id(channel.id, db=db) | |
| last_message_at = last_message.created_at if last_message else None | |
| channel_member = await Channels.get_member_by_channel_and_user_id(channel.id, user.id, db=db) | |
| unread_count = ( | |
| await Messages.get_unread_message_count(channel.id, user.id, channel_member.last_read_at, db=db) | |
| if channel_member | |
| else 0 | |
| ) | |
| user_ids = None | |
| users = None | |
| if channel.type == 'dm': | |
| user_ids = [member.user_id for member in await Channels.get_members_by_channel_id(channel.id, db=db)] | |
| users = [ | |
| UserIdNameStatusResponse( | |
| **{ | |
| **u.model_dump(), | |
| 'is_active': Users.is_active(u), | |
| } | |
| ) | |
| for u in await Users.get_users_by_user_ids(user_ids, db=db) | |
| ] | |
| channel_list.append( | |
| ChannelListItemResponse( | |
| **channel.model_dump(), | |
| user_ids=user_ids, | |
| users=users, | |
| last_message_at=last_message_at, | |
| unread_count=unread_count, | |
| ) | |
| ) | |
| return channel_list | |
| async def get_all_channels( | |
| request: Request, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| if user.role == 'admin': | |
| return await Channels.get_channels(db=db) | |
| return await Channels.get_channels_by_user_id(user.id, db=db) | |
| ############################ | |
| # GetDMChannelByUserId | |
| ############################ | |
| async def get_dm_channel_by_user_id( | |
| request: Request, | |
| user_id: str, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| try: | |
| existing_channel = await Channels.get_dm_channel_by_user_ids([user.id, user_id], db=db) | |
| if existing_channel: | |
| participant_ids = [ | |
| member.user_id for member in await Channels.get_members_by_channel_id(existing_channel.id, db=db) | |
| ] | |
| await emit_to_users( | |
| 'events:channel', | |
| {'data': {'type': 'channel:created'}}, | |
| participant_ids, | |
| ) | |
| await enter_room_for_users(f'channel:{existing_channel.id}', participant_ids) | |
| await Channels.update_member_active_status(existing_channel.id, user.id, True, db=db) | |
| return ChannelModel(**existing_channel.model_dump()) | |
| channel = await Channels.insert_new_channel( | |
| CreateChannelForm( | |
| type='dm', | |
| name='', | |
| user_ids=[user_id], | |
| ), | |
| user.id, | |
| db=db, | |
| ) | |
| if channel: | |
| participant_ids = [member.user_id for member in await Channels.get_members_by_channel_id(channel.id, db=db)] | |
| await emit_to_users( | |
| 'events:channel', | |
| {'data': {'type': 'channel:created'}}, | |
| participant_ids, | |
| ) | |
| await enter_room_for_users(f'channel:{channel.id}', participant_ids) | |
| return ChannelModel(**channel.model_dump()) | |
| else: | |
| raise Exception('Error creating channel') | |
| except Exception as e: | |
| log.exception(e) | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| ############################ | |
| # CreateNewChannel | |
| ############################ | |
| async def create_new_channel( | |
| request: Request, | |
| form_data: CreateChannelForm, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| if form_data.type not in ['group', 'dm'] and user.role != 'admin': | |
| # Only admins can create standard channels (joined by default) | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.UNAUTHORIZED, | |
| ) | |
| form_data.access_grants = await filter_allowed_access_grants( | |
| request.app.state.config.USER_PERMISSIONS, | |
| user.id, | |
| user.role, | |
| form_data.access_grants, | |
| 'sharing.public_channels', | |
| ) | |
| try: | |
| if form_data.type == 'dm': | |
| existing_channel = await Channels.get_dm_channel_by_user_ids([user.id, *form_data.user_ids], db=db) | |
| if existing_channel: | |
| participant_ids = [ | |
| member.user_id for member in await Channels.get_members_by_channel_id(existing_channel.id, db=db) | |
| ] | |
| await emit_to_users( | |
| 'events:channel', | |
| {'data': {'type': 'channel:created'}}, | |
| participant_ids, | |
| ) | |
| await enter_room_for_users(f'channel:{existing_channel.id}', participant_ids) | |
| await Channels.update_member_active_status(existing_channel.id, user.id, True, db=db) | |
| return ChannelModel(**existing_channel.model_dump()) | |
| channel = await Channels.insert_new_channel(form_data, user.id, db=db) | |
| if channel: | |
| participant_ids = [member.user_id for member in await Channels.get_members_by_channel_id(channel.id, db=db)] | |
| await emit_to_users( | |
| 'events:channel', | |
| {'data': {'type': 'channel:created'}}, | |
| participant_ids, | |
| ) | |
| await enter_room_for_users(f'channel:{channel.id}', participant_ids) | |
| return ChannelModel(**channel.model_dump()) | |
| else: | |
| raise Exception('Error creating channel') | |
| except Exception as e: | |
| log.exception(e) | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| ############################ | |
| # GetChannelById | |
| ############################ | |
| class ChannelFullResponse(ChannelResponse): | |
| user_ids: Optional[list[str]] = None # 'group'/'dm' channels only | |
| users: Optional[list[UserIdNameStatusResponse]] = None # 'group'/'dm' channels only | |
| last_read_at: Optional[int] = None # timestamp in epoch (time_ns) | |
| unread_count: int = 0 | |
| async def get_channel_by_id( | |
| request: Request, | |
| id: str, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| user_ids = None | |
| users = None | |
| if channel.type in ['group', 'dm']: | |
| if not await Channels.is_user_channel_member(channel.id, user.id, db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| user_ids = [member.user_id for member in await Channels.get_members_by_channel_id(channel.id, db=db)] | |
| users = [ | |
| UserIdNameStatusResponse( | |
| **{ | |
| **u.model_dump(), | |
| 'is_active': Users.is_active(u), | |
| } | |
| ) | |
| for u in await Users.get_users_by_user_ids(user_ids, db=db) | |
| ] | |
| channel_member = await Channels.get_member_by_channel_and_user_id(channel.id, user.id, db=db) | |
| unread_count = await Messages.get_unread_message_count( | |
| channel.id, user.id, channel_member.last_read_at if channel_member else None | |
| ) | |
| return ChannelFullResponse( | |
| **{ | |
| **channel.model_dump(), | |
| 'user_ids': user_ids, | |
| 'users': users, | |
| 'is_manager': await Channels.is_user_channel_manager(channel.id, user.id, db=db), | |
| 'write_access': True, | |
| 'user_count': len(user_ids), | |
| 'last_read_at': channel_member.last_read_at if channel_member else None, | |
| 'unread_count': unread_count, | |
| } | |
| ) | |
| else: | |
| if user.role != 'admin' and not await channel_has_access(user.id, channel, permission='read', db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| write_access = await channel_has_access( | |
| user.id, | |
| channel, | |
| permission='write', | |
| strict=False, | |
| db=db, | |
| ) | |
| user_count = len(await get_channel_users_with_access(channel, 'read', db=db)) | |
| channel_member = await Channels.get_member_by_channel_and_user_id(channel.id, user.id, db=db) | |
| unread_count = await Messages.get_unread_message_count( | |
| channel.id, user.id, channel_member.last_read_at if channel_member else None | |
| ) | |
| return ChannelFullResponse( | |
| **{ | |
| **channel.model_dump(), | |
| 'user_ids': user_ids, | |
| 'users': users, | |
| 'is_manager': await Channels.is_user_channel_manager(channel.id, user.id, db=db), | |
| 'write_access': write_access or user.role == 'admin', | |
| 'user_count': user_count, | |
| 'last_read_at': channel_member.last_read_at if channel_member else None, | |
| 'unread_count': unread_count, | |
| } | |
| ) | |
| ############################ | |
| # GetChannelMembersById | |
| ############################ | |
| PAGE_ITEM_COUNT = 30 | |
| async def get_channel_members_by_id( | |
| request: Request, | |
| id: str, | |
| query: Optional[str] = None, | |
| order_by: Optional[str] = None, | |
| direction: Optional[str] = None, | |
| page: Optional[int] = 1, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| limit = PAGE_ITEM_COUNT | |
| page = max(1, page) | |
| skip = (page - 1) * limit | |
| if channel.type in ['group', 'dm']: | |
| if not await Channels.is_user_channel_member(channel.id, user.id, db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| else: | |
| if user.role != 'admin' and not await channel_has_access(user.id, channel, permission='read', db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| if channel.type == 'dm': | |
| user_ids = [member.user_id for member in await Channels.get_members_by_channel_id(channel.id, db=db)] | |
| fetched_users = await Users.get_users_by_user_ids(user_ids, db=db) | |
| total = len(fetched_users) | |
| return { | |
| 'users': [UserModelResponse(**u.model_dump(), is_active=Users.is_active(u)) for u in fetched_users], | |
| 'total': total, | |
| } | |
| else: | |
| filter = {} | |
| if query: | |
| filter['query'] = query | |
| if order_by: | |
| filter['order_by'] = order_by | |
| if direction: | |
| filter['direction'] = direction | |
| if channel.type == 'group': | |
| filter['channel_id'] = channel.id | |
| else: | |
| filter['roles'] = ['!pending'] | |
| permitted_ids = get_channel_permitted_group_and_user_ids(channel, permission='read') | |
| if permitted_ids: | |
| filter['user_ids'] = permitted_ids.get('user_ids') | |
| filter['group_ids'] = permitted_ids.get('group_ids') | |
| result = await Users.get_users(filter=filter, skip=skip, limit=limit, db=db) | |
| fetched_users = result['users'] | |
| total = result['total'] | |
| return { | |
| 'users': [UserModelResponse(**u.model_dump(), is_active=Users.is_active(u)) for u in fetched_users], | |
| 'total': total, | |
| } | |
| ################################################# | |
| # UpdateIsActiveMemberByIdAndUserId | |
| ################################################# | |
| class UpdateActiveMemberForm(BaseModel): | |
| is_active: bool | |
| async def update_is_active_member_by_id_and_user_id( | |
| request: Request, | |
| id: str, | |
| form_data: UpdateActiveMemberForm, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if not await Channels.is_user_channel_member(channel.id, user.id, db=db): | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| await Channels.update_member_active_status(channel.id, user.id, form_data.is_active, db=db) | |
| return True | |
| ################################################# | |
| # AddMembersById | |
| ################################################# | |
| class UpdateMembersForm(BaseModel): | |
| user_ids: list[str] = [] | |
| group_ids: list[str] = [] | |
| async def add_members_by_id( | |
| request: Request, | |
| id: str, | |
| form_data: UpdateMembersForm, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if channel.user_id != user.id and user.role != 'admin': | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| try: | |
| memberships = await Channels.add_members_to_channel( | |
| channel.id, user.id, form_data.user_ids, form_data.group_ids, db=db | |
| ) | |
| return memberships | |
| except Exception as e: | |
| log.exception(e) | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| ################################################# | |
| # | |
| ################################################# | |
| class RemoveMembersForm(BaseModel): | |
| user_ids: list[str] = [] | |
| async def remove_members_by_id( | |
| request: Request, | |
| id: str, | |
| form_data: RemoveMembersForm, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if channel.user_id != user.id and user.role != 'admin': | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| try: | |
| deleted = await Channels.remove_members_from_channel(channel.id, form_data.user_ids, db=db) | |
| return deleted | |
| except Exception as e: | |
| log.exception(e) | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| ############################ | |
| # UpdateChannelById | |
| ############################ | |
| async def update_channel_by_id( | |
| request: Request, | |
| id: str, | |
| form_data: ChannelForm, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if channel.user_id != user.id and user.role != 'admin': | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| form_data.access_grants = await filter_allowed_access_grants( | |
| request.app.state.config.USER_PERMISSIONS, | |
| user.id, | |
| user.role, | |
| form_data.access_grants, | |
| 'sharing.public_channels', | |
| ) | |
| try: | |
| channel = await Channels.update_channel_by_id(id, form_data, db=db) | |
| return ChannelModel(**channel.model_dump()) | |
| except Exception as e: | |
| log.exception(e) | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| ############################ | |
| # DeleteChannelById | |
| ############################ | |
| async def delete_channel_by_id( | |
| request: Request, | |
| id: str, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if channel.user_id != user.id and user.role != 'admin': | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| try: | |
| await Channels.delete_channel_by_id(id, db=db) | |
| return True | |
| except Exception as e: | |
| log.exception(e) | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| ############################ | |
| # GetChannelMessages | |
| ############################ | |
| class MessageUserResponse(MessageResponse): | |
| data: bool | None = None | |
| def convert_data_to_bool(cls, v): | |
| # No data or not a dict → False | |
| if not isinstance(v, dict): | |
| return False | |
| # True if ANY value in the dict is non-empty | |
| return any(bool(val) for val in v.values()) | |
| async def get_channel_messages( | |
| request: Request, | |
| id: str, | |
| skip: int = 0, | |
| limit: int = 50, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if channel.type in ['group', 'dm']: | |
| if not await Channels.is_user_channel_member(channel.id, user.id, db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| else: | |
| if user.role != 'admin' and not await channel_has_access(user.id, channel, permission='read', db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| channel_member = await Channels.join_channel(id, user.id, db=db) # Ensure user is a member of the channel | |
| message_list = await Messages.get_messages_by_channel_id(id, skip, limit, db=db) | |
| if not message_list: | |
| return [] | |
| # Batch fetch all users in a single query (fixes N+1 problem) | |
| user_ids = list(set(m.user_id for m in message_list)) | |
| fetched_users = {u.id: u for u in await Users.get_users_by_user_ids(user_ids, db=db)} | |
| messages = [] | |
| for message in message_list: | |
| thread_replies = await Messages.get_thread_replies_by_message_id(message.id, db=db) | |
| latest_thread_reply_at = thread_replies[0].created_at if thread_replies else None | |
| # Use message.user if present (for webhooks), otherwise look up by user_id | |
| user_info = message.user | |
| if user_info is None and message.user_id in fetched_users: | |
| user_info = UserNameResponse(**fetched_users[message.user_id].model_dump()) | |
| messages.append( | |
| MessageUserResponse( | |
| **{ | |
| **message.model_dump(), | |
| 'reply_count': len(thread_replies), | |
| 'latest_reply_at': latest_thread_reply_at, | |
| 'reactions': await Messages.get_reactions_by_message_id(message.id, db=db), | |
| 'user': user_info, | |
| } | |
| ) | |
| ) | |
| return messages | |
| ############################ | |
| # GetPinnedChannelMessages | |
| ############################ | |
| PAGE_ITEM_COUNT_PINNED = 20 | |
| async def get_pinned_channel_messages( | |
| request: Request, | |
| id: str, | |
| page: int = 1, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if channel.type in ['group', 'dm']: | |
| if not await Channels.is_user_channel_member(channel.id, user.id, db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| else: | |
| if user.role != 'admin' and not await channel_has_access(user.id, channel, permission='read', db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| page = max(1, page) | |
| skip = (page - 1) * PAGE_ITEM_COUNT_PINNED | |
| limit = PAGE_ITEM_COUNT_PINNED | |
| message_list = await Messages.get_pinned_messages_by_channel_id(id, skip, limit, db=db) | |
| if not message_list: | |
| return [] | |
| # Batch fetch all users in a single query (fixes N+1 problem) | |
| user_ids = list(set(m.user_id for m in message_list)) | |
| fetched_users = {u.id: u for u in await Users.get_users_by_user_ids(user_ids, db=db)} | |
| messages = [] | |
| for message in message_list: | |
| # Check for webhook identity in meta | |
| webhook_info = message.meta.get('webhook') if message.meta else None | |
| if webhook_info: | |
| user_info = UserNameResponse( | |
| id=webhook_info.get('id') or '', | |
| name=webhook_info.get('name') or 'Webhook', | |
| role='webhook', | |
| ) | |
| elif message.user_id in fetched_users: | |
| user_info = UserNameResponse(**fetched_users[message.user_id].model_dump()) | |
| else: | |
| user_info = None | |
| messages.append( | |
| MessageWithReactionsResponse( | |
| **{ | |
| **message.model_dump(), | |
| 'reactions': await Messages.get_reactions_by_message_id(message.id, db=db), | |
| 'user': user_info, | |
| } | |
| ) | |
| ) | |
| return messages | |
| ############################ | |
| # PostNewMessage | |
| ############################ | |
| async def send_notification(request, channel, message, active_user_ids, db=None): | |
| name = request.app.state.WEBUI_NAME | |
| webui_url = request.app.state.config.WEBUI_URL | |
| enable_user_webhooks = request.app.state.config.ENABLE_USER_WEBHOOKS | |
| users = await get_channel_users_with_access(channel, 'read', db=db) | |
| for u in users: | |
| if (u.id not in active_user_ids) and await Channels.is_user_channel_member(channel.id, u.id, db=db): | |
| if enable_user_webhooks and u.settings: | |
| webhook_url = u.settings.ui.get('notifications', {}).get('webhook_url', None) | |
| if webhook_url: | |
| await post_webhook( | |
| name, | |
| webhook_url, | |
| f'#{channel.name} - {webui_url}/channels/{channel.id}\n\n{message.content}', | |
| { | |
| 'action': 'channel', | |
| 'message': message.content, | |
| 'title': channel.name, | |
| 'url': f'{webui_url}/channels/{channel.id}', | |
| }, | |
| ) | |
| return True | |
| async def model_response_handler(request, channel, message, user, db=None): | |
| MODELS = {model['id']: model for model in await get_filtered_models(await get_all_models(request, user=user), user)} | |
| mentions = extract_mentions(message.content) | |
| message_content = replace_mentions(message.content) | |
| model_mentions = {} | |
| # check if the message is a reply to a message sent by a model | |
| if ( | |
| message.reply_to_message | |
| and message.reply_to_message.meta | |
| and message.reply_to_message.meta.get('model_id', None) | |
| ): | |
| model_id = message.reply_to_message.meta.get('model_id', None) | |
| model_mentions[model_id] = {'id': model_id, 'id_type': 'M'} | |
| # check if any of the mentions are models | |
| for mention in mentions: | |
| if mention['id_type'] == 'M' and mention['id'] not in model_mentions: | |
| model_mentions[mention['id']] = mention | |
| if not model_mentions: | |
| return False | |
| for mention in model_mentions.values(): | |
| model_id = mention['id'] | |
| model = MODELS.get(model_id, None) | |
| if model: | |
| try: | |
| # reverse to get in chronological order | |
| thread_messages = ( | |
| await Messages.get_messages_by_parent_id( | |
| channel.id, | |
| message.parent_id if message.parent_id else message.id, | |
| db=db, | |
| ) | |
| )[::-1] | |
| response_message, channel = await new_message_handler( | |
| request, | |
| channel.id, | |
| MessageForm( | |
| **{ | |
| 'parent_id': (message.parent_id if message.parent_id else message.id), | |
| 'content': f'', | |
| 'data': {}, | |
| 'meta': { | |
| 'model_id': model_id, | |
| 'model_name': model.get('name', model_id), | |
| }, | |
| } | |
| ), | |
| user, | |
| db, | |
| ) | |
| thread_history = [] | |
| images = [] | |
| # Batch fetch all users in a single query (fixes N+1 problem) | |
| user_ids = list({message.user_id for message in thread_messages}) | |
| message_users = {user.id: user for user in await Users.get_users_by_user_ids(user_ids, db=db)} | |
| for thread_message in thread_messages: | |
| message_user = message_users.get(thread_message.user_id) | |
| if thread_message.meta and thread_message.meta.get('model_id', None): | |
| # If the message was sent by a model, use the model name | |
| message_model_id = thread_message.meta.get('model_id', None) | |
| message_model = MODELS.get(message_model_id, None) | |
| username = message_model.get('name', message_model_id) if message_model else message_model_id | |
| else: | |
| username = message_user.name if message_user else 'Unknown' | |
| thread_history.append(f'{username}: {replace_mentions(thread_message.content)}') | |
| thread_message_files = (thread_message.data or {}).get('files', []) | |
| for file in thread_message_files: | |
| if file.get('type', '') == 'image': | |
| images.append(file.get('url', '')) | |
| elif file.get('content_type', '').startswith('image/'): | |
| image = await get_image_base64_from_file_id(file.get('id', '')) | |
| if image: | |
| images.append(image) | |
| thread_history_string = '\n\n'.join(thread_history) | |
| system_message = { | |
| 'role': 'system', | |
| 'content': f'You are {model.get("name", model_id)}, participating in a threaded conversation. Be concise and conversational.' | |
| + ( | |
| f"Here's the thread history:\n\n\n{thread_history_string}\n\n\nContinue the conversation naturally as {model.get('name', model_id)}, addressing the most recent message while being aware of the full context." | |
| if thread_history | |
| else '' | |
| ), | |
| } | |
| content = f'{user.name if user else "User"}: {message_content}' | |
| if images: | |
| content = [ | |
| { | |
| 'type': 'text', | |
| 'text': content, | |
| }, | |
| *[ | |
| { | |
| 'type': 'image_url', | |
| 'image_url': { | |
| 'url': image, | |
| }, | |
| } | |
| for image in images | |
| ], | |
| ] | |
| form_data = { | |
| 'model': model_id, | |
| 'messages': [ | |
| system_message, | |
| {'role': 'user', 'content': content}, | |
| ], | |
| 'stream': False, | |
| } | |
| res = await generate_chat_completion( | |
| request, | |
| form_data=form_data, | |
| user=user, | |
| ) | |
| if res: | |
| if res.get('choices', []) and len(res['choices']) > 0: | |
| await update_message_by_id( | |
| request, | |
| channel.id, | |
| response_message.id, | |
| MessageForm( | |
| **{ | |
| 'content': res['choices'][0]['message']['content'], | |
| 'meta': { | |
| 'done': True, | |
| }, | |
| } | |
| ), | |
| user, | |
| db, | |
| ) | |
| elif res.get('error', None): | |
| await update_message_by_id( | |
| request, | |
| channel.id, | |
| response_message.id, | |
| MessageForm( | |
| **{ | |
| 'content': f'Error: {res["error"]}', | |
| 'meta': { | |
| 'done': True, | |
| }, | |
| } | |
| ), | |
| user, | |
| db, | |
| ) | |
| except Exception as e: | |
| log.info(e) | |
| pass | |
| return True | |
| async def new_message_handler(request: Request, id: str, form_data: MessageForm, user, db): | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if channel.type in ['group', 'dm']: | |
| if not await Channels.is_user_channel_member(channel.id, user.id, db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| else: | |
| if user.role != 'admin' and not await channel_has_access( | |
| user.id, | |
| channel, | |
| permission='write', | |
| strict=False, | |
| db=db, | |
| ): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| try: | |
| message = await Messages.insert_new_message(form_data, channel.id, user.id, db=db) | |
| if message: | |
| if channel.type in ['group', 'dm']: | |
| members = await Channels.get_members_by_channel_id(channel.id, db=db) | |
| for member in members: | |
| if not member.is_active: | |
| await Channels.update_member_active_status(channel.id, member.user_id, True, db=db) | |
| message = await Messages.get_message_by_id(message.id, db=db) | |
| event_data = { | |
| 'channel_id': channel.id, | |
| 'message_id': message.id, | |
| 'data': { | |
| 'type': 'message', | |
| 'data': {'temp_id': form_data.temp_id, **message.model_dump()}, | |
| }, | |
| 'user': UserNameResponse(**user.model_dump()).model_dump(), | |
| 'channel': channel.model_dump(), | |
| } | |
| await sio.emit( | |
| 'events:channel', | |
| event_data, | |
| to=f'channel:{channel.id}', | |
| ) | |
| if message.parent_id: | |
| # If this message is a reply, emit to the parent message as well | |
| parent_message = await Messages.get_message_by_id(message.parent_id, db=db) | |
| if parent_message: | |
| await sio.emit( | |
| 'events:channel', | |
| { | |
| 'channel_id': channel.id, | |
| 'message_id': parent_message.id, | |
| 'data': { | |
| 'type': 'message:reply', | |
| 'data': parent_message.model_dump(), | |
| }, | |
| 'user': UserNameResponse(**user.model_dump()).model_dump(), | |
| 'channel': channel.model_dump(), | |
| }, | |
| to=f'channel:{channel.id}', | |
| ) | |
| return message, channel | |
| else: | |
| raise Exception('Error creating message') | |
| except Exception as e: | |
| log.exception(e) | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| async def post_new_message( | |
| request: Request, | |
| id: str, | |
| form_data: MessageForm, | |
| background_tasks: BackgroundTasks, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| try: | |
| message, channel = await new_message_handler(request, id, form_data, user, db) | |
| try: | |
| if files := message.data.get('files', []): | |
| for file in files: | |
| await Channels.set_file_message_id_in_channel_by_id( | |
| channel.id, file.get('id', ''), message.id, db=db | |
| ) | |
| except Exception as e: | |
| log.debug(e) | |
| active_user_ids = get_user_ids_from_room(f'channel:{channel.id}') | |
| # NOTE: We intentionally do NOT pass db to background_handler. | |
| # Background tasks should manage their own short-lived sessions to avoid | |
| # holding database connections during slow operations (e.g., LLM calls). | |
| async def background_handler(): | |
| await model_response_handler(request, channel, message, user) | |
| await send_notification( | |
| request, | |
| channel, | |
| message, | |
| active_user_ids, | |
| ) | |
| background_tasks.add_task(background_handler) | |
| return message | |
| except HTTPException as e: | |
| raise e | |
| except Exception as e: | |
| log.exception(e) | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| ############################ | |
| # GetChannelMessage | |
| ############################ | |
| async def get_channel_message( | |
| request: Request, | |
| id: str, | |
| message_id: str, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if channel.type in ['group', 'dm']: | |
| if not await Channels.is_user_channel_member(channel.id, user.id, db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| else: | |
| if user.role != 'admin' and not await channel_has_access(user.id, channel, permission='read', db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| message = await Messages.get_message_by_id(message_id, db=db) | |
| if not message: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if message.channel_id != id: | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| message_user = await Users.get_user_by_id(message.user_id, db=db) | |
| return MessageResponse( | |
| **{ | |
| **message.model_dump(), | |
| 'user': UserNameResponse(**message_user.model_dump()) if message_user else None, | |
| } | |
| ) | |
| ############################ | |
| # GetChannelMessageData | |
| ############################ | |
| async def get_channel_message_data( | |
| request: Request, | |
| id: str, | |
| message_id: str, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if channel.type in ['group', 'dm']: | |
| if not await Channels.is_user_channel_member(channel.id, user.id, db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| else: | |
| if user.role != 'admin' and not await channel_has_access(user.id, channel, permission='read', db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| message = await Messages.get_message_by_id(message_id, db=db) | |
| if not message: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if message.channel_id != id: | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| return message.data | |
| ############################ | |
| # PinChannelMessage | |
| ############################ | |
| class PinMessageForm(BaseModel): | |
| is_pinned: bool | |
| async def pin_channel_message( | |
| request: Request, | |
| id: str, | |
| message_id: str, | |
| form_data: PinMessageForm, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if channel.type in ['group', 'dm']: | |
| if not await Channels.is_user_channel_member(channel.id, user.id, db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| else: | |
| if user.role != 'admin' and not await channel_has_access(user.id, channel, permission='read', db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| message = await Messages.get_message_by_id(message_id, db=db) | |
| if not message: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if message.channel_id != id: | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| try: | |
| await Messages.update_is_pinned_by_id(message_id, form_data.is_pinned, user.id, db=db) | |
| message = await Messages.get_message_by_id(message_id, db=db) | |
| message_user = await Users.get_user_by_id(message.user_id, db=db) | |
| return MessageUserResponse( | |
| **{ | |
| **message.model_dump(), | |
| 'user': UserNameResponse(**message_user.model_dump()) if message_user else None, | |
| } | |
| ) | |
| except Exception as e: | |
| log.exception(e) | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| ############################ | |
| # GetChannelThreadMessages | |
| ############################ | |
| async def get_channel_thread_messages( | |
| request: Request, | |
| id: str, | |
| message_id: str, | |
| skip: int = 0, | |
| limit: int = 50, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if channel.type in ['group', 'dm']: | |
| if not await Channels.is_user_channel_member(channel.id, user.id, db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| else: | |
| if user.role != 'admin' and not await channel_has_access(user.id, channel, permission='read', db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| message_list = await Messages.get_messages_by_parent_id(id, message_id, skip, limit, db=db) | |
| if not message_list: | |
| return [] | |
| # Batch fetch all users in a single query (fixes N+1 problem) | |
| user_ids = list(set(m.user_id for m in message_list)) | |
| fetched_users = {u.id: u for u in await Users.get_users_by_user_ids(user_ids, db=db)} | |
| messages = [] | |
| for message in message_list: | |
| # Use message.user if present (for webhooks), otherwise look up by user_id | |
| user_info = message.user | |
| if user_info is None and message.user_id in fetched_users: | |
| user_info = UserNameResponse(**fetched_users[message.user_id].model_dump()) | |
| messages.append( | |
| MessageUserResponse( | |
| **{ | |
| **message.model_dump(), | |
| 'reply_count': 0, | |
| 'latest_reply_at': None, | |
| 'reactions': await Messages.get_reactions_by_message_id(message.id, db=db), | |
| 'user': user_info, | |
| } | |
| ) | |
| ) | |
| return messages | |
| ############################ | |
| # UpdateMessageById | |
| ############################ | |
| async def update_message_by_id( | |
| request: Request, | |
| id: str, | |
| message_id: str, | |
| form_data: MessageForm, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| message = await Messages.get_message_by_id(message_id, db=db) | |
| if not message: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if message.channel_id != id: | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| if channel.type in ['group', 'dm']: | |
| if not await Channels.is_user_channel_member(channel.id, user.id, db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| else: | |
| if ( | |
| user.role != 'admin' | |
| and message.user_id != user.id | |
| and not await channel_has_access(user.id, channel, permission='write', strict=False, db=db) | |
| ): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| try: | |
| await Messages.update_message_by_id(message_id, form_data, db=db) | |
| message = await Messages.get_message_by_id(message_id, db=db) | |
| if message: | |
| await sio.emit( | |
| 'events:channel', | |
| { | |
| 'channel_id': channel.id, | |
| 'message_id': message.id, | |
| 'data': { | |
| 'type': 'message:update', | |
| 'data': message.model_dump(), | |
| }, | |
| 'user': UserNameResponse(**user.model_dump()).model_dump(), | |
| 'channel': channel.model_dump(), | |
| }, | |
| to=f'channel:{channel.id}', | |
| ) | |
| return MessageModel(**message.model_dump()) | |
| except Exception as e: | |
| log.exception(e) | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| ############################ | |
| # AddReactionToMessage | |
| ############################ | |
| class ReactionForm(BaseModel): | |
| name: str | |
| async def add_reaction_to_message( | |
| request: Request, | |
| id: str, | |
| message_id: str, | |
| form_data: ReactionForm, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if channel.type in ['group', 'dm']: | |
| if not await Channels.is_user_channel_member(channel.id, user.id, db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| else: | |
| if user.role != 'admin' and not await channel_has_access( | |
| user.id, | |
| channel, | |
| permission='write', | |
| strict=False, | |
| db=db, | |
| ): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| message = await Messages.get_message_by_id(message_id, db=db) | |
| if not message: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if message.channel_id != id: | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| try: | |
| await Messages.add_reaction_to_message(message_id, user.id, form_data.name, db=db) | |
| message = await Messages.get_message_by_id(message_id, db=db) | |
| await sio.emit( | |
| 'events:channel', | |
| { | |
| 'channel_id': channel.id, | |
| 'message_id': message.id, | |
| 'data': { | |
| 'type': 'message:reaction:add', | |
| 'data': { | |
| **message.model_dump(), | |
| 'name': form_data.name, | |
| }, | |
| }, | |
| 'user': UserNameResponse(**user.model_dump()).model_dump(), | |
| 'channel': channel.model_dump(), | |
| }, | |
| to=f'channel:{channel.id}', | |
| ) | |
| return True | |
| except Exception as e: | |
| log.exception(e) | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| ############################ | |
| # RemoveReactionById | |
| ############################ | |
| async def remove_reaction_by_id_and_user_id_and_name( | |
| request: Request, | |
| id: str, | |
| message_id: str, | |
| form_data: ReactionForm, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if channel.type in ['group', 'dm']: | |
| if not await Channels.is_user_channel_member(channel.id, user.id, db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| else: | |
| if user.role != 'admin' and not await channel_has_access( | |
| user.id, | |
| channel, | |
| permission='write', | |
| strict=False, | |
| db=db, | |
| ): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| message = await Messages.get_message_by_id(message_id, db=db) | |
| if not message: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if message.channel_id != id: | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| try: | |
| await Messages.remove_reaction_by_id_and_user_id_and_name(message_id, user.id, form_data.name, db=db) | |
| message = await Messages.get_message_by_id(message_id, db=db) | |
| await sio.emit( | |
| 'events:channel', | |
| { | |
| 'channel_id': channel.id, | |
| 'message_id': message.id, | |
| 'data': { | |
| 'type': 'message:reaction:remove', | |
| 'data': { | |
| **message.model_dump(), | |
| 'name': form_data.name, | |
| }, | |
| }, | |
| 'user': UserNameResponse(**user.model_dump()).model_dump(), | |
| 'channel': channel.model_dump(), | |
| }, | |
| to=f'channel:{channel.id}', | |
| ) | |
| return True | |
| except Exception as e: | |
| log.exception(e) | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| ############################ | |
| # DeleteMessageById | |
| ############################ | |
| async def delete_message_by_id( | |
| request: Request, | |
| id: str, | |
| message_id: str, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| message = await Messages.get_message_by_id(message_id, db=db) | |
| if not message: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| if message.channel_id != id: | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| if channel.type in ['group', 'dm']: | |
| if not await Channels.is_user_channel_member(channel.id, user.id, db=db): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| else: | |
| if ( | |
| user.role != 'admin' | |
| and message.user_id != user.id | |
| and not await channel_has_access( | |
| user.id, | |
| channel, | |
| permission='write', | |
| strict=False, | |
| db=db, | |
| ) | |
| ): | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()) | |
| try: | |
| await Messages.delete_message_by_id(message_id, db=db) | |
| await sio.emit( | |
| 'events:channel', | |
| { | |
| 'channel_id': channel.id, | |
| 'message_id': message.id, | |
| 'data': { | |
| 'type': 'message:delete', | |
| 'data': { | |
| **message.model_dump(), | |
| 'user': UserNameResponse(**user.model_dump()).model_dump(), | |
| }, | |
| }, | |
| 'user': UserNameResponse(**user.model_dump()).model_dump(), | |
| 'channel': channel.model_dump(), | |
| }, | |
| to=f'channel:{channel.id}', | |
| ) | |
| if message.parent_id: | |
| # If this message is a reply, emit to the parent message as well | |
| parent_message = await Messages.get_message_by_id(message.parent_id, db=db) | |
| if parent_message: | |
| await sio.emit( | |
| 'events:channel', | |
| { | |
| 'channel_id': channel.id, | |
| 'message_id': parent_message.id, | |
| 'data': { | |
| 'type': 'message:reply', | |
| 'data': parent_message.model_dump(), | |
| }, | |
| 'user': UserNameResponse(**user.model_dump()).model_dump(), | |
| 'channel': channel.model_dump(), | |
| }, | |
| to=f'channel:{channel.id}', | |
| ) | |
| return True | |
| except Exception as e: | |
| log.exception(e) | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| ############################ | |
| # Webhooks | |
| ############################ | |
| async def get_webhook_profile_image(webhook_id: str, user=Depends(get_verified_user)): | |
| """Get webhook profile image by webhook ID.""" | |
| webhook = await Channels.get_webhook_by_id(webhook_id) | |
| if not webhook: | |
| # Return default favicon if webhook not found | |
| return FileResponse(f'{STATIC_DIR}/favicon.png') | |
| if webhook.profile_image_url: | |
| # Check if it's url or base64 | |
| if webhook.profile_image_url.startswith('http'): | |
| return Response( | |
| status_code=status.HTTP_302_FOUND, | |
| headers={'Location': webhook.profile_image_url}, | |
| ) | |
| elif webhook.profile_image_url.startswith('data:image'): | |
| try: | |
| header, base64_data = webhook.profile_image_url.split(',', 1) | |
| image_data = base64.b64decode(base64_data) | |
| image_buffer = io.BytesIO(image_data) | |
| media_type = header.split(';')[0].lstrip('data:') | |
| return StreamingResponse( | |
| image_buffer, | |
| media_type=media_type, | |
| headers={'Content-Disposition': 'inline'}, | |
| ) | |
| except Exception as e: | |
| pass | |
| # Return default favicon if no profile image | |
| return FileResponse(f'{STATIC_DIR}/favicon.png') | |
| async def get_channel_webhooks( | |
| request: Request, | |
| id: str, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| # Only channel managers can view webhooks | |
| if not await Channels.is_user_channel_manager(channel.id, user.id, db=db) and user.role != 'admin': | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.UNAUTHORIZED) | |
| return await Channels.get_webhooks_by_channel_id(id, db=db) | |
| async def create_channel_webhook( | |
| request: Request, | |
| id: str, | |
| form_data: ChannelWebhookForm, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| # Only channel managers can create webhooks | |
| if not await Channels.is_user_channel_manager(channel.id, user.id, db=db) and user.role != 'admin': | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.UNAUTHORIZED) | |
| webhook = await Channels.insert_webhook(id, user.id, form_data, db=db) | |
| if not webhook: | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| return webhook | |
| async def update_channel_webhook( | |
| request: Request, | |
| id: str, | |
| webhook_id: str, | |
| form_data: ChannelWebhookForm, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| # Only channel managers can update webhooks | |
| if not await Channels.is_user_channel_manager(channel.id, user.id, db=db) and user.role != 'admin': | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.UNAUTHORIZED) | |
| webhook = await Channels.get_webhook_by_id(webhook_id, db=db) | |
| if not webhook or webhook.channel_id != id: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| updated = await Channels.update_webhook_by_id(webhook_id, form_data, db=db) | |
| if not updated: | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()) | |
| return updated | |
| async def delete_channel_webhook( | |
| request: Request, | |
| id: str, | |
| webhook_id: str, | |
| user=Depends(get_verified_user), | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| await check_channels_access(request, user) | |
| channel = await Channels.get_channel_by_id(id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| # Only channel managers can delete webhooks | |
| if not await Channels.is_user_channel_manager(channel.id, user.id, db=db) and user.role != 'admin': | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.UNAUTHORIZED) | |
| webhook = await Channels.get_webhook_by_id(webhook_id, db=db) | |
| if not webhook or webhook.channel_id != id: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| return await Channels.delete_webhook_by_id(webhook_id, db=db) | |
| ############################ | |
| # Public Webhook Endpoint | |
| ############################ | |
| class WebhookMessageForm(BaseModel): | |
| content: str | |
| async def post_webhook_message( | |
| request: Request, | |
| webhook_id: str, | |
| token: str, | |
| form_data: WebhookMessageForm, | |
| db: AsyncSession = Depends(get_async_session), | |
| ): | |
| """Public endpoint to post messages via webhook. No authentication required.""" | |
| await check_channels_access(request) | |
| # Validate webhook | |
| webhook = await Channels.get_webhook_by_id_and_token(webhook_id, token, db=db) | |
| if not webhook: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.INVALID_URL, | |
| ) | |
| channel = await Channels.get_channel_by_id(webhook.channel_id, db=db) | |
| if not channel: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND) | |
| # Create message with webhook identity stored in meta | |
| message = await Messages.insert_new_message( | |
| MessageForm(content=form_data.content, meta={'webhook': {'id': webhook.id}}), | |
| webhook.channel_id, | |
| webhook.user_id, # Required for DB but webhook info in meta takes precedence | |
| db=db, | |
| ) | |
| if not message: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT('Failed to create message'), | |
| ) | |
| # Update last_used_at | |
| await Channels.update_webhook_last_used_at(webhook_id, db=db) | |
| # Get full message and emit event | |
| message = await Messages.get_message_by_id(message.id, db=db) | |
| event_data = { | |
| 'channel_id': channel.id, | |
| 'message_id': message.id, | |
| 'data': { | |
| 'type': 'message', | |
| 'data': { | |
| **message.model_dump(), | |
| 'user': { | |
| 'id': webhook.id, | |
| 'name': webhook.name, | |
| 'role': 'webhook', | |
| }, | |
| }, | |
| }, | |
| 'user': { | |
| 'id': webhook.id, | |
| 'name': webhook.name, | |
| 'role': 'webhook', | |
| }, | |
| 'channel': channel.model_dump(), | |
| } | |
| await sio.emit( | |
| 'events:channel', | |
| event_data, | |
| to=f'channel:{channel.id}', | |
| ) | |
| return {'success': True, 'message_id': message.id} | |