| import base64 | |
| import io | |
| import json | |
| import logging | |
| import os | |
| from typing import Optional, List | |
| import requests | |
| from fastapi import Request | |
| from open_webui.retrieval.web.main import SearchResult, get_filtered_results | |
| from open_webui.utils.headers import include_user_info_headers | |
| from open_webui.env import FORWARD_SESSION_INFO_HEADER_CHAT_ID | |
| from xml.etree import ElementTree as ET | |
| from xml.etree.ElementTree import Element | |
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def xml_element_contents_to_string(element: Element) -> str:
    """Flatten an XML element into the plain text it contains.

    The result is the element's leading text, followed by the flattened
    contents of every child (in document order), followed by the element's
    own tail text. Markup is dropped; only character data is kept.

    Args:
        element: The element to flatten.

    Returns:
        The concatenated text. Missing ``text``/``tail`` values count as "".
    """
    leading = element.text or ""
    # Each child contributes its own tail, which is what places the text
    # that sits between sibling tags at the right position.
    nested = "".join(xml_element_contents_to_string(node) for node in element)
    trailing = element.tail or ""
    return leading + nested + trailing
# Endpoint used when the caller does not supply its own search URL.
_YANDEX_DEFAULT_SEARCH_URL = "https://searchapi.api.cloud.yandex.net/v2/web/search"


def _build_yandex_payload(yandex_search_config, query: str, count: int) -> dict:
    """Build the JSON request body from the optional user-supplied config."""
    payload = json.loads(yandex_search_config) if yandex_search_config else {}
    if not isinstance(payload, dict):
        raise ValueError("Yandex search config must be a JSON object")

    query_spec = payload.get("query")
    if not isinstance(query_spec, dict):
        query_spec = payload["query"] = {}
    query_spec.setdefault("searchType", "SEARCH_TYPE_RU")
    # The actual query text always overrides anything present in the config.
    query_spec["queryText"] = query

    group_spec = payload.get("groupSpec")
    if not isinstance(group_spec, dict):
        group_spec = payload["groupSpec"] = {}
    group_spec.setdefault("groupMode", "GROUP_MODE_DEEP")
    # One document per group, `count` groups per page.
    group_spec["groupsOnPage"] = count
    group_spec["docsInGroup"] = 1
    return payload


def _yandex_element_text(parent: Element, path: str) -> Optional[str]:
    """Return the flattened text of the first match of `path`, or None if absent."""
    node = parent.find(path)
    if node is None:
        return None
    return xml_element_contents_to_string(node)


def _parse_yandex_results(raw_xml: bytes) -> List[dict]:
    """Extract url/title/snippet dicts from the decoded XML search response."""
    root = ET.fromstring(raw_xml)
    parsed = []
    for group in root.findall("response/results/grouping/group"):
        url = _yandex_element_text(group, "doc/url")
        if url is None:
            # A result without a URL cannot be linked to; skip it instead of
            # failing the whole search.
            continue
        url = url.strip("\n")
        if not url:
            continue
        title = _yandex_element_text(group, "doc/title")
        snippet = _yandex_element_text(group, "doc/passages/passage")
        parsed.append(
            {
                "url": url,
                "title": "" if title is None else title.strip("\n"),
                # Only the first passage is used; it may be missing entirely.
                "snippet": "" if snippet is None else snippet,
            }
        )
    return parsed


def search_yandex(
    request: Request,
    yandex_search_url: str,
    yandex_search_api_key: str,
    yandex_search_config: str,
    query: str,
    count: int,
    filter_list: Optional[List[str]] = None,
    user=None,
) -> List[SearchResult]:
    """Run a web search through the Yandex Search API.

    Args:
        request: Incoming request; ``request.state.chat_id`` is read, if set.
        yandex_search_url: Endpoint to POST to; empty selects the default URL.
        yandex_search_api_key: Key sent as ``Authorization: Api-Key <key>``.
        yandex_search_config: JSON object (as a string) used as the base
            request body; empty means no extra configuration.
        query: Search text.
        count: Number of groups requested and maximum number of results
            returned.
        filter_list: Passed to ``get_filtered_results`` together with the
            parsed results.
        user: When not None, user info headers are added to the request.

    Returns:
        Up to ``count`` search results. Any failure (bad config, HTTP error,
        malformed response) is logged and yields an empty list.
    """
    try:
        headers = {
            "User-Agent": "Open WebUI (https://github.com/open-webui/open-webui) RAG Bot",
            "Authorization": f"Api-Key {yandex_search_api_key}",
        }
        if user is not None:
            headers = include_user_info_headers(headers, user)
            # NOTE(review): the chat id header is assumed to be nested under
            # the user check; the original indentation was lost — confirm.
            chat_id = getattr(request.state, "chat_id", None)
            if chat_id:
                headers[FORWARD_SESSION_INFO_HEADER_CHAT_ID] = str(chat_id)

        payload = _build_yandex_payload(yandex_search_config, query, count)

        # NOTE(review): no timeout is set, so a stalled connection blocks the
        # caller indefinitely; consider passing `timeout=` here.
        response = requests.post(
            yandex_search_url or _YANDEX_DEFAULT_SEARCH_URL,
            headers=headers,
            json=payload,
        )
        response.raise_for_status()

        response_body = response.json()
        if "rawData" not in response_body:
            raise Exception(f"No `rawData` in response body: {response_body}")

        # `rawData` carries the XML search response, base64-encoded.
        search_result_body_bytes = base64.decodebytes(
            response_body["rawData"].encode("utf-8")
        )

        results = _parse_yandex_results(search_result_body_bytes)
        results = get_filtered_results(results, filter_list)
        results = [
            SearchResult(
                link=result.get("url"),
                title=result.get("title"),
                snippet=result.get("snippet"),
            )
            for result in results[:count]
        ]
        log.info(f"Yandex search results: {results}")
        return results
    except Exception as e:
        # Best-effort provider: never propagate, but keep the traceback.
        log.exception("Error in search: %s", e)
        return []
if __name__ == "__main__":
    # Manual smoke test: runs one search using settings from the environment.
    from fastapi import FastAPI
    from starlette.datastructures import Headers

    # Minimal ASGI scope, just enough to construct a Request object.
    asgi_scope = {
        "type": "http",
        "asgi.version": "3.0",
        "asgi.spec_version": "2.0",
        "method": "GET",
        "path": "/internal",
        "query_string": b"",
        "headers": Headers({}).raw,
        "client": ("127.0.0.1", 12345),
        "server": ("127.0.0.1", 80),
        "scheme": "http",
        "app": FastAPI(),
    }
    demo_request = Request(asgi_scope, None)

    search_url = os.environ.get("YANDEX_WEB_SEARCH_URL", "")
    api_key = os.environ.get("YANDEX_WEB_SEARCH_API_KEY", "")
    search_config = os.environ.get(
        "YANDEX_WEB_SEARCH_CONFIG", '{"query": {"searchType": "SEARCH_TYPE_COM"}}'
    )

    result = search_yandex(
        demo_request,
        search_url,
        api_key,
        search_config,
        "TOP movies of the past year",
        3,
    )
    print(result)