KYTHY's picture
Update app.py
c93f486 verified
import streamlit as st
import requests
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
from sklearn.linear_model import LinearRegression
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import yfinance as yf
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
# --------------------------
# CONFIG ตั้งค่า Streamlit
# --------------------------
st.set_page_config(page_title="📰 News Sentiment Analysis for Young Investor", layout="wide")
# API จากเว็บ newsapi.org
API_KEY = "88bc396d4eab4be494a4b86ec842db47"
# --------------------------
# โหลด FinBERT model
# --------------------------
@st.cache_resource
def load_finbert():
tokenizer = AutoTokenizer.from_pretrained("project-aps/finbert-finetune")
model = AutoModelForSequenceClassification.from_pretrained("project-aps/finbert-finetune")
return tokenizer, model
tokenizer, model = load_finbert()
# --------------------------
# UTILITIES
# --------------------------
# ฟังก์ชันวิเคราะห์ Sentiment
def analyze_text(text):
"""วิเคราะห์อารมณ์ของข่าว"""
if not text or not text.strip():
return 0
# แปลงข้อความเป็นตัวเลข
inputs = tokenizer(
text,
return_tensors="pt",
padding=True,
truncation=True,
max_length=512
)
#ส่งข้อความเข้า Model
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
probs = torch.softmax(logits, dim=1).numpy()[0]
# โมเดล FinBERT แบ่งออกเป็น 3 คลาส
# FinBERT = [negative, neutral, positive]
# คูณความน่าจะเป็นของแต่ละคลาสกับ -1, 0, 1 แล้วรวมกันเป็นคะแนน
score = (-1 * probs[0]) + (0 * probs[1]) + (1 * probs[2])
return float(score)
# --------------------------
# แปลงชื่อบริษัท/ชื่อหุ้น เพื่อให้เมื่อค้นหาแล้ว ได้ผลลัพธ์เหมือนกัน ไม่ว่าจะค้นหาด้วยชื่อไหน
# --------------------------
# ฟังก์ชันสำหรับหาชื่อหุ้นหรือชื่อบริษัท
# ถ้า Keyword เป็นชื่อบริษัท ให้หาชื่อหุ้น, ถ้า Keyword เป็นชื่อหุ้น ให้หาชื่อบริษัท
def resolve_company_symbol(keyword: str):
# รับ Keyword เป็นชื่อบริษัท หรือ ชื่อหุ้น อย่างใดอย่างหนึ่ง
keyword = keyword.strip()
ticker = None
name = None
# ดึงข้อมูลชื่อบริษัท/ชื่อหุ้นจาก yfinance
try:
data = yf.Ticker(keyword)
info = data.info
# ตรวจสอบข้อมูลใน Info (ข้อมูลหุ้นของ keyword ที่ส่งเข้าไป)
if "symbol" in info and info["symbol"]:
ticker = info["symbol"]
name = info.get("longName", info.get("shortName", keyword))
# ถ้าไม่หาไม่เจอให้ค้นหาผ่าน Yahoo Finance API
else:
url = f"https://query2.finance.yahoo.com/v1/finance/search?q={keyword}"
res = requests.get(url).json()
if "quotes" in res and len(res["quotes"]) > 0:
q = res["quotes"][0]
ticker = q.get("symbol")
name = q.get("longname", q.get("shortname", keyword))
except:
pass
if not ticker:
ticker = keyword.upper()
if not name:
name = keyword.capitalize()
# คืนค่าเป็น tuple (ชื่อบริษัท, ชื่อหุ้น) เช่น (Apple Inc., AAPL)
return name, ticker
# --------------------------
# ดึงข่าว 7 วัน
# --------------------------
# ฟังก์ชันสำหรับดึงข่าว
@st.cache_data(ttl=3600)
def fetch_financial_news(keyword):
# หลังจากรับ Keyword เป็นชื่อบริษัท/ชื่อหุ้นอย่างใดอย่างหนึ่ง จะเรียกฟังชันก์ resolve_company_symbol เพื่อให้คืนค่าทั้งชื่อบริษัทและชื่อหุ้น
company, symbol = resolve_company_symbol(keyword)
# กำหนดช่วงวันที่ค้นหาเป็น 7 วันล่าสุด
to_date = datetime.now().strftime('%Y-%m-%d')
from_date = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')
# สร้าง query สำหรับ News API เพื่อให้สามารถค้นเจอข่าวที่มีชื่อบริษัทหรือชื่อหุ้นก็ได้
# เช่น กรอก AAPL ต้องเจอข่าวที่มีคำว่า Apple Inc. ด้วย แม้จะไม่มีคำว่า AAPL อยู่ในข่าวก็ตาม
query_keyword = f"({company} OR {symbol}) finance stock"
# ดึงข่าวทุกหน้าจนกว่าข่าวจะหมด โดยดึงข่าวจาก API ของเว็บ newsapi.org
all_articles = []
page = 1
while True:
url = (
f"https://newsapi.org/v2/everything?"
f"q={query_keyword}&"
f"from={from_date}&to={to_date}&"
f"language=en&sortBy=publishedAt&"
f"pageSize=100&page={page}&apiKey={API_KEY}"
)
r = requests.get(url)
data = r.json()
if data.get("status") != "ok":
st.error(f"API Error: {data}")
break
articles = data.get("articles", [])
if not articles:
break
# เก็บข่าวลง list
for a in articles:
if a["description"]:
all_articles.append({
"date": pd.to_datetime(a["publishedAt"]),
"text": f"{a['title']} {a['description']}",
"source": a["source"]["name"],
"url": a["url"]
})
if len(articles) < 100:
break
page += 1
return pd.DataFrame(all_articles)
# --------------------------
# ดึงราคาหุ้น
# --------------------------
# ฟังก์ชันสำหรับดึงราคาหุ้น
# รับพารามิเตอร์:
# symbol → ชื่อหุ้น (Ticker)
# start_date → วันที่เริ่มต้น (datetime object)
# end_date → วันที่สิ้นสุด (datetime object)
@st.cache_data(ttl=3600)
def fetch_stock_price(symbol, start_date, end_date):
try:
# ขยายช่วงวันที่เล็กน้อย เนื่องจากบางครั้งตลาดหุ้นปิดวันเสาร์-อาทิตย์ และวันหยุดนนักขัตฤกษ์ การขยายช่วงจะช่วยให้ได้ข้อมูลครบ
start_str = (start_date - timedelta(days=2)).strftime('%Y-%m-%d')
end_str = (end_date + timedelta(days=1)).strftime('%Y-%m-%d')
# ดึงราคาหุ้นจาก yfinance โดยใช้ราคาปิดรายวัน
df = yf.download(symbol, start=start_str, end=end_str, interval="1d")
if df.empty:
st.warning("ไม่พบข้อมูลราคาหุ้น")
return pd.DataFrame()
df = df.reset_index()
df_subset = df[['Date', 'Close']]
df_subset.columns = ['date', 'price']
df_subset["date"] = pd.to_datetime(df_subset["date"].dt.date)
# คืนค่าเป็น DataFrame ที่มี 2 คอลัมน์ คือ date และ price
return df_subset
except Exception as e:
st.warning(f"ดึงราคาหุ้นล้มเหลว: {e}")
return pd.DataFrame()
# --------------------------
# MAIN APP
# --------------------------
# ฟังก์ชันหลักของ Streamlit app
def main():
# หัวข้อใหญ่ของแอป
st.title("📰 News Sentiment Analysis for Young Investor")
# ข้อความอธิบาย
st.markdown("วิเคราะห์แนวโน้มอารมณ์ของข่าวย้อนหลัง 7 วัน พร้อมราคาหุ้น")
# Sidebar สำหรับ Input
with st.sidebar:
keyword = st.text_input("ค้นหา Stock Symbol (เช่น AAPL, TSLA):", "")
analyze_btn = st.button("วิเคราะห์เลย")
if not analyze_btn:
st.info("กรอกคำค้นแล้วกด 'วิเคราะห์เลย'")
return
# ดึงข่าว
st.info(f"กำลังดึงข่าวย้อนหลัง 7 วันสำหรับ '{keyword}'...")
news_df = fetch_financial_news(keyword)
if news_df.empty:
st.warning("ไม่พบบทความข่าว")
return
# วิเคราะห์ Sentiment
st.info("กำลังวิเคราะห์อารมณ์ของข่าว...")
news_df["sentiment"] = news_df["text"].apply(analyze_text)
news_df["date"] = pd.to_datetime(news_df["date"])
# คำนวณ Metrics
avg_sentiment = news_df["sentiment"].mean()
pos_pct = (news_df["sentiment"] > 0.1).mean() * 100
neg_pct = (news_df["sentiment"] < -0.1).mean() * 100
# แสดง Metrics บน App เป็น 3 คอลัมน์ ได้แก่ ค่าเฉลี่ย Sentment, % ข่าวเชิงบวก, % ข่าวเชิงลบ
col1, col2, col3 = st.columns(3)
col1.metric("ค่าเฉลี่ยอารมณ์ของข่าว", f"{avg_sentiment:.2f}")
col2.metric("ข่าวเชิงบวก", f"{pos_pct:.1f}%")
col3.metric("ข่าวเชิงลบ", f"{neg_pct:.1f}%")
# ---------------------------------------------------------
# เตรียมข้อมูลสำหรับกราฟ Sentiment & Price
# ---------------------------------------------------------
st.subheader("📈 แนวโน้มอารมณ์ของข่าว & ราคาหุ้น")
# สร้างคอลัมน์วันที่
news_df["date_day"] = pd.to_datetime(news_df["date"].dt.date)
# ฟังก์ชันจำแนก Sentiment
def sentiment_type(score):
if score > 0.1:
return "positive"
if score < -0.1:
return "negative"
return "neutral"
news_df["sentiment_type"] = news_df["sentiment"].apply(sentiment_type)
# คำนวณค่าเฉลี่ย Sentiment ต่อวัน
daily_avg = news_df.groupby("date_day")["sentiment"].mean().reset_index(name="avg_sentiment")
# นับจำนวนข่าวรายวัน
daily_counts = news_df.groupby(["date_day", "sentiment_type"]).size().unstack(fill_value=0).reset_index()
# รวม DataFrame ของค่าเฉลี่ย และ จำนวนข่าว
df_sorted = pd.merge(daily_avg, daily_counts, on="date_day").sort_values("date_day")
if len(df_sorted) < 2:
st.warning("ข้อมูลไม่พอสร้างแนวโน้ม")
st.dataframe(news_df)
return
# ดึงราคาหุ้น
_, symbol = resolve_company_symbol(keyword)
min_date, max_date = df_sorted["date_day"].min(), df_sorted["date_day"].max()
st.info(f"กำลังดึงราคาหุ้น {symbol} ...")
stock_df = fetch_stock_price(symbol, min_date, max_date)
# รวม DataFrame ของข่าว (ค่าเฉลี่ยและจำนวนข่าวที่รวมกันแล้ว) และ ราคาหุ้น
# ผลลัพธ์จะมี 4 คอลัมน์ ได้แก่ วันที่, ค่าเฉลี่ย, จำนวนข่าว, ราคาหุ้น เพื่อให้พร้อมสำหรับทำกราฟ
plot_data = pd.merge(df_sorted, stock_df, left_on="date_day", right_on="date", how="left")
# ---------------------------------------------------------
# Correlation
# ---------------------------------------------------------
# คำนวณ correlation ระหว่างราคาหุ้นกับค่า sentiment
correlation = plot_data['price'].corr(plot_data['avg_sentiment'])
# แปลงค่า correlation เป็นข้อความอธิบาย
if correlation > 0.7:
corr_label = "มีความสัมพันธ์กันอย่างมากในทิศทางเดียวกัน"
elif correlation > 0.4:
corr_label = "มีความสัมพันธ์กันปานกลางในทิศทางเดียวกัน"
elif correlation > 0.2:
corr_label = "มีความสัมพันธ์กันเล็กน้อยในทิศทางเดียวกัน"
elif correlation < -0.7:
corr_label = "มีความสัมพันธ์กันอย่างมากในทิศทางตรงกันข้าม"
elif correlation < -0.4:
corr_label = "มีความสัมพันธ์กันปานกลางในทิศทางตรงกันข้าม"
elif correlation < -0.2:
corr_label = "มีความสัมพันธ์กันเล็กน้อยในทิศทางตรงกันข้าม"
else:
corr_label = "ไม่มีความสัมพันธ์กัน"
corr_value_text = f"Correlation = {correlation:.2f}"
st.metric(
"วิเคราะห์ความสัมพันธ์ระหว่างอารมณ์ของข่าวกับราคาหุ้น",
corr_label, # ตัวบน (ใหญ่)
corr_value_text # ตัวล่าง (สีเขียว/แดง)
)
# ---------------------------------------------------------
# Forecast Sentiment
# ---------------------------------------------------------
# แปลง date เป็นตัวเลข โดยนับวันแรกเป็นวันที่ 0 และวันถัดมาเป็น 1, 2, ... เพื่อทำ Linear Regression
plot_data["timestamp"] = (plot_data["date_day"] - plot_data["date_day"].min()).dt.days
train_data = plot_data.dropna(subset=['avg_sentiment'])
# ตรวจสอบว่ามีข้อมูลเพียงพอ
if len(train_data) >= 2:
# สร้างโมเดล Linear Regression โดยให้ x = จำนวนวัน, y = ค่าเฉลี่ย sentiment ต่อวัน
model_lr = LinearRegression()
model_lr.fit(train_data[["timestamp"]], train_data["avg_sentiment"])
# สร้างช่วงวันที่สำหรับทำนายอนาคต
future_days = 7
future_timestamps = np.arange(
plot_data["timestamp"].max() + 1,
plot_data["timestamp"].max() + future_days + 1
)
# ทำนายค่า sentiment ในอนาคต
future_dates = [plot_data["date_day"].max() + timedelta(days=i) for i in range(1, future_days + 1)]
future_preds = model_lr.predict(future_timestamps.reshape(-1, 1))
# ---------------------------------------------------------
# Plot
# ---------------------------------------------------------
fig = make_subplots(rows=2, cols=1, specs=[[{"secondary_y": True}], [{}]],
row_heights=[0.7, 0.3], vertical_spacing=0.1,
shared_xaxes=True)
# ราคาหุ้น
fig.add_trace(
go.Scatter(
x=plot_data["date_day"], y=plot_data["price"],
name=f"{symbol} Price", mode="lines+markers", line=dict(color="orange")
),
row=1, col=1, secondary_y=False
)
# Sentiment จริง
fig.add_trace(
go.Scatter(
x=plot_data["date_day"], y=plot_data["avg_sentiment"],
name="Actual Sentiment", mode="lines+markers", line=dict(color="blue")
),
row=1, col=1, secondary_y=True
)
# Sentiment พยากรณ์
if "future_preds" in locals():
fig.add_trace(
go.Scatter(
x=future_dates, y=future_preds,
name="Predicted Sentiment", mode="lines+markers", line=dict(color="#02a1f7", dash="dash")
),
row=1, col=1, secondary_y=True
)
# ---------------------------------------------------------
# สร้างเส้นเชื่อม Actual -> Predicted Sentiment เพื่อความสวยงาม
# ---------------------------------------------------------
last_actual_date = plot_data["date_day"].max()
last_actual_value = plot_data["avg_sentiment"].iloc[-1]
first_pred_date = future_dates[0]
first_pred_value = future_preds[0]
fig.add_trace(
go.Scatter(
x=[last_actual_date, first_pred_date],
y=[last_actual_value, first_pred_value],
mode="lines",
line=dict(color="#02a1f7", dash="dot"),
name="Connector Actual→Predicted"
),
row=1, col=1, secondary_y=True
)
# กราฟแท่งแสดงจำนวนข่าว
for col in ["neutral", "negative", "positive"]:
if col not in plot_data.columns:
plot_data[col] = 0
fig.add_trace(go.Bar(x=plot_data["date_day"], y=plot_data["neutral"], name="Neutral",
marker_color='rgba(128, 128, 128, 0.7)'), row=2, col=1)
fig.add_trace(go.Bar(x=plot_data["date_day"], y=plot_data["negative"], name="Negative",
marker_color='rgba(255, 0, 0, 0.7)'), row=2, col=1)
fig.add_trace(go.Bar(x=plot_data["date_day"], y=plot_data["positive"], name="Positive",
marker_color='rgba(0, 128, 0, 0.7)'), row=2, col=1)
fig.update_layout(
title=f"แนวโน้มอารมณ์ข่าว + ราคาหุ้น ({symbol})",
barmode="stack",
height=650,
hovermode="x unified",
template="plotly_white"
)
st.plotly_chart(fig, use_container_width=True)
# ตารางแสดงรายการข่าว
st.subheader("📰 รายการข่าวทั้งหมด")
st.dataframe(news_df[["date", "source", "text", "sentiment", "url"]], use_container_width=True)
# ---------------------------------------------------------
# RUN APP
# ---------------------------------------------------------
if __name__ == "__main__":
main()