Spaces:
Running
Running
| """ | |
| sec_processor.py | |
| ================ | |
| Phase 2b — SEC Filing Processor | |
| Processes Apple SEC HTML filings (10-K, 10-Q, 8-K) through Docling and saves: | |
| - {stem}.json → structured JSON (sections, tables, metadata) | |
| - {stem}_docling.json → native DoclingDocument (required for HybridChunker) | |
| Why not reuse pdf_processor.py? | |
| --------------------------------- | |
| pdf_processor.py is built around PDFs: | |
| - Page-based noise filter (cover page, TOC, disclaimer pages) | |
| - Page numbers tracked throughout | |
| - Assumes DocLayNet layout detection | |
| SEC HTML filings are structurally different: | |
| - No pages — HTML has no page layout concept | |
| - Boilerplate is at the START of the document (cover section), not spread | |
| across specific pages | |
| - HTML headings (h1/h2/h3) map to SectionHeaderItem automatically | |
| - Tables use standard <table> tags — no OCR or TableFormer needed | |
| What stays the same | |
| -------------------- | |
| - Docling converter with do_table_structure=True | |
| - export_to_dataframe(doc) / export_to_markdown(doc) for tables | |
| - doc.model_dump_json() → _docling.json (for HybridChunker) | |
| - cleaned_text, parent_header on every section | |
| Output format per chunk (after Phase 3 chunking) | |
| -------------------------------------------------- | |
| { | |
| "chunk_id" : "10-K_2024_text_0042", | |
| "doc_id" : "10-K_2024", | |
| "chunk_type": "text" | "table", | |
| "text" : "...", | |
| "metadata" : { | |
| "source" : "sec_edgar", | |
| "doc_type" : "10-K", | |
| "ticker" : "AAPL", | |
| "company" : "Apple Inc.", | |
| "fiscal_year" : "2024", | |
| "filing_date" : "2024-11-01", | |
| "accession" : "0000320193-24-000123", | |
| "heading_path": "PART I > Item 1. Business", | |
| ... | |
| } | |
| } | |
| Usage (as a module) | |
| ------------------- | |
| from src.sec_processor import SECProcessor | |
| processor = SECProcessor() | |
| processor.process_all() | |
| Usage (as a script) | |
| ------------------- | |
| python src/sec_processor.py | |
| python src/sec_processor.py --force | |
| """ | |
| import re | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from datetime import datetime, timezone | |
| # ββ Logging ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Configure root logging once at import time: INFO and above, with a
# timestamp and a left-aligned, fixed-width level name.
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger(__name__)

# ── Paths ──────────────────────────────────────────────────────────────────────
# BASE_DIR is the directory two levels above this file; raw inputs and
# processed outputs both live in an AAPL-specific folder beneath it.
BASE_DIR = Path(__file__).parent.parent
RAW_SEC_DIR = BASE_DIR.joinpath("data", "raw", "sec_filings", "AAPL")
PROCESSED_DIR = BASE_DIR.joinpath("data", "processed", "sec_filings", "AAPL")
| # ββ SEC boilerplate detection ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Every SEC filing begins with a cover section containing form labels, | |
| # legal boilerplate, and administrative identifiers. These fragments are | |
| # short and carry no analytical signal for RAG queries. | |
# Fragments that are boilerplate when they make up the WHOLE text item.
# Entries are lower-case because callers compare against lower-cased text.
_BOILERPLATE_EXACT = {
    "united states",
    "securities and exchange commission",
    "washington, d.c. 20549",
    "(mark one)",
    "or",
    "for the transition period from to .",
    # Form checkboxes: U+2610 BALLOT BOX and U+2612 BALLOT BOX WITH X.
    # Written as escapes so the literals survive any re-encoding of this file
    # (they had previously been corrupted into two identical mojibake strings).
    "\u2610", "\u2612",
}

# Prefixes that mark a text item as cover-page boilerplate.  The pattern is
# anchored at the start only, so anything may follow the matched prefix.
_BOILERPLATE_RE = re.compile(
    r"^("
    r"form \d+[\-/][a-z]+"       # FORM 10-K, FORM 10-Q
    r"|commission file"          # Commission File Number
    r"|irs employer"             # IRS Employer Identification
    r"|state or other"           # State or other jurisdiction
    r"|jurisdiction"             # of incorporation
    r"|\(exact name"             # (Exact name of Registrant...)
    r"|\(zip code"               # (Zip Code)
    r"|indicate by check"        # Indicate by check mark...
    r"|securities registered"    # Securities registered...
    r"|aggregate market value"   # Aggregate market value...
    r"|number of shares"         # Number of shares outstanding
    r"|\u2610|\u2612"            # lines that start with a form checkbox
    r")",
    re.IGNORECASE,
)
def _df_to_markdown(df) -> str:
    """
    Build a clean markdown table from a pandas DataFrame.

    Why not use table.export_to_markdown(doc)?
    Docling's HTML→markdown export produces blank cells for SEC HTML tables that
    use iXBRL inline tags or complex colspan/rowspan structures. The DataFrame
    export correctly populates cell values; we build the markdown from that instead.

    SEC HTML tables often expand colspan cells into N identical columns (e.g. a
    cell spanning 3 columns becomes ['Americas','Americas','Americas']). We
    de-duplicate consecutive identical values in each row before rendering so the
    markdown stays readable.

    Only the DataFrame's row values are rendered; its column labels are not
    used. The first non-empty row becomes the markdown header row.

    Args:
        df : pandas DataFrame holding the table cells.

    Returns:
        The markdown table as a single string, or "" when the frame has no
        row with any non-empty cell.
    """

    def _cell(value) -> str:
        """Render one present (non-missing) value as markdown-safe text."""
        text = str(value).strip()
        # A raw pipe would end the cell early and a newline would end the
        # row, so neutralise both to keep the table well-formed.
        return text.replace("|", "\\|").replace("\n", " ")

    def _dedup(cells: list[str]) -> list[str]:
        """Remove consecutive identical tokens (colspan artefacts)."""
        result, prev = [], object()  # fresh sentinel never equals a real cell
        for c in cells:
            if c != prev:
                result.append(c)
            prev = c
        return result

    rows_md = []
    for _, row in df.iterrows():
        # Missing values (None / NaN / NA) become empty cells.  Presence is
        # decided by isna() rather than truthiness so that a numeric 0 - a
        # meaningful figure in a financial table - is rendered, not blanked.
        cells = [
            "" if missing else _cell(value)
            for value, missing in zip(row.values, row.isna().values)
        ]
        rows_md.append(_dedup(cells))

    # Drop rows that are entirely empty after dedup
    rows_md = [r for r in rows_md if any(r)]
    if not rows_md:
        return ""

    # Normalise column count to the widest row (dedup can shorten rows unevenly)
    width = max(len(r) for r in rows_md)
    rows_md = [r + [""] * (width - len(r)) for r in rows_md]

    # Treat the first non-empty row as the header
    header, *data_rows = rows_md
    lines = [
        "| " + " | ".join(header) + " |",
        "| " + " | ".join(["---"] * width) + " |",
    ]
    lines.extend("| " + " | ".join(r) + " |" for r in data_rows)
    return "\n".join(lines)
def _is_boilerplate(text: str) -> bool:
    """
    Decide whether a text fragment is SEC cover-page boilerplate.

    A fragment counts as boilerplate when, after stripping surrounding
    whitespace, any of the following holds:
      - its lower-cased form is listed in _BOILERPLATE_EXACT,
      - its lower-cased form is shorter than 5 characters,
      - it starts with one of the prefixes in _BOILERPLATE_RE.
    """
    stripped = text.strip()
    lowered = stripped.lower()
    return (
        lowered in _BOILERPLATE_EXACT
        or len(lowered) < 5
        or _BOILERPLATE_RE.match(stripped) is not None
    )
| # ββ Text cleaning ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def clean_text(text: str) -> str:
    """
    Normalise extracted text for downstream use.

    Soft hyphens (U+00AD) and zero-width spaces (U+200B) are deleted, every
    run of spaces/tabs becomes a single space, runs of three or more newlines
    shrink to exactly two, and the result is stripped. Falsy input (empty
    string or None) yields "".
    """
    if not text:
        return ""
    # Ordinals mapped to None are deleted by str.translate.
    visible = text.translate({0x00AD: None, 0x200B: None})
    single_spaced = re.sub(r"[ \t]+", " ", visible)
    return re.sub(r"\n{3,}", "\n\n", single_spaced).strip()
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MAIN PROCESSOR CLASS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class SECProcessor:
    """
    Processes Apple SEC HTML filings through Docling.

    Saves two files per filing:
        {stem}.json          — structured JSON for inspection and table extraction
        {stem}_docling.json  — native DoclingDocument for HybridChunker (Phase 3)
    """

    def __init__(self, output_dir: Path = PROCESSED_DIR):
        """
        Args:
            output_dir : directory that receives the output files; it is
                         created (with parents) if it does not exist yet.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self._converter = None  # built on first access of `converter`

    # ── Lazy-loaded Docling converter ──────────────────────────────────────────
    @property
    def converter(self):
        """
        Docling DocumentConverter, built on first access and cached afterwards.

        This is a property because process_filing() uses it as an attribute
        (`self.converter.convert(...)`). As a plain method that expression
        would resolve to the bound method, which has no `.convert`.
        """
        if self._converter is None:
            # Imported lazily: the Docling import is slow.
            from docling.document_converter import DocumentConverter, PdfFormatOption
            from docling.datamodel.pipeline_options import PdfPipelineOptions
            from docling.datamodel.base_models import InputFormat

            # NOTE(review): these options are registered for InputFormat.PDF
            # only; presumably .htm inputs are handled by Docling's default
            # HTML handling and do not see them — confirm against Docling docs.
            opts = PdfPipelineOptions()
            opts.do_table_structure = True        # reconstruct table rows/cols
            opts.do_ocr = False                   # HTML — no OCR needed
            opts.generate_picture_images = False  # skip figure images
            self._converter = DocumentConverter(
                format_options={
                    InputFormat.PDF: PdfFormatOption(pipeline_options=opts)
                }
            )
            log.info("Docling converter ready.")
        return self._converter

    # ── Extraction helpers ─────────────────────────────────────────────────────
    @staticmethod
    def _extract_sections(doc) -> list[dict]:
        """Collect every non-empty, non-table text item of `doc` as a section dict."""
        from docling.datamodel.document import SectionHeaderItem, TableItem

        sections = []
        current_header = ""
        for item, level in doc.iterate_items():
            text = getattr(item, "text", None)
            if not text or not text.strip():
                continue
            if isinstance(item, TableItem):
                continue  # tables are extracted separately
            raw = text.strip()
            is_hdr = isinstance(item, SectionHeaderItem)
            sections.append({
                "type"          : "header" if is_hdr else "text",
                "level"         : level,
                "text"          : raw,
                "cleaned_text"  : clean_text(raw),
                "page_num"      : None,  # HTML has no page numbers
                "parent_header" : current_header,
                "is_boilerplate": _is_boilerplate(raw),
            })
            # Updated only AFTER appending, so a header's own parent_header is
            # the previous header rather than itself.
            if is_hdr:
                current_header = raw
        return sections

    @staticmethod
    def _extract_tables(doc) -> list[dict]:
        """Convert each usable table of `doc` into a table dict; skip the rest."""
        tables = []
        for i, table in enumerate(doc.tables):
            # Best effort: one malformed table must not lose the whole filing,
            # so any failure is logged and that table is skipped.
            try:
                df = table.export_to_dataframe(doc)
                if df.empty or len(df) < 2:
                    continue
                # Build markdown from the DataFrame values, not from
                # export_to_markdown() which produces blank cells for SEC HTML.
                markdown = _df_to_markdown(df)
                if not markdown:
                    continue
                tables.append({
                    "index"    : i,      # position in doc.tables, gaps included
                    "page_num" : None,   # HTML has no page numbers
                    "markdown" : markdown,
                    "headers"  : list(df.columns.astype(str)),
                    "rows"     : len(df),
                    "cols"     : len(df.columns),
                    "data"     : df.fillna("").values.tolist(),
                    "is_atomic": True,
                })
            except Exception as e:
                log.warning(f" Table {i} skipped: {e}")
        return tables

    # ── Process one filing ─────────────────────────────────────────────────────
    def process_filing(
        self,
        htm_path: Path,
        metadata: dict,
        force: bool = False,
    ) -> dict:
        """
        Parse one SEC HTML filing and save JSON + _docling.json.

        Args:
            htm_path : path to filing.htm
            metadata : dict containing doc_stem, source, doc_type, ticker, etc.
                       `doc_stem` names the output files and is not copied
                       into the saved metadata.
            force    : re-process even if output already exists

        Returns:
            parsed document dict with keys "metadata", "sections", "tables".
            When both outputs already exist and `force` is false, the dict is
            loaded from the existing {stem}.json instead.

        Raises:
            KeyError : if `metadata` has no "doc_stem" entry.
        """
        stem = metadata["doc_stem"]
        out_path = self.output_dir / f"{stem}.json"
        docling_path = self.output_dir / f"{stem}_docling.json"

        # Skip only if BOTH outputs exist; a half-written pair is redone.
        if out_path.exists() and docling_path.exists() and not force:
            log.info(f"SKIP {stem} (already processed — {out_path.name})")
            with open(out_path, encoding="utf-8") as f:
                return json.load(f)

        log.info(f"Processing: {stem} ({htm_path.name})")

        # ── Parse with Docling ────────────────────────────────────────────────
        result = self.converter.convert(str(htm_path))
        doc = result.document

        sections = self._extract_sections(doc)
        tables = self._extract_tables(doc)

        # ── Build document metadata ───────────────────────────────────────────
        doc_meta = {k: v for k, v in metadata.items() if k != "doc_stem"}
        doc_meta.update({
            "parsed_at"      : datetime.now(timezone.utc).isoformat(),
            "parser"         : "docling",
            "total_pages"    : 0,
            "total_sections" : len(sections),
            "total_tables"   : len(tables),
            "removed_pages"  : [],  # no pages in HTML — nothing to remove
        })
        parsed = {
            "metadata" : doc_meta,
            "sections" : sections,
            "tables"   : tables,
        }

        # ── Save structured JSON ──────────────────────────────────────────────
        # ensure_ascii=False writes non-ASCII text verbatim, so the file must
        # be opened as UTF-8 explicitly rather than with the platform default.
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(parsed, f, indent=2, ensure_ascii=False, default=str)
        size_kb = out_path.stat().st_size / 1024
        log.info(f" Saved JSON : {out_path.name} ({size_kb:.1f} KB)")

        # ── Save native DoclingDocument (for HybridChunker) ───────────────────
        with open(docling_path, "w", encoding="utf-8") as f:
            f.write(doc.model_dump_json())
        dl_kb = docling_path.stat().st_size / 1024
        log.info(f" Saved _docling : {docling_path.name} ({dl_kb:.1f} KB)")

        boilerplate_n = sum(1 for s in sections if s.get("is_boilerplate"))
        log.info(
            f" Sections: {len(sections)} "
            f"(boilerplate: {boilerplate_n}) "
            f"Tables: {len(tables)}"
        )
        return parsed

    # ── Batch process all filings ──────────────────────────────────────────────
    def process_all(
        self,
        raw_dir: Path = RAW_SEC_DIR,
        force: bool = False,
    ) -> list[dict]:
        """
        Process all 10-K, 10-Q, and 8-K filings under raw_dir.

        Expects the layout raw_dir/<doc_type>/<period>/filing.htm, with an
        optional metadata.json next to each filing. Entries without a
        filing.htm are ignored. A filing that fails (including one whose
        metadata.json cannot be read) is logged and skipped; the batch
        continues with the next one.

        Args:
            raw_dir : root directory holding one sub-directory per form type
            force   : re-process filings whose outputs already exist

        Returns:
            list of parsed document dicts, one per successfully handled filing
        """
        results = []
        for doc_type in ["10-K", "10-Q", "8-K"]:
            type_dir = Path(raw_dir) / doc_type
            if not type_dir.exists():
                continue
            log.info(f"\n── {doc_type} filings ────────────────────────────")
            for period_dir in sorted(type_dir.iterdir()):
                htm = period_dir / "filing.htm"
                if not htm.exists():
                    continue
                period = period_dir.name
                stem = f"{doc_type}_{period}"
                try:
                    # Metadata loading sits inside the try so that a corrupt
                    # metadata.json fails this filing only, not the batch.
                    file_meta = {}
                    meta_file = period_dir / "metadata.json"
                    if meta_file.exists():
                        with open(meta_file, encoding="utf-8") as f:
                            file_meta = json.load(f)
                    metadata = {
                        "doc_stem"    : stem,
                        "source"      : "sec_edgar",
                        "doc_type"    : doc_type,
                        "ticker"      : "AAPL",
                        "company"     : "Apple Inc.",
                        # Fall back to the first 4 chars of the folder name.
                        "fiscal_year" : file_meta.get("fiscal_year", period[:4]),
                        "filing_date" : file_meta.get("filing_date", ""),
                        "accession"   : file_meta.get("accession", ""),
                        "file_name"   : htm.name,
                        "file_path"   : str(htm),
                        "license"     : "public",
                        "access_level": "public",
                    }
                    parsed = self.process_filing(htm, metadata, force=force)
                except Exception as e:
                    # log.exception keeps the traceback for diagnosis.
                    log.exception(f" FAILED {stem}: {e}")
                else:
                    results.append(parsed)
        return results
| # ββ Entry point ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
if __name__ == "__main__":
    # Script entry point: process every filing under the default raw
    # directory, then log a summary and the list of structured JSON outputs.
    import sys

    # The only option: `--force` anywhere on the command line re-processes
    # filings whose outputs already exist.
    force = "--force" in sys.argv

    banner = "=" * 60
    log.info(banner)
    log.info("Phase 2b — SEC Filing Processor")
    log.info(banner)

    processor = SECProcessor()
    results = processor.process_all(force=force)

    log.info("\n" + banner)
    log.info("Processing complete.")
    log.info(f" Filings processed : {len(results)}")
    log.info(f" Total sections : {sum(r['metadata']['total_sections'] for r in results)}")
    log.info(f" Total tables : {sum(r['metadata']['total_tables'] for r in results)}")

    log.info("\nOutput files:")
    # List from the processor's own output directory so the listing always
    # matches where the files were written.  The *_docling.json companions
    # are left out of the listing.
    for f in sorted(processor.output_dir.rglob("*.json")):
        if not f.name.endswith("_docling.json"):
            size_kb = f.stat().st_size / 1024
            log.info(f" {f.name:40s} ({size_kb:.1f} KB)")
    log.info(banner)