| import urllib |
|
|
| import requests |
| from requests.exceptions import JSONDecodeError |
| from duckduckgo_search import DDGS |
| from bs4 import BeautifulSoup |
| from langchain.schema import Document |
|
|
| from .langchain_websearch import docs_to_pretty_str, LangchainCompressor |
|
|
|
|
class Generator:
    """Wrap a generator so that its ``return`` value survives iteration.

    Iterating over an instance yields everything the wrapped generator
    yields. Once the wrapped generator is exhausted, the value it returned
    is available as ``self.value``.

    Credit: https://stackoverflow.com/a/34073559
    """

    def __init__(self, gen):
        self.gen = gen
        # Defined up front so that reading ``value`` before the wrapped
        # generator has finished gives None instead of an AttributeError.
        self.value = None

    def __iter__(self):
        # ``yield from`` evaluates to the wrapped generator's return value.
        self.value = yield from self.gen
        return self.value
|
|
|
|
def dict_list_to_pretty_str(data: list[dict]) -> str:
    """Render search result dicts as a numbered, human-readable text block.

    Args:
        data: A list of result dicts, or a single result dict (which is
            treated as a one-element list). Each dict must provide the keys
            ``title``, ``body`` and ``href``.

    Returns:
        One four-line section per result ("Result N", title, body, source
        URL), concatenated in order. An empty list gives an empty string.

    Raises:
        ValueError: If ``data`` is neither a dict nor a list.
    """
    entries = [data] if isinstance(data, dict) else data
    if not isinstance(entries, list):
        raise ValueError("Input must be dict or list[dict]")

    return "".join(
        f"Result {position}\n"
        f"Title: {entry['title']}\n"
        f"{entry['body']}\n"
        f"Source URL: {entry['href']}\n"
        for position, entry in enumerate(entries, start=1)
    )
|
|
|
|
def search_duckduckgo(query: str, max_results: int, instant_answers: bool = True,
                      regular_search_queries: bool = True, get_website_content: bool = False) -> list[dict]:
    """Search DuckDuckGo, preferring an instant answer over regular results.

    Args:
        query: Search query; surrounding single/double quotes are stripped.
        max_results: Maximum number of regular text results to request.
        instant_answers: Ask for instant answers first and, if any come
            back, return only the first one.
        regular_search_queries: Run a regular text search when no instant
            answer is used.
        get_website_content: Replace each regular result's ``body`` with the
            text returned by ``get_webpage_content`` for its ``href``.

    Returns:
        A list of result dicts carrying ``title``, ``body`` and ``href``.

    Raises:
        ValueError: If no instant answer was used and
            ``regular_search_queries`` is False (this includes the case where
            instant answers were requested but none were returned).
    """
    query = query.strip("\"'")
    with DDGS() as ddgs:
        instant = ddgs.answers(query) if instant_answers else None
        if instant:
            # Reshape the first instant answer into the title/body/href
            # layout used for regular results.
            top_answer = instant[0]
            top_answer["title"] = query
            top_answer["body"] = top_answer["text"]
            top_answer["href"] = top_answer["url"]
            for unwanted_key in ('icon', 'topic', 'text', 'url'):
                top_answer.pop(unwanted_key, None)
            return [top_answer]

        if not regular_search_queries:
            raise ValueError("One of ('instant_answers', 'regular_search_queries') must be True")

        hits = []
        for hit in ddgs.text(query, region='wt-wt', safesearch='moderate',
                             timelimit=None, max_results=max_results):
            if get_website_content:
                hit["body"] = get_webpage_content(hit["href"])
            hits.append(hit)
        return hits
|
|
|
|
def langchain_search_duckduckgo(query: str, langchain_compressor: LangchainCompressor, max_results: int,
                                instant_answers: bool):
    """Search DuckDuckGo and extract relevant page content from the hits.

    This is a generator: it yields status messages while working and
    *returns* the final formatted string (readable through the ``Generator``
    wrapper or ``StopIteration.value``).

    Args:
        query: Search query; surrounding single/double quotes are stripped.
        langchain_compressor: Supplies ``num_results`` (how many search hits
            to request) and ``retrieve_documents(query, urls)``, which is
            consumed here as a generator of status messages whose return
            value is the list of extracted documents.
        max_results: Maximum number of documents/results in the output. It is
            reduced by one (never below 1) when an instant answer was found.
        instant_answers: Also ask DuckDuckGo for an instant answer and put
            it first in the document list.

    Returns:
        ``docs_to_pretty_str`` of the collected documents, or, when no
        documents were collected at all, ``dict_list_to_pretty_str`` of the
        plain search results as a fallback.
    """
    query = query.strip("\"'")
    documents = []
    yield 'Getting results from DuckDuckGo...'
    with DDGS() as ddgs:
        if instant_answers:
            answers = ddgs.answers(query)
            if answers:
                if max_results > 1:
                    max_results -= 1
                first_answer = answers[0]
                documents.append(Document(page_content=first_answer["text"],
                                          metadata={"source": first_answer["url"]}))

        results = list(ddgs.text(query, region='wt-wt', safesearch='moderate', timelimit=None,
                                 max_results=langchain_compressor.num_results))
        result_urls = [hit["href"] for hit in results]

    retrieval_gen = Generator(langchain_compressor.retrieve_documents(query, result_urls))
    # Forward the retriever's status messages; its return value ends up in
    # ``retrieval_gen.value`` once it is exhausted.
    yield from retrieval_gen
    documents.extend(retrieval_gen.value)

    if documents:
        return docs_to_pretty_str(documents[:max_results])

    print("LLM_Web_search | Could not find any page content "
          "similar enough to be extracted, using basic search fallback...")
    return dict_list_to_pretty_str(results[:max_results])
|
|
|
|
def langchain_search_searxng(query: str, url: str, langchain_compressor: LangchainCompressor, max_results: int):
    """Search a SearXNG instance and extract relevant page content from the hits.

    This is a generator: it yields status messages while working and
    *returns* the final formatted string (readable through the ``Generator``
    wrapper or ``StopIteration.value``).

    Args:
        query: Search query; it is URL-quoted before being sent.
        url: Base URL of the SearXNG instance, with or without a trailing
            slash.
        langchain_compressor: Supplies ``num_results`` (how many result URLs
            to collect before stopping) and ``retrieve_documents(query,
            urls)``, which is consumed here as a generator of status messages
            whose return value is the list of extracted documents.
        max_results: Maximum number of documents in the returned string.

    Returns:
        ``docs_to_pretty_str`` of at most ``max_results`` documents.

    Raises:
        requests.HTTPError: If the first result page has an error status.
        ValueError: If a response body cannot be decoded as JSON.
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not
    # guarantee that ``urllib.parse`` has been loaded.
    from urllib.parse import quote

    yield 'Getting results from Searxng...'
    headers = {"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:120.0) Gecko/20100101 Firefox/120.0",
               "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
               "Accept-Language": "en-US,en;q=0.5"}
    # Tolerate a trailing slash on the base URL so the path is not "//search".
    search_url = f"{url.rstrip('/')}/search?q={quote(query)}&format=json&pageno="

    result_urls = []
    pageno = 1
    while len(result_urls) < langchain_compressor.num_results:
        response = requests.get(search_url + str(pageno), headers=headers)
        if not result_urls:
            # Nothing collected yet: a failing request is a hard error.
            response.raise_for_status()
        elif not response.ok:
            # A later page failed (e.g. rate limiting). Keep what was already
            # collected instead of failing with a misleading JSON error.
            break
        try:
            response_dict = response.json()
        except JSONDecodeError as exc:
            raise ValueError(
                "JSONDecodeError: Please ensure that the SearXNG instance can return data in JSON format"
            ) from exc
        result_dicts = response_dict["results"]
        if not result_dicts:
            break
        result_urls.extend(result["url"] for result in result_dicts)
        pageno += 1

    retrieval_gen = Generator(langchain_compressor.retrieve_documents(query, result_urls))
    # Forward the retriever's status messages; its return value ends up in
    # ``retrieval_gen.value`` once it is exhausted.
    yield from retrieval_gen
    documents = retrieval_gen.value
    return docs_to_pretty_str(documents[:max_results])
|
|
|
|
def get_webpage_content(url: str) -> str:
    """Download a web page and return its visible text, one string per line.

    Args:
        url: Address of the page. If it has no ``http://`` or ``https://``
            prefix, HTTPS is tried first and plain HTTP is used as fallback.

    Returns:
        The page's text nodes (``<script>`` and ``<style>`` elements
        removed), each stripped of surrounding whitespace and joined with
        newlines.

    Raises:
        requests.exceptions.RequestException: If the page cannot be fetched.
    """
    headers = {"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:120.0) Gecko/20100101 Firefox/120.0",
               "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
               "Accept-Language": "en-US,en;q=0.5"}
    if url.lower().startswith(("http://", "https://")):
        # The URL already names its scheme; prefixing another one would only
        # produce a bogus request such as "https://http://...".
        response = requests.get(url, headers=headers)
    else:
        try:
            response = requests.get(f"https://{url}", headers=headers)
        except requests.exceptions.RequestException:
            # Only network/HTTP-level failures trigger the fallback, so
            # KeyboardInterrupt and programming errors still propagate.
            # A scheme is required, hence plain HTTP rather than the raw URL.
            response = requests.get(f"http://{url}", headers=headers)

    soup = BeautifulSoup(response.content, features="lxml")
    # Remove script and style elements so their contents are not treated as
    # page text.
    for element in soup(["script", "style"]):
        element.extract()

    # ``stripped_strings`` already yields whitespace-stripped, non-empty
    # strings, so no further stripping is needed.
    return '\n'.join(soup.stripped_strings)
|
|