"""Download, parse, and filter SEC 13F quarterly holdings data.

Pipeline:
  1. Download (with caching) the SEC structured-data 13F ZIP archive.
  2. Load the SUBMISSION / COVERPAGE / INFOTABLE TSVs from the archive.
  3. Keep only 13F-HR / 13F-HR/A filings (drop 13F-NT), normalize issuer
     names via FIGI, and retain positions that are >= 1% of each filing
     manager's reported total.
  4. Write the cleaned holdings table and the investor list to CSV.
"""

import argparse
import zipfile
from pathlib import Path
from urllib.parse import urlparse

import pandas as pd


# ------------------------------------------------------------
# 1) ZIP download (with caching)
# ------------------------------------------------------------
def download_zip(url: str, cache_dir: str) -> Path:
    """Download *url* into *cache_dir* and return the local file path.

    If a file with the same basename already exists in the cache directory
    it is reused and no network request is made.
    """
    # Imported lazily so the pure-pandas part of this module can be used
    # (and tested) without the `requests` package installed.
    import requests

    cache_path = Path(cache_dir)
    cache_path.mkdir(parents=True, exist_ok=True)

    zip_path = cache_path / Path(urlparse(url).path).name
    if zip_path.exists():
        print(f"[INFO] Using cached ZIP → {zip_path}")
        return zip_path

    print(f"[INFO] Downloading ZIP from {url}")
    # SEC rejects anonymous clients, so a descriptive User-Agent is required.
    # The Host header is derived from the URL by requests itself; hard-coding
    # it would break downloads from any other host.
    headers = {"User-Agent": "2Digit AI@2digit.io"}
    resp = requests.get(url, headers=headers, stream=True, timeout=60)
    resp.raise_for_status()

    # Stream to disk in chunks instead of buffering the whole archive in
    # memory — quarterly 13F archives can be hundreds of MB.
    with zip_path.open("wb") as fh:
        for chunk in resp.iter_content(chunk_size=1 << 20):
            fh.write(chunk)

    print(f"[INFO] Saved ZIP → {zip_path}")
    return zip_path


# ------------------------------------------------------------
# 2) Read TSVs from inside the ZIP
# ------------------------------------------------------------
def load_tsv_from_zip(zip_path: Path):
    """Load the three 13F TSV tables from the SEC ZIP archive.

    Returns a ``(submission, cover, info)`` tuple of DataFrames restricted
    to the columns the downstream pipeline needs; all values are read as
    strings so nothing is coerced prematurely.
    """
    print("[INFO] Loading raw 13F data from ZIP...")
    with zipfile.ZipFile(zip_path, "r") as z:
        names = z.namelist()
        # The SEC archive nests the TSVs inside a dated directory, hence
        # the "/<NAME>.tsv" suffix match.
        cover_path = next(f for f in names if f.endswith("/COVERPAGE.tsv"))
        info_path = next(f for f in names if f.endswith("/INFOTABLE.tsv"))
        submission_path = next(f for f in names if f.endswith("/SUBMISSION.tsv"))

        print(f"[INFO] Loading SUBMISSION ← {submission_path.split('/')[-1]}")
        print(f"[INFO] Loading COVERPAGE ← {cover_path.split('/')[-1]}")
        print(f"[INFO] Loading INFOTABLE ← {info_path.split('/')[-1]}")

        # Read only the minimal columns; close each member handle promptly.
        with z.open(submission_path) as fh:
            submission = pd.read_csv(
                fh, sep="\t",
                usecols=["ACCESSION_NUMBER", "SUBMISSIONTYPE"],
                dtype=str,
            )
        with z.open(cover_path) as fh:
            cover = pd.read_csv(
                fh, sep="\t",
                usecols=["ACCESSION_NUMBER", "FILINGMANAGER_NAME"],
                dtype=str,
            )
        info_cols = ["ACCESSION_NUMBER", "NAMEOFISSUER", "CUSIP",
                     "FIGI", "VALUE", "SSHPRNAMT"]
        with z.open(info_path) as fh:
            info = pd.read_csv(
                fh, sep="\t",
                usecols=info_cols,
                dtype=str,
                low_memory=False,
            )

    print("[INFO] Raw load complete:")
    print(f"    → SUBMISSION: {len(submission):,} rows")
    print(f"    → COVERPAGE : {len(cover):,} rows")
    print(f"    → INFOTABLE : {len(info):,} rows")
    return submission, cover, info


# ------------------------------------------------------------
# 3) FIGI-based issuer-name normalization (memory-conscious)
# ------------------------------------------------------------
def normalize_issuer(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with issuer names upper-cased and unified.

    Rows sharing a FIGI get one representative NAMEOFISSUER (the first
    occurrence), so the same security is not split across name spellings.
    """
    print("[INFO] Normalizing issuer names using FIGI...")
    df = df.copy()
    df["NAMEOFISSUER"] = df["NAMEOFISSUER"].str.upper().str.strip()

    # Build the FIGI → name map only when FIGIs exist, to avoid a
    # pointless group pass over the whole table.
    figi_notna = df["FIGI"].notna()
    if figi_notna.any():
        figi_rep = (
            df.loc[figi_notna, ["FIGI", "NAMEOFISSUER"]]
            .drop_duplicates(subset="FIGI")
            .set_index("FIGI")["NAMEOFISSUER"]
        )
        df["NAMEOFISSUER"] = df["FIGI"].map(figi_rep).fillna(df["NAMEOFISSUER"])
    return df


# ------------------------------------------------------------
# 4) Core processing: keep only positions ≥1% per investor
# ------------------------------------------------------------
def build_tables(submission: pd.DataFrame, cover: pd.DataFrame, info: pd.DataFrame):
    """Filter and reshape the raw tables into the final output frames.

    Returns ``(final, investors)`` where *final* holds one row per
    significant (≥1% of manager AUM) holding and *investors* lists the
    managers ordered by total reported value.  Both are empty DataFrames
    when nothing survives filtering.
    """
    print("[INFO] Starting post-processing and filtering...")

    # 1. Keep only 13F-HR / 13F-HR/A filings (exclude 13F-NT notices).
    valid_accessions = submission[
        submission["SUBMISSIONTYPE"].isin({"13F-HR", "13F-HR/A"})
    ]["ACCESSION_NUMBER"]

    before_cover = len(cover)
    cover = cover[cover["ACCESSION_NUMBER"].isin(valid_accessions)].copy()
    info = info[info["ACCESSION_NUMBER"].isin(valid_accessions)].copy()
    print(f"[INFO] Filtered out 13F-NT → COVERPAGE {before_cover:,} → {len(cover):,}")
    print(f"[INFO]                    → INFOTABLE → {len(info):,}")

    if info.empty:
        print("[WARN] No valid holdings after removing 13F-NT")
        return pd.DataFrame(), pd.DataFrame()

    # 2. Merge holdings with manager names; normalize issuer names first.
    data = (
        info.pipe(normalize_issuer)
        .merge(cover[["ACCESSION_NUMBER", "FILINGMANAGER_NAME"]],
               on="ACCESSION_NUMBER")
    )
    del submission, cover, info  # drop local refs so peak memory stays low
    data["FILINGMANAGER_NAME"] = data["FILINGMANAGER_NAME"].str.upper().str.strip()

    # 3. Numeric conversion.
    data["VALUE"] = pd.to_numeric(data["VALUE"], errors="coerce")
    data["AMOUNT"] = pd.to_numeric(data["SSHPRNAMT"], errors="coerce")

    # Drop rows whose VALUE is 0 or NaN (removes zero-AUM investors and
    # makes the later int64 cast safe).
    before = len(data)
    data = data[data["VALUE"] > 0].copy()
    print(f"[INFO] Removed VALUE ≤ 0 rows → {before:,} → {len(data):,}")

    if data.empty:
        print("[WARN] No positive VALUE holdings after cleaning")
        return pd.DataFrame(), pd.DataFrame()

    # Safe to cast now that NaN/zero rows are gone.
    data["VALUE"] = data["VALUE"].astype("int64")
    data["AMOUNT"] = data["AMOUNT"].fillna(0).astype("int64")

    # 4. Total reported value per investor.
    data["INVESTOR_TOTAL"] = data.groupby(
        "FILINGMANAGER_NAME", observed=True
    )["VALUE"].transform("sum")

    # 5. Keep only positions worth at least 1% of the investor's total.
    data = data[data["VALUE"] >= data["INVESTOR_TOTAL"] * 0.01].copy()
    if data.empty:
        print("[WARN] No positions ≥1% after all filtering")
        return pd.DataFrame(), pd.DataFrame()

    print(f"[INFO] Final meaningful holdings (≥1% per investor): {len(data):,}")
    print(f"[INFO] Active investors with significant positions: "
          f"{data['FILINGMANAGER_NAME'].nunique():,}")

    # 6. Position weight in percent.
    data["PERCENT"] = (data["VALUE"] / data["INVESTOR_TOTAL"] * 100).round(3)

    # 7. Final column selection / renaming.
    final = data.rename(columns={
        "CUSIP": "ID",
        "FILINGMANAGER_NAME": "NAME",
        "NAMEOFISSUER": "COMPANY",
    })[["ID", "NAME", "COMPANY", "PERCENT", "VALUE", "AMOUNT"]]

    # 8. Investor list ordered by AUM (descending).
    investor_order = (
        data.groupby("FILINGMANAGER_NAME")["INVESTOR_TOTAL"]
        .first()
        .sort_values(ascending=False)
        .index
    )
    final["NAME"] = pd.Categorical(final["NAME"],
                                   categories=investor_order, ordered=True)
    final = (
        final.sort_values(["NAME", "VALUE"], ascending=[True, False])
        .reset_index(drop=True)
    )
    investors = pd.DataFrame({"NAME": investor_order})

    print(f"[INFO] Done! Output ready → {len(final):,} rows "
          f"from {len(investors)} real investors")
    return final, investors


# ------------------------------------------------------------
# 5) MAIN
# ------------------------------------------------------------
def main():
    """CLI entry point: download, process, and write the output CSVs."""
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument("--input", type=str, default="src_data/",
                        help="원천 데이터 폴더")
    parser.add_argument(
        "--url",
        type=str,
        default="https://www.sec.gov/files/structureddata/data/form-13f-data-sets/01jun2025-31aug2025_form13f.zip",
        help="sec 13f zip 파일 URL",
    )
    parser.add_argument("--output", type=str, default="data/",
                        help="정리된 데이터 저장 경로")
    args = parser.parse_args()

    zip_path = download_zip(args.url, args.input)
    submission, coverpage, infotable = load_tsv_from_zip(zip_path)
    info_table, investor_table = build_tables(submission, coverpage, infotable)

    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)
    info_table.to_csv(output_dir / "SEC_Filing_Manager.csv", index=False)
    investor_table.to_csv(output_dir / "INVEST_NAME.csv", index=False)
    print("[INFO] All done – clean SEC 13F data ready.")


if __name__ == "__main__":
    main()