| from __future__ import annotations |
|
|
| from typing import TYPE_CHECKING |
|
|
| import numpy as np |
|
|
| from pandas._libs import lib |
| from pandas._libs.algos import unique_deltas |
| from pandas._libs.tslibs import ( |
| Timestamp, |
| get_unit_from_dtype, |
| periods_per_day, |
| tz_convert_from_utc, |
| ) |
| from pandas._libs.tslibs.ccalendar import ( |
| DAYS, |
| MONTH_ALIASES, |
| MONTH_NUMBERS, |
| MONTHS, |
| int_to_weekday, |
| ) |
| from pandas._libs.tslibs.dtypes import ( |
| OFFSET_TO_PERIOD_FREQSTR, |
| freq_to_period_freqstr, |
| ) |
| from pandas._libs.tslibs.fields import ( |
| build_field_sarray, |
| month_position_check, |
| ) |
| from pandas._libs.tslibs.offsets import ( |
| DateOffset, |
| Day, |
| to_offset, |
| ) |
| from pandas._libs.tslibs.parsing import get_rule_month |
| from pandas.util._decorators import cache_readonly |
|
|
| from pandas.core.dtypes.common import is_numeric_dtype |
| from pandas.core.dtypes.dtypes import ( |
| DatetimeTZDtype, |
| PeriodDtype, |
| ) |
| from pandas.core.dtypes.generic import ( |
| ABCIndex, |
| ABCSeries, |
| ) |
|
|
| from pandas.core.algorithms import unique |
|
|
| if TYPE_CHECKING: |
| from pandas._typing import npt |
|
|
| from pandas import ( |
| DatetimeIndex, |
| Series, |
| TimedeltaIndex, |
| ) |
| from pandas.core.arrays.datetimelike import DatetimeLikeArrayMixin |
| |
| |
|
|
# Offset prefixes for which month-suffixed keys ("<prefix>-<MONTH>") are
# registered in OFFSET_TO_PERIOD_FREQSTR below.
_need_suffix = ["QS", "BQE", "BQS", "YS", "BYE", "BYS"]


# Every "<prefix>-<MONTH>" key maps to the same period alias as the bare
# prefix already present in the table.
for _prefix in _need_suffix:
    for _m in MONTHS:
        key = f"{_prefix}-{_m}"
        OFFSET_TO_PERIOD_FREQSTR[key] = OFFSET_TO_PERIOD_FREQSTR[_prefix]


# Month-anchored "Y-<MONTH>" and "Q-<MONTH>" aliases map to themselves.
for _prefix in ["Y", "Q"]:
    for _m in MONTHS:
        _alias = f"{_prefix}-{_m}"
        OFFSET_TO_PERIOD_FREQSTR[_alias] = _alias


# Day-anchored "W-<DAY>" aliases map to themselves.
for _d in DAYS:
    OFFSET_TO_PERIOD_FREQSTR[f"W-{_d}"] = f"W-{_d}"
|
|
|
|
| def get_period_alias(offset_str: str) -> str | None: |
| """ |
| Alias to closest period strings BQ->Q etc. |
| """ |
| return OFFSET_TO_PERIOD_FREQSTR.get(offset_str, None) |
|
|
|
|
| |
| |
|
|
|
|
| def infer_freq( |
| index: DatetimeIndex | TimedeltaIndex | Series | DatetimeLikeArrayMixin, |
| ) -> str | None: |
| """ |
| Infer the most likely frequency given the input index. |
| |
| Parameters |
| ---------- |
| index : DatetimeIndex, TimedeltaIndex, Series or array-like |
| If passed a Series will use the values of the series (NOT THE INDEX). |
| |
| Returns |
| ------- |
| str or None |
| None if no discernible frequency. |
| |
| Raises |
| ------ |
| TypeError |
| If the index is not datetime-like. |
| ValueError |
| If there are fewer than three values. |
| |
| Examples |
| -------- |
| >>> idx = pd.date_range(start='2020/12/01', end='2020/12/30', periods=30) |
| >>> pd.infer_freq(idx) |
| 'D' |
| """ |
| from pandas.core.api import DatetimeIndex |
|
|
| if isinstance(index, ABCSeries): |
| values = index._values |
| if not ( |
| lib.is_np_dtype(values.dtype, "mM") |
| or isinstance(values.dtype, DatetimeTZDtype) |
| or values.dtype == object |
| ): |
| raise TypeError( |
| "cannot infer freq from a non-convertible dtype " |
| f"on a Series of {index.dtype}" |
| ) |
| index = values |
|
|
| inferer: _FrequencyInferer |
|
|
| if not hasattr(index, "dtype"): |
| pass |
| elif isinstance(index.dtype, PeriodDtype): |
| raise TypeError( |
| "PeriodIndex given. Check the `freq` attribute " |
| "instead of using infer_freq." |
| ) |
| elif lib.is_np_dtype(index.dtype, "m"): |
| |
| inferer = _TimedeltaFrequencyInferer(index) |
| return inferer.get_freq() |
|
|
| elif is_numeric_dtype(index.dtype): |
| raise TypeError( |
| f"cannot infer freq from a non-convertible index of dtype {index.dtype}" |
| ) |
|
|
| if not isinstance(index, DatetimeIndex): |
| index = DatetimeIndex(index) |
|
|
| inferer = _FrequencyInferer(index) |
| return inferer.get_freq() |
|
|
|
|
class _FrequencyInferer:
    """
    Infer a frequency alias from the spacing between datetime-like values.

    The unique deltas of the index's integer values are matched, in order,
    against annual, quarterly, monthly, weekly/daily, business-day,
    week-of-month, business-hour and sub-daily rules.
    """

    def __init__(self, index) -> None:
        self.index = index
        self.i8values = index.asi8

        # The resolution is read from the dtype of the underlying ndarray,
        # which is reached differently for an Index than for an array.
        if isinstance(index, ABCIndex):
            backing = index._data._ndarray
        else:
            backing = index._ndarray
        self._creso = get_unit_from_dtype(backing.dtype)

        # For tz-aware input, convert the i8 values from UTC into the
        # index's timezone.
        tz = getattr(index, "tz", None)
        if tz is not None:
            self.i8values = tz_convert_from_utc(self.i8values, tz, reso=self._creso)

        if len(index) < 3:
            raise ValueError("Need at least 3 dates to infer frequency")

        self.is_monotonic = (
            self.index._is_monotonic_increasing or self.index._is_monotonic_decreasing
        )

    @cache_readonly
    def deltas(self) -> npt.NDArray[np.int64]:
        # Unique deltas of the (possibly tz-converted) values.
        return unique_deltas(self.i8values)

    @cache_readonly
    def deltas_asi8(self) -> npt.NDArray[np.int64]:
        # Unique deltas of the raw ``index.asi8`` values, i.e. without the
        # timezone conversion that __init__ may have applied to ``i8values``.
        raw_values = self.index.asi8
        return unique_deltas(raw_values)

    @cache_readonly
    def is_unique(self) -> bool:
        # True when the converted values are evenly spaced.
        return len(self.deltas) == 1

    @cache_readonly
    def is_unique_asi8(self) -> bool:
        # True when the raw asi8 values are evenly spaced.
        return len(self.deltas_asi8) == 1

    def get_freq(self) -> str | None:
        """
        Return the frequency string that describes ``self.i8values``.

        Returns
        -------
        str or None
            None when the index is not monotonic, not unique, or no rule
            matches.
        """
        if not (self.is_monotonic and self.index._is_unique):
            return None

        first_delta = self.deltas[0]
        ppd = periods_per_day(self._creso)
        # A non-zero delta that is a whole number of days is handled by the
        # day-based rules.
        if first_delta and _is_multiple(first_delta, ppd):
            return self._infer_daily_rule()

        # Hour gaps of 1 combined with 17 and/or 65 are reported as
        # business hours.
        if self.hour_deltas in ([1, 17], [1, 65], [1, 17, 65]):
            return "bh"

        # Sub-daily rules are evaluated on the raw asi8 deltas rather than
        # the tz-converted ones.
        if not self.is_unique_asi8:
            return None

        first_delta = self.deltas_asi8[0]
        pph = ppd // 24
        ppm = pph // 60
        pps = ppm // 60
        # Tried from coarsest to finest; the first unit that divides the
        # delta evenly wins, so finer entries are never reached (or used)
        # when the resolution is too coarse for them.
        unit_sizes = (
            ("h", pph),
            ("min", ppm),
            ("s", pps),
            ("ms", pps // 1000),
            ("us", pps // 1_000_000),
        )
        for unit_alias, per_unit in unit_sizes:
            if _is_multiple(first_delta, per_unit):
                return _maybe_add_count(unit_alias, first_delta / per_unit)
        return _maybe_add_count("ns", first_delta)

    @cache_readonly
    def day_deltas(self) -> list[int]:
        # Unique deltas expressed in days.
        per_day = periods_per_day(self._creso)
        return [delta / per_day for delta in self.deltas]

    @cache_readonly
    def hour_deltas(self) -> list[int]:
        # Unique deltas expressed in hours.
        per_hour = periods_per_day(self._creso) // 24
        return [delta / per_hour for delta in self.deltas]

    @cache_readonly
    def fields(self) -> np.ndarray:
        # Structured array of date fields; "Y" and "M" are read from it below.
        return build_field_sarray(self.i8values, reso=self._creso)

    @cache_readonly
    def rep_stamp(self) -> Timestamp:
        # Timestamp built from the first value, used as the representative
        # date for month and weekday anchors.
        first_value = self.i8values[0]
        return Timestamp(first_value, unit=self.index.unit)

    def month_position_check(self) -> str | None:
        # Delegates to the module-level ``month_position_check`` function;
        # callers map its "cs"/"bs"/"ce"/"be" results to rule aliases.
        return month_position_check(self.fields, self.index.dayofweek)

    @cache_readonly
    def mdiffs(self) -> npt.NDArray[np.int64]:
        # Unique deltas measured in months.
        month_ordinals = self.fields["Y"] * 12 + self.fields["M"]
        return unique_deltas(month_ordinals.astype("i8"))

    @cache_readonly
    def ydiffs(self) -> npt.NDArray[np.int64]:
        # Unique deltas measured in years.
        years = self.fields["Y"].astype("i8")
        return unique_deltas(years)

    def _infer_daily_rule(self) -> str | None:
        """
        Try the day-based rules in order and return the first match.

        Order: annual, quarterly, monthly, evenly spaced days/weeks,
        business daily, week-of-month.
        """
        annual_rule = self._get_annual_rule()
        if annual_rule:
            nyears = self.ydiffs[0]
            anchor = MONTH_ALIASES[self.rep_stamp.month]
            return _maybe_add_count(f"{annual_rule}-{anchor}", nyears)

        quarterly_rule = self._get_quarterly_rule()
        if quarterly_rule:
            nquarters = self.mdiffs[0] / 3
            # Maps (month % 3) of the representative stamp to the anchor
            # month number used in the alias.
            anchor_by_remainder = {0: 12, 2: 11, 1: 10}
            anchor = MONTH_ALIASES[anchor_by_remainder[self.rep_stamp.month % 3]]
            return _maybe_add_count(f"{quarterly_rule}-{anchor}", nquarters)

        monthly_rule = self._get_monthly_rule()
        if monthly_rule:
            return _maybe_add_count(monthly_rule, self.mdiffs[0])

        if self.is_unique:
            return self._get_daily_rule()

        if self._is_business_daily():
            return "B"

        wom_rule = self._get_wom_rule()
        return wom_rule if wom_rule else None

    def _get_daily_rule(self) -> str | None:
        # Evenly spaced whole days: multiples of 7 become a weekday-anchored
        # weekly alias, anything else a daily alias.
        per_day = periods_per_day(self._creso)
        days = self.deltas[0] / per_day
        if days % 7 != 0:
            return _maybe_add_count("D", days)

        wd = int_to_weekday[self.rep_stamp.weekday()]
        return _maybe_add_count(f"W-{wd}", days / 7)

    def _get_annual_rule(self) -> str | None:
        # Requires a single year delta and a single distinct month.
        if len(self.ydiffs) > 1:
            return None

        if len(unique(self.fields["M"])) > 1:
            return None

        pos_check = self.month_position_check()
        if pos_check is None:
            return None
        annual_aliases = {"cs": "YS", "bs": "BYS", "ce": "YE", "be": "BYE"}
        return annual_aliases.get(pos_check)

    def _get_quarterly_rule(self) -> str | None:
        # Requires a single month delta that is a multiple of three.
        if len(self.mdiffs) > 1:
            return None

        if self.mdiffs[0] % 3 != 0:
            return None

        pos_check = self.month_position_check()
        if pos_check is None:
            return None
        quarterly_aliases = {"cs": "QS", "bs": "BQS", "ce": "QE", "be": "BQE"}
        return quarterly_aliases.get(pos_check)

    def _get_monthly_rule(self) -> str | None:
        # Requires a single month delta.
        if len(self.mdiffs) > 1:
            return None

        pos_check = self.month_position_check()
        if pos_check is None:
            return None
        monthly_aliases = {"cs": "MS", "bs": "BMS", "ce": "ME", "be": "BME"}
        return monthly_aliases.get(pos_check)

    def _is_business_daily(self) -> bool:
        # Only gaps of exactly one and three days can be business-daily.
        if self.day_deltas != [1, 3]:
            return False

        # Reconstruct the weekday reached after each gap, starting from the
        # first element's weekday.
        start_weekday = self.index[0].weekday()
        per_day = periods_per_day(self._creso)
        day_shifts = np.floor_divide(np.diff(self.i8values), per_day)
        landed_on = np.mod(start_weekday + np.cumsum(day_shifts), 7)

        # A 3-day gap must land on weekday 0; a 1-day gap on weekdays 1-4.
        after_weekend = (landed_on == 0) & (day_shifts == 3)
        within_week = (landed_on > 0) & (landed_on <= 4) & (day_shifts == 1)
        return bool(np.all(after_weekend | within_week))

    def _get_wom_rule(self) -> str | None:
        # Week-of-month requires every date to share one weekday.
        weekdays = unique(self.index.weekday)
        if len(weekdays) > 1:
            return None

        # Zero-based week of the month; values of 4 and above are dropped
        # before requiring exactly one remaining week.
        week_of_months = unique((self.index.day - 1) // 7)
        week_of_months = week_of_months[week_of_months < 4]
        if len(week_of_months) != 1:
            return None

        # The alias uses a one-based week number.
        week = week_of_months[0] + 1
        wd = int_to_weekday[weekdays[0]]

        return f"WOM-{week}{wd}"
|
|
|
|
class _TimedeltaFrequencyInferer(_FrequencyInferer):
    """
    Frequency inferer for timedelta data.

    Overrides the day-based inference so that only evenly spaced deltas are
    considered; none of the calendar rules of the parent class are tried.
    """

    def _infer_daily_rule(self):
        # Unevenly spaced deltas have no inferable frequency.
        if not self.is_unique:
            return None
        return self._get_daily_rule()
|
|
|
|
def _is_multiple(us, mult: int) -> bool:
    """Return whether ``us`` is evenly divisible by ``mult``."""
    remainder = us % mult
    return remainder == 0
|
|
|
|
def _maybe_add_count(base: str, count: float) -> str:
    """
    Prefix ``base`` with ``count`` unless the count is exactly one.

    Parameters
    ----------
    base : str
        Frequency alias.
    count : float
        Multiplier; must hold an integral value when it is not 1.

    Returns
    -------
    str
        ``base`` when ``count == 1``, otherwise ``f"{int(count)}{base}"``.
    """
    if count == 1:
        return base

    # Internal invariant: callers only pass whole-number multiples.
    assert count == int(count)
    return f"{int(count)}{base}"
|
|
|
|
| |
| |
|
|
|
|
| def is_subperiod(source, target) -> bool: |
| """ |
| Returns True if downsampling is possible between source and target |
| frequencies |
| |
| Parameters |
| ---------- |
| source : str or DateOffset |
| Frequency converting from |
| target : str or DateOffset |
| Frequency converting to |
| |
| Returns |
| ------- |
| bool |
| """ |
| if target is None or source is None: |
| return False |
| source = _maybe_coerce_freq(source) |
| target = _maybe_coerce_freq(target) |
|
|
| if _is_annual(target): |
| if _is_quarterly(source): |
| return _quarter_months_conform( |
| get_rule_month(source), get_rule_month(target) |
| ) |
| return source in {"D", "C", "B", "M", "h", "min", "s", "ms", "us", "ns"} |
| elif _is_quarterly(target): |
| return source in {"D", "C", "B", "M", "h", "min", "s", "ms", "us", "ns"} |
| elif _is_monthly(target): |
| return source in {"D", "C", "B", "h", "min", "s", "ms", "us", "ns"} |
| elif _is_weekly(target): |
| return source in {target, "D", "C", "B", "h", "min", "s", "ms", "us", "ns"} |
| elif target == "B": |
| return source in {"B", "h", "min", "s", "ms", "us", "ns"} |
| elif target == "C": |
| return source in {"C", "h", "min", "s", "ms", "us", "ns"} |
| elif target == "D": |
| return source in {"D", "h", "min", "s", "ms", "us", "ns"} |
| elif target == "h": |
| return source in {"h", "min", "s", "ms", "us", "ns"} |
| elif target == "min": |
| return source in {"min", "s", "ms", "us", "ns"} |
| elif target == "s": |
| return source in {"s", "ms", "us", "ns"} |
| elif target == "ms": |
| return source in {"ms", "us", "ns"} |
| elif target == "us": |
| return source in {"us", "ns"} |
| elif target == "ns": |
| return source in {"ns"} |
| else: |
| return False |
|
|
|
|
| def is_superperiod(source, target) -> bool: |
| """ |
| Returns True if upsampling is possible between source and target |
| frequencies |
| |
| Parameters |
| ---------- |
| source : str or DateOffset |
| Frequency converting from |
| target : str or DateOffset |
| Frequency converting to |
| |
| Returns |
| ------- |
| bool |
| """ |
| if target is None or source is None: |
| return False |
| source = _maybe_coerce_freq(source) |
| target = _maybe_coerce_freq(target) |
|
|
| if _is_annual(source): |
| if _is_annual(target): |
| return get_rule_month(source) == get_rule_month(target) |
|
|
| if _is_quarterly(target): |
| smonth = get_rule_month(source) |
| tmonth = get_rule_month(target) |
| return _quarter_months_conform(smonth, tmonth) |
| return target in {"D", "C", "B", "M", "h", "min", "s", "ms", "us", "ns"} |
| elif _is_quarterly(source): |
| return target in {"D", "C", "B", "M", "h", "min", "s", "ms", "us", "ns"} |
| elif _is_monthly(source): |
| return target in {"D", "C", "B", "h", "min", "s", "ms", "us", "ns"} |
| elif _is_weekly(source): |
| return target in {source, "D", "C", "B", "h", "min", "s", "ms", "us", "ns"} |
| elif source == "B": |
| return target in {"D", "C", "B", "h", "min", "s", "ms", "us", "ns"} |
| elif source == "C": |
| return target in {"D", "C", "B", "h", "min", "s", "ms", "us", "ns"} |
| elif source == "D": |
| return target in {"D", "C", "B", "h", "min", "s", "ms", "us", "ns"} |
| elif source == "h": |
| return target in {"h", "min", "s", "ms", "us", "ns"} |
| elif source == "min": |
| return target in {"min", "s", "ms", "us", "ns"} |
| elif source == "s": |
| return target in {"s", "ms", "us", "ns"} |
| elif source == "ms": |
| return target in {"ms", "us", "ns"} |
| elif source == "us": |
| return target in {"us", "ns"} |
| elif source == "ns": |
| return target in {"ns"} |
| else: |
| return False |
|
|
|
|
def _maybe_coerce_freq(code) -> str:
    """
    Normalise a frequency code to the rule-code form used for comparisons.

    A DateOffset is first converted to its period frequency string. The
    sub-daily codes "h", "min", "s", "ms", "us" and "ns" are returned
    unchanged; every other code is upper-cased.

    Parameters
    ----------
    code : str or DateOffset
        Frequency to normalise; must not be None.

    Returns
    -------
    str
    """
    assert code is not None
    if isinstance(code, DateOffset):
        code = freq_to_period_freqstr(1, code.name)

    case_sensitive_codes = {"h", "min", "s", "ms", "us", "ns"}
    return code if code in case_sensitive_codes else code.upper()
|
|
|
|
def _quarter_months_conform(source: str, target: str) -> bool:
    """
    Return whether two month aliases have the same month number modulo 3.

    Both aliases are looked up in ``MONTH_NUMBERS``; an unknown alias raises
    KeyError.
    """
    source_phase = MONTH_NUMBERS[source] % 3
    target_phase = MONTH_NUMBERS[target] % 3
    return source_phase == target_phase
|
|
|
|
def _is_annual(rule: str) -> bool:
    """Return whether ``rule`` is "Y" or a "Y-" anchored code (case-insensitive)."""
    # The text before the first "-" is "Y" exactly when the upper-cased rule
    # equals "Y" or starts with "Y-".
    head = rule.upper().partition("-")[0]
    return head == "Y"
|
|
|
|
def _is_quarterly(rule: str) -> bool:
    """
    Return whether ``rule`` is "Q", a "Q-" anchored code, or starts with
    "BQ" (case-insensitive).
    """
    normalized = rule.upper()
    if normalized.startswith("BQ"):
        return True
    # The text before the first "-" is "Q" exactly when the rule equals "Q"
    # or starts with "Q-".
    return normalized.partition("-")[0] == "Q"
|
|
|
|
def _is_monthly(rule: str) -> bool:
    """Return whether ``rule`` is "M" or "BM" (case-insensitive)."""
    normalized = rule.upper()
    return normalized == "M" or normalized == "BM"
|
|
|
|
def _is_weekly(rule: str) -> bool:
    """Return whether ``rule`` is "W" or a "W-" anchored code (case-insensitive)."""
    # The text before the first "-" is "W" exactly when the upper-cased rule
    # equals "W" or starts with "W-".
    head = rule.upper().partition("-")[0]
    return head == "W"
|
|
|
|
# Public names exported by this module: the four functions defined here plus
# ``Day`` and ``to_offset``, which are re-exported from the offsets import.
__all__ = [
    "Day",
    "get_period_alias",
    "infer_freq",
    "is_subperiod",
    "is_superperiod",
    "to_offset",
]
|
|