# stockprice / src/features.py
# cormort's picture
# Update src/features.py
# 30cacbc verified
# src/features.py
import pandas as pd
import numpy as np
import yfinance as yf
from .config import Config
from .data import data_service
import twstock
class FeatureLayer:
    """Namespace of static helpers that score, backtest and screen stocks/ETFs.

    NOTE(review): the indentation of the original file was lost, so the
    nesting used here (notably which ``if`` each ``else`` belongs to) was
    reconstructed from the logic of the code - confirm against upstream.
    """

    @staticmethod
    def calculate_scores(ticker, df_price, df_inst, df_rev, fund, price, data_source):
        """Score a stock on six dimensions and collect the reasons behind each.

        Args:
            ticker: Symbol handed to ``yf.Ticker`` for the trailing P/E lookup.
            df_price: Price history with ``Close`` and ``Volume`` columns.
            df_inst: Institutional-flow frame. The ``外資`` column is read when
                ``data_source`` is ``"FinMind"``; otherwise ``主力動向`` is read.
            df_rev: Revenue frame with a ``revenue`` column (presumably one
                row per month, oldest first - verify against the caller).
            fund: Mapping that provides ``avg_div``.
            price: Current price used for the price comparisons.
            data_source: Name of the data provider.

        Returns:
            A ``(scores, details)`` tuple of dicts keyed by dimension name:
            ``scores`` holds the numeric score and ``details`` the list of
            human-readable reasons.
        """
        scores = {}
        details = {}
        close = df_price['Close']
        # With fewer rows than the window these are NaN, and every comparison
        # against NaN below is simply False.
        ma20 = close.rolling(20).mean().iloc[-1]
        ma60 = close.rolling(60).mean().iloc[-1]

        # --- Technical: price versus the 20- and 60-period averages ---
        tech_score = 0
        tech_notes = []
        if price > ma20:
            tech_score += 40
            tech_notes.append("站上月線")
        if price > ma60:
            tech_score += 40
            tech_notes.append("站上季線")
        if close.rolling(5).mean().iloc[-1] > ma20 > ma60:
            tech_score += 20
            tech_notes.append("多頭排列")
        scores['技術'] = min(tech_score, 100)
        details['技術'] = tech_notes

        # --- Chips: sum of the last five rows of the relevant flow column ---
        chip_score = 50
        chip_notes = []
        if data_source == "FinMind":
            if not df_inst.empty and '外資' in df_inst.columns:
                foreign_sum = df_inst['外資'].tail(5).sum()
                if foreign_sum > 1:
                    chip_score += 20
                    chip_notes.append("外資買超")
                elif foreign_sum < -1:
                    chip_score -= 20
                    chip_notes.append("外資賣超")
        elif not df_inst.empty and '主力動向' in df_inst.columns:
            main_sum = df_inst['主力動向'].tail(5).sum()
            if main_sum > 0:
                chip_score += 15
                chip_notes.append("主力吸籌(估)")
            else:
                chip_score -= 15
                chip_notes.append("主力調節(估)")
        scores['籌碼'] = max(0, min(chip_score, 100))
        details['籌碼'] = chip_notes if chip_notes else ["籌碼中性"]

        # --- Value: dividend yield plus trailing P/E ---
        value_score = 0
        value_notes = []
        dividend_yield = (fund['avg_div'] / price) * 100 if price > 0 else 0
        if dividend_yield > 4:
            value_score += 50
            value_notes.append(f"殖利率 {dividend_yield:.1f}%")
        try:
            pe = yf.Ticker(ticker).info.get('trailingPE', 999)
            if 0 < pe < 15:
                value_score += 50
                value_notes.append(f"PE {pe:.1f}")
        except Exception:
            # Best effort: a failed lookup or a non-numeric P/E just means
            # that no P/E bonus is awarded.
            pass
        scores['價值'] = min(value_score, 100)
        details['價值'] = value_notes

        # --- Momentum: change between the 20th-from-last close and the last ---
        momentum_score = 50
        momentum_notes = []
        # Guard against short histories, where ``iloc[-20]`` would raise
        # IndexError; the score then stays at its neutral starting value.
        if len(close) >= 20:
            ret = (close.iloc[-1] / close.iloc[-20] - 1) * 100
            if ret > 5:
                momentum_score += 30
                momentum_notes.append("月勢轉強")
            elif ret < -5:
                momentum_score -= 20
                momentum_notes.append("月勢轉弱")
        scores['動能'] = min(momentum_score, 100)
        details['動能'] = momentum_notes

        # --- Growth: latest revenue versus the row twelve positions earlier ---
        growth_score = 40
        growth_notes = []
        if not df_rev.empty:
            try:
                latest = df_rev.iloc[-1]['revenue']
                year_ago = df_rev.iloc[-13]['revenue']
                # A zero/negative/NaN base makes the ratio meaningless (inf or
                # NaN), so such rows keep the default score.
                if year_ago > 0 and pd.notna(latest):
                    yoy = ((latest - year_ago) / year_ago) * 100
                    if yoy > 20:
                        growth_score = 100
                        growth_notes.append(f"YoY +{yoy:.1f}%")
                    elif yoy > 0:
                        growth_score = 70
                        growth_notes.append("營收成長")
                    else:
                        growth_score = 30
                        growth_notes.append("營收衰退")
            except Exception:
                # Best effort: fewer than 13 rows or a missing column keeps
                # the default growth score.
                pass
        scores['成長'] = growth_score
        details['成長'] = growth_notes

        # --- Risk: latest volume above twice the 20-row average ---
        risk_score = 80
        risk_notes = []
        volume = df_price['Volume']
        if volume.iloc[-1] > volume.tail(20).mean() * 2:
            if price < ma60:
                risk_score += 20
                risk_notes.append("低檔爆量")
            else:
                risk_score -= 40
                risk_notes.append("高檔爆量")
        scores['風險'] = max(0, min(risk_score, 100))
        details['風險'] = risk_notes

        return scores, details

    @staticmethod
    def calculate_weighted_score(scores, strategy_name):
        """Combine per-dimension scores into one number using strategy weights.

        Args:
            scores: Dict of dimension name to score, as returned by
                ``calculate_scores``.
            strategy_name: Key into ``Config.STRATEGY_PROFILES``. Unknown names
                fall back to the ``"成長型 (預設)"`` profile.

        Returns:
            The weighted sum rounded to one decimal. A dimension missing from
            the profile is weighted 0.16.
        """
        profiles = Config.STRATEGY_PROFILES
        weights = profiles.get(strategy_name)
        if weights is None:
            # Look the default up only when it is needed, so a valid strategy
            # never fails because of the fallback entry.
            weights = profiles["成長型 (預設)"]
        total_score = sum(
            value * weights.get(key, 0.16) for key, value in scores.items()
        )
        return round(total_score, 1)

    @staticmethod
    def simple_backtest(df, hold_days=20):
        """Backtest a "close above a rising 20-period average" entry rule.

        Every fifth bar in roughly the last 120 rows is checked. When the
        signal is on, the trade enters at the next bar's ``Open`` and exits at
        the ``Close`` ``hold_days`` bars after the signal bar.

        Args:
            df: Price history with ``Open`` and ``Close`` columns.
            hold_days: Offset, in rows, from the signal bar to the exit bar.

        Returns:
            ``None`` when ``df`` has fewer than 60 rows; otherwise a dict with
            win rate (%), average return (%), trade count and the benchmark
            return (%) from the first tested bar to the last close.
        """
        if len(df) < 60:
            return None

        close = df['Close']
        ma20 = close.rolling(20).mean()
        signal = (close > ma20) & (ma20 > ma20.shift(1))

        start_idx = max(20, len(df) - 120)
        results = []
        for i in range(start_idx, len(df) - hold_days, 5):
            if signal.iloc[i]:
                entry = df['Open'].iloc[i + 1]
                exit_price = close.iloc[i + hold_days]
                results.append((exit_price / entry - 1) * 100)

        bench_ret = (close.iloc[-1] / close.iloc[start_idx] - 1) * 100
        bench_ret = round(float(bench_ret), 2)
        if not results:
            return {"勝率": 0, "平均報酬": 0, "次數": 0, "基準報酬": bench_ret}

        win_count = sum(1 for r in results if r > 0)
        return {
            "勝率": round(win_count / len(results) * 100, 1),
            "平均報酬": round(float(np.mean(results)), 2),
            "次數": len(results),
            "基準報酬": bench_ret,
        }

    # Main area of change: ETF smart advisor.
    @staticmethod
    def run_etf_screener(category, sort_by):
        """Score every ETF in a category and return a ranked table.

        Args:
            category: Key of ``Config.ETF_DATABASE``, or ``"全部"`` to merge
                every category.
            sort_by: Column to sort by (descending). Falls back to
                ``"綜合評分"`` when it is not a column of the result.

        Returns:
            A DataFrame with one row per ETF that has at least 60 rows with a
            non-null ``Close``; empty when nothing qualifies.
        """
        if category != "全部":
            target_dict = Config.ETF_DATABASE.get(category, {})
        else:
            target_dict = {
                code: name
                for group in Config.ETF_DATABASE.values()
                for code, name in group.items()
            }
        tickers = list(target_dict.keys())
        if not tickers:
            return pd.DataFrame()

        data = data_service.fetch_batch_history(tickers)
        # The code indexes the batch result by ticker only when more than one
        # ticker was requested; a single ticker uses the result directly.
        single_ticker = len(tickers) == 1

        results = []
        for ticker in tickers:
            try:
                hist = data if single_ticker else data[ticker]
                hist = hist.dropna(subset=['Close'])
                if len(hist) < 60:
                    continue

                # 1. Metrics.
                close = hist['Close']
                price = close.iloc[-1]
                # Quarter return (momentum).
                r3 = ((price / close.iloc[-60]) - 1) * 100
                # Annualised volatility (risk).
                vol = close.pct_change().std() * 100 * (252 ** 0.5)
                # Quarter-line trend (MA60).
                ma60 = close.rolling(60).mean().iloc[-1]

                # 2. Score and diagnostic tags.
                score = 0
                tags = []
                # Momentum component (40%).
                if r3 > 10:
                    score += 40
                    tags.append("🚀 強勢")
                elif r3 > 0:
                    score += 20
                elif r3 < -5:
                    tags.append("📉 弱勢")
                # Trend component (30%).
                if price > ma60:
                    score += 30
                # Risk component (30%): lower volatility scores higher.
                if vol < 12:
                    score += 30
                    tags.append("🛡️ 穩健")
                elif vol < 20:
                    score += 20
                else:
                    tags.append("🌊 活潑")
                # Highlight tag: return per unit of volatility above 1.
                if vol > 0 and (r3 / vol) > 1:
                    tags.append("💎 高CP")
                ai_comment = " ".join(tags[:2])  # keep the first two tags

                # 3. Suggested action.
                if score >= 80:
                    suggestion = "🟢 順勢佈局"
                elif score >= 60:
                    suggestion = "🟡 區間操作"
                else:
                    suggestion = "⚪ 暫時觀望"

                results.append({
                    # Drop the exchange suffix. ``split`` also handles
                    # ``.TWO``, which ``replace('.TW', '')`` turned into a
                    # trailing "O".
                    "代碼": ticker.split('.')[0],
                    "ETF名稱": target_dict[ticker],
                    "建議": suggestion,
                    "AI 診斷": ai_comment,
                    "綜合評分": score,
                    "現價": round(price, 2),
                    "季報酬%": round(r3, 2),
                    "波動率%": round(vol, 1),
                })
            except Exception:
                # Best effort: skip any ETF whose data is missing or malformed.
                continue

        df = pd.DataFrame(results)
        if df.empty:
            return df

        # Column order.
        cols = ["代碼", "ETF名稱", "建議", "AI 診斷", "綜合評分", "季報酬%", "波動率%", "現價"]
        df = df[cols]
        # Sort by the user's choice when it is a valid column.
        sort_col = sort_by if sort_by in df.columns else "綜合評分"
        return df.sort_values(sort_col, ascending=False)

    @staticmethod
    def _evaluate_value_metrics(pe, dy, pb, roe, beta):
        """Return ``(score, tags)`` for one stock's valuation metrics.

        ``dy`` and ``roe`` are percentages. Non-positive ``pe`` and ``pb``
        earn no points: a negative ratio indicates losses or negative book
        value rather than a cheap stock.
        """
        score = 0
        if 0 < pe < 12:
            score += 25
        elif 12 <= pe < 18:
            score += 15
        if dy > 5:
            score += 25
        elif dy > 3:
            score += 15
        if 0 < pb < 1.2:
            score += 15
        if roe > 15:
            score += 20
        elif roe > 10:
            score += 10
        if 0 < beta < 0.8:
            score += 15
        elif beta > 1.5:
            score -= 10

        tags = []
        if roe > 15:
            tags.append("🏆 績優")
        if dy > 5:
            tags.append("💰 現金牛")
        if 0 < pe < 10:
            tags.append("💎 超值")
        if 0 < beta < 0.6:
            tags.append("🛡️ 抗跌")
        if not tags:
            tags.append("⚖️ 中性")
        return score, tags

    @staticmethod
    def run_value_screener(category):
        """Rank the stocks of a pool by a simple value/quality score.

        Args:
            category: Key of ``Config.STOCK_POOLS``; unknown keys yield an
                empty result.

        Returns:
            A DataFrame sorted by ``"評分"`` (descending); empty when no
            candidate could be evaluated.
        """
        candidates = Config.STOCK_POOLS.get(category, [])
        results = []
        for t in candidates:
            try:
                info = yf.Ticker(t).info
                # ``or`` also replaces explicit None/0 values with the default.
                pe = info.get('trailingPE', 999) or 999
                dy = (info.get('dividendYield', 0) or 0) * 100
                pb = info.get('priceToBook', 999) or 999
                roe = (info.get('returnOnEquity', 0) or 0) * 100
                beta = info.get('beta', 1.0) or 1.0
                price = info.get('currentPrice', 0) or 0

                score, tags = FeatureLayer._evaluate_value_metrics(pe, dy, pb, roe, beta)
                ai_comment = " ".join(tags[:2])

                if score >= 75:
                    suggestion = "🟢 積極佈局"
                elif score >= 60:
                    suggestion = "🟡 逢低買進"
                else:
                    suggestion = "⚪ 再觀察"

                # Use the twstock name when the bare code is known there.
                code = t.split('.')[0]
                name = twstock.codes[code].name if code in twstock.codes else t

                results.append({
                    "代碼": t,
                    "名稱": name,
                    "AI 診斷": ai_comment,
                    "建議": suggestion,
                    "評分": int(score),
                    "ROE%": round(roe, 1),
                    "殖利率%": round(dy, 1),
                    "本益比": round(pe, 1) if pe != 999 else "N/A",
                    "風險係數": round(beta, 2),
                    "現價": price,
                })
            except Exception:
                # Best effort: skip any ticker whose lookup or scoring fails.
                continue

        df = pd.DataFrame(results)
        if df.empty:
            return df

        cols = ["代碼", "名稱", "建議", "AI 診斷", "評分", "ROE%", "殖利率%", "本益比", "風險係數", "現價"]
        df = df[cols]
        return df.sort_values("評分", ascending=False)