Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import time | |
| from pathlib import Path | |
| import gradio as gr | |
| from gradio import ChatMessage | |
| from agents.financial_agent import FinancialAdvisorAgent | |
| from agents.tools import FinancialTools | |
| # Avatar configuration | |
| AVATAR_IMAGES = ( | |
| None, | |
| "./public/images/fin_logo.png", | |
| ) | |
| # Initialize tools and agent | |
| financial_tools = FinancialTools(tavily_api_key=os.getenv("TAVILY_API_KEY")) | |
| tools = financial_tools.get_all_tools() | |
| agent = FinancialAdvisorAgent(tools=tools, openai_api_key=os.getenv("OPENAI_API_KEY")) | |
| #agent = FinancialAdvisorAgent(tools=tools, cohere_api_key=os.getenv("COHERE_API_KEY")) | |
| gr.set_static_paths(paths=[(Path.cwd() / "public" / "images").absolute()]) | |
| def analyze_data_with_repl(data_type, data): | |
| """Analyze financial data using Python REPL with comprehensive insights""" | |
| if data_type == "budget": | |
| try: | |
| budget_data = json.loads(data) | |
| categories = list(budget_data.get("current_expenses", {}).keys()) | |
| values = list(budget_data.get("current_expenses", {}).values()) | |
| income = budget_data.get("monthly_income", budget_data.get("income", 0)) | |
| if categories and values: | |
| total_expenses = sum(values) | |
| analysis_text = "💰 **Comprehensive Budget Analysis**\n\n" | |
| # Income vs Expenses Overview | |
| analysis_text += "## 📈 **Income vs Expenses Overview**\n" | |
| analysis_text += f"- **Monthly Income**: ${income:,.0f}\n" | |
| analysis_text += f"- **Total Expenses**: ${total_expenses:,.0f}\n" | |
| if income > 0: | |
| remaining = income - total_expenses | |
| savings_rate = (remaining / income * 100) if income > 0 else 0 | |
| if remaining > 0: | |
| analysis_text += f"- **💚 Surplus**: ${remaining:,.0f}\n" | |
| analysis_text += f"- **💎 Savings Rate**: {savings_rate:.1f}%\n" | |
| else: | |
| analysis_text += f"- **🔴 Deficit**: ${abs(remaining):,.0f}\n" | |
| analysis_text += ( | |
| f"- **⚠️ Overspending**: {abs(savings_rate):.1f}%\n" | |
| ) | |
| # Expense Breakdown with Progress Bars | |
| analysis_text += "\n## 💳 **Expense Breakdown**\n" | |
| for i, (category, amount) in enumerate(zip(categories, values)): | |
| percentage = ( | |
| (amount / total_expenses * 100) if total_expenses > 0 else 0 | |
| ) | |
| income_percentage = (amount / income * 100) if income > 0 else 0 | |
| bar = "█" * min(int(percentage / 3), 30) # Max 30 chars | |
| analysis_text += f"**{category.title()}**: ${amount:,.0f}\n" | |
| analysis_text += f" └─ {percentage:.1f}% of expenses | {income_percentage:.1f}% of income {bar}\n\n" | |
| # Financial Health Metrics | |
| analysis_text += "## 📊 **Financial Health Metrics**\n" | |
| avg_expense = total_expenses / len(values) | |
| largest_expense = max(values) | |
| smallest_expense = min(values) | |
| largest_category = categories[values.index(largest_expense)] | |
| smallest_category = categories[values.index(smallest_expense)] | |
| analysis_text += ( | |
| f"- **Average Category Expense**: ${avg_expense:,.0f}\n" | |
| ) | |
| analysis_text += f"- **Highest Expense**: {largest_category} (${largest_expense:,.0f})\n" | |
| analysis_text += f"- **Lowest Expense**: {smallest_category} (${smallest_expense:,.0f})\n" | |
| analysis_text += ( | |
| f"- **Expense Range**: ${largest_expense - smallest_expense:,.0f}\n" | |
| ) | |
| # Budget Recommendations | |
| analysis_text += "\n## 💡 **Smart Budget Insights**\n" | |
| # 50/30/20 Rule Analysis | |
| if income > 0: | |
| needs_target = income * 0.50 | |
| wants_target = income * 0.30 | |
| savings_target = income * 0.20 | |
| analysis_text += "**50/30/20 Rule Comparison:**\n" | |
| analysis_text += f"- Needs Target (50%): ${needs_target:,.0f}\n" | |
| analysis_text += f"- Wants Target (30%): ${wants_target:,.0f}\n" | |
| analysis_text += f"- Savings Target (20%): ${savings_target:,.0f}\n" | |
| if savings_rate >= 20: | |
| analysis_text += "✅ **Excellent savings rate!**\n" | |
| elif savings_rate >= 10: | |
| analysis_text += "⚠️ **Good savings, aim for 20%**\n" | |
| else: | |
| analysis_text += ( | |
| "🔴 **Consider reducing expenses to save more**\n" | |
| ) | |
| # Category Warnings | |
| for category, amount in zip(categories, values): | |
| if income > 0: | |
| cat_percentage = amount / income * 100 | |
| if ( | |
| category.lower() in ["rent", "housing"] | |
| and cat_percentage > 30 | |
| ): | |
| analysis_text += f"⚠️ **Housing costs high**: {cat_percentage:.1f}% (recommend <30%)\n" | |
| elif ( | |
| category.lower() in ["food", "dining"] | |
| and cat_percentage > 15 | |
| ): | |
| analysis_text += f"⚠️ **Food costs high**: {cat_percentage:.1f}% (recommend <15%)\n" | |
| return analysis_text | |
| except Exception as e: | |
| return f"Error analyzing budget data: {str(e)}" | |
| elif data_type == "portfolio": | |
| try: | |
| portfolio_data = json.loads(data) | |
| holdings = portfolio_data.get("holdings", []) | |
| total_value = sum(holding.get("value", 0) for holding in holdings) | |
| analysis_text = "📊 **Advanced Portfolio Analysis**\n\n" | |
| # Portfolio Overview | |
| analysis_text += "## 💼 **Portfolio Overview**\n" | |
| analysis_text += f"- **Total Portfolio Value**: ${total_value:,.2f}\n" | |
| analysis_text += f"- **Number of Holdings**: {len(holdings)}\n" | |
| if holdings: | |
| values = [holding.get("value", 0) for holding in holdings] | |
| avg_holding = sum(values) / len(values) | |
| max_holding = max(values) | |
| min_holding = min(values) | |
| analysis_text += f"- **Average Holding Size**: ${avg_holding:,.2f}\n" | |
| analysis_text += f"- **Largest Position**: ${max_holding:,.2f}\n" | |
| analysis_text += f"- **Smallest Position**: ${min_holding:,.2f}\n" | |
| # Detailed Holdings breakdown | |
| analysis_text += "\n## 📈 **Holdings Breakdown**\n" | |
| sorted_holdings = sorted( | |
| holdings, key=lambda x: x.get("value", 0), reverse=True | |
| ) | |
| for i, holding in enumerate(sorted_holdings, 1): | |
| symbol = holding.get("symbol", "Unknown") | |
| value = holding.get("value", 0) | |
| shares = holding.get("shares", 0) | |
| allocation = holding.get( | |
| "allocation", (value / total_value * 100) if total_value > 0 else 0 | |
| ) | |
| sector = holding.get("sector", "Unknown") | |
| # Calculate position concentration risk | |
| risk_level = ( | |
| "🟢 Low" | |
| if allocation < 10 | |
| else "🟡 Medium" | |
| if allocation < 25 | |
| else "🔴 High" | |
| ) | |
| analysis_text += f"**#{i} {symbol}** - {sector}\n" | |
| analysis_text += f" └─ Value: ${value:,.2f} | Shares: {shares:,.0f} | Weight: {allocation:.1f}%\n" | |
| analysis_text += f" └─ Concentration Risk: {risk_level}\n\n" | |
| # Sector analysis with advanced metrics | |
| sectors = {} | |
| sector_values = {} | |
| for holding in holdings: | |
| sector = holding.get("sector", "Unknown") | |
| allocation = holding.get("allocation", 0) | |
| value = holding.get("value", 0) | |
| sectors[sector] = sectors.get(sector, 0) + allocation | |
| sector_values[sector] = sector_values.get(sector, 0) + value | |
| if sectors: | |
| analysis_text += "## 🏭 **Sector Diversification Analysis**\n" | |
| sorted_sectors = sorted( | |
| sectors.items(), key=lambda x: x[1], reverse=True | |
| ) | |
| for sector, allocation in sorted_sectors: | |
| bar = "█" * min(int(allocation / 2), 30) | |
| value = sector_values.get(sector, 0) | |
| # Sector concentration assessment | |
| if allocation > 40: | |
| risk_emoji = "🔴" | |
| risk_text = "Over-concentrated" | |
| elif allocation > 25: | |
| risk_emoji = "🟡" | |
| risk_text = "Moderate concentration" | |
| else: | |
| risk_emoji = "🟢" | |
| risk_text = "Well diversified" | |
| analysis_text += f"**{sector}**: {allocation:.1f}% (${value:,.2f}) {risk_emoji}\n" | |
| analysis_text += f" └─ {bar} {risk_text}\n\n" | |
| # Portfolio Health Metrics | |
| analysis_text += "## 🎯 **ポートフォリオの健全性評価**\n" | |
| # Diversification Score | |
| num_sectors = len(sectors) | |
| if num_sectors >= 8: | |
| diversification = "🟢 Excellent" | |
| elif num_sectors >= 5: | |
| diversification = "🟡 Good" | |
| else: | |
| diversification = "🔴 Poor" | |
| analysis_text += f"- **セクターの多様化**: {diversification} ({num_sectors} セクター)\n" | |
| # Concentration Risk | |
| if holdings: | |
| top_3_allocation = sum( | |
| sorted([h.get("allocation", 0) for h in holdings], reverse=True)[:3] | |
| ) | |
| if top_3_allocation > 60: | |
| concentration_risk = "🔴 High" | |
| elif top_3_allocation > 40: | |
| concentration_risk = "🟡 Medium" | |
| else: | |
| concentration_risk = "🟢 Low" | |
| analysis_text += f"- **集中リスク**: {concentration_risk} (Top 3: {top_3_allocation:.1f}%)\n" | |
| # Portfolio Recommendations | |
| analysis_text += "\n## 💡 **ポートフォリオ最適化の推奨事項**\n" | |
| # Check for over-concentration | |
| for holding in holdings: | |
| allocation = holding.get("allocation", 0) | |
| if allocation > 25: | |
| analysis_text += f"⚠️ **{holding.get('symbol', 'Unknown')}**は{allocation:.1f}%でオーバーウェイトしています。 (リバランスを検討)\n" | |
| # Sector recommendations | |
| for sector, allocation in sectors.items(): | |
| if allocation > 40: | |
| analysis_text += f"⚠️ **{sector}** セクターオーバーウェイト {allocation:.1f}% (多様化を検討)\n" | |
| # Diversification suggestions | |
| if num_sectors < 5: | |
| analysis_text += "💡 **分散投資の効果を高めるために、より多くのセクターへの投資を検討する**\n" | |
| if len(holdings) < 10: | |
| analysis_text += ( | |
| "💡 **単一銘柄のリスクを減らすために保有銘柄を増やすことを検討する。**\n" | |
| ) | |
| return analysis_text | |
| except Exception as e: | |
| return f"Error analyzing portfolio data: {str(e)}" | |
| elif data_type == "stock": | |
| try: | |
| stock_data = json.loads(data) | |
| symbol = stock_data.get("symbol", "Unknown") | |
| price_str = stock_data.get("current_price", "0") | |
| analysis_text = f"📈 **Comprehensive Stock Analysis: {symbol}**\n\n" | |
| # Company Overview | |
| analysis_text += "## 🏢 **Company Overview**\n" | |
| analysis_text += f"- **シンボル**: {symbol}\n" | |
| analysis_text += f"- **プライス**: {price_str}\n" | |
| analysis_text += f"- **企業名**: {stock_data.get('company_name', 'N/A')}\n" | |
| analysis_text += f"- **セクター**: {stock_data.get('sector', 'N/A')}\n" | |
| analysis_text += f"- **業界**: {stock_data.get('industry', 'N/A')}\n" | |
| analysis_text += ( | |
| f"- **マーケット・キャップ**: {stock_data.get('market_cap', 'N/A')}\n\n" | |
| ) | |
| # Financial Metrics | |
| financials = stock_data.get("financials", {}) | |
| if financials: | |
| analysis_text += "## 💹 **Key Financial Metrics**\n" | |
| # Valuation metrics | |
| pe_ratio = financials.get("pe_ratio", "N/A") | |
| pb_ratio = financials.get("pb_ratio", "N/A") | |
| ps_ratio = financials.get("ps_ratio", "N/A") | |
| analysis_text += f"- **P/E Ratio**: {pe_ratio}" | |
| if pe_ratio != "N/A" and isinstance(pe_ratio, (int, float)): | |
| if pe_ratio < 15: | |
| analysis_text += " 🟢 (Undervalued)" | |
| elif pe_ratio > 25: | |
| analysis_text += " 🔴 (Potentially Overvalued)" | |
| else: | |
| analysis_text += " 🟡 (Fairly Valued)" | |
| analysis_text += "\n" | |
| analysis_text += f"- **P/B Ratio**: {pb_ratio}\n" | |
| analysis_text += f"- **P/S Ratio**: {ps_ratio}\n" | |
| # Profitability metrics | |
| analysis_text += f"- **ROE**: {financials.get('roe', 'N/A')}\n" | |
| analysis_text += f"- **ROA**: {financials.get('roa', 'N/A')}\n" | |
| analysis_text += ( | |
| f"- **Profit Margin**: {financials.get('profit_margin', 'N/A')}\n" | |
| ) | |
| analysis_text += f"- **Revenue Growth**: {financials.get('revenue_growth', 'N/A')}\n\n" | |
| # Performance analysis with trend indicators | |
| performance = stock_data.get("performance", {}) | |
| if performance: | |
| analysis_text += "## 📊 **Performance Analysis**\n" | |
| periods = ["1d", "1w", "1m", "3m", "6m", "1y", "ytd"] | |
| for period in periods: | |
| if period in performance: | |
| return_pct = performance[period] | |
| # Add trend indicators | |
| if isinstance(return_pct, str) and "%" in return_pct: | |
| try: | |
| pct_value = float(return_pct.replace("%", "")) | |
| if pct_value > 0: | |
| trend = "📈" | |
| elif pct_value < 0: | |
| trend = "📉" | |
| else: | |
| trend = "➡️" | |
| except: | |
| trend = "" | |
| else: | |
| trend = "" | |
| analysis_text += ( | |
| f"- **{period.upper()}**: {return_pct} {trend}\n" | |
| ) | |
| analysis_text += "\n" | |
| # Advanced Risk Assessment | |
| risk_data = stock_data.get("risk_assessment", {}) | |
| if risk_data: | |
| analysis_text += "## ⚠️ **Risk Assessment**\n" | |
| risk_level = risk_data.get("risk_level", "N/A") | |
| volatility = risk_data.get("volatility_30d", "N/A") | |
| beta = risk_data.get("beta", "N/A") | |
| # Risk level with emoji indicators | |
| if risk_level.lower() == "low": | |
| risk_emoji = "🟢" | |
| elif risk_level.lower() == "medium": | |
| risk_emoji = "🟡" | |
| elif risk_level.lower() == "high": | |
| risk_emoji = "🔴" | |
| else: | |
| risk_emoji = "" | |
| analysis_text += f"- **Risk Level**: {risk_level} {risk_emoji}\n" | |
| analysis_text += f"- **30-Day Volatility**: {volatility}\n" | |
| analysis_text += f"- **Beta**: {beta}" | |
| if beta != "N/A" and isinstance(beta, (int, float)): | |
| if beta > 1.2: | |
| analysis_text += " (High volatility vs market)" | |
| elif beta < 0.8: | |
| analysis_text += " (Low volatility vs market)" | |
| else: | |
| analysis_text += " (Similar to market)" | |
| analysis_text += "\n\n" | |
| # Technical Analysis | |
| technical = stock_data.get("technical_analysis", {}) | |
| if technical: | |
| analysis_text += "## 📈 **Technical Analysis**\n" | |
| analysis_text += f"- **50-Day MA**: {technical.get('ma_50', 'N/A')}\n" | |
| analysis_text += f"- **200-Day MA**: {technical.get('ma_200', 'N/A')}\n" | |
| analysis_text += f"- **RSI**: {technical.get('rsi', 'N/A')}\n" | |
| analysis_text += ( | |
| f"- **Support Level**: {technical.get('support', 'N/A')}\n" | |
| ) | |
| analysis_text += ( | |
| f"- **Resistance Level**: {technical.get('resistance', 'N/A')}\n\n" | |
| ) | |
| # Investment Recommendation with detailed reasoning | |
| recommendation = stock_data.get("recommendation", {}) | |
| if recommendation: | |
| action = recommendation.get("action", "N/A") | |
| confidence = recommendation.get("confidence", "N/A") | |
| reasoning = recommendation.get("reasoning", "") | |
| analysis_text += "## 💡 **Investment Recommendation**\n" | |
| # Action with emoji | |
| if action.lower() == "buy": | |
| action_emoji = "🟢" | |
| elif action.lower() == "sell": | |
| action_emoji = "🔴" | |
| elif action.lower() == "hold": | |
| action_emoji = "🟡" | |
| else: | |
| action_emoji = "" | |
| analysis_text += f"- **アクション**: {action} {action_emoji}\n" | |
| analysis_text += f"- **コンフィデンス**: {confidence}\n" | |
| if reasoning: | |
| analysis_text += f"- **Reasoning**: {reasoning}\n" | |
| analysis_text += "\n" | |
| # Additional Investment Considerations | |
| analysis_text += "## 🎯 **投資に関する考慮事項**\n" | |
| # Dividend info | |
| dividend_yield = stock_data.get("dividend_yield", "N/A") | |
| if dividend_yield != "N/A": | |
| analysis_text += f"- **配当利回り**: {dividend_yield}\n" | |
| # Analyst ratings | |
| analyst_rating = stock_data.get("analyst_rating", "N/A") | |
| if analyst_rating != "N/A": | |
| analysis_text += f"- **アナリスト評価**: {analyst_rating}\n" | |
| # Price targets | |
| price_target = stock_data.get("price_target", "N/A") | |
| if price_target != "N/A": | |
| analysis_text += f"- **プライス・ターゲット**: {price_target}\n" | |
| # ESG score | |
| esg_score = stock_data.get("esg_score", "N/A") | |
| if esg_score != "N/A": | |
| analysis_text += f"- **ESG Score**: {esg_score}\n" | |
| return analysis_text | |
| except Exception as e: | |
| return f"Error analyzing stock data: {str(e)}" | |
| return None | |
| def determine_intended_tool(message): | |
| """Determine which tool the AI intends to use based on the message""" | |
| message_lower = message.lower() | |
| tool_detection_map = { | |
| "budget_planner": [ | |
| "budget", | |
| "income", | |
| "expense", | |
| "spending", | |
| "allocat", | |
| "monthly", | |
| "plan", | |
| "financial plan", | |
| "money", | |
| "track", | |
| "categoriz", | |
| "cost", | |
| ], | |
| "investment_analyzer": [ | |
| "stock", | |
| "invest", | |
| "buy", | |
| "sell", | |
| "analyze", | |
| "AAPL", | |
| "GOOGL", | |
| "TSLA", | |
| "NVDA", | |
| "NVIDIA", | |
| "MSFT", | |
| "AMZN", | |
| "META", | |
| "share", | |
| "equity", | |
| "ticker", | |
| ], | |
| "portfolio_analyzer": [ | |
| "portfolio", | |
| "holdings", | |
| "allocation", | |
| "diversif", | |
| "asset", | |
| "position", | |
| ], | |
| "market_trends": [ | |
| "market", | |
| "trend", | |
| "news", | |
| "sector", | |
| "economic", | |
| "latest", | |
| "current", | |
| ], | |
| } | |
| tool_names = { | |
| "budget_planner": "Budget Planner", | |
| "investment_analyzer": "Investment Analyzer", | |
| "market_trends": "Market Trends Analyzer", | |
| "portfolio_analyzer": "Portfolio Analyzer", | |
| } | |
| for tool_key, keywords in tool_detection_map.items(): | |
| if any(keyword in message_lower for keyword in keywords): | |
| return tool_key, tool_names.get(tool_key, tool_key) | |
| return None, None | |
| def determine_response_type(message): | |
| """Determine if user wants detailed report or short response""" | |
| message_lower = message.lower() | |
| # Keywords indicating detailed response preference | |
| detailed_keywords = [ | |
| "detailed", | |
| "detail", | |
| "comprehensive", | |
| "thorough", | |
| "in-depth", | |
| "full analysis", | |
| "complete", | |
| "report", | |
| "breakdown", | |
| "explain", | |
| "elaborate", | |
| "deep dive", | |
| "extensive", | |
| "detailed analysis", | |
| "full report", | |
| "comprehensive report", | |
| ] | |
| # Keywords indicating short response preference | |
| short_keywords = [ | |
| "quick", | |
| "brief", | |
| "short", | |
| "summary", | |
| "concise", | |
| "simple", | |
| "fast", | |
| "just tell me", | |
| "quickly", | |
| "in short", | |
| "tldr", | |
| "bottom line", | |
| ] | |
| # Check for detailed indicators first | |
| if any(keyword in message_lower for keyword in detailed_keywords): | |
| return "detailed" | |
| # Check for short indicators | |
| if any(keyword in message_lower for keyword in short_keywords): | |
| return "short" | |
| # Default to short response | |
| return "short" | |
| def process_financial_query(message, history): | |
| """Process user queries through the financial agent with streaming response""" | |
| # Get the actual user message from the last entry in history | |
| if not history or len(history) == 0: | |
| return history | |
| # Extract the last user message | |
| last_user_message = None | |
| for msg in reversed(history): | |
| if msg["role"] == "user": | |
| last_user_message = msg["content"] | |
| break | |
| if not last_user_message: | |
| return history | |
| # Convert Gradio history to agent format (excluding the last user message we just added) | |
| agent_history = [] | |
| for i, msg in enumerate(history[:-1]): # Exclude the last message | |
| agent_history.append( | |
| { | |
| "role": msg["role"], | |
| "content": msg["content"] | |
| if isinstance(msg["content"], str) | |
| else str(msg["content"]), | |
| } | |
| ) | |
| # Start timer | |
| start_time = time.time() | |
| init_message_start_index = len(history) | |
| try: | |
| # Show what tool will be used and processing status | |
| intended_tool_key, intended_tool_name = determine_intended_tool( | |
| last_user_message | |
| ) | |
| response_type = determine_response_type(last_user_message) | |
| # Always show status for all tools with expected time estimates | |
| if intended_tool_name: | |
| if intended_tool_key == "market_trends": | |
| status_msg = "🔍 市場ニュースの取得とトレンドの分析 (20-30秒位、かかります。)..." | |
| elif intended_tool_key == "investment_analyzer": | |
| status_msg = "📈 株価データの分析と指標の計算 (10-15秒位、かかります。)..." | |
| elif intended_tool_key == "budget_planner": | |
| status_msg = "💰 処理予算の分析 (5-10秒位、かかります。)..." | |
| elif intended_tool_key == "portfolio_analyzer": | |
| status_msg = "📊 ポートフォリオデータの分析 (8-12秒位、かかります。)..." | |
| else: | |
| status_msg = ( | |
| f"🔄 ツール: {intended_tool_name} (5-15秒位、かかります。)..." | |
| ) | |
| history.append(ChatMessage(role="assistant", content=status_msg)) | |
| yield history | |
| else: | |
| # If no tool detected, show generic processing message | |
| history.append( | |
| ChatMessage( | |
| role="assistant", | |
| content="🧠 リクエストを処理しています (10-15秒位、かかります。)...", | |
| ) | |
| ) | |
| yield history | |
| # Process message through agent | |
| response, tool_used, tool_result, all_tools, all_results = ( | |
| agent.process_message_with_details(last_user_message, agent_history) | |
| ) | |
| # Clear the processing message now that tool is complete | |
| if len(history) > init_message_start_index: | |
| history.pop() # Remove the processing message | |
| # Step 5: Show tool execution results | |
| if all_tools and all_results: | |
| # Remove initialization messages but keep all previous conversation and tool info | |
| history = history[:init_message_start_index] | |
| tool_names = { | |
| "budget_planner": "Budget Planner", | |
| "investment_analyzer": "Investment Analyzer", | |
| "market_trends": "Market Trends Analyzer", | |
| "portfolio_analyzer": "Portfolio Analyzer", | |
| } | |
| tool_emojis = { | |
| "Budget Planner": "💰", | |
| "Investment Analyzer": "📈", | |
| "Market Trends Analyzer": "📰", | |
| "Portfolio Analyzer": "📊", | |
| } | |
| # Show results for all tools used | |
| for i, (used_tool, result) in enumerate(zip(all_tools, all_results)): | |
| tool_display_name = tool_names.get(used_tool, used_tool) | |
| if result: | |
| # Format tool result for display | |
| try: | |
| import json | |
| if result.startswith("{") or result.startswith("["): | |
| # Pretty format JSON output | |
| parsed_result = json.loads(result) | |
| # Truncate very long results for display | |
| if len(str(parsed_result)) > 2000: | |
| # Show summary for long results | |
| if isinstance(parsed_result, dict): | |
| summary = { | |
| k: f"[{type(v).__name__}]" | |
| if isinstance(v, (list, dict)) | |
| else v | |
| for k, v in list(parsed_result.items())[:10] | |
| } | |
| display_result = f"```json\n{json.dumps(summary, indent=2)}\n... (truncated)\n```" | |
| else: | |
| display_result = f"```json\n{json.dumps(parsed_result, indent=2)[:1000]}...\n```" | |
| else: | |
| formatted_result = json.dumps(parsed_result, indent=2) | |
| display_result = f"```json\n{formatted_result}\n```" | |
| else: | |
| # Truncate non-JSON results | |
| display_result = ( | |
| result[:1000] + "..." if len(result) > 1000 else result | |
| ) | |
| except Exception: | |
| display_result = ( | |
| str(result)[:1000] + "..." | |
| if len(str(result)) > 1000 | |
| else str(result) | |
| ) | |
| tool_emoji = tool_emojis.get(tool_display_name, "🔧") | |
| collapsible_content = f""" | |
| <details> | |
| <summary><strong>{tool_emoji} {tool_display_name} 結果</strong> - クリックして展開します</summary> | |
| {display_result} | |
| </details> | |
| """ | |
| history.append( | |
| ChatMessage( | |
| role="assistant", | |
| content=collapsible_content, | |
| ) | |
| ) | |
| yield history | |
| # Add visualization for all applicable tools | |
| if all_tools and all_results: | |
| for used_tool, result in zip(all_tools, all_results): | |
| if result and used_tool in [ | |
| "budget_planner", | |
| "portfolio_analyzer", | |
| "investment_analyzer", | |
| ]: | |
| viz_type = { | |
| "budget_planner": "budget", | |
| "portfolio_analyzer": "portfolio", | |
| "investment_analyzer": "stock", | |
| }.get(used_tool) | |
| try: | |
| analysis_data = analyze_data_with_repl(viz_type, result) | |
| if analysis_data: | |
| tool_display_name = { | |
| "budget_planner": "Budget", | |
| "portfolio_analyzer": "Portfolio", | |
| "investment_analyzer": "Stock", | |
| }.get(used_tool, "Data") | |
| # Create collapsible data analysis output | |
| collapsible_analysis = f""" | |
| <details> | |
| <summary><strong>🔍 {tool_display_name} データ分析</strong> - クリックして展開します</summary> | |
| {analysis_data} | |
| </details> | |
| """ | |
| history.append( | |
| ChatMessage( | |
| role="assistant", | |
| content=collapsible_analysis, | |
| ) | |
| ) | |
| yield history | |
| except Exception: | |
| # Silently continue if analysis fails | |
| pass | |
| # Stream the final response in real-time using LLM streaming | |
| if tool_used and tool_result: | |
| # Use real LLM streaming with response type | |
| streaming_content = "" | |
| history.append(ChatMessage(role="assistant", content="")) | |
| for chunk in agent.stream_response( | |
| last_user_message, tool_result, tool_used, response_type | |
| ): | |
| streaming_content += chunk | |
| history[-1] = ChatMessage(role="assistant", content=streaming_content) | |
| yield history | |
| else: | |
| # Fallback for non-streaming response | |
| history.append(ChatMessage(role="assistant", content=response)) | |
| yield history | |
| elapsed = time.time() - start_time | |
| except Exception as e: | |
| elapsed = time.time() - start_time | |
| history[-1] = ChatMessage( | |
| role="assistant", | |
| content=f"リクエストの処理中にエラーが発生しました: {str(e)}. 質問を言い換えてみてください。", | |
| metadata={"title": f"💥 Error ({elapsed:.1f}s)"}, | |
| ) | |
| yield history | |
| # Create the Gradio interface | |
| with gr.Blocks(theme=gr.themes.Base(), title="ファイナンシャル・アドバイザリー・エージェント") as demo: | |
| gr.HTML("""<center><img src="https://www.ryhintl.com/images/ryhlogo/ryhlogo.png" alt="Fin Logo" style="width: 100px; vertical-align: middle;"> | |
| <h1 style="text-align: center;">AI ファイナンシャル・アドバイザリー・エージェント</h1> | |
| 予算編成、投資、ポートフォリオ分析、市場動向に関する AI 搭載のファイナンシャル アドバイザー | |
| </center> | |
| """) | |
| chatbot = gr.Chatbot( | |
| type="messages", | |
| scale=2, | |
| height=400, | |
| avatar_images=AVATAR_IMAGES, | |
| show_copy_button=True, | |
| ) | |
| with gr.Row(equal_height=True): | |
| msg = gr.Textbox( | |
| placeholder="予算、投資、その他金融に関する話題についてお問い合わせください。", | |
| show_label=False, | |
| scale=19, | |
| autofocus=True, | |
| ) | |
| submit = gr.Button("生成", variant="primary", scale=1, min_width=60) | |
| # Example queries | |
| example_queries = [ | |
| "トヨタ自動車の株を分析して、良い投資かどうか教えてください。", | |
| "トヨタ自動車の株についてもっと教えてください。", | |
| "月々の収入が500000円で支出額が400000円の予算を立てるのを手伝ってください。家賃15万円、食費10万円、光熱費10000円です。", | |
| "ハイテク株の最新市場動向は何ですか? ", | |
| "自分のポートフォリオを分析してください: {'holdings': [{'企業名': 'トヨタ自動車株式会社(7203.T)', '株数': 100}, {'企業名': '花王株式会社(4452.T)', '株数': 50}]}", | |
| ] | |
| gr.Examples(examples=example_queries, inputs=msg, label="例文") | |
| # Handle message submission | |
| def user_submit(message, history): | |
| if not message.strip(): | |
| return "", history, gr.update(interactive=True), gr.update(interactive=True) | |
| history = history + [ChatMessage(role="user", content=message)] | |
| return "", history, gr.update(interactive=False), gr.update(interactive=False) | |
| def enable_input(): | |
| return gr.update(interactive=True), gr.update(interactive=True) | |
| # Connect events | |
| submit_event = ( | |
| msg.submit(user_submit, [msg, chatbot], [msg, chatbot, msg, submit]) | |
| .then(process_financial_query, [msg, chatbot], chatbot) | |
| .then(enable_input, [], [msg, submit]) | |
| ) | |
| click_event = ( | |
| submit.click(user_submit, [msg, chatbot], [msg, chatbot, msg, submit]) | |
| .then(process_financial_query, [msg, chatbot], chatbot) | |
| .then(enable_input, [], [msg, submit]) | |
| ) | |
| # Add like functionality for feedback | |
| def like_handler(evt: gr.LikeData): | |
| pass | |
| chatbot.like(like_handler) | |
| if __name__ == "__main__": | |
| demo.launch(ssr_mode=False) | |