File size: 17,567 Bytes
825942f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
import shutil
import re
import io
import operator
from functools import reduce
from itertools import zip_longest
from typing import (
    Union,
    Tuple,
    Sequence,
    List,
    Any,
)

from toolz import frequencies
from wcwidth import wcwidth, wcswidth

from .exceptions import TableOverflowError

# Types
# Text of one table cell before wrapping; may still contain embedded newlines.
NonWrappedCell = str
# A single line of text belonging to one cell.
WrappedCellLine = str
# Table body: a list of rows, each row a list of cell strings.
Data = List[List[NonWrappedCell]]
# Header strings, one per column.
Headers = List[str]
# One logical row: a list of physical (printed) rows, each of which is a list
# holding one line of text per column.
LogicalRow = List[List[WrappedCellLine]]


class Columnar:
    def __call__(
        self,
        data: Sequence[Sequence[Any]],
        headers: Union[None, Sequence[Any]] = None,
        head: int = 0,
        justify: str = "l",
        wrap_max: int = 5,
        max_column_width: Union[None, int] = None,
        min_column_width: int = 5,
        row_sep: str = "-",
        column_sep: str = "|",
        patterns: Sequence[str] = [],
        drop: Sequence[str] = [],
        select: Sequence[str] = [],
        no_borders: bool = False,
        terminal_width: Union[None, int] = None,
    ) -> str:
        """
        Render ``data`` as a text table and return it as one string.

        Args:
            data: Non-empty list of row lists; every row must have the same
                number of cells. Cells are converted with ``str``.
            headers: Optional column headers (converted with ``str``). When
                ``None`` no header row is rendered.
            head: Stored on the instance; not otherwise used by this method.
            justify: Either one of ``"l"``, ``"c"``, ``"r"`` applied to every
                column, or a sequence of those codes, one per column.
            wrap_max: Stored on the instance and used when wrapping cells.
            max_column_width: Optional upper bound for a column's width.
            min_column_width: Lower bound for a column's width.
            row_sep: Character repeated to draw the line between rows.
            column_sep: String drawn between (and around) columns.
            patterns: Sequence of ``(regex, func)`` pairs; see
                ``compile_patterns`` / ``apply_patterns``.
            drop: Values that mark a column as droppable; see
                ``filter_columns``.
            select: Regexes naming the headers to keep; see
                ``filter_columns``.
            no_borders: When true, columns are separated by two spaces, the
                row and header separators are empty, and headers are
                upper-cased.
            terminal_width: Width to fit the table into. Defaults to
                ``shutil.get_terminal_size().columns``.

        Returns:
            The rendered table, each physical row terminated by ``"\\n"``.

        Raises:
            TypeError, ValueError: From ``clean_data`` when ``data`` is not a
                rectangular list of lists.
            KeyError: When ``justify`` contains an unknown code.
            TableOverflowError: From ``get_column_widths`` when the table
                cannot be made to fit.
        """
        # NOTE: the list defaults in the signature are never mutated here, so
        # sharing them between calls is harmless.
        self.wrap_max = wrap_max
        self.max_column_width = max_column_width
        self.min_column_width = min_column_width
        self.justify = justify
        self.head = head
        self.terminal_width = (
            terminal_width
            if terminal_width is not None
            else shutil.get_terminal_size().columns
        )
        self.row_sep = row_sep
        self.column_sep = column_sep
        self.header_sep = "="
        self.patterns = self.compile_patterns(patterns)
        self.ansi_color_pattern = re.compile(r"\x1b\[.+?m")
        self.color_reset = "\x1b[0m"
        self.color_grid = None
        self.drop = drop
        self.select = select
        self.no_borders = no_borders
        self.no_headers = headers is None

        if self.no_headers:
            # Placeholder headers keep filter_columns working on the columns.
            headers = [""] * len(data[0])
        else:
            # Headers are typed Sequence[Any]; stringify them like data cells
            # so upper-casing and regex matching below cannot fail.
            headers = [str(text) for text in headers]

        if self.no_borders:
            self.column_sep = " " * 2
            self.row_sep = ""
            self.header_sep = ""
            headers = [text.upper() for text in headers]

        data = self.clean_data(data)
        data, headers = self.filter_columns(data, headers)
        if self.no_headers:
            logical_rows = self.convert_data_to_logical_rows(data)
        else:
            logical_rows = self.convert_data_to_logical_rows([headers] + data)
        column_widths = self.get_column_widths(logical_rows)
        truncated_rows = self.wrap_and_truncate_logical_cells(
            logical_rows, column_widths
        )

        justification_map = {
            "l": lambda text, width: self.visual_justify(text, width, 'l'),
            "c": lambda text, width: self.visual_justify(text, width, 'c'),
            "r": lambda text, width: self.visual_justify(text, width, 'r'),
        }
        if isinstance(justify, str):
            justifications = [justification_map[justify]] * len(column_widths)
        else:
            justifications = [justification_map[spec] for spec in justify]

        # Full width of a rendered row: every column plus one separator
        # before each column and one closing separator. This must be measured
        # with the column separator actually in use, not with row_sep.
        table_width = sum(column_widths) + (
            (len(column_widths) + 1) * len(self.column_sep)
        )
        out = io.StringIO()
        # The header separator is written once, after the first logical row.
        write_header = not self.no_headers
        self.write_row_separators(out, column_widths)
        for lrow, color_row in zip(truncated_rows, self.color_grid):
            for row in lrow:
                justified_row_parts = [
                    justifier(text, width)
                    for text, justifier, width in zip(
                        row, justifications, column_widths
                    )
                ]
                colorized_row_parts = [
                    self.colorize(text, code)
                    for text, code in zip(justified_row_parts, color_row)
                ]
                out.write(
                    self.column_sep
                    + self.column_sep.join(colorized_row_parts)
                    + self.column_sep
                    + "\n"
                )
            if write_header:
                out.write(
                    self.column_sep
                    + (self.header_sep * (table_width - (len(self.column_sep * 2))))
                    + self.column_sep
                    + "\n"
                )
                write_header = False
            else:
                if not self.no_borders:
                    self.write_row_separators(out, column_widths)
        return out.getvalue()

    def write_row_separators(
        self, out_stream: io.StringIO, column_widths: Sequence[int]
    ) -> None:
        cells = [self.row_sep * width for width in column_widths]
        out_stream.write(
            self.column_sep + self.column_sep.join(cells) + self.column_sep + "\n"
        )

    def compile_patterns(self, patterns):
        out = []
        for regex, func in patterns:
            if regex is not re.Pattern:
                regex = re.compile(regex)
            out.append((regex, func))
        return out

    def colorize(self, text, code):
        if code == None:
            return text
        return "".join([code, text, self.color_reset])

    def clean_data(self, data: Sequence[Sequence[Any]]) -> Data:
        # First make sure data is a list of lists
        if type(data) is not list:
            raise TypeError(f"'data' must be a list of lists. Got a {type(data)}")
        if type(data[0]) is not list:
            raise TypeError(f"'data' must be a list of lists. Got a list of {type(data[0])}")
        # Make sure all the lists are the same length
        num_columns = len(data[0])
        for row_num, row in enumerate(data):
            if len(row) != num_columns:
                raise ValueError(
                    f"All the rows in 'data' must have the same number of columns, however the first row had {num_columns} columns and row number {row_num + 1} had {len(row)} column(s)."
                )
        carriage_return = re.compile("\r")
        tab = re.compile("\t")
        out = []
        for row in data:
            cleaned = []
            for cell in row:
                cell = str(cell)
                cell = carriage_return.sub("", cell)
                cell = tab.sub(" " * 4, cell)
                cleaned.append(cell)
            out.append(cleaned)
        return out

    def filter_columns(self, data: Data, headers: Headers) -> Tuple[Data, Headers]:
        """
        Drop columns that meet drop criteria, unless they have been
        explicitly selected.
        """
        drop = set(self.drop)
        select_patterns = [re.compile(pattern, re.I) for pattern in self.select]
        select = len(select_patterns) > 0
        headers_out = []
        columns_out = []
        for header, column in zip(headers, zip(*data)):
            if select:
                for pattern in select_patterns:
                    if pattern.search(header):
                        headers_out.append(header)
                        columns_out.append(column)
            else:
                freqs = frequencies(column)
                if not set(freqs.keys()).issubset(drop):
                    headers_out.append(header)
                    columns_out.append(column)
        rows_out = list(zip(*columns_out))
        return rows_out, headers_out

    def convert_data_to_logical_rows(self, full_data: Data) -> List[LogicalRow]:
        """
        Takes a list of lists of items. Returns a list of logical rows, where each logical
        row is a list of lists, where each sub-list in a logical row is a physical row to be
        printed to the screen. There will only be more than one phyical row in a logical
        row if one of the columns wraps past one line. However, wrapping will be performed
        in a later step, so this function always returns logical rows that only contain
        one physical row which will be wrapped onto multiple physical rows later.
        """
        logical_rows = []
        color_grid = []
        for row in full_data:
            cells_varying_lengths = []
            color_row = []
            for cell in row:
                cell = self.apply_patterns(cell)
                cell, color = self.strip_color(cell)
                color_row.append(color)
                lines = cell.split("\n")
                cells_varying_lengths.append(lines)
            cells = [
                [cell_text or "" for cell_text in physical_row]
                for physical_row in zip_longest(*cells_varying_lengths)
            ]
            logical_rows.append(cells)
            color_grid.append(color_row)
        self.color_grid = color_grid
        return logical_rows

    def apply_patterns(self, cell_text):
        out_text = cell_text
        for pattern, func in self.patterns:
            if pattern.match(cell_text):
                out_text = func(cell_text)
                break
        return out_text

    def strip_color(self, cell_text):
        matches = [match for match in self.ansi_color_pattern.finditer(cell_text)]
        color_codes = None
        clean_text = cell_text
        if matches:
            clean_text = self.ansi_color_pattern.sub("", cell_text)
            color_codes = "".join([match.group(0) for match in matches[:-1]])
        return clean_text, color_codes

    def distribute_between(self, diff: int, columns: List[dict], n: int) -> List[dict]:
        """
        Reduces the total width of the n widest columns by 'diff', returning
        the list of columns such that the first n columns are now all the 
        same width. This function will continue to be called as long as the nth 
        column is narrower than the n+1 th column, meaning that we could still
        distribute our 'diff' more equally among the widest columns.
        """
        subset = columns[:n]
        width = sum([column["width"] for column in subset])
        remainder = width - diff
        new_width = remainder // n
        for i in range(n):
            columns[i]["width"] = new_width
        return columns

    def widths_sorted_by(self, columns: List[dict], key: str) -> List[int]:
        return [column["width"] for column in sorted(columns, key=lambda x: x[key])]

    def current_table_width(self, columns: List[dict]) -> int:
        return sum(
            [len(self.column_sep) + column["width"] for column in columns]
        ) + len(self.column_sep)

    def get_column_widths(self, logical_rows: List[LogicalRow]) -> List[int]:
        """
        Calculate column widths, taking into account the terminal width,
        the number of columns, and the column separators that will be used
        to delimit columns.

        The natural width of a column is the widest line in it, measured in
        terminal cells (so double-width characters count as two), capped by
        ``self.max_column_width`` and raised to ``self.min_column_width``.
        If the table fits in ``self.terminal_width`` with those widths they
        are returned as they are.

        Otherwise the table is narrowed. Our table-sizing heuristic says that
        we should keep wide columns as wide as possible and only touch narrow
        columns once the wide columns have been shrunk down to the width of
        the narrow ones and the table is still too wide.

        The narrowing is done by ``self.distribute_between(diff, columns, n)``
        where 'diff' is the amount by which the table is currently too wide,
        'columns' is the list of column dicts sorted from widest to
        narrowest, and 'n' is the number of leading columns that share the
        reduction. The first call uses n = 1, so the widest column alone
        absorbs 'diff'. That often leaves the widest column narrower than the
        next one, while it is preferable to shrink several wide columns a
        little rather than one column a lot. So whenever the first n columns
        (which all have the same width after a call) are narrower than column
        n + 1, the next-largest column is added to the group and the
        reduction is redistributed. From the second call on 'diff' is
        recomputed from the current table width, so it is normally 0 and the
        call only evens out the widths.

        Narrowing stops when the group is no narrower than the next column,
        no narrower than ``self.min_column_width``, and the table fits.

        Returns:
            The column widths in the original column order.

        Raises:
            TableOverflowError: If all columns were included and the table
                still does not fit.
        """

        def visual_width(text: str) -> int:
            # Measure the same way the wrapping and justification steps do.
            # wcswidth reports -1 when it meets a non-printable character; in
            # that case fall back to the plain character count.
            measured = wcswidth(text)
            return measured if measured >= 0 else len(text)

        # Flatten the logical rows into physical rows, then transpose them so
        # we can look at one column at a time.
        physical_rows = [row for lrow in logical_rows for row in lrow]
        max_widths = []
        for column in zip(*physical_rows):
            max_natural = max(visual_width(cell) for cell in column)
            max_width = (
                max_natural
                if self.max_column_width is None
                else min(max_natural, self.max_column_width)
            )
            max_widths.append(max_width)

        columns = sorted(
            [{"column_no": no, "width": width} for no, width in enumerate(max_widths)],
            key=lambda x: x["width"],
            reverse=True,
        )
        # apply min and max widths
        for column in columns:
            if column["width"] < self.min_column_width:
                column["width"] = self.min_column_width
            if self.max_column_width:
                if column["width"] > self.max_column_width:
                    column["width"] = self.max_column_width

        if self.current_table_width(columns) <= self.terminal_width:
            return self.widths_sorted_by(columns, "column_no")

        # the table needs to be narrowed
        for n in range(1, len(columns) + 1):
            # include the next largest column in the size reduction
            diff = self.current_table_width(columns) - self.terminal_width
            columns = self.distribute_between(diff, columns, n)
            if n < len(columns) and columns[0]["width"] < columns[n]["width"]:
                # if the columns that were just shrunk are smaller than the next largest column,
                # keep distributing the size so we have evenly-shrunken columns
                continue
            if (
                columns[0]["width"] >= self.min_column_width
                and self.current_table_width(columns) <= self.terminal_width
            ):
                return self.widths_sorted_by(columns, "column_no")

        raise TableOverflowError(
            "Could not fit table in current terminal, try reducing the number of columns."
        )

    def wrap_and_truncate_logical_cells(
        self, logical_rows: List[LogicalRow], column_widths: List[int]
    ) -> List[LogicalRow]:
        """
        Wrap every cell to its column width and cap the number of lines.

        Args:
            logical_rows: Logical rows whose lines have not been wrapped yet.
            column_widths: Width of each column, in terminal cells.

        Returns:
            New logical rows in which no line is visually wider than its
            column (except a single character that is itself wider than the
            column), each cell keeps at most ``self.wrap_max + 1`` lines, and
            shorter cells are padded with empty strings so every physical
            row has one entry per column.
        """
        lrows_out = []
        for lrow in logical_rows:
            cells_out = []
            for cell, width in zip(map(list, zip(*lrow)), column_widths):
                # at this point `cell` is a list of strings, representing each line of the cell's contents
                cell_out = []
                for line in cell:
                    # Get the line width accounting for characters that occupy two terminal columns
                    # e.g. Unicode code point U+1F32D has a visual width of 2
                    while line and wcswidth(line) > width:
                        # Always take at least one character per wrapped
                        # line. Without this floor a character wider than
                        # the column (or a width <= 0) would drive the index
                        # to 0 and the loop would never consume any text.
                        wrap_index = max(width, 1)
                        while wrap_index > 1 and wcswidth(line[:wrap_index]) > width:
                            # decrease the number of characters on the line until the
                            # visual width is <= width.
                            wrap_index -= 1
                        cell_out.append(line[:wrap_index])
                        line = line[wrap_index:]
                    cell_out.append(line)
                cells_out.append(cell_out[: self.wrap_max + 1])
            # Transpose "lines per cell" back into physical rows, padding
            # cells that have fewer lines with empty strings.
            cells_out_padded = [
                [text or "" for text in line] for line in zip_longest(*cells_out)
            ]
            lrows_out.append(cells_out_padded)
        return lrows_out

    def visual_justify(self, text: str, width: int, alignment: str) -> str:
        """
        Pad ``text`` with spaces to ``width`` terminal cells.

        Python's own ljust, center and rjust measure strings with len(),
        which adds too many spaces when the text contains characters that
        occupy two terminal cells, so the padding is computed from the
        visual width instead.

        Args:
            text: The text to pad.
            width: Target width in terminal cells.
            alignment: ``'l'``, ``'c'`` or ``'r'``. When centering, any odd
                leftover space goes to the right-hand side.

        Raises:
            ValueError: If ``alignment`` is not one of the three codes.
        """
        padding = width - wcswidth(text)
        if alignment == 'l':
            return text + " " * padding
        if alignment == 'r':
            return " " * padding + text
        if alignment == 'c':
            before = padding // 2
            after = padding - before
            return " " * before + text + " " * after
        raise ValueError(f"Got invalid justification value: {alignment}")