| from __future__ import annotations |
|
|
| import logging |
| import os |
| import re |
| import subprocess |
| import sys |
| import tempfile |
| import types |
| from importlib import util |
| from typing import Any |
|
|
| from open_webui.env import ( |
| ENABLE_PIP_INSTALL_FRONTMATTER_REQUIREMENTS, |
| OFFLINE_MODE, |
| PIP_OPTIONS, |
| PIP_PACKAGE_INDEX_OPTIONS, |
| ) |
| from open_webui.models.functions import FunctionModel, Functions |
| from open_webui.models.tools import Tools |
|
|
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
|
|
|
|
def resolve_valves_schema_options(valves_class: type, schema: dict, user: Any = None) -> dict:
    """
    Fill in the select-box choices declared on a Valves model.

    A field may declare ``json_schema_extra={"input": {"options": ...}}``:
    - a list is written into the schema unchanged;
    - a non-empty string names a callable attribute of ``valves_class`` that
      is invoked to produce the list.

    Example:
        class UserValves(BaseModel):
            # Fixed choices
            priority: str = Field(
                default="medium",
                json_schema_extra={
                    "input": {"type": "select", "options": ["low", "medium", "high"]}
                },
            )

            # Choices computed by a method, referenced by name
            model: str = Field(
                default="",
                json_schema_extra={
                    "input": {"type": "select", "options": "get_model_options"}
                },
            )

            @classmethod
            def get_model_options(cls, __user__=None) -> list[dict]:
                return [{"value": "gpt-4", "label": "GPT-4"}]

    Args:
        valves_class: Model class whose ``model_fields`` mapping is consulted.
        schema: JSON-schema dict containing a ``properties`` mapping.
        user: Optional user; handed to option callables that declare a
            ``__user__`` and/or ``user`` parameter (as ``user.model_dump()``
            when that method exists, otherwise as given).

    Returns:
        ``schema`` itself when it is falsy or has no ``properties`` key;
        otherwise a shallow copy in which every resolvable property carries
        ``input.options``. The caller's dict is not modified.
    """
    if not schema or 'properties' not in schema:
        return schema

    # Work on copies so the caller's schema stays untouched.
    resolved_schema = dict(schema)
    properties = dict(resolved_schema.get('properties', {}))
    resolved_schema['properties'] = properties

    # Without field metadata there is nothing to resolve for any property.
    if not hasattr(valves_class, 'model_fields'):
        return resolved_schema

    for name, original_prop in list(properties.items()):
        field = valves_class.model_fields.get(name)
        if not field:
            continue

        extra = field.json_schema_extra
        if not isinstance(extra, dict) or not extra:
            continue

        input_spec = extra.get('input')
        if not isinstance(input_spec, dict) or not input_spec:
            continue

        declared = input_spec.get('options')
        if declared is None:
            continue

        if isinstance(declared, list):
            # Static choices: use them as declared.
            choices = declared
        elif isinstance(declared, str) and declared:
            # Dynamic choices: the string is the name of a provider callable.
            provider = getattr(valves_class, declared, None)
            if not callable(provider):
                log.warning(f"options '{declared}' not found or not callable on {valves_class.__name__}")
                continue

            try:
                import inspect

                accepted = inspect.signature(provider).parameters

                # Only pass the user under the parameter names the provider declares.
                call_args = {}
                for key in ('__user__', 'user'):
                    if key in accepted and user is not None:
                        call_args[key] = user.model_dump() if hasattr(user, 'model_dump') else user

                choices = provider(**call_args)

                if not isinstance(choices, list):
                    log.warning(f"Method '{declared}' did not return a list for {name}")
                    continue
            except Exception as e:
                log.warning(f'Failed to resolve options for {name}: {e}')
                continue
        else:
            # Any other type (or an empty string) is ignored.
            continue

        # Rebuild the property with its own copy of the ``input`` section.
        updated = dict(original_prop)
        updated['input'] = dict(updated['input']) if 'input' in updated else {'type': 'select'}
        updated['input']['options'] = choices
        properties[name] = updated

    return resolved_schema
|
|
|
|
def extract_frontmatter(content):
    """
    Parse the ``key: value`` lines of a leading triple-quoted block into a dict.

    The first line of ``content`` must be exactly three double quotes (ignoring
    surrounding whitespace); otherwise an empty dict is returned. Lines are
    then read until one contains three double quotes again (or the text ends).
    Keys consist of letters and underscores; keys and values are stripped.
    Any exception while parsing is logged and yields an empty dict.
    """
    pair_re = re.compile(r'^\s*([a-z_]+):\s*(.*)\s*$', re.IGNORECASE)
    parsed = {}

    try:
        rows = content.splitlines()

        # No opening delimiter on the very first line means no frontmatter.
        if not rows or rows[0].strip() != '"""':
            return {}

        for row in rows[1:]:
            # The first line mentioning the delimiter closes the block.
            if '"""' in row:
                break

            hit = pair_re.match(row)
            if hit is None:
                continue
            parsed[hit.group(1).strip()] = hit.group(2).strip()
    except Exception as e:
        log.exception(f'Failed to extract frontmatter: {e}')
        return {}

    return parsed
|
|
|
|
def replace_imports(content):
    """
    Rewrite legacy top-level import paths to their ``open_webui.`` equivalents.

    ``from utils``, ``from apps``, ``from main`` and ``from config`` become
    ``from open_webui.utils`` etc. The module name must be a complete
    identifier, so unrelated modules that merely share a prefix (for example
    ``from configparser import ...`` or ``from mainframe import ...``) are left
    alone. Already-rewritten and relative imports are unaffected, so the
    function is idempotent.

    Args:
        content: Source text to rewrite.

    Returns:
        The rewritten source text.
    """
    # (?<![\w.]) keeps "from" from matching inside a longer word or attribute;
    # (?!\w) requires the module name to end there, so "configparser" does not
    # match "config". The whitespace between the two is preserved as written.
    legacy_import = re.compile(r'(?<![\w.])from([ \t]+)(utils|apps|main|config)(?!\w)')
    return legacy_import.sub(r'from\1open_webui.\2', content)
|
|
|
|
| |
| |
async def load_tool_module_by_id(tool_id, content=None):
    """
    Execute a tool's source in a fresh module and instantiate its ``Tools`` class.

    When ``content`` is omitted, the source is fetched with
    ``Tools.get_tool_by_id``, passed through ``replace_imports`` and written
    back with ``Tools.update_tool_by_id``. When ``content`` is supplied, it is
    used as given and the ``requirements`` frontmatter entry is handed to
    ``install_frontmatter_requirements`` before execution.

    Args:
        tool_id: Identifier of the tool; also used to build the module name
            ``tool_<tool_id>`` registered in ``sys.modules``.
        content: Optional source text to execute instead of the stored one.

    Returns:
        A ``(tools_instance, frontmatter)`` tuple, where ``frontmatter`` is the
        dict produced by ``extract_frontmatter`` for the executed source.

    Raises:
        Exception: If the tool cannot be found, if the executed source defines
            no ``Tools`` attribute, or whatever the executed source raises.
            On any such failure after registration, the module is removed
            from ``sys.modules`` again.
    """
    if content is None:
        tool = await Tools.get_tool_by_id(tool_id)
        if not tool:
            raise Exception(f'Toolkit not found: {tool_id}')

        content = replace_imports(tool.content)
        await Tools.update_tool_by_id(tool_id, {'content': content})
    else:
        frontmatter = extract_frontmatter(content)
        # Install required packages found within the frontmatter
        install_frontmatter_requirements(frontmatter.get('requirements', ''))

    module_name = f'tool_{tool_id}'
    module = types.ModuleType(module_name)
    sys.modules[module_name] = module

    # Back the module with a real file so that ``__file__`` is defined from
    # the executed code's perspective. The file is closed first and reopened
    # by name for writing.
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_file.close()
    try:
        with open(temp_file.name, 'w', encoding='utf-8') as f:
            f.write(content)
        module.__dict__['__file__'] = temp_file.name

        # NOTE(review): exec runs the stored source with full interpreter
        # privileges; callers are responsible for who may author tools.
        exec(content, module.__dict__)
        frontmatter = extract_frontmatter(content)
        log.info(f'Loaded module: {module.__name__}')

        if not hasattr(module, 'Tools'):
            raise Exception('No Tools class found in the module')
        return module.Tools(), frontmatter
    except Exception as e:
        log.error(f'Error loading module: {tool_id}: {e}')
        # pop() instead of del: if the entry is already gone, a KeyError here
        # would replace the real error being reported.
        sys.modules.pop(module_name, None)
        raise
    finally:
        os.unlink(temp_file.name)
|
|
|
|
async def load_function_module_by_id(function_id: str, content: str | None = None):
    """
    Execute a function's source in a fresh module and instantiate its entry class.

    When ``content`` is omitted, the source is fetched with
    ``Functions.get_function_by_id``, passed through ``replace_imports`` and
    written back with ``Functions.update_function_by_id``. When ``content`` is
    supplied, it is used as given and the ``requirements`` frontmatter entry is
    handed to ``install_frontmatter_requirements`` before execution.

    Args:
        function_id: Identifier of the function; also used to build the module
            name ``function_<function_id>`` registered in ``sys.modules``.
        content: Optional source text to execute instead of the stored one.

    Returns:
        A ``(instance, kind, frontmatter)`` tuple. ``kind`` is ``'pipe'``,
        ``'filter'`` or ``'action'`` depending on which of the attributes
        ``Pipe``, ``Filter``, ``Action`` the module defines (checked in that
        order).

    Raises:
        Exception: If the function cannot be found, if none of the three
            classes is defined, or whatever the executed source raises. On a
            failure after registration, the module is removed from
            ``sys.modules`` and the function is marked ``is_active: False``.
    """
    if content is None:
        function = await Functions.get_function_by_id(function_id)
        if not function:
            raise Exception(f'Function not found: {function_id}')

        content = replace_imports(function.content)
        await Functions.update_function_by_id(function_id, {'content': content})
    else:
        frontmatter = extract_frontmatter(content)
        install_frontmatter_requirements(frontmatter.get('requirements', ''))

    module_name = f'function_{function_id}'
    module = types.ModuleType(module_name)
    sys.modules[module_name] = module

    # Back the module with a real file so that ``__file__`` is defined from
    # the executed code's perspective. The file is closed first and reopened
    # by name for writing.
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_file.close()
    try:
        with open(temp_file.name, 'w', encoding='utf-8') as f:
            f.write(content)
        module.__dict__['__file__'] = temp_file.name

        # NOTE(review): exec runs the stored source with full interpreter
        # privileges; callers are responsible for who may author functions.
        exec(content, module.__dict__)
        frontmatter = extract_frontmatter(content)
        log.info(f'Loaded module: {module.__name__}')

        # First matching class wins; the order mirrors the priority
        # Pipe > Filter > Action.
        for class_name, kind in (('Pipe', 'pipe'), ('Filter', 'filter'), ('Action', 'action')):
            if hasattr(module, class_name):
                return getattr(module, class_name)(), kind, frontmatter

        raise Exception('No Function class found in the module')
    except Exception as e:
        log.error(f'Error loading module: {function_id}: {e}')

        # pop() instead of del: if the entry is already gone, a KeyError here
        # would replace the real error and skip the deactivation below.
        sys.modules.pop(module_name, None)

        # A function that fails to load is switched off.
        await Functions.update_function_by_id(function_id, {'is_active': False})
        raise
    finally:
        os.unlink(temp_file.name)
|
|
|
|
async def get_tool_module_from_cache(request, tool_id, load_from_db=True):
    """
    Return a loaded tool instance, reusing ``request.app.state`` as a cache.

    Two dicts on ``request.app.state`` are used (and created on demand):
    ``TOOLS`` maps tool id to instance, ``TOOL_CONTENTS`` maps tool id to the
    source the instance was built from.

    Args:
        request: Object exposing ``app.state``.
        tool_id: Identifier of the tool.
        load_from_db: When true, the stored source is fetched first (and
            rewritten via ``replace_imports``, persisting any change); the
            cached instance is reused only if its recorded source is
            identical. When false, any cached instance is returned without
            touching the database.

    Returns:
        ``(instance, None)`` on a cache hit, otherwise
        ``(instance, frontmatter)`` for the freshly loaded module.

    Raises:
        Exception: If the tool does not exist, or if loading the module fails.
    """
    state = request.app.state

    if load_from_db:
        # Always load from the database by default
        tool = await Tools.get_tool_by_id(tool_id)
        if not tool:
            raise Exception(f'Tool not found: {tool_id}')
        content = tool.content

        new_content = replace_imports(content)
        if new_content != content:
            content = new_content
            # Update the tool content in the database
            await Tools.update_tool_by_id(tool_id, {'content': content})

        cached_contents = getattr(state, 'TOOL_CONTENTS', {})
        cached_tools = getattr(state, 'TOOLS', {})
        if tool_id in cached_contents and tool_id in cached_tools and cached_contents[tool_id] == content:
            return cached_tools[tool_id], None

        tool_module, frontmatter = await load_tool_module_by_id(tool_id, content)
    else:
        if hasattr(state, 'TOOLS') and tool_id in state.TOOLS:
            return state.TOOLS[tool_id], None

        tool_module, frontmatter = await load_tool_module_by_id(tool_id)

        # Previously ``content`` was never assigned on this path, so the cache
        # write below raised UnboundLocalError. The loader has just persisted
        # the source it executed, so read that back for the content cache.
        tool = await Tools.get_tool_by_id(tool_id)
        content = tool.content if tool else None

    if not hasattr(state, 'TOOLS'):
        state.TOOLS = {}

    if not hasattr(state, 'TOOL_CONTENTS'):
        state.TOOL_CONTENTS = {}

    state.TOOLS[tool_id] = tool_module
    state.TOOL_CONTENTS[tool_id] = content

    return tool_module, frontmatter
|
|
|
|
async def get_function_module_from_cache(
    request, function_id, function: FunctionModel | None = None, load_from_db=True
):
    """
    Return a loaded function instance, reusing ``request.app.state`` as a cache.

    Two dicts on ``request.app.state`` are used (and created on demand):
    ``FUNCTIONS`` maps function id to instance, ``FUNCTION_CONTENTS`` maps
    function id to the source the instance was built from.

    Args:
        request: Object exposing ``app.state``.
        function_id: Identifier of the function.
        function: Optional already-fetched record; when given with
            ``load_from_db`` true, it is used instead of querying
            ``Functions.get_function_by_id``.
        load_from_db: When true, the source is taken from the record (and
            rewritten via ``replace_imports``, persisting any change); the
            cached instance is reused only if its recorded source is
            identical. When false, any cached instance is returned without
            touching the database.

    Returns:
        ``(instance, None, None)`` on a cache hit, otherwise
        ``(instance, function_type, frontmatter)`` for the freshly loaded
        module.

    Raises:
        Exception: If the function does not exist, or if loading fails.
    """
    state = request.app.state

    if load_from_db:
        if function is None:
            function = await Functions.get_function_by_id(function_id)
        if not function:
            raise Exception(f'Function not found: {function_id}')
        content = function.content

        new_content = replace_imports(content)
        if new_content != content:
            content = new_content
            # Update the function content in the database
            await Functions.update_function_by_id(function_id, {'content': content})

        cached_contents = getattr(state, 'FUNCTION_CONTENTS', {})
        cached_functions = getattr(state, 'FUNCTIONS', {})
        if (
            function_id in cached_contents
            and function_id in cached_functions
            and cached_contents[function_id] == content
        ):
            return cached_functions[function_id], None, None

        function_module, function_type, frontmatter = await load_function_module_by_id(function_id, content)
    else:
        if hasattr(state, 'FUNCTIONS') and function_id in state.FUNCTIONS:
            return state.FUNCTIONS[function_id], None, None

        function_module, function_type, frontmatter = await load_function_module_by_id(function_id)

        # Previously ``content`` was never assigned on this path, so the cache
        # write below raised UnboundLocalError. The loader has just persisted
        # the source it executed, so read that back for the content cache.
        stored = await Functions.get_function_by_id(function_id)
        content = stored.content if stored else None

    if not hasattr(state, 'FUNCTIONS'):
        state.FUNCTIONS = {}

    if not hasattr(state, 'FUNCTION_CONTENTS'):
        state.FUNCTION_CONTENTS = {}

    state.FUNCTIONS[function_id] = function_module
    state.FUNCTION_CONTENTS[function_id] = content

    return function_module, function_type, frontmatter
|
|
|
|
# Requirement strings that a pip call in this process has already installed
# successfully; used to skip repeat installs.
_installed_requirements = set()


def install_frontmatter_requirements(requirements: str):
    """
    Install the comma-separated requirements from a frontmatter entry with pip.

    Nothing is installed when ``ENABLE_PIP_INSTALL_FRONTMATTER_REQUIREMENTS``
    is false, when ``OFFLINE_MODE`` is true, or when ``requirements`` is empty.
    Entries are stripped; blank entries, repeats within the same call, and
    entries recorded in ``_installed_requirements`` are skipped. Entries are
    recorded only after pip exits successfully.

    Args:
        requirements: Comma-separated requirement specifiers.

    Raises:
        Exception: Whatever ``subprocess.check_call`` raises (for example
            ``subprocess.CalledProcessError`` on a non-zero pip exit), after
            logging the affected packages.
    """
    if not ENABLE_PIP_INSTALL_FRONTMATTER_REQUIREMENTS:
        log.info('ENABLE_PIP_INSTALL_FRONTMATTER_REQUIREMENTS is disabled, skipping installation of requirements.')
        return

    if OFFLINE_MODE:
        log.info('Offline mode enabled, skipping installation of requirements.')
        return

    if not requirements:
        log.info('No requirements found in frontmatter.')
        return

    # dict.fromkeys drops repeated entries while keeping first-seen order.
    requested = dict.fromkeys(req.strip() for req in requirements.split(','))
    new_reqs = [req for req in requested if req and req not in _installed_requirements]
    if not new_reqs:
        return

    log.info(f'Installing requirements: {" ".join(new_reqs)}')
    try:
        # NOTE(review): entries are forwarded to pip verbatim, so one starting
        # with "-" would be read as a pip option. Confirm frontmatter authors
        # are trusted.
        subprocess.check_call(
            [sys.executable, '-m', 'pip', 'install'] + PIP_OPTIONS + new_reqs + PIP_PACKAGE_INDEX_OPTIONS
        )
    except Exception:
        # The try covers only the pip call, so new_reqs is always bound here.
        log.error(f'Error installing packages: {" ".join(new_reqs)}')
        raise

    _installed_requirements.update(new_reqs)
|
|
|
|
async def install_tool_and_function_dependencies():
    """
    Install the frontmatter requirements of active functions and admin tools.

    The ``requirements`` entries of every function returned by
    ``Functions.get_functions(active_only=True)`` and of every tool whose
    ``user.role`` is ``'admin'`` are gathered into one comma-separated string
    and handed to ``install_frontmatter_requirements`` in a single call, so
    overlapping specifications are left for pip to reconcile.

    Errors raised while gathering or installing are logged, not propagated.
    """
    function_list = await Functions.get_functions(active_only=True)
    tool_list = await Tools.get_tools()

    collected = []
    try:
        for function in function_list:
            requirements = extract_frontmatter(replace_imports(function.content)).get('requirements')
            if requirements:
                collected.append(requirements)

        for tool in tool_list:
            # Only install requirements for admin tools
            if not (tool.user and tool.user.role == 'admin'):
                continue
            requirements = extract_frontmatter(replace_imports(tool.content)).get('requirements')
            if requirements:
                collected.append(requirements)

        install_frontmatter_requirements(', '.join(collected).strip(', '))
    except Exception as e:
        log.error(f'Error installing requirements: {e}')
|
|