| from datetime import date |
|
|
| from fr_toolbelt.api_requests import get_documents_by_date |
| from fr_toolbelt.preprocessing import process_documents, AgencyMetadata |
| from numpy import array |
| from pandas import DataFrame, to_datetime |
|
|
| try: |
| from search_columns import search_columns, SearchError |
| from significant import get_significant_info |
| from utils import get_agency_metadata_values |
| except (ModuleNotFoundError, ImportError): |
| from .search_columns import search_columns, SearchError |
| from .significant import get_significant_info |
| from .utils import get_agency_metadata_values |
|
|
|
|
# Agency metadata loaded once at import; the second value returned by
# get_agency_metadata() is discarded.
METADATA, _ = AgencyMetadata().get_agency_metadata()

# ISO-format start date used for the module-level dataset built below.
START_DATE = "2024-01-01"
# NOTE(review): not referenced elsewhere in the visible part of this file.
WINDOW_OPEN_DATE = "2024-08-16"

# Significant-rule data is only requested when START_DATE is on or after
# April 6, 2023 (the cutoff enforced by get_significant_rules).
GET_SIGNIFICANT = date.fromisoformat(START_DATE) >= date(2023, 4, 6)
|
|
|
|
class DataAvailabilityError(Exception):
    """Signal that no data is available for the inputs that were requested."""
|
|
|
|
def get_date_range(start_date: str, end_mmdd: str = "01-20"):
    """Build the date range of documents returned by the app.

    The range ends in the calendar year following the start date's year,
    on the month and day given by `end_mmdd`.

    Args:
        start_date (str): ISO-format date on which the range begins.
        end_mmdd (str, optional): Month and day of the end date, formatted MM-DD. Defaults to "01-20".

    Returns:
        dict: Keys "start" (the input start date), "end" (end date string),
            and "transition_year" (the year after the start date's year).
    """
    transition_year = date.fromisoformat(start_date).year + 1
    return {
        "start": start_date,
        "end": f"{transition_year}-{end_mmdd}",
        "transition_year": transition_year,
    }
|
|
|
|
def get_rules(date_range: dict) -> list[dict]:
    """Retrieve documents of type "RULE" published within a date range.

    Args:
        date_range (dict): Mapping whose "start" and "end" values bound the query.

    Returns:
        list[dict]: First element of the pair returned by `get_documents_by_date`.
    """
    query = {
        "start_date": date_range.get("start"),
        "end_date": date_range.get("end"),
        "document_types": ("RULE", ),
    }
    # The second element returned by the API helper is not needed here.
    documents, _ = get_documents_by_date(**query)
    return documents
|
|
|
|
def format_documents(documents: list[dict]):
    """Format Federal Register documents and add publication date columns.

    The documents are first passed through `process_documents` (requesting
    the "agencies" and "presidents" processing) and loaded into a DataFrame.
    The "publication_date" column is then parsed once and used to derive the
    date-part columns with pandas' vectorized `.dt` accessor instead of a
    Python-level call per row.

    Args:
        documents (list[dict]): List of documents; each is expected to carry
            a "publication_date" value parseable by `pandas.to_datetime`.

    Returns:
        DataFrame: Processed documents with these columns added or replaced:
            "publication_dt" (parsed timestamps), "publication_date"
            (`datetime.date` objects), and integer "publication_year",
            "publication_month", "publication_day".
    """
    documents = process_documents(
        documents,
        which=("agencies", "presidents"),
        return_values_as_str=False
    )
    df = DataFrame(documents)

    # Parse once; every derived column below reads from this Series.
    publication_dt = to_datetime(df["publication_date"])
    df["publication_dt"] = publication_dt
    # Replace the raw values with datetime.date objects (column keeps its position).
    df["publication_date"] = publication_dt.dt.date
    df["publication_year"] = publication_dt.dt.year
    df["publication_month"] = publication_dt.dt.month
    df["publication_day"] = publication_dt.dt.day

    return df
|
|
|
|
def filter_new_admin_rules(
        df: DataFrame,
        transition_year: int,
        date_col: str = "publication_date",
    ):
    """Remove rules issued by the new administration.

    A row is dropped only when both conditions hold: its date is on or after
    January 20 of the transition year, and its "president_id" matches the
    president mapped to that year. Years absent from the mapping match no
    president, so nothing is removed for them.

    Args:
        df (DataFrame): Input data; must contain `date_col` and "president_id".
        transition_year (int): The year of the presidential transition.
        date_col (str, optional): Column containing date information. Defaults to "publication_date".

    Returns:
        DataFrame: Filtered data, as an independent copy so that callers can
            add or modify columns without affecting `df` or triggering
            pandas' SettingWithCopyWarning.
    """
    admin_transitions = {
        2001: "george-w-bush",
        2009: "barack-obama",
        2017: "donald-trump",
        2021: "joe-biden",
        2025: "donald-trump",
    }
    incoming_president = admin_transitions.get(transition_year)

    on_or_after_jan_20 = array(df[date_col] >= date(transition_year, 1, 20))
    by_incoming_president = array(df["president_id"] == incoming_president)
    issued_by_new_admin = on_or_after_jan_20 & by_incoming_president

    # Copy explicitly: a boolean-mask selection is otherwise tracked by pandas
    # as a slice of `df`, and later column assignments on it emit warnings.
    return df.loc[~issued_by_new_admin].copy()
|
|
|
|
def filter_corrections(df: DataFrame):
    """Split Federal Register documents into non-corrections and corrections.

    A document is treated as a correction when its `correction_of` field is
    populated, or when a regex search flags its `document_number`, or its
    `title` or `action` fields.

    Args:
        df (DataFrame): Federal Register data.

    Raises:
        SearchError: If the two resulting frames do not add up to the input length.

    Returns:
        tuple: DataFrame with corrections removed, DataFrame of corrections
    """
    # Record the columns up front; both outputs are restricted to exactly these.
    original_columns = df.columns.tolist()

    # True where `correction_of` is empty, i.e. the field does not mark a correction.
    lacks_correction_of = array(df["correction_of"].isna())

    # Pattern-based detection on the document number and on title/action text.
    number_hits = search_columns(
        df,
        [r"^[crxz][\d]{1,2}-(?:[\w]{2,4}-)?[\d]+"],
        ["document_number"],
        return_column="indicator1",
    )
    text_hits = search_columns(
        df,
        [r"(?:;\scorrection\b)|(?:\bcorrecting\samend[\w]+\b)"],
        ["title", "action"],
        return_column="indicator2",
    )
    flagged_by_search = array(number_hits["indicator1"] == 1) | array(text_hits["indicator2"] == 1)

    is_correction = ~lacks_correction_of | flagged_by_search
    df_no_corrections = df.loc[~is_correction, original_columns]
    df_corrections = df.loc[is_correction, original_columns]

    # Sanity check: every input row must land in exactly one of the two outputs.
    if len(df) != len(df_no_corrections) + len(df_corrections):
        raise SearchError(f"{len(df)} != {len(df_no_corrections)} + {len(df_corrections)}")
    return df_no_corrections, df_corrections
|
|
|
|
def get_significant_rules(df: DataFrame, start_date: str) -> tuple[DataFrame, date]:
    """Attach significant-rule indicators to Federal Register data.

    The frame returned by `get_significant_info` has its "significant" and
    "3f1_significant" columns cleaned (missing values and "." become 0, then
    cast to int64). "3f1_significant" is rewritten as a 0/1 flag, and a 0/1
    "other_significant" column marks rules that are significant but not 3(f)(1).

    Args:
        df (DataFrame): Input data; must contain "document_number".
        start_date (str): Start date of significant rule data (ISO format).

    Raises:
        DataAvailabilityError: Raised when requesting significant rule counts prior to Executive Order 14094 of April 6, 2023.

    Returns:
        tuple[DataFrame, datetime.date]: Data with significant rules, last updated date for significant data
    """
    # Guard clause: no significant-rule data before the April 6, 2023 cutoff.
    if date.fromisoformat(start_date) < date(2023, 4, 6):
        raise DataAvailabilityError("This program does not calculate significant rule counts prior to Executive Order 14094 of April 6, 2023.")

    df, last_updated = get_significant_info(df, start_date, df.loc[:, "document_number"].to_list())

    # Normalize both indicator columns to integers.
    for indicator in ("significant", "3f1_significant"):
        missing = df[indicator].isna()
        df.loc[missing, indicator] = "0"
        df.loc[:, indicator] = df[indicator].replace(".", "0").astype("int64")

    # Capture the masks before the indicator column is overwritten below.
    is_3f1 = df["3f1_significant"] == 1
    is_significant = df["significant"] == 1

    df.loc[:, "3f1_significant"] = 0
    df.loc[is_3f1, "3f1_significant"] = 1
    df.loc[:, "other_significant"] = 0
    df.loc[(is_significant & ~is_3f1), "other_significant"] = 1
    return df, last_updated
|
|
|
|
def get_rules_in_window(start_date: str, get_significant: bool = True, metadata: dict = METADATA):
    """Retrieve and process rules in a given CRA window.

    Pipeline: build the date range, fetch the rules, format them, drop
    corrections, drop rules issued by the new administration, add an agency
    "acronym" column, and optionally attach significant-rule data.

    Args:
        start_date (str): Start date of window.
        get_significant (bool, optional): Get significant rule data. Defaults to True.
        metadata (dict, optional): Agency metadata. Defaults to METADATA.

    Returns:
        tuple[DataFrame, datetime.date]: Processed rules and the last-updated
            date of the significant data; today's date when significant data
            is not requested.
    """
    window = get_date_range(start_date)
    rules = format_documents(get_rules(window))

    # Only the non-correction frame is kept; the corrections frame is discarded.
    rules, _ = filter_corrections(rules)
    rules = filter_new_admin_rules(rules, window.get("transition_year"))

    rules.loc[:, "acronym"] = get_agency_metadata_values(
        rules, "parent_slug", metadata=metadata, metadata_value="acronym"
    )

    if not get_significant:
        return rules, date.today()

    rules, last_updated = get_significant_rules(rules, start_date)
    return rules, last_updated
|
|
|
|
def get_list_agencies(start_date: str, agency_column: str = "parent_slug", significant: bool = True, **kwargs):
    """Return the sorted, distinct agency values found in the window's rules.

    Args:
        start_date (str): Start date of window.
        agency_column (str, optional): Column containing agency values. Defaults to "parent_slug".
        significant (bool, optional): Get significant rule data. Defaults to True.
        **kwargs: Forwarded to `get_rules_in_window`.

    Returns:
        list: List of agencies
    """
    rules, _ = get_rules_in_window(start_date, get_significant=significant, **kwargs)
    # Expand list-like entries so each agency value gets its own row.
    exploded = rules.explode(agency_column, ignore_index=True)
    # The index of value_counts() holds one entry per distinct value.
    distinct_agencies = exploded[agency_column].value_counts().index
    return sorted(distinct_agencies.to_list())
|
|
|
|
| |
# Module-level dataset, built once at import time.
DF, LAST_UPDATED = get_rules_in_window(START_DATE, get_significant=GET_SIGNIFICANT)

# Derive the agency list from the rules already held in DF. Calling
# get_list_agencies(START_DATE, ...) would run get_rules_in_window a second
# time and fetch and process the same window again, only to apply the
# explode/value_counts steps below to an equivalent frame.
AGENCIES = sorted(
    DF.explode("parent_slug", ignore_index=True)["parent_slug"].value_counts().index.to_list()
)
|
|
|
|
if __name__ == "__main__":
    # Quick manual check: print a summary of each module-level object.
    summary = (
        DF.columns,
        LAST_UPDATED,
        AGENCIES,
        len(METADATA.keys()),
    )
    for item in summary:
        print(item)
|
|