| from __future__ import annotations |
|
|
| from datetime import datetime, date, timedelta |
|
|
| from dateutil.relativedelta import * |
| from pandas import DataFrame, Timestamp, to_datetime |
|
|
| from .utils import get_agency_metadata_values |
|
|
|
|
def _get_first_week_start(dates: list[date], week_start: int | str | "weekday" = MO):
    """Return the start of the week that contains the earliest date in `dates`.

    Args:
        dates (list[date]): Dates (or Timestamps) to inspect. They do not need
            to be sorted; `None` and NaT entries are ignored.
        week_start (int | str | weekday, optional): Day on which a week begins.
            Accepts a dateutil weekday (e.g., `MO`, `SU`), a weekday name
            (case-insensitive, e.g., "sunday"), or an integer from 0 (Monday)
            to 6 (Sunday). Unrecognized names and out-of-range integers fall
            back to Monday. Defaults to `MO`.

    Raises:
        TypeError: `week_start` is not a `str`, `int`, or dateutil weekday.
        ValueError: `dates` contains no usable date.

    Returns:
        The earliest date moved back to the most recent `week_start` day
        (unchanged if it already falls on that day). The result has the same
        type as the earliest element of `dates`.
    """
    weekday_constants = (MO, TU, WE, TH, FR, SA, SU)
    if isinstance(week_start, type(MO)):
        # Already a dateutil weekday (with or without an offset such as MO(+1)).
        # type(MO) is used so this does not depend on the weekday class being
        # importable by name in this module.
        pass
    elif isinstance(week_start, str):
        names = ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday")
        week_start = dict(zip(names, weekday_constants)).get(week_start.lower(), MO)
    elif isinstance(week_start, int):
        week_start = dict(enumerate(weekday_constants)).get(week_start, MO)
    else:
        raise TypeError("Parameter 'week_start' must be type `str`, `int`, or a dateutil weekday instance.")

    # `d == d` is False only for NaT-like values, which cannot be ordered.
    usable = [d for d in dates if d is not None and d == d]
    if not usable:
        raise ValueError("Parameter 'dates' must contain at least one valid date.")

    # Compare on the calendar day so lists mixing dates and datetimes/Timestamps
    # can still be ordered; the original object is kept for the return value.
    first_day = min(usable, key=lambda d: d.date() if isinstance(d, datetime) else d)

    # week_start(-1) means "the last such weekday on or before this date".
    return first_day + relativedelta(weekday=week_start(-1))
|
|
|
|
def _get_week_start_dates(first_week_start: date | Timestamp, end_date: date | None = None):
    """Get the index and start date for each week.

    Args:
        first_week_start (date | Timestamp): Start date of the first week in the data.
        end_date (date | None, optional): End date for data. If None is passed
            (the default), the end date is `date.today()`. A `datetime` or
            `Timestamp` is also accepted and reduced to its calendar date.

    Returns:
        list[tuple]: List of tuples containing the week number (starting at 0)
        and the week start date, for every week start on or before the end
        date. Empty if `first_week_start` is later than the end date.
    """

    def _as_date(value):
        # datetime and Timestamp expose .date(); plain date objects do not.
        try:
            return value.date()
        except AttributeError:
            return value

    # Normalize both bounds: comparing a date with a datetime raises TypeError.
    last_day = date.today() if end_date is None else _as_date(end_date)
    current = _as_date(first_week_start)

    one_week = timedelta(weeks=1)
    week_start_dates = []
    while current <= last_day:
        week_start_dates.append(current)
        current = current + one_week
    return list(enumerate(week_start_dates))
|
|
|
|
def _get_weeks(dates: list[date], end_date: date | None = None, **kwargs) -> list[tuple]:
    """Map each date to the (week_number, week_start_date) pair of its week.

    Args:
        dates (list[date]): Dates to classify. `Timestamp` and `datetime`
            values are reduced to their calendar date before matching.
        end_date (date | None, optional): Last date for which weeks are
            generated; forwarded to `_get_week_start_dates`. Defaults to None.
        **kwargs: Forwarded to `_get_first_week_start`; pass "week_start" to
            choose a different start day than Monday for the week.

    Returns:
        list[tuple]: One (week_number, week_start_date) pair per input date,
        in input order. A date that falls in none of the generated weeks is
        assigned the fallback pair (0, first week start).
    """
    first_week_start = _get_first_week_start(dates, **kwargs)
    weeks = _get_week_start_dates(first_week_start, end_date=end_date)

    # Keep the fallback consistent with the date objects stored in `weeks`.
    if isinstance(first_week_start, (Timestamp, datetime)):
        fallback = (0, first_week_start.date())
    else:
        fallback = (0, first_week_start)

    one_week = timedelta(weeks=1)
    results = []
    for d in dates:
        # Week starts are plain dates; a date cannot be ordered against a datetime.
        if isinstance(d, (Timestamp, datetime)):
            d = d.date()
        matches = (
            (idx, start_date)
            for idx, start_date in weeks
            if start_date <= d < start_date + one_week
        )
        results.append(next(matches, fallback))
    return results
|
|
|
|
def add_week_info_to_data(
    df: DataFrame,
    date_column: str = "publication_date",
    new_columns: tuple[str, str] = ("week_number", "week_of"),
    **kwargs,
):
    """Add week number and week start date to input data.

    Args:
        df (DataFrame): Input data. It is not modified; a copy is returned.
        date_column (str, optional): Name of column containing publication dates.
            Defaults to "publication_date".
        new_columns (tuple[str, str], optional): Names for the week-number and
            week-start columns, in that order. Defaults to ("week_number", "week_of").
        **kwargs: Forwarded to `_get_weeks` (e.g., "week_start", "end_date").

    Returns:
        DataFrame: Copy of the data with the week columns added. When the date
        column has no rows, the copy is returned without the new columns.
    """
    df_c = df.copy()
    data = df_c[date_column].to_list()
    if not data:
        return df_c

    week_numbers, week_starts = zip(*_get_weeks(data, **kwargs))
    number_col, start_col = new_columns[0], new_columns[1]
    df_c.loc[:, number_col] = week_numbers
    df_c.loc[:, start_col] = to_datetime(week_starts)
    return df_c
|
|
|
|
def _pad_missing_weeks(timeframe_list: list[date], **kwargs):
    """List every week from the first week in the data through the end date.

    Used to pad data with weeks missing from retrieved data (i.e., weeks
    without qualifying rule data).

    Args:
        timeframe_list (list[date]): Dates present in the data.
        **kwargs: "week_start" (optional) is forwarded to
            `_get_first_week_start` so padded weeks can use the same start day
            as the data; everything else (e.g., "end_date") is forwarded to
            `_get_week_start_dates`.

    Returns:
        list[tuple]: (week_number, week_start_date) pairs as returned by
        `_get_week_start_dates`.
    """
    # Split off the one option that belongs to the first-week calculation;
    # `_get_week_start_dates` does not accept it.
    first_week_kwargs = {}
    if "week_start" in kwargs:
        first_week_kwargs["week_start"] = kwargs.pop("week_start")

    first_week_start = _get_first_week_start(timeframe_list, **first_week_kwargs)
    return _get_week_start_dates(first_week_start, **kwargs)
|
|
|
|
def _pad_missing_days(timeframe_list: list[date], end_date: date | None = None):
    """List every weekday from the earliest date in the data through the end date.

    Used to pad data with days missing from retrieved data (i.e., days without
    qualifying rule data). Saturdays and Sundays are never included.

    Args:
        timeframe_list (list[date]): Dates present in the data; must not be empty.
        end_date (date | None, optional): Last day to include. If None is
            passed (the default), the end date is `date.today()`. A `datetime`
            or `Timestamp` is also accepted and reduced to its calendar date.

    Returns:
        list[date]: Monday-to-Friday dates from the earliest date through the
        end date, inclusive. Empty if the end date precedes the earliest date.
    """

    def _as_date(value):
        # Subtracting a date from a datetime raises TypeError, so work in dates.
        return value.date() if isinstance(value, datetime) else value

    start_date = _as_date(min(timeframe_list))
    last_date = date.today() if end_date is None else _as_date(end_date)

    span_days = (last_date - start_date).days
    every_day = (start_date + timedelta(days=n) for n in range(span_days + 1))
    # weekday(): Monday is 0 ... Friday is 4.
    return [day for day in every_day if day.weekday() < 5]
|
|
|
|
def pad_missing_dates(df: DataFrame, pad_column: str, how: str, fill_padded_values: dict | None = None, **kwargs):
    """Add missing dates (either weeks or days) to the dataset.

    Args:
        df (DataFrame): Input data. It is not modified; a copy is returned.
        pad_column (str): Date column to pad. In the result it holds
            `datetime.date` objects (object dtype).
        how (str): Whether to pad by "days" or "weeks". Padding by "weeks"
            merges on both `pad_column` and a "week_number" column.
        fill_padded_values (dict | None, optional): Dictionary of columns and values to fill
            for padded observations (e.g., {"column": 0}). Defaults to None.
        **kwargs: Forwarded to the day/week padding helper (e.g., "end_date").

    Raises:
        ValueError: Must pass 'days' or 'weeks' to parameter 'how'.

    Returns:
        DataFrame: Padded data.
    """
    # Validate first so a bad value is reported even when the data is empty.
    if how not in ("days", "weeks"):
        raise ValueError("Must pass 'days' or 'weeks' to parameter 'how'.")

    df_copy = df.copy()
    timeframe_list = [
        d.date() if isinstance(d, (Timestamp, datetime)) else d
        for d in df_copy[pad_column].to_list()
    ]
    # Store plain dates so the merge keys match the dates produced by the helpers.
    df_copy = df_copy.astype({pad_column: "object"})
    df_copy.loc[:, pad_column] = timeframe_list

    if timeframe_list:
        pad_cols = [pad_column]
        if how == "days":
            padded_timeframes = _pad_missing_days(timeframe_list, **kwargs)
            df_merge = DataFrame({pad_column: padded_timeframes})
        else:
            week_numbers, padded_timeframes = zip(*_pad_missing_weeks(timeframe_list, **kwargs))
            df_merge = DataFrame({pad_column: padded_timeframes})
            df_merge.loc[:, "week_number"] = week_numbers
            pad_cols.append("week_number")

        # indicator=True adds "_merge", marking rows that exist only in the padding.
        df_copy = df_copy.merge(df_merge, on=pad_cols, how="outer", indicator=True)

        if fill_padded_values is not None:
            padded_rows = df_copy["_merge"] == "right_only"
            for col, val in fill_padded_values.items():
                df_copy.loc[padded_rows, col] = val

    return df_copy.drop(columns=["_merge"], errors="ignore")
|
|
|
|
def groupby_agency(
    df: DataFrame,
    group_col: str = "parent_slug",
    value_col: str = "document_number",
    aggfunc: str = "count",
    significant: bool = True,
    metadata: dict | None = None,
    metadata_value: str = "acronym",
):
    """Group data by agencies and aggregate the values.

    Rows whose `group_col` holds a list of agencies are exploded first, so a
    document is counted once for each agency it lists.

    Args:
        df (DataFrame): Input data.
        group_col (str, optional): Column to group by. Defaults to "parent_slug".
        value_col (str, optional): Column for values for grouping and aggregation.
            Defaults to "document_number".
        aggfunc (str, optional): Aggregation function. Defaults to "count".
        significant (bool, optional): Whether to sum the "3f1_significant" and
            "other_significant" columns. Defaults to True.
        metadata (dict | None, optional): Agency metadata. Defaults to None.
        metadata_value (str, optional): Metadata value to add to output data.
            Defaults to "acronym".

    Returns:
        DataFrame: Grouped and aggregated data, sorted by rule count in
        descending order, with columns "agency", `metadata_value` (when
        metadata is given), "rules", the significance columns (when
        requested), and "cra_targeted" (when the input has "CRA_Target").
    """
    aggfunc_dict = {value_col: aggfunc}
    if significant:
        aggfunc_dict.update({
            "3f1_significant": "sum",
            "other_significant": "sum",
        })
    # Only aggregate CRA_Target when the data carries it; asking .agg() for a
    # missing column raises KeyError.
    if "CRA_Target" in df.columns:
        aggfunc_dict["CRA_Target"] = "sum"

    df_ex = df.explode(group_col, ignore_index=True)
    grouped = df_ex.groupby(by=group_col).agg(aggfunc_dict).reset_index()
    grouped = grouped.sort_values(value_col, ascending=False)
    grouped = grouped.rename(
        columns={
            group_col: "agency",
            value_col: "rules",
            "CRA_Target": "cra_targeted",
        },
        errors="ignore",
    )

    if metadata is not None:
        grouped.loc[:, metadata_value] = get_agency_metadata_values(
            grouped,
            agency_column="agency",
            metadata=metadata,
            metadata_value=metadata_value,
        )

    # Fix the output column order, skipping any column that was not produced.
    cols = ["agency", metadata_value, "rules", "3f1_significant", "other_significant", "cra_targeted"]
    grouped = grouped.loc[:, [c for c in cols if c in grouped.columns]]
    return grouped
|
|
|
|
def groupby_date(
    df: DataFrame,
    group_col: str | tuple | list = ("publication_year", "publication_month", ),
    value_col: str = "document_number",
    aggfunc: str = "count",
    significant: bool = True
):
    """Group data by a given date frequency and aggregate the values.

    Args:
        df (DataFrame): Input data.
        group_col (str | tuple | list, optional): Columns to group by.
            Defaults to ("publication_year", "publication_month", ).
        value_col (str, optional): Column for values for grouping and aggregation.
            Defaults to "document_number".
        aggfunc (str, optional): Aggregation function. Defaults to "count".
        significant (bool, optional): Whether to sum the "3f1_significant" and
            "other_significant" columns. Defaults to True.

    Raises:
        TypeError: Parameter 'group_col' must be type `str`, `list`, or `tuple`.

    Returns:
        DataFrame: Grouped and aggregated data. `value_col` is renamed to
        "rules"; "CRA_Target", when present in the input, is summed and
        renamed to "cra_targeted".
    """
    if isinstance(group_col, str):
        group_col = [group_col]
    elif isinstance(group_col, (list, tuple)):
        group_col = list(group_col)
    else:
        raise TypeError("Parameter 'group_col' must be type `str`, `list`, or `tuple`.")

    aggfunc_dict = {value_col: aggfunc}
    if significant:
        aggfunc_dict.update({
            "3f1_significant": "sum",
            "other_significant": "sum",
        })
    # Only aggregate CRA_Target when the data carries it; asking .agg() for a
    # missing column raises KeyError.
    if "CRA_Target" in df.columns:
        aggfunc_dict["CRA_Target"] = "sum"

    grouped = df.groupby(by=group_col).agg(aggfunc_dict).reset_index()
    return grouped.rename(
        columns={
            value_col: "rules",
            "CRA_Target": "cra_targeted",
        },
        errors="ignore",
    )
|
|
|
|
if __name__ == "__main__":

    # Demo run against synthetic data; relies on the module-level imports of
    # date, timedelta and DataFrame.
    TODAY = date.today()
    WEEKS_AGO = TODAY - timedelta(weeks=10)

    # Ascending dates: an older 21-day stretch with every third day removed,
    # followed by the 21 days ending today.
    older_days = [WEEKS_AGO - timedelta(days=r) for r in reversed(range(21)) if r % 3 != 0]
    recent_days = [TODAY - timedelta(days=r) for r in reversed(range(21))]
    dates = older_days + recent_days
    df = DataFrame({"dates": dates, "values": list(range(len(dates)))})

    # Pad the daily series, filling the value column with 0 on padded rows.
    df_a = pad_missing_dates(df, "dates", "days", fill_padded_values={"values": 0})
    print(df_a.head(10))

    # Attach week number / week start to every row.
    df = add_week_info_to_data(df, date_column="dates")
    print(df.head(10))

    # Aggregate per week, then pad the weeks that have no rows.
    grouped = groupby_date(
        df,
        group_col=("week_number", "week_of"),
        value_col="values",
        significant=False,
    )
    print(grouped)

    df_b = pad_missing_dates(grouped, "week_of", how="weeks", fill_padded_values={"rules": 0})
    print(df_b)
|
|