import base64
import io
import json
import logging
import os
from typing import Optional, List
import requests
from fastapi import Request
from open_webui.retrieval.web.main import SearchResult, get_filtered_results
from open_webui.utils.headers import include_user_info_headers
from open_webui.env import FORWARD_SESSION_INFO_HEADER_CHAT_ID
from xml.etree import ElementTree as ET
from xml.etree.ElementTree import Element
log = logging.getLogger(__name__)
def xml_element_contents_to_string(element: Element) -> str:
    """Flatten an XML element into plain text.

    Concatenates, in document order, the element's leading text, the
    flattened contents of every child (recursively), and finally the
    element's own ``tail``. Missing ``text``/``tail`` values count as empty
    strings.

    Note that the tail of ``element`` itself is part of the result, so any
    text that follows the element's closing tag in its parent is included.
    """
    leading = element.text or ""
    # Each recursive call contributes the child's text, its descendants and
    # the child's tail.
    nested = "".join(xml_element_contents_to_string(node) for node in element)
    trailing = element.tail or ""
    return leading + nested + trailing
def search_yandex(
    request: Request,
    yandex_search_url: str,
    yandex_search_api_key: str,
    yandex_search_config: str,
    query: str,
    count: int,
    filter_list: Optional[List[str]] = None,
    user=None,
) -> List[SearchResult]:
    """Query the Yandex web search API and return the parsed hits.

    Args:
        request: Current request; ``request.state.chat_id`` is read so it can
            be forwarded as a header.
        yandex_search_url: Endpoint to POST to. An empty value selects
            ``https://searchapi.api.cloud.yandex.net/v2/web/search``.
        yandex_search_api_key: Sent as ``Authorization: Api-Key <key>``.
        yandex_search_config: JSON object used as the base request payload
            (empty means ``{}``). ``query.searchType`` and
            ``groupSpec.groupMode`` are only defaulted when absent;
            ``query.queryText``, ``groupSpec.groupsOnPage`` and
            ``groupSpec.docsInGroup`` are always overwritten.
        query: Search text.
        count: Number of groups requested and maximum number of results
            returned.
        filter_list: Passed through to ``get_filtered_results``.
        user: When not ``None``, passed to ``include_user_info_headers`` to
            extend the request headers.

    Returns:
        Up to ``count`` ``SearchResult`` objects. Any failure (bad config,
        HTTP error, malformed response) is logged and yields ``[]``.
    """
    try:
        headers = {
            "User-Agent": "Open WebUI (https://github.com/open-webui/open-webui) RAG Bot",
            "Authorization": f"Api-Key {yandex_search_api_key}",
        }
        if user is not None:
            headers = include_user_info_headers(headers, user)
            # NOTE(review): the chat id is forwarded only together with the
            # user info headers -- confirm this nesting is intended.
            chat_id = getattr(request.state, "chat_id", None)
            if chat_id:
                headers[FORWARD_SESSION_INFO_HEADER_CHAT_ID] = str(chat_id)

        payload = json.loads(yandex_search_config) if yandex_search_config else {}
        if not isinstance(payload, dict):
            raise ValueError(
                "Yandex search config must be a JSON object, "
                f"got {type(payload).__name__}"
            )

        # Keep caller-supplied settings, but make sure the sections we write
        # into are dictionaries.
        query_spec = payload.get("query")
        if not isinstance(query_spec, dict):
            query_spec = payload["query"] = {}
        query_spec.setdefault("searchType", "SEARCH_TYPE_RU")
        query_spec["queryText"] = query

        group_spec = payload.get("groupSpec")
        if not isinstance(group_spec, dict):
            group_spec = payload["groupSpec"] = {}
        group_spec.setdefault("groupMode", "GROUP_MODE_DEEP")
        group_spec["groupsOnPage"] = count
        group_spec["docsInGroup"] = 1

        response = requests.post(
            yandex_search_url
            or "https://searchapi.api.cloud.yandex.net/v2/web/search",
            headers=headers,
            json=payload,
            # Without an explicit timeout requests waits indefinitely.
            timeout=30,
        )
        response.raise_for_status()

        response_body = response.json()
        if "rawData" not in response_body:
            raise Exception(f"No `rawData` in response body: {response_body}")

        # ``rawData`` carries the XML search response, base64-encoded.
        search_result_body_bytes = base64.decodebytes(
            response_body["rawData"].encode("utf-8")
        )
        doc_root = ET.parse(io.BytesIO(search_result_body_bytes))

        results = []
        for group in doc_root.findall("response/results/grouping/group"):
            url = _yandex_child_text(group, "doc/url")
            if url is None:
                # A hit without a URL has nothing to link to; skip it rather
                # than let it abort the whole search.
                continue
            title = _yandex_child_text(group, "doc/title")
            snippet = _yandex_child_text(group, "doc/passages/passage")
            results.append(
                {
                    # The flattened text includes the element's tail, so
                    # strip the newlines surrounding the value.
                    "url": url.strip("\n"),
                    "title": (title or "").strip("\n"),
                    "snippet": snippet or "",
                }
            )

        results = get_filtered_results(results, filter_list)
        results = [
            SearchResult(
                link=result.get("url"),
                title=result.get("title"),
                snippet=result.get("snippet"),
            )
            for result in results[:count]
        ]
        log.info("Yandex search results: %s", results)
        return results
    except Exception as e:
        # Search is best-effort: report the failure (with traceback) and let
        # the caller continue with no results.
        log.exception("Error in search: %s", e)
        return []


def _yandex_child_text(parent: Element, path: str) -> Optional[str]:
    """Return the flattened text of the first node at ``path`` under ``parent``, or ``None`` if there is none."""
    node = parent.find(path)
    if node is None:
        return None
    return xml_element_contents_to_string(node)
if __name__ == "__main__":
    # Manual smoke test: run one search with settings taken from the
    # YANDEX_WEB_SEARCH_URL / YANDEX_WEB_SEARCH_API_KEY /
    # YANDEX_WEB_SEARCH_CONFIG environment variables and print the result.
    from fastapi import FastAPI
    from starlette.datastructures import Headers

    # Hand-built ASGI HTTP scope so a Request can be created without a server.
    asgi_scope = {
        "type": "http",
        "asgi.version": "3.0",
        "asgi.spec_version": "2.0",
        "method": "GET",
        "path": "/internal",
        "query_string": b"",
        "headers": Headers({}).raw,
        "client": ("127.0.0.1", 12345),
        "server": ("127.0.0.1", 80),
        "scheme": "http",
        "app": FastAPI(),
    }
    fake_request = Request(asgi_scope, None)

    env = os.environ
    result = search_yandex(
        fake_request,
        env.get("YANDEX_WEB_SEARCH_URL", ""),
        env.get("YANDEX_WEB_SEARCH_API_KEY", ""),
        env.get(
            "YANDEX_WEB_SEARCH_CONFIG",
            '{"query": {"searchType": "SEARCH_TYPE_COM"}}',
        ),
        "TOP movies of the past year",
        3,
    )
    print(result)
|