Spaces:
Runtime error
Runtime error
| import pykrx | |
| import pandas as pd | |
| import yfinance as yf | |
| from datetime import datetime | |
| import re | |
| import os | |
# ============================================================
# 1) Preload the KOSPI / KOSDAQ ticker lists
# ============================================================
# Both listings are fetched once at import time and kept as sets so that
# later membership checks do not need another pykrx call.
print("[INFO] Loading KOSPI/KOSDAQ ticker lists...")
KOSPI_SET, KOSDAQ_SET = (
    set(pykrx.stock.get_market_ticker_list(market=market_name))
    for market_name in ("KOSPI", "KOSDAQ")
)
# ============================================================
# 2) Attach .KS / .KQ automatically depending on the market
# ============================================================
def attach_market_suffix(ticker6: str):
    """Return *ticker6* with a market suffix appended.

    A code found in ``KOSPI_SET`` gets ``.KS``. A code that is not in
    ``KOSPI_SET`` but is in ``KOSDAQ_SET`` gets ``.KQ``. A code found in
    neither set falls back to ``.KS``.
    """
    # KOSPI membership is tested first, so a code present in both sets
    # resolves to ".KS".
    kosdaq_only = ticker6 not in KOSPI_SET and ticker6 in KOSDAQ_SET
    suffix = ".KQ" if kosdaq_only else ".KS"
    return ticker6 + suffix
# ============================================================
# 3) Extract the representative Korean ticker from a raw ID string
# ============================================================
# Compiled once at import time; every pattern is applied with fullmatch().
_SUFFIXED_TICKER_RE = re.compile(r"\d{6}\.(KS|KQ)")
_BARE_TICKER_RE = re.compile(r"\d{6}")
_TOKEN_SEPARATOR_RE = re.compile(r"[,\s]+")


def extract_primary_ticker(raw_id: str):
    """Pick one ticker symbol out of a raw identifier value.

    Resolution order:

    1. A missing value (``pd.isna``) yields ``""``.
    2. If the whole stripped string is ``NNNNNN.KS`` / ``NNNNNN.KQ`` it is
       returned unchanged.
    3. Otherwise the string is split on commas/whitespace. The first bare
       six-digit token wins and is passed to ``attach_market_suffix``.
    4. Failing that, the first token that already carries ``.KS`` / ``.KQ``
       is returned.
    5. Failing that, the first non-empty token is returned (or the stripped
       string itself when there are no tokens).

    Args:
        raw_id: Raw identifier; converted with ``str()`` before parsing.

    Returns:
        The selected ticker string, or ``""`` for a missing value.
    """
    if pd.isna(raw_id):
        return ""

    s = str(raw_id).strip()
    if _SUFFIXED_TICKER_RE.fullmatch(s):
        return s

    # Drop empty tokens (e.g. produced by a leading comma) so the final
    # fallback never returns "" while a real token is available.
    tokens = [tok for tok in _TOKEN_SEPARATOR_RE.split(s) if tok]

    # Bare six-digit code -> detect the market automatically.
    for tok in tokens:
        if _BARE_TICKER_RE.fullmatch(tok):
            return attach_market_suffix(tok)

    # Token that already carries .KS / .KQ.
    for tok in tokens:
        if _SUFFIXED_TICKER_RE.fullmatch(tok):
            return tok

    # Last resort: hand back the first token as-is.
    return tokens[0] if tokens else s
# ============================================================
# 4) Company-name lookup
# ============================================================
# Names resolved so far in this process, keyed by ticker symbol. Only
# successful lookups are stored, so a failed lookup is retried next time.
_COMPANY_NAME_CACHE = {}


def _name_from_mapping(mapping):
    """Return ``longName`` (preferred) or ``shortName`` from *mapping*, else ``""``."""
    if not mapping:
        return ""
    return mapping.get("longName") or mapping.get("shortName") or ""


def safe_company_name(ticker):
    """Best-effort company-name lookup for *ticker* through yfinance.

    ``fast_info`` is consulted first and ``info`` second; from either one the
    ``longName`` is preferred over the ``shortName``. Errors raised by
    yfinance are swallowed on purpose so a single bad symbol cannot abort a
    batch run.

    Args:
        ticker: Symbol passed to ``yf.Ticker`` (e.g. ``"005930.KS"``).

    Returns:
        The company name, or ``""`` when the ticker is empty or no name
        could be obtained.
    """
    if not ticker:
        # Nothing to look up; avoid a pointless remote call.
        return ""

    cached = _COMPANY_NAME_CACHE.get(ticker)
    if cached:
        return cached

    try:
        yf_t = yf.Ticker(ticker)
    except Exception:
        return ""

    # NOTE(review): fast_info may not expose name fields in current yfinance
    # releases - confirm. The `info` fallback covers that case either way.
    for attr in ("fast_info", "info"):
        try:
            name = _name_from_mapping(getattr(yf_t, attr))
        except Exception:
            # Catch Exception (not a bare except) so Ctrl-C still works.
            continue
        if name:
            _COMPANY_NAME_CACHE[ticker] = name
            return name
    return ""
# ============================================================
# 5) MAIN CODE
# ============================================================
# Column order of the rows written to the output CSV.
_OUTPUT_COLUMNS = ["ID", "NAME", "COMPANY", "VALUE", "AMOUNT", "PERCENTAGE"]


def main():
    """Export the holdings of every ETF listed in ``etf_2.tsv`` to ``ETF.csv``.

    For each input row the ETF's portfolio deposit file (PDF) for today's
    date is fetched through pykrx, the constituent tickers are normalised
    with ``extract_primary_ticker``, company names are looked up with
    ``safe_company_name``, each holding's share of the summed value is
    computed, and the rows are appended to ``ETF.csv``. The output file is
    created with a header row when it does not exist yet. ETFs whose PDF is
    empty, cannot be fetched, or lacks a ticker column are skipped with a
    console message.
    """
    # Read the code column as text: the codes are zero-padded, and integer
    # inference would silently turn "069500" into 69500.
    input_df = pd.read_csv('etf_2.tsv', sep='\t', dtype={"종목코드": str})
    output_path = "ETF.csv"

    # Create the file with a header row if it does not exist yet.
    if not os.path.exists(output_path):
        pd.DataFrame(columns=_OUTPUT_COLUMNS).to_csv(
            output_path, index=False, encoding="utf-8"
        )

    # The query date is the same for every ETF in this run.
    today_dt = datetime.now().strftime('%Y%m%d')

    for row in input_df.to_dict('records'):
        if pd.isna(row['종목코드']):
            print("[WARN] input row without an ETF code. skip")
            continue
        # zfill restores leading zeros if the input file already lost them.
        etf_ticker = str(row['종목코드']).strip().zfill(6)
        name_from_input = row['종목명']

        try:
            df = pykrx.stock.get_etf_portfolio_deposit_file(etf_ticker, today_dt)
        except Exception as exc:
            # One failing ETF must not discard the rest of the batch.
            print(f"[ERROR] {etf_ticker}: PDF fetch failed ({exc}). skip")
            continue

        if df is None or df.empty:
            print(f"[WARN] {etf_ticker} PDF 없음. skip")
            continue

        # ============ 1) Normalise the ticker column ============
        # The ticker may arrive either as a column or as the index.
        if "티커" in df.columns:
            df = df.rename(columns={"티커": "ID"})
        elif df.index.name == "티커":
            df = df.reset_index().rename(columns={"티커": "ID"})
        else:
            print(f"[ERROR] {etf_ticker}: 티커 컬럼 없음")
            print(df)
            continue

        # No astype(str) here: it would turn NaN into the string "nan" and
        # bypass the missing-value handling in extract_primary_ticker.
        df["ID"] = df["ID"].apply(extract_primary_ticker)

        # ============ 2) ETF name ============
        df["NAME"] = name_from_input

        # ============ 3) Numeric columns ============
        for source_col, target_col in (("금액", "VALUE"), ("계약수", "AMOUNT")):
            if source_col in df.columns:
                df[target_col] = pd.to_numeric(df[source_col], errors="coerce")
            elif target_col not in df.columns:
                df[target_col] = float("nan")
            # Negative figures are floored at zero.
            df[target_col] = df[target_col].clip(lower=0)

        # ============ 4) Company-name lookup ============
        df["COMPANY"] = df["ID"].apply(safe_company_name)

        # ============ 5) Percentage of total value ============
        # The source weight column ("비중") is not used; it is discarded by
        # the final column selection below.
        if df["VALUE"].notna().any():
            total_value = df["VALUE"].sum(skipna=True)
            if total_value > 0:
                df["PERCENTAGE"] = (df["VALUE"] / total_value * 100).round(1)
            else:
                df["PERCENTAGE"] = 0.0
        else:
            df["PERCENTAGE"] = None

        # ============ 6) Final columns ============
        df = df[_OUTPUT_COLUMNS].reset_index(drop=True)
        print(df)

        # ============ 7) Append to the CSV (comma-separated, UTF-8) ============
        df.to_csv(
            output_path,
            mode="a",
            header=False,
            index=False,
            encoding="utf-8",
        )


if __name__ == "__main__":
    main()