| | import os |
| | import re |
| | import subprocess |
| | import sys |
| | from importlib import util |
| | import types |
| | import tempfile |
| | import logging |
| | from typing import Any |
| |
|
| | from open_webui.env import PIP_OPTIONS, PIP_PACKAGE_INDEX_OPTIONS, OFFLINE_MODE |
| | from open_webui.models.functions import Functions |
| | from open_webui.models.tools import Tools |
| |
|
| | log = logging.getLogger(__name__) |
| |
|
| |
|
| | def resolve_valves_schema_options( |
| | valves_class: type, schema: dict, user: Any = None |
| | ) -> dict: |
| | """ |
| | Resolve dynamic options in a Valves schema. |
| | |
| | For properties with `input.options`, this function handles two cases: |
| | - List: Used directly as dropdown options |
| | - String: Treated as method name, called to get options dynamically |
| | |
| | Usage in Valves: |
| | class UserValves(BaseModel): |
| | # Static options |
| | priority: str = Field( |
| | default="medium", |
| | json_schema_extra={ |
| | "input": { |
| | "type": "select", |
| | "options": ["low", "medium", "high"] |
| | } |
| | } |
| | ) |
| | |
| | # Dynamic options (method name) |
| | model: str = Field( |
| | default="", |
| | json_schema_extra={ |
| | "input": { |
| | "type": "select", |
| | "options": "get_model_options" |
| | } |
| | } |
| | ) |
| | |
| | @classmethod |
| | def get_model_options(cls, __user__=None) -> list[dict]: |
| | return [{"value": "gpt-4", "label": "GPT-4"}] |
| | |
| | Args: |
| | valves_class: The Valves or UserValves Pydantic model class |
| | schema: The JSON schema dict from valves_class.schema() |
| | user: Optional user object passed to methods that accept __user__ |
| | |
| | Returns: |
| | Modified schema dict with resolved options |
| | """ |
| | if not schema or "properties" not in schema: |
| | return schema |
| |
|
| | |
| | schema = dict(schema) |
| | schema["properties"] = dict(schema.get("properties", {})) |
| |
|
| | for prop_name, prop_schema in list(schema["properties"].items()): |
| | |
| | if not hasattr(valves_class, "model_fields"): |
| | continue |
| |
|
| | field_info = valves_class.model_fields.get(prop_name) |
| | if not field_info: |
| | continue |
| |
|
| | |
| | json_schema_extra = field_info.json_schema_extra |
| | if not json_schema_extra or not isinstance(json_schema_extra, dict): |
| | continue |
| |
|
| | input_config = json_schema_extra.get("input") |
| | if not input_config or not isinstance(input_config, dict): |
| | continue |
| |
|
| | options = input_config.get("options") |
| | if options is None: |
| | continue |
| |
|
| | resolved_options = None |
| |
|
| | |
| | if isinstance(options, list): |
| | resolved_options = options |
| |
|
| | |
| | elif isinstance(options, str) and options: |
| | method = getattr(valves_class, options, None) |
| | if method is None or not callable(method): |
| | log.warning( |
| | f"options '{options}' not found or not callable on {valves_class.__name__}" |
| | ) |
| | continue |
| |
|
| | try: |
| | import inspect |
| |
|
| | sig = inspect.signature(method) |
| | params = sig.parameters |
| |
|
| | |
| | kwargs = {} |
| | if "__user__" in params and user is not None: |
| | kwargs["__user__"] = ( |
| | user.model_dump() if hasattr(user, "model_dump") else user |
| | ) |
| | if "user" in params and user is not None: |
| | kwargs["user"] = ( |
| | user.model_dump() if hasattr(user, "model_dump") else user |
| | ) |
| |
|
| | resolved_options = method(**kwargs) if kwargs else method() |
| |
|
| | |
| | if not isinstance(resolved_options, list): |
| | log.warning( |
| | f"Method '{options}' did not return a list for {prop_name}" |
| | ) |
| | continue |
| |
|
| | except Exception as e: |
| | log.warning(f"Failed to resolve options for {prop_name}: {e}") |
| | continue |
| | else: |
| | |
| | continue |
| |
|
| | |
| | schema["properties"][prop_name] = dict(prop_schema) |
| | if "input" not in schema["properties"][prop_name]: |
| | schema["properties"][prop_name]["input"] = {"type": "select"} |
| | else: |
| | schema["properties"][prop_name]["input"] = dict( |
| | schema["properties"][prop_name].get("input", {}) |
| | ) |
| | schema["properties"][prop_name]["input"]["options"] = resolved_options |
| |
|
| | return schema |
| |
|
| |
|
| | def extract_frontmatter(content): |
| | """ |
| | Extract frontmatter as a dictionary from the provided content string. |
| | """ |
| | frontmatter = {} |
| | frontmatter_started = False |
| | frontmatter_ended = False |
| | frontmatter_pattern = re.compile(r"^\s*([a-z_]+):\s*(.*)\s*$", re.IGNORECASE) |
| |
|
| | try: |
| | lines = content.splitlines() |
| | if len(lines) < 1 or lines[0].strip() != '"""': |
| | |
| | return {} |
| |
|
| | frontmatter_started = True |
| |
|
| | for line in lines[1:]: |
| | if '"""' in line: |
| | if frontmatter_started: |
| | frontmatter_ended = True |
| | break |
| |
|
| | if frontmatter_started and not frontmatter_ended: |
| | match = frontmatter_pattern.match(line) |
| | if match: |
| | key, value = match.groups() |
| | frontmatter[key.strip()] = value.strip() |
| |
|
| | except Exception as e: |
| | log.exception(f"Failed to extract frontmatter: {e}") |
| | return {} |
| |
|
| | return frontmatter |
| |
|
| |
|
| | def replace_imports(content): |
| | """ |
| | Replace the import paths in the content. |
| | """ |
| | replacements = { |
| | "from utils": "from open_webui.utils", |
| | "from apps": "from open_webui.apps", |
| | "from main": "from open_webui.main", |
| | "from config": "from open_webui.config", |
| | } |
| |
|
| | for old, new in replacements.items(): |
| | content = content.replace(old, new) |
| |
|
| | return content |
| |
|
| |
|
| | def load_tool_module_by_id(tool_id, content=None): |
| |
|
| | if content is None: |
| | tool = Tools.get_tool_by_id(tool_id) |
| | if not tool: |
| | raise Exception(f"Toolkit not found: {tool_id}") |
| |
|
| | content = tool.content |
| |
|
| | content = replace_imports(content) |
| | Tools.update_tool_by_id(tool_id, {"content": content}) |
| | else: |
| | frontmatter = extract_frontmatter(content) |
| | |
| | install_frontmatter_requirements(frontmatter.get("requirements", "")) |
| |
|
| | module_name = f"tool_{tool_id}" |
| | module = types.ModuleType(module_name) |
| | sys.modules[module_name] = module |
| |
|
| | |
| | |
| | temp_file = tempfile.NamedTemporaryFile(delete=False) |
| | temp_file.close() |
| | try: |
| | with open(temp_file.name, "w", encoding="utf-8") as f: |
| | f.write(content) |
| | module.__dict__["__file__"] = temp_file.name |
| |
|
| | |
| | exec(content, module.__dict__) |
| | frontmatter = extract_frontmatter(content) |
| | log.info(f"Loaded module: {module.__name__}") |
| |
|
| | |
| | if hasattr(module, "Tools"): |
| | return module.Tools(), frontmatter |
| | else: |
| | raise Exception("No Tools class found in the module") |
| | except Exception as e: |
| | log.error(f"Error loading module: {tool_id}: {e}") |
| | del sys.modules[module_name] |
| | raise e |
| | finally: |
| | os.unlink(temp_file.name) |
| |
|
| |
|
| | def load_function_module_by_id(function_id: str, content: str | None = None): |
| | if content is None: |
| | function = Functions.get_function_by_id(function_id) |
| | if not function: |
| | raise Exception(f"Function not found: {function_id}") |
| | content = function.content |
| |
|
| | content = replace_imports(content) |
| | Functions.update_function_by_id(function_id, {"content": content}) |
| | else: |
| | frontmatter = extract_frontmatter(content) |
| | install_frontmatter_requirements(frontmatter.get("requirements", "")) |
| |
|
| | module_name = f"function_{function_id}" |
| | module = types.ModuleType(module_name) |
| | sys.modules[module_name] = module |
| |
|
| | |
| | |
| | temp_file = tempfile.NamedTemporaryFile(delete=False) |
| | temp_file.close() |
| | try: |
| | with open(temp_file.name, "w", encoding="utf-8") as f: |
| | f.write(content) |
| | module.__dict__["__file__"] = temp_file.name |
| |
|
| | |
| | exec(content, module.__dict__) |
| | frontmatter = extract_frontmatter(content) |
| | log.info(f"Loaded module: {module.__name__}") |
| |
|
| | |
| | if hasattr(module, "Pipe"): |
| | return module.Pipe(), "pipe", frontmatter |
| | elif hasattr(module, "Filter"): |
| | return module.Filter(), "filter", frontmatter |
| | elif hasattr(module, "Action"): |
| | return module.Action(), "action", frontmatter |
| | else: |
| | raise Exception("No Function class found in the module") |
| | except Exception as e: |
| | log.error(f"Error loading module: {function_id}: {e}") |
| | |
| | del sys.modules[module_name] |
| |
|
| | Functions.update_function_by_id(function_id, {"is_active": False}) |
| | raise e |
| | finally: |
| | os.unlink(temp_file.name) |
| |
|
| |
|
| | def get_tool_module_from_cache(request, tool_id, load_from_db=True): |
| | if load_from_db: |
| | |
| | tool = Tools.get_tool_by_id(tool_id) |
| | if not tool: |
| | raise Exception(f"Tool not found: {tool_id}") |
| | content = tool.content |
| |
|
| | new_content = replace_imports(content) |
| | if new_content != content: |
| | content = new_content |
| | |
| | Tools.update_tool_by_id(tool_id, {"content": content}) |
| |
|
| | if ( |
| | hasattr(request.app.state, "TOOL_CONTENTS") |
| | and tool_id in request.app.state.TOOL_CONTENTS |
| | ) and ( |
| | hasattr(request.app.state, "TOOLS") and tool_id in request.app.state.TOOLS |
| | ): |
| | if request.app.state.TOOL_CONTENTS[tool_id] == content: |
| | return request.app.state.TOOLS[tool_id], None |
| |
|
| | tool_module, frontmatter = load_tool_module_by_id(tool_id, content) |
| | else: |
| | if hasattr(request.app.state, "TOOLS") and tool_id in request.app.state.TOOLS: |
| | return request.app.state.TOOLS[tool_id], None |
| |
|
| | tool_module, frontmatter = load_tool_module_by_id(tool_id) |
| |
|
| | if not hasattr(request.app.state, "TOOLS"): |
| | request.app.state.TOOLS = {} |
| |
|
| | if not hasattr(request.app.state, "TOOL_CONTENTS"): |
| | request.app.state.TOOL_CONTENTS = {} |
| |
|
| | request.app.state.TOOLS[tool_id] = tool_module |
| | request.app.state.TOOL_CONTENTS[tool_id] = content |
| |
|
| | return tool_module, frontmatter |
| |
|
| |
|
| | def get_function_module_from_cache(request, function_id, load_from_db=True): |
| | if load_from_db: |
| | |
| | |
| | |
| |
|
| | function = Functions.get_function_by_id(function_id) |
| | if not function: |
| | raise Exception(f"Function not found: {function_id}") |
| | content = function.content |
| |
|
| | new_content = replace_imports(content) |
| | if new_content != content: |
| | content = new_content |
| | |
| | Functions.update_function_by_id(function_id, {"content": content}) |
| |
|
| | if ( |
| | hasattr(request.app.state, "FUNCTION_CONTENTS") |
| | and function_id in request.app.state.FUNCTION_CONTENTS |
| | ) and ( |
| | hasattr(request.app.state, "FUNCTIONS") |
| | and function_id in request.app.state.FUNCTIONS |
| | ): |
| | if request.app.state.FUNCTION_CONTENTS[function_id] == content: |
| | return request.app.state.FUNCTIONS[function_id], None, None |
| |
|
| | function_module, function_type, frontmatter = load_function_module_by_id( |
| | function_id, content |
| | ) |
| | else: |
| | |
| | |
| |
|
| | if ( |
| | hasattr(request.app.state, "FUNCTIONS") |
| | and function_id in request.app.state.FUNCTIONS |
| | ): |
| | return request.app.state.FUNCTIONS[function_id], None, None |
| |
|
| | function_module, function_type, frontmatter = load_function_module_by_id( |
| | function_id |
| | ) |
| |
|
| | if not hasattr(request.app.state, "FUNCTIONS"): |
| | request.app.state.FUNCTIONS = {} |
| |
|
| | if not hasattr(request.app.state, "FUNCTION_CONTENTS"): |
| | request.app.state.FUNCTION_CONTENTS = {} |
| |
|
| | request.app.state.FUNCTIONS[function_id] = function_module |
| | request.app.state.FUNCTION_CONTENTS[function_id] = content |
| |
|
| | return function_module, function_type, frontmatter |
| |
|
| |
|
| | def install_frontmatter_requirements(requirements: str): |
| | if OFFLINE_MODE: |
| | log.info("Offline mode enabled, skipping installation of requirements.") |
| | return |
| |
|
| | if requirements: |
| | try: |
| | req_list = [req.strip() for req in requirements.split(",")] |
| | log.info(f"Installing requirements: {' '.join(req_list)}") |
| | subprocess.check_call( |
| | [sys.executable, "-m", "pip", "install"] |
| | + PIP_OPTIONS |
| | + req_list |
| | + PIP_PACKAGE_INDEX_OPTIONS |
| | ) |
| | except Exception as e: |
| | log.error(f"Error installing packages: {' '.join(req_list)}") |
| | raise e |
| |
|
| | else: |
| | log.info("No requirements found in frontmatter.") |
| |
|
| |
|
| | def install_tool_and_function_dependencies(): |
| | """ |
| | Install all dependencies for all admin tools and active functions. |
| | |
| | By first collecting all dependencies from the frontmatter of each tool and function, |
| | and then installing them using pip. Duplicates or similar version specifications are |
| | handled by pip as much as possible. |
| | """ |
| | function_list = Functions.get_functions(active_only=True) |
| | tool_list = Tools.get_tools() |
| |
|
| | all_dependencies = "" |
| | try: |
| | for function in function_list: |
| | frontmatter = extract_frontmatter(replace_imports(function.content)) |
| | if dependencies := frontmatter.get("requirements"): |
| | all_dependencies += f"{dependencies}, " |
| | for tool in tool_list: |
| | |
| | if tool.user and tool.user.role == "admin": |
| | frontmatter = extract_frontmatter(replace_imports(tool.content)) |
| | if dependencies := frontmatter.get("requirements"): |
| | all_dependencies += f"{dependencies}, " |
| |
|
| | install_frontmatter_requirements(all_dependencies.strip(", ")) |
| | except Exception as e: |
| | log.error(f"Error installing requirements: {e}") |
| |
|