| import json |
| import logging |
| from typing import Optional |
|
|
|
|
| from fastapi import APIRouter, Depends, HTTPException, Request, status, BackgroundTasks |
| from pydantic import BaseModel |
|
|
|
|
| from open_webui.socket.main import sio, get_user_ids_from_room |
| from open_webui.models.users import Users, UserNameResponse |
|
|
| from open_webui.models.channels import Channels, ChannelModel, ChannelForm |
| from open_webui.models.messages import ( |
| Messages, |
| MessageModel, |
| MessageResponse, |
| MessageForm, |
| ) |
|
|
|
|
| from open_webui.config import ENABLE_ADMIN_CHAT_ACCESS, ENABLE_ADMIN_EXPORT |
| from open_webui.constants import ERROR_MESSAGES |
| from open_webui.env import SRC_LOG_LEVELS |
|
|
|
|
| from open_webui.utils.auth import get_admin_user, get_verified_user |
| from open_webui.utils.access_control import has_access, get_users_with_access |
| from open_webui.utils.webhook import post_webhook |
|
|
| log = logging.getLogger(__name__) |
| log.setLevel(SRC_LOG_LEVELS["MODELS"]) |
|
|
| router = APIRouter() |
|
|
| |
| |
| |
|
|
|
|
| @router.get("/", response_model=list[ChannelModel]) |
| async def get_channels(user=Depends(get_verified_user)): |
| if user.role == "admin": |
| return Channels.get_channels() |
| else: |
| return Channels.get_channels_by_user_id(user.id) |
|
|
|
|
| |
| |
| |
|
|
|
|
| @router.post("/create", response_model=Optional[ChannelModel]) |
| async def create_new_channel(form_data: ChannelForm, user=Depends(get_admin_user)): |
| try: |
| channel = Channels.insert_new_channel(None, form_data, user.id) |
| return ChannelModel(**channel.model_dump()) |
| except Exception as e: |
| log.exception(e) |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
|
|
| |
| |
| |
|
|
|
|
| @router.get("/{id}", response_model=Optional[ChannelModel]) |
| async def get_channel_by_id(id: str, user=Depends(get_verified_user)): |
| channel = Channels.get_channel_by_id(id) |
| if not channel: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND |
| ) |
|
|
| if user.role != "admin" and not has_access( |
| user.id, type="read", access_control=channel.access_control |
| ): |
| raise HTTPException( |
| status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
| return ChannelModel(**channel.model_dump()) |
|
|
|
|
| |
| |
| |
|
|
|
|
| @router.post("/{id}/update", response_model=Optional[ChannelModel]) |
| async def update_channel_by_id( |
| id: str, form_data: ChannelForm, user=Depends(get_admin_user) |
| ): |
| channel = Channels.get_channel_by_id(id) |
| if not channel: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND |
| ) |
|
|
| try: |
| channel = Channels.update_channel_by_id(id, form_data) |
| return ChannelModel(**channel.model_dump()) |
| except Exception as e: |
| log.exception(e) |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
|
|
| |
| |
| |
|
|
|
|
| @router.delete("/{id}/delete", response_model=bool) |
| async def delete_channel_by_id(id: str, user=Depends(get_admin_user)): |
| channel = Channels.get_channel_by_id(id) |
| if not channel: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND |
| ) |
|
|
| try: |
| Channels.delete_channel_by_id(id) |
| return True |
| except Exception as e: |
| log.exception(e) |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
|
|
| |
| |
| |
|
|
|
|
| class MessageUserResponse(MessageResponse): |
| user: UserNameResponse |
|
|
|
|
| @router.get("/{id}/messages", response_model=list[MessageUserResponse]) |
| async def get_channel_messages( |
| id: str, skip: int = 0, limit: int = 50, user=Depends(get_verified_user) |
| ): |
| channel = Channels.get_channel_by_id(id) |
| if not channel: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND |
| ) |
|
|
| if user.role != "admin" and not has_access( |
| user.id, type="read", access_control=channel.access_control |
| ): |
| raise HTTPException( |
| status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
| message_list = Messages.get_messages_by_channel_id(id, skip, limit) |
| users = {} |
|
|
| messages = [] |
| for message in message_list: |
| if message.user_id not in users: |
| user = Users.get_user_by_id(message.user_id) |
| users[message.user_id] = user |
|
|
| replies = Messages.get_replies_by_message_id(message.id) |
| latest_reply_at = replies[0].created_at if replies else None |
|
|
| messages.append( |
| MessageUserResponse( |
| **{ |
| **message.model_dump(), |
| "reply_count": len(replies), |
| "latest_reply_at": latest_reply_at, |
| "reactions": Messages.get_reactions_by_message_id(message.id), |
| "user": UserNameResponse(**users[message.user_id].model_dump()), |
| } |
| ) |
| ) |
|
|
| return messages |
|
|
|
|
| |
| |
| |
|
|
|
|
| async def send_notification(name, webui_url, channel, message, active_user_ids): |
| users = get_users_with_access("read", channel.access_control) |
|
|
| for user in users: |
| if user.id in active_user_ids: |
| continue |
| else: |
| if user.settings: |
| webhook_url = user.settings.ui.get("notifications", {}).get( |
| "webhook_url", None |
| ) |
|
|
| if webhook_url: |
| post_webhook( |
| name, |
| webhook_url, |
| f"#{channel.name} - {webui_url}/channels/{channel.id}\n\n{message.content}", |
| { |
| "action": "channel", |
| "message": message.content, |
| "title": channel.name, |
| "url": f"{webui_url}/channels/{channel.id}", |
| }, |
| ) |
|
|
|
|
| @router.post("/{id}/messages/post", response_model=Optional[MessageModel]) |
| async def post_new_message( |
| request: Request, |
| id: str, |
| form_data: MessageForm, |
| background_tasks: BackgroundTasks, |
| user=Depends(get_verified_user), |
| ): |
| channel = Channels.get_channel_by_id(id) |
| if not channel: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND |
| ) |
|
|
| if user.role != "admin" and not has_access( |
| user.id, type="read", access_control=channel.access_control |
| ): |
| raise HTTPException( |
| status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
| try: |
| message = Messages.insert_new_message(form_data, channel.id, user.id) |
|
|
| if message: |
| event_data = { |
| "channel_id": channel.id, |
| "message_id": message.id, |
| "data": { |
| "type": "message", |
| "data": MessageUserResponse( |
| **{ |
| **message.model_dump(), |
| "reply_count": 0, |
| "latest_reply_at": None, |
| "reactions": Messages.get_reactions_by_message_id( |
| message.id |
| ), |
| "user": UserNameResponse(**user.model_dump()), |
| } |
| ).model_dump(), |
| }, |
| "user": UserNameResponse(**user.model_dump()).model_dump(), |
| "channel": channel.model_dump(), |
| } |
|
|
| await sio.emit( |
| "channel-events", |
| event_data, |
| to=f"channel:{channel.id}", |
| ) |
|
|
| if message.parent_id: |
| |
| parent_message = Messages.get_message_by_id(message.parent_id) |
|
|
| if parent_message: |
| await sio.emit( |
| "channel-events", |
| { |
| "channel_id": channel.id, |
| "message_id": parent_message.id, |
| "data": { |
| "type": "message:reply", |
| "data": MessageUserResponse( |
| **{ |
| **parent_message.model_dump(), |
| "user": UserNameResponse( |
| **Users.get_user_by_id( |
| parent_message.user_id |
| ).model_dump() |
| ), |
| } |
| ).model_dump(), |
| }, |
| "user": UserNameResponse(**user.model_dump()).model_dump(), |
| "channel": channel.model_dump(), |
| }, |
| to=f"channel:{channel.id}", |
| ) |
|
|
| active_user_ids = get_user_ids_from_room(f"channel:{channel.id}") |
|
|
| background_tasks.add_task( |
| send_notification, |
| request.app.state.WEBUI_NAME, |
| request.app.state.config.WEBUI_URL, |
| channel, |
| message, |
| active_user_ids, |
| ) |
|
|
| return MessageModel(**message.model_dump()) |
| except Exception as e: |
| log.exception(e) |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
|
|
| |
| |
| |
|
|
|
|
| @router.get("/{id}/messages/{message_id}", response_model=Optional[MessageUserResponse]) |
| async def get_channel_message( |
| id: str, message_id: str, user=Depends(get_verified_user) |
| ): |
| channel = Channels.get_channel_by_id(id) |
| if not channel: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND |
| ) |
|
|
| if user.role != "admin" and not has_access( |
| user.id, type="read", access_control=channel.access_control |
| ): |
| raise HTTPException( |
| status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
| message = Messages.get_message_by_id(message_id) |
| if not message: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND |
| ) |
|
|
| if message.channel_id != id: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
| return MessageUserResponse( |
| **{ |
| **message.model_dump(), |
| "user": UserNameResponse( |
| **Users.get_user_by_id(message.user_id).model_dump() |
| ), |
| } |
| ) |
|
|
|
|
| |
| |
| |
|
|
|
|
| @router.get( |
| "/{id}/messages/{message_id}/thread", response_model=list[MessageUserResponse] |
| ) |
| async def get_channel_thread_messages( |
| id: str, |
| message_id: str, |
| skip: int = 0, |
| limit: int = 50, |
| user=Depends(get_verified_user), |
| ): |
| channel = Channels.get_channel_by_id(id) |
| if not channel: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND |
| ) |
|
|
| if user.role != "admin" and not has_access( |
| user.id, type="read", access_control=channel.access_control |
| ): |
| raise HTTPException( |
| status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
| message_list = Messages.get_messages_by_parent_id(id, message_id, skip, limit) |
| users = {} |
|
|
| messages = [] |
| for message in message_list: |
| if message.user_id not in users: |
| user = Users.get_user_by_id(message.user_id) |
| users[message.user_id] = user |
|
|
| messages.append( |
| MessageUserResponse( |
| **{ |
| **message.model_dump(), |
| "reply_count": 0, |
| "latest_reply_at": None, |
| "reactions": Messages.get_reactions_by_message_id(message.id), |
| "user": UserNameResponse(**users[message.user_id].model_dump()), |
| } |
| ) |
| ) |
|
|
| return messages |
|
|
|
|
| |
| |
| |
|
|
|
|
| @router.post( |
| "/{id}/messages/{message_id}/update", response_model=Optional[MessageModel] |
| ) |
| async def update_message_by_id( |
| id: str, message_id: str, form_data: MessageForm, user=Depends(get_verified_user) |
| ): |
| channel = Channels.get_channel_by_id(id) |
| if not channel: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND |
| ) |
|
|
| if user.role != "admin" and not has_access( |
| user.id, type="read", access_control=channel.access_control |
| ): |
| raise HTTPException( |
| status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
| message = Messages.get_message_by_id(message_id) |
| if not message: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND |
| ) |
|
|
| if message.channel_id != id: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
| try: |
| message = Messages.update_message_by_id(message_id, form_data) |
| message = Messages.get_message_by_id(message_id) |
|
|
| if message: |
| await sio.emit( |
| "channel-events", |
| { |
| "channel_id": channel.id, |
| "message_id": message.id, |
| "data": { |
| "type": "message:update", |
| "data": MessageUserResponse( |
| **{ |
| **message.model_dump(), |
| "user": UserNameResponse( |
| **user.model_dump() |
| ).model_dump(), |
| } |
| ).model_dump(), |
| }, |
| "user": UserNameResponse(**user.model_dump()).model_dump(), |
| "channel": channel.model_dump(), |
| }, |
| to=f"channel:{channel.id}", |
| ) |
|
|
| return MessageModel(**message.model_dump()) |
| except Exception as e: |
| log.exception(e) |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
|
|
| |
| |
| |
|
|
|
|
| class ReactionForm(BaseModel): |
| name: str |
|
|
|
|
| @router.post("/{id}/messages/{message_id}/reactions/add", response_model=bool) |
| async def add_reaction_to_message( |
| id: str, message_id: str, form_data: ReactionForm, user=Depends(get_verified_user) |
| ): |
| channel = Channels.get_channel_by_id(id) |
| if not channel: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND |
| ) |
|
|
| if user.role != "admin" and not has_access( |
| user.id, type="read", access_control=channel.access_control |
| ): |
| raise HTTPException( |
| status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
| message = Messages.get_message_by_id(message_id) |
| if not message: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND |
| ) |
|
|
| if message.channel_id != id: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
| try: |
| Messages.add_reaction_to_message(message_id, user.id, form_data.name) |
| message = Messages.get_message_by_id(message_id) |
|
|
| await sio.emit( |
| "channel-events", |
| { |
| "channel_id": channel.id, |
| "message_id": message.id, |
| "data": { |
| "type": "message:reaction:add", |
| "data": { |
| **message.model_dump(), |
| "user": UserNameResponse( |
| **Users.get_user_by_id(message.user_id).model_dump() |
| ).model_dump(), |
| "name": form_data.name, |
| }, |
| }, |
| "user": UserNameResponse(**user.model_dump()).model_dump(), |
| "channel": channel.model_dump(), |
| }, |
| to=f"channel:{channel.id}", |
| ) |
|
|
| return True |
| except Exception as e: |
| log.exception(e) |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
|
|
| |
| |
| |
|
|
|
|
| @router.post("/{id}/messages/{message_id}/reactions/remove", response_model=bool) |
| async def remove_reaction_by_id_and_user_id_and_name( |
| id: str, message_id: str, form_data: ReactionForm, user=Depends(get_verified_user) |
| ): |
| channel = Channels.get_channel_by_id(id) |
| if not channel: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND |
| ) |
|
|
| if user.role != "admin" and not has_access( |
| user.id, type="read", access_control=channel.access_control |
| ): |
| raise HTTPException( |
| status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
| message = Messages.get_message_by_id(message_id) |
| if not message: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND |
| ) |
|
|
| if message.channel_id != id: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
| try: |
| Messages.remove_reaction_by_id_and_user_id_and_name( |
| message_id, user.id, form_data.name |
| ) |
|
|
| message = Messages.get_message_by_id(message_id) |
|
|
| await sio.emit( |
| "channel-events", |
| { |
| "channel_id": channel.id, |
| "message_id": message.id, |
| "data": { |
| "type": "message:reaction:remove", |
| "data": { |
| **message.model_dump(), |
| "user": UserNameResponse( |
| **Users.get_user_by_id(message.user_id).model_dump() |
| ).model_dump(), |
| "name": form_data.name, |
| }, |
| }, |
| "user": UserNameResponse(**user.model_dump()).model_dump(), |
| "channel": channel.model_dump(), |
| }, |
| to=f"channel:{channel.id}", |
| ) |
|
|
| return True |
| except Exception as e: |
| log.exception(e) |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
|
|
| |
| |
| |
|
|
|
|
| @router.delete("/{id}/messages/{message_id}/delete", response_model=bool) |
| async def delete_message_by_id( |
| id: str, message_id: str, user=Depends(get_verified_user) |
| ): |
| channel = Channels.get_channel_by_id(id) |
| if not channel: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND |
| ) |
|
|
| if user.role != "admin" and not has_access( |
| user.id, type="read", access_control=channel.access_control |
| ): |
| raise HTTPException( |
| status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
| message = Messages.get_message_by_id(message_id) |
| if not message: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND |
| ) |
|
|
| if message.channel_id != id: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|
| try: |
| Messages.delete_message_by_id(message_id) |
| await sio.emit( |
| "channel-events", |
| { |
| "channel_id": channel.id, |
| "message_id": message.id, |
| "data": { |
| "type": "message:delete", |
| "data": { |
| **message.model_dump(), |
| "user": UserNameResponse(**user.model_dump()).model_dump(), |
| }, |
| }, |
| "user": UserNameResponse(**user.model_dump()).model_dump(), |
| "channel": channel.model_dump(), |
| }, |
| to=f"channel:{channel.id}", |
| ) |
|
|
| if message.parent_id: |
| |
| parent_message = Messages.get_message_by_id(message.parent_id) |
|
|
| if parent_message: |
| await sio.emit( |
| "channel-events", |
| { |
| "channel_id": channel.id, |
| "message_id": parent_message.id, |
| "data": { |
| "type": "message:reply", |
| "data": MessageUserResponse( |
| **{ |
| **parent_message.model_dump(), |
| "user": UserNameResponse( |
| **Users.get_user_by_id( |
| parent_message.user_id |
| ).model_dump() |
| ), |
| } |
| ).model_dump(), |
| }, |
| "user": UserNameResponse(**user.model_dump()).model_dump(), |
| "channel": channel.model_dump(), |
| }, |
| to=f"channel:{channel.id}", |
| ) |
|
|
| return True |
| except Exception as e: |
| log.exception(e) |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() |
| ) |
|
|