|
|
|
|
|
|
| from abc import abstractmethod, ABC
|
| import re
|
|
|
| from searx import settings
|
| from searx.sxng_locales import sxng_locales
|
| from searx.engines import categories, engines, engine_shortcuts
|
| from searx.external_bang import get_bang_definition_and_autocomplete
|
| from searx.search.models import EngineRef
|
| from searx.webutils import VALID_LANGUAGE_CODE
|
|
|
|
|
class QueryPartParser(ABC):
    """Abstract base class for parsers that handle one part of a raw query.

    A concrete parser provides :py:meth:`check` to tell whether it is
    responsible for a query part and :py:meth:`__call__` to actually parse it.
    Parsing results are written to the ``raw_text_query`` object handed to the
    constructor.
    """

    __slots__ = ("raw_text_query", "enable_autocomplete")

    def __init__(self, raw_text_query, enable_autocomplete):
        """Remember the query object to update and the autocomplete switch."""
        self.raw_text_query = raw_text_query
        self.enable_autocomplete = enable_autocomplete

    @staticmethod
    @abstractmethod
    def check(raw_value):
        """Check if raw_value can be parsed"""

    @abstractmethod
    def __call__(self, raw_value):
        """Try to parse raw_value and update ``self.raw_text_query``.

        Returns True if raw_value has been parsed.

        ``self.raw_text_query.autocomplete_list`` is modified as well when
        ``self.enable_autocomplete`` is True.
        """

    def _add_autocomplete(self, value):
        """Append ``value`` to the autocomplete list unless already present."""
        suggestions = self.raw_text_query.autocomplete_list
        if value in suggestions:
            return
        suggestions.append(value)
|
|
|
|
|
class TimeoutParser(QueryPartParser):
    """Parse query parts of the form ``<N`` into a timeout limit.

    The parsed value is stored in ``raw_text_query.timeout_limit``.
    """

    @staticmethod
    def check(raw_value):
        """Return True when raw_value starts with ``<``."""
        return raw_value[0] == '<'

    def __call__(self, raw_value):
        """Parse the text after ``<``; return True if a timeout was set.

        Example suggestions are added only when autocompletion is enabled and
        nothing follows the ``<``.
        """
        value = raw_value[1:]
        found = self._parse(value) if value else False
        if self.enable_autocomplete and not value:
            self._autocomplete()
        return found

    def _parse(self, value):
        """Convert ``value`` to a timeout and store it; return True on success.

        Values below 100 are stored unchanged (as float), values of 100 or
        more are divided by 1000 -- presumably seconds versus milliseconds,
        as the suggestions ``<3`` and ``<850`` hint.

        Anything that is not a plain decimal number, or that is too large to
        be converted, is rejected by returning False.
        """
        # str.isdigit() also accepts characters such as '²' or '①' which
        # int() refuses with a ValueError; str.isdecimal() only accepts what
        # int() can actually convert.
        if not value.isdecimal():
            return False
        try:
            raw_timeout_limit = int(value)
            if raw_timeout_limit < 100:
                timeout_limit = float(raw_timeout_limit)
            else:
                timeout_limit = raw_timeout_limit / 1000.0
        except (ValueError, OverflowError):
            # ValueError: digit string exceeds the interpreter's int
            # conversion limit; OverflowError: integer too large for a float.
            # A user typed query part must never raise from here.
            return False
        self.raw_text_query.timeout_limit = timeout_limit
        return True

    def _autocomplete(self):
        """Suggest two example timeout expressions."""
        for suggestion in ['<3', '<850']:
            self._add_autocomplete(suggestion)
|
|
|
|
|
class LanguageParser(QueryPartParser):
    """Parse query parts of the form ``:<language>``.

    Matching languages are appended to ``raw_text_query.languages``.
    """

    @staticmethod
    def check(raw_value):
        """Return True when raw_value starts with ``:``."""
        return raw_value[0] == ':'

    def __call__(self, raw_value):
        """Parse the text after ``:``; return True if a language was added.

        The text is lower-cased and ``_`` is replaced by ``-`` before
        matching.  Suggestions are collected only when autocompletion is
        enabled and nothing was matched.
        """
        normalized = raw_value[1:].lower().replace('_', '-')
        matched = bool(normalized) and self._parse(normalized)
        if self.enable_autocomplete and not matched:
            self._autocomplete(normalized)
        return matched

    def _parse(self, value):
        """Add the language(s) designated by ``value``; return True if any.

        ``value`` is compared with the (lower-cased) id, name, english name
        and country of every entry of ``sxng_locales``.  Independently of
        that, a value accepted by ``VALID_LANGUAGE_CODE`` or the literal
        ``auto`` is added as well, even if no locale entry matched.
        """
        languages = self.raw_text_query.languages
        country_value = value.replace('-', ' ')
        matched = False

        for locale in sxng_locales:
            lang_id, lang_name, country, english_name, _flag = map(str.lower, locale)

            names_match = value in (lang_id, lang_name, english_name) or country_value == country
            if not names_match or value in languages:
                continue

            matched = True
            id_parts = lang_id.split('-')
            if len(id_parts) == 2:
                # two-part id: keep the first part, upper-case the second
                language, region = id_parts
                languages.append(language + '-' + region.upper())
            else:
                languages.append(lang_id)

            # an exact id match ends the search, other matches keep scanning
            if value == lang_id:
                break

        if VALID_LANGUAGE_CODE.match(value) or value == 'auto':
            code_parts = value.split('-')
            if len(code_parts) > 1:
                value = code_parts[0].lower() + '-' + code_parts[1].upper()
            if value not in languages:
                languages.append(value)
                matched = True

        return matched

    def _autocomplete(self, value):
        """Collect ``:``-prefixed suggestions for the partial input ``value``.

        With an empty ``value`` a handful of examples is offered; otherwise
        the locales enabled in ``settings['search']['languages']`` are
        searched for ids, names and countries starting with ``value``.
        """
        enabled_languages = settings['search']['languages']

        if not value:
            # nothing typed yet: offer examples
            if len(enabled_languages) < 10:
                examples = [':' + lang for lang in enabled_languages]
            else:
                examples = [":en", ":en_us", ":english", ":united_kingdom"]
            self.raw_text_query.autocomplete_list.extend(examples)
            return

        # __call__ turned '_' into '-', country names use spaces
        country_prefix = value.replace('-', ' ')
        short_prefix = len(value) <= 2

        for locale in sxng_locales:
            if locale[0] not in enabled_languages:
                continue
            lang_id, lang_name, country, english_name, _flag = map(str.lower, locale)

            if lang_id.startswith(value):
                # for inputs of up to two characters only the part before
                # the first '-' is suggested
                completion = lang_id.split('-')[0] if short_prefix else lang_id
                self._add_autocomplete(':' + completion)

            if lang_name.startswith(value) or english_name.startswith(value):
                self._add_autocomplete(':' + lang_name)

            if country.startswith(country_prefix):
                self._add_autocomplete(':' + country.replace(' ', '_'))
|
|
|
|
|
class ExternalBangParser(QueryPartParser):
    """Parse query parts of the form ``!!<bang>`` (external bangs).

    A recognised bang is stored in ``raw_text_query.external_bang``.
    """

    @staticmethod
    def check(raw_value):
        """Return True for ``!!`` followed by at least one character."""
        return len(raw_value) > 2 and raw_value.startswith('!!')

    def __call__(self, raw_value):
        """Parse the text after ``!!``; return True if a bang was found."""
        bang = raw_value[2:].lower()
        if bang:
            found, completions = self._parse(bang)
        else:
            found, completions = False, []
        if self.enable_autocomplete:
            self._autocomplete(completions)
        return found

    def _parse(self, value):
        """Look ``value`` up with ``get_bang_definition_and_autocomplete``.

        Returns a tuple ``(found, completions)``: ``found`` is True when a
        bang definition was returned, ``completions`` is the second item
        returned by the lookup.
        """
        definition, completions = get_bang_definition_and_autocomplete(value)
        if definition is None:
            return False, completions
        self.raw_text_query.external_bang = value
        return True, completions

    def _autocomplete(self, bang_ac_list):
        """Suggest ``!!``-prefixed bangs, falling back to three examples."""
        for external_bang in bang_ac_list or ['g', 'ddg', 'bing']:
            self._add_autocomplete('!!' + external_bang)
|
|
|
|
|
class BangParser(QueryPartParser):
    """Parse query parts of the form ``!<engine, shortcut or category>``.

    Selected engines are appended to ``raw_text_query.enginerefs``.
    """

    @staticmethod
    def check(raw_value):
        """Return True for a single leading ``!`` (``!!...`` is excluded)."""
        return raw_value[0] == '!' and not raw_value.startswith('!!')

    def __call__(self, raw_value):
        """Parse the text after the prefix; return True if something matched.

        ``-`` and ``_`` are replaced by spaces and the text is lower-cased
        before the lookup.  On a match ``raw_text_query.specific`` is set.
        """
        prefix = raw_value[0]
        name = raw_value[1:].replace('-', ' ').replace('_', ' ').lower()
        found = bool(name) and self._parse(name)
        if found and prefix == '!':
            self.raw_text_query.specific = True
        if self.enable_autocomplete:
            self._autocomplete(prefix, name)
        return found

    def _parse(self, value):
        """Resolve ``value`` to engine references; return True on success.

        Lookup order: engine shortcut (translated to its target), engine
        name, category name.
        """
        target = engine_shortcuts[value] if value in engine_shortcuts else value
        enginerefs = self.raw_text_query.enginerefs

        if target in engines:
            enginerefs.append(EngineRef(target, 'none'))
            return True

        if target not in categories:
            return False

        # a category selects all of its engines except those listed as
        # (engine name, category) in disabled_engines
        disabled = self.raw_text_query.disabled_engines
        enginerefs.extend(
            EngineRef(engine.name, target) for engine in categories[target] if (engine.name, target) not in disabled
        )
        return True

    def _autocomplete(self, first_char, value):
        """Collect suggestions for the partial input ``value``.

        With an empty ``value`` some examples are offered; otherwise all
        categories, engines and engine shortcuts starting with ``value``.
        """
        if not value:
            # NOTE(review): _parse() looks up (engine name, category) tuples
            # in disabled_engines while plain strings are tested here --
            # confirm which item type disabled_engines really holds.
            for example in ('images', 'wikipedia', 'osm'):
                if example not in self.raw_text_query.disabled_engines or example in categories:
                    self._add_autocomplete(first_char + example)
            return

        # spaces in names are written as '_' in the suggestion
        completions = [category.replace(' ', '_') for category in categories if category.startswith(value)]
        completions += [engine.replace(' ', '_') for engine in engines if engine.startswith(value)]
        completions += [shortcut for shortcut in engine_shortcuts if shortcut.startswith(value)]

        for completion in completions:
            self._add_autocomplete(first_char + completion)
|
|
|
|
|
class FeelingLuckyParser(QueryPartParser):
    """Parse the bare ``!!`` query part.

    It switches on ``raw_text_query.redirect_to_first_result``.
    """

    @staticmethod
    def check(raw_value):
        """Return True only when raw_value is exactly ``!!``."""
        is_lucky = raw_value == '!!'
        return is_lucky

    def __call__(self, raw_value):
        """Set the redirect flag; this parser always succeeds."""
        setattr(self.raw_text_query, 'redirect_to_first_result', True)
        return True
|
|
|
|
|
class RawTextQuery:
    """Parse a raw text query (the value from the html input).

    The query is split on whitespace.  Every part is offered to the classes
    in :py:attr:`PARSER_CLASSES`; parts a parser accepts end up in
    ``query_parts``, all other parts in ``user_query_parts``.
    """

    # Order matters: the first class whose check() accepts a part handles it.
    PARSER_CLASSES = [
        TimeoutParser,
        LanguageParser,
        ExternalBangParser,
        BangParser,
        FeelingLuckyParser,
    ]

    def __init__(self, query: str, disabled_engines: list):
        """Store ``query`` and parse it right away.

        :param query: the raw query string
        :param disabled_engines: engines to leave out; a falsy value is
            replaced by an empty list
        """
        assert isinstance(query, str)
        # input parameters
        self.query = query
        self.disabled_engines = disabled_engines if disabled_engines else []

        # parsed values
        self.enginerefs = []
        self.languages = []
        self.timeout_limit = None
        self.external_bang = None
        self.specific = False
        self.autocomplete_list = []

        # internal properties
        self.query_parts = []  # parts accepted by one of the parsers
        self.user_query_parts = []  # everything else
        self.autocomplete_location = None  # (list, index) of the last part
        self.redirect_to_first_result = False
        self._parse_query()

    def _parse_query(self):
        """
        parse self.query, if tags are set, which
        change the search engine or search-language
        """
        # the capturing group keeps the whitespace runs in the result, so
        # indices refer to parts *and* separators
        raw_query_parts = re.split(r'(\s+)', self.query)

        last_index_location = None
        # only the very last element of the split may trigger autocompletion
        autocomplete_index = len(raw_query_parts) - 1

        for i, query_part in enumerate(raw_query_parts):
            # skip separators and the empty strings re.split() may produce
            if query_part.isspace() or query_part == '':
                continue

            # hand the part to the first parser that claims it
            special_part = False
            for parser_class in RawTextQuery.PARSER_CLASSES:
                if parser_class.check(query_part):
                    special_part = parser_class(self, i == autocomplete_index)(query_part)
                    break

            # a part that no parser parsed successfully belongs to the user query
            qlist = self.query_parts if special_part else self.user_query_parts
            qlist.append(query_part)
            last_index_location = (qlist, len(qlist) - 1)

        self.autocomplete_location = last_index_location

    def get_autocomplete_full_query(self, text):
        """Replace the last query part by ``text`` and return the full query.

        When there is no last part (empty or whitespace-only query, or the
        query has been changed to an empty string), ``text`` becomes the
        first part of the user query instead of raising an exception.
        """
        location = self.autocomplete_location
        if location is None or not location[0]:
            qlist = self.user_query_parts
            qlist.append(text)
            self.autocomplete_location = (qlist, len(qlist) - 1)
        else:
            qlist, position = location
            qlist[position] = text
        return self.getFullQuery()

    def changeQuery(self, query):
        """Replace the user part of the query by ``query``; return ``self``.

        The parts accepted by the parsers (``query_parts``) are kept and the
        autocomplete list is reset.
        """
        self.user_query_parts = query.strip().split()
        self.query = self.getFullQuery()
        self.autocomplete_location = (self.user_query_parts, len(self.user_query_parts) - 1)
        self.autocomplete_list = []
        return self

    def getQuery(self):
        """Return the user part of the query, joined by single spaces."""
        return ' '.join(self.user_query_parts)

    def getFullQuery(self):
        """
        get full query including whitespaces
        """
        return f"{' '.join(self.query_parts)} {self.getQuery()}".strip()

    def __str__(self):
        return self.getFullQuery()

    def __repr__(self):
        # the closing '>' comes last so that every field is inside the brackets
        return (
            f"<{self.__class__.__name__} "
            + f"query={self.query!r} "
            + f"disabled_engines={self.disabled_engines!r}\n "
            + f"languages={self.languages!r} "
            + f"timeout_limit={self.timeout_limit!r} "
            + f"external_bang={self.external_bang!r} "
            + f"specific={self.specific!r} "
            + f"enginerefs={self.enginerefs!r}\n "
            + f"autocomplete_list={self.autocomplete_list!r}\n "
            + f"query_parts={self.query_parts!r}\n "
            + f"user_query_parts={self.user_query_parts!r}\n "
            + f"redirect_to_first_result={self.redirect_to_first_result!r} >"
        )
|
|
|