""" USDA NASS Agricultural Data Collector. Free API key required: https://quickstats.nass.usda.gov/api Set USDA_API_KEY in your .env file. Collects weekly crop progress (% good + % excellent) and quarterly stocks for: ZC=F (Corn), ZW=F (Wheat), ZS=F (Soybeans), CT=F (Cotton) Usage: python data/collector_usda.py python data/collector_usda.py --backfill """ import logging import os import sys from datetime import date from pathlib import Path import pandas as pd import requests sys.path.insert(0, str(Path(__file__).parent.parent)) from data.db import get_conn log = logging.getLogger(__name__) USDA_API_BASE = "https://quickstats.nass.usda.gov/api/api_GET/" USDA_COMMODITY_MAP = { "ZC=F": "CORN", "ZW=F": "WHEAT", "ZS=F": "SOYBEANS", "CT=F": "COTTON", } # (statisticcat_desc, short_desc_contains, date_type) # date_type: "weekly" → week_ending; "quarterly" → year+reference_period_desc; "annual" → year-01-01 USDA_QUERIES = [ ("CONDITION", "PCT GOOD", "weekly"), ("CONDITION", "PCT EXCELLENT", "weekly"), ("STOCKS", "GRAIN - STOCKS, MEASURED IN BU", "quarterly"), ("PRODUCTION", "MEASURED IN BU", "annual"), # annual crop yield (WASDE proxy) ] # USDA FAS PSD (no key needed) — global supply/demand for ending stocks-to-use ratio FAS_PSD_BASE = "https://apps.fas.usda.gov/psdonline/api/v1/data" FAS_COMMODITY_MAP = { "ZC=F": "Corn", "ZW=F": "Wheat", "ZS=F": "Soybean Oil", # closest FAS mapping for soybeans "ZS_grain": "Soybeans", } _PERIOD_MONTH = { "FIRST OF JAN": "01", "FIRST OF MAR": "03", "FIRST OF JUN": "06", "FIRST OF SEP": "09", "FIRST OF DEC": "12", "MAR": "03", "JUN": "06", "SEP": "09", "DEC": "12", } def _parse_quarterly_date(year: str, period: str) -> str | None: """Convert USDA year + reference_period_desc to YYYY-MM-01 date string.""" period_upper = str(period).strip().upper() for key, month in _PERIOD_MONTH.items(): if key in period_upper: return f"{year}-{month}-01" return None def _fetch_usda(api_key: str, commodity: str, stat_cat: str, desc_contains: str, date_type: str, start_year: int) -> pd.DataFrame: params = { "key": api_key, "commodity_desc": commodity, "statisticcat_desc": stat_cat, "year__GE": str(start_year), "agg_level_desc": "NATIONAL", "format": "JSON", } try: resp = requests.get(USDA_API_BASE, params=params, timeout=45) resp.raise_for_status() rows = resp.json().get("data", []) if not rows: return pd.DataFrame() df = pd.DataFrame(rows) # Filter by short_desc if "short_desc" in df.columns: df = df[df["short_desc"].str.upper().str.contains(desc_contains.upper(), na=False)] if df.empty: return pd.DataFrame() df["value"] = pd.to_numeric( df["Value"].astype(str).str.replace(",", ""), errors="coerce" ) if date_type == "weekly": df["date"] = pd.to_datetime(df["week_ending"], errors="coerce").dt.date elif date_type == "annual": # Annual: use Jan 1 of the marketing year df["date"] = pd.to_datetime(df["year"].astype(str) + "-01-01", errors="coerce").dt.date else: # Quarterly: construct from year + reference_period_desc df["date"] = df.apply( lambda r: _parse_quarterly_date(r.get("year", ""), r.get("reference_period_desc", "")), axis=1, ) df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.date return ( df[["date", "value", "short_desc"]] .rename(columns={"short_desc": "metric"}) .dropna(subset=["date", "value"]) .sort_values("date") .reset_index(drop=True) ) except Exception as exc: log.debug("USDA fetch error (%s %s): %s", commodity, stat_cat, exc) return pd.DataFrame() def run(backfill: bool = False) -> str: api_key = os.environ.get("USDA_API_KEY", "").strip() if not api_key: log.info("USDA_API_KEY not set — skipping USDA collection (get free key at quickstats.nass.usda.gov/api)") return "USDA: skipped (USDA_API_KEY not set)" start_year = date.today().year - 5 if backfill else date.today().year - 2 conn = get_conn() total = 0 for symbol, commodity in USDA_COMMODITY_MAP.items(): for stat_cat, desc_contains, date_type in USDA_QUERIES: df = _fetch_usda(api_key, commodity, stat_cat, desc_contains, date_type, start_year) if df.empty: continue df = df.sort_values("date").reset_index(drop=True) df["yoy_chg_pct"] = df["value"].pct_change(periods=52) * 100 for _, row in df.iterrows(): try: conn.execute(""" INSERT OR REPLACE INTO usda_crop (date, commodity, metric, value, yoy_chg_pct) VALUES (?, ?, ?, ?, ?) """, [ row["date"], symbol, row["metric"], row["value"], None if pd.isna(row["yoy_chg_pct"]) else row["yoy_chg_pct"], ]) total += 1 except Exception as exc: log.debug("USDA insert error: %s", exc) log.info("USDA %s (%s): done", symbol, commodity) conn.close() return f"USDA: stored {total} rows" if __name__ == "__main__": import argparse from dotenv import load_dotenv load_dotenv() logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") parser = argparse.ArgumentParser() parser.add_argument("--backfill", action="store_true") args = parser.parse_args() from data.db import init_schema init_schema() print(run(backfill=args.backfill))