KYTHY's picture
Update app.py
fb9ddf4 verified
raw
history blame
13.3 kB
import streamlit as st
import requests
import pandas as pd
from datetime import datetime, timedelta
import nltk
import numpy as np
from sklearn.linear_model import LinearRegression
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import yfinance as yf
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
# --------------------------
# CONFIG
# --------------------------
st.set_page_config(page_title="📰 News Sentiment Analysis for Young Investor", layout="wide")
API_KEY = "88bc396d4eab4be494a4b86ec842db47"
# --------------------------
# โหลด FinBERT model
# --------------------------
@st.cache_resource
def load_finbert():
tokenizer = AutoTokenizer.from_pretrained("project-aps/finbert-finetune")
model = AutoModelForSequenceClassification.from_pretrained("project-aps/finbert-finetune")
return tokenizer, model
tokenizer, model = load_finbert()
# --------------------------
# โหลด Zero-shot classifier สำหรับธีมข่าว
# --------------------------
@st.cache_resource
def load_theme_classifier():
return pipeline("zero-shot-classification", model="facebook/bart-large-mnli")
theme_classifier = load_theme_classifier()
candidate_labels = ["Stock Movement", "Earnings", "M&A", "Regulation", "Product Launch", "Market Analysis"]
# --------------------------
# UTILITIES
# --------------------------
def analyze_text(text):
"""วิเคราะห์อารมณ์ของข่าวด้วย FinBERT"""
if not text or not text.strip():
return 0
inputs = tokenizer(
text,
return_tensors="pt",
padding=True,
truncation=True,
max_length=512
)
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
probs = torch.softmax(logits, dim=1).numpy()[0]
# FinBERT = [negative, neutral, positive]
score = (-1 * probs[0]) + (0 * probs[1]) + (1 * probs[2])
return float(score)
def summarize_themes(news_texts):
"""สรุปธีมข่าวด้วย Zero-shot classification"""
themes = []
for text in news_texts:
if not text.strip():
continue
result = theme_classifier(text, candidate_labels)
themes.append(result["labels"][0])
return themes
# --------------------------
# แปลงชื่อ/ตัวย่อ → (Company Name, Symbol)
# --------------------------
def resolve_company_symbol(keyword: str):
keyword = keyword.strip()
ticker = None
name = None
try:
data = yf.Ticker(keyword)
info = data.info
if "symbol" in info and info["symbol"]:
ticker = info["symbol"]
name = info.get("longName", info.get("shortName", keyword))
else:
url = f"https://query2.finance.yahoo.com/v1/finance/search?q={keyword}"
res = requests.get(url).json()
if "quotes" in res and len(res["quotes"]) > 0:
q = res["quotes"][0]
ticker = q.get("symbol")
name = q.get("longname", q.get("shortname", keyword))
except:
pass
if not ticker:
ticker = keyword.upper()
if not name:
name = keyword.capitalize()
return name, ticker
# --------------------------
# ดึงข่าว 7 วัน
# --------------------------
@st.cache_data(ttl=3600)
def fetch_financial_news(keyword):
company, symbol = resolve_company_symbol(keyword)
to_date = datetime.now().strftime('%Y-%m-%d')
from_date = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')
query_keyword = f"({company} OR {symbol}) finance stock"
all_articles = []
page = 1
while True:
url = (
f"https://newsapi.org/v2/everything?"
f"q={query_keyword}&"
f"from={from_date}&to={to_date}&"
f"language=en&sortBy=publishedAt&"
f"pageSize=100&page={page}&apiKey={API_KEY}"
)
r = requests.get(url)
data = r.json()
if data.get("status") != "ok":
st.error(f"API Error: {data}")
break
articles = data.get("articles", [])
if not articles:
break
for a in articles:
if a["description"]:
all_articles.append({
"date": pd.to_datetime(a["publishedAt"]),
"text": f"{a['title']} {a['description']}",
"source": a["source"]["name"],
"url": a["url"]
})
if len(articles) < 100:
break
page += 1
return pd.DataFrame(all_articles)
# --------------------------
# ดึงราคาหุ้น
# --------------------------
@st.cache_data(ttl=3600)
def fetch_stock_price(symbol, start_date, end_date):
try:
start_str = (start_date - timedelta(days=2)).strftime('%Y-%m-%d')
end_str = (end_date + timedelta(days=1)).strftime('%Y-%m-%d')
df = yf.download(symbol, start=start_str, end=end_str, interval="1d")
if df.empty:
st.warning("ไม่พบข้อมูลราคาหุ้น")
return pd.DataFrame()
df = df.reset_index()
df_subset = df[['Date', 'Close']]
df_subset.columns = ['date', 'price']
df_subset["date"] = pd.to_datetime(df_subset["date"].dt.date)
return df_subset
except Exception as e:
st.warning(f"ดึงราคาหุ้นล้มเหลว: {e}")
return pd.DataFrame()
# --------------------------
# MAIN APP
# --------------------------
def main():
st.title("📰 News Sentiment Analysis for Young Investor")
st.markdown("วิเคราะห์แนวโน้มอารมณ์ของข่าวย้อนหลัง 7 วัน พร้อมราคาหุ้น")
# Sidebar
with st.sidebar:
keyword = st.text_input("ค้นหา Stock Symbol (เช่น AAPL, TSLA):", "")
analyze_btn = st.button("วิเคราะห์เลย")
if not analyze_btn:
st.info("กรอกคำค้นแล้วกด 'วิเคราะห์เลย'")
return
# ดึงข่าว
st.info(f"กำลังดึงข่าวย้อนหลัง 7 วันสำหรับ '{keyword}'...")
news_df = fetch_financial_news(keyword)
if news_df.empty:
st.warning("ไม่พบบทความข่าว")
return
# วิเคราะห์ Sentiment
st.info("กำลังวิเคราะห์อารมณ์ของข่าว...")
news_df["sentiment"] = news_df["text"].apply(analyze_text)
news_df["date"] = pd.to_datetime(news_df["date"])
# Metrics
avg_sentiment = news_df["sentiment"].mean()
pos_pct = (news_df["sentiment"] > 0.1).mean() * 100
neg_pct = (news_df["sentiment"] < -0.1).mean() * 100
col1, col2, col3 = st.columns(3)
col1.metric("ค่าเฉลี่ยอารมณ์ข่าว", f"{avg_sentiment:.2f}")
col2.metric("ข่าวเชิงบวก", f"{pos_pct:.1f}%")
col3.metric("ข่าวเชิงลบ", f"{neg_pct:.1f}%")
# ---------------------------------------------------------
# ธีมข่าวแทน Word Cloud
# ---------------------------------------------------------
st.subheader("📰 ธีมข่าว (Top Theme per Article)")
news_df["theme"] = summarize_themes(news_df["text"].tolist())
theme_counts = news_df["theme"].value_counts()
st.bar_chart(theme_counts)
# ---------------------------------------------------------
# ส่วนกราฟ Sentiment & Price (เหมือนเดิม)
# ---------------------------------------------------------
st.subheader("📈 แนวโน้มอารมณ์ของข่าว & ราคาหุ้น")
news_df["date_day"] = pd.to_datetime(news_df["date"].dt.date)
def sentiment_type(score):
if score > 0.1:
return "positive"
if score < -0.1:
return "negative"
return "neutral"
news_df["sentiment_type"] = news_df["sentiment"].apply(sentiment_type)
daily_avg = news_df.groupby("date_day")["sentiment"].mean().reset_index(name="avg_sentiment")
daily_counts = news_df.groupby(["date_day", "sentiment_type"]).size().unstack(fill_value=0).reset_index()
df_sorted = pd.merge(daily_avg, daily_counts, on="date_day").sort_values("date_day")
if len(df_sorted) < 2:
st.warning("ข้อมูลไม่พอสร้างแนวโน้ม")
st.dataframe(news_df)
return
# ดึงราคาหุ้น
_, symbol = resolve_company_symbol(keyword)
min_date, max_date = df_sorted["date_day"].min(), df_sorted["date_day"].max()
st.info(f"กำลังดึงราคาหุ้น {symbol} ...")
stock_df = fetch_stock_price(symbol, min_date, max_date)
plot_data = pd.merge(df_sorted, stock_df, left_on="date_day", right_on="date", how="left")
# Correlation
correlation = plot_data['price'].corr(plot_data['avg_sentiment'])
corr_text = "ไม่มีความสัมพันธ์"
if correlation > 0.5:
corr_text = "มีความสัมพันธ์ในทิศทางเดียวกัน"
elif correlation < -0.5:
corr_text = "มีความสัมพันธ์ในทิศทางตรงข้าม"
st.metric("วิเคราะห์ความสัมพันธ์ระหว่างอารมณ์ของข่าวกับราคาหุ้น (Correlation)", corr_text, f"{correlation:.2f}")
# Forecast Sentiment
plot_data["timestamp"] = (plot_data["date_day"] - plot_data["date_day"].min()).dt.days
train_data = plot_data.dropna(subset=['avg_sentiment'])
if len(train_data) >= 2:
model_lr = LinearRegression()
model_lr.fit(train_data[["timestamp"]], train_data["avg_sentiment"])
future_days = 7
future_timestamps = np.arange(
plot_data["timestamp"].max() + 1,
plot_data["timestamp"].max() + future_days + 1
)
future_dates = [plot_data["date_day"].max() + timedelta(days=i) for i in range(1, future_days + 1)]
future_preds = model_lr.predict(future_timestamps.reshape(-1, 1))
# Plot
fig = make_subplots(rows=2, cols=1, specs=[[{"secondary_y": True}], [{}]], row_heights=[0.7, 0.3], vertical_spacing=0.1, shared_xaxes=True)
# ราคาหุ้น
fig.add_trace(go.Scatter(x=plot_data["date_day"], y=plot_data["price"], name=f"{symbol} Price", mode="lines+markers", line=dict(color="orange")), row=1, col=1, secondary_y=False)
# Sentiment จริง
fig.add_trace(go.Scatter(x=plot_data["date_day"], y=plot_data["avg_sentiment"], name="Actual Sentiment", mode="lines+markers", line=dict(color="blue")), row=1, col=1, secondary_y=True)
# Sentiment พยากรณ์
if "future_preds" in locals():
fig.add_trace(go.Scatter(x=future_dates, y=future_preds, name="Predicted Sentiment", mode="lines+markers", line=dict(color="#05a0fa", dash="dash")), row=1, col=1, secondary_y=True)
# เส้นเชื่อม Actual -> Predicted
last_actual_date = plot_data["date_day"].max()
last_actual_value = plot_data["avg_sentiment"].iloc[-1]
first_pred_date = future_dates[0]
first_pred_value = future_preds[0]
fig.add_trace(go.Scatter(x=[last_actual_date, first_pred_date], y=[last_actual_value, first_pred_value], mode="lines", line=dict(color="#05a0fa", dash="dot"), name="Connector Actual→Predicted"), row=1, col=1, secondary_y=True)
# จำนวนข่าว
for col in ["neutral", "negative", "positive"]:
if col not in plot_data.columns:
plot_data[col] = 0
fig.add_trace(go.Bar(x=plot_data["date_day"], y=plot_data["neutral"], name="Neutral", marker_color='rgba(128, 128, 128, 0.7)'), row=2, col=1)
fig.add_trace(go.Bar(x=plot_data["date_day"], y=plot_data["negative"], name="Negative", marker_color='rgba(255, 0, 0, 0.7)'), row=2, col=1)
fig.add_trace(go.Bar(x=plot_data["date_day"], y=plot_data["positive"], name="Positive", marker_color='rgba(0, 128, 0, 0.7)'), row=2, col=1)
fig.update_layout(title=f"แนวโน้มอารมณ์ของข่าว + ราคาหุ้น ({symbol})", barmode="stack", height=650, hovermode="x unified", template="plotly_white")
st.plotly_chart(fig, use_container_width=True)
# แสดงรายการข่าว
st.subheader("📰 รายการข่าวทั้งหมด")
st.dataframe(news_df[["date", "source", "text", "sentiment", "theme", "url"]], use_container_width=True)
# ---------------------------------------------------------
# RUN APP
# ---------------------------------------------------------
if __name__ == "__main__":
nltk.download("stopwords", quiet=True)
main()