stock-sim / monitor.py
jasonfan's picture
Upload folder using huggingface_hub
2d3db34 verified
#!/usr/bin/env python3
"""
็พŽ่‚กๆŒไป“็›‘ๆŽง โ€” ็›˜ไธญๆฏๅˆ†้’Ÿๅˆทๆ–ฐ๏ผŒ็›˜ๅค–่‡ชๅŠจ็ญ‰ๅพ…
็”จๆณ•: python3 monitor.py [ๅˆทๆ–ฐ้—ด้š”็ง’ๆ•ฐ๏ผŒ้ป˜่ฎค60]
"""
import json
import os
import sys
import time
from datetime import datetime, timedelta
import yfinance as yf
PORTFOLIO_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "my_portfolio.json")
INITIAL_CASH = 100_000.0
REFRESH_SEC = int(sys.argv[1]) if len(sys.argv) > 1 else 60
def load_portfolio():
with open(PORTFOLIO_FILE) as f:
return json.load(f)
def get_prices(symbols):
"""ๆ‰น้‡่Žทๅ–ไปทๆ ผ๏ผˆๆฏ”้€ไธชๅฟซๅพˆๅคš๏ผ‰"""
prices = {}
if not symbols:
return prices
tickers = yf.Tickers(" ".join(symbols))
for sym in symbols:
try:
info = tickers.tickers[sym].fast_info
p = info.get("lastPrice") or info.get("last_price")
if p is None:
hist = tickers.tickers[sym].history(period="1d")
p = hist["Close"].iloc[-1] if not hist.empty else 0
prices[sym] = round(float(p), 2)
except Exception:
prices[sym] = 0
return prices
def is_market_open():
"""ๅˆคๆ–ญ็พŽ่‚กๆ˜ฏๅฆๅผ€็›˜๏ผˆ็พŽไธœๆ—ถ้—ด ๅ‘จไธ€-ๅ‘จไบ” 9:30-16:00๏ผ‰"""
try:
from zoneinfo import ZoneInfo
except ImportError:
from backports.zoneinfo import ZoneInfo
et = datetime.now(ZoneInfo("America/New_York"))
# ๅ‘จๆœซ
if et.weekday() >= 5:
return False, et
# ็›˜ๅ‰/็›˜ๅŽ
market_open = et.replace(hour=9, minute=30, second=0, microsecond=0)
market_close = et.replace(hour=16, minute=0, second=0, microsecond=0)
return market_open <= et <= market_close, et
def time_to_next_open():
"""่ฎก็ฎ—่ท็ฆปไธ‹ๆฌกๅผ€็›˜็š„ๆ—ถ้—ด"""
try:
from zoneinfo import ZoneInfo
except ImportError:
from backports.zoneinfo import ZoneInfo
et = datetime.now(ZoneInfo("America/New_York"))
# ๆ‰พๅˆฐไธ‹ไธ€ไธชๅทฅไฝœๆ—ฅ็š„9:30
target = et.replace(hour=9, minute=30, second=0, microsecond=0)
if et.weekday() < 5 and et < target:
# ไปŠๅคฉๆ˜ฏๅทฅไฝœๆ—ฅไธ”่ฟ˜ๆฒกๅผ€็›˜
pass
else:
# ๆ‰พไธ‹ไธ€ไธชๅทฅไฝœๆ—ฅ
days_ahead = 1
while True:
target += timedelta(days=1)
if target.weekday() < 5:
break
days_ahead += 1
target = target.replace(hour=9, minute=30, second=0, microsecond=0)
diff = target - et
return diff
def clear_screen():
os.system("clear" if os.name != "nt" else "cls")
def display(portfolio, prices, et_now, is_open):
clear_screen()
status = "๐ŸŸข ๅผ€็›˜ไธญ" if is_open else "๐Ÿ”ด ๅทฒไผ‘ๅธ‚"
print(f"""
โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—
โ•‘ ๐Ÿ“Š ็พŽ่‚กๆจกๆ‹ŸๆŠ•่ต„ โ€” ๅฎžๆ—ถ็›‘ๆŽง {status} โ•‘
โ•‘ ็พŽไธœๆ—ถ้—ด: {et_now.strftime('%Y-%m-%d %H:%M:%S'):<20} ๅˆทๆ–ฐ้—ด้š”: {REFRESH_SEC}็ง’ โ•‘
โ• โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•ฃ""")
total_market = 0
total_cost = 0
if portfolio["holdings"]:
print(f"โ•‘ {'่‚ก็ฅจ':<7} {'่‚กๆ•ฐ':>5} {'ๆˆๆœฌ':>9} {'็Žฐไปท':>9} {'ๅธ‚ๅ€ผ':>11} {'็›ˆไบ':>11} {'ๆถจ่ทŒ':>7} โ•‘")
print(f"โ•‘ {'โ”€'*62} โ•‘")
for sym in sorted(portfolio["holdings"]):
info = portfolio["holdings"][sym]
price = prices.get(sym, info["avg_cost"])
mkt = info["shares"] * price
cost = info["shares"] * info["avg_cost"]
pnl = mkt - cost
pct = (pnl / cost * 100) if cost > 0 else 0
sign = "+" if pnl >= 0 else ""
color_pnl = f"{sign}{pnl:,.0f}"
color_pct = f"{sign}{pct:.1f}%"
total_market += mkt
total_cost += cost
print(f"โ•‘ {sym:<7} {info['shares']:>5} {info['avg_cost']:>9.2f} {price:>9.2f} {mkt:>11,.2f} {color_pnl:>11} {color_pct:>7} โ•‘")
else:
print("โ•‘ ๏ผˆ็ฉบไป“๏ผ‰ โ•‘")
total_assets = portfolio["cash"] + total_market
total_pnl = total_assets - INITIAL_CASH
total_pct = (total_pnl / INITIAL_CASH * 100)
sign = "+" if total_pnl >= 0 else ""
print(f"โ•‘ {'โ”€'*62} โ•‘")
print(f"โ•‘ ๐Ÿ’ฐ ็Žฐ้‡‘: ${portfolio['cash']:>11,.2f} โ•‘")
print(f"โ•‘ ๐Ÿ“ˆ ๆŒไป“: ${total_market:>11,.2f} โ•‘")
print(f"โ•‘ ๐Ÿ’ผ ๆ€ป่ต„ไบง: ${total_assets:>11,.2f} ๆ€ป็›ˆไบ: {sign}${total_pnl:>10,.2f} ({sign}{total_pct:.1f}%) โ•‘")
print(f"โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•")
if not is_open:
delta = time_to_next_open()
hours = int(delta.total_seconds() // 3600)
mins = int((delta.total_seconds() % 3600) // 60)
print(f"\n โณ ่ท็ฆปไธ‹ๆฌกๅผ€็›˜: {hours}ๅฐๆ—ถ{mins}ๅˆ†้’Ÿ๏ผˆไผ‘ๅธ‚ๆœŸ้—ดๆฏ5ๅˆ†้’Ÿๅˆทๆ–ฐไธ€ๆฌก๏ผ‰")
print(f"\n ๆŒ‰ Ctrl+C ้€€ๅ‡บ")
def main():
print(" ๐Ÿš€ ๅฏๅŠจ็›‘ๆŽงไธญ...")
while True:
try:
portfolio = load_portfolio()
symbols = list(portfolio["holdings"].keys())
prices = get_prices(symbols)
is_open, et_now = is_market_open()
display(portfolio, prices, et_now, is_open)
# ็›˜ไธญๆŒ‰่ฎพๅฎš้—ด้š”ๅˆทๆ–ฐ๏ผŒ็›˜ๅค–ๆฏ5ๅˆ†้’Ÿๅˆทไธ€ๆฌก
wait = REFRESH_SEC if is_open else 300
time.sleep(wait)
except KeyboardInterrupt:
print("\n\n ๐Ÿ‘‹ ็›‘ๆŽงๅทฒๅœๆญข\n")
break
except Exception as e:
print(f"\n โš ๏ธ ๅ‡บ้”™: {e}๏ผŒ{REFRESH_SEC}็ง’ๅŽ้‡่ฏ•...")
time.sleep(REFRESH_SEC)
if __name__ == "__main__":
main()