#!/usr/bin/env python3 """ 美股持仓监控 — 盘中每分钟刷新,盘外自动等待 用法: python3 monitor.py [刷新间隔秒数,默认60] """ import json import os import sys import time from datetime import datetime, timedelta import yfinance as yf PORTFOLIO_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "my_portfolio.json") INITIAL_CASH = 100_000.0 REFRESH_SEC = int(sys.argv[1]) if len(sys.argv) > 1 else 60 def load_portfolio(): with open(PORTFOLIO_FILE) as f: return json.load(f) def get_prices(symbols): """批量获取价格(比逐个快很多)""" prices = {} if not symbols: return prices tickers = yf.Tickers(" ".join(symbols)) for sym in symbols: try: info = tickers.tickers[sym].fast_info p = info.get("lastPrice") or info.get("last_price") if p is None: hist = tickers.tickers[sym].history(period="1d") p = hist["Close"].iloc[-1] if not hist.empty else 0 prices[sym] = round(float(p), 2) except Exception: prices[sym] = 0 return prices def is_market_open(): """判断美股是否开盘(美东时间 周一-周五 9:30-16:00)""" try: from zoneinfo import ZoneInfo except ImportError: from backports.zoneinfo import ZoneInfo et = datetime.now(ZoneInfo("America/New_York")) # 周末 if et.weekday() >= 5: return False, et # 盘前/盘后 market_open = et.replace(hour=9, minute=30, second=0, microsecond=0) market_close = et.replace(hour=16, minute=0, second=0, microsecond=0) return market_open <= et <= market_close, et def time_to_next_open(): """计算距离下次开盘的时间""" try: from zoneinfo import ZoneInfo except ImportError: from backports.zoneinfo import ZoneInfo et = datetime.now(ZoneInfo("America/New_York")) # 找到下一个工作日的9:30 target = et.replace(hour=9, minute=30, second=0, microsecond=0) if et.weekday() < 5 and et < target: # 今天是工作日且还没开盘 pass else: # 找下一个工作日 days_ahead = 1 while True: target += timedelta(days=1) if target.weekday() < 5: break days_ahead += 1 target = target.replace(hour=9, minute=30, second=0, microsecond=0) diff = target - et return diff def clear_screen(): os.system("clear" if os.name != "nt" else "cls") def display(portfolio, prices, et_now, is_open): clear_screen() status = "🟢 开盘中" if is_open else "🔴 已休市" print(f""" ╔══════════════════════════════════════════════════════════════╗ ║ 📊 美股模拟投资 — 实时监控 {status} ║ ║ 美东时间: {et_now.strftime('%Y-%m-%d %H:%M:%S'):<20} 刷新间隔: {REFRESH_SEC}秒 ║ ╠══════════════════════════════════════════════════════════════╣""") total_market = 0 total_cost = 0 if portfolio["holdings"]: print(f"║ {'股票':<7} {'股数':>5} {'成本':>9} {'现价':>9} {'市值':>11} {'盈亏':>11} {'涨跌':>7} ║") print(f"║ {'─'*62} ║") for sym in sorted(portfolio["holdings"]): info = portfolio["holdings"][sym] price = prices.get(sym, info["avg_cost"]) mkt = info["shares"] * price cost = info["shares"] * info["avg_cost"] pnl = mkt - cost pct = (pnl / cost * 100) if cost > 0 else 0 sign = "+" if pnl >= 0 else "" color_pnl = f"{sign}{pnl:,.0f}" color_pct = f"{sign}{pct:.1f}%" total_market += mkt total_cost += cost print(f"║ {sym:<7} {info['shares']:>5} {info['avg_cost']:>9.2f} {price:>9.2f} {mkt:>11,.2f} {color_pnl:>11} {color_pct:>7} ║") else: print("║ (空仓) ║") total_assets = portfolio["cash"] + total_market total_pnl = total_assets - INITIAL_CASH total_pct = (total_pnl / INITIAL_CASH * 100) sign = "+" if total_pnl >= 0 else "" print(f"║ {'─'*62} ║") print(f"║ 💰 现金: ${portfolio['cash']:>11,.2f} ║") print(f"║ 📈 持仓: ${total_market:>11,.2f} ║") print(f"║ 💼 总资产: ${total_assets:>11,.2f} 总盈亏: {sign}${total_pnl:>10,.2f} ({sign}{total_pct:.1f}%) ║") print(f"╚══════════════════════════════════════════════════════════════╝") if not is_open: delta = time_to_next_open() hours = int(delta.total_seconds() // 3600) mins = int((delta.total_seconds() % 3600) // 60) print(f"\n ⏳ 距离下次开盘: {hours}小时{mins}分钟(休市期间每5分钟刷新一次)") print(f"\n 按 Ctrl+C 退出") def main(): print(" 🚀 启动监控中...") while True: try: portfolio = load_portfolio() symbols = list(portfolio["holdings"].keys()) prices = get_prices(symbols) is_open, et_now = is_market_open() display(portfolio, prices, et_now, is_open) # 盘中按设定间隔刷新,盘外每5分钟刷一次 wait = REFRESH_SEC if is_open else 300 time.sleep(wait) except KeyboardInterrupt: print("\n\n 👋 监控已停止\n") break except Exception as e: print(f"\n ⚠️ 出错: {e},{REFRESH_SEC}秒后重试...") time.sleep(REFRESH_SEC) if __name__ == "__main__": main()