| import logging |
| from datetime import datetime, timedelta |
| |
| import gradio as gr |
| import matplotlib |
| matplotlib.use('Agg')  # non-interactive backend; set before importing pyplot |
| import matplotlib.pyplot as plt |
| import pandas as pd |
| import torch |
| import yfinance as yf |
| from GoogleNews import GoogleNews |
| from transformers import pipeline |
|
|
| |
| logging.basicConfig( |
| level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" |
| ) |
|
|
| SENTIMENT_ANALYSIS_MODEL = ( |
| "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis" |
| ) |
| DEVICE = "cuda" if torch.cuda.is_available() else "cpu" |
| logging.info(f"Using device: {DEVICE}") |
| logging.info("Initializing sentiment analysis model...") |
| sentiment_analyzer = pipeline( |
| "sentiment-analysis", model=SENTIMENT_ANALYSIS_MODEL, device=DEVICE |
| ) |
| logging.info("Model initialized successfully") |
|
|
| |
| COMMON_TICKERS = { |
| "apple": "AAPL", |
| "microsoft": "MSFT", |
| "amazon": "AMZN", |
| "google": "GOOGL", |
| "alphabet": "GOOGL", |
| "facebook": "META", |
| "meta": "META", |
| "tesla": "TSLA", |
| "nvidia": "NVDA", |
| "netflix": "NFLX", |
| "amd": "AMD", |
| "intel": "INTC", |
| "ibm": "IBM", |
| "oracle": "ORCL", |
| "paypal": "PYPL", |
| "adobe": "ADBE", |
| "cisco": "CSCO", |
| "bitcoin": "BTC-USD", |
| "ethereum": "ETH-USD", |
| "dogecoin": "DOGE-USD", |
| "cardano": "ADA-USD", |
| "xrp": "XRP-USD", |
| "litecoin": "LTC-USD", |
| "samsung": "005930.KS", |
| "hyundai": "005380.KS", |
| "sk hynix": "000660.KS", |
| "lg": "003550.KS", |
| "lge": "066570.KS", |
| "ncsoft": "036570.KS", |
| "kakao": "035720.KS", |
| "naver": "035420.KS", |
| "ํ๋์ฐจ": "005380.KS", |
| "์ผ์ฑ์ ์": "005930.KS", |
| "์ผ์ฑ": "005930.KS", |
| "์นด์นด์ค": "035720.KS", |
| "๋ค์ด๋ฒ": "035420.KS", |
| } |
|
|
| def fetch_articles(query, max_articles=30): |
| try: |
| logging.info(f"Fetching up to {max_articles} articles for query: '{query}'") |
| googlenews = GoogleNews(lang="en") |
| googlenews.search(query) |
| |
| |
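| # First page of search results |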
| articles = googlenews.result() |
| |
| |
| # GoogleNews.result() returns the accumulated results across all fetched pages, |
| # so reassign (rather than extend) to avoid duplicating earlier pages. |
| page = 2 |
| while len(articles) < max_articles and page <= 10: |
| logging.info(f"Fetched {len(articles)} articles so far. Getting page {page}...") |
| googlenews.get_page(page) |
| page_results = googlenews.result() |
| |
| if len(page_results) <= len(articles): |
| logging.info(f"No more results found after page {page - 1}") |
| break |
| |
| articles = page_results |
| page += 1 |
| |
| |
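| # Trim to the requested maximum |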
| articles = articles[:max_articles] |
| |
| logging.info(f"Successfully fetched {len(articles)} articles") |
| return articles |
| except Exception as e: |
| logging.error( |
| f"Error while searching articles for query: '{query}'. Error: {e}" |
| ) |
| raise gr.Error( |
| f"Unable to search articles for query: '{query}'. Try again later...", |
| duration=5, |
| ) |
|
|
| def analyze_article_sentiment(article): |
| logging.info(f"Analyzing sentiment for article: {article['title']}") |
| sentiment = sentiment_analyzer(article["desc"])[0] |
| article["sentiment"] = sentiment |
| return article |
|
|
| def calculate_time_weight(article_date_str): |
| """ |
| ๊ธฐ์ฌ ์๊ฐ ๊ธฐ์ค์ผ๋ก ๊ฐ์ค์น ๊ณ์ฐ |
| - 1์๊ฐ ๋ด ๊ธฐ์ฌ๋ 24% ๊ฐ์ค์น |
| - ์๊ฐ์ด ์ง๋ ์๋ก 1%์ฉ ๊ฐ์ (์ต์ 1%) |
| - ์: 1์๊ฐ ๋ด ๊ธฐ์ฌ = 24%, 10์๊ฐ ์ ๊ธฐ์ฌ = 15%, 24์๊ฐ ์ ๊ธฐ์ฌ = 1% |
| - 24์๊ฐ ์ด์์ด๋ฉด 1%๋ก ๊ณ ์ |
| """ |
| try: |
| |
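| # Date formats commonly seen in GoogleNews results |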
| date_formats = [ |
| '%a, %d %b %Y %H:%M:%S %z', |
| '%Y-%m-%d %H:%M:%S', |
| '%a, %d %b %Y %H:%M:%S', |
| '%Y-%m-%dT%H:%M:%S%z', |
| '%a %b %d, %Y', |
| '%d %b %Y' |
| ] |
| |
| parsed_date = None |
| for format_str in date_formats: |
| try: |
| parsed_date = datetime.strptime(article_date_str, format_str) |
| break |
| except ValueError: |
| continue |
| |
| |
| if parsed_date is None: |
| logging.warning(f"Could not parse date: {article_date_str}; falling back to minimum weight") |
| return 0.01 |
| |
| |
| # Use a timezone-aware "now" when the article date carries an offset, |
| # so the subtraction compares like with like. |
| if parsed_date.tzinfo is not None: |
| now = datetime.now(parsed_date.tzinfo) |
| else: |
| now = datetime.now() |
| |
| hours_diff = (now - parsed_date).total_seconds() / 3600 |
| |
| |
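| # Weight schedule: 24% inside the first hour, then -1 percentage point per hour, floored at 1% |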
| if hours_diff < 1: |
| return 0.24 |
| elif hours_diff < 24: |
| |
| return max(0.01, 0.24 - ((hours_diff - 1) * 0.01)) |
| else: |
| return 0.01 |
| except Exception as e: |
| logging.error(f"Error calculating time weight: {e}") |
| return 0.01 |
|
|
| def calculate_sentiment_score(sentiment_label, time_weight): |
| """ |
| ๊ฐ์ฑ ๋ ์ด๋ธ์ ๋ฐ๋ฅธ ๊ธฐ๋ณธ ์ ์ ๊ณ์ฐ ๋ฐ ์๊ฐ ๊ฐ์ค์น ์ ์ฉ |
| - positive: +3์ |
| - neutral: 0์ |
| - negative: -3์ |
| |
| ์๊ฐ ๊ฐ์ค์น๋ ๋ฐฑ๋ถ์จ๋ก ์ ์ฉ (๊ธฐ๋ณธ ์ ์์ ๊ฐ์ค์น % ๋งํผ ์ถ๊ฐ) |
| ์: |
| - 1์๊ฐ ๋ด ๊ธ์ ๊ธฐ์ฌ: 3์ + (3 * 24%) = 3 + 0.72 = 3.72์ |
| - 10์๊ฐ ์ ๋ถ์ ๊ธฐ์ฌ: -3์ + (-3 * 15%) = -3 - 0.45 = -3.45์ |
| """ |
| base_score = { |
| 'positive': 3, |
| 'neutral': 0, |
| 'negative': -3 |
| }.get(sentiment_label, 0) |
| |
| |
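| # The time weight contributes a percentage of the base score on top of it |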
| weighted_addition = base_score * time_weight |
| |
| return base_score, weighted_addition |
|
|
| def get_stock_ticker(asset_name): |
| """ |
| ์์ฐ๋ช
์ผ๋ก๋ถํฐ ์ฃผ์ ํฐ์ปค ์ฌ๋ณผ์ ์ถ์ถ |
| """ |
| logging.info(f"Identifying ticker for: {asset_name}") |
| |
| |
| asset_lower = asset_name.lower().strip() |
| |
| |
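| # If the input already looks like a ticker symbol (all caps, 2-6 chars), use it as-is |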
| if asset_name.isupper() and 2 <= len(asset_name) <= 6: |
| logging.info(f"Input appears to be a ticker symbol: {asset_name}") |
| return asset_name |
| |
| |
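| # Exact match against the common name-to-ticker map |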
| if asset_lower in COMMON_TICKERS: |
| ticker = COMMON_TICKERS[asset_lower] |
| logging.info(f"Found ticker in common tickers map: {ticker}") |
| return ticker |
| |
| |
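| # Otherwise try each word of the asset name against the map |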
| asset_parts = asset_lower.split() |
| for part in asset_parts: |
| if part in COMMON_TICKERS: |
| ticker = COMMON_TICKERS[part] |
| logging.info(f"Found ticker for part '{part}': {ticker}") |
| return ticker |
| |
| |
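| # Treat the cleaned-up input as a candidate ticker and validate it with a 1-day download |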
| potential_ticker = asset_name.upper().replace(" ", "") |
| if 2 <= len(potential_ticker) <= 6: |
| |
| try: |
| logging.info(f"Trying potential ticker: {potential_ticker}") |
| test_data = yf.download(potential_ticker, period="1d", progress=False) |
| if not test_data.empty: |
| logging.info(f"Valid ticker found: {potential_ticker}") |
| return potential_ticker |
| except Exception as e: |
| logging.debug(f"Error testing potential ticker: {e}") |
| |
| |
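| # Fall back to yfinance's info lookup for the asset name |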
| try: |
| |
| ticker_search = yf.Ticker(asset_name) |
| try: |
| info = ticker_search.info |
| if 'symbol' in info and info['symbol']: |
| ticker = info['symbol'] |
| logging.info(f"Found ticker from info API: {ticker}") |
| return ticker |
| except (ValueError, KeyError, TypeError) as e: |
| logging.debug(f"Error getting ticker info: {e}") |
| pass |
| except Exception as e: |
| logging.debug(f"Error initializing ticker object: {e}") |
| |
| |
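| # Last resort: retry with common exchange / asset-class suffixes |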
| major_exchanges = ["", ".KS", ".KQ", "-USD"] |
| for exchange in major_exchanges: |
| try: |
| test_ticker = f"{asset_name.upper().replace(' ', '')}{exchange}" |
| logging.info(f"Trying with exchange suffix: {test_ticker}") |
| test_data = yf.download(test_ticker, period="1d", progress=False) |
| if not test_data.empty: |
| logging.info(f"Valid ticker found with suffix: {test_ticker}") |
| return test_ticker |
| except Exception: |
| pass |
| |
| logging.warning(f"Could not identify ticker for: {asset_name}") |
| return None |
|
|
| def create_stock_chart(ticker, period="1mo"): |
| """ |
| ์ฃผ์ ํฐ์ปค์ ๋ํ ์ฐจํธ ์์ฑ |
| """ |
| try: |
| logging.info(f"Fetching stock data for {ticker}") |
| |
| try: |
| stock_data = yf.download(ticker, period=period, progress=False) |
| except Exception as dl_error: |
| logging.error(f"Error downloading stock data: {dl_error}") |
| |
| if "-" in ticker: |
| alt_ticker = ticker.replace("-", ".") |
| logging.info(f"Trying alternative ticker format: {alt_ticker}") |
| stock_data = yf.download(alt_ticker, period=period, progress=False) |
| else: |
| raise dl_error |
| |
| if len(stock_data) == 0: |
| logging.warning(f"No stock data found for ticker: {ticker}") |
| return None |
| |
| |
| logging.info(f"Downloaded data shape: {stock_data.shape}") |
| logging.info(f"Data columns: {stock_data.columns.tolist()}") |
| |
| |
| fig, ax = plt.subplots(figsize=(10, 6)) |
| |
| |
| # Normalize column access: yfinance may return MultiIndex columns of (field, ticker) |
| if isinstance(stock_data.columns, pd.MultiIndex): |
| close_col = ('Close', ticker) |
| volume_col = ('Volume', ticker) |
| else: |
| close_col = 'Close' |
| volume_col = 'Volume' |
| |
| if close_col not in stock_data.columns: |
| raise ValueError(f"Close column not found in data columns: {stock_data.columns}") |
| |
| close_series = stock_data[close_col] |
| ax.plot(stock_data.index, close_series, label='Close Price', color='blue') |
| |
| # 20-day moving average, only when there is enough history |
| if len(stock_data) > 20: |
| ma20 = close_series.rolling(window=20).mean() |
| ax.plot(stock_data.index, ma20, label='20-day MA', color='orange') |
| |
| # Volume on a secondary axis when available |
| if volume_col in stock_data.columns and not stock_data[volume_col].isna().all(): |
| ax2 = ax.twinx() |
| ax2.bar(stock_data.index, stock_data[volume_col], alpha=0.3, color='gray', label='Volume') |
| ax2.set_ylabel('Volume') |
| |
| # Merge the legends of both axes |
| lines, labels = ax.get_legend_handles_labels() |
| lines2, labels2 = ax2.get_legend_handles_labels() |
| ax.legend(lines + lines2, labels + labels2, loc='upper left') |
| else: |
| ax.legend(loc='upper left') |
| |
| |
| ax.set_title(f"{ticker} Stock Price") |
| ax.set_xlabel('Date') |
| ax.set_ylabel('Price') |
| ax.grid(True, alpha=0.3) |
| |
| plt.tight_layout() |
| |
| |
| chart_path = f"stock_chart_{ticker.replace('-', '_').replace('.', '_')}.png" |
| plt.savefig(chart_path) |
| plt.close() |
| |
| logging.info(f"Stock chart created: {chart_path}") |
| return chart_path |
| except Exception as e: |
| logging.error(f"Error creating stock chart for {ticker}: {e}") |
| |
| try: |
| fig, ax = plt.subplots(figsize=(10, 6)) |
| ax.text(0.5, 0.5, f"Unable to load data for {ticker}\nError: {str(e)}", |
| horizontalalignment='center', verticalalignment='center', transform=ax.transAxes) |
| ax.set_axis_off() |
| chart_path = f"stock_chart_error_{ticker.replace('-', '_').replace('.', '_')}.png" |
| plt.savefig(chart_path) |
| plt.close() |
| return chart_path |
| except Exception: |
| return None |
|
|
| def analyze_asset_sentiment(asset_name): |
| logging.info(f"Starting sentiment analysis for asset: {asset_name}") |
| logging.info("Fetching up to 30 articles") |
| articles = fetch_articles(asset_name, max_articles=30) |
| if not articles: |
| raise gr.Error(f"No articles found for '{asset_name}'. Try a different asset name.", duration=5) |
| logging.info("Analyzing sentiment of each article") |
| analyzed_articles = [analyze_article_sentiment(article) for article in articles] |
| |
| |
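| # Apply the recency weighting to each article's sentiment |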
| for article in analyzed_articles: |
| time_weight = calculate_time_weight(article["date"]) |
| article["time_weight"] = time_weight |
| |
| sentiment_label = article["sentiment"]["label"] |
| base_score, weighted_addition = calculate_sentiment_score(sentiment_label, time_weight) |
| |
| article["base_score"] = base_score |
| article["weighted_addition"] = weighted_addition |
| article["total_score"] = base_score + weighted_addition |
| |
| logging.info("Sentiment analysis completed") |
| |
| |
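| # Build the summary figure (distribution pie + recent article scores) |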
| sentiment_summary = create_sentiment_summary(analyzed_articles, asset_name) |
| |
| |
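| # Resolve a ticker for the asset and, if found, render a price chart |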
| stock_chart = None |
| ticker = get_stock_ticker(asset_name) |
| if ticker: |
| logging.info(f"Found ticker {ticker} for asset {asset_name}") |
| stock_chart = create_stock_chart(ticker) |
| |
| return convert_to_dataframe(analyzed_articles), sentiment_summary, stock_chart, ticker |
|
|
| def create_sentiment_summary(analyzed_articles, asset_name): |
| """ |
| ๊ฐ์ฑ ๋ถ์ ๊ฒฐ๊ณผ๋ฅผ ์์ฝํ๊ณ ๊ทธ๋ํ๋ก ์๊ฐํ |
| """ |
| total_articles = len(analyzed_articles) |
| positive_count = sum(1 for a in analyzed_articles if a["sentiment"]["label"] == "positive") |
| neutral_count = sum(1 for a in analyzed_articles if a["sentiment"]["label"] == "neutral") |
| negative_count = sum(1 for a in analyzed_articles if a["sentiment"]["label"] == "negative") |
| |
| |
| base_score_sum = sum(a["base_score"] for a in analyzed_articles) |
| |
| |
| weighted_score_sum = sum(a["total_score"] for a in analyzed_articles) |
| |
| |
| fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6)) |
| |
| |
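| # Left panel: sentiment distribution pie chart |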
| labels = ['Positive', 'Neutral', 'Negative'] |
| sizes = [positive_count, neutral_count, negative_count] |
| colors = ['green', 'gray', 'red'] |
| |
| ax1.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=90) |
| ax1.axis('equal') |
| ax1.set_title(f'Sentiment Distribution for {asset_name}') |
| |
| |
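| # Right panel: weighted scores of the most recent articles (newest first) |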
| sorted_articles = sorted(analyzed_articles, key=lambda x: x.get("date", ""), reverse=True) |
| |
| |
| max_display = min(15, len(sorted_articles)) |
| display_articles = sorted_articles[:max_display] |
| |
| dates = [a.get("date", "")[:10] for a in display_articles] |
| scores = [a.get("total_score", 0) for a in display_articles] |
| |
| |
| bar_colors = ['green' if s > 0 else 'red' if s < 0 else 'gray' for s in scores] |
| |
| bars = ax2.bar(range(len(dates)), scores, color=bar_colors) |
| ax2.set_xticks(range(len(dates))) |
| ax2.set_xticklabels(dates, rotation=45, ha='right') |
| ax2.set_ylabel('Weighted Sentiment Score') |
| ax2.set_title(f'Recent Article Scores for {asset_name}') |
| ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3) |
| |
| |
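| # Text box summarizing counts and aggregate scores |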
| summary_text = f""" |
| Analysis Summary for {asset_name}: |
| Total Articles: {total_articles} |
| Positive: {positive_count} ({positive_count/total_articles*100:.1f}%) |
| Neutral: {neutral_count} ({neutral_count/total_articles*100:.1f}%) |
| Negative: {negative_count} ({negative_count/total_articles*100:.1f}%) |
| |
| Base Score Sum: {base_score_sum:.2f} |
| Weighted Score Sum: {weighted_score_sum:.2f} |
| """ |
| |
| plt.figtext(0.5, 0.01, summary_text, ha='center', fontsize=10, bbox={"facecolor":"orange", "alpha":0.2, "pad":5}) |
| |
| plt.tight_layout(rect=[0, 0.1, 1, 0.95]) |
| |
| |
| fig_path = f"sentiment_summary_{asset_name.replace(' ', '_')}.png" |
| plt.savefig(fig_path) |
| plt.close() |
| |
| return fig_path |
|
|
| def convert_to_dataframe(analyzed_articles): |
| df = pd.DataFrame(analyzed_articles) |
| df["Title"] = df.apply( |
| lambda row: f'<a href="{row["link"]}" target="_blank">{row["title"]}</a>', |
| axis=1, |
| ) |
| df["Description"] = df["desc"] |
| df["Date"] = df["date"] |
| |
| def sentiment_badge(sentiment): |
| colors = { |
| "negative": "red", |
| "neutral": "gray", |
| "positive": "green", |
| } |
| color = colors.get(sentiment, "grey") |
| return f'<span style="background-color: {color}; color: white; padding: 2px 6px; border-radius: 4px;">{sentiment}</span>' |
| |
| df["Sentiment"] = df["sentiment"].apply(lambda x: sentiment_badge(x["label"])) |
| |
| |
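| # Expose the scoring columns in the display table |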
| df["Base Score"] = df["base_score"] |
| df["Weight"] = df["time_weight"].apply(lambda x: f"{x*100:.0f}%") |
| df["Total Score"] = df["total_score"].apply(lambda x: f"{x:.2f}") |
| |
| return df[["Sentiment", "Title", "Description", "Date", "Base Score", "Weight", "Total Score"]] |
|
|
| def main(): |
| with gr.Blocks() as iface: |
| gr.Markdown("# Trading Asset Sentiment Analysis") |
| gr.Markdown( |
| "Enter the name of a trading asset, and I'll fetch recent articles and analyze their sentiment!" |
| ) |
| |
| with gr.Row(): |
| input_asset = gr.Textbox( |
| label="Asset Name", |
| lines=1, |
| placeholder="Enter the name of the trading asset...", |
| ) |
| |
| with gr.Row(): |
| analyze_button = gr.Button("Analyze Sentiment", size="sm") |
| |
| |
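| # Offer the known asset names as clickable examples |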
| examples_list = sorted(set(COMMON_TICKERS.keys()), key=lambda x: x.lower()) |
| gr.Examples( |
| examples=examples_list, |
| inputs=input_asset, |
| ) |
| |
| |
| with gr.Row(): |
| with gr.Column(): |
| with gr.Group(): |
| gr.Markdown("## Stock Chart") |
| with gr.Row(): |
| stock_chart = gr.Image(type="filepath", label="Stock Price Chart") |
| ticker_info = gr.Textbox(label="Ticker Symbol") |
| |
| with gr.Row(): |
| with gr.Column(): |
| with gr.Group(): |
| gr.Markdown("## Sentiment Summary") |
| sentiment_summary = gr.Image(type="filepath", label="Sentiment Analysis Summary") |
| |
| with gr.Row(): |
| with gr.Column(): |
| with gr.Group(): |
| gr.Markdown("## Articles and Sentiment Analysis") |
| articles_output = gr.Dataframe( |
| headers=["Sentiment", "Title", "Description", "Date", "Base Score", "Weight", "Total Score"], |
| datatype=["html", "html", "markdown", "markdown", "number", "markdown", "markdown"], |
| wrap=False, |
| ) |
| |
| analyze_button.click( |
| analyze_asset_sentiment, |
| inputs=[input_asset], |
| outputs=[articles_output, sentiment_summary, stock_chart, ticker_info], |
| ) |
|
|
| logging.info("Launching Gradio interface") |
| iface.queue().launch() |
|
|
| if __name__ == "__main__": |
| main() |
|
|