"""Streamlit app: 7-day news sentiment (FinBERT) overlaid on stock prices."""

import base64
import os
from datetime import datetime, timedelta
from io import BytesIO

import nltk
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import requests
import streamlit as st
import torch
import yfinance as yf
from plotly.subplots import make_subplots
from sklearn.linear_model import LinearRegression
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from wordcloud import WordCloud

# --------------------------
# CONFIG
# --------------------------
st.set_page_config(
    page_title="📰 News Sentiment Analysis for Young Investor",
    layout="wide",
)

# NOTE(review): the literal below was committed in source and should be
# treated as leaked -- rotate it and supply the new key via NEWSAPI_KEY.
# It is kept only as a fallback so existing deployments keep working.
API_KEY = os.environ.get("NEWSAPI_KEY", "88bc396d4eab4be494a4b86ec842db47")

NEWS_API_URL = "https://newsapi.org/v2/everything"
YAHOO_SEARCH_URL = "https://query2.finance.yahoo.com/v1/finance/search"
REQUEST_TIMEOUT = 10  # seconds; without it a stalled server hangs the app
NEWS_PAGE_SIZE = 100
SENTIMENT_THRESHOLD = 0.1  # |score| above this counts as positive/negative


# --------------------------
# Load the FinBERT model
# --------------------------
@st.cache_resource
def load_finbert():
    """Load the FinBERT tokenizer and classifier once per server process."""
    tokenizer = AutoTokenizer.from_pretrained("project-aps/finbert-finetune")
    model = AutoModelForSequenceClassification.from_pretrained(
        "project-aps/finbert-finetune"
    )
    return tokenizer, model


tokenizer, model = load_finbert()


# --------------------------
# UTILITIES
# --------------------------
def _sentiment_label_indices():
    """Return ``(negative_idx, positive_idx)`` into the model's logits.

    The mapping is read from ``model.config.id2label`` when it names the
    classes; otherwise the order [negative, neutral, positive] that this
    file originally assumed is used.
    """
    id2label = getattr(model.config, "id2label", None) or {}
    by_name = {str(name).lower(): int(idx) for idx, name in id2label.items()}
    if "negative" in by_name and "positive" in by_name:
        return by_name["negative"], by_name["positive"]
    return 0, 2


def analyze_text(text):
    """Score the sentiment of a news text.

    Returns:
        float: ``P(positive) - P(negative)``, i.e. a value in [-1, 1].
        ``0.0`` for empty, whitespace-only or non-string input.
    """
    if not isinstance(text, str) or not text.strip():
        return 0.0

    inputs = tokenizer(
        text,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=512,
    )
    with torch.no_grad():
        logits = model(**inputs).logits
    probs = torch.softmax(logits, dim=1)[0]

    neg_idx, pos_idx = _sentiment_label_indices()
    # The neutral probability carries weight 0 and therefore drops out.
    return float(probs[pos_idx] - probs[neg_idx])


def generate_wordcloud(text):
    """Render a word cloud for *text* and return it as a base64 PNG string."""
    try:
        stop_words = set(nltk.corpus.stopwords.words("english"))
    except LookupError:
        # Corpus not present yet (e.g. module imported instead of run as a
        # script, so the download in the __main__ guard never happened).
        nltk.download("stopwords", quiet=True)
        stop_words = set(nltk.corpus.stopwords.words("english"))

    cloud = WordCloud(
        width=800,
        height=400,
        background_color="white",
        stopwords=stop_words,
    ).generate(text)

    buf = BytesIO()
    cloud.to_image().save(buf, format="PNG")
    return base64.b64encode(buf.getvalue()).decode()


# --------------------------
# Convert a name / abbreviation -> (Company Name, Symbol)
# --------------------------
def resolve_company_symbol(keyword: str):
    """Resolve a free-text keyword to ``(company_name, ticker_symbol)``.

    Tries a direct yfinance ticker lookup first, then the Yahoo Finance
    search endpoint. Both lookups are best-effort: if neither yields a
    symbol, the upper-cased keyword is returned as the ticker and the
    capitalized keyword as the name.
    """
    keyword = keyword.strip()
    ticker = None
    name = None

    if keyword:
        # Stage 1: treat the keyword as a ticker symbol.
        try:
            info = yf.Ticker(keyword).info or {}
            if info.get("symbol"):
                ticker = info["symbol"]
                name = info.get("longName") or info.get("shortName") or keyword
        except Exception:
            # Best-effort: any failure here just means we fall through to
            # the search endpoint instead of aborting the whole lookup.
            pass

        # Stage 2: ask Yahoo's search endpoint. Runs even when stage 1
        # raised, which the original single try-block did not allow.
        if not ticker:
            try:
                res = requests.get(
                    YAHOO_SEARCH_URL,
                    params={"q": keyword},
                    timeout=REQUEST_TIMEOUT,
                ).json()
                quotes = res.get("quotes") or []
                if quotes:
                    first = quotes[0]
                    ticker = first.get("symbol")
                    name = (
                        first.get("longname")
                        or first.get("shortname")
                        or keyword
                    )
            except (requests.RequestException, ValueError, AttributeError):
                # Network error, non-JSON body or unexpected payload shape:
                # fall back to the keyword-derived defaults below.
                pass

    if not ticker:
        ticker = keyword.upper()
    if not name:
        name = keyword.capitalize()

    return name, ticker


# --------------------------
# Fetch 7 days of news
# --------------------------
@st.cache_data(ttl=3600)
def fetch_financial_news(keyword):
    """Fetch English news from the last 7 days for *keyword* via NewsAPI.

    Returns:
        pandas.DataFrame: columns ``date``, ``text``, ``source``, ``url``;
        empty when nothing was found or the API call failed.
    """
    company, symbol = resolve_company_symbol(keyword)

    now = datetime.now()
    params = {
        "q": f"({company} OR {symbol}) finance stock",
        "from": (now - timedelta(days=7)).strftime("%Y-%m-%d"),
        "to": now.strftime("%Y-%m-%d"),
        "language": "en",
        "sortBy": "publishedAt",
        "pageSize": NEWS_PAGE_SIZE,
    }
    # Sending the key as a header keeps it out of URLs and thus out of
    # exception messages that are shown to the user below.
    headers = {"X-Api-Key": API_KEY}

    all_articles = []
    page = 1
    while True:
        try:
            response = requests.get(
                NEWS_API_URL,
                params={**params, "page": page},
                headers=headers,
                timeout=REQUEST_TIMEOUT,
            )
            data = response.json()
        except (requests.RequestException, ValueError) as exc:
            st.error(f"API Error: {exc}")
            break

        if data.get("status") != "ok":
            # NOTE(review): developer-plan keys appear to answer page 2+
            # with code "maximumResultsReached"; treat that as the normal
            # end of paging once we already hold results -- confirm.
            hit_plan_limit = (
                all_articles and data.get("code") == "maximumResultsReached"
            )
            if not hit_plan_limit:
                st.error(f"API Error: {data}")
            break

        articles = data.get("articles", [])
        if not articles:
            break

        for article in articles:
            description = article.get("description")
            if not description:
                continue
            title = article.get("title") or ""
            all_articles.append({
                "date": pd.to_datetime(article.get("publishedAt")),
                "text": f"{title} {description}".strip(),
                "source": (article.get("source") or {}).get("name"),
                "url": article.get("url"),
            })

        if len(articles) < NEWS_PAGE_SIZE:
            break
        page += 1

    return pd.DataFrame(all_articles)


# --------------------------
# Fetch stock prices
# --------------------------
@st.cache_data(ttl=3600)
def fetch_stock_price(symbol, start_date, end_date):
    """Download daily closing prices for *symbol*.

    The window is widened by 2 days before *start_date* and 1 day after
    *end_date*.

    Returns:
        pandas.DataFrame: columns ``date`` (midnight timestamps) and
        ``price``; an empty, column-less frame when no data is available or
        the download fails (a warning is shown in both cases).
    """
    try:
        start_str = (start_date - timedelta(days=2)).strftime("%Y-%m-%d")
        end_str = (end_date + timedelta(days=1)).strftime("%Y-%m-%d")

        df = yf.download(symbol, start=start_str, end=end_str, interval="1d")
        if df.empty:
            st.warning("ไม่พบข้อมูลราคาหุ้น")
            return pd.DataFrame()

        # Some yfinance versions return (field, ticker) MultiIndex columns
        # even for a single symbol; keep only the field level.
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = df.columns.get_level_values(0)

        df = df.reset_index()
        # .copy() so the assignments below do not write into a slice.
        prices = df[["Date", "Close"]].copy()
        prices.columns = ["date", "price"]
        prices["date"] = pd.to_datetime(prices["date"].dt.date)
        return prices
    except Exception as e:
        # UI boundary: report the failure and let the app continue
        # without price data instead of crashing the page.
        st.warning(f"ดึงราคาหุ้นล้มเหลว: {e}")
        return pd.DataFrame()


def _build_figure(plot_data, symbol, future_dates, future_preds):
    """Build the two-row chart: price + sentiment on top, counts below.

    *future_dates* / *future_preds* may be ``None``, in which case the
    forecast trace and its connector line are omitted.
    """
    fig = make_subplots(
        rows=2,
        cols=1,
        specs=[[{"secondary_y": True}], [{}]],
        row_heights=[0.7, 0.3],
        vertical_spacing=0.1,
        shared_xaxes=True,
    )

    # Stock price
    fig.add_trace(
        go.Scatter(
            x=plot_data["date_day"],
            y=plot_data["price"],
            name=f"{symbol} Price",
            mode="lines+markers",
            line=dict(color="orange"),
        ),
        row=1, col=1, secondary_y=False,
    )

    # Actual sentiment
    fig.add_trace(
        go.Scatter(
            x=plot_data["date_day"],
            y=plot_data["avg_sentiment"],
            name="Actual Sentiment",
            mode="lines+markers",
            line=dict(color="blue"),
        ),
        row=1, col=1, secondary_y=True,
    )

    # Forecast sentiment
    if future_preds is not None:
        fig.add_trace(
            go.Scatter(
                x=future_dates,
                y=future_preds,
                name="Predicted Sentiment",
                mode="lines+markers",
                line=dict(color="#02a1f7", dash="dash"),
            ),
            row=1, col=1, secondary_y=True,
        )

        # Connector line: last actual point -> first predicted point.
        # plot_data is sorted by date_day, so iloc[-1] is the latest day.
        fig.add_trace(
            go.Scatter(
                x=[plot_data["date_day"].max(), future_dates[0]],
                y=[plot_data["avg_sentiment"].iloc[-1], future_preds[0]],
                mode="lines",
                line=dict(color="#02a1f7", dash="dot"),
                name="Connector Actual→Predicted",
            ),
            row=1, col=1, secondary_y=True,
        )

    # Article counts per sentiment class
    bar_styles = [
        ("neutral", "Neutral", "rgba(128, 128, 128, 0.7)"),
        ("negative", "Negative", "rgba(255, 0, 0, 0.7)"),
        ("positive", "Positive", "rgba(0, 128, 0, 0.7)"),
    ]
    for column, label, color in bar_styles:
        fig.add_trace(
            go.Bar(
                x=plot_data["date_day"],
                y=plot_data[column],
                name=label,
                marker_color=color,
            ),
            row=2, col=1,
        )

    fig.update_layout(
        title=f"แนวโน้มอารมณ์ข่าว + ราคาหุ้น ({symbol})",
        barmode="stack",
        height=650,
        hovermode="x unified",
        template="plotly_white",
    )
    return fig


# --------------------------
# MAIN APP
# --------------------------
def main():
    """Render the page: input, sentiment metrics, word cloud, chart, table."""
    st.title("📰 News Sentiment Analysis for Young Investor")
    st.markdown("วิเคราะห์แนวโน้มอารมณ์ของข่าวย้อนหลัง 7 วัน พร้อมราคาหุ้น")

    # Sidebar
    with st.sidebar:
        keyword = st.text_input("ค้นหา Stock Symbol (เช่น AAPL, TSLA):", "")
        analyze_btn = st.button("วิเคราะห์เลย")

    if not analyze_btn:
        st.info("กรอกคำค้นแล้วกด 'วิเคราะห์เลย'")
        return

    keyword = keyword.strip()
    if not keyword:
        # An empty keyword would otherwise query "( OR ) finance stock".
        st.warning("กรุณากรอกคำค้นก่อนกด 'วิเคราะห์เลย'")
        return

    # Fetch news
    st.info(f"กำลังดึงข่าวย้อนหลัง 7 วันสำหรับ '{keyword}'...")
    news_df = fetch_financial_news(keyword)
    if news_df.empty:
        st.warning("ไม่พบบทความข่าว")
        return

    # Sentiment analysis
    st.info("กำลังวิเคราะห์อารมณ์ของข่าว...")
    news_df["sentiment"] = news_df["text"].apply(analyze_text)
    news_df["date"] = pd.to_datetime(news_df["date"])

    # Metrics
    avg_sentiment = news_df["sentiment"].mean()
    pos_pct = (news_df["sentiment"] > SENTIMENT_THRESHOLD).mean() * 100
    neg_pct = (news_df["sentiment"] < -SENTIMENT_THRESHOLD).mean() * 100

    col1, col2, col3 = st.columns(3)
    col1.metric("ค่าเฉลี่ยอารมณ์ข่าว", f"{avg_sentiment:.2f}")
    col2.metric("ข่าวเชิงบวก", f"{pos_pct:.1f}%")
    col3.metric("ข่าวเชิงลบ", f"{neg_pct:.1f}%")

    # Word cloud
    st.subheader("☁️ Word Cloud")
    all_text = " ".join(news_df["text"].tolist())
    img = generate_wordcloud(all_text)
    st.image(f"data:image/png;base64,{img}", use_column_width=True)

    # ---------------------------------------------------------
    # Prepare data for the sentiment & price chart
    # ---------------------------------------------------------
    st.subheader("📈 แนวโน้มอารมณ์ของข่าว & ราคาหุ้น")
    news_df["date_day"] = pd.to_datetime(news_df["date"].dt.date)

    def sentiment_type(score):
        if score > SENTIMENT_THRESHOLD:
            return "positive"
        if score < -SENTIMENT_THRESHOLD:
            return "negative"
        return "neutral"

    news_df["sentiment_type"] = news_df["sentiment"].apply(sentiment_type)

    daily_avg = (
        news_df.groupby("date_day")["sentiment"]
        .mean()
        .reset_index(name="avg_sentiment")
    )
    daily_counts = (
        news_df.groupby(["date_day", "sentiment_type"])
        .size()
        .unstack(fill_value=0)
        .reset_index()
    )
    df_sorted = pd.merge(daily_avg, daily_counts, on="date_day").sort_values(
        "date_day"
    )

    if len(df_sorted) < 2:
        st.warning("ข้อมูลไม่พอสร้างแนวโน้ม")
        st.dataframe(news_df)
        return

    # Fetch stock prices
    _, symbol = resolve_company_symbol(keyword)
    min_date = df_sorted["date_day"].min()
    max_date = df_sorted["date_day"].max()

    st.info(f"กำลังดึงราคาหุ้น {symbol} ...")
    stock_df = fetch_stock_price(symbol, min_date, max_date)

    if stock_df.empty:
        # fetch_stock_price returns a column-less frame on failure; merging
        # on its missing "date" column would raise KeyError.
        plot_data = df_sorted.copy()
        plot_data["price"] = np.nan
    else:
        plot_data = pd.merge(
            df_sorted,
            stock_df,
            left_on="date_day",
            right_on="date",
            how="left",
        )

    # A sentiment class that never occurred has no column after unstack().
    for column in ("neutral", "negative", "positive"):
        if column not in plot_data.columns:
            plot_data[column] = 0

    # ---------------------------------------------------------
    # Correlation
    # ---------------------------------------------------------
    correlation = plot_data["price"].corr(plot_data["avg_sentiment"])
    metric_label = (
        "วิเคราะห์ความสัมพันธ์ระหว่างอารมณ์ของข่าวกับราคาหุ้น (Correlation)"
    )
    if pd.isna(correlation):
        # Too few overlapping days (or a constant series): do not present
        # an undefined value as "no relationship".
        st.metric(metric_label, "ข้อมูลไม่พอคำนวณความสัมพันธ์")
    else:
        corr_text = "ไม่มีความสัมพันธ์"
        if correlation > 0.5:
            corr_text = "มีความสัมพันธ์ในทิศทางเดียวกัน"
        elif correlation < -0.5:
            corr_text = "มีความสัมพันธ์ในทิศทางตรงกันข้าม"
        st.metric(metric_label, corr_text, f"{correlation:.2f}")

    # ---------------------------------------------------------
    # Forecast sentiment
    # ---------------------------------------------------------
    plot_data["timestamp"] = (
        plot_data["date_day"] - plot_data["date_day"].min()
    ).dt.days
    train_data = plot_data.dropna(subset=["avg_sentiment"])

    future_dates = None
    future_preds = None
    if len(train_data) >= 2:
        model_lr = LinearRegression()
        model_lr.fit(train_data[["timestamp"]], train_data["avg_sentiment"])

        future_days = 7
        last_timestamp = plot_data["timestamp"].max()
        last_day = plot_data["date_day"].max()
        future_dates = [
            last_day + timedelta(days=i) for i in range(1, future_days + 1)
        ]
        # Predict with the same column name used for fitting so sklearn
        # does not warn about missing feature names.
        future_features = pd.DataFrame({
            "timestamp": np.arange(
                last_timestamp + 1, last_timestamp + future_days + 1
            ),
        })
        future_preds = model_lr.predict(future_features)

    # ---------------------------------------------------------
    # Plot
    # ---------------------------------------------------------
    fig = _build_figure(plot_data, symbol, future_dates, future_preds)
    st.plotly_chart(fig, use_container_width=True)

    # List all news articles
    st.subheader("📰 รายการข่าวทั้งหมด")
    st.dataframe(
        news_df[["date", "source", "text", "sentiment", "url"]],
        use_container_width=True,
    )


# ---------------------------------------------------------
# RUN APP
# ---------------------------------------------------------
if __name__ == "__main__":
    nltk.download("stopwords", quiet=True)
    main()