import json import os import gradio as gr from typing import Literal from typing_extensions import Annotated from groq import Groq from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager import requests import mysql.connector from qdrant_client import QdrantClient from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent # 通貨変換関数 CurrencySymbol = Literal["USD", "EUR"] # MySQLに接続 conn = mysql.connector.connect( host="www.ryhintl.com", user="smairuser", password="smairuser", port=36000, database="smair" ) # カーソルを取得 cursor = conn.cursor(dictionary=True) # List API Keys select_one_data_query = "SELECT * FROM agentic_apis" cursor.execute(select_one_data_query) result = cursor.fetchall() # JSONをパースしてkeyを抽出 keys = [item['key'] for item in result] os.environ["GROQ_API_KEY"] = keys[2] client = Groq(api_key=os.environ["GROQ_API_KEY"]) def update_params(task_type): if task_type == "Currency Conversion": return "USD", "YEN", "100" elif task_type == "Weather Forecast": return "new york", "", "" elif task_type == "Trend Prediction": return "最近のAIトレンドについて教えてください。", "", "" elif task_type == "Division Revenue": return "営業部の2023年の売上を教えて下さい。", "", "" elif task_type == "Division CAGR": return "営業部の2023年の年間成長率を教えて下さい。", "", "" elif task_type == "Scout": return "トータルソリューションズのAIプロジェクトの立ち上げに必要な専門知識を持っている人材が不足して悩んでいます。AIに関する専門知識を持っている専門家とPYTHONやAGENTの経験を持っている技術者をリスティングして、アドバイスしてください。", "", "" elif task_type == "New Biz": return "吉野家は業績が伸び悩んでいます。吉野家の業績情報をに基づいて新規事業を開拓するためにはどうすれば良いですか? 吉野家の業績情報も出力してください。", "", "" return "", "", "" def clear_params(): return "", "", "" def clear_output(): return "" def exchange_rate(base_currency: CurrencySymbol, quote_currency: CurrencySymbol) -> float: if base_currency == quote_currency: return 1.0 elif base_currency == "USD" and quote_currency == "EUR": return 1 / 1.1 elif base_currency == "EUR" and quote_currency == "USD": return 1.1 elif base_currency == "WON" and quote_currency == "EUR": return 0.000634 elif base_currency == "EUR" and quote_currency == "WON": return 1569 elif base_currency == "WON" and quote_currency == "USD": return 0.00069 elif base_currency == "USD" and quote_currency == "WON": return 1449 elif base_currency == "YEN" and quote_currency == "EUR": return 0.0062 elif base_currency == "EUR" and quote_currency == "YEN": return 161.89 elif base_currency == "YEN" and quote_currency == "USD": return 1.09 elif base_currency == "USD" and quote_currency == "YEN": return 147.59 else: raise ValueError(f"Unknown currencies {base_currency}, {quote_currency}") def currency_calculator(base_amount: float, base_currency: CurrencySymbol = "USD", quote_currency: CurrencySymbol = "EUR") -> str: quote_amount = exchange_rate(base_currency, quote_currency) * base_amount return f"{format(quote_amount, '.2f')} {quote_currency}" # 天気予報関数 def get_current_weather(location: str, unit: str = "fahrenheit") -> str: if "chicago" in location.lower(): return json.dumps({"location": "Chicago", "temperature": "13", "unit": unit}) elif "san francisco" in location.lower(): return json.dumps({"location": "San Francisco", "temperature": "55", "unit": unit}) elif "new york" in location.lower(): return json.dumps({"location": "New York", "temperature": "11", "unit": unit}) elif "seoul" in location.lower(): return json.dumps({"location": "Seoul", "temperature": "50", "unit": unit}) elif "tokyo" in location.lower(): return json.dumps({"location": "Tokyo", "temperature": "59", "unit": unit}) else: return json.dumps({"location": location, "temperature": "unknown"}) def weather_forecast(location: str) -> str: weather_details = get_current_weather(location=location) weather = json.loads(weather_details) return f"{weather['location']} will be {weather['temperature']} degrees {weather['unit']}" def trend_predict(prompt: str) -> str: # Set the system prompt system_prompt = { "role": "system", "content": "You are a helpful assistant, answer questions concisely." } # Set the user prompt user_input = prompt user_prompt = { "role": "user", "content": user_input } # Initialize the chat history chat_history = [system_prompt, user_prompt] response = client.chat.completions.create( model="llama-3.3-70b-versatile", messages=chat_history, max_tokens=1024, temperature=0) return response.choices[0].message.content #async def autocagr(requests="セールズ部門の2023年の年間成長率を教えて下さい。"): #def autocagr(requests="営業部の2023年の年間成長率を教えて下さい。"): def autorevenue(requests): # Configure Groq config_list = [{ "model": "llama-3.3-70b-versatile", "api_key": os.environ.get("GROQ_API_KEY"), "api_type": "groq" }] llm_config = {"config_list": config_list, "timeout": 120} assistant = AssistantAgent( name="assistant", system_message="""タスクを解く際、提供された関数に役立つものがある場合、それ利用して下さい。 最終的な解答を提示した後は「タスク完了」というメッセージを出力してください。""", llm_config=llm_config, max_consecutive_auto_reply=5, ) user_proxy = UserProxyAgent( name="user_proxy", is_termination_msg=lambda x: x.get("content", "") and x.get("content", "").rstrip().endswith("タスク完了"), human_input_mode="NEVER", llm_config=llm_config, max_consecutive_auto_reply=5, code_execution_config={"work_dir":"coding", "use_docker":False} ) @user_proxy.register_for_execution() #@assistant.register_for_llm(description="部門の年間成長率を取り出す関数") #def extract_cagr(division: Annotated[str, "部門名(営業部、カスタマーサポート部、製品開発部、マーケティング部、オンライン販売部)"]) -> str: @assistant.register_for_llm(description="部門の売上を取り出す関数") def extract_cagr(division: Annotated[str, "部門名(営業部、カスタマーサポート部、製品開発部、マーケティング部、オンライン販売部)"]) -> str: #select_one_data_query = "SELECT * FROM corp_cagr where 部門 = '"+division+"'" #select_one_data_query = "SELECT * FROM corp_cagr" select_one_data_query = "SELECT `部門`,FORMAT(SUM(`売上高`),0) as `売上` FROM corp_jkpi group by `部門`" cursor.execute(select_one_data_query) result = cursor.fetchall() return result # the assistant receives a message from the user_proxy, which contains the task description chat_result = user_proxy.initiate_chat(assistant, message=requests) # Extract content where role is "user" and name is "assistant" extracted_content = [item["content"] for item in chat_result.chat_history if item["role"] == "user" and item["name"] == "assistant"] return extracted_content[0] def autocagr(requests): # Configure Groq config_list = [{ "model": "llama-3.3-70b-versatile", "api_key": os.environ.get("GROQ_API_KEY"), "api_type": "groq" }] llm_config = {"config_list": config_list, "timeout": 120} assistant = AssistantAgent( name="assistant", system_message="""タスクを解く際、提供された関数に役立つものがある場合、それ利用して下さい。 最終的な解答を提示した後は「タスク完了」というメッセージを出力してください。""", llm_config=llm_config, max_consecutive_auto_reply=5, ) user_proxy = UserProxyAgent( name="user_proxy", is_termination_msg=lambda x: x.get("content", "") and x.get("content", "").rstrip().endswith("タスク完了"), human_input_mode="NEVER", llm_config=llm_config, max_consecutive_auto_reply=5, code_execution_config={"work_dir":"coding", "use_docker":False} ) @user_proxy.register_for_execution() @assistant.register_for_llm(description="部門の年間成長率を取り出す関数") def extract_cagr(division: Annotated[str, "部門名(営業部、カスタマーサポート部、製品開発部、マーケティング部、オンライン販売部)"]) -> str: select_one_data_query = "SELECT `部門`, YEAR(`年度`), CONCAT(`年間成長率`, '%') AS `年間成長率` FROM `corp_cagr` group by `部門`, `年度`" cursor.execute(select_one_data_query) result = cursor.fetchall() return result # the assistant receives a message from the user_proxy, which contains the task description chat_result = user_proxy.initiate_chat(assistant, message=requests) # Extract content where role is "user" and name is "assistant" extracted_content = [item["content"] for item in chat_result.chat_history if item["role"] == "user" and item["name"] == "assistant"] return extracted_content[0] def gen_scout(requests): #トータルソリューションズのAIプロジェクトの立ち上げに必要な専門知識を持っている人材が不足して悩んでいます。AIに関する専門知識を持っている専門家とPYTHONやAGENTの経験を持っている技術者をリスティングして、アドバイスしてください。 userPrompt=requests assistant="あなたはヘッドハンティング会社のアナリストです。" ragproxyagent="あなたは高度な知識を持つヘッドハンティング会社の上席コンサルタントです。assistantによる応答を分析し、詳細情報を提供します。" # GROQ config config_list = [ { "model": "llama-3.3-70b-versatile", "api_key": os.environ["GROQ_API_KEY"], "api_type": "groq", } ] assistant = AssistantAgent( name="assistant", system_message="You are a helpful assistant.", llm_config={ "timeout": 600, "cache_seed": 42, "config_list": config_list, }, ) qdrclient = QdrantClient(url="https://02cbe366-829e-43a6-adf5-3b712a886c21.us-west-1-0.aws.cloud.qdrant.io", api_key="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2Nlc3MiOiJtIn0.hWu5_qKaYHUhuMAjUScqw1R_1kkXiidv337wuGKcu9o") # Create user proxy agent RetrieveUserProxyAgent( ragproxyagent = RetrieveUserProxyAgent( name="ragproxyagent", human_input_mode="NEVER", max_consecutive_auto_reply=3, retrieve_config = { "task": "qa", "docs_path": [ "https://www.ryhintl.com/hunting.md" ], "chunk_token_size": 2000, "model": config_list[0]["model"], "vector_db": "qdrant", # Qdrant Cloud database "collection_name": "scouter_base", "db_config": {"client": qdrclient}, "get_or_create": True, # set to False if you don't want to reuse an existing collection "overwrite": True, # set to True if you want to overwrite an existing collection, each overwrite will force a index creation and reupload of documents }, code_execution_config=False, # set to False if you don't want to execute the code ) # Create a group chat between all agents groupChat = GroupChat( agents=[assistant, ragproxyagent], messages=[], max_round=3, ) groupChatManager = GroupChatManager( groupchat=groupChat, llm_config={"config_list": config_list} ) chat_result = ragproxyagent.initiate_chat(assistant, message=ragproxyagent.message_generator, problem=userPrompt) chat_result_dict = vars(chat_result) user_contents = [entry['content'] for entry in chat_result_dict['chat_history'] if entry['role'] == 'user'] basic_content = [entry["content"] for entry in chat_result_dict["chat_history"] if entry["role"] == "assistant"] return user_contents[0]+"\n\n"+basic_content[0] def gen_newbiz(requests): userPrompt=requests+" pythonコードは作成しないでください。" assistant="あなたはコンサルタントです。" ragproxyagent="あなたは高度な知識を持つコンサルティング会社の上席コンサルタントです。assistantによる応答を分析し、詳細情報を提供します。" # GROQ config config_list = [ { "model": "llama-3.3-70b-versatile", "api_key": os.environ["GROQ_API_KEY"], "api_type": "groq", } ] assistant = AssistantAgent( name="assistant", system_message="You are a helpful assistant.", llm_config={ "timeout": 600, "cache_seed": 42, "config_list": config_list, }, ) # Create user proxy agent RetrieveUserProxyAgent( ragproxyagent = RetrieveUserProxyAgent( name="ragproxyagent", human_input_mode="NEVER", max_consecutive_auto_reply=3, retrieve_config={ "task": "default", "docs_path": [ "https://www.ryhintl.com/yoshinoya.md" ], "chunk_token_size": 2000, "model": config_list[0]["model"], "vector_db": "mongodb", # MongoDB Atlas database "collection_name": "yoshinoyaHD", "db_config": { "connection_string": "mongodb+srv://richardhuh0629:VIPpOA51jOsiirVY@cluster0.yzn9dzx.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0", # MongoDB Atlas connection string "database_name": "companies", # MongoDB Atlas database "index_name": "default", "wait_until_index_ready": 120.0, # Setting to wait 120 seconds or until index is constructed before querying "wait_until_document_ready": 120.0, # Setting to wait 120 seconds or until document is properly indexed after insertion/update }, "get_or_create": True, # set to False if you don't want to reuse an existing collection "overwrite": True, # set to True if you want to overwrite an existing collection, each overwrite will force a index creation and reupload of documents }, code_execution_config=False, # set to False if you don't want to execute the code ) # Create a group chat between all agents groupChat = GroupChat( agents=[assistant, ragproxyagent], messages=[], max_round=3, ) groupChatManager = GroupChatManager( groupchat=groupChat, llm_config={"config_list": config_list} ) chat_result = ragproxyagent.initiate_chat(assistant, message=ragproxyagent.message_generator, problem=userPrompt) chat_result_dict = vars(chat_result) user_contents = [entry['content'] for entry in chat_result_dict['chat_history'] if entry['role'] == 'user'] basic_content = [entry["content"] for entry in chat_result_dict["chat_history"] if entry["role"] == "assistant"] return user_contents[0] #return user_contents[0]+"\n\n"+basic_content[0] # Gradio UIインターフェース設定 def gradio_interface_task(task_type: str, param1: str, param2: str = None, param3: float = None): if task_type == "Currency Conversion": if param1 in ["USD", "EUR", "WON", "YEN"]: return currency_calculator(float(param3), param1, param2) else: return "Undefined Currency" elif task_type == "Weather Forecast": mycity = param1.lower() #if param1 in ["san francisco", "new york", "chicago", "seoul", "tokyo"]: if mycity in ["san francisco", "new york", "chicago", "seoul", "tokyo"]: return weather_forecast(param1) else: return "Undefined City" elif task_type == "Trend Prediction": return trend_predict(param1) elif task_type == "Division Revenue": return autorevenue(param1) elif task_type == "Division CAGR": return autocagr(param1) elif task_type == "Scout": return gen_scout(param1) elif task_type == "New Biz": return gen_newbiz(param1) else: return "Invalid Task Type" # Gradioアプリケーション with gr.Blocks(title="Agentic RAG") as demo: gr.Markdown("# 🗞️ AGENTIC RAG") with gr.Blocks(): gr.HTML('