| | """ |
| | Built-in tools for Open WebUI. |
| | |
| | These tools are automatically available when native function calling is enabled. |
| | |
| | IMPORTANT: DO NOT IMPORT THIS MODULE DIRECTLY IN OTHER PARTS OF THE CODEBASE. |
| | """ |
| |
|
| | import json |
| | import logging |
| | import time |
| | import asyncio |
| | from typing import Optional |
| |
|
| | from fastapi import Request |
| |
|
| | from open_webui.models.users import UserModel |
| | from open_webui.routers.retrieval import search_web as _search_web |
| | from open_webui.retrieval.utils import get_content_from_url |
| | from open_webui.routers.images import ( |
| | image_generations, |
| | image_edits, |
| | CreateImageForm, |
| | EditImageForm, |
| | ) |
| | from open_webui.routers.memories import ( |
| | query_memory, |
| | add_memory as _add_memory, |
| | update_memory_by_id, |
| | QueryMemoryForm, |
| | AddMemoryForm, |
| | MemoryUpdateModel, |
| | ) |
| | from open_webui.models.notes import Notes |
| | from open_webui.models.chats import Chats |
| | from open_webui.models.channels import Channels, ChannelMember, Channel |
| | from open_webui.models.messages import Messages, Message |
| | from open_webui.models.groups import Groups |
| | from open_webui.utils.sanitize import sanitize_code |
| |
|
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)

# Numeric cap for knowledge-base searches. NOTE(review): not referenced in the
# visible part of this file -- confirm the exact use at its call site.
MAX_KNOWLEDGE_BASE_SEARCH_ITEMS = 10_000
| |
|
| | |
| | |
| | |
| |
|
| |
|
async def get_current_timestamp(
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the current Unix timestamp in seconds.

    :return: JSON with current_timestamp (seconds) and current_iso (ISO format)
    """
    try:
        from datetime import datetime, timezone

        # Timezone-aware "now" so the ISO string carries an explicit UTC offset.
        utc_now = datetime.now(timezone.utc)
        payload = {
            "current_timestamp": int(utc_now.timestamp()),
            "current_iso": utc_now.isoformat(),
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        # Failures are reported to the caller as a JSON error object.
        log.exception(f"get_current_timestamp error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def calculate_timestamp(
    days_ago: int = 0,
    weeks_ago: int = 0,
    months_ago: int = 0,
    years_ago: int = 0,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the current Unix timestamp, optionally adjusted by days, weeks, months, or years.
    Use this to calculate timestamps for date filtering in search functions.
    Examples: "last week" = weeks_ago=1, "3 days ago" = days_ago=3, "a year ago" = years_ago=1

    :param days_ago: Number of days to subtract from current time (default: 0)
    :param weeks_ago: Number of weeks to subtract from current time (default: 0)
    :param months_ago: Number of months to subtract from current time (default: 0)
    :param years_ago: Number of years to subtract from current time (default: 0)
    :return: JSON with current_timestamp and calculated_timestamp (both in seconds)
    """
    try:
        import datetime

        # dateutil is optional: guard only the import so that every later
        # failure (in either code path) reaches the generic handler below and
        # is returned as a JSON error instead of escaping to the caller.
        try:
            from dateutil.relativedelta import relativedelta
        except ImportError:
            relativedelta = None

        now = datetime.datetime.now(datetime.timezone.utc)

        # Days and weeks are exact, fixed-length offsets.
        total_days = days_ago + (weeks_ago * 7)
        if relativedelta is None:
            # Without dateutil, approximate a month as 30 days and a year as
            # 365 days.
            total_days += (months_ago * 30) + (years_ago * 365)
        adjusted = now - datetime.timedelta(days=total_days)

        # With dateutil, apply calendar-aware month/year arithmetic.
        if relativedelta is not None and (months_ago > 0 or years_ago > 0):
            adjusted = adjusted - relativedelta(months=months_ago, years=years_ago)

        return json.dumps(
            {
                "current_timestamp": int(now.timestamp()),
                "current_iso": now.isoformat(),
                "calculated_timestamp": int(adjusted.timestamp()),
                "calculated_iso": adjusted.isoformat(),
            },
            ensure_ascii=False,
        )
    except Exception as e:
        log.exception(f"calculate_timestamp error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
async def search_web(
    query: str,
    count: int = 5,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search the public web for information. Best for current events, external references,
    or topics not covered in internal documents. If knowledge base tools are available,
    consider checking those first for internal information.

    :param query: The search query to look up
    :param count: Number of results to return (default: 5)
    :return: JSON with search results containing title, link, and snippet for each result
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        engine = __request__.app.state.config.WEB_SEARCH_ENGINE
        user = UserModel(**__user__) if __user__ else None

        # A truthy configured result count takes precedence over the
        # caller-supplied one.
        count = __request__.app.state.config.WEB_SEARCH_RESULT_COUNT or count

        # The search helper is run in a worker thread via asyncio.to_thread.
        hits = await asyncio.to_thread(_search_web, __request__, engine, query, user)

        # Cap the result list; a falsy response becomes an empty list.
        if hits:
            hits = hits[:count]
        else:
            hits = []

        entries = []
        for hit in hits:
            entries.append(
                {"title": hit.title, "link": hit.link, "snippet": hit.snippet}
            )
        return json.dumps(entries, ensure_ascii=False)
    except Exception as e:
        log.exception(f"search_web error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def fetch_url(
    url: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Fetch and extract the main text content from a web page URL.

    :param url: The URL to fetch content from
    :return: The extracted text content from the page
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        # The loader returns a pair; only the first element (the text) is used.
        page_text, _ = await asyncio.to_thread(get_content_from_url, __request__, url)

        # Long pages are cut at a fixed length and marked as truncated.
        limit = 50000
        if len(page_text) <= limit:
            return page_text
        return page_text[:limit] + "\n\n[Content truncated...]"
    except Exception as e:
        log.exception(f"fetch_url error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
async def generate_image(
    prompt: str,
    __request__: Request = None,
    __user__: dict = None,
    __event_emitter__: callable = None,
    __chat_id__: str = None,
    __message_id__: str = None,
) -> str:
    """
    Generate an image based on a text prompt.

    :param prompt: A detailed description of the image to generate
    :return: Confirmation that the image was generated, or an error message
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        user = UserModel(**__user__) if __user__ else None
        form = CreateImageForm(prompt=prompt)
        images = await image_generations(
            request=__request__,
            form_data=form,
            user=user,
        )

        # One file descriptor per generated image.
        image_files = []
        for image in images:
            image_files.append({"type": "image", "url": image["url"]})

        # With a chat/message context, attach the files to that message and
        # continue with whatever the storage call returns.
        if __chat_id__ and __message_id__ and images:
            image_files = Chats.add_message_files_by_id_and_message_id(
                __chat_id__,
                __message_id__,
                image_files,
            )

        response = {"status": "success"}

        # When the files can be emitted to the chat, the response also carries
        # a message saying the image is already visible.
        if __event_emitter__ and image_files:
            event = {
                "type": "chat:message:files",
                "data": {
                    "files": image_files,
                },
            }
            await __event_emitter__(event)
            response["message"] = (
                "The image has been successfully generated and is already visible to the user in the chat. You do not need to display or embed the image again - just acknowledge that it has been created."
            )

        response["images"] = images
        return json.dumps(response, ensure_ascii=False)
    except Exception as e:
        log.exception(f"generate_image error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def edit_image(
    prompt: str,
    image_urls: list[str],
    __request__: Request = None,
    __user__: dict = None,
    __event_emitter__: callable = None,
    __chat_id__: str = None,
    __message_id__: str = None,
) -> str:
    """
    Edit existing images based on a text prompt.

    :param prompt: A description of the changes to make to the images
    :param image_urls: A list of URLs of the images to edit
    :return: Confirmation that the images were edited, or an error message
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        user = UserModel(**__user__) if __user__ else None
        form = EditImageForm(prompt=prompt, image=image_urls)
        images = await image_edits(
            request=__request__,
            form_data=form,
            user=user,
        )

        # One file descriptor per edited image.
        image_files = []
        for image in images:
            image_files.append({"type": "image", "url": image["url"]})

        # With a chat/message context, attach the files to that message and
        # continue with whatever the storage call returns.
        if __chat_id__ and __message_id__ and images:
            image_files = Chats.add_message_files_by_id_and_message_id(
                __chat_id__,
                __message_id__,
                image_files,
            )

        response = {"status": "success"}

        # When the files can be emitted to the chat, the response also carries
        # a message saying the image is already visible.
        if __event_emitter__ and image_files:
            event = {
                "type": "chat:message:files",
                "data": {
                    "files": image_files,
                },
            }
            await __event_emitter__(event)
            response["message"] = (
                "The edited image has been successfully generated and is already visible to the user in the chat. You do not need to display or embed the image again - just acknowledge that it has been created."
            )

        response["images"] = images
        return json.dumps(response, ensure_ascii=False)
    except Exception as e:
        log.exception(f"edit_image error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
async def execute_code(
    code: str,
    __request__: Request = None,
    __user__: dict = None,
    __event_emitter__: callable = None,
    __event_call__: callable = None,
    __chat_id__: str = None,
    __message_id__: str = None,
    __metadata__: dict = None,
) -> str:
    """
    Execute Python code in a sandboxed environment and return the output.
    Use this to perform calculations, data analysis, generate visualizations,
    or run any Python code that would help answer the user's question.

    :param code: The Python code to execute
    :return: JSON with stdout, stderr, and result from execution
    """
    from uuid import uuid4

    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        code = sanitize_code(code)

        from open_webui.config import CODE_INTERPRETER_BLOCKED_MODULES

        # When a block list is configured, prepend a prelude that wraps
        # builtins.__import__ so imports of listed modules made directly from
        # __main__ raise ImportError.
        if CODE_INTERPRETER_BLOCKED_MODULES:
            import textwrap

            blocking_code = textwrap.dedent(f"""
                import builtins

                BLOCKED_MODULES = {CODE_INTERPRETER_BLOCKED_MODULES}

                _real_import = builtins.__import__
                def restricted_import(name, globals=None, locals=None, fromlist=(), level=0):
                    if name.split('.')[0] in BLOCKED_MODULES:
                        importer_name = globals.get('__name__') if globals else None
                        if importer_name == '__main__':
                            raise ImportError(
                                f"Direct import of module {{name}} is restricted."
                            )
                    return _real_import(name, globals, locals, fromlist, level)

                builtins.__import__ = restricted_import
                """)
            code = blocking_code + "\n" + code

        engine = getattr(
            __request__.app.state.config, "CODE_INTERPRETER_ENGINE", "pyodide"
        )
        if engine == "pyodide":
            # Pyodide execution is delegated through the event-call channel.
            if __event_call__ is None:
                return json.dumps(
                    {
                        "error": "Event call not available. WebSocket connection required for pyodide execution."
                    }
                )

            output = await __event_call__(
                {
                    "type": "execute:python",
                    "data": {
                        "id": str(uuid4()),
                        "code": code,
                        "session_id": (
                            __metadata__.get("session_id") if __metadata__ else None
                        ),
                    },
                }
            )

            # A dict reply carries the three streams; anything else is
            # stringified into `result`.
            if isinstance(output, dict):
                stdout = output.get("stdout", "")
                stderr = output.get("stderr", "")
                result = output.get("result", "")
            else:
                stdout = ""
                stderr = ""
                result = str(output) if output else ""

        elif engine == "jupyter":
            from open_webui.utils.code_interpreter import execute_code_jupyter

            config = __request__.app.state.config
            # Only the credential matching the configured auth mode is passed.
            output = await execute_code_jupyter(
                config.CODE_INTERPRETER_JUPYTER_URL,
                code,
                (
                    config.CODE_INTERPRETER_JUPYTER_AUTH_TOKEN
                    if config.CODE_INTERPRETER_JUPYTER_AUTH == "token"
                    else None
                ),
                (
                    config.CODE_INTERPRETER_JUPYTER_AUTH_PASSWORD
                    if config.CODE_INTERPRETER_JUPYTER_AUTH == "password"
                    else None
                ),
                config.CODE_INTERPRETER_JUPYTER_TIMEOUT,
            )

            stdout = output.get("stdout", "")
            stderr = output.get("stderr", "")
            result = output.get("result", "")

        else:
            return json.dumps({"error": f"Unknown code interpreter engine: {engine}"})

        # For an identified user, replace inline base64 PNG lines in the
        # output with links to the converted image.
        if __user__ and __user__.get("id"):
            from open_webui.models.users import Users
            from open_webui.utils.files import get_image_url_from_base64

            user = Users.get_user_by_id(__user__["id"])

            def _link_inline_images(text):
                """Replace each line holding a base64 PNG with a markdown image link."""
                if not (text and isinstance(text, str)):
                    return text
                lines = text.split("\n")
                for idx, line in enumerate(lines):
                    if "data:image/png;base64" in line:
                        image_url = get_image_url_from_base64(
                            __request__,
                            line,
                            __metadata__ or {},
                            user,
                        )
                        if image_url:
                            # Previously an empty string was written here,
                            # which erased the line and dropped the URL.
                            lines[idx] = f"![Output Image]({image_url})"
                return "\n".join(lines)

            stdout = _link_inline_images(stdout)
            result = _link_inline_images(result)

        response = {
            "status": "success",
            "stdout": stdout,
            "stderr": stderr,
            "result": result,
        }

        return json.dumps(response, ensure_ascii=False)
    except Exception as e:
        log.exception(f"execute_code error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
async def search_memories(
    query: str,
    count: int = 5,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search the user's stored memories for relevant information.

    :param query: The search query to find relevant memories
    :param count: Number of memories to return (default 5)
    :return: JSON with matching memories and their dates
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        user = UserModel(**__user__) if __user__ else None
        form = QueryMemoryForm(content=query, k=count)
        results = await query_memory(__request__, form, user)

        # Nothing usable came back: answer with an empty JSON list.
        if not (results and hasattr(results, "documents") and results.documents):
            return json.dumps([])

        memories = []
        # Only the first row of documents/ids/metadatas is read.
        for position, document in enumerate(results.documents[0]):
            if results.ids and results.ids[0]:
                memory_id = results.ids[0][position]
            else:
                memory_id = None

            date_label = "Unknown"
            if results.metadatas:
                stamp = results.metadatas[0][position].get("created_at")
                if stamp:
                    # Formatted as a calendar date in the server's local time.
                    date_label = time.strftime("%Y-%m-%d", time.localtime(stamp))

            memories.append(
                {"id": memory_id, "date": date_label, "content": document}
            )
        return json.dumps(memories, ensure_ascii=False)
    except Exception as e:
        log.exception(f"search_memories error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def add_memory(
    content: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Store a new memory for the user.

    :param content: The memory content to store
    :return: Confirmation that the memory was stored
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        user = UserModel(**__user__) if __user__ else None
        form = AddMemoryForm(content=content)

        # Delegates to the memories router; the reply is expected to expose `.id`.
        stored = await _add_memory(__request__, form, user)

        confirmation = {"status": "success", "id": stored.id}
        return json.dumps(confirmation, ensure_ascii=False)
    except Exception as e:
        log.exception(f"add_memory error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def replace_memory_content(
    memory_id: str,
    content: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Update the content of an existing memory by its ID.

    :param memory_id: The ID of the memory to update
    :param content: The new content for the memory
    :return: Confirmation that the memory was updated
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        user = UserModel(**__user__) if __user__ else None
        form = MemoryUpdateModel(content=content)

        # Delegates to the memories router's update handler.
        updated = await update_memory_by_id(
            memory_id=memory_id,
            request=__request__,
            form_data=form,
            user=user,
        )

        confirmation = {
            "status": "success",
            "id": updated.id,
            "content": updated.content,
        }
        return json.dumps(confirmation, ensure_ascii=False)
    except Exception as e:
        log.exception(f"replace_memory_content error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
async def search_notes(
    query: str,
    count: int = 5,
    start_timestamp: Optional[int] = None,
    end_timestamp: Optional[int] = None,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search the user's notes by title and content.

    :param query: The search query to find matching notes
    :param count: Maximum number of results to return (default: 5)
    :param start_timestamp: Only include notes updated after this Unix timestamp (seconds)
    :param end_timestamp: Only include notes updated before this Unix timestamp (seconds)
    :return: JSON with matching notes containing id, title, and content snippet
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        user_id = __user__.get("id")
        group_ids = []
        for group in Groups.get_groups_by_member_id(user_id):
            group_ids.append(group.id)

        # Fetch three times the requested count so the date filter below still
        # has candidates left to choose from.
        search_filter = {
            "query": query,
            "user_id": user_id,
            "group_ids": group_ids,
            "permission": "read",
        }
        result = Notes.search_notes(
            user_id=user_id,
            filter=search_filter,
            skip=0,
            limit=count * 3,
        )

        # Second-based bounds are scaled by 1e9 before being compared with
        # note.updated_at.
        lower_bound = start_timestamp * 1_000_000_000 if start_timestamp else None
        upper_bound = end_timestamp * 1_000_000_000 if end_timestamp else None

        notes = []
        for note in result.items:
            if (lower_bound and note.updated_at < lower_bound) or (
                upper_bound and note.updated_at > upper_bound
            ):
                continue

            # Snippet: a window around the first case-insensitive hit, or the
            # first 150 characters when the query is not in the body.
            snippet = ""
            if note.data and note.data.get("content", {}).get("md"):
                body = note.data["content"]["md"]
                hit = body.lower().find(query.lower())
                if hit == -1:
                    snippet = body[:150] + ("..." if len(body) > 150 else "")
                else:
                    left = max(0, hit - 50)
                    right = min(len(body), hit + len(query) + 100)
                    prefix = "..." if left > 0 else ""
                    suffix = "..." if right < len(body) else ""
                    snippet = prefix + body[left:right] + suffix

            notes.append(
                {
                    "id": note.id,
                    "title": note.title,
                    "snippet": snippet,
                    "updated_at": note.updated_at,
                }
            )

            if len(notes) >= count:
                break

        return json.dumps(notes, ensure_ascii=False)
    except Exception as e:
        log.exception(f"search_notes error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def view_note(
    note_id: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the full content of a note by its ID.

    :param note_id: The ID of the note to retrieve
    :return: JSON with the note's id, title, and full markdown content
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        note = Notes.get_note_by_id(note_id)
        if not note:
            return json.dumps({"error": "Note not found"})

        user_id = __user__.get("id")
        group_ids = []
        for group in Groups.get_groups_by_member_id(user_id):
            group_ids.append(group.id)

        from open_webui.models.access_grants import AccessGrants

        # The owner may always read; anyone else needs a "read" grant.
        if note.user_id != user_id:
            may_read = AccessGrants.has_access(
                user_id=user_id,
                resource_type="note",
                resource_id=note.id,
                permission="read",
                user_group_ids=set(group_ids),
            )
            if not may_read:
                return json.dumps({"error": "Access denied"})

        # Markdown body, or an empty string when the note has none.
        markdown = ""
        if note.data and note.data.get("content", {}).get("md"):
            markdown = note.data["content"]["md"]

        payload = {
            "id": note.id,
            "title": note.title,
            "content": markdown,
            "updated_at": note.updated_at,
            "created_at": note.created_at,
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        log.exception(f"view_note error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def write_note(
    title: str,
    content: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Create a new note with the given title and content.

    :param title: The title of the new note
    :param content: The markdown content for the note
    :return: JSON with success status and new note id
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.notes import NoteForm

        user_id = __user__.get("id")

        # The note is created with the markdown body and an empty grant list.
        note_form = NoteForm(
            title=title,
            data={"content": {"md": content}},
            access_grants=[],
        )
        created = Notes.insert_new_note(user_id, note_form)

        if created:
            payload = {
                "status": "success",
                "id": created.id,
                "title": created.title,
                "created_at": created.created_at,
            }
            return json.dumps(payload, ensure_ascii=False)

        return json.dumps({"error": "Failed to create note"})
    except Exception as e:
        log.exception(f"write_note error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def replace_note_content(
    note_id: str,
    content: str,
    title: Optional[str] = None,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Update the content of a note. Use this to modify task lists, add notes, or update content.

    :param note_id: The ID of the note to update
    :param content: The new markdown content for the note
    :param title: Optional new title for the note
    :return: JSON with success status and updated note info
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.notes import NoteUpdateForm

        note = Notes.get_note_by_id(note_id)
        if not note:
            return json.dumps({"error": "Note not found"})

        user_id = __user__.get("id")
        group_ids = []
        for group in Groups.get_groups_by_member_id(user_id):
            group_ids.append(group.id)

        from open_webui.models.access_grants import AccessGrants

        # The owner may always write; anyone else needs a "write" grant.
        if note.user_id != user_id:
            may_write = AccessGrants.has_access(
                user_id=user_id,
                resource_type="note",
                resource_id=note.id,
                permission="write",
                user_group_ids=set(group_ids),
            )
            if not may_write:
                return json.dumps({"error": "Write access denied"})

        # The body is always replaced; the title only when a truthy one is given.
        changes = {"data": {"content": {"md": content}}}
        if title:
            changes["title"] = title

        saved = Notes.update_note_by_id(note_id, NoteUpdateForm(**changes))
        if not saved:
            return json.dumps({"error": "Failed to update note"})

        payload = {
            "status": "success",
            "id": saved.id,
            "title": saved.title,
            "updated_at": saved.updated_at,
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        log.exception(f"replace_note_content error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
async def search_chats(
    query: str,
    count: int = 5,
    start_timestamp: Optional[int] = None,
    end_timestamp: Optional[int] = None,
    __request__: Request = None,
    __user__: dict = None,
    __chat_id__: str = None,
) -> str:
    """
    Search the user's previous chat conversations by title and message content.

    :param query: The search query to find matching chats
    :param count: Maximum number of results to return (default: 5)
    :param start_timestamp: Only include chats updated after this Unix timestamp (seconds)
    :param end_timestamp: Only include chats updated before this Unix timestamp (seconds)
    :return: JSON with matching chats containing id, title, updated_at, and content snippet
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        user_id = __user__.get("id")

        # Fetch three times the requested count so the filters below still
        # have candidates left to choose from.
        candidates = Chats.get_chats_by_user_id_and_search_text(
            user_id=user_id,
            search_text=query,
            include_archived=False,
            skip=0,
            limit=count * 3,
        )

        results = []
        for chat in candidates:
            # Skip the chat identified by __chat_id__ when one is supplied.
            if __chat_id__ and chat.id == __chat_id__:
                continue

            # The bounds are compared with chat.updated_at without scaling.
            if (start_timestamp and chat.updated_at < start_timestamp) or (
                end_timestamp and chat.updated_at > end_timestamp
            ):
                continue

            snippet = ""
            history_messages = chat.chat.get("history", {}).get("messages", {})
            needle = query.lower()

            # Use the first message whose string content contains the query,
            # case-insensitively, and cut a window around the hit.
            for message in history_messages.values():
                text = message.get("content", "")
                if not isinstance(text, str):
                    continue
                hit = text.lower().find(needle)
                if hit == -1:
                    continue
                left = max(0, hit - 50)
                right = min(len(text), hit + len(query) + 100)
                prefix = "..." if left > 0 else ""
                suffix = "..." if right < len(text) else ""
                snippet = prefix + text[left:right] + suffix
                break

            # Fall back to flagging a title match when no message matched.
            if not snippet and needle in chat.title.lower():
                snippet = f"Title match: {chat.title}"

            results.append(
                {
                    "id": chat.id,
                    "title": chat.title,
                    "snippet": snippet,
                    "updated_at": chat.updated_at,
                }
            )

            if len(results) >= count:
                break

        return json.dumps(results, ensure_ascii=False)
    except Exception as e:
        log.exception(f"search_chats error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def view_chat(
    chat_id: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the full conversation history of a chat by its ID.

    :param chat_id: The ID of the chat to retrieve
    :return: JSON with the chat's id, title, and messages
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        user_id = __user__.get("id")

        # The lookup is scoped to both the chat id and the requesting user.
        chat = Chats.get_chat_by_id_and_user_id(chat_id, user_id)
        if not chat:
            return json.dumps({"error": "Chat not found or access denied"})

        history = chat.chat.get("history", {})
        by_id = history.get("messages", {})

        # Walk from the current message back through parentId links. The
        # `seen` set guards against cycles; a missing or falsy message ends
        # the walk.
        thread = []
        seen = set()
        cursor = history.get("currentId")
        while cursor and cursor not in seen:
            seen.add(cursor)
            node = by_id.get(cursor)
            if not node:
                break
            thread.append(
                {
                    "role": node.get("role", ""),
                    "content": node.get("content", ""),
                }
            )
            cursor = node.get("parentId")

        # Collected newest-first; flip so the oldest message comes first.
        thread.reverse()

        payload = {
            "id": chat.id,
            "title": chat.title,
            "messages": thread,
            "updated_at": chat.updated_at,
            "created_at": chat.created_at,
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        log.exception(f"view_chat error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
async def search_channels(
    query: str,
    count: int = 5,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search for channels by name and description that the user has access to.

    :param query: The search query to find matching channels
    :param count: Maximum number of results to return (default: 5)
    :return: JSON with matching channels containing id, name, description, and type
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        user_id = __user__.get("id")
        channels = Channels.get_channels_by_user_id(user_id)

        # Case-insensitive substring match against name and description.
        needle = query.lower()
        found = []

        for channel in channels:
            in_name = bool(channel.name) and needle in channel.name.lower()
            in_description = needle in (channel.description or "").lower()

            if not (in_name or in_description):
                continue

            found.append(
                {
                    "id": channel.id,
                    "name": channel.name,
                    "description": channel.description or "",
                    "type": channel.type or "public",
                }
            )

            # Stop as soon as the requested number of matches is collected.
            if len(found) >= count:
                break

        return json.dumps(found, ensure_ascii=False)
    except Exception as e:
        log.exception(f"search_channels error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def search_channel_messages(
    query: str,
    count: int = 10,
    start_timestamp: Optional[int] = None,
    end_timestamp: Optional[int] = None,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search for messages in channels the user is a member of, including thread replies.

    Each hit is returned with a short snippet of text around the first
    case-insensitive occurrence of the query.

    :param query: The search query to find matching messages
    :param count: Maximum number of results to return (default: 10)
    :param start_timestamp: Only include messages created after this Unix timestamp (seconds)
    :param end_timestamp: Only include messages created before this Unix timestamp (seconds)
    :return: JSON with matching messages containing channel info, message content, and thread context
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    def to_nanoseconds(seconds):
        """Scale a seconds value by 1e9; falsy or unparsable values mean "no bound"."""
        if isinstance(seconds, str):
            # A numeric string must be parsed first: multiplying a str by an
            # int repeats the text a billion times instead of scaling it.
            try:
                seconds = int(seconds.strip())
            except ValueError:
                return None
        # NOTE: 0 is treated the same as "not provided", as before.
        return seconds * 1_000_000_000 if seconds else None

    try:
        user_id = __user__.get("id")

        # The search is restricted to channels returned for this user.
        user_channels = Channels.get_channels_by_user_id(user_id)
        channel_ids = [c.id for c in user_channels]
        channel_map = {c.id: c for c in user_channels}

        if not channel_ids:
            return json.dumps([])

        # Presumably message timestamps are stored in nanoseconds, hence the
        # scaling of the second-based inputs - confirm against the model.
        matching_messages = Messages.search_messages_by_channel_ids(
            channel_ids=channel_ids,
            query=query,
            start_timestamp=to_nanoseconds(start_timestamp),
            end_timestamp=to_nanoseconds(end_timestamp),
            limit=count,
        )

        # Loop-invariant values for snippet extraction.
        lower_query = query.lower()
        query_length = len(query)

        results = []
        for msg in matching_messages:
            channel = channel_map.get(msg.channel_id)
            content = msg.content or ""

            idx = content.lower().find(lower_query)
            if idx != -1:
                # Window of 50 characters before and 100 after the match,
                # with ellipses marking any truncated side.
                start = max(0, idx - 50)
                end = min(len(content), idx + query_length + 100)
                snippet = (
                    ("..." if start > 0 else "")
                    + content[start:end]
                    + ("..." if end < len(content) else "")
                )
            else:
                # No literal occurrence in the content: show its beginning.
                snippet = content[:150] + ("..." if len(content) > 150 else "")

            results.append(
                {
                    "channel_id": msg.channel_id,
                    "channel_name": channel.name if channel else "Unknown",
                    "message_id": msg.id,
                    "content_snippet": snippet,
                    "is_thread_reply": msg.parent_id is not None,
                    "parent_id": msg.parent_id,
                    "created_at": msg.created_at,
                }
            )

        return json.dumps(results, ensure_ascii=False)
    except Exception as e:
        log.exception(f"search_channel_messages error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def view_channel_message(
    message_id: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the full content of a channel message by its ID, with channel and thread metadata.

    The message is returned only when its channel is among the channels
    listed for the requesting user.

    :param message_id: The ID of the message to retrieve
    :return: JSON with the message content, author, channel info, parent_id, and reply_count
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        requester_id = __user__.get("id")

        message = Messages.get_message_by_id(message_id)
        if not message:
            return json.dumps({"error": "Message not found"})

        # Resolve the owning channel before checking membership.
        channel = Channels.get_channel_by_id(message.channel_id)
        if not channel:
            return json.dumps({"error": "Channel not found"})

        # Access is granted only if the channel appears in the user's list.
        is_member = any(
            candidate.id == message.channel_id
            for candidate in Channels.get_channels_by_user_id(requester_id)
        )
        if not is_member:
            return json.dumps({"error": "Access denied"})

        payload = {
            "id": message.id,
            "channel_id": message.channel_id,
            "channel_name": channel.name,
            "content": message.content,
            "user_id": message.user_id,
            "is_thread_reply": message.parent_id is not None,
            "parent_id": message.parent_id,
            "reply_count": message.reply_count,
            "created_at": message.created_at,
            "updated_at": message.updated_at,
        }

        # The author's display name is included only when user info is attached.
        author = message.user
        if author:
            payload["user_name"] = author.name

        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        log.exception(f"view_channel_message error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def view_channel_thread(
    parent_message_id: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get all messages in a channel thread: the parent message followed by its replies.

    :param parent_message_id: The ID of the parent message that started the thread
    :return: JSON with the parent message and all thread replies in chronological order
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        requester_id = __user__.get("id")

        root = Messages.get_message_by_id(parent_message_id)
        if not root:
            return json.dumps({"error": "Message not found"})

        channel = Channels.get_channel_by_id(root.channel_id)
        if not channel:
            return json.dumps({"error": "Channel not found"})

        # The thread is visible only if its channel is in the user's list.
        is_member = any(
            candidate.id == root.channel_id
            for candidate in Channels.get_channels_by_user_id(requester_id)
        )
        if not is_member:
            return json.dumps({"error": "Access denied"})

        replies = Messages.get_thread_replies_by_message_id(parent_message_id)

        # The parent always comes first in the output.
        thread = [
            {
                "id": root.id,
                "content": root.content,
                "user_id": root.user_id,
                "user_name": root.user.name if root.user else None,
                "is_parent": True,
                "created_at": root.created_at,
            }
        ]

        # Replies are emitted in the reverse of the order they were fetched
        # (presumably newest-first from the model, giving chronological output).
        thread.extend(
            {
                "id": reply.id,
                "content": reply.content,
                "user_id": reply.user_id,
                "user_name": reply.user.name if reply.user else None,
                "is_parent": False,
                "reply_to_id": reply.reply_to_id,
                "created_at": reply.created_at,
            }
            for reply in reversed(replies)
        )

        summary = {
            "channel_id": root.channel_id,
            "channel_name": channel.name,
            "thread_id": parent_message_id,
            "message_count": len(thread),
            "messages": thread,
        }
        return json.dumps(summary, ensure_ascii=False)
    except Exception as e:
        log.exception(f"view_channel_thread error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
async def list_knowledge_bases(
    count: int = 10,
    skip: int = 0,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    List the knowledge bases the user can access, one page at a time.

    :param count: Maximum number of KBs to return (default: 10)
    :param skip: Number of results to skip for pagination (default: 0)
    :return: JSON with KBs containing id, name, description, file_count, and updated_at
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.knowledge import Knowledges

        requester_id = __user__.get("id")
        group_ids = [
            group.id for group in Groups.get_groups_by_member_id(requester_id)
        ]

        # An empty query string lists everything visible to the user.
        page = Knowledges.search_knowledge_bases(
            requester_id,
            filter={
                "query": "",
                "user_id": requester_id,
                "group_ids": group_ids,
            },
            skip=skip,
            limit=count,
        )

        def summarize(entry):
            """Build the JSON-ready summary for one knowledge base."""
            attached = Knowledges.get_files_by_id(entry.id)
            return {
                "id": entry.id,
                "name": entry.name,
                "description": entry.description or "",
                "file_count": len(attached) if attached else 0,
                "updated_at": entry.updated_at,
            }

        listing = [summarize(entry) for entry in page.items]
        return json.dumps(listing, ensure_ascii=False)
    except Exception as e:
        log.exception(f"list_knowledge_bases error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def search_knowledge_bases(
    query: str,
    count: int = 5,
    skip: int = 0,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Find knowledge bases the user can access by matching the query against name and description.

    :param query: The search query to find matching knowledge bases
    :param count: Maximum number of results to return (default: 5)
    :param skip: Number of results to skip for pagination (default: 0)
    :return: JSON with matching KBs containing id, name, description, file_count, and updated_at
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.knowledge import Knowledges

        requester_id = __user__.get("id")
        member_groups = Groups.get_groups_by_member_id(requester_id)
        search_filter = {
            "query": query,
            "user_id": requester_id,
            "group_ids": [group.id for group in member_groups],
        }

        found = Knowledges.search_knowledge_bases(
            requester_id,
            filter=search_filter,
            skip=skip,
            limit=count,
        )

        summaries = []
        for entry in found.items:
            # One extra lookup per knowledge base to report its file count.
            attached = Knowledges.get_files_by_id(entry.id)
            summaries.append(
                {
                    "id": entry.id,
                    "name": entry.name,
                    "description": entry.description or "",
                    "file_count": len(attached) if attached else 0,
                    "updated_at": entry.updated_at,
                }
            )

        return json.dumps(summaries, ensure_ascii=False)
    except Exception as e:
        log.exception(f"search_knowledge_bases error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def search_knowledge_files(
    query: str,
    knowledge_id: Optional[str] = None,
    count: int = 5,
    skip: int = 0,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search files across knowledge bases the user has access to.

    :param query: The search query to find matching files by filename
    :param knowledge_id: Optional KB id to limit search to a specific knowledge base
    :param count: Maximum number of results to return (default: 5)
    :param skip: Number of results to skip for pagination (default: 0)
    :return: JSON with matching files containing id, filename, and updated_at
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    # Treat textual placeholders for "no value" as an absent id, mirroring
    # how query_knowledge_files normalizes its knowledge_ids argument.
    if isinstance(knowledge_id, str) and knowledge_id.strip().lower() in (
        "none",
        "null",
        "",
    ):
        knowledge_id = None

    try:
        from open_webui.models.knowledge import Knowledges

        user_id = __user__.get("id")

        if knowledge_id:
            # Scoped search inside a single knowledge base.
            result = Knowledges.search_files_by_id(
                knowledge_id=knowledge_id,
                user_id=user_id,
                filter={"query": query},
                skip=skip,
                limit=count,
            )
        else:
            # Group membership is only needed for the cross-KB search, so it
            # is looked up here rather than on every call.
            user_group_ids = [
                group.id for group in Groups.get_groups_by_member_id(user_id)
            ]
            result = Knowledges.search_knowledge_files(
                filter={
                    "query": query,
                    "user_id": user_id,
                    "group_ids": user_group_ids,
                },
                skip=skip,
                limit=count,
            )

        files = []
        for file in result.items:
            file_info = {
                "id": file.id,
                "filename": file.filename,
                "updated_at": file.updated_at,
            }
            # Knowledge base details are added only when the result carries
            # a non-empty `collection` mapping.
            collection = getattr(file, "collection", None)
            if collection:
                file_info["knowledge_id"] = collection.get("id", "")
                file_info["knowledge_name"] = collection.get("name", "")
            files.append(file_info)

        return json.dumps(files, ensure_ascii=False)
    except Exception as e:
        log.exception(f"search_knowledge_files error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def view_file(
    file_id: str,
    __request__: Request = None,
    __user__: dict = None,
    __model_knowledge__: Optional[list[dict]] = None,
) -> str:
    """
    Get the full content of a file by its ID.

    A file the user may not read is reported as "File not found", the same
    response given for a file that does not exist.

    :param file_id: The ID of the file to retrieve
    :return: JSON with the file's id, filename, and full text content
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.files import Files
        from open_webui.routers.files import has_access_to_file

        requester_id = __user__.get("id")
        requester_role = __user__.get("role", "user")

        file = Files.get_file_by_id(file_id)
        if not file:
            return json.dumps({"error": "File not found"})

        # Checks run cheapest-first and stop at the first success: owner,
        # admin, file attached to the model's knowledge, then an explicit
        # read-access lookup.
        may_read = (
            file.user_id == requester_id
            or requester_role == "admin"
            or any(
                item.get("type") == "file" and item.get("id") == file_id
                for item in (__model_knowledge__ or [])
            )
            or has_access_to_file(
                file_id=file_id,
                access_type="read",
                user=UserModel(**__user__),
            )
        )
        if not may_read:
            return json.dumps({"error": "File not found"})

        # Files without a data payload yield empty content.
        text = file.data.get("content", "") if file.data else ""

        document = {
            "id": file.id,
            "filename": file.filename,
            "content": text,
            "updated_at": file.updated_at,
            "created_at": file.created_at,
        }
        return json.dumps(document, ensure_ascii=False)
    except Exception as e:
        log.exception(f"view_file error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def view_knowledge_file(
    file_id: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the full content of a file from a knowledge base.

    Access is granted through any knowledge base containing the file that
    the user can read; failing that, only the file's owner or an admin may
    view it.

    :param file_id: The ID of the file to retrieve
    :return: JSON with the file's id, filename, and full text content
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.files import Files
        from open_webui.models.knowledge import Knowledges
        from open_webui.models.access_grants import AccessGrants

        requester_id = __user__.get("id")
        requester_role = __user__.get("role", "user")
        group_id_set = {
            group.id for group in Groups.get_groups_by_member_id(requester_id)
        }

        file = Files.get_file_by_id(file_id)
        if not file:
            return json.dumps({"error": "File not found"})

        # Take the first knowledge base (in lookup order) the user can read.
        knowledge_info = next(
            (
                {"id": candidate.id, "name": candidate.name}
                for candidate in Knowledges.get_knowledges_by_file_id(file_id)
                if (
                    requester_role == "admin"
                    or candidate.user_id == requester_id
                    or AccessGrants.has_access(
                        user_id=requester_id,
                        resource_type="knowledge",
                        resource_id=candidate.id,
                        permission="read",
                        user_group_ids=group_id_set,
                    )
                )
            ),
            None,
        )

        # Without knowledge-base access, fall back to direct ownership/admin.
        if (
            knowledge_info is None
            and file.user_id != requester_id
            and requester_role != "admin"
        ):
            return json.dumps({"error": "Access denied"})

        text = file.data.get("content", "") if file.data else ""

        document = {
            "id": file.id,
            "filename": file.filename,
            "content": text,
            "updated_at": file.updated_at,
            "created_at": file.created_at,
        }
        if knowledge_info:
            document["knowledge_id"] = knowledge_info["id"]
            document["knowledge_name"] = knowledge_info["name"]

        return json.dumps(document, ensure_ascii=False)
    except Exception as e:
        log.exception(f"view_knowledge_file error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def query_knowledge_files(
    query: str,
    knowledge_ids: Optional[list[str]] = None,
    count: int = 5,
    __request__: Request = None,
    __user__: dict = None,
    __model_knowledge__: list[dict] = None,
) -> str:
    """
    Search knowledge base files using semantic/vector search. Searches across collections (KBs),
    individual files, and notes that the user has access to.

    When model-attached knowledge is supplied it alone defines the search
    scope; otherwise the given knowledge_ids are used, and failing that the
    user's accessible knowledge bases (first 50).

    :param query: The search query to find semantically relevant content
    :param knowledge_ids: Optional list of KB ids to limit search to specific knowledge bases
    :param count: Maximum number of results to return (default: 5)
    :return: JSON with relevant chunks containing content, source filename, and relevance score
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    # The limit may arrive as a string or float; anything that cannot be
    # read as an integer falls back to the documented default.
    try:
        count = int(count)
    except (TypeError, ValueError):
        count = 5

    # A non-positive limit cannot yield results (and a negative one would
    # make the final slice drop trailing items instead of limiting).
    if count <= 0:
        return json.dumps([])

    # knowledge_ids may arrive as a string: a placeholder for "no value",
    # a JSON document, or a bare id.
    if isinstance(knowledge_ids, str):
        raw_ids = knowledge_ids.strip()
        if raw_ids.lower() in ("none", "null", ""):
            knowledge_ids = None
        else:
            try:
                parsed_ids = json.loads(raw_ids)
            except json.JSONDecodeError:
                parsed_ids = raw_ids

            if isinstance(parsed_ids, list):
                knowledge_ids = parsed_ids
            elif isinstance(parsed_ids, str):
                # A JSON-quoted or bare id: wrap it so the loop below does
                # not iterate over its characters.
                knowledge_ids = [parsed_ids]
            else:
                # Valid JSON that is not a list (e.g. a number): treat the
                # original text as a single id.
                knowledge_ids = [raw_ids]

    try:
        from open_webui.models.knowledge import Knowledges
        from open_webui.models.files import Files
        from open_webui.models.notes import Notes
        from open_webui.retrieval.utils import query_collection
        from open_webui.models.access_grants import AccessGrants

        user_id = __user__.get("id")
        user_role = __user__.get("role", "user")
        user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)]
        user_group_id_set = set(user_group_ids)

        embedding_function = __request__.app.state.EMBEDDING_FUNCTION
        if not embedding_function:
            return json.dumps({"error": "Embedding function not configured"})

        def can_read_knowledge(knowledge) -> bool:
            """Return True when the user is admin, owner, or holds a read grant."""
            return bool(knowledge) and (
                user_role == "admin"
                or knowledge.user_id == user_id
                or AccessGrants.has_access(
                    user_id=user_id,
                    resource_type="knowledge",
                    resource_id=knowledge.id,
                    permission="read",
                    user_group_ids=user_group_id_set,
                )
            )

        def first_batch(values):
            """Return the result list for the single query issued, or []."""
            return values[0] if values else []

        collection_names = []
        note_results = []

        if __model_knowledge__:
            # Model-attached knowledge: each item is checked individually.
            for item in __model_knowledge__:
                item_type = item.get("type")
                item_id = item.get("id")

                if item_type == "collection":
                    if can_read_knowledge(Knowledges.get_knowledge_by_id(item_id)):
                        collection_names.append(item_id)

                elif item_type == "file":
                    # Files are searchable by their owner or an admin only.
                    file = Files.get_file_by_id(item_id)
                    if file and (user_role == "admin" or file.user_id == user_id):
                        collection_names.append(f"file-{item_id}")

                elif item_type == "note":
                    # Notes bypass vector search: their markdown is returned whole.
                    # NOTE(review): unlike the knowledge checks, no
                    # user_group_ids is passed here - confirm has_access
                    # resolves group membership itself.
                    note = Notes.get_note_by_id(item_id)
                    if note and (
                        user_role == "admin"
                        or note.user_id == user_id
                        or AccessGrants.has_access(
                            user_id=user_id,
                            resource_type="note",
                            resource_id=note.id,
                            permission="read",
                        )
                    ):
                        # Tolerate notes with no data or no content section.
                        note_content = (note.data or {}).get("content") or {}
                        note_results.append(
                            {
                                "content": note_content.get("md", ""),
                                "source": note.title,
                                "note_id": note.id,
                                "type": "note",
                            }
                        )

        elif knowledge_ids:
            # Caller-selected knowledge bases, filtered to readable ones.
            for knowledge_id in knowledge_ids:
                if can_read_knowledge(Knowledges.get_knowledge_by_id(knowledge_id)):
                    collection_names.append(knowledge_id)
        else:
            # No scope given: search the user's accessible knowledge bases.
            result = Knowledges.search_knowledge_bases(
                user_id,
                filter={
                    "query": "",
                    "user_id": user_id,
                    "group_ids": user_group_ids,
                },
                skip=0,
                limit=50,
            )
            collection_names = [knowledge_base.id for knowledge_base in result.items]

        # Notes come first, followed by vector-search hits.
        chunks = list(note_results)

        if collection_names:
            query_results = await query_collection(
                collection_names=collection_names,
                queries=[query],
                embedding_function=embedding_function,
                k=count,
            )

            if query_results and "documents" in query_results:
                documents = first_batch(query_results.get("documents"))
                metadatas = first_batch(query_results.get("metadatas"))
                distances = first_batch(query_results.get("distances"))

                for idx, doc in enumerate(documents):
                    # Missing or empty metadata falls back to defaults
                    # rather than aborting the whole query.
                    metadata = (metadatas[idx] if idx < len(metadatas) else None) or {}
                    chunk_info = {
                        "content": doc,
                        "source": metadata.get(
                            "source", metadata.get("name", "Unknown")
                        ),
                        "file_id": metadata.get("file_id", ""),
                    }
                    if idx < len(distances):
                        chunk_info["distance"] = distances[idx]
                    chunks.append(chunk_info)

        # Notes count toward the overall limit.
        chunks = chunks[:count]

        return json.dumps(chunks, ensure_ascii=False)
    except Exception as e:
        log.exception(f"query_knowledge_files error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
async def query_knowledge_bases(
    query: str,
    count: int = 5,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search knowledge bases by semantic similarity to query.
    Finds KBs whose name/description match the meaning of your query.
    Use this to discover relevant knowledge bases before querying their files.

    :param query: Natural language query describing what you're looking for
    :param count: Maximum results (default: 5)
    :return: JSON with matching KBs (id, name, description, similarity)
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    # The limit may arrive as a string; fall back to the documented default
    # when it cannot be read as an integer.
    try:
        count = int(count)
    except (TypeError, ValueError):
        count = 5

    # With a non-positive limit the heap below would stay empty and peeking
    # at its root would raise IndexError; there is nothing to return anyway.
    if count <= 0:
        return json.dumps([])

    try:
        import heapq
        from open_webui.models.knowledge import Knowledges
        from open_webui.routers.knowledge import KNOWLEDGE_BASES_COLLECTION
        from open_webui.retrieval.vector.factory import VECTOR_DB_CLIENT

        user_id = __user__.get("id")
        user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)]

        # Report a missing embedding function explicitly instead of letting
        # the call below fail with an opaque "not callable" error.
        embedding_function = __request__.app.state.EMBEDDING_FUNCTION
        if not embedding_function:
            return json.dumps({"error": "Embedding function not configured"})

        query_embedding = await embedding_function(query)

        # Min-heap holding the `count` highest-scoring (score, id) pairs seen
        # so far; its root is the weakest retained result.
        top_results_heap = []
        seen_ids = set()
        page_offset = 0
        page_size = 100

        # Walk the user's accessible knowledge bases page by page and run
        # the vector search restricted to each page's ids.
        while True:
            accessible_knowledge_bases = Knowledges.search_knowledge_bases(
                user_id,
                filter={"user_id": user_id, "group_ids": user_group_ids},
                skip=page_offset,
                limit=page_size,
            )

            if not accessible_knowledge_bases.items:
                break

            accessible_ids = [kb.id for kb in accessible_knowledge_bases.items]

            search_results = VECTOR_DB_CLIENT.search(
                collection_name=KNOWLEDGE_BASES_COLLECTION,
                vectors=[query_embedding],
                filter={"knowledge_base_id": {"$in": accessible_ids}},
                limit=count,
            )

            if search_results and search_results.ids and search_results.ids[0]:
                result_ids = search_results.ids[0]
                # Without reported scores every hit is given a score of 0.
                result_distances = (
                    search_results.distances[0]
                    if search_results.distances
                    else [0] * len(result_ids)
                )

                for knowledge_base_id, distance in zip(result_ids, result_distances):
                    if knowledge_base_id in seen_ids:
                        continue
                    seen_ids.add(knowledge_base_id)

                    if len(top_results_heap) < count:
                        heapq.heappush(top_results_heap, (distance, knowledge_base_id))
                    elif distance > top_results_heap[0][0]:
                        # Larger values win, i.e. the reported score is
                        # treated as "higher is better".
                        heapq.heapreplace(
                            top_results_heap, (distance, knowledge_base_id)
                        )

            page_offset += page_size
            if len(accessible_knowledge_bases.items) < page_size:
                break  # short page: no further results
            if page_offset >= MAX_KNOWLEDGE_BASE_SEARCH_ITEMS:
                break  # hard cap on how many knowledge bases are scanned

        # Best score first.
        sorted_results = sorted(top_results_heap, key=lambda x: x[0], reverse=True)

        matching_knowledge_bases = []
        for distance, knowledge_base_id in sorted_results:
            knowledge_base = Knowledges.get_knowledge_by_id(knowledge_base_id)
            if knowledge_base:
                matching_knowledge_bases.append(
                    {
                        "id": knowledge_base.id,
                        "name": knowledge_base.name,
                        "description": knowledge_base.description or "",
                        "similarity": round(distance, 4),
                    }
                )

        return json.dumps(matching_knowledge_bases, ensure_ascii=False)

    except Exception as e:
        log.exception(f"query_knowledge_bases error: {e}")
        return json.dumps({"error": str(e)})
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
async def view_skill(
    name: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Load the full instructions of a skill by its name from the available skills manifest.
    Use this when you need detailed instructions for a skill listed in <available_skills>.

    Inactive skills are reported as not found. Non-admin users who do not
    own the skill need a read grant.

    :param name: The name of the skill to load (as shown in the manifest)
    :return: JSON with the skill's name and its full instructions as markdown content
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.skills import Skills
        from open_webui.models.access_grants import AccessGrants

        requester_id = __user__.get("id")

        skill = Skills.get_skill_by_name(name)
        if not (skill and skill.is_active):
            return json.dumps({"error": f"Skill '{name}' not found"})

        requester_role = __user__.get("role", "user")
        needs_grant = requester_role != "admin" and skill.user_id != requester_id

        if needs_grant:
            # Group membership is looked up only when a grant check is required.
            group_id_set = {
                group.id for group in Groups.get_groups_by_member_id(requester_id)
            }
            granted = AccessGrants.has_access(
                user_id=requester_id,
                resource_type="skill",
                resource_id=skill.id,
                permission="read",
                user_group_ids=group_id_set,
            )
            if not granted:
                return json.dumps({"error": "Access denied"})

        payload = {
            "name": skill.name,
            "content": skill.content,
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        log.exception(f"view_skill error: {e}")
        return json.dumps({"error": str(e)})
| |
|