Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| from typing import Any, Dict, List, Tuple, TypedDict | |
| from langchain_core.documents import Document | |
| from langchain_text_splitters.base import Language | |
| from langchain_text_splitters.character import RecursiveCharacterTextSplitter | |
| class MarkdownTextSplitter(RecursiveCharacterTextSplitter): | |
| """Attempts to split the text along Markdown-formatted headings.""" | |
| def __init__(self, **kwargs: Any) -> None: | |
| """Initialize a MarkdownTextSplitter.""" | |
| separators = self.get_separators_for_language(Language.MARKDOWN) | |
| super().__init__(separators=separators, **kwargs) | |
| class MarkdownHeaderTextSplitter: | |
| """Splitting markdown files based on specified headers.""" | |
| def __init__( | |
| self, | |
| headers_to_split_on: List[Tuple[str, str]], | |
| return_each_line: bool = False, | |
| strip_headers: bool = True, | |
| ): | |
| """Create a new MarkdownHeaderTextSplitter. | |
| Args: | |
| headers_to_split_on: Headers we want to track | |
| return_each_line: Return each line w/ associated headers | |
| strip_headers: Strip split headers from the content of the chunk | |
| """ | |
| # Output line-by-line or aggregated into chunks w/ common headers | |
| self.return_each_line = return_each_line | |
| # Given the headers we want to split on, | |
| # (e.g., "#, ##, etc") order by length | |
| self.headers_to_split_on = sorted( | |
| headers_to_split_on, key=lambda split: len(split[0]), reverse=True | |
| ) | |
| # Strip headers split headers from the content of the chunk | |
| self.strip_headers = strip_headers | |
| def aggregate_lines_to_chunks(self, lines: List[LineType]) -> List[Document]: | |
| """Combine lines with common metadata into chunks | |
| Args: | |
| lines: Line of text / associated header metadata | |
| """ | |
| aggregated_chunks: List[LineType] = [] | |
| for line in lines: | |
| if ( | |
| aggregated_chunks | |
| and aggregated_chunks[-1]["metadata"] == line["metadata"] | |
| ): | |
| # If the last line in the aggregated list | |
| # has the same metadata as the current line, | |
| # append the current content to the last lines's content | |
| aggregated_chunks[-1]["content"] += " \n" + line["content"] | |
| elif ( | |
| aggregated_chunks | |
| and aggregated_chunks[-1]["metadata"] != line["metadata"] | |
| # may be issues if other metadata is present | |
| and len(aggregated_chunks[-1]["metadata"]) < len(line["metadata"]) | |
| and aggregated_chunks[-1]["content"].split("\n")[-1][0] == "#" | |
| and not self.strip_headers | |
| ): | |
| # If the last line in the aggregated list | |
| # has different metadata as the current line, | |
| # and has shallower header level than the current line, | |
| # and the last line is a header, | |
| # and we are not stripping headers, | |
| # append the current content to the last line's content | |
| aggregated_chunks[-1]["content"] += " \n" + line["content"] | |
| # and update the last line's metadata | |
| aggregated_chunks[-1]["metadata"] = line["metadata"] | |
| else: | |
| # Otherwise, append the current line to the aggregated list | |
| aggregated_chunks.append(line) | |
| return [ | |
| Document(page_content=chunk["content"], metadata=chunk["metadata"]) | |
| for chunk in aggregated_chunks | |
| ] | |
| def split_text(self, text: str) -> List[Document]: | |
| """Split markdown file | |
| Args: | |
| text: Markdown file""" | |
| # Split the input text by newline character ("\n"). | |
| lines = text.split("\n") | |
| # Final output | |
| lines_with_metadata: List[LineType] = [] | |
| # Content and metadata of the chunk currently being processed | |
| current_content: List[str] = [] | |
| current_metadata: Dict[str, str] = {} | |
| # Keep track of the nested header structure | |
| # header_stack: List[Dict[str, Union[int, str]]] = [] | |
| header_stack: List[HeaderType] = [] | |
| initial_metadata: Dict[str, str] = {} | |
| in_code_block = False | |
| opening_fence = "" | |
| for line in lines: | |
| stripped_line = line.strip() | |
| # Remove all non-printable characters from the string, keeping only visible | |
| # text. | |
| stripped_line = "".join(filter(str.isprintable, stripped_line)) | |
| if not in_code_block: | |
| # Exclude inline code spans | |
| if stripped_line.startswith("```") and stripped_line.count("```") == 1: | |
| in_code_block = True | |
| opening_fence = "```" | |
| elif stripped_line.startswith("~~~"): | |
| in_code_block = True | |
| opening_fence = "~~~" | |
| else: | |
| if stripped_line.startswith(opening_fence): | |
| in_code_block = False | |
| opening_fence = "" | |
| if in_code_block: | |
| current_content.append(stripped_line) | |
| continue | |
| # Check each line against each of the header types (e.g., #, ##) | |
| for sep, name in self.headers_to_split_on: | |
| # Check if line starts with a header that we intend to split on | |
| if stripped_line.startswith(sep) and ( | |
| # Header with no text OR header is followed by space | |
| # Both are valid conditions that sep is being used a header | |
| len(stripped_line) == len(sep) or stripped_line[len(sep)] == " " | |
| ): | |
| # Ensure we are tracking the header as metadata | |
| if name is not None: | |
| # Get the current header level | |
| current_header_level = sep.count("#") | |
| # Pop out headers of lower or same level from the stack | |
| while ( | |
| header_stack | |
| and header_stack[-1]["level"] >= current_header_level | |
| ): | |
| # We have encountered a new header | |
| # at the same or higher level | |
| popped_header = header_stack.pop() | |
| # Clear the metadata for the | |
| # popped header in initial_metadata | |
| if popped_header["name"] in initial_metadata: | |
| initial_metadata.pop(popped_header["name"]) | |
| # Push the current header to the stack | |
| header: HeaderType = { | |
| "level": current_header_level, | |
| "name": name, | |
| "data": stripped_line[len(sep) :].strip(), | |
| } | |
| header_stack.append(header) | |
| # Update initial_metadata with the current header | |
| initial_metadata[name] = header["data"] | |
| # Add the previous line to the lines_with_metadata | |
| # only if current_content is not empty | |
| if current_content: | |
| lines_with_metadata.append( | |
| { | |
| "content": "\n".join(current_content), | |
| "metadata": current_metadata.copy(), | |
| } | |
| ) | |
| current_content.clear() | |
| if not self.strip_headers: | |
| current_content.append(stripped_line) | |
| break | |
| else: | |
| if stripped_line: | |
| current_content.append(stripped_line) | |
| elif current_content: | |
| lines_with_metadata.append( | |
| { | |
| "content": "\n".join(current_content), | |
| "metadata": current_metadata.copy(), | |
| } | |
| ) | |
| current_content.clear() | |
| current_metadata = initial_metadata.copy() | |
| if current_content: | |
| lines_with_metadata.append( | |
| {"content": "\n".join(current_content), "metadata": current_metadata} | |
| ) | |
| # lines_with_metadata has each line with associated header metadata | |
| # aggregate these into chunks based on common metadata | |
| if not self.return_each_line: | |
| return self.aggregate_lines_to_chunks(lines_with_metadata) | |
| else: | |
| return [ | |
| Document(page_content=chunk["content"], metadata=chunk["metadata"]) | |
| for chunk in lines_with_metadata | |
| ] | |
| class LineType(TypedDict): | |
| """Line type as typed dict.""" | |
| metadata: Dict[str, str] | |
| content: str | |
| class HeaderType(TypedDict): | |
| """Header type as typed dict.""" | |
| level: int | |
| name: str | |
| data: str | |