| | from datetime import date
|
| | from pathlib import Path
|
| |
|
| | from fr_toolbelt.api_requests import get_documents_by_date
|
| | from fr_toolbelt.preprocessing import process_documents, AgencyMetadata
|
| | from numpy import array
|
| | from pandas import DataFrame, to_datetime
|
| | from plotnine import (
|
| | ggplot,
|
| | aes,
|
| | geom_col,
|
| | labs,
|
| | coord_flip,
|
| | scale_x_discrete,
|
| | theme_light,
|
| | )
|
| |
|
| | try:
|
| | from search_columns import search_columns, SearchError
|
| | from significant import get_significant_info
|
| | except ModuleNotFoundError:
|
| | from .search_columns import search_columns, SearchError
|
| | from .significant import get_significant_info
|
| |
|
| |
|
# Agency metadata fetched once at import time (used to label agencies in output).
METADATA, _ = AgencyMetadata().get_agency_metadata()

# First day of the analysis window (ISO format).
START_DATE = "2024-03-01"

# Significance data only exists from Executive Order 14094 (April 6, 2023) onward.
# The comparison already yields a bool; the original `True if ... else False`
# ternary was redundant.
GET_SIGNIFICANT = date.fromisoformat(START_DATE) >= date(2023, 4, 6)
|
| |
|
| |
|
class DataAvailabilityError(Exception):
    """Raised when requested data falls outside the supported date range."""
|
| |
|
| |
|
def get_date_range(start_date: str):
    """Build the query window from `start_date` through Jan 31 of the next year.

    Args:
        start_date (str): ISO-format date string (YYYY-MM-DD).

    Returns:
        dict: Keys "start" (the input date), "end" (January 31 of the
        following year), and "transition_year" (the following calendar year).
    """
    transition_year = date.fromisoformat(start_date).year + 1
    return {
        "start": start_date,
        "end": f"{transition_year}-01-31",
        "transition_year": transition_year,
    }
|
| |
|
| |
|
def get_rules(date_range: dict) -> list[dict]:
    """Retrieve final rule documents published within the given window.

    Args:
        date_range (dict): Mapping with "start" and "end" ISO date strings,
            as produced by `get_date_range`.

    Returns:
        list[dict]: Raw Federal Register documents of type RULE.
    """
    documents, _ = get_documents_by_date(
        start_date=date_range.get("start"),
        end_date=date_range.get("end"),
        document_types=("RULE", ),
    )
    return documents
|
| |
|
| |
|
def format_documents(documents: list[dict]):
    """Format Federal Register documents to generate count by presidential year.

    Args:
        documents (list[dict]): List of documents.

    Returns:
        DataFrame: Pandas DataFrame with parsed publication date components
        (`publication_date`, `publication_year`, `publication_month`,
        `publication_day`) alongside the processed document fields.
    """
    documents = process_documents(
        documents,
        which=("agencies", "presidents"),
        return_values_as_str=False
    )

    df = DataFrame(documents)

    df.loc[:, "publication_dt"] = to_datetime(df["publication_date"])
    # Vectorized `.dt` accessor replaces four row-wise `.apply(axis=1)` passes;
    # same values, but computed in a single C-level pass per column.
    dt = df["publication_dt"].dt
    df.loc[:, "publication_date"] = dt.date
    df.loc[:, "publication_year"] = dt.year
    df.loc[:, "publication_month"] = dt.month
    df.loc[:, "publication_day"] = dt.day

    return df
|
| |
|
| |
|
def filter_new_admin_rules(
        df: DataFrame,
        transition_year: int,
        date_col: str = "publication_date",
    ):
    """Drop rules published by the incoming administration after inauguration.

    Args:
        df (DataFrame): Data with `date_col` (datetime.date values) and
            `president_id` columns.
        transition_year (int): Year of the presidential transition.
        date_col (str): Column holding publication dates.

    Returns:
        DataFrame: Rows excluding those published by the incoming president
        on or after January 20 of `transition_year`.
    """
    # Incoming president by transition year. Years absent from this map filter
    # nothing, because .get() yields None and no president_id matches it.
    admin_transitions = {
        2001: "george-w-bush",
        2009: "barack-obama",
        2017: "donald-trump",
        2021: "joe-biden",
    }
    inauguration_day = date(transition_year, 1, 20)
    incoming_president = admin_transitions.get(transition_year)
    published_after = array(df[date_col] >= inauguration_day)
    by_incoming = array(df["president_id"] == incoming_president)
    # Keep everything EXCEPT post-inauguration rules by the incoming president.
    return df.loc[~(published_after & by_incoming)]
|
| |
|
| |
|
def filter_corrections(df: DataFrame):
    """Split Federal Register documents into non-corrections and corrections.

    A document counts as a correction when its `correction_of` field is
    populated, or when regex searches of the `document_number`, `title`, or
    `action` fields flag it.

    Args:
        df (DataFrame): Federal Register data.

    Returns:
        tuple: DataFrame with corrections removed, DataFrame of corrections

    Raises:
        SearchError: If the two partitions do not sum to the input length.
    """
    cols = df.columns.tolist()

    # Rows that do not reference a parent document.
    no_parent = array(df["correction_of"].isna())

    # Correction-style document numbers (leading c/r/x/z prefix).
    search_1 = search_columns(df, [r"^[crxz][\d]{1,2}-(?:[\w]{2,4}-)?[\d]+"], ["document_number"],
                              return_column="indicator1")
    # "; correction" in the title or "correcting amendment" wording in action.
    search_2 = search_columns(df, [r"(?:;\scorrection\b)|(?:\bcorrecting\samend[\w]+\b)"], ["title", "action"],
                              return_column="indicator2")
    flagged = array(search_1["indicator1"] == 1) | array(search_2["indicator2"] == 1)

    # A row is a correction iff it has a parent reference OR was flagged;
    # `keep` is the complement of that (De Morgan of the original masks).
    keep = no_parent & ~flagged
    df_no_corrections = df.loc[keep, cols]
    df_corrections = df.loc[~keep, cols]

    # Sanity check: the two partitions must cover the input exactly.
    if len(df) != len(df_no_corrections) + len(df_corrections):
        raise SearchError(f"{len(df)} != {len(df_no_corrections)} + {len(df_corrections)}")
    return df_no_corrections, df_corrections
|
| |
|
| |
|
def get_significant_rules(df, start_date):
    """Merge significance data into `df` and derive indicator columns.

    Args:
        df (DataFrame): Federal Register data with a `document_number` column.
        start_date (str): ISO-format start date; must be on/after 2023-04-06.

    Returns:
        tuple: (DataFrame with integer `3f1_significant` and
        `other_significant` indicator columns, last-updated value reported by
        `get_significant_info`).

    Raises:
        DataAvailabilityError: If `start_date` predates EO 14094 (2023-04-06).
    """
    process_columns = ("significant", "3f1_significant", )
    if date.fromisoformat(start_date) < date(2023, 4, 6):
        raise DataAvailabilityError("This program does not calculate significant rule counts prior to Executive Order 14094 of April 6, 2023.")
    else:
        document_numbers = df.loc[:, "document_number"].to_list()
        df, last_updated = get_significant_info(df, start_date, document_numbers)
        for col in process_columns:
            # Normalize missing values and "." placeholders to "0", then cast
            # the whole column to int64 so the flags below compare against 1.
            bool_na = df[col].isna()
            df.loc[bool_na, col] = "0"
            df.loc[:, col] = df[col].replace(".", "0").astype("int64")
        # Capture the flags BEFORE the columns are rewritten below; order matters.
        bool_3f1 = df["3f1_significant"] == 1
        bool_sig = df["significant"] == 1
        df.loc[:, "3f1_significant"] = 0
        df.loc[bool_3f1, "3f1_significant"] = 1
        # "Other significant" = significant but not 3(f)(1) significant.
        df.loc[:, "other_significant"] = 0
        df.loc[(bool_sig & ~bool_3f1), "other_significant"] = 1
    return df, last_updated
|
| |
|
| |
|
def get_agency_metadata_values(
        df: DataFrame,
        agency_column: str,
        metadata: dict,
        metadata_value: str,
    ):
    """Look up a metadata attribute for each agency slug in a column.

    Args:
        df (DataFrame): Data containing `agency_column` of agency slugs.
        metadata (dict): Mapping of slug -> metadata dict.
        metadata_value (str): Attribute to extract; "acronym" is translated
            to the metadata's "short_name" key.

    Returns:
        Series: Extracted values (None where slug or key is missing).
    """
    key = "short_name" if metadata_value == "acronym" else metadata_value
    slugs = df.loc[:, agency_column]
    return slugs.apply(lambda slug: metadata.get(slug, {}).get(key))
|
| |
|
| |
|
| | def groupby_agency(
|
| | df: DataFrame,
|
| | group_col: str = "parent_slug",
|
| | value_col: str = "document_number",
|
| | aggfunc: str = "count",
|
| | significant: bool = True,
|
| | metadata: dict | None = None,
|
| | metadata_value: str = "acronym",
|
| | ):
|
| | aggfunc_dict = {value_col: aggfunc, }
|
| | if significant:
|
| | aggfunc_dict.update({
|
| | "3f1_significant": "sum",
|
| | "other_significant": "sum",
|
| | })
|
| | df_ex = df.explode(group_col, ignore_index=True)
|
| | grouped = df_ex.groupby(
|
| | by=group_col
|
| | ).agg(
|
| | aggfunc_dict
|
| | ).reset_index()
|
| | grouped = grouped.sort_values(value_col, ascending=False).rename(
|
| | columns={
|
| | group_col: "agency",
|
| | value_col: "rules",
|
| | }, errors="ignore"
|
| | )
|
| | if metadata is not None:
|
| | grouped.loc[:, metadata_value] = get_agency_metadata_values(
|
| | grouped,
|
| | agency_column="agency",
|
| | metadata=metadata,
|
| | metadata_value=metadata_value
|
| | )
|
| | cols = ["agency", metadata_value, "rules", "3f1_significant", "other_significant"]
|
| | grouped = grouped.loc[:, [c for c in cols if c in grouped.columns]]
|
| | return grouped
|
| |
|
| |
|
| | def groupby_ym(
|
| | df: DataFrame,
|
| | group_col: tuple | list = ("publication_year", "publication_month", ),
|
| | value_col: str = "document_number",
|
| | aggfunc: str = "count",
|
| | significant: bool = True
|
| | ):
|
| | aggfunc_dict = {value_col: aggfunc, }
|
| | if significant:
|
| | aggfunc_dict.update({
|
| | "3f1_significant": "sum",
|
| | "other_significant": "sum",
|
| | })
|
| | grouped = df.groupby(
|
| | by=list(group_col)
|
| | ).agg(
|
| | aggfunc_dict
|
| | ).reset_index()
|
| | grouped = grouped.rename(columns={
|
| | value_col: "rules",
|
| | }, errors="ignore")
|
| | return grouped
|
| |
|
| |
|
def save_csv(path: Path, df_all: DataFrame, df_agency: DataFrame, df_ym: DataFrame, transition_year: int):
    """Write the three output datasets as CSV files under `path`.

    Args:
        path (Path): Output directory.
        df_all (DataFrame): All rules.
        df_agency (DataFrame): Rules grouped by agency.
        df_ym (DataFrame): Rules grouped by year/month.
        transition_year (int): Used to build the `<prev>_<year>` file suffix.
    """
    prev_year = transition_year - 1
    outputs = {
        f"rules_{prev_year}_{transition_year}.csv": df_all,
        f"rules_by_agency_{prev_year}_{transition_year}.csv": df_agency,
        f"rules_by_month_{prev_year}_{transition_year}.csv": df_ym,
    }
    for file_name, frame in outputs.items():
        frame.to_csv(path / file_name, index=False)
|
| |
|
| |
|
def plot_agency(df, group_col = "acronym", value_col = "rules"):
    """Horizontal bar chart of rule counts per agency.

    Args:
        df (DataFrame): Output of `groupby_agency` (one row per agency).
        group_col (str): Column used for bar labels.
        value_col (str): Column used for bar heights.

    Returns:
        ggplot: The assembled plotnine plot object.
    """
    # Reverse the input order so the first row ends up at the top of the
    # flipped (horizontal) axis.
    ordering = df.loc[:, group_col].to_list()[::-1]

    return (
        ggplot(df, aes(x=group_col, y=value_col))
        + geom_col()
        + coord_flip()
        + scale_x_discrete(limits=ordering)
        + labs(y="", x="", title="Number of Rules Published by Agency")
        + theme_light()
    )
|
| |
|
| |
|
def plot_month(df, group_cols = ("publication_year", "publication_month"), value_col = "rules"):
    """Bar chart of rule counts per publication month.

    Args:
        df (DataFrame): Output of `groupby_ym`; gains a "ym" label column
            as a side effect (the caller's frame is mutated).
        group_cols (tuple): Year and month columns used to build labels.
        value_col (str): Column used for bar heights.

    Returns:
        ggplot: The assembled plotnine plot object.
    """
    # Build zero-padded "YYYY-MM" labels and keep the frame's row order.
    year_part = df[group_cols[0]].astype(str)
    month_part = df[group_cols[1]].astype(str).str.pad(2, fillchar="0")
    df.loc[:, "ym"] = year_part + "-" + month_part
    ordering = df.loc[:, "ym"].to_list()

    return (
        ggplot(df, aes(x="ym", y=value_col))
        + geom_col()
        + scale_x_discrete(limits=ordering)
        + labs(y="", x="", title="Number of Rules Published by Month")
        + theme_light()
    )
|
| |
|
| |
|
def get_rules_in_window(start_date: str, get_significant: bool = True):
    """Fetch, clean, and optionally enrich final rules for one window.

    Args:
        start_date (str): ISO-format start of the analysis window.
        get_significant (bool): Merge significance data when True.

    Returns:
        tuple: (cleaned DataFrame of rules, date the significance data was
        last updated — today's date when `get_significant` is False).
    """
    window = get_date_range(start_date)
    documents = get_rules(window)
    df = format_documents(documents)
    # Remove corrections, then rules issued by the incoming administration.
    df, _ = filter_corrections(df)
    df = filter_new_admin_rules(df, window.get("transition_year"))
    if not get_significant:
        return df, date.today()
    return get_significant_rules(df, start_date)
|
| |
|
| |
|
# NOTE(review): this runs the entire pipeline at import time — it appears to
# trigger remote API requests via get_rules/get_significant_info. Confirm this
# side effect is intended rather than deferring it to the __main__ guard.
DF, LAST_UPDATED = get_rules_in_window(START_DATE, get_significant=GET_SIGNIFICANT)
|
| |
|
| |
|
def main(start_date, save_data: bool = True, path: Path | None = None, metadata: dict | None = None, significant: bool = True):
    """Run the retrieval/aggregation pipeline and optionally save CSV output.

    Args:
        start_date (str): ISO-format start of the analysis window.
        save_data (bool): Write CSV files when True.
        path (Path | None): Output directory; defaults to this file's folder.
        metadata (dict | None): Agency metadata for labeling grouped output.
        significant (bool): Include significance counts (forced off for
            windows starting before EO 14094, 2023-04-06).

    Returns:
        tuple: (all rules, rules grouped by agency, rules grouped by month).
    """
    # Significance data does not exist before EO 14094 (2023-04-06).
    if date.fromisoformat(start_date) < date(2023, 4, 6):
        significant = False

    window = get_date_range(start_date)
    df, _ = get_rules_in_window(start_date, get_significant=significant)

    df_agency = groupby_agency(df, metadata=metadata, significant=significant)
    df_ym = groupby_ym(df, significant=significant)

    if save_data:
        output_dir = Path(__file__).parent if path is None else path
        save_csv(output_dir, df, df_agency, df_ym, window.get("transition_year"))

    return df, df_agency, df_ym
|
| |
|
| |
|
if __name__ == "__main__":

    # NOTE(review): deliberately a no-op? `main(...)` is never invoked here;
    # the pipeline already executes at import time via the DF/LAST_UPDATED
    # assignment above — confirm whether this guard should call main().
    pass
|
| |
|