| import numpy as np |
| import pytest |
|
|
| from pandas import ( |
| DataFrame, |
| Index, |
| MultiIndex, |
| Series, |
| Timestamp, |
| concat, |
| date_range, |
| isna, |
| notna, |
| ) |
| import pandas._testing as tm |
|
|
| from pandas.tseries import offsets |
|
|
| |
| |
# Module-wide pytest mark: ignore any RuntimeWarning whose message matches
# "empty slice" or "0 for slice" while the tests in this file run.
pytestmark = pytest.mark.filterwarnings(
    "ignore:.*(empty slice|0 for slice).*:RuntimeWarning"
)
|
|
|
|
def f(x):
    """Return the mean of the finite entries of ``x``.

    Entries for which ``np.isfinite`` is False (NaN, +/-inf) are masked out
    before ``.mean()`` is called on the remaining values.
    """
    finite_mask = np.isfinite(x)
    return x[finite_mask].mean()
|
|
|
|
@pytest.mark.parametrize("bad_raw", [None, 1, 0])
def test_rolling_apply_invalid_raw(bad_raw):
    """``Rolling.apply`` must raise ValueError when ``raw`` is not a bool."""
    expected_msg = "raw parameter must be `True` or `False`"
    roller = Series(range(3)).rolling(1)
    # Only the apply call itself is expected to raise.
    with pytest.raises(ValueError, match=expected_msg):
        roller.apply(len, raw=bad_raw)
|
|
|
|
| def test_rolling_apply_out_of_bounds(engine_and_raw): |
| |
| engine, raw = engine_and_raw |
|
|
| vals = Series([1, 2, 3, 4]) |
|
|
| result = vals.rolling(10).apply(np.sum, engine=engine, raw=raw) |
| assert result.isna().all() |
|
|
| result = vals.rolling(10, min_periods=1).apply(np.sum, engine=engine, raw=raw) |
| expected = Series([1, 3, 6, 10], dtype=float) |
| tm.assert_almost_equal(result, expected) |
|
|
|
|
@pytest.mark.parametrize("window", [2, "2s"])
def test_rolling_apply_with_pandas_objects(window):
    """With ``raw=False`` the applied function receives indexed pandas objects.

    The helper reads ``x.index`` and ``x.iloc``; the test expects that to work
    with ``raw=False`` and to surface an AttributeError with ``raw=True``.
    """
    second_stamps = date_range("20130101", periods=5, freq="s")
    df = DataFrame(
        {
            "A": np.random.default_rng(2).standard_normal(5),
            "B": np.random.default_rng(2).integers(0, 10, size=5),
        },
        index=second_stamps,
    )

    def last_unless_first_window(x):
        # NaN for any window that starts on the frame's first label,
        # otherwise the last value in the window.
        return np.nan if x.index[0] == df.index[0] else x.iloc[-1]

    result = df.rolling(window).apply(last_unless_first_window, raw=False)
    # Rows 0 and 1 are expected to be NaN; the rest equal the input rows.
    expected = df.iloc[2:].reindex_like(df)
    tm.assert_frame_equal(result, expected)

    with tm.external_error_raised(AttributeError):
        df.rolling(window).apply(last_unless_first_window, raw=True)
|
|
|
|
| def test_rolling_apply(engine_and_raw, step): |
| engine, raw = engine_and_raw |
|
|
| expected = Series([], dtype="float64") |
| result = expected.rolling(10, step=step).apply( |
| lambda x: x.mean(), engine=engine, raw=raw |
| ) |
| tm.assert_series_equal(result, expected) |
|
|
| |
| s = Series([None, None, None]) |
| result = s.rolling(2, min_periods=0, step=step).apply( |
| lambda x: len(x), engine=engine, raw=raw |
| ) |
| expected = Series([1.0, 2.0, 2.0])[::step] |
| tm.assert_series_equal(result, expected) |
|
|
| result = s.rolling(2, min_periods=0, step=step).apply(len, engine=engine, raw=raw) |
| tm.assert_series_equal(result, expected) |
|
|
|
|
| def test_all_apply(engine_and_raw): |
| engine, raw = engine_and_raw |
|
|
| df = ( |
| DataFrame( |
| {"A": date_range("20130101", periods=5, freq="s"), "B": range(5)} |
| ).set_index("A") |
| * 2 |
| ) |
| er = df.rolling(window=1) |
| r = df.rolling(window="1s") |
|
|
| result = r.apply(lambda x: 1, engine=engine, raw=raw) |
| expected = er.apply(lambda x: 1, engine=engine, raw=raw) |
| tm.assert_frame_equal(result, expected) |
|
|
|
|
| def test_ragged_apply(engine_and_raw): |
| engine, raw = engine_and_raw |
|
|
| df = DataFrame({"B": range(5)}) |
| df.index = [ |
| Timestamp("20130101 09:00:00"), |
| Timestamp("20130101 09:00:02"), |
| Timestamp("20130101 09:00:03"), |
| Timestamp("20130101 09:00:05"), |
| Timestamp("20130101 09:00:06"), |
| ] |
|
|
| f = lambda x: 1 |
| result = df.rolling(window="1s", min_periods=1).apply(f, engine=engine, raw=raw) |
| expected = df.copy() |
| expected["B"] = 1.0 |
| tm.assert_frame_equal(result, expected) |
|
|
| result = df.rolling(window="2s", min_periods=1).apply(f, engine=engine, raw=raw) |
| expected = df.copy() |
| expected["B"] = 1.0 |
| tm.assert_frame_equal(result, expected) |
|
|
| result = df.rolling(window="5s", min_periods=1).apply(f, engine=engine, raw=raw) |
| expected = df.copy() |
| expected["B"] = 1.0 |
| tm.assert_frame_equal(result, expected) |
|
|
|
|
def test_invalid_engine():
    """An unrecognised ``engine`` name must raise ValueError."""
    expected_msg = "engine must be either 'numba' or 'cython'"
    roller = Series(range(1)).rolling(1)
    with pytest.raises(ValueError, match=expected_msg):
        roller.apply(lambda x: x, engine="foo")
|
|
|
|
def test_invalid_engine_kwargs_cython():
    """Passing ``engine_kwargs`` together with the cython engine must raise."""
    expected_msg = "cython engine does not accept engine_kwargs"
    roller = Series(range(1)).rolling(1)
    numba_only_opts = {"nopython": False}
    with pytest.raises(ValueError, match=expected_msg):
        roller.apply(lambda x: x, engine="cython", engine_kwargs=numba_only_opts)
|
|
|
|
def test_invalid_raw_numba():
    """``raw=False`` combined with ``engine="numba"`` must raise ValueError."""
    expected_msg = "raw must be `True` when using the numba engine"
    roller = Series(range(1)).rolling(1)
    with pytest.raises(ValueError, match=expected_msg):
        roller.apply(lambda x: x, raw=False, engine="numba")
|
|
|
|
@pytest.mark.parametrize("args_kwargs", [[None, {"par": 10}], [(10,), None]])
def test_rolling_apply_args_kwargs(args_kwargs):
    """Extra ``args`` / ``kwargs`` are forwarded to the applied function.

    Checked both for a plain DataFrame rolling window and for a rolling
    window on a groupby selection.
    """
    extra_args, extra_kwargs = args_kwargs

    def add_then_sum(x, par):
        return np.sum(x + par)

    df = DataFrame({"gr": [1, 1], "a": [1, 2]})

    # Plain rolling: every cell is its own 1-wide window, so each value + 10.
    frame_expected = DataFrame(
        [[11.0, 11.0], [11.0, 12.0]], columns=Index(["gr", "a"])
    )
    frame_result = df.rolling(1).apply(
        add_then_sum, args=extra_args, kwargs=extra_kwargs
    )
    tm.assert_frame_equal(frame_result, frame_expected)

    # Groupby rolling: the expected result is indexed by (group key, row).
    grouped_index = MultiIndex.from_tuples([(1, 0), (1, 1)], names=["gr", None])
    grouped_expected = Series([11.0, 12.0], index=grouped_index, name="a")

    grouped_roller = df.groupby("gr")["a"].rolling(1)
    grouped_result = grouped_roller.apply(
        add_then_sum, args=extra_args, kwargs=extra_kwargs
    )
    tm.assert_series_equal(grouped_result, grouped_expected)
|
|
|
|
def test_nans(raw):
    """Check ``min_periods`` handling when the data has leading/trailing NaNs."""
    obj = Series(np.random.default_rng(2).standard_normal(50))
    obj[:10] = np.nan
    obj[-10:] = np.nan

    # Full-length window: the last value is the mean of the 30 valid points.
    whole = obj.rolling(50, min_periods=30).apply(f, raw=raw)
    tm.assert_almost_equal(whole.iloc[-1], np.mean(obj[10:-10]))

    # Window of 20 needing 15 observations: position 24 is the first whose
    # window holds 15 valid values, and position -6 is the last.
    partial = obj.rolling(20, min_periods=15).apply(f, raw=raw)
    assert isna(partial.iloc[23])
    assert notna(partial.iloc[24])

    assert notna(partial.iloc[-6])
    assert isna(partial.iloc[-5])

    # No missing data: the fifth position is the first with 5 observations.
    obj2 = Series(np.random.default_rng(2).standard_normal(20))
    clean = obj2.rolling(10, min_periods=5).apply(f, raw=raw)
    assert isna(clean.iloc[3])
    assert notna(clean.iloc[4])

    # min_periods=0 and min_periods=1 are expected to give the same result.
    with_zero = obj.rolling(20, min_periods=0).apply(f, raw=raw)
    with_one = obj.rolling(20, min_periods=1).apply(f, raw=raw)
    tm.assert_almost_equal(with_zero, with_one)
|
|
|
|
def test_center(raw):
    """``center=True`` must equal a trailing window on padded, shifted data."""
    obj = Series(np.random.default_rng(2).standard_normal(50))
    obj[:10] = np.nan
    obj[-10:] = np.nan

    centered = obj.rolling(20, min_periods=15, center=True).apply(f, raw=raw)

    # Emulate centering: append 9 NaNs, roll with a trailing window, then
    # drop the first 9 results and renumber from zero.
    padded = concat([obj, Series([np.nan] * 9)])
    trailing = padded.rolling(20, min_periods=15).apply(f, raw=raw)
    expected = trailing.iloc[9:].reset_index(drop=True)

    tm.assert_series_equal(centered, expected)
|
|
|
|
def test_series(raw, series):
    """Rolling ``apply`` on a Series returns a Series with the right last value."""
    window = 50
    rolled = series.rolling(window).apply(f, raw=raw)
    assert isinstance(rolled, Series)
    # The final entry must equal the mean of the last `window` observations.
    tail_mean = np.mean(series[-window:])
    tm.assert_almost_equal(rolled.iloc[-1], tail_mean)
|
|
|
|
def test_frame(raw, frame):
    """Rolling ``apply`` on a DataFrame returns a DataFrame with the right last row."""
    rolled = frame.rolling(50).apply(f, raw=raw)
    assert isinstance(rolled, DataFrame)

    # The last row must match the column-wise mean of the final 50 rows.
    last_row = rolled.iloc[-1, :]
    tail_means = frame.iloc[-50:, :].apply(np.mean, axis=0, raw=raw)
    tm.assert_series_equal(last_row, tail_means, check_names=False)
|
|
|
|
def test_time_rule_series(raw, series):
    """Rolling ``apply`` over a business-day resampled Series.

    Every other observation is resampled to "B" frequency; the last rolling
    value is compared with the mean of the un-resampled observations that fall
    inside the final window's date span.
    """
    win = 25
    minp = 10

    every_other = series[::2]
    resampled = every_other.resample("B").mean()
    rolled = resampled.rolling(window=win, min_periods=minp).apply(f, raw=raw)

    last_date = rolled.index[-1]
    # A window of `win` business days ending on last_date starts win - 1 earlier.
    prev_date = last_date - (win - 1) * offsets.BDay()

    in_window = every_other.truncate(prev_date, last_date)
    tm.assert_almost_equal(rolled.iloc[-1], np.mean(in_window))
|
|
|
|
def test_time_rule_frame(raw, frame):
    """Rolling ``apply`` over a business-day resampled DataFrame.

    Every other row is resampled to "B" frequency; the last rolling row is
    compared with the column means of the un-resampled rows that fall inside
    the final window's date span.
    """
    win = 25
    minp = 10

    every_other = frame[::2]
    resampled = every_other.resample("B").mean()
    rolled = resampled.rolling(window=win, min_periods=minp).apply(f, raw=raw)

    last_date = rolled.index[-1]
    # A window of `win` business days ending on last_date starts win - 1 earlier.
    prev_date = last_date - (win - 1) * offsets.BDay()

    in_window = every_other.truncate(prev_date, last_date)
    tm.assert_series_equal(
        rolled.xs(last_date),
        in_window.apply(np.mean, raw=raw),
        check_names=False,
    )
|
|
|
|
@pytest.mark.parametrize("minp", [0, 99, 100])
def test_min_periods(raw, series, minp, step):
    """A window one longer than the data must behave like a full-length window."""
    n_obs = len(series)

    def roll(window):
        # Same min_periods / step / function for both window sizes.
        return series.rolling(window, min_periods=minp, step=step).apply(f, raw=raw)

    result = roll(n_obs + 1)
    expected = roll(n_obs)

    # NaNs must appear in the same places...
    missing = isna(result)
    tm.assert_series_equal(missing, isna(expected))

    # ...and the remaining values must match.
    present = ~missing
    tm.assert_almost_equal(result[present], expected[present])
|
|
|
|
def test_center_reindex_series(raw, series):
    """``center=True`` on a Series equals a trailing window shifted back by 12.

    The expected value is built by extending the index with 12 dummy labels,
    applying a trailing window of 25, shifting by -12 and restoring the
    original index.
    """
    s = [f"x{x:d}" for x in range(12)]
    minp = 10

    extended = series.reindex(list(series.index) + s)
    trailing = extended.rolling(window=25, min_periods=minp).apply(f, raw=raw)
    series_xp = trailing.shift(-12).reindex(series.index)

    centered_roller = series.rolling(window=25, min_periods=minp, center=True)
    series_rs = centered_roller.apply(f, raw=raw)

    tm.assert_series_equal(series_xp, series_rs)
|
|
|
|
def test_center_reindex_frame(raw):
    """``center=True`` on a DataFrame equals a trailing window shifted back by 12.

    The expected value is built by extending the index with 12 dummy labels,
    applying a trailing window of 25, shifting by -12 and restoring the
    original index.
    """
    daily_index = date_range("2020-01-01", freq="D", periods=100)
    frame = DataFrame(range(100), index=daily_index)
    s = [f"x{x:d}" for x in range(12)]
    minp = 10

    extended = frame.reindex(list(frame.index) + s)
    trailing = extended.rolling(window=25, min_periods=minp).apply(f, raw=raw)
    frame_xp = trailing.shift(-12).reindex(frame.index)

    centered_roller = frame.rolling(window=25, min_periods=minp, center=True)
    frame_rs = centered_roller.apply(f, raw=raw)

    tm.assert_frame_equal(frame_xp, frame_rs)
|
|
|
|
| def test_axis1(raw): |
| |
| df = DataFrame([1, 2]) |
| msg = "Support for axis=1 in DataFrame.rolling is deprecated" |
| with tm.assert_produces_warning(FutureWarning, match=msg): |
| result = df.rolling(window=1, axis=1).apply(np.sum, raw=raw) |
| expected = DataFrame([1.0, 2.0]) |
| tm.assert_frame_equal(result, expected) |
|
|