File size: 6,108 Bytes
2d3db34 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 | #!/usr/bin/env python3
"""
็พ่กๆไป็ๆง โ ็ไธญๆฏๅ้ๅทๆฐ๏ผ็ๅค่ชๅจ็ญๅพ
็จๆณ: python3 monitor.py [ๅทๆฐ้ด้็งๆฐ๏ผ้ป่ฎค60]
"""
import json
import os
import sys
import time
from datetime import datetime, timedelta
import yfinance as yf
PORTFOLIO_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "my_portfolio.json")
INITIAL_CASH = 100_000.0
REFRESH_SEC = int(sys.argv[1]) if len(sys.argv) > 1 else 60
def load_portfolio():
with open(PORTFOLIO_FILE) as f:
return json.load(f)
def get_prices(symbols):
"""ๆน้่ทๅไปทๆ ผ๏ผๆฏ้ไธชๅฟซๅพๅค๏ผ"""
prices = {}
if not symbols:
return prices
tickers = yf.Tickers(" ".join(symbols))
for sym in symbols:
try:
info = tickers.tickers[sym].fast_info
p = info.get("lastPrice") or info.get("last_price")
if p is None:
hist = tickers.tickers[sym].history(period="1d")
p = hist["Close"].iloc[-1] if not hist.empty else 0
prices[sym] = round(float(p), 2)
except Exception:
prices[sym] = 0
return prices
def is_market_open():
"""ๅคๆญ็พ่กๆฏๅฆๅผ็๏ผ็พไธๆถ้ด ๅจไธ-ๅจไบ 9:30-16:00๏ผ"""
try:
from zoneinfo import ZoneInfo
except ImportError:
from backports.zoneinfo import ZoneInfo
et = datetime.now(ZoneInfo("America/New_York"))
# ๅจๆซ
if et.weekday() >= 5:
return False, et
# ็ๅ/็ๅ
market_open = et.replace(hour=9, minute=30, second=0, microsecond=0)
market_close = et.replace(hour=16, minute=0, second=0, microsecond=0)
return market_open <= et <= market_close, et
def time_to_next_open():
"""่ฎก็ฎ่ท็ฆปไธๆฌกๅผ็็ๆถ้ด"""
try:
from zoneinfo import ZoneInfo
except ImportError:
from backports.zoneinfo import ZoneInfo
et = datetime.now(ZoneInfo("America/New_York"))
# ๆพๅฐไธไธไธชๅทฅไฝๆฅ็9:30
target = et.replace(hour=9, minute=30, second=0, microsecond=0)
if et.weekday() < 5 and et < target:
# ไปๅคฉๆฏๅทฅไฝๆฅไธ่ฟๆฒกๅผ็
pass
else:
# ๆพไธไธไธชๅทฅไฝๆฅ
days_ahead = 1
while True:
target += timedelta(days=1)
if target.weekday() < 5:
break
days_ahead += 1
target = target.replace(hour=9, minute=30, second=0, microsecond=0)
diff = target - et
return diff
def clear_screen():
os.system("clear" if os.name != "nt" else "cls")
def display(portfolio, prices, et_now, is_open):
clear_screen()
status = "๐ข ๅผ็ไธญ" if is_open else "๐ด ๅทฒไผๅธ"
print(f"""
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ ๐ ็พ่กๆจกๆๆ่ต โ ๅฎๆถ็ๆง {status} โ
โ ็พไธๆถ้ด: {et_now.strftime('%Y-%m-%d %H:%M:%S'):<20} ๅทๆฐ้ด้: {REFRESH_SEC}็ง โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโฃ""")
total_market = 0
total_cost = 0
if portfolio["holdings"]:
print(f"โ {'่ก็ฅจ':<7} {'่กๆฐ':>5} {'ๆๆฌ':>9} {'็ฐไปท':>9} {'ๅธๅผ':>11} {'็ไบ':>11} {'ๆถจ่ท':>7} โ")
print(f"โ {'โ'*62} โ")
for sym in sorted(portfolio["holdings"]):
info = portfolio["holdings"][sym]
price = prices.get(sym, info["avg_cost"])
mkt = info["shares"] * price
cost = info["shares"] * info["avg_cost"]
pnl = mkt - cost
pct = (pnl / cost * 100) if cost > 0 else 0
sign = "+" if pnl >= 0 else ""
color_pnl = f"{sign}{pnl:,.0f}"
color_pct = f"{sign}{pct:.1f}%"
total_market += mkt
total_cost += cost
print(f"โ {sym:<7} {info['shares']:>5} {info['avg_cost']:>9.2f} {price:>9.2f} {mkt:>11,.2f} {color_pnl:>11} {color_pct:>7} โ")
else:
print("โ ๏ผ็ฉบไป๏ผ โ")
total_assets = portfolio["cash"] + total_market
total_pnl = total_assets - INITIAL_CASH
total_pct = (total_pnl / INITIAL_CASH * 100)
sign = "+" if total_pnl >= 0 else ""
print(f"โ {'โ'*62} โ")
print(f"โ ๐ฐ ็ฐ้: ${portfolio['cash']:>11,.2f} โ")
print(f"โ ๐ ๆไป: ${total_market:>11,.2f} โ")
print(f"โ ๐ผ ๆป่ตไบง: ${total_assets:>11,.2f} ๆป็ไบ: {sign}${total_pnl:>10,.2f} ({sign}{total_pct:.1f}%) โ")
print(f"โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ")
if not is_open:
delta = time_to_next_open()
hours = int(delta.total_seconds() // 3600)
mins = int((delta.total_seconds() % 3600) // 60)
print(f"\n โณ ่ท็ฆปไธๆฌกๅผ็: {hours}ๅฐๆถ{mins}ๅ้๏ผไผๅธๆ้ดๆฏ5ๅ้ๅทๆฐไธๆฌก๏ผ")
print(f"\n ๆ Ctrl+C ้ๅบ")
def main():
print(" ๐ ๅฏๅจ็ๆงไธญ...")
while True:
try:
portfolio = load_portfolio()
symbols = list(portfolio["holdings"].keys())
prices = get_prices(symbols)
is_open, et_now = is_market_open()
display(portfolio, prices, et_now, is_open)
# ็ไธญๆ่ฎพๅฎ้ด้ๅทๆฐ๏ผ็ๅคๆฏ5ๅ้ๅทไธๆฌก
wait = REFRESH_SEC if is_open else 300
time.sleep(wait)
except KeyboardInterrupt:
print("\n\n ๐ ็ๆงๅทฒๅๆญข\n")
break
except Exception as e:
print(f"\n โ ๏ธ ๅบ้: {e}๏ผ{REFRESH_SEC}็งๅ้่ฏ...")
time.sleep(REFRESH_SEC)
if __name__ == "__main__":
main()
|