| import os |
| from datetime import datetime |
|
|
| import pandas as pd |
| import requests |
|
|
| from .base_pipeline import BasePipeline |
|
|
|
|
class FREDPipeline(BasePipeline):
    """
    FRED Data Pipeline: Extracts, transforms, and loads FRED data using config.

    Settings are read from the ``"fred"`` section of the pipeline config:
    ``api_key``, ``series`` (iterable of FRED series IDs), ``start_date``,
    ``end_date``, ``output_dir``, ``export_dir`` and, optionally, ``timeout``
    (seconds to wait for each HTTP request; defaults to ``DEFAULT_TIMEOUT``).

    NOTE(review): ``self.config`` and ``self.logger`` are presumably set up by
    ``BasePipeline.__init__`` (not visible here) — confirm.
    """

    # FRED endpoint that returns the observations of a single series.
    BASE_URL = "https://api.stlouisfed.org/fred/series/observations"
    # Seconds to wait on the FRED API. Without a timeout ``requests`` can
    # block indefinitely on a stalled connection.
    DEFAULT_TIMEOUT = 30

    def __init__(self, config_path: str):
        """Read the ``fred`` config section and create the output directories.

        Args:
            config_path: path handed to ``BasePipeline`` to load the config.
        """
        super().__init__(config_path)
        self.fred_cfg = self.config["fred"]
        self.api_key = self.fred_cfg["api_key"]
        self.series = self.fred_cfg["series"]
        self.start_date = self.fred_cfg["start_date"]
        self.end_date = self.fred_cfg["end_date"]
        self.output_dir = self.fred_cfg["output_dir"]
        self.export_dir = self.fred_cfg["export_dir"]
        # Optional key, so configs written before it existed keep working.
        self.request_timeout = self.fred_cfg.get("timeout", self.DEFAULT_TIMEOUT)
        os.makedirs(self.output_dir, exist_ok=True)
        os.makedirs(self.export_dir, exist_ok=True)

    def extract(self):
        """Extract data from FRED API for all configured series.

        Returns:
            dict mapping each successfully fetched series ID to a
            ``pd.Series`` of values indexed by observation date. FRED's
            missing-value marker ``"."`` becomes a missing value (``None``).
            A series whose request, HTTP status, or JSON decoding fails is
            logged and left out of the result; the other series still run.
        """
        data = {}
        for series_id in self.series:
            params = {
                "series_id": series_id,
                "api_key": self.api_key,
                "file_type": "json",
                "start_date": self.start_date,
                "end_date": self.end_date,
            }
            try:
                resp = requests.get(
                    self.BASE_URL, params=params, timeout=self.request_timeout
                )
                resp.raise_for_status()
                obs = resp.json().get("observations", [])
                series = self._parse_observations(series_id, obs)
            except Exception as e:
                # Per-series boundary: one failing series must not abort the
                # whole run. The exception text (e.g. from raise_for_status)
                # can contain the full request URL, which carries the API key
                # as a query parameter, so redact it before logging.
                self.logger.error(
                    f"Failed to extract {series_id}: {self._redact(str(e))}"
                )
                continue
            data[series_id] = series
            self.logger.info(f"Extracted {len(series)} records for {series_id}")
        return data

    def _parse_observations(self, series_id, observations):
        """Build a date-indexed Series from raw FRED observation records.

        Records missing a ``date``/``value`` key, or holding a date or value
        that cannot be parsed, are skipped and counted in one warning. Both
        fields are parsed before anything is appended, so the dates and
        values lists always stay the same length.
        """
        dates, values = [], []
        skipped = 0
        for record in observations:
            try:
                date = pd.to_datetime(record["date"])
                raw_value = record["value"]
                value = None if raw_value == "." else float(raw_value)
            except (KeyError, TypeError, ValueError):
                skipped += 1
                continue
            # A null date (to_datetime(None) -> None) or NaT would end up in
            # the index and break min()/max() in transform(); skip it.
            if not isinstance(date, pd.Timestamp) or pd.isna(date):
                skipped += 1
                continue
            dates.append(date)
            values.append(value)
        if skipped:
            self.logger.warning(
                f"Skipped {skipped} malformed observations for {series_id}"
            )
        return pd.Series(values, index=dates, name=series_id)

    def _redact(self, text):
        """Return *text* with the configured API key masked out."""
        if not self.api_key:
            # Replacing an empty string would insert the mask between
            # every character, so leave the text untouched.
            return text
        return text.replace(str(self.api_key), "***")

    def transform(self, data):
        """Combine the extracted series into one daily-indexed DataFrame.

        Args:
            data: mapping of series ID -> date-indexed ``pd.Series``, as
                returned by ``extract``.

        Returns:
            DataFrame indexed (index name ``"Date"``) by every calendar day
            from the earliest to the latest observation across all series,
            with one column per series. Days without an observation for a
            series are left as NaN; no filling or interpolation is done.
            An empty DataFrame is returned when there is nothing to combine.
        """
        if not data:
            self.logger.warning("No data to transform.")
            return pd.DataFrame()
        all_dates = set()
        for s in data.values():
            all_dates.update(s.index)
        if not all_dates:
            return pd.DataFrame()
        date_range = pd.date_range(min(all_dates), max(all_dates), freq="D")
        df = pd.DataFrame(index=date_range)
        for k, v in data.items():
            # Column assignment aligns on the index: dates absent from ``v``
            # become NaN in this column.
            df[k] = v
        df.index.name = "Date"
        self.logger.info(f"Transformed data to DataFrame with shape {df.shape}")
        return df

    def load(self, df):
        """Save DataFrame to CSV in output_dir and export_dir.

        Both copies share one file name stamped with the current local time
        (second resolution).

        Returns:
            ``(out_path, exp_path)`` of the written files, or ``None`` when
            *df* is empty and nothing was written.
        """
        if df.empty:
            self.logger.warning("No data to load.")
            return None
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"fred_data_{ts}.csv"
        out_path = os.path.join(self.output_dir, filename)
        exp_path = os.path.join(self.export_dir, filename)
        df.to_csv(out_path)
        df.to_csv(exp_path)
        self.logger.info(f"Saved data to {out_path} and {exp_path}")
        return out_path, exp_path

    def run(self):
        """Run extract -> transform -> load once, logging start and finish."""
        self.logger.info("Starting FRED data pipeline run...")
        data = self.extract()
        df = self.transform(data)
        self.load(df)
        self.logger.info("FRED data pipeline run complete.")
|
|