danghungithp's picture
Update app.py
bf347d5 verified
import yfinance as yf
import pandas as pd
import matplotlib.pyplot as plt
import mplfinance as mpf
import io
import base64
import tempfile
import os
from datetime import datetime, timedelta
from vnstock import Vnstock, Screener
# Thay thế ChatGoogleGenerativeAI bằng ChatGroq
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langgraph.graph import StateGraph, END
import gradio as gr
from typing import TypedDict, Optional, Callable
# --- 0. Định nghĩa State Schema (Cập nhật API key) ---
class StockState(TypedDict):
symbol: str
groq_api_key: str # Đổi thành groq_api_key
analysis: dict
report: str
plot_path: str
# --- 1. Lấy và Phân tích Dữ liệu Cổ phiếu ---
def analyze_vsa(df: pd.DataFrame) -> pd.DataFrame:
"""
Phân tích dữ liệu lịch sử giá và khối lượng theo phương pháp VSA (Volume Spread Analysis).
Thêm một cột 'vsa_signal' để nhận định tín hiệu dòng tiền.
Args:
df (pd.DataFrame): DataFrame chứa dữ liệu lịch sử giá, phải có các cột 'open', 'high', 'low', 'close', 'volume'.
Returns:
pd.DataFrame: DataFrame đã được thêm cột 'vsa_signal' với các tín hiệu VSA.
"""
if df.empty:
return df
# Tính toán khối lượng trung bình và biên độ giá trung bình (ví dụ: 20 ngày)
df['volume_ma'] = df['volume'].rolling(window=20).mean()
df['spread'] = df['high'] - df['low']
df['spread_ma'] = df['spread'].rolling(window=20).mean()
# Xác định vị trí giá đóng cửa trong biên độ giá
df['close_position'] = (df['close'] - df['low']) / df['spread']
# Khởi tạo cột tín hiệu VSA
df['vsa_signal'] = 'Không có tín hiệu'
# Phân tích các mẫu hình VSA cơ bản
# Tín hiệu Cung Mạnh (Selling Climax / Supply)
df.loc[(df['volume'] > 2 * df['volume_ma']) &
(df['spread'] > 1.5 * df['spread_ma']) &
(df['close_position'] < 0.25), 'vsa_signal'] = 'Cung mạnh'
# Tín hiệu Cầu Mạnh (Buying Climax / Demand)
df.loc[(df['volume'] > 2 * df['volume_ma']) &
(df['spread'] > 1.5 * df['spread_ma']) &
(df['close_position'] > 0.75), 'vsa_signal'] = 'Cầu mạnh'
# Tín hiệu Lưỡng lự (Effort vs. Result / Indecision)
df.loc[(df['volume'] > 1.5 * df['volume_ma']) &
(df['spread'] < 0.5 * df['spread_ma']), 'vsa_signal'] = 'Thị trường lưỡng lự'
# Xóa các cột tạm thời để giữ cho DataFrame sạch sẽ
df.drop(columns=['volume_ma', 'spread', 'spread_ma', 'close_position'], inplace=True)
return df
def fetch_and_analyze(symbol: str) -> dict:
"""
Lấy dữ liệu giao dịch, tài chính, cổ tức và tin tức của một mã cổ phiếu
Việt Nam bằng thư viện vnstock và thực hiện phân tích cơ bản.
Args:
symbol (str): Mã cổ phiếu cần phân tích (ví dụ: FPT).
Returns:
dict: Một từ điển chứa các thông tin phân tích.
Raises:
ValueError: Nếu mã cổ phiếu không hợp lệ hoặc không tìm thấy dữ liệu.
"""
if not symbol or not isinstance(symbol, str):
raise ValueError("Vui lòng nhập một mã cổ phiếu hợp lệ (ví dụ: FPT, HPG, VCB).")
try:
# Khởi tạo đối tượng Vnstock
stock = Vnstock().stock(symbol=symbol,source='TCBS')
# Lấy dữ liệu lịch sử trong 3 tháng gần nhất
end_date = datetime.now()
start_date = end_date - timedelta(days=90)
# Tên cột trong vnstock là chữ thường, không phải chữ hoa
history_data = stock.quote.history(
symbol=symbol,
start=start_date.strftime('%Y-%m-%d'),
end=end_date.strftime('%Y-%m-%d'),
interval='1D'
)
if history_data.empty:
raise ValueError(f"Không tìm thấy dữ liệu lịch sử cho mã '{symbol}'. Vui lòng kiểm tra lại mã.")
# Thêm phân tích VSA vào dữ liệu lịch sử
history_with_vsa = analyze_vsa(history_data.copy())
# Lấy dữ liệu cổ tức
dividends = stock.company.dividends()
# Lấy dữ liệu tài chính (Bảng cân đối kế toán)
financials = stock.finance.balance_sheet(period='quarter')
# Lấy dữ liệu tin tức
news = stock.company.news()
except Exception as e:
raise ValueError(f"Lỗi khi lấy dữ liệu cho mã '{symbol}': {str(e)}")
# Thực hiện các phép tính phân tích
# Sử dụng các tên cột phù hợp với dữ liệu trả về từ vnstock ('close', 'high', 'low', 'volume')
latest_close = history_data['close'].iloc[-1]
start_close = history_data['close'].iloc[0]
pct_change = ((latest_close - start_close) / start_close) * 100
# Tính toán độ biến động (độ lệch chuẩn của % thay đổi giá)
volatility = history_data['close'].pct_change().std() * 100
high = history_data['high'].max()
low = history_data['low'].min()
# Lấy dữ liệu khối lượng giao dịch
latest_volume = history_data['volume'].iloc[-1]
return {
"symbol": symbol,
"latest_close": latest_close,
"start_close": start_close,
"pct_change": pct_change,
"volatility": volatility,
"high": high,
"low": low,
"latest_volume": latest_volume,
"history": history_data,
"history_with_vsa": history_with_vsa,
"dividends": dividends,
"financials": financials,
"news": news
}
# (Không thay đổi)
def fetch_and_analyze_old(symbol: str) -> dict:
if not symbol or not isinstance(symbol, str):
raise ValueError("Vui lòng nhập một mã cổ phiếu hợp lệ (ví dụ: AAPL, TSLA, FPT.VN).")
try:
stock = yf.Ticker(symbol)
data = stock.history(period="3mo")
dividends = stock.dividends
financials = stock.financials
news = stock.news
except Exception as e:
raise ValueError(f"Lỗi khi lấy dữ liệu cho mã '{symbol}': {str(e)}")
if data.empty:
raise ValueError(f"Không tìm thấy dữ liệu cho mã '{symbol}'. Vui lòng kiểm tra lại mã.")
latest_close = data['Close'][-1]
start_close = data['Close'][0]
pct_change = ((latest_close - start_close) / start_close) * 100
volatility = data['Close'].pct_change().std() * 100
high = data['High'].max()
low = data['Low'].min()
return {
"symbol": symbol,
"latest_close": latest_close,
"start_close": start_close,
"pct_change": pct_change,
"volatility": volatility,
"high": high,
"low": low,
"history": data,
"dividends": dividends,
"financials": financials,
"news": news
}
# --- 2. Tạo Biểu đồ và Lưu vào File Tạm ---
# (Không thay đổi)
def generate_price_plot(history: pd.DataFrame, symbol: str, history_with_vsa: pd.DataFrame = None) -> str:
"""
Tạo biểu đồ nến giao dịch từ dữ liệu lịch sử, có volume như mặc định ban đầu.
Args:
history (pd.DataFrame): DataFrame chứa dữ liệu giá cổ phiếu (Open, High, Low, Close, Volume).
symbol (str): Mã cổ phiếu.
history_with_vsa (pd.DataFrame): Không dùng ở phiên bản này.
Returns:
str: Đường dẫn đến tệp hình ảnh biểu đồ nến.
"""
try:
# Chuẩn hóa index
history = history.copy().dropna()
history.index = pd.to_datetime(history.index)
history = history[~history.index.duplicated(keep='first')]
history = history.sort_index()
# Lưu biểu đồ vào tệp tạm thời
import tempfile
with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp_file:
save_path = tmp_file.name
mc = mpf.make_marketcolors(
up='green', down='red',
edge='inherit', wick='inherit',
volume='in'
)
s = mpf.make_mpf_style(marketcolors=mc, gridcolor="white")
mpf.plot(
history,
type='candle',
style=s,
title=f'Biểu đồ nến cổ phiếu {symbol}',
ylabel='Giá',
volume=True,
show_nontrading=False,
savefig=save_path
)
print(f"Đã tạo biểu đồ nến tại: {save_path}")
return save_path
except Exception as e:
print(f"Lỗi khi tạo biểu đồ nến: {str(e)}")
return ""
# --- 3. LangChain Node: Tạo Báo cáo với Groq ---
def generate_report_node(state: StockState) -> dict:
s = state["analysis"]
if isinstance(s['news'], list) and len(s['news']) > 0:
news_str = '\n'.join([f"- {item['title']} ({item['link']})" for item in s['news']])
else:
news_str = 'Không có tin tức.'
tech_analysis = (
f"Mã cổ phiếu: {s['symbol']}\n"
f"Kỳ phân tích: 3 tháng gần nhất\n"
f"Dữ liệu giá:\n"
f"- Giá đóng cửa đầu kỳ: ${s['start_close']:.2f}\n"
f"- Giá đóng cửa cuối kỳ: ${s['latest_close']:.2f}\n"
f"- Phần trăm thay đổi: {s['pct_change']:.2f}%\n"
f"- Biến động trong 90 ngày: {s['volatility']:.2f}%\n"
f"- Phân tích dòng tiền bằng phương pháp VSA: {s['history_with_vsa']}%\n"
f"- Giá cao nhất: ${s['high']:.2f}\n"
f"- Giá thấp nhất: ${s['low']:.2f}\n"
f"\n**Biểu đồ giá hình nến:**\n[Xem bên dưới]\n"
f"\n - financials={s['financials'].to_string() if hasattr(s['financials'], 'empty') and not s['financials'].empty else 'Không có dữ liệu tài chính.'}\n"
f"\n - dividends={s['dividends'].to_string() if hasattr(s['dividends'], 'empty') and not s['dividends'].empty else 'Không có dữ liệu cổ tức.'}\n"
f"\n - news={news_str}\n"
)
prompt = ChatPromptTemplate.from_template("""Là một chuyên gia phân tích tài chính chuyên nghiệp, hãy phân tích cổ phiếu này dựa trên dữ liệu được cung cấp.
Cung cấp khuyến nghị theo cấu trúc chính xác sau:
**Phân tích Xu hướng**:
[Tóm tắt biến động giá trong khoảng 100 từ]
**Đánh giá Mức độ Biến động**:
[Diễn giải về mức độ biến động trong 90 ngày]
**Tổng hợp các tin tức quan trọng liên quan tới cổ phiếu này**:
[Tổng hợp các tin tức quan trọng liên quan tới cổ phiếu này. dữ liệu từ các trang tại việt nam như cafef.vn, vietstock.vn,link liên kết]
**Định giá**:
[Định giá theo p/e, p/b, DDM, DCF, price and volume. cho cổ phiếu này] - [Lý do ngắn gọn]
**Khuyến nghị**:
[Mua/Giữ/Bán] - [Lý do ngắn gọn]
**Tín hiệu mua/ bán hiện tại của cổ phiếu**:
[Tín hiệu mua/ bán hiện tại của cổ phiếu theo price action, mẫu hình nến, và phân tích dòng tiền theo phương pháp vsa cho cổ phiếu này ] - [Lý do ngắn gọn]
** Tín hiệu mua / bán hiện tại của cổ phiếu**:
[Tín hiệu mua/ bán hiện tại của cổ phiếu theo mẫu hình nến vcp(Volatility Contraction Pattern) cho cổ phiếu này ] - [Lý do ngắn gọn]
**Các yếu tố rủi ro**:
- [Liệt kê 3 rủi ro chính có thể ảnh hưởng đến cổ phiếu này]
Dữ liệu:
{tech_analysis}""")
# Xác thực và sử dụng Groq API key
api_key = "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"#state.get("groq_api_key", "")
if not api_key or not api_key.startswith("gsk_"):
raise ValueError("Vui lòng cung cấp một Groq API key hợp lệ (bắt đầu bằng 'gsk_').")
# Sử dụng ChatGroq với model Llama3
# Bạn có thể chọn các model khác như "mixtral-8x7b-32768"
model = ChatGroq(
groq_api_key=api_key,
model_name="openai/gpt-oss-120b",
temperature=0.1 # Giảm nhiệt độ để kết quả nhất quán hơn
)
chain = prompt | model | StrOutputParser()
report = chain.invoke({"tech_analysis": tech_analysis})
return {"report": report}
# --- 4. LangGraph State & Graph ---
def fetch_node(state: StockState) -> dict:
analysis_data = fetch_and_analyze(state["symbol"])
plot_path = generate_price_plot(
analysis_data["history"],
analysis_data["symbol"],
analysis_data.get("history_with_vsa")
)
return {"analysis": analysis_data, "plot_path": plot_path}
graph = StateGraph(StockState)
graph.add_node("fetch", fetch_node)
graph.add_node("generate_report", generate_report_node)
graph.add_edge("fetch", "generate_report")
graph.add_edge("generate_report", END)
graph.set_entry_point("fetch")
stock_agent = graph.compile()
# --- 5. Hàm chạy chính (Cập nhật tham số API key) ---
def stock_insight_agent(symbol: str, groq_api_key: str, filter_row: dict = None):
try:
# Truyền groq_api_key vào state
result = stock_agent.invoke({"symbol": symbol, "groq_api_key": groq_api_key})
report = result["report"]
plot_path = result.get("plot_path", "")
# Nếu có filter_row, bổ sung vào báo cáo
if filter_row:
filter_info = '\\n'.join([f"- {k}: {v}" for k, v in filter_row.items()])
report = f"**Thông tin tổng hợp từ bảng filter:**\\n{filter_info}\\n\\n" + report
if not plot_path:
return {"report": report, "plot_path": "❌ Lỗi: Không thể tạo biểu đồ."}
return {"report": report, "plot_path": plot_path}
except Exception as e:
return {"report": f"❌ Đã xảy ra lỗi: {str(e)}", "plot_path": None}
# --- 6. Giao diện Gradio: Thêm panel lọc cổ phiếu và bảng kết quả ---
def get_screener_data():
screener = Screener()
df = screener.stock(params={"exchangeName": "HOSE,HNX,UPCOM"}, limit=50000)
# Đổi tên cột cho dễ hiểu nếu cần
rename_map = {
'ticker': 'Mã',
'companyName': 'Tên',
'revenueGrowthQoQ': '% Tăng Doanh Thu',
'netProfitGrowthQoQ': '% Tăng Lợi Nhuận',
'roe': 'ROE (%)',
'dividendYield': 'Cổ tức (%)',
'marketCap': 'Vốn hóa',
'industry': 'Ngành'
}
df = df.rename(columns=rename_map)
# Kiểm tra nếu thiếu cột filter thì fallback sang tên gốc
for orig, new in [('revenueGrowthQoQ', '% Tăng Doanh Thu'), ('netProfitGrowthQoQ', '% Tăng Lợi Nhuận'), ('roe', 'ROE (%)'), ('dividendYield', 'Cổ tức (%)')]:
if new not in df.columns and orig in df.columns:
df[new] = df[orig]
# Chuyển các cột filter về numeric
for col in ['% Tăng Doanh Thu', '% Tăng Lợi Nhuận', 'ROE (%)', 'Cổ tức (%)']:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
else:
print(f"[DEBUG] Không tìm thấy cột filter: {col}. Các cột hiện có: {df.columns.tolist()}")
return df
def filter_stocks(df, rev_growth, profit_growth, roe, dividend):
# Fallback sang tên cột gốc nếu tên tiếng Việt không tồn tại
col_rev = '% Tăng Doanh Thu' if '% Tăng Doanh Thu' in df.columns else 'revenue_growth_1y' if 'revenue_growth_1y' in df.columns else 'revenueGrowthQoQ'
col_profit = '% Tăng Lợi Nhuận' if '% Tăng Lợi Nhuận' in df.columns else 'last_quarter_profit_growth' if 'last_quarter_profit_growth' in df.columns else 'netProfitGrowthQoQ'
col_roe = 'ROE (%)' if 'ROE (%)' in df.columns else 'roe'
col_div = 'Cổ tức (%)' if 'Cổ tức (%)' in df.columns else 'dividend_yield' if 'dividend_yield' in df.columns else 'dividendYield'
filtered = df[
(pd.to_numeric(df[col_rev], errors='coerce') > rev_growth) &
(pd.to_numeric(df[col_profit], errors='coerce') > profit_growth) &
(pd.to_numeric(df[col_roe], errors='coerce') > roe) &
(pd.to_numeric(df[col_div], errors='coerce') > dividend)
]
return filtered
def stock_filter_panel(rev_growth, profit_growth, roe, dividend):
df = get_screener_data()
filtered = filter_stocks(df, rev_growth, profit_growth, roe, dividend)
# Ưu tiên các cột tài chính, tăng trưởng, định giá, ngành, mã, ROE, cổ tức, vốn hóa, PE, PB, EPS, doanh thu, lợi nhuận, khối lượng, khuyến nghị
preferred_cols = [
'Mã', 'exchange', 'Ngành', 'market_cap', 'ROE (%)', 'dividend_yield', 'pe', 'pb', 'eps',
'revenue_growth_1y', 'revenue_growth_5y', 'eps_growth_1y', 'eps_growth_5y', 'net_margin', 'gross_margin',
'avg_trading_value_20d', 'total_trading_value', 'tcbs_recommend', 'tcbs_buy_sell_signal', 'stock_rating',
'price_near_realtime', 'price_growth_1m', 'price_growth_1w', 'price_growth1_day', 'active_buy_pct', 'strong_buy_pct',
'high_vol_match', 'forecast_vol_ratio', 'ev_ebitda', 'ps', 'ev', 'peg_forward', 'peg_trailing', 'rsi14', 'beta', 'alpha',
'num_increase_continuous_day', 'num_decrease_continuous_day', 'breakout', 'heating_up', 'price_break_out52_week',
'price_vs_sma5', 'price_vs_sma10', 'price_vs_sma20', 'price_vs_sma50', 'price_vs_sma100', 'price_vs_sma200',
'vol_vs_sma5', 'vol_vs_sma10', 'vol_vs_sma20', 'vol_vs_sma50', 'macd_histogram', 'bolling_band_signal', 'dmi_signal',
'sar_vs_macd_hist', 'rsi14_status', 'foreign_vol_pct', 'foreign_transaction', 'foreign_buysell_20s',
'avg_trading_value_5d', 'avg_trading_value_10d', 'eps_recently', 'eps_ttm_growth1_year', 'eps_ttm_growth5_year',
'quarter_revenue_growth', 'quarter_income_growth', 'quarterly_income', 'quarterly_revenue', 'profit_last_4q',
'last_quarter_revenue_growth', 'second_quarter_revenue_growth', 'last_quarter_profit_growth', 'second_quarter_profit_growth',
'has_financial_report', 'financial_health', 'business_operation', 'business_model', 'corporate_percentage', 'free_transfer_rate',
'net_cash_per_market_cap', 'net_cash_per_total_assets', 'equity_mi', 'doe', 'npl', 'nim', 'roa'
]
# Lấy các cột có trong DataFrame, ưu tiên theo thứ tự preferred_cols
display_cols = [col for col in preferred_cols if col in filtered.columns]
# Nếu vẫn còn ít hơn 10 cột, bổ sung thêm các cột còn lại
if len(display_cols) < 10:
for col in filtered.columns:
if col not in display_cols:
display_cols.append(col)
if len(display_cols) >= 15:
break
filtered_display = filtered[display_cols]
filtered_display = filtered_display.reset_index(drop=True)
return filtered_display, filtered
# Callback khi click vào mã cổ phiếu trong bảng filter
def on_stock_click(selected, df_all):
if selected is None or len(selected) == 0:
return "", ""
row_idx = selected[0] if isinstance(selected, (list, tuple)) else selected
try:
symbol = df_all.iloc[row_idx]['Mã'] if 'Mã' in df_all.columns else df_all.iloc[row_idx][0]
# Lấy toàn bộ thông tin dòng filter để truyền vào AI
filter_row = df_all.iloc[row_idx].to_dict()
except Exception as e:
return f"Lỗi lấy mã cổ phiếu: {e}", ""
print(f"[DEBUG] Phân tích AI cho mã: {symbol}")
result = stock_insight_agent(symbol, "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", filter_row)
plot_path = result.get("plot_path", "")
import os
cwd = os.getcwd()
if (
not plot_path
or not isinstance(plot_path, str)
or not os.path.exists(plot_path)
or not os.path.isfile(plot_path)
or os.path.isdir(plot_path)
or os.path.abspath(plot_path) == os.path.abspath(cwd)
):
plot_path = None
return result["report"], plot_path
with gr.Blocks(title="⚡️ Phân tích & Lọc Cổ phiếu Siêu tốc bằng AI") as demo:
gr.Markdown("""# ⚡️ Phân tích & Lọc Cổ phiếu Siêu tốc bằng AI
Nhập các tiêu chí lọc để tìm cổ phiếu tiềm năng. Click vào mã cổ phiếu để phân tích chuyên sâu bằng AI!
""")
with gr.Row():
with gr.Column(scale=2):
# Ô nhập mã cổ phiếu và nút phân tích riêng
symbol_input = gr.Textbox(label="Nhập mã cổ phiếu để phân tích nhanh", placeholder="Ví dụ: FPT, HPG, VCB...")
analyze_btn = gr.Button("Phân tích mã cổ phiếu")
rev_growth = gr.Slider(0, 100, value=30, step=1, label="% Tăng Doanh Thu >")
profit_growth = gr.Slider(0, 100, value=20, step=1, label="% Tăng Lợi Nhuận >")
roe = gr.Slider(0, 100, value=20, step=1, label="ROE (%) >")
dividend = gr.Slider(0, 100, value=20, step=1, label="Cổ tức (%) >")
groq_api_key = gr.Textbox(label="Groq API Key (nếu có)", type="password", placeholder="Nhập API key của bạn từ GroqCloud...")
filter_btn = gr.Button("Lọc cổ phiếu")
with gr.Column(scale=5):
stock_table = gr.Dataframe(headers=None, interactive=True, label="Bảng kết quả lọc cổ phiếu")
gr.Markdown("*Click vào mã cổ phiếu để phân tích AI*")
with gr.Row():
with gr.Column():
ai_report = gr.Markdown(label="Báo cáo Phân tích AI")
ai_plot = gr.Image(label="Biểu đồ Giá Cổ phiếu")
# Lưu DataFrame đầy đủ để callback click lấy đúng symbol
df_all = gr.State()
def update_table(rev_growth, profit_growth, roe, dividend):
filtered_display, filtered = stock_filter_panel(rev_growth, profit_growth, roe, dividend)
return filtered_display, filtered
filter_btn.click(
update_table,
inputs=[rev_growth, profit_growth, roe, dividend],
outputs=[stock_table, df_all]
)
stock_table.select(
on_stock_click,
inputs=[stock_table, df_all],
outputs=[ai_report, ai_plot]
)
# Callback cho nút phân tích mã cổ phiếu nhập tay
def analyze_symbol(symbol, groq_api_key):
if not symbol:
return "Vui lòng nhập mã cổ phiếu!", None
result = stock_insight_agent(symbol.strip().upper(), groq_api_key or "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")
plot_path = result.get("plot_path", "")
import os
cwd = os.getcwd()
if (
not plot_path
or not isinstance(plot_path, str)
or not os.path.exists(plot_path)
or not os.path.isfile(plot_path)
or os.path.isdir(plot_path)
or os.path.abspath(plot_path) == os.path.abspath(cwd)
):
plot_path = None
return result["report"], plot_path
analyze_btn.click(
analyze_symbol,
inputs=[symbol_input, groq_api_key],
outputs=[ai_report, ai_plot]
)
if __name__ == "__main__":
demo.launch(share=True)