Z-Image-Special-Edtion / python_env /lib /site-packages /pandas /tests /io /parser /test_textreader.py
| """ | |
| Tests the TextReader class in parsers.pyx, which | |
| is integral to the C engine in parsers.py | |
| """ | |
| from io import ( | |
| BytesIO, | |
| StringIO, | |
| ) | |
| import numpy as np | |
| import pytest | |
| import pandas._libs.parsers as parser | |
| from pandas._libs.parsers import TextReader | |
| from pandas.errors import ParserWarning | |
| from pandas import DataFrame | |
| import pandas._testing as tm | |
| from pandas.io.parsers import ( | |
| TextFileReader, | |
| read_csv, | |
| ) | |
| from pandas.io.parsers.c_parser_wrapper import ensure_dtype_objs | |
| class TestTextReader: | |
| def csv_path(self, datapath): | |
| return datapath("io", "data", "csv", "test1.csv") | |
| def test_file_handle(self, csv_path): | |
| with open(csv_path, "rb") as f: | |
| reader = TextReader(f) | |
| reader.read() | |
| def test_file_handle_mmap(self, csv_path): | |
| # this was never using memory_map=True | |
| with open(csv_path, "rb") as f: | |
| reader = TextReader(f, header=None) | |
| reader.read() | |
| def test_StringIO(self, csv_path): | |
| with open(csv_path, "rb") as f: | |
| text = f.read() | |
| src = BytesIO(text) | |
| reader = TextReader(src, header=None) | |
| reader.read() | |
| def test_string_factorize(self): | |
| # should this be optional? | |
| data = "a\nb\na\nb\na" | |
| reader = TextReader(StringIO(data), header=None) | |
| result = reader.read() | |
| assert len(set(map(id, result[0]))) == 2 | |
| def test_skipinitialspace(self): | |
| data = "a, b\na, b\na, b\na, b" | |
| reader = TextReader(StringIO(data), skipinitialspace=True, header=None) | |
| result = reader.read() | |
| tm.assert_numpy_array_equal( | |
| result[0], np.array(["a", "a", "a", "a"], dtype=np.object_) | |
| ) | |
| tm.assert_numpy_array_equal( | |
| result[1], np.array(["b", "b", "b", "b"], dtype=np.object_) | |
| ) | |
| def test_parse_booleans(self): | |
| data = "True\nFalse\nTrue\nTrue" | |
| reader = TextReader(StringIO(data), header=None) | |
| result = reader.read() | |
| assert result[0].dtype == np.bool_ | |
| def test_delimit_whitespace(self): | |
| data = 'a b\na\t\t "b"\n"a"\t \t b' | |
| reader = TextReader(StringIO(data), delim_whitespace=True, header=None) | |
| result = reader.read() | |
| tm.assert_numpy_array_equal( | |
| result[0], np.array(["a", "a", "a"], dtype=np.object_) | |
| ) | |
| tm.assert_numpy_array_equal( | |
| result[1], np.array(["b", "b", "b"], dtype=np.object_) | |
| ) | |
| def test_embedded_newline(self): | |
| data = 'a\n"hello\nthere"\nthis' | |
| reader = TextReader(StringIO(data), header=None) | |
| result = reader.read() | |
| expected = np.array(["a", "hello\nthere", "this"], dtype=np.object_) | |
| tm.assert_numpy_array_equal(result[0], expected) | |
| def test_euro_decimal(self): | |
| data = "12345,67\n345,678" | |
| reader = TextReader(StringIO(data), delimiter=":", decimal=",", header=None) | |
| result = reader.read() | |
| expected = np.array([12345.67, 345.678]) | |
| tm.assert_almost_equal(result[0], expected) | |
| def test_integer_thousands(self): | |
| data = "123,456\n12,500" | |
| reader = TextReader(StringIO(data), delimiter=":", thousands=",", header=None) | |
| result = reader.read() | |
| expected = np.array([123456, 12500], dtype=np.int64) | |
| tm.assert_almost_equal(result[0], expected) | |
| def test_integer_thousands_alt(self): | |
| data = "123.456\n12.500" | |
| reader = TextFileReader( | |
| StringIO(data), delimiter=":", thousands=".", header=None | |
| ) | |
| result = reader.read() | |
| expected = DataFrame([123456, 12500]) | |
| tm.assert_frame_equal(result, expected) | |
| def test_skip_bad_lines(self): | |
| # too many lines, see #2430 for why | |
| data = "a:b:c\nd:e:f\ng:h:i\nj:k:l:m\nl:m:n\no:p:q:r" | |
| reader = TextReader(StringIO(data), delimiter=":", header=None) | |
| msg = r"Error tokenizing data\. C error: Expected 3 fields in line 4, saw 4" | |
| with pytest.raises(parser.ParserError, match=msg): | |
| reader.read() | |
| reader = TextReader( | |
| StringIO(data), delimiter=":", header=None, on_bad_lines=2 # Skip | |
| ) | |
| result = reader.read() | |
| expected = { | |
| 0: np.array(["a", "d", "g", "l"], dtype=object), | |
| 1: np.array(["b", "e", "h", "m"], dtype=object), | |
| 2: np.array(["c", "f", "i", "n"], dtype=object), | |
| } | |
| assert_array_dicts_equal(result, expected) | |
| with tm.assert_produces_warning(ParserWarning, match="Skipping line"): | |
| reader = TextReader( | |
| StringIO(data), delimiter=":", header=None, on_bad_lines=1 # Warn | |
| ) | |
| reader.read() | |
| def test_header_not_enough_lines(self): | |
| data = "skip this\nskip this\na,b,c\n1,2,3\n4,5,6" | |
| reader = TextReader(StringIO(data), delimiter=",", header=2) | |
| header = reader.header | |
| expected = [["a", "b", "c"]] | |
| assert header == expected | |
| recs = reader.read() | |
| expected = { | |
| 0: np.array([1, 4], dtype=np.int64), | |
| 1: np.array([2, 5], dtype=np.int64), | |
| 2: np.array([3, 6], dtype=np.int64), | |
| } | |
| assert_array_dicts_equal(recs, expected) | |
| def test_escapechar(self): | |
| data = '\\"hello world"\n\\"hello world"\n\\"hello world"' | |
| reader = TextReader(StringIO(data), delimiter=",", header=None, escapechar="\\") | |
| result = reader.read() | |
| expected = {0: np.array(['"hello world"'] * 3, dtype=object)} | |
| assert_array_dicts_equal(result, expected) | |
| def test_eof_has_eol(self): | |
| # handling of new line at EOF | |
| pass | |
| def test_na_substitution(self): | |
| pass | |
| def test_numpy_string_dtype(self): | |
| data = """\ | |
| a,1 | |
| aa,2 | |
| aaa,3 | |
| aaaa,4 | |
| aaaaa,5""" | |
| def _make_reader(**kwds): | |
| if "dtype" in kwds: | |
| kwds["dtype"] = ensure_dtype_objs(kwds["dtype"]) | |
| return TextReader(StringIO(data), delimiter=",", header=None, **kwds) | |
| reader = _make_reader(dtype="S5,i4") | |
| result = reader.read() | |
| assert result[0].dtype == "S5" | |
| ex_values = np.array(["a", "aa", "aaa", "aaaa", "aaaaa"], dtype="S5") | |
| assert (result[0] == ex_values).all() | |
| assert result[1].dtype == "i4" | |
| reader = _make_reader(dtype="S4") | |
| result = reader.read() | |
| assert result[0].dtype == "S4" | |
| ex_values = np.array(["a", "aa", "aaa", "aaaa", "aaaa"], dtype="S4") | |
| assert (result[0] == ex_values).all() | |
| assert result[1].dtype == "S4" | |
| def test_pass_dtype(self): | |
| data = """\ | |
| one,two | |
| 1,a | |
| 2,b | |
| 3,c | |
| 4,d""" | |
| def _make_reader(**kwds): | |
| if "dtype" in kwds: | |
| kwds["dtype"] = ensure_dtype_objs(kwds["dtype"]) | |
| return TextReader(StringIO(data), delimiter=",", **kwds) | |
| reader = _make_reader(dtype={"one": "u1", 1: "S1"}) | |
| result = reader.read() | |
| assert result[0].dtype == "u1" | |
| assert result[1].dtype == "S1" | |
| reader = _make_reader(dtype={"one": np.uint8, 1: object}) | |
| result = reader.read() | |
| assert result[0].dtype == "u1" | |
| assert result[1].dtype == "O" | |
| reader = _make_reader(dtype={"one": np.dtype("u1"), 1: np.dtype("O")}) | |
| result = reader.read() | |
| assert result[0].dtype == "u1" | |
| assert result[1].dtype == "O" | |
| def test_usecols(self): | |
| data = """\ | |
| a,b,c | |
| 1,2,3 | |
| 4,5,6 | |
| 7,8,9 | |
| 10,11,12""" | |
| def _make_reader(**kwds): | |
| return TextReader(StringIO(data), delimiter=",", **kwds) | |
| reader = _make_reader(usecols=(1, 2)) | |
| result = reader.read() | |
| exp = _make_reader().read() | |
| assert len(result) == 2 | |
| assert (result[1] == exp[1]).all() | |
| assert (result[2] == exp[2]).all() | |
| def test_cr_delimited(self, text, kwargs): | |
| nice_text = text.replace("\r", "\r\n") | |
| result = TextReader(StringIO(text), **kwargs).read() | |
| expected = TextReader(StringIO(nice_text), **kwargs).read() | |
| assert_array_dicts_equal(result, expected) | |
| def test_empty_field_eof(self): | |
| data = "a,b,c\n1,2,3\n4,," | |
| result = TextReader(StringIO(data), delimiter=",").read() | |
| expected = { | |
| 0: np.array([1, 4], dtype=np.int64), | |
| 1: np.array(["2", ""], dtype=object), | |
| 2: np.array(["3", ""], dtype=object), | |
| } | |
| assert_array_dicts_equal(result, expected) | |
| def test_empty_field_eof_mem_access_bug(self, repeat): | |
| # GH5664 | |
| a = DataFrame([["b"], [np.nan]], columns=["a"], index=["a", "c"]) | |
| b = DataFrame([[1, 1, 1, 0], [1, 1, 1, 0]], columns=list("abcd"), index=[1, 1]) | |
| c = DataFrame( | |
| [ | |
| [1, 2, 3, 4], | |
| [6, np.nan, np.nan, np.nan], | |
| [8, 9, 10, 11], | |
| [13, 14, np.nan, np.nan], | |
| ], | |
| columns=list("abcd"), | |
| index=[0, 5, 7, 12], | |
| ) | |
| df = read_csv(StringIO("a,b\nc\n"), skiprows=0, names=["a"], engine="c") | |
| tm.assert_frame_equal(df, a) | |
| df = read_csv( | |
| StringIO("1,1,1,1,0\n" * 2 + "\n" * 2), names=list("abcd"), engine="c" | |
| ) | |
| tm.assert_frame_equal(df, b) | |
| df = read_csv( | |
| StringIO("0,1,2,3,4\n5,6\n7,8,9,10,11\n12,13,14"), | |
| names=list("abcd"), | |
| engine="c", | |
| ) | |
| tm.assert_frame_equal(df, c) | |
| def test_empty_csv_input(self): | |
| # GH14867 | |
| with read_csv( | |
| StringIO(), chunksize=20, header=None, names=["a", "b", "c"] | |
| ) as df: | |
| assert isinstance(df, TextFileReader) | |
| def assert_array_dicts_equal(left, right): | |
| for k, v in left.items(): | |
| tm.assert_numpy_array_equal(np.asarray(v), np.asarray(right[k])) | |