Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import copy | |
| import pathlib | |
| from io import BytesIO, StringIO | |
| from typing import Any, Dict, Iterable, List, Optional, Tuple, TypedDict, cast | |
| import requests | |
| from langchain_core.documents import Document | |
| from langchain_text_splitters.character import RecursiveCharacterTextSplitter | |
class ElementType(TypedDict):
    """Record describing one element pulled out of a transformed HTML tree.

    Instances are built by ``HTMLHeaderTextSplitter.split_text_from_file`` and
    later turned into ``Document`` objects (``content`` becomes the page
    content, ``metadata`` becomes the document metadata).
    """

    # Origin of the element.
    # NOTE(review): split_text_from_file stores the object it was handed
    # (a file-like object) here rather than a URL string -- confirm the
    # intended type.
    url: str
    # Concatenated text of the element's children marked class="xpath".
    xpath: str
    # Concatenated text of the element's children marked class="chunk".
    content: str
    # Header metadata: configured metadata key -> text of the matching header.
    metadata: Dict[str, str]
class HTMLHeaderTextSplitter:
    """Split HTML documents into chunks grouped under their headers.

    The HTML is parsed with lxml, run through the bundled
    ``xsl/html_chunks_with_headers.xslt`` stylesheet, and every resulting
    element is tagged with the text of the configured headers it belongs to.

    Requires the ``lxml`` package (imported lazily when splitting).
    """

    def __init__(
        self,
        headers_to_split_on: List[Tuple[str, str]],
        return_each_element: bool = False,
    ):
        """Create a new HTMLHeaderTextSplitter.

        Args:
            headers_to_split_on: list of tuples of headers we want to track mapped to
                (arbitrary) keys for metadata. Allowed header values: h1, h2, h3, h4,
                h5, h6 e.g. [("h1", "Header 1"), ("h2", "Header 2")].
            return_each_element: Return each element w/ associated headers
                instead of aggregating neighbours that share the same headers.
        """
        # Output element-by-element or aggregated into chunks w/ common headers
        self.return_each_element = return_each_element
        self.headers_to_split_on = sorted(headers_to_split_on)

    def aggregate_elements_to_chunks(
        self, elements: List[ElementType]
    ) -> List[Document]:
        """Combine consecutive elements with identical metadata into chunks.

        The input list and the element dicts inside it are left untouched.

        Args:
            elements: HTML element content with associated identifying info and
                metadata.

        Returns:
            One ``Document`` per run of consecutive elements that share the
            same metadata; their contents are joined with ``" \\n"``.
        """
        merged: List[ElementType] = []
        for element in elements:
            if merged and merged[-1]["metadata"] == element["metadata"]:
                # Same headers as the previous chunk: extend that chunk.
                merged[-1]["content"] += " \n" + element["content"]
            else:
                # Start a new chunk from a copy so that extending it later
                # never rewrites the caller's element in place.
                merged.append(copy.copy(element))
        return [
            Document(page_content=chunk["content"], metadata=chunk["metadata"])
            for chunk in merged
        ]

    def split_text_from_url(
        self, url: str, timeout: float = 10, **kwargs: Any
    ) -> List[Document]:
        """Fetch a web page and split its HTML.

        Args:
            url: web URL
            timeout: seconds to wait for the server before giving up, so a
                stalled connection cannot hang the caller indefinitely.
            **kwargs: extra keyword arguments forwarded to ``requests.get``.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        response = requests.get(url, timeout=timeout, **kwargs)
        # Fail loudly instead of chunking an HTTP error page as if it were
        # the requested document.
        response.raise_for_status()
        return self.split_text_from_file(BytesIO(response.content))

    def split_text(self, text: str) -> List[Document]:
        """Split HTML text string

        Args:
            text: HTML text
        """
        return self.split_text_from_file(StringIO(text))

    def split_text_from_file(self, file: Any) -> List[Document]:
        """Split HTML file

        Args:
            file: HTML file; handed directly to ``lxml.etree.parse``.

        Returns:
            Aggregated chunks, or one ``Document`` per element when
            ``return_each_element`` is set.

        Raises:
            ImportError: if lxml is not installed.
        """
        try:
            from lxml import etree
        except ImportError as e:
            raise ImportError(
                "Unable to import lxml, please install with `pip install lxml`."
            ) from e

        # use lxml library to parse html document and return xml ElementTree
        # Explicitly encoding in utf-8 allows non-English
        # html files to be processed without garbled characters
        parser = etree.HTMLParser(encoding="utf-8")
        tree = etree.parse(file, parser)

        # document transformation for "structure-aware" chunking is handled with xsl.
        # see comments in html_chunks_with_headers.xslt for more detailed information.
        xslt_path = pathlib.Path(__file__).parent / "xsl/html_chunks_with_headers.xslt"
        xslt_tree = etree.parse(xslt_path)
        transform = etree.XSLT(xslt_tree)
        result = transform(tree)
        result_dom = etree.fromstring(str(result))

        # header tag -> metadata key; also serves as the O(1) filter for
        # which header tags are tracked
        header_mapping = dict(self.headers_to_split_on)

        # map xhtml namespace prefix
        ns_map = {"h": "http://www.w3.org/1999/xhtml"}

        # build list of elements from DOM
        elements: List[ElementType] = []
        for element in result_dom.findall("*//*", ns_map):
            chunk_nodes = element.findall("*[@class='chunk']", ns_map)
            # Only elements carrying a headers block or a chunk block are kept.
            if not chunk_nodes and not element.findall(
                "*[@class='headers']", ns_map
            ):
                continue

            xpath_nodes = element.findall("*[@class='xpath']", ns_map)
            header_nodes = element.findall("*[@class='headers']/*", ns_map)
            elements.append(
                ElementType(
                    # NOTE(review): this stores the file object itself, not a
                    # URL string -- kept as-is for compatibility.
                    url=file,
                    xpath="".join(node.text or "" for node in xpath_nodes),
                    content="".join(node.text or "" for node in chunk_nodes),
                    # Add text of specified headers to metadata using header
                    # mapping.
                    metadata={
                        header_mapping[node.tag]: node.text or ""
                        for node in header_nodes
                        if node.tag in header_mapping
                    },
                )
            )

        if not self.return_each_element:
            return self.aggregate_elements_to_chunks(elements)
        return [
            Document(page_content=chunk["content"], metadata=chunk["metadata"])
            for chunk in elements
        ]
class HTMLSectionSplitter:
    """Split HTML documents into sections delimited by header tags.

    The HTML is first run through an XSLT stylesheet (via lxml), then cut
    into sections at the configured header tags using BeautifulSoup.

    Requires the ``lxml`` and ``bs4`` packages (both imported lazily).
    """

    def __init__(
        self,
        headers_to_split_on: List[Tuple[str, str]],
        xslt_path: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Create a new HTMLSectionSplitter.

        Args:
            headers_to_split_on: list of tuples of headers we want to track mapped to
                (arbitrary) keys for metadata. Allowed header values: h1, h2, h3, h4,
                h5, h6 e.g. [("h1", "Header 1"), ("h2", "Header 2")].
            xslt_path: path to xslt file for document transformation.
                Uses a default if not passed.
                Needed for html contents that use different formats and layouts.
            **kwargs: stored and later passed to ``RecursiveCharacterTextSplitter``
                by ``split_documents``.
        """
        self.headers_to_split_on = dict(headers_to_split_on)

        if xslt_path is None:
            self.xslt_path = (
                pathlib.Path(__file__).parent / "xsl/converting_to_header.xslt"
            ).absolute()
        else:
            self.xslt_path = pathlib.Path(xslt_path).absolute()
        self.kwargs = kwargs

    def split_documents(self, documents: Iterable[Document]) -> List[Document]:
        """Split documents into sections, then into character-based chunks."""
        texts, metadatas = [], []
        for doc in documents:
            texts.append(doc.page_content)
            metadatas.append(doc.metadata)
        results = self.create_documents(texts, metadatas=metadatas)

        text_splitter = RecursiveCharacterTextSplitter(**self.kwargs)

        return text_splitter.split_documents(results)

    def split_text(self, text: str) -> List[Document]:
        """Split HTML text string

        Args:
            text: HTML text
        """
        return self.split_text_from_file(StringIO(text))

    def create_documents(
        self, texts: List[str], metadatas: Optional[List[dict]] = None
    ) -> List[Document]:
        """Create documents from a list of texts.

        Each text is split into sections; every section document gets a deep
        copy of the matching source metadata, overlaid with the section's own
        header metadata.

        Args:
            texts: HTML strings to split.
            metadatas: optional metadata dicts, one per text.

        Returns:
            One ``Document`` per section of every text.
        """
        _metadatas = metadatas or [{}] * len(texts)
        documents = []
        for i, text in enumerate(texts):
            for chunk in self.split_text(text):
                metadata = copy.deepcopy(_metadatas[i])
                # The "#TITLE#" placeholder stands for the source document's
                # title. If the source metadata has no "Title", keep the
                # placeholder rather than failing with a KeyError.
                title = metadata.get("Title", "#TITLE#")
                chunk_metadata = {
                    key: title if value == "#TITLE#" else value
                    for key, value in chunk.metadata.items()
                }
                documents.append(
                    Document(
                        page_content=chunk.page_content,
                        metadata={**metadata, **chunk_metadata},
                    )
                )
        return documents

    def split_html_by_headers(
        self, html_doc: str
    ) -> Dict[str, Dict[str, Optional[str]]]:
        """Cut an HTML string into sections at the configured header tags.

        Args:
            html_doc: HTML text.

        Returns:
            Mapping of header text to ``{"content": ..., "tag_name": ...}``.
            Sections whose content is empty are omitted.

        Raises:
            ImportError: if bs4 is not installed.
        """
        try:
            from bs4 import BeautifulSoup, PageElement  # type: ignore[import-untyped]
        except ImportError as e:
            raise ImportError(
                "Unable to import BeautifulSoup/PageElement, "
                "please install with `pip install bs4`."
            ) from e

        soup = BeautifulSoup(html_doc, "html.parser")
        header_tags = soup.find_all(["body"] + list(self.headers_to_split_on))
        sections: Dict[str, Dict[str, Optional[str]]] = {}

        for i, header in enumerate(header_tags):
            header_element: PageElement = header
            if i == 0:
                # The first match (the <body>, when the document has one) is
                # reported under a placeholder title with tag "h1".
                current_header = "#TITLE#"
                current_header_tag = "h1"
            else:
                current_header = header_element.text.strip()
                current_header_tag = header_element.name

            next_header = header_tags[i + 1] if i + 1 < len(header_tags) else None

            # Collect every text node up to (not including) the next header.
            section_content: List = []
            for element in header_element.next_elements:
                if next_header is not None and element == next_header:
                    break
                if isinstance(element, str):
                    section_content.append(element)

            content = " ".join(section_content).strip()
            if content != "":
                # NOTE: sections are keyed by header text, so a later header
                # with the same text replaces the earlier section.
                sections[current_header] = {
                    "content": content,
                    "tag_name": current_header_tag,
                }

        return sections

    def convert_possible_tags_to_header(self, html_content: str) -> str:
        """Apply the configured XSLT stylesheet to an HTML string.

        Args:
            html_content: HTML text.

        Returns:
            The transformed document as a string, or ``html_content``
            unchanged when no stylesheet path is set.

        Raises:
            ImportError: if lxml is not installed.
        """
        if self.xslt_path is None:
            return html_content

        try:
            from lxml import etree
        except ImportError as e:
            raise ImportError(
                "Unable to import lxml, please install with `pip install lxml`."
            ) from e

        # use lxml library to parse html document and return xml ElementTree
        parser = etree.HTMLParser()
        tree = etree.parse(StringIO(html_content), parser)

        xslt_tree = etree.parse(self.xslt_path)
        transform = etree.XSLT(xslt_tree)
        result = transform(tree)
        return str(result)

    def split_text_from_file(self, file: Any) -> List[Document]:
        """Split HTML file

        Args:
            file: HTML file; an in-memory buffer (``getvalue()``) or any other
                readable file object (``read()``). Bytes are decoded as UTF-8.

        Returns:
            One ``Document`` per non-empty section, with a single metadata
            entry mapping the header's metadata key to the header text.
        """
        # In-memory buffers expose getvalue(); fall back to read() so that
        # ordinary file handles work as well.
        if hasattr(file, "getvalue"):
            file_content = file.getvalue()
        else:
            file_content = file.read()
        if isinstance(file_content, bytes):
            file_content = file_content.decode("utf-8")

        file_content = self.convert_possible_tags_to_header(file_content)
        sections = self.split_html_by_headers(file_content)

        documents = []
        for section_key, section in sections.items():
            tag_name = str(section["tag_name"])
            # The leading "#TITLE#" section is always tagged "h1"; when that
            # tag is not configured, use the tag name itself as the metadata
            # key instead of raising a KeyError.
            metadata_key = self.headers_to_split_on.get(tag_name, tag_name)
            documents.append(
                Document(
                    cast(str, section["content"]),
                    metadata={metadata_key: section_key},
                )
            )
        return documents