File size: 10,957 Bytes
559dd34
 
 
 
 
 
 
 
57007fe
559dd34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e8553c3
57007fe
e8553c3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
559dd34
 
57007fe
559dd34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57007fe
559dd34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57007fe
559dd34
 
 
 
 
 
 
 
 
 
 
 
9397e33
 
559dd34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40b4763
 
 
559dd34
 
 
 
 
 
 
 
 
 
57007fe
559dd34
 
 
 
 
 
 
 
 
 
 
57007fe
559dd34
 
 
 
 
57007fe
559dd34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40b4763
 
 
 
 
57007fe
40b4763
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
559dd34
 
 
 
 
 
 
 
40b4763
 
559dd34
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
"""Chunker abstraction and implementations."""

import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import lru_cache
from typing import List, Optional

import nbformat
import pygments
import tiktoken
from semchunk import chunk as chunk_via_semchunk
from tree_sitter import Node, Tree
from tree_sitter_language_pack import get_parser

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


@dataclass
class Chunk:
    """A chunk of code or text extracted from a file in the repository.

    Two chunks compare equal (and hash equally) when they share the same filename and the same start/end offsets;
    the content is not part of the identity.
    """

    # Path of the file the chunk was extracted from.
    filename: str
    # Offsets delimiting the chunk; `populate_content` applies them as `file_content[start_byte:end_byte]`.
    start_byte: int
    end_byte: int
    # Text to be embedded. Stays None until passed to the constructor or set by `populate_content`.
    _content: Optional[str] = None

    @property
    def content(self) -> Optional[str]:
        """The text content to be embedded. Might contain information beyond just the text snippet from the file."""
        return self._content

    @property
    def to_metadata(self) -> dict:
        """Converts the chunk to a dictionary that can be passed to a vector store.

        Returns:
            A dictionary with the keys `id`, `filename`, `start_byte`, `end_byte` and `text`.
        """
        # Some vector stores require the IDs to be ASCII, so non-ASCII characters are dropped from the filename
        # before it is used to build the ID. The `filename` entry keeps the original, unmodified name.
        filename_ascii = self.filename.encode("ascii", "ignore").decode("ascii")
        return {
            "id": f"{filename_ascii}_{self.start_byte}_{self.end_byte}",
            "filename": self.filename,
            "start_byte": self.start_byte,
            "end_byte": self.end_byte,
            # Note to developer: When choosing a large chunk size, you might exceed the vector store's metadata
            # size limit. In that case, you can simply store the start/end bytes above, and fetch the content
            # directly from the repository when needed.
            "text": self.content,
        }

    def populate_content(self, file_content: str) -> None:
        """Populates the content of the chunk with the file path and file content.

        The content becomes the filename, a blank line, and the slice of `file_content` between the chunk offsets.

        Args:
            file_content: Full text of the file this chunk belongs to.
        """
        # NOTE(review): the slice is taken directly on the `str`, so the offsets act as string indices. For
        # non-ASCII text, UTF-8 byte offsets and string indices diverge -- confirm what callers pass in.
        self._content = self.filename + "\n\n" + file_content[self.start_byte : self.end_byte]

    def num_tokens(self, tokenizer) -> int:
        """Counts the number of tokens in the chunk.

        Args:
            tokenizer: Hashable object exposing `encode(text, disallowed_special=...)`.

        Returns:
            The number of tokens in the chunk content.

        Raises:
            ValueError: If the content is missing or empty.
        """
        if not self.content:
            raise ValueError("Content not populated.")
        return Chunk._cached_num_tokens(self.content, tokenizer)

    @staticmethod
    @lru_cache(maxsize=1024)
    def _cached_num_tokens(content: str, tokenizer) -> int:
        """Static method to cache token counts, keyed on the (content, tokenizer) pair."""
        return len(tokenizer.encode(content, disallowed_special=()))

    def __eq__(self, other):
        if not isinstance(other, Chunk):
            # Let Python try the reflected comparison; it falls back to False if nobody handles it.
            return NotImplemented
        return (
            self.filename == other.filename
            and self.start_byte == other.start_byte
            and self.end_byte == other.end_byte
        )

    def __hash__(self):
        # Must stay consistent with __eq__: same fields, nothing else.
        return hash((self.filename, self.start_byte, self.end_byte))


class Chunker(ABC):
    """Abstract class for chunking a file into smaller pieces.

    Concrete chunkers implement `chunk`, which turns the content of one file into a list of `Chunk` objects.
    """

    @abstractmethod
    def chunk(self, file_path: str, file_content: str) -> List[Chunk]:
        """Chunks a file into smaller pieces.

        Args:
            file_path: Path of the file being chunked.
            file_content: Full text content of that file.

        Returns:
            The chunks extracted from the file.
        """


class CodeChunker(Chunker):
    """Splits a code file into chunks of at most `max_tokens` tokens each.

    The file is parsed into a tree and the tree is walked top-down: a node that fits within the token budget becomes
    one chunk, a node that does not fit is split into its children, and an oversized leaf is handed to a
    `TextChunker`. Neighboring chunks are then merged back together as long as the result still fits.
    """

    # Language names (as returned by `_get_language_from_filename`) for which no parsing is attempted.
    _NON_CODE_LANGUAGES = ("text only", "None")
    # Two neighbors are only considered for merging when the sum of their token counts is below `max_tokens` minus
    # this headroom; the merged chunk is re-tokenized afterwards because token counts are not additive.
    _MERGE_HEADROOM_TOKENS = 50

    def __init__(self, max_tokens: int):
        """Creates a chunker that produces chunks of at most `max_tokens` tokens."""
        self.max_tokens = max_tokens
        self.tokenizer = tiktoken.get_encoding("cl100k_base")
        self.text_chunker = TextChunker(max_tokens)

    @staticmethod
    def _get_language_from_filename(filename: str) -> Optional[str]:
        """Returns a canonical name for the language of the file, based on its extension.
        Returns None if the language is unknown to the pygments lexer.
        """
        # Import the submodules explicitly: a bare `import pygments` does not guarantee that `pygments.lexers` and
        # `pygments.util` are reachable as attributes of the package.
        from pygments.lexers import get_lexer_for_filename
        from pygments.util import ClassNotFound

        try:
            return get_lexer_for_filename(filename).name.lower()
        except ClassNotFound:
            return None

    def _chunk_node(self, node: Node, filename: str, file_content: str) -> List[Chunk]:
        """Splits a node in the parse tree into a flat list of chunks.

        Args:
            node: Parse-tree node to split.
            filename: Name of the file the node belongs to.
            file_content: Full text of the file.

        Returns:
            Chunks covering the node, each holding at most `max_tokens` tokens.
        """
        node_chunk = Chunk(filename, node.start_byte, node.end_byte)
        node_chunk.populate_content(file_content)

        if node_chunk.num_tokens(self.tokenizer) <= self.max_tokens:
            return [node_chunk]

        if not node.children:
            # This is a leaf node, but it's too long. We'll have to split it with a text tokenizer.
            pieces = self.text_chunker.chunk(filename, file_content[node.start_byte : node.end_byte])
            # The text chunker only saw the node's text, so its offsets are relative to that substring. Shift them
            # by the node's start so they index into the whole file, like every other chunk produced here.
            return [
                Chunk(
                    filename,
                    node.start_byte + piece.start_byte,
                    node.start_byte + piece.end_byte,
                    piece.content,
                )
                for piece in pieces
            ]

        chunks = []
        for child in node.children:
            chunks.extend(self._chunk_node(child, filename, file_content))

        for chunk in chunks:
            # This should always be true. Otherwise there must be a bug in the code.
            assert chunk.content and chunk.num_tokens(self.tokenizer) <= self.max_tokens

        # Merge neighboring chunks if their combined size doesn't exceed max_tokens. The goal is to avoid pathologically
        # small chunks that end up being undeservedly preferred by the retriever.
        merged_chunks: List[Chunk] = []
        for chunk in chunks:
            if merged_chunks:
                previous = merged_chunks[-1]
                estimate = previous.num_tokens(self.tokenizer) + chunk.num_tokens(self.tokenizer)
                if estimate < self.max_tokens - self._MERGE_HEADROOM_TOKENS:
                    # There's a good chance that merging these two chunks will be under the token limit. We're not
                    # 100% sure at this point, because tokenization is not necessarily additive.
                    merged = Chunk(previous.filename, previous.start_byte, chunk.end_byte)
                    merged.populate_content(file_content)
                    if merged.num_tokens(self.tokenizer) <= self.max_tokens:
                        merged_chunks[-1] = merged
                        continue
            merged_chunks.append(chunk)

        for chunk in merged_chunks:
            # This should always be true. Otherwise there's a bug worth investigating.
            assert chunk.content and chunk.num_tokens(self.tokenizer) <= self.max_tokens

        return merged_chunks

    @staticmethod
    def is_code_file(filename: str) -> bool:
        """Checks whether the filename maps to a known language that is not plain text.

        Only the pygments lookup is consulted here; whether a parser exists for that language is determined later,
        in `parse_tree`.
        """
        language = CodeChunker._get_language_from_filename(filename)
        # Wrap in bool() so that an unknown language yields False rather than None.
        return bool(language) and language not in CodeChunker._NON_CODE_LANGUAGES

    @staticmethod
    def parse_tree(filename: str, content: str) -> Optional[Tree]:
        """Parses the code in a file and returns the parse tree.

        Args:
            filename: Name of the file; used to determine the language.
            content: Full text of the file.

        Returns:
            The parse tree, or None when the file is not recognized as code, no parser is available for its
            language, or the parse produced no children / starts with an ERROR node.
        """
        language = CodeChunker._get_language_from_filename(filename)

        if not language or language in CodeChunker._NON_CODE_LANGUAGES:
            logger.debug("%s doesn't seem to be a code file.", filename)
            return None

        try:
            parser = get_parser(language)
        except LookupError:
            logger.debug("%s doesn't seem to be a code file.", filename)
            return None

        tree = parser.parse(bytes(content, "utf8"))

        if not tree.root_node.children or tree.root_node.children[0].type == "ERROR":
            logger.warning("Failed to parse code in %s.", filename)
            return None
        return tree

    def chunk(self, file_path: str, file_content: str) -> List[Chunk]:
        """Chunks a code file into smaller pieces.

        Args:
            file_path: Path of the file being chunked.
            file_content: Full text of the file.

        Returns:
            The chunks of the file, or an empty list when the file is blank or could not be parsed.
        """
        if not file_content.strip():
            return []

        tree = self.parse_tree(file_path, file_content)
        if tree is None:
            return []

        chunks = self._chunk_node(tree.root_node, file_path, file_content)
        for chunk in chunks:
            # Make sure that the chunk has content and doesn't exceed the max_tokens limit. Otherwise there must be
            # a bug in the code.
            assert chunk.content
            size = chunk.num_tokens(self.tokenizer)
            assert size <= self.max_tokens, f"Chunk size {size} exceeds max_tokens {self.max_tokens}."

        return chunks


class TextChunker(Chunker):
    """Wrapper around semchunk: https://github.com/umarbutler/semchunk."""

    def __init__(self, max_tokens: int):
        """Creates a chunker that produces chunks of at most `max_tokens` tokens."""
        self.max_tokens = max_tokens

        tokenizer = tiktoken.get_encoding("cl100k_base")
        self.count_tokens = lambda text: len(tokenizer.encode(text, disallowed_special=()))

    def chunk(self, file_path: str, file_content: str) -> List[Chunk]:
        """Chunks a text file into smaller pieces.

        Args:
            file_path: Path of the file being chunked.
            file_content: Text to split.

        Returns:
            One chunk per piece returned by semchunk that could be located in `file_content`. Offsets are string
            indices into `file_content`.
        """
        # Reserve part of the token budget for a "<file_path>\n\n" header.
        # NOTE(review): the chunks built below store the bare text, without that header -- confirm whether the
        # header is meant to be prepended here.
        extra_tokens = self.count_tokens(file_path + "\n\n")
        budget = self.max_tokens - extra_tokens
        text_chunks = chunk_via_semchunk(file_content, budget, self.count_tokens)

        chunks = []
        search_from = 0
        for text_chunk in text_chunks:
            # This assertion should always be true. Otherwise there's a bug worth finding.
            assert self.count_tokens(text_chunk) <= budget

            # Find the start/end positions of the chunk. `str.find` returns -1 when the piece is absent, whereas
            # `str.index` would raise ValueError and never reach the check below.
            start = file_content.find(text_chunk, search_from)
            if start == -1:
                logger.warning("Couldn't find semchunk in content: %s", text_chunk)
                # Skip this piece and keep the search position where it was for the next one.
                continue

            end = start + len(text_chunk)
            chunks.append(Chunk(file_path, start, end, text_chunk))
            # Pieces are looked up in order, so the next search resumes right after this one.
            search_from = end
        return chunks


class IPYNBChunker(Chunker):
    """Extracts the python code from a Jupyter notebook, removing all the boilerplate.

    Based on https://github.com/GoogleCloudPlatform/generative-ai/blob/main/language/code/code_retrieval_augmented_generation.ipynb
    """

    _SUFFIX = ".ipynb"

    def __init__(self, code_chunker: CodeChunker):
        """Stores the code chunker used to split the extracted Python code."""
        self.code_chunker = code_chunker

    def chunk(self, filename: str, content: str) -> List[Chunk]:
        """Chunks the code cells of a notebook.

        The sources of all code cells are joined with newlines and chunked as if they were a `.py` file. The offsets
        of the returned chunks therefore index into that joined code, not into the raw notebook file.

        Args:
            filename: Path of the notebook; must end in `.ipynb` (case-insensitive).
            content: Raw text of the notebook file.

        Returns:
            The chunks of the notebook's code, carrying the original notebook filename. Empty if `filename` is not
            a notebook.
        """
        if not filename.lower().endswith(self._SUFFIX):
            logger.warning("IPYNBChunker is only for .ipynb files.")
            return []

        notebook = nbformat.reads(content, as_version=nbformat.NO_CONVERT)
        python_code = "\n".join(cell.source for cell in notebook.cells if cell.cell_type == "code")

        # Swap only the trailing extension, whatever its letter case, so that other occurrences of ".ipynb" or ".py"
        # inside the path are left alone.
        py_filename = filename[: -len(self._SUFFIX)] + ".py"
        chunks = self.code_chunker.chunk(py_filename, python_code)

        # Change back the filenames to the original notebook path.
        # NOTE(review): only the `filename` attribute is restored; chunk content that was built by the code chunker
        # under the temporary `.py` name is left as is -- confirm whether it should be rewritten too.
        for chunk in chunks:
            chunk.filename = filename
        return chunks


class UniversalChunker(Chunker):
    """Chunks a file into smaller pieces, regardless of whether it's code or text."""

    def __init__(self, max_tokens: int):
        """Creates the code and text chunkers, both limited to `max_tokens` tokens per chunk."""
        self.code_chunker = CodeChunker(max_tokens)
        self.text_chunker = TextChunker(max_tokens)

    def chunk(self, file_path: str, file_content: str) -> List[Chunk]:
        """Chunks a file with the chunker that matches its type.

        Notebooks go through `IPYNBChunker`, files recognized as code go through the code chunker, and everything
        else goes through the text chunker.

        Args:
            file_path: Path of the file being chunked.
            file_content: Full text of the file.

        Returns:
            The chunks extracted from the file.
        """
        if file_path.lower().endswith(".ipynb"):
            return IPYNBChunker(self.code_chunker).chunk(file_path, file_content)

        if CodeChunker.is_code_file(file_path):
            chunks = self.code_chunker.chunk(file_path, file_content)
            if chunks or not file_content.strip():
                return chunks
            # The code chunker returned nothing for a non-blank file, which it does when it cannot obtain a parse
            # tree. Fall back to plain text chunking below rather than silently dropping the file.
            logger.debug("Falling back to text chunking for %s.", file_path)

        return self.text_chunker.chunk(file_path, file_content)