Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import requests | |
| import pandas as pd | |
| from datetime import datetime, timedelta | |
| import numpy as np | |
| from sklearn.linear_model import LinearRegression | |
| import plotly.graph_objects as go | |
| from plotly.subplots import make_subplots | |
| import yfinance as yf | |
| import torch | |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification | |
| # -------------------------- | |
| # CONFIG ตั้งค่า Streamlit | |
| # -------------------------- | |
| st.set_page_config(page_title="📰 News Sentiment Analysis for Young Investor", layout="wide") | |
| # API จากเว็บ newsapi.org | |
| API_KEY = "88bc396d4eab4be494a4b86ec842db47" | |
| # -------------------------- | |
| # โหลด FinBERT model | |
| # -------------------------- | |
| def load_finbert(): | |
| tokenizer = AutoTokenizer.from_pretrained("project-aps/finbert-finetune") | |
| model = AutoModelForSequenceClassification.from_pretrained("project-aps/finbert-finetune") | |
| return tokenizer, model | |
| tokenizer, model = load_finbert() | |
| # -------------------------- | |
| # UTILITIES | |
| # -------------------------- | |
| # ฟังก์ชันวิเคราะห์ Sentiment | |
| def analyze_text(text): | |
| """วิเคราะห์อารมณ์ของข่าว""" | |
| if not text or not text.strip(): | |
| return 0 | |
| # แปลงข้อความเป็นตัวเลข | |
| inputs = tokenizer( | |
| text, | |
| return_tensors="pt", | |
| padding=True, | |
| truncation=True, | |
| max_length=512 | |
| ) | |
| #ส่งข้อความเข้า Model | |
| with torch.no_grad(): | |
| outputs = model(**inputs) | |
| logits = outputs.logits | |
| probs = torch.softmax(logits, dim=1).numpy()[0] | |
| # โมเดล FinBERT แบ่งออกเป็น 3 คลาส | |
| # FinBERT = [negative, neutral, positive] | |
| # คูณความน่าจะเป็นของแต่ละคลาสกับ -1, 0, 1 แล้วรวมกันเป็นคะแนน | |
| score = (-1 * probs[0]) + (0 * probs[1]) + (1 * probs[2]) | |
| return float(score) | |
| # -------------------------- | |
| # แปลงชื่อบริษัท/ชื่อหุ้น เพื่อให้เมื่อค้นหาแล้ว ได้ผลลัพธ์เหมือนกัน ไม่ว่าจะค้นหาด้วยชื่อไหน | |
| # -------------------------- | |
| # ฟังก์ชันสำหรับหาชื่อหุ้นหรือชื่อบริษัท | |
| # ถ้า Keyword เป็นชื่อบริษัท ให้หาชื่อหุ้น, ถ้า Keyword เป็นชื่อหุ้น ให้หาชื่อบริษัท | |
| def resolve_company_symbol(keyword: str): | |
| # รับ Keyword เป็นชื่อบริษัท หรือ ชื่อหุ้น อย่างใดอย่างหนึ่ง | |
| keyword = keyword.strip() | |
| ticker = None | |
| name = None | |
| # ดึงข้อมูลชื่อบริษัท/ชื่อหุ้นจาก yfinance | |
| try: | |
| data = yf.Ticker(keyword) | |
| info = data.info | |
| # ตรวจสอบข้อมูลใน Info (ข้อมูลหุ้นของ keyword ที่ส่งเข้าไป) | |
| if "symbol" in info and info["symbol"]: | |
| ticker = info["symbol"] | |
| name = info.get("longName", info.get("shortName", keyword)) | |
| # ถ้าไม่หาไม่เจอให้ค้นหาผ่าน Yahoo Finance API | |
| else: | |
| url = f"https://query2.finance.yahoo.com/v1/finance/search?q={keyword}" | |
| res = requests.get(url).json() | |
| if "quotes" in res and len(res["quotes"]) > 0: | |
| q = res["quotes"][0] | |
| ticker = q.get("symbol") | |
| name = q.get("longname", q.get("shortname", keyword)) | |
| except: | |
| pass | |
| if not ticker: | |
| ticker = keyword.upper() | |
| if not name: | |
| name = keyword.capitalize() | |
| # คืนค่าเป็น tuple (ชื่อบริษัท, ชื่อหุ้น) เช่น (Apple Inc., AAPL) | |
| return name, ticker | |
| # -------------------------- | |
| # ดึงข่าว 7 วัน | |
| # -------------------------- | |
| # ฟังก์ชันสำหรับดึงข่าว | |
| def fetch_financial_news(keyword): | |
| # หลังจากรับ Keyword เป็นชื่อบริษัท/ชื่อหุ้นอย่างใดอย่างหนึ่ง จะเรียกฟังชันก์ resolve_company_symbol เพื่อให้คืนค่าทั้งชื่อบริษัทและชื่อหุ้น | |
| company, symbol = resolve_company_symbol(keyword) | |
| # กำหนดช่วงวันที่ค้นหาเป็น 7 วันล่าสุด | |
| to_date = datetime.now().strftime('%Y-%m-%d') | |
| from_date = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d') | |
| # สร้าง query สำหรับ News API เพื่อให้สามารถค้นเจอข่าวที่มีชื่อบริษัทหรือชื่อหุ้นก็ได้ | |
| # เช่น กรอก AAPL ต้องเจอข่าวที่มีคำว่า Apple Inc. ด้วย แม้จะไม่มีคำว่า AAPL อยู่ในข่าวก็ตาม | |
| query_keyword = f"({company} OR {symbol}) finance stock" | |
| # ดึงข่าวทุกหน้าจนกว่าข่าวจะหมด โดยดึงข่าวจาก API ของเว็บ newsapi.org | |
| all_articles = [] | |
| page = 1 | |
| while True: | |
| url = ( | |
| f"https://newsapi.org/v2/everything?" | |
| f"q={query_keyword}&" | |
| f"from={from_date}&to={to_date}&" | |
| f"language=en&sortBy=publishedAt&" | |
| f"pageSize=100&page={page}&apiKey={API_KEY}" | |
| ) | |
| r = requests.get(url) | |
| data = r.json() | |
| if data.get("status") != "ok": | |
| st.error(f"API Error: {data}") | |
| break | |
| articles = data.get("articles", []) | |
| if not articles: | |
| break | |
| # เก็บข่าวลง list | |
| for a in articles: | |
| if a["description"]: | |
| all_articles.append({ | |
| "date": pd.to_datetime(a["publishedAt"]), | |
| "text": f"{a['title']} {a['description']}", | |
| "source": a["source"]["name"], | |
| "url": a["url"] | |
| }) | |
| if len(articles) < 100: | |
| break | |
| page += 1 | |
| return pd.DataFrame(all_articles) | |
| # -------------------------- | |
| # ดึงราคาหุ้น | |
| # -------------------------- | |
| # ฟังก์ชันสำหรับดึงราคาหุ้น | |
| # รับพารามิเตอร์: | |
| # symbol → ชื่อหุ้น (Ticker) | |
| # start_date → วันที่เริ่มต้น (datetime object) | |
| # end_date → วันที่สิ้นสุด (datetime object) | |
| def fetch_stock_price(symbol, start_date, end_date): | |
| try: | |
| # ขยายช่วงวันที่เล็กน้อย เนื่องจากบางครั้งตลาดหุ้นปิดวันเสาร์-อาทิตย์ และวันหยุดนนักขัตฤกษ์ การขยายช่วงจะช่วยให้ได้ข้อมูลครบ | |
| start_str = (start_date - timedelta(days=2)).strftime('%Y-%m-%d') | |
| end_str = (end_date + timedelta(days=1)).strftime('%Y-%m-%d') | |
| # ดึงราคาหุ้นจาก yfinance โดยใช้ราคาปิดรายวัน | |
| df = yf.download(symbol, start=start_str, end=end_str, interval="1d") | |
| if df.empty: | |
| st.warning("ไม่พบข้อมูลราคาหุ้น") | |
| return pd.DataFrame() | |
| df = df.reset_index() | |
| df_subset = df[['Date', 'Close']] | |
| df_subset.columns = ['date', 'price'] | |
| df_subset["date"] = pd.to_datetime(df_subset["date"].dt.date) | |
| # คืนค่าเป็น DataFrame ที่มี 2 คอลัมน์ คือ date และ price | |
| return df_subset | |
| except Exception as e: | |
| st.warning(f"ดึงราคาหุ้นล้มเหลว: {e}") | |
| return pd.DataFrame() | |
| # -------------------------- | |
| # MAIN APP | |
| # -------------------------- | |
| # ฟังก์ชันหลักของ Streamlit app | |
| def main(): | |
| # หัวข้อใหญ่ของแอป | |
| st.title("📰 News Sentiment Analysis for Young Investor") | |
| # ข้อความอธิบาย | |
| st.markdown("วิเคราะห์แนวโน้มอารมณ์ของข่าวย้อนหลัง 7 วัน พร้อมราคาหุ้น") | |
| # Sidebar สำหรับ Input | |
| with st.sidebar: | |
| keyword = st.text_input("ค้นหา Stock Symbol (เช่น AAPL, TSLA):", "") | |
| analyze_btn = st.button("วิเคราะห์เลย") | |
| if not analyze_btn: | |
| st.info("กรอกคำค้นแล้วกด 'วิเคราะห์เลย'") | |
| return | |
| # ดึงข่าว | |
| st.info(f"กำลังดึงข่าวย้อนหลัง 7 วันสำหรับ '{keyword}'...") | |
| news_df = fetch_financial_news(keyword) | |
| if news_df.empty: | |
| st.warning("ไม่พบบทความข่าว") | |
| return | |
| # วิเคราะห์ Sentiment | |
| st.info("กำลังวิเคราะห์อารมณ์ของข่าว...") | |
| news_df["sentiment"] = news_df["text"].apply(analyze_text) | |
| news_df["date"] = pd.to_datetime(news_df["date"]) | |
| # คำนวณ Metrics | |
| avg_sentiment = news_df["sentiment"].mean() | |
| pos_pct = (news_df["sentiment"] > 0.1).mean() * 100 | |
| neg_pct = (news_df["sentiment"] < -0.1).mean() * 100 | |
| # แสดง Metrics บน App เป็น 3 คอลัมน์ ได้แก่ ค่าเฉลี่ย Sentment, % ข่าวเชิงบวก, % ข่าวเชิงลบ | |
| col1, col2, col3 = st.columns(3) | |
| col1.metric("ค่าเฉลี่ยอารมณ์ของข่าว", f"{avg_sentiment:.2f}") | |
| col2.metric("ข่าวเชิงบวก", f"{pos_pct:.1f}%") | |
| col3.metric("ข่าวเชิงลบ", f"{neg_pct:.1f}%") | |
| # --------------------------------------------------------- | |
| # เตรียมข้อมูลสำหรับกราฟ Sentiment & Price | |
| # --------------------------------------------------------- | |
| st.subheader("📈 แนวโน้มอารมณ์ของข่าว & ราคาหุ้น") | |
| # สร้างคอลัมน์วันที่ | |
| news_df["date_day"] = pd.to_datetime(news_df["date"].dt.date) | |
| # ฟังก์ชันจำแนก Sentiment | |
| def sentiment_type(score): | |
| if score > 0.1: | |
| return "positive" | |
| if score < -0.1: | |
| return "negative" | |
| return "neutral" | |
| news_df["sentiment_type"] = news_df["sentiment"].apply(sentiment_type) | |
| # คำนวณค่าเฉลี่ย Sentiment ต่อวัน | |
| daily_avg = news_df.groupby("date_day")["sentiment"].mean().reset_index(name="avg_sentiment") | |
| # นับจำนวนข่าวรายวัน | |
| daily_counts = news_df.groupby(["date_day", "sentiment_type"]).size().unstack(fill_value=0).reset_index() | |
| # รวม DataFrame ของค่าเฉลี่ย และ จำนวนข่าว | |
| df_sorted = pd.merge(daily_avg, daily_counts, on="date_day").sort_values("date_day") | |
| if len(df_sorted) < 2: | |
| st.warning("ข้อมูลไม่พอสร้างแนวโน้ม") | |
| st.dataframe(news_df) | |
| return | |
| # ดึงราคาหุ้น | |
| _, symbol = resolve_company_symbol(keyword) | |
| min_date, max_date = df_sorted["date_day"].min(), df_sorted["date_day"].max() | |
| st.info(f"กำลังดึงราคาหุ้น {symbol} ...") | |
| stock_df = fetch_stock_price(symbol, min_date, max_date) | |
| # รวม DataFrame ของข่าว (ค่าเฉลี่ยและจำนวนข่าวที่รวมกันแล้ว) และ ราคาหุ้น | |
| # ผลลัพธ์จะมี 4 คอลัมน์ ได้แก่ วันที่, ค่าเฉลี่ย, จำนวนข่าว, ราคาหุ้น เพื่อให้พร้อมสำหรับทำกราฟ | |
| plot_data = pd.merge(df_sorted, stock_df, left_on="date_day", right_on="date", how="left") | |
| # --------------------------------------------------------- | |
| # Correlation | |
| # --------------------------------------------------------- | |
| # คำนวณ correlation ระหว่างราคาหุ้นกับค่า sentiment | |
| correlation = plot_data['price'].corr(plot_data['avg_sentiment']) | |
| # แปลงค่า correlation เป็นข้อความอธิบาย | |
| if correlation > 0.7: | |
| corr_label = "มีความสัมพันธ์กันอย่างมากในทิศทางเดียวกัน" | |
| elif correlation > 0.4: | |
| corr_label = "มีความสัมพันธ์กันปานกลางในทิศทางเดียวกัน" | |
| elif correlation > 0.2: | |
| corr_label = "มีความสัมพันธ์กันเล็กน้อยในทิศทางเดียวกัน" | |
| elif correlation < -0.7: | |
| corr_label = "มีความสัมพันธ์กันอย่างมากในทิศทางตรงกันข้าม" | |
| elif correlation < -0.4: | |
| corr_label = "มีความสัมพันธ์กันปานกลางในทิศทางตรงกันข้าม" | |
| elif correlation < -0.2: | |
| corr_label = "มีความสัมพันธ์กันเล็กน้อยในทิศทางตรงกันข้าม" | |
| else: | |
| corr_label = "ไม่มีความสัมพันธ์กัน" | |
| corr_value_text = f"Correlation = {correlation:.2f}" | |
| st.metric( | |
| "วิเคราะห์ความสัมพันธ์ระหว่างอารมณ์ของข่าวกับราคาหุ้น", | |
| corr_label, # ตัวบน (ใหญ่) | |
| corr_value_text # ตัวล่าง (สีเขียว/แดง) | |
| ) | |
| # --------------------------------------------------------- | |
| # Forecast Sentiment | |
| # --------------------------------------------------------- | |
| # แปลง date เป็นตัวเลข โดยนับวันแรกเป็นวันที่ 0 และวันถัดมาเป็น 1, 2, ... เพื่อทำ Linear Regression | |
| plot_data["timestamp"] = (plot_data["date_day"] - plot_data["date_day"].min()).dt.days | |
| train_data = plot_data.dropna(subset=['avg_sentiment']) | |
| # ตรวจสอบว่ามีข้อมูลเพียงพอ | |
| if len(train_data) >= 2: | |
| # สร้างโมเดล Linear Regression โดยให้ x = จำนวนวัน, y = ค่าเฉลี่ย sentiment ต่อวัน | |
| model_lr = LinearRegression() | |
| model_lr.fit(train_data[["timestamp"]], train_data["avg_sentiment"]) | |
| # สร้างช่วงวันที่สำหรับทำนายอนาคต | |
| future_days = 7 | |
| future_timestamps = np.arange( | |
| plot_data["timestamp"].max() + 1, | |
| plot_data["timestamp"].max() + future_days + 1 | |
| ) | |
| # ทำนายค่า sentiment ในอนาคต | |
| future_dates = [plot_data["date_day"].max() + timedelta(days=i) for i in range(1, future_days + 1)] | |
| future_preds = model_lr.predict(future_timestamps.reshape(-1, 1)) | |
| # --------------------------------------------------------- | |
| # Plot | |
| # --------------------------------------------------------- | |
| fig = make_subplots(rows=2, cols=1, specs=[[{"secondary_y": True}], [{}]], | |
| row_heights=[0.7, 0.3], vertical_spacing=0.1, | |
| shared_xaxes=True) | |
| # ราคาหุ้น | |
| fig.add_trace( | |
| go.Scatter( | |
| x=plot_data["date_day"], y=plot_data["price"], | |
| name=f"{symbol} Price", mode="lines+markers", line=dict(color="orange") | |
| ), | |
| row=1, col=1, secondary_y=False | |
| ) | |
| # Sentiment จริง | |
| fig.add_trace( | |
| go.Scatter( | |
| x=plot_data["date_day"], y=plot_data["avg_sentiment"], | |
| name="Actual Sentiment", mode="lines+markers", line=dict(color="blue") | |
| ), | |
| row=1, col=1, secondary_y=True | |
| ) | |
| # Sentiment พยากรณ์ | |
| if "future_preds" in locals(): | |
| fig.add_trace( | |
| go.Scatter( | |
| x=future_dates, y=future_preds, | |
| name="Predicted Sentiment", mode="lines+markers", line=dict(color="#02a1f7", dash="dash") | |
| ), | |
| row=1, col=1, secondary_y=True | |
| ) | |
| # --------------------------------------------------------- | |
| # สร้างเส้นเชื่อม Actual -> Predicted Sentiment เพื่อความสวยงาม | |
| # --------------------------------------------------------- | |
| last_actual_date = plot_data["date_day"].max() | |
| last_actual_value = plot_data["avg_sentiment"].iloc[-1] | |
| first_pred_date = future_dates[0] | |
| first_pred_value = future_preds[0] | |
| fig.add_trace( | |
| go.Scatter( | |
| x=[last_actual_date, first_pred_date], | |
| y=[last_actual_value, first_pred_value], | |
| mode="lines", | |
| line=dict(color="#02a1f7", dash="dot"), | |
| name="Connector Actual→Predicted" | |
| ), | |
| row=1, col=1, secondary_y=True | |
| ) | |
| # กราฟแท่งแสดงจำนวนข่าว | |
| for col in ["neutral", "negative", "positive"]: | |
| if col not in plot_data.columns: | |
| plot_data[col] = 0 | |
| fig.add_trace(go.Bar(x=plot_data["date_day"], y=plot_data["neutral"], name="Neutral", | |
| marker_color='rgba(128, 128, 128, 0.7)'), row=2, col=1) | |
| fig.add_trace(go.Bar(x=plot_data["date_day"], y=plot_data["negative"], name="Negative", | |
| marker_color='rgba(255, 0, 0, 0.7)'), row=2, col=1) | |
| fig.add_trace(go.Bar(x=plot_data["date_day"], y=plot_data["positive"], name="Positive", | |
| marker_color='rgba(0, 128, 0, 0.7)'), row=2, col=1) | |
| fig.update_layout( | |
| title=f"แนวโน้มอารมณ์ข่าว + ราคาหุ้น ({symbol})", | |
| barmode="stack", | |
| height=650, | |
| hovermode="x unified", | |
| template="plotly_white" | |
| ) | |
| st.plotly_chart(fig, use_container_width=True) | |
| # ตารางแสดงรายการข่าว | |
| st.subheader("📰 รายการข่าวทั้งหมด") | |
| st.dataframe(news_df[["date", "source", "text", "sentiment", "url"]], use_container_width=True) | |
| # --------------------------------------------------------- | |
| # RUN APP | |
| # --------------------------------------------------------- | |
| if __name__ == "__main__": | |
| main() | |