KYTHY's picture
Update app.py
4e702bb verified
raw
history blame
14.7 kB
import streamlit as st
import requests
import pandas as pd
from datetime import datetime, timedelta
# import nltk
# from wordcloud import WordCloud
# import base64
# from io import BytesIO
import numpy as np
from sklearn.linear_model import LinearRegression
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import yfinance as yf
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
# --------------------------
# CONFIG
# --------------------------
st.set_page_config(page_title="📰 News Sentiment Analysis for Young Investor", layout="wide")
API_KEY = "88bc396d4eab4be494a4b86ec842db47"
# --------------------------
# โหลด FinBERT model
# --------------------------
@st.cache_resource
def load_finbert():
tokenizer = AutoTokenizer.from_pretrained("project-aps/finbert-finetune")
model = AutoModelForSequenceClassification.from_pretrained("project-aps/finbert-finetune")
return tokenizer, model
tokenizer, model = load_finbert()
# --------------------------
# UTILITIES
# --------------------------
def analyze_text(text):
"""วิเคราะห์อารมณ์ของข่าว"""
if not text or not text.strip():
return 0
inputs = tokenizer(
text,
return_tensors="pt",
padding=True,
truncation=True,
max_length=512
)
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
probs = torch.softmax(logits, dim=1).numpy()[0]
# FinBERT = [negative, neutral, positive]
score = (-1 * probs[0]) + (0 * probs[1]) + (1 * probs[2])
return float(score)
# def generate_wordcloud(text):
# stopwords = nltk.corpus.stopwords.words('english')
# wordcloud = WordCloud(width=800, height=400, background_color="white", stopwords=stopwords).generate(text)
# buf = BytesIO()
# wordcloud.to_image().save(buf, format="PNG")
# return base64.b64encode(buf.getvalue()).decode()
# --------------------------
# แปลงชื่อ/ตัวย่อ → (Company Name, Symbol)
# --------------------------
def resolve_company_symbol(keyword: str):
keyword = keyword.strip()
ticker = None
name = None
try:
data = yf.Ticker(keyword)
info = data.info
if "symbol" in info and info["symbol"]:
ticker = info["symbol"]
name = info.get("longName", info.get("shortName", keyword))
else:
url = f"https://query2.finance.yahoo.com/v1/finance/search?q={keyword}"
res = requests.get(url).json()
if "quotes" in res and len(res["quotes"]) > 0:
q = res["quotes"][0]
ticker = q.get("symbol")
name = q.get("longname", q.get("shortname", keyword))
except:
pass
if not ticker:
ticker = keyword.upper()
if not name:
name = keyword.capitalize()
return name, ticker
# --------------------------
# ดึงข่าว 7 วัน
# --------------------------
@st.cache_data(ttl=3600)
def fetch_financial_news(keyword):
company, symbol = resolve_company_symbol(keyword)
to_date = datetime.now().strftime('%Y-%m-%d')
from_date = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')
query_keyword = f"({company} OR {symbol}) finance stock"
all_articles = []
page = 1
while True:
url = (
f"https://newsapi.org/v2/everything?"
f"q={query_keyword}&"
f"from={from_date}&to={to_date}&"
f"language=en&sortBy=publishedAt&"
f"pageSize=100&page={page}&apiKey={API_KEY}"
)
r = requests.get(url)
data = r.json()
if data.get("status") != "ok":
st.error(f"API Error: {data}")
break
articles = data.get("articles", [])
if not articles:
break
for a in articles:
if a["description"]:
all_articles.append({
"date": pd.to_datetime(a["publishedAt"]),
"text": f"{a['title']} {a['description']}",
"source": a["source"]["name"],
"url": a["url"]
})
if len(articles) < 100:
break
page += 1
return pd.DataFrame(all_articles)
# --------------------------
# ดึงราคาหุ้น
# --------------------------
@st.cache_data(ttl=3600)
def fetch_stock_price(symbol, start_date, end_date):
try:
start_str = (start_date - timedelta(days=2)).strftime('%Y-%m-%d')
end_str = (end_date + timedelta(days=1)).strftime('%Y-%m-%d')
df = yf.download(symbol, start=start_str, end=end_str, interval="1d")
if df.empty:
st.warning("ไม่พบข้อมูลราคาหุ้น")
return pd.DataFrame()
df = df.reset_index()
df_subset = df[['Date', 'Close']]
df_subset.columns = ['date', 'price']
df_subset["date"] = pd.to_datetime(df_subset["date"].dt.date)
return df_subset
except Exception as e:
st.warning(f"ดึงราคาหุ้นล้มเหลว: {e}")
return pd.DataFrame()
# --------------------------
# MAIN APP
# --------------------------
def main():
st.title("📰 News Sentiment Analysis for Young Investor")
st.markdown("วิเคราะห์แนวโน้มอารมณ์ของข่าวย้อนหลัง 7 วัน พร้อมราคาหุ้น")
# Sidebar
with st.sidebar:
keyword = st.text_input("ค้นหา Stock Symbol (เช่น AAPL, TSLA):", "")
analyze_btn = st.button("วิเคราะห์เลย")
if not analyze_btn:
st.info("กรอกคำค้นแล้วกด 'วิเคราะห์เลย'")
return
# ดึงข่าว
st.info(f"กำลังดึงข่าวย้อนหลัง 7 วันสำหรับ '{keyword}'...")
news_df = fetch_financial_news(keyword)
if news_df.empty:
st.warning("ไม่พบบทความข่าว")
return
# วิเคราะห์ Sentiment
st.info("กำลังวิเคราะห์อารมณ์ของข่าว...")
news_df["sentiment"] = news_df["text"].apply(analyze_text)
news_df["date"] = pd.to_datetime(news_df["date"])
# Metrics
avg_sentiment = news_df["sentiment"].mean()
pos_pct = (news_df["sentiment"] > 0.1).mean() * 100
neg_pct = (news_df["sentiment"] < -0.1).mean() * 100
col1, col2, col3 = st.columns(3)
col1.metric("ค่าเฉลี่ยอารมณ์ข่าว", f"{avg_sentiment:.2f}")
col2.metric("ข่าวเชิงบวก", f"{pos_pct:.1f}%")
col3.metric("ข่าวเชิงลบ", f"{neg_pct:.1f}%")
# # WordCloud
# st.subheader("☁️ Word Cloud")
# all_text = " ".join(news_df["text"].tolist())
# img = generate_wordcloud(all_text)
# st.image(f"data:image/png;base64,{img}", use_column_width=True)
# ---------------------------------------------------------
# เตรียมข้อมูลสำหรับกราฟ Sentiment & Price
# ---------------------------------------------------------
st.subheader("📈 แนวโน้มอารมณ์ของข่าว & ราคาหุ้น")
news_df["date_day"] = pd.to_datetime(news_df["date"].dt.date)
def sentiment_type(score):
if score > 0.1:
return "positive"
if score < -0.1:
return "negative"
return "neutral"
news_df["sentiment_type"] = news_df["sentiment"].apply(sentiment_type)
daily_avg = news_df.groupby("date_day")["sentiment"].mean().reset_index(name="avg_sentiment")
daily_counts = news_df.groupby(["date_day", "sentiment_type"]).size().unstack(fill_value=0).reset_index()
df_sorted = pd.merge(daily_avg, daily_counts, on="date_day").sort_values("date_day")
if len(df_sorted) < 2:
st.warning("ข้อมูลไม่พอสร้างแนวโน้ม")
st.dataframe(news_df)
return
# ดึงราคาหุ้น
_, symbol = resolve_company_symbol(keyword)
min_date, max_date = df_sorted["date_day"].min(), df_sorted["date_day"].max()
st.info(f"กำลังดึงราคาหุ้น {symbol} ...")
stock_df = fetch_stock_price(symbol, min_date, max_date)
plot_data = pd.merge(df_sorted, stock_df, left_on="date_day", right_on="date", how="left")
# ---------------------------------------------------------
# Correlation
# ---------------------------------------------------------
correlation = plot_data['price'].corr(plot_data['avg_sentiment'])
# หาข้อความอธิบาย (ข้อความใหญ่ด้านบน)
if correlation > 0.75:
corr_label = "มีความสัมพันธ์กันอย่างมากในทิศทางเดียวกัน"
elif correlation > 0.5:
corr_label = "มีความสัมพันธ์กันปานกลางในทิศทางเดียวกัน"
elif correlation > 0.25:
corr_label = "มีความสัมพันธ์กันเล็กน้อยในทิศทางเดียวกัน"
elif correlation < -0.75:
corr_label = "มีความสัมพันธ์กันอย่างมากในทิศทางตรงกันข้าม"
elif correlation < -0.5:
corr_label = "มีความสัมพันธ์กันปานกลางในทิศทางตรงกันข้าม"
elif correlation < -0.25:
corr_label = "มีความสัมพันธ์กันเล็กน้อยในทิศทางตรงกันข้าม"
else:
corr_label = "ไม่มีความสัมพันธ์กัน"
corr_value_text = f"Correlation = {correlation:.2f}"
st.metric(
"วิเคราะห์ความสัมพันธ์ระหว่างอารมณ์ของข่าวกับราคาหุ้น (Correlation)",
corr_label, # ตัวบน (ใหญ่)
corr_value_text # ตัวล่าง (สีเขียว/แดง)
)
# ---------------------------------------------------------
# Forecast Sentiment
# ---------------------------------------------------------
plot_data["timestamp"] = (plot_data["date_day"] - plot_data["date_day"].min()).dt.days
train_data = plot_data.dropna(subset=['avg_sentiment'])
if len(train_data) >= 2:
model_lr = LinearRegression()
model_lr.fit(train_data[["timestamp"]], train_data["avg_sentiment"])
future_days = 7
future_timestamps = np.arange(
plot_data["timestamp"].max() + 1,
plot_data["timestamp"].max() + future_days + 1
)
future_dates = [plot_data["date_day"].max() + timedelta(days=i) for i in range(1, future_days + 1)]
future_preds = model_lr.predict(future_timestamps.reshape(-1, 1))
# ---------------------------------------------------------
# Plot
# ---------------------------------------------------------
fig = make_subplots(rows=2, cols=1, specs=[[{"secondary_y": True}], [{}]],
row_heights=[0.7, 0.3], vertical_spacing=0.1,
shared_xaxes=True)
# ราคาหุ้น
fig.add_trace(
go.Scatter(
x=plot_data["date_day"], y=plot_data["price"],
name=f"{symbol} Price", mode="lines+markers", line=dict(color="orange")
),
row=1, col=1, secondary_y=False
)
# Sentiment จริง
fig.add_trace(
go.Scatter(
x=plot_data["date_day"], y=plot_data["avg_sentiment"],
name="Actual Sentiment", mode="lines+markers", line=dict(color="blue")
),
row=1, col=1, secondary_y=True
)
# Sentiment พยากรณ์
if "future_preds" in locals():
fig.add_trace(
go.Scatter(
x=future_dates, y=future_preds,
name="Predicted Sentiment", mode="lines+markers", line=dict(color="#02a1f7", dash="dash")
),
row=1, col=1, secondary_y=True
)
# ---------------------------------------------------------
# เส้นเชื่อม Actual -> Predicted
# ---------------------------------------------------------
last_actual_date = plot_data["date_day"].max()
last_actual_value = plot_data["avg_sentiment"].iloc[-1]
first_pred_date = future_dates[0]
first_pred_value = future_preds[0]
fig.add_trace(
go.Scatter(
x=[last_actual_date, first_pred_date],
y=[last_actual_value, first_pred_value],
mode="lines",
line=dict(color="#02a1f7", dash="dot"),
name="Connector Actual→Predicted"
),
row=1, col=1, secondary_y=True
)
# จำนวนข่าว
for col in ["neutral", "negative", "positive"]:
if col not in plot_data.columns:
plot_data[col] = 0
fig.add_trace(go.Bar(x=plot_data["date_day"], y=plot_data["neutral"], name="Neutral",
marker_color='rgba(128, 128, 128, 0.7)'), row=2, col=1)
fig.add_trace(go.Bar(x=plot_data["date_day"], y=plot_data["negative"], name="Negative",
marker_color='rgba(255, 0, 0, 0.7)'), row=2, col=1)
fig.add_trace(go.Bar(x=plot_data["date_day"], y=plot_data["positive"], name="Positive",
marker_color='rgba(0, 128, 0, 0.7)'), row=2, col=1)
fig.update_layout(
title=f"แนวโน้มอารมณ์ข่าว + ราคาหุ้น ({symbol})",
barmode="stack",
height=650,
hovermode="x unified",
template="plotly_white"
)
st.plotly_chart(fig, use_container_width=True)
# แสดงรายการข่าว
st.subheader("📰 รายการข่าวทั้งหมด")
st.dataframe(news_df[["date", "source", "text", "sentiment", "url"]], use_container_width=True)
# ---------------------------------------------------------
# RUN APP
# ---------------------------------------------------------
if __name__ == "__main__":
# nltk.download("stopwords", quiet=True)
main()