Spaces:
Running
Running
"""
Phase 2 — Document Parser
==========================
Parses all raw documents (Morningstar PDFs + SEC filings) using Docling.
Outputs structured JSON per document with:
  - Text sections (with hierarchy / heading level)
  - Tables (as markdown + dataframe-ready dict)
  - Metadata (source, type, page, fiscal year, etc.)

Usage:
    python doc_parser.py

Output:
    data/processed/
    ├── morningstar/
    │   ├── a-wide-moat-focus-provides-differentiation.json
    │   └── ptc01302411420.json
    └── sec_filings/
        └── AAPL/
            ├── 10-K_2023.json
            ├── 10-K_2024.json
            └── ...
"""
| import json | |
| import logging | |
| from pathlib import Path | |
| from datetime import datetime, timezone | |
# ── Paths ─────────────────────────────────────────────────────────────────────
# Repository root: the parent of the directory that contains this file.
BASE_DIR = Path(__file__).parents[1]

# Top-level working directories.
RAW_DIR = BASE_DIR.joinpath("data", "raw")
PROCESSED_DIR = BASE_DIR.joinpath("data", "processed")
LOG_DIR = BASE_DIR.joinpath("logs")

# Per-source input and output locations (the SEC paths are fixed to AAPL).
MORNINGSTAR_RAW = RAW_DIR.joinpath("morningstar")
SEC_RAW = RAW_DIR.joinpath("sec_filings", "AAPL")
MORNINGSTAR_OUT = PROCESSED_DIR.joinpath("morningstar")
SEC_OUT = PROCESSED_DIR.joinpath("sec_filings", "AAPL")

# Create the log directory at import time so a log file can be opened in it.
LOG_DIR.mkdir(parents=True, exist_ok=True)
# ── Logging ───────────────────────────────────────────────────────────────────
# Root logger: INFO and above go to both logs/doc_parser.log and the console.
logging.basicConfig(
    handlers=[
        logging.FileHandler(LOG_DIR / "doc_parser.log"),
        logging.StreamHandler(),
    ],
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
)

# Module-level logger used by every function in this file.
log = logging.getLogger(__name__)
# ── Docling setup ─────────────────────────────────────────────────────────────
def build_converter():
    """Create the Docling converter used to parse documents.

    Docling is imported inside the function, so importing this module does
    not import Docling.

    Returns:
        A ``DocumentConverter`` whose PDF pipeline keeps table structure and
        has OCR and picture-image generation switched off.
    """
    from docling.document_converter import DocumentConverter, PdfFormatOption
    from docling.datamodel.pipeline_options import PdfPipelineOptions
    from docling.datamodel.base_models import InputFormat

    pdf_options = PdfPipelineOptions()
    pdf_options.do_table_structure = True  # preserve financial tables
    pdf_options.do_ocr = False  # these are digital PDFs, skip OCR
    pdf_options.generate_picture_images = False  # skip figure image extraction

    pdf_format = PdfFormatOption(pipeline_options=pdf_options)
    return DocumentConverter(format_options={InputFormat.PDF: pdf_format})
# ── Parse one PDF ─────────────────────────────────────────────────────────────
def _first_page_no(item):
    """Return the page number of *item*'s first provenance entry, or None."""
    prov = getattr(item, "prov", None)
    return prov[0].page_no if prov else None


def _collect_sections(doc) -> list:
    """Return one record per non-blank text item, in document order."""
    # Imported once per call instead of once per item.
    from docling.datamodel.document import SectionHeaderItem

    sections = []
    for item, level in doc.iterate_items():
        text = getattr(item, "text", None)
        # Items with no text attribute, or only whitespace, are skipped.
        if not text or not text.strip():
            continue
        is_header = isinstance(item, SectionHeaderItem)
        sections.append({
            "type": "header" if is_header else "text",
            "level": level,
            "text": text.strip(),
            "page_num": _first_page_no(item),
        })
    return sections


def _collect_tables(doc) -> list:
    """Return one record per table that exports cleanly; log the others."""
    tables = []
    for index, table in enumerate(doc.tables):
        # Best effort: one table that fails to export must not discard the
        # rest of the document, so the failure is logged and skipped.
        try:
            frame = table.export_to_dataframe()
            tables.append({
                "index": index,
                "page_num": _first_page_no(table),
                "markdown": table.export_to_markdown(),
                "rows": len(frame),
                "cols": len(frame.columns),
                "headers": list(frame.columns.astype(str)),
                "data": frame.values.tolist(),
                "is_atomic": True,  # never split this chunk
            })
        except Exception as exc:
            log.warning(f" Table {index} export failed: {exc}")
    return tables


def parse_pdf(pdf_path: Path, metadata: dict, converter) -> dict:
    """Convert one document with Docling into a JSON-ready dict.

    Despite the name, the SEC pipeline in this file also passes HTML filings
    through this function.

    Args:
        pdf_path: Path of the document to convert.
        metadata: Caller-supplied metadata; copied into the result's
            ``"metadata"`` entry, with the parser-generated keys added after.
        converter: Object whose ``convert(str)`` returns a result exposing a
            ``document`` attribute (see ``build_converter``).

    Returns:
        A dict with the keys ``"metadata"``, ``"sections"``, ``"tables"`` and
        ``"full_markdown"``.

    Raises:
        Whatever ``converter.convert`` or the markdown export raises.
        Per-table export errors are logged and skipped instead.
    """
    log.info(f" Parsing: {pdf_path.name}")
    doc = converter.convert(str(pdf_path)).document

    sections = _collect_sections(doc)
    tables = _collect_tables(doc)

    # Full markdown export (for quick inspection).
    full_markdown = doc.export_to_markdown()

    # Highest page number seen on any section OR table. Looking at sections
    # alone under-counts documents whose last pages hold only tables.
    # Stays 0 when no item has a page number.
    page_numbers = [
        entry["page_num"]
        for entry in (*sections, *tables)
        if entry["page_num"]
    ]

    return {
        "metadata": {
            **metadata,
            "parsed_at": datetime.now(timezone.utc).isoformat(),
            "parser": "docling",
            "total_pages": max(page_numbers, default=0),
            "total_sections": len(sections),
            "total_tables": len(tables),
        },
        "sections": sections,
        "tables": tables,
        "full_markdown": full_markdown,
    }
def save_parsed(data: dict, out_path: Path):
    """Write *data* to *out_path* as pretty-printed UTF-8 JSON.

    The JSON is written to a sibling ``<name>.tmp`` file and then renamed
    over *out_path*. The processing loops in this file skip any document
    whose output file already exists, so a truncated file left by an
    interrupted run would otherwise never be regenerated.

    Args:
        data: JSON-serialisable dict; values ``json`` cannot encode are
            converted with ``str``.
        out_path: Destination file; missing parent directories are created.

    Raises:
        OSError: If the directory or file cannot be written.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = out_path.with_name(out_path.name + ".tmp")
    try:
        # ensure_ascii=False emits raw non-ASCII characters, so the encoding
        # is pinned rather than left to the platform's locale default.
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False, default=str)
        tmp_path.replace(out_path)
    finally:
        # Only still present when the dump or the rename failed.
        if tmp_path.exists():
            tmp_path.unlink()
    size_kb = out_path.stat().st_size / 1024
    log.info(f" Saved: {out_path.name} ({size_kb:.1f} KB)")
# ── Morningstar PDFs ──────────────────────────────────────────────────────────
def process_morningstar(converter):
    """Parse every ``*.pdf`` in ``MORNINGSTAR_RAW`` into ``MORNINGSTAR_OUT``.

    A PDF whose ``<stem>.json`` output already exists is skipped. A failure
    on one PDF is logged with its traceback and does not stop the others.

    Args:
        converter: Docling converter passed through to ``parse_pdf``.
    """
    log.info("\n=== Morningstar PDFs ===")
    # Sorted so the processing order (and the log) is the same on every run.
    pdfs = sorted(MORNINGSTAR_RAW.glob("*.pdf"))
    log.info(f"Found {len(pdfs)} PDFs")

    for pdf in pdfs:
        out_path = MORNINGSTAR_OUT / f"{pdf.stem}.json"
        if out_path.exists():
            log.info(f" SKIP {pdf.name} (already parsed)")
            continue

        metadata = {
            "source": "morningstar",
            "doc_type": "research_report",
            "file_name": pdf.name,
            "file_path": str(pdf),
            "license": "proprietary",
            "access_level": "internal",
        }
        try:
            parsed = parse_pdf(pdf, metadata, converter)
            save_parsed(parsed, out_path)
            log.info(
                f" Sections: {parsed['metadata']['total_sections']} "
                f"Tables: {parsed['metadata']['total_tables']} "
                f"Pages: {parsed['metadata']['total_pages']}"
            )
        except Exception as e:
            # Batch boundary: keep going, but record the traceback too.
            log.exception(f" FAILED {pdf.name}: {e}")
# ── SEC Filings ───────────────────────────────────────────────────────────────
def _load_filing_metadata(folder: Path) -> dict:
    """Return the dict stored in ``folder/metadata.json``, or ``{}``.

    A missing, unreadable, malformed or non-object file yields ``{}`` (with
    a warning for everything except a missing file), so one bad sidecar file
    cannot abort the whole run.
    """
    meta_file = folder / "metadata.json"
    if not meta_file.exists():
        return {}
    try:
        with open(meta_file, encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both JSON and text-decoding errors.
        log.warning(f" Ignoring unreadable {meta_file}: {e}")
        return {}
    if not isinstance(loaded, dict):
        log.warning(f" Ignoring {meta_file}: expected a JSON object")
        return {}
    return loaded


def process_sec_filings(converter):
    """Parse each AAPL ``filing.htm`` under ``SEC_RAW`` into ``SEC_OUT``.

    Walks ``SEC_RAW/<form type>/<folder>/filing.htm`` for the form types
    10-K, 10-Q and 8-K, and writes ``<form type>_<folder>.json``. Filings
    whose output already exists are skipped. A failure on one filing is
    logged with its traceback and does not stop the others.

    Args:
        converter: Docling converter passed through to ``parse_pdf``.
    """
    log.info("\n=== SEC Filings (AAPL) ===")
    for ftype in ["10-K", "10-Q", "8-K"]:
        ftype_dir = SEC_RAW / ftype
        if not ftype_dir.exists():
            continue

        for folder in sorted(ftype_dir.iterdir()):
            # The filing has a fixed name, so test for it directly.
            htm = folder / "filing.htm"
            if not htm.is_file():
                continue

            out_name = f"{ftype}_{folder.name}.json"
            out_path = SEC_OUT / out_name
            if out_path.exists():
                log.info(f" SKIP {out_name} (already parsed)")
                continue

            file_meta = _load_filing_metadata(folder)
            metadata = {
                "source": "sec_edgar",
                "doc_type": ftype,
                "ticker": "AAPL",
                "company": "Apple Inc.",
                # Falls back to the first four characters of the folder name.
                "fiscal_year": file_meta.get("fiscal_year", folder.name[:4]),
                "filing_date": file_meta.get("filing_date", ""),
                "accession": file_meta.get("accession", ""),
                "file_name": htm.name,
                "file_path": str(htm),
                "license": "public",
                "access_level": "public",
            }

            log.info(f" Parsing {ftype}/{folder.name} ...")
            try:
                parsed = parse_pdf(htm, metadata, converter)
                save_parsed(parsed, out_path)
                log.info(
                    f" Sections: {parsed['metadata']['total_sections']} "
                    f"Tables: {parsed['metadata']['total_tables']} "
                    f"Pages: {parsed['metadata']['total_pages']}"
                )
            except Exception as e:
                # Batch boundary: keep going, but record the traceback too.
                log.exception(f" FAILED {out_name}: {e}")
# ── Entry point ───────────────────────────────────────────────────────────────
def main():
    """Run the full parse and log a summary of the processed files.

    Builds one Docling converter, processes the Morningstar PDFs and then
    the SEC filings with it, and finally lists every ``*.json`` file under
    ``PROCESSED_DIR`` with its size.
    """
    log.info("=" * 60)
    # The dash in this banner was mis-encoded in the source; restored here.
    log.info("Phase 2 — Document Parser")
    log.info("=" * 60)

    log.info("Loading Docling converter ...")
    converter = build_converter()
    log.info("Converter ready.")

    process_morningstar(converter)
    process_sec_filings(converter)

    # Summary
    log.info("\n" + "=" * 60)
    log.info("Parsing complete. Output files:")
    for f in sorted(PROCESSED_DIR.rglob("*.json")):
        size_kb = f.stat().st_size / 1024
        log.info(f" {f.relative_to(PROCESSED_DIR)} ({size_kb:.1f} KB)")
    log.info("=" * 60)


if __name__ == "__main__":
    main()