File size: 7,644 Bytes
37c87e8 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 | # SPDX-License-Identifier: AGPL-3.0-or-later
"""With *command engines* administrators can run engines to integrate arbitrary
shell commands.
.. attention::
When creating and enabling a ``command`` engine on a public instance, you
must be careful to avoid leaking private data.
The easiest solution is to limit the access by setting ``tokens`` as described
in section :ref:`private engines`. The engine base is flexible. Only your
imagination can limit the power of this engine (and maybe security concerns).
Configuration
=============
The following options are available:
``command``:
A comma separated list of the elements of the command. A special token
``{{QUERY}}`` tells where to put the search terms of the user. Example:
.. code:: yaml
['ls', '-l', '-h', '{{QUERY}}']
``delimiter``:
A mapping containing a delimiter ``char`` and the *titles* of each element in
``keys``.
``parse_regex``:
A dict containing the regular expressions for each result key.
``query_type``:
The expected type of user search terms. Possible values: ``path`` and
``enum``.
``path``:
Checks if the user provided path is inside the working directory. If not,
the query is not executed.
``enum``:
Is a list of allowed search terms. If the user submits something which is
not included in the list, the query returns an error.
``query_enum``:
A list containing allowed search terms if ``query_type`` is set to ``enum``.
``working_dir``:
The directory where the command has to be executed. Default: ``./``.
``result_separator``:
The character that separates results. Default: ``\\n``.
Example
=======
The example engine below can be used to find files with a specific name in the
configured working directory:
.. code:: yaml
- name: find
engine: command
command: ['find', '.', '-name', '{{QUERY}}']
query_type: path
shortcut: fnd
delimiter:
chars: ' '
keys: ['line']
Implementations
===============
"""
import re
from os.path import expanduser, isabs, realpath, commonprefix
from shlex import split as shlex_split
from subprocess import Popen, PIPE
from threading import Thread
from searx import logger
from searx.result_types import EngineResults
engine_type = 'offline'
paging = True
command = []
delimiter = {}
parse_regex = {}
query_type = ''
query_enum = []
environment_variables = {}
working_dir = realpath('.')
result_separator = '\n'
timeout = 4.0
_command_logger = logger.getChild('command')
_compiled_parse_regex = {}
def init(engine_settings):
check_parsing_options(engine_settings)
if 'command' not in engine_settings:
raise ValueError('engine command : missing configuration key: command')
global command, working_dir, delimiter, parse_regex, environment_variables # pylint: disable=global-statement
command = engine_settings['command']
if 'working_dir' in engine_settings:
working_dir = engine_settings['working_dir']
if not isabs(engine_settings['working_dir']):
working_dir = realpath(working_dir)
if 'parse_regex' in engine_settings:
parse_regex = engine_settings['parse_regex']
for result_key, regex in parse_regex.items():
_compiled_parse_regex[result_key] = re.compile(regex, flags=re.MULTILINE)
if 'delimiter' in engine_settings:
delimiter = engine_settings['delimiter']
if 'environment_variables' in engine_settings:
environment_variables = engine_settings['environment_variables']
def search(query, params) -> EngineResults:
res = EngineResults()
cmd = _get_command_to_run(query)
if not cmd:
return res
reader_thread = Thread(target=_get_results_from_process, args=(res, cmd, params['pageno']))
reader_thread.start()
reader_thread.join(timeout=timeout)
return res
def _get_command_to_run(query):
params = shlex_split(query)
__check_query_params(params)
cmd = []
for c in command:
if c == '{{QUERY}}':
cmd.extend(params)
else:
cmd.append(c)
return cmd
def _get_results_from_process(res: EngineResults, cmd, pageno):
leftover = ''
count = 0
start, end = __get_results_limits(pageno)
with Popen(cmd, stdout=PIPE, stderr=PIPE, env=environment_variables) as process:
line = process.stdout.readline()
while line:
buf = leftover + line.decode('utf-8')
raw_results = buf.split(result_separator)
if raw_results[-1]:
leftover = raw_results[-1]
raw_results = raw_results[:-1]
for raw_result in raw_results:
result = __parse_single_result(raw_result)
if result is None:
_command_logger.debug('skipped result:', raw_result)
continue
if start <= count and count <= end: # pylint: disable=chained-comparison
res.add(res.types.KeyValue(kvmap=result))
count += 1
if end < count:
return res
line = process.stdout.readline()
return_code = process.wait(timeout=timeout)
if return_code != 0:
raise RuntimeError('non-zero return code when running command', cmd, return_code)
return None
def __get_results_limits(pageno):
start = (pageno - 1) * 10
end = start + 9
return start, end
def __check_query_params(params):
if not query_type:
return
if query_type == 'path':
query_path = params[-1]
query_path = expanduser(query_path)
if commonprefix([realpath(query_path), working_dir]) != working_dir:
raise ValueError('requested path is outside of configured working directory')
elif query_type == 'enum' and len(query_enum) > 0:
for param in params:
if param not in query_enum:
raise ValueError('submitted query params is not allowed', param, 'allowed params:', query_enum)
def check_parsing_options(engine_settings):
"""Checks if delimiter based parsing or regex parsing is configured correctly"""
if 'delimiter' not in engine_settings and 'parse_regex' not in engine_settings:
raise ValueError('failed to init settings for parsing lines: missing delimiter or parse_regex')
if 'delimiter' in engine_settings and 'parse_regex' in engine_settings:
raise ValueError('failed to init settings for parsing lines: too many settings')
if 'delimiter' in engine_settings:
if 'chars' not in engine_settings['delimiter'] or 'keys' not in engine_settings['delimiter']:
raise ValueError
def __parse_single_result(raw_result):
"""Parses command line output based on configuration"""
result = {}
if delimiter:
elements = raw_result.split(delimiter['chars'], maxsplit=len(delimiter['keys']) - 1)
if len(elements) != len(delimiter['keys']):
return {}
for i in range(len(elements)): # pylint: disable=consider-using-enumerate
result[delimiter['keys'][i]] = elements[i]
if parse_regex:
for result_key, regex in _compiled_parse_regex.items():
found = regex.search(raw_result)
if not found:
return {}
result[result_key] = raw_result[found.start() : found.end()]
return result
|