Spaces:
Running
Running
"""
SEC EDGAR Fetcher for Apple Inc. (AAPL)
========================================
Fetches 10-K, 10-Q, and 8-K filings from SEC EDGAR and stores them locally.

Usage:
    python sec_fetcher.py

Output structure:
    data/raw/sec_filings/
    └── AAPL/
        ├── 10-K/
        │   ├── 2024/
        │   │   ├── metadata.json
        │   │   └── filing.htm
        │   ├── 2023/
        │   └── 2022/
        ├── 10-Q/
        │   ├── 2026_Q2/
        │   └── ...
        └── 8-K/
            └── ...
"""
| import json | |
| import time | |
| import logging | |
| import requests | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| # ββ Configuration βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
COMPANY_NAME = "Apple Inc."
TICKER = "AAPL"

# Apple's SEC CIK. Archive URLs use the bare number; the submissions endpoint
# uses the same number left-padded with zeros to ten digits.
CIK_PLAIN = "320193"
CIK = CIK_PLAIN.zfill(10)

# How many of the most recent filings to fetch for each form type.
FETCH_COUNT = {"10-K": 3, "10-Q": 6, "8-K": 5}
FILING_TYPES = list(FETCH_COUNT)

# Downloads land in <two directories above this file>/data/raw/sec_filings/<TICKER>.
BASE_DIR = Path(__file__).parent.parent.joinpath("data", "raw", "sec_filings")
OUTPUT_DIR = BASE_DIR.joinpath(TICKER)

# SEC EDGAR endpoints
# NOTE: data.sec.gov -> submissions JSON (metadata)
#       www.sec.gov  -> archive file downloads
# These are TWO different hosts; headers must match the host being called.
SUBMISSIONS_URL = "https://data.sec.gov/submissions/CIK" + CIK + ".json"

# SEC requires a descriptive User-Agent (their policy).
USER_AGENT = "Morningstar RAG Research Pipeline contact@example.com"

# 150 ms between requests keeps the client under SEC's 10 requests/second.
REQUEST_DELAY = 0.15
| # ββ Logging βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Log to a file under <two directories above this file>/logs and to the console.
LOG_DIR = Path(__file__).parent.parent / "logs"
LOG_DIR.mkdir(parents=True, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(message)s",
    handlers=[
        # Messages logged by this module contain non-ASCII characters, so the
        # log file is opened as UTF-8 rather than in the platform's default
        # encoding, which may not be able to represent them.
        logging.FileHandler(LOG_DIR / "sec_fetcher.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
log = logging.getLogger(__name__)
| # ββ HTTP helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def make_headers(host: str) -> dict:
    """
    Build the request headers for one destination host.

    SEC requires a User-Agent on every request, and the Host header has to
    name the server actually being contacted, so the caller passes it in.

    host: network location of the target URL, e.g. "data.sec.gov".
    Returns a new dict with User-Agent, Accept-Encoding and Host.
    """
    headers = {"User-Agent": USER_AGENT}
    headers["Accept-Encoding"] = "gzip, deflate"
    headers["Host"] = host
    return headers
def get(url: str, stream: bool = False, retries: int = 3) -> requests.Response:
    """
    GET with retry + rate limiting. Auto-detects host from URL.

    Every attempt is preceded by a REQUEST_DELAY pause. A 429 response makes
    the function wait for the server-advertised Retry-After period (60 s when
    the header is missing or not a plain number of seconds) before the next
    attempt. Any other requests error is logged and retried after a pause
    that grows with the attempt number.

    url:     absolute URL to fetch.
    stream:  passed through to requests.get (True defers the body download).
    retries: total number of attempts.

    Returns the first response that passes raise_for_status().
    Raises RuntimeError when all attempts are used up; the last requests
    error, if any, is attached as its cause.
    """
    from urllib.parse import urlparse

    host = urlparse(url).netloc  # e.g. "data.sec.gov" or "www.sec.gov"
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            time.sleep(REQUEST_DELAY)
            resp = requests.get(
                url,
                headers=make_headers(host),
                timeout=30,
                stream=stream,
            )
            if resp.status_code == 429:
                # Retry-After may be delta-seconds or an HTTP-date. Only the
                # numeric form is honoured; anything else falls back to 60 s
                # instead of raising ValueError out of the retry loop.
                try:
                    wait = max(0, int(resp.headers.get("Retry-After", "")))
                except (TypeError, ValueError):
                    wait = 60
                # Release the connection before sleeping (matters for stream=True).
                resp.close()
                log.warning(f"Rate limited β waiting {wait}s ...")
                time.sleep(wait)
                continue
            resp.raise_for_status()
            return resp
        except requests.RequestException as e:
            last_error = e
            log.warning(f"Attempt {attempt}/{retries} failed for {url}: {e}")
            if attempt < retries:
                time.sleep(5 * attempt)
    raise RuntimeError(f"Failed after {retries} attempts: {url}") from last_error
def save_json(data: dict, path: Path):
    """
    Write *data* to *path* as indented JSON, creating parent directories.

    data: JSON-serializable mapping.
    path: destination file; overwritten if it already exists.

    Raises TypeError/ValueError from the json module when *data* cannot be
    serialized. In that case an existing file at *path* is left untouched.
    """
    # Serialize before opening the target: opening with "w" truncates the
    # file, so a serialization error afterwards would leave broken JSON behind.
    payload = json.dumps(data, indent=2)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        f.write(payload)
def save_file(url: str, path: Path) -> bool:
    """
    Stream-download a file to disk. Returns True on success.

    The body is written to a sibling "<name>.part" file and moved to *path*
    only after the whole stream has been received, so an interrupted download
    never leaves a truncated file under the final name.

    url:  address of the document to download.
    path: final destination; parent directories are created as needed.

    Returns False (after logging the error) on any failure; never raises.
    """
    tmp_path = path.with_name(path.name + ".part")
    try:
        resp = get(url, stream=True)
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            with open(tmp_path, "wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    f.write(chunk)
        finally:
            # A streamed response holds its connection until closed.
            resp.close()
        tmp_path.replace(path)
        size_kb = path.stat().st_size / 1024
        log.info(f" Saved {path.name} ({size_kb:.1f} KB)")
        return True
    except Exception as e:
        log.error(f" Could not save {path.name}: {e}")
        # Best-effort cleanup of the partial download; it may not exist at all.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        return False
def now_utc() -> str:
    """Return the current time in UTC as a timezone-aware ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
| # ββ Filing index lookup βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def fetch_filing_doc_list(accession_fmt: str) -> list:
    """
    Fetch the document list for one filing via the EDGAR index JSON.

    Returns a list of {name, type, size, url} dicts, one per entry of the
    filing folder's directory listing. The lookup is best-effort: on any
    error a warning is logged and an empty list is returned.

    accession_fmt: accession number with dashes removed, e.g. 000032019324000123
    """
    # EDGAR serves filing folders, and the index.json listing inside each of
    # them, from the www.sec.gov archive host rather than from data.sec.gov.
    folder_url = (
        f"https://www.sec.gov/Archives/edgar/data/{CIK_PLAIN}/{accession_fmt}"
    )
    try:
        listing = get(f"{folder_url}/index.json").json()
        items = listing.get("directory", {}).get("item", [])
        return [
            {
                "name": d["name"],
                "type": d.get("type", ""),
                "size": d.get("size", ""),
                "url": f"{folder_url}/{d['name']}",
            }
            for d in items
            if isinstance(d, dict) and "name" in d
        ]
    except Exception as e:
        # Logged as a warning: the module configures logging at INFO, so a
        # DEBUG record would never be shown and the failure would go unnoticed.
        log.warning(f" Doc list fetch failed for {accession_fmt}: {e}")
        return []
| # ββ Main fetcher class ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class SECEdgarFetcher:
    """
    Download the most recent filings of each configured form type.

    For every form in FILING_TYPES the newest FETCH_COUNT[form] filings from
    the submissions index are stored under <output_dir>/<form>/<folder>/ as a
    metadata.json plus the primary document. Filings already present on disk
    are skipped.
    """

    def __init__(self, output_dir: Path):
        """Remember *output_dir* and create it if it does not exist yet."""
        self.output_dir = output_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def run(self):
        """Fetch the submissions index, then download and summarise all filings."""
        log.info("=" * 60)
        log.info(f"SEC EDGAR Fetcher β {COMPANY_NAME} ({TICKER})")
        log.info(f"CIK : {CIK}")
        log.info(f"Output : {self.output_dir}")
        log.info("=" * 60)

        submissions = self._fetch_submissions()
        if not submissions:
            log.error("Could not fetch submissions. Aborting.")
            return

        # Save company-level metadata once
        save_json({
            "name": submissions.get("name"),
            "cik": CIK,
            "ticker": TICKER,
            "sic": submissions.get("sic"),
            "sic_desc": submissions.get("sicDescription"),
            "state": submissions.get("stateOfIncorporation"),
            "fiscal_year": submissions.get("fiscalYearEnd"),
            "fetched_at": now_utc(),
        }, self.output_dir / "company_metadata.json")
        log.info(f"Company : {submissions.get('name')}")
        log.info(f"Industry : {submissions.get('sicDescription')}")

        filings = self._parse_filings(submissions)
        summary = {}
        for ftype in FILING_TYPES:
            n = FETCH_COUNT.get(ftype, 3)
            # Keep the first n filings of this form, in index order.
            subset = [f for f in filings if f["form"] == ftype][:n]
            log.info(f"\n--- {ftype} ({len(subset)} filings) ---")
            saved = self._process_filing_type(ftype, subset)
            summary[ftype] = {"requested": n, "saved": saved}

        save_json({"run_at": now_utc(), "summary": summary},
                  self.output_dir / "fetch_summary.json")
        log.info("\n" + "=" * 60)
        log.info("Done.")
        for ftype, s in summary.items():
            log.info(f" {ftype:5s} {s['saved']}/{s['requested']} saved")
        log.info("=" * 60)

    # ── Submissions ──────────────────────────────────────────────────────────

    def _fetch_submissions(self) -> dict:
        """Return the parsed submissions JSON, or {} when the fetch fails."""
        log.info("Fetching submissions index ...")
        try:
            return get(SUBMISSIONS_URL).json()
        except Exception as e:
            log.error(f"Submissions fetch failed: {e}")
            return {}

    def _parse_filings(self, submissions: dict) -> list:
        """
        Turn the column-oriented "recent" filings table into a list of dicts.

        Each dict has the keys form, date, accession (with dashes), acc_fmt
        (dashes removed) and primary_doc. Order follows the index.
        """
        recent = submissions.get("filings", {}).get("recent", {})
        columns = zip(
            recent.get("form", []),
            recent.get("filingDate", []),
            recent.get("accessionNumber", []),
            recent.get("primaryDocument", []),
        )
        filings = [
            {
                "form": form,
                "date": date,
                "accession": acc,                  # with dashes
                "acc_fmt": acc.replace("-", ""),   # without dashes
                "primary_doc": doc,
            }
            for form, date, acc, doc in columns
        ]
        log.info(f"Total filings in index: {len(filings)}")
        return filings

    # ── Per-filing processing ────────────────────────────────────────────────

    def _process_filing_type(self, ftype: str, filings: list) -> int:
        """Process every filing in *filings*; return how many succeeded."""
        return sum(1 for filing in filings if self._process_one(ftype, filing))

    @staticmethod
    def _cached_accession(meta_path: Path):
        """Return the accession stored in *meta_path*, or None if unavailable."""
        try:
            with open(meta_path, encoding="utf-8") as f:
                return json.load(f).get("accession")
        except (OSError, ValueError, AttributeError):
            # Missing file, invalid JSON, or JSON that is not an object.
            return None

    def _process_one(self, ftype: str, filing: dict) -> bool:
        """
        Download one filing (metadata + document) unless it is already cached.

        filing: one dict as produced by _parse_filings.
        Returns True when the filing is on disk afterwards (freshly saved or
        already cached), False when no document could be downloaded.
        """
        date = filing["date"]
        acc = filing["accession"]
        acc_fmt = filing["acc_fmt"]
        prim_doc = filing["primary_doc"]

        folder = self._folder_name(ftype, date, acc)
        filing_dir = self.output_dir / ftype / folder
        meta_path = filing_dir / "metadata.json"

        # The folder name is derived from form and date only, so two filings
        # can map to the same folder (e.g. two 8-Ks filed on one day). When the
        # folder already belongs to another accession, store this filing in an
        # accession-suffixed folder instead of treating it as cached.
        cached_acc = self._cached_accession(meta_path)
        if cached_acc is not None and cached_acc != acc:
            folder = f"{folder}_{acc}"
            filing_dir = self.output_dir / ftype / folder
            meta_path = filing_dir / "metadata.json"

        # Already downloaded: skip only if a filing file also exists
        if meta_path.exists():
            has_filing = any(
                entry.name != "metadata.json" and entry.stat().st_size > 1024
                for entry in filing_dir.iterdir()
            )
            if has_filing:
                log.info(f" SKIP {ftype}/{folder} (already cached)")
                return True
            log.info(f" Re-fetching {ftype}/{folder} (metadata exists but no filing file)")

        log.info(f" Fetching {ftype}/{folder} ({date}) ...")

        # Fetch document list from the filing index
        doc_list = fetch_filing_doc_list(acc_fmt)

        # Build the primary document download URL
        # Archive base: https://www.sec.gov/Archives/edgar/data/<CIK>/<acc_fmt>/
        archive_base = (
            f"https://www.sec.gov/Archives/edgar/data/{CIK_PLAIN}/{acc_fmt}"
        )
        doc_url = f"{archive_base}/{prim_doc}"

        # Save metadata first (even if download fails, we have provenance)
        metadata = {
            "ticker": TICKER,
            "form": ftype,
            "filing_date": date,
            "fiscal_year": date[:4],
            "accession": acc,
            "primary_doc": prim_doc,
            "doc_url": doc_url,
            "archive_base": archive_base,
            "all_docs": doc_list,
            "source": "SEC EDGAR",
            "license": "public",
            "fetched_at": now_utc(),
        }
        save_json(metadata, meta_path)

        # Download primary document
        ext = Path(prim_doc).suffix.lower() or ".htm"
        saved_path = filing_dir / f"filing{ext}"
        success = save_file(doc_url, saved_path)

        if not success:
            # Try alternative: an .htm file in the doc list whose name contains
            # the form type without its dash (e.g. "10k"). Stop at the first
            # candidate that downloads.
            form_token = ftype.replace("-", "").lower()
            for d in doc_list:
                name = d["name"]
                if name.endswith(".htm") and form_token in name.lower():
                    log.info(f" Trying alternative: {name}")
                    candidate = filing_dir / name
                    if save_file(d["url"], candidate):
                        saved_path = candidate
                        success = True
                        break

        if success:
            # Record the file that was actually written, which is the
            # alternative document when the primary download failed.
            metadata["local_path"] = str(saved_path)
            save_json(metadata, meta_path)
            return True

        log.warning(f" FAILED {ftype}/{folder} β metadata saved, file not downloaded")
        return False

    def _folder_name(self, ftype: str, date: str, acc: str) -> str:
        """
        Return the folder name for a filing.

        10-K -> filing year ("2024"); 10-Q -> year plus the calendar quarter
        of the filing date ("2024_Q2"); anything else -> the full filing date.
        *date* is expected as YYYY-MM-DD. *acc* is accepted but not used.
        """
        year = date[:4]
        if ftype == "10-K":
            return year
        if ftype == "10-Q":
            # The month is only needed, and only parsed, for quarterly reports.
            quarter = (int(date[5:7]) - 1) // 3 + 1
            return f"{year}_Q{quarter}"
        return date  # 8-K: full date
| # ββ Entry point βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
if __name__ == "__main__":
    # Script entry point: fetch the configured filings into OUTPUT_DIR.
    SECEdgarFetcher(output_dir=OUTPUT_DIR).run()