Spaces:
Runtime error
Runtime error
| import re | |
| import copy | |
| import json | |
| import random | |
| import string | |
| import http.client | |
| import chromadb | |
| import torch | |
| import torch.nn.functional as F | |
| from urllib.request import urlopen | |
| from urllib.error import HTTPError | |
| from bs4 import BeautifulSoup | |
| from transformers import AutoTokenizer, AutoModel | |
| from pingpong import PingPong | |
| from pingpong.pingpong import PPManager | |
| from pingpong.context.strategy import CtxStrategy | |
| default_instruction = """Below texts come from the webpages that you provided in '{ping}'. Try to explain '{ping}' in detail as much as possible. Your exaplanation should almost based on the text below. Try not to write anything unrelated information. | |
| ===================== | |
| """ | |
| class URLSearchStrategy(CtxStrategy): | |
| def __init__( | |
| self, | |
| similarity_searcher, | |
| instruction=default_instruction, | |
| db_name=None, chunk_size=1000 | |
| ): | |
| self.searcher = similarity_searcher | |
| self.instruction = instruction | |
| self.db_name = db_name | |
| self.chunk_size = chunk_size | |
| if self.searcher is None: | |
| raise ValueError("SimilaritySearcher is not set.") | |
| if self.db_name is None: | |
| self.db_name = URLSearchStrategy.id_generator() | |
| def __call__(self, ppmanager: PPManager, urls, top_k=8, max_tokens=1024, keep_original=False): | |
| ppm = copy.deepcopy(ppmanager) | |
| last_ping = ppm.pingpongs[-1].ping | |
| # 1st yield | |
| ppm.add_pong("\n") | |
| ppm.append_pong("β’ Creating Chroma DB Collection...") | |
| yield True, ppm, "β’ Creating Chroma DB Collection β" | |
| chroma_client = chromadb.Client() | |
| try: | |
| chroma_client.delete_collection(self.db_name) | |
| except: | |
| pass | |
| col = chroma_client.create_collection(self.db_name) | |
| # 2nd yield | |
| ppm.replace_last_pong("\n") | |
| ppm.append_pong("β’ Creating Chroma DB Collection β\n") | |
| ppm.append_pong("β’ URL Searching...\n") | |
| yield True, ppm, "β’ URL Searching β" | |
| # HTML parsing | |
| search_results = [] | |
| success_urls = [] | |
| for url in urls: | |
| parse_result, contents = self._parse_html(url) | |
| if parse_result == True: | |
| success_urls.append(url) | |
| search_results.append(contents) | |
| ppm.append_pong(f" - {url} β\n") | |
| yield True, ppm, f" β· {url} β" | |
| if len(search_results) == 0: | |
| yield False, ppm, "There is no valid URLs. Check if there are trailing characters such as .(dot), ,(comma), etc., LLM will answer to your question based on its base knowledge." | |
| if len(' '.join(search_results).split(' ')) < max_tokens: | |
| final_result = ' '.join(search_results) | |
| # 3rd yield | |
| ppm.replace_last_pong("\n") | |
| ppm.append_pong("β’ Creating Chroma DB Collection β\n") | |
| ppm.append_pong("β’ URL Searching β\n") | |
| for url in success_urls: | |
| ppm.append_pong(f" - {url} β\n") | |
| yield True, ppm, "β’ Done β" | |
| last_ping = self.instruction.format(ping=last_ping) | |
| last_ping = last_ping + final_result | |
| ppm.pingpongs[-1].ping = last_ping | |
| ppm.replace_last_pong("") | |
| yield True, ppm, "β³ Wait until LLM generates message for you β³" | |
| else: | |
| # 3rd yield | |
| ppm.replace_last_pong("\n") | |
| ppm.append_pong("β’ Creating Chroma DB Collection β\n") | |
| ppm.append_pong("β’ URL Searching β\n") | |
| for url in success_urls: | |
| ppm.append_pong(f" - {url} β\n") | |
| ppm.append_pong("β’ Creating embeddings...") | |
| yield True, ppm, "β’ Creating embeddings β" | |
| final_chunks = [] | |
| for search_result in search_results: | |
| chunks = self._create_chunks( | |
| search_result, | |
| chunk_size=self.searcher.max_length | |
| ) | |
| final_chunks.append(chunks) | |
| self._put_chunks_into_collection( | |
| col, final_chunks, docs_per_step=1 | |
| ) | |
| query_results = self._query( | |
| col, f"query: {last_ping}", top_k, | |
| ) | |
| # 4th yield | |
| ppm.replace_last_pong("\n") | |
| ppm.append_pong("β’ Creating Chroma DB Collection β\n") | |
| ppm.append_pong("β’ URL Searching β\n") | |
| for url in success_urls: | |
| ppm.append_pong(f" - {url} β\n") | |
| ppm.append_pong("β’ Creating embeddings β\n") | |
| ppm.append_pong("β’ Information retrieval...") | |
| yield True, ppm, "β’ Information retrieval β" | |
| last_ping = self.instruction.format(ping=last_ping) | |
| for doc in query_results['documents'][0]: | |
| last_ping = last_ping + doc.replace('passage: ', '') + "\n" | |
| # 5th yield | |
| ppm.replace_last_pong("\n") | |
| ppm.append_pong("β’ Creating Chroma DB Collection β\n") | |
| ppm.append_pong("β’ URL Searching β\n") | |
| for url in success_urls: | |
| ppm.append_pong(f" - {url} β\n") | |
| ppm.append_pong("β’ Creating embeddings β\n") | |
| ppm.append_pong("β’ Information retrieval β") | |
| yield True, ppm, "β’ Done β" | |
| ppm.pingpongs[-1].ping = last_ping | |
| ppm.replace_last_pong("") | |
| yield True, ppm, "β³ Wait until LLM generates message for you β³" | |
| def _parse_html(self, url): | |
| try: | |
| page = urlopen(url, timeout=5) | |
| html_bytes = page.read() | |
| html = html_bytes.decode("utf-8") | |
| except: | |
| return False, None | |
| text = "" | |
| soup = BeautifulSoup(html, "html.parser") | |
| for tag in soup.findAll('p'): | |
| for string in tag.strings: | |
| text = text + string | |
| for tag in soup.findAll('pre'): | |
| for string in tag.strings: | |
| text = text + string | |
| text = self._replace_multiple_newlines(text) | |
| return True, text | |
| def _query( | |
| self, collection, q, top_k | |
| ): | |
| _, q_embeddings_list = self.searcher.get_embeddings([q]) | |
| return collection.query( | |
| query_embeddings=q_embeddings_list, | |
| n_results=top_k | |
| ) | |
| # chunk_size == number of characters | |
| def _create_chunks(self, text, chunk_size): | |
| chunks = [] | |
| for idx in range(0, len(text), chunk_size): | |
| chunks.append( | |
| f"passage: {text[idx:idx+chunk_size]}" | |
| ) | |
| return chunks | |
| def _put_chunk_into_collection( | |
| self, collection, chunk_id, chunk, docs_per_step=1 | |
| ): | |
| for i in range(0, len(chunk), docs_per_step): | |
| cur_texts = chunk[i:i+docs_per_step] | |
| _, embeddings_list = self.searcher.get_embeddings(cur_texts) | |
| ids = [ | |
| f"id-{chunk_id}-{num}" for num in range(i, i+docs_per_step) | |
| ] | |
| collection.add( | |
| embeddings=embeddings_list, | |
| documents=cur_texts, | |
| ids=ids | |
| ) | |
| def _put_chunks_into_collection( | |
| self, collection, | |
| chunks, docs_per_step=1 | |
| ): | |
| for idx, chunk in enumerate(chunks): | |
| self._put_chunk_into_collection( | |
| collection, idx, | |
| chunk, docs_per_step=docs_per_step | |
| ) | |
| def _replace_multiple_newlines(self, text): | |
| """Replaces multiple newline characters with a single newline character.""" | |
| pattern = re.compile(r"\n+") | |
| return pattern.sub("\n", text) | |
| def id_generator(cls, size=10, chars=string.ascii_uppercase + string.digits): | |
| return ''.join(random.choice(chars) for _ in range(size)) |