| |
|
| | import json |
| | import os |
| | import random |
| |
|
| | from gradio import ChatMessage |
| | from langchain_core.messages import AIMessage, AIMessageChunk, HumanMessage, ToolMessage |
| | from langgraph.graph import StateGraph, START, END |
| | from langgraph.graph.message import add_messages |
| | from langgraph.prebuilt import ToolNode |
| | from support.log_manager import logger |
| | from support.my_tools import captain_agent_tools, boss_agent_tools |
| | from support.tools.boss_agent_tools import create_fake_tool_call |
| | from support.tools.captain_agent_tools import create_agent_fake_tool_call |
| | from support.prompts import captain_agent_system_prompt, player_agent_system_prompt, boss_agent_system_prompt |
| | from typing import Annotated, List, Literal, Optional |
| | from typing_extensions import TypedDict |
| |
|
| |
|
class State(TypedDict):
    """Shared state passed between the nodes of the game graph and the team subgraphs."""

    # Message list; updates are merged through the `add_messages` reducer.
    messages: Annotated[list, add_messages]
    # Running chat log. Agent nodes return the previous list plus their new entry.
    chat_history: List[str]
    # Second message list that uses the same `add_messages` reducer.
    round_messages: Annotated[list, add_messages]
    original_board: dict
    # Current board; nodes read the keys 'red', 'blue', 'neutral' (word lists) and 'killer'.
    board: dict
    # Player objects; nodes select them by their `team` and `role` attributes.
    players: List
    current_team: Literal['red', 'blue']
    # Set to "captain" by the boss nodes and to "boss" by the judge when the team switches.
    current_role: str
    # Turn counter; the judge treats 0 as "game not started yet".
    turn: int
    last_user_message: str
    # Words chosen by the captain node, evaluated by the judge.
    guesses: List[str]
    # Clue and number produced by the boss node for the current round.
    clue: Optional[str]
    clue_number: Optional[int]
    next_team: Literal['red', 'blue']
    # Every word the judge has evaluated so far.
    history_guessed_words: List[str]
    # Clue supplied by a human boss; read by the boss nodes when a human is playing.
    human_clue: str
    human_clue_number: int
    # Bookkeeping written by the judge.
    teams_reviewed: List[str]
    end_round: bool
    winner_and_score: tuple

    # Counters incremented by the matching `*_is_called` nodes. The captain
    # counters are read by the captain nodes to break out of a retry loop.
    red_boss_is_called_counter: int
    blue_boss_is_called_counter: int
    red_captain_is_called_counter: int
    blue_captain_is_called_counter: int
| |
|
| |
|
| | class MyGraph: |
| | def __init__(self): |
| |
|
| | self.red_team = self._create_red_team_graph() |
| | self.blue_team = self._create_blue_team_graph() |
| | self.graph = self._create_graph() |
| |
|
| | self.players = [] |
| | self.board = None |
| | self.guessed_words = [] |
| | self.current_team = "" |
| | self.chat_history = [] |
| | self.winners = [] |
| |
|
| | self.IS_HUMAN_PLAYING = None |
| |
|
| | def _create_red_team_graph(self): |
| | """Compile red team subgraph""" |
| | builder = StateGraph(State) |
| |
|
| | |
| | builder.add_node("red_boss", self.red_boss) |
| | builder.add_node("red_captain", self.red_captain) |
| | builder.add_node("red_agent_1", self.red_agent_1) |
| | builder.add_node("red_agent_2", self.red_agent_2) |
| | builder.add_node("red_boss_is_called", self.red_boss_is_called) |
| | builder.add_node("red_captain_is_called", self.red_captain_is_called) |
| |
|
| | |
| | choose_word_tool = ToolNode(tools=[boss_agent_tools[0]]) |
| | final_choice = ToolNode(tools=[captain_agent_tools[0]]) |
| | transfer_to_agent_1 = ToolNode(tools=[captain_agent_tools[1]]) |
| | transfer_to_agent_2 = ToolNode(tools=[captain_agent_tools[2]]) |
| |
|
| | builder.add_node("choose_word_tool", choose_word_tool) |
| | builder.add_node("final_choice", final_choice) |
| | builder.add_node("transfer_to_agent_1", transfer_to_agent_1) |
| | builder.add_node("transfer_to_agent_2", transfer_to_agent_2) |
| | builder.add_node("update_turn", self.update_turn) |
| |
|
| | |
| | builder.add_edge(START, "red_boss") |
| | builder.add_conditional_edges( |
| | "red_boss", |
| | self.boss_choice, |
| | { |
| | "choose_word_tool": "choose_word_tool", |
| | "red_boss_is_called": "red_boss_is_called", |
| | }, |
| | ) |
| |
|
| | builder.add_edge("red_boss_is_called", "red_boss") |
| | builder.add_edge("choose_word_tool", "red_captain") |
| |
|
| | builder.add_conditional_edges( |
| | "red_captain", |
| | self.should_continue, |
| | { |
| | "final_choice": "final_choice", |
| | "transfer_to_agent_1": "transfer_to_agent_1", |
| | "transfer_to_agent_2": "transfer_to_agent_2", |
| | "red_captain_is_called": "red_captain_is_called", |
| | }, |
| | ) |
| |
|
| | builder.add_edge("red_captain_is_called", "red_captain") |
| | builder.add_edge("transfer_to_agent_1", "red_agent_1") |
| | builder.add_edge("transfer_to_agent_2", "red_agent_2") |
| | builder.add_edge("red_agent_1", "red_captain") |
| | builder.add_edge("red_agent_2", "red_captain") |
| | builder.add_edge("final_choice", "update_turn") |
| | builder.add_edge("update_turn", END) |
| |
|
| | return builder.compile() |
| |
|
| | def _create_blue_team_graph(self): |
| | """Compile blue team subgraph""" |
| | builder = StateGraph(State) |
| |
|
| | |
| | builder.add_node("blue_boss", self.blue_boss) |
| | builder.add_node("blue_captain", self.blue_captain) |
| | builder.add_node("blue_agent_1", self.blue_agent_1) |
| | builder.add_node("blue_agent_2", self.blue_agent_2) |
| | builder.add_node("blue_boss_is_called", self.blue_boss_is_called) |
| | builder.add_node("blue_captain_is_called", self.blue_captain_is_called) |
| |
|
| | |
| | choose_word_tool = ToolNode(tools=[boss_agent_tools[0]]) |
| | final_choice = ToolNode(tools=[captain_agent_tools[0]]) |
| | transfer_to_agent_1 = ToolNode(tools=[captain_agent_tools[1]]) |
| | transfer_to_agent_2 = ToolNode(tools=[captain_agent_tools[2]]) |
| |
|
| | builder.add_node("choose_word_tool", choose_word_tool) |
| | builder.add_node("final_choice", final_choice) |
| | builder.add_node("transfer_to_agent_1", transfer_to_agent_1) |
| | builder.add_node("transfer_to_agent_2", transfer_to_agent_2) |
| | builder.add_node("update_turn", self.update_turn) |
| |
|
| | |
| | builder.add_edge(START, "blue_boss") |
| | builder.add_conditional_edges( |
| | "blue_boss", |
| | self.boss_choice, |
| | { |
| | "choose_word_tool": "choose_word_tool", |
| | "blue_boss_is_called": "blue_boss_is_called", |
| | }, |
| | ) |
| |
|
| | builder.add_edge("blue_boss_is_called", "blue_boss") |
| | builder.add_edge("choose_word_tool", "blue_captain") |
| |
|
| | builder.add_conditional_edges( |
| | "blue_captain", |
| | self.should_continue, |
| | { |
| | "final_choice": "final_choice", |
| | "transfer_to_agent_1": "transfer_to_agent_1", |
| | "transfer_to_agent_2": "transfer_to_agent_2", |
| | "blue_captain_is_called": "blue_captain_is_called", |
| | }, |
| | ) |
| |
|
| | builder.add_edge("blue_captain_is_called", "blue_captain") |
| | builder.add_edge("transfer_to_agent_1", "blue_agent_1") |
| | builder.add_edge("transfer_to_agent_2", "blue_agent_2") |
| | builder.add_edge("blue_agent_1", "blue_captain") |
| | builder.add_edge("blue_agent_2", "blue_captain") |
| | builder.add_edge("final_choice", "update_turn") |
| | builder.add_edge("update_turn", END) |
| |
|
| | return builder.compile() |
| |
|
| | def _create_graph(self) -> StateGraph: |
| | """Create and compile the graph.""" |
| |
|
| | builder = StateGraph(State) |
| |
|
| | red_graph = self._create_red_team_graph() |
| | blue_graph = self._create_blue_team_graph() |
| |
|
| | builder.add_node("judge", self.judge) |
| | |
| | |
| | builder.add_node("red_team", red_graph) |
| | builder.add_node("blue_team", blue_graph) |
| |
|
| | builder.add_edge(START, "judge") |
| | builder.add_conditional_edges( |
| | "judge", |
| | self.route_after_judge, |
| | ["red_team", "blue_team", END], |
| | ) |
| | builder.add_edge("red_team", "judge") |
| | builder.add_edge("blue_team", "judge") |
| |
|
| | graph = builder.compile() |
| |
|
| | |
| | if not os.path.exists("graph.png"): |
| | try: |
| | img = graph.get_graph(xray=True).draw_mermaid_png() |
| | with open("graph.png", "wb") as f: |
| | f.write(img) |
| | except Exception as e: |
| | logger.error(f"[GRAPH IMAGE ERROR]: {e}") |
| |
|
| | return graph |
| |
|
| | |
| | async def red_boss(self, state: State): |
| | """Red team boss gives a clue""" |
| | logger.info("[RED BOSS] MOMENT ") |
| | boss = next((p for p in state["players"] if p.team == "red" and p.role == "boss"), None) |
| | if not boss: |
| | return state |
| |
|
| | board = state["board"] |
| | team_name = state["current_team"].upper() |
| | formatted_boss_system_prompt = boss_agent_system_prompt.format(team_name) |
| | chat_history = state["chat_history"] |
| | |
| | |
| | |
| | |
| | formatted_history, current_round_messages = self._split_chat_history(chat_history, viewer_role="boss") |
| | self.current_team = state["current_team"] |
| |
|
| | new_message = [ |
| | { |
| | "role": "system", |
| | "content": formatted_boss_system_prompt |
| | }, |
| | { |
| | "role": "user", |
| | "content": f""" |
| | Keep in mind the history of the game so far:\n |
| | [HISTORY]\n{formatted_history}\n[/END HISTORY]\n\n |
| | [CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n |
| | Here is the current board:\n |
| | Red words: {", ".join(board['red'])}\n |
| | Blue words: {", ".join(board['blue'])}\n |
| | Neutral words: {", ".join(board['neutral'])}\n |
| | Killer word: {board['killer']}\n\n |
| | Based on this board, provide a clue and a number of words that relate to that clue. |
| | """ |
| | } |
| | ] |
| |
|
| | if self.IS_HUMAN_PLAYING and boss.model_name == "Human brain": |
| | clue_ = state['human_clue'] |
| | clue_number = state['human_clue_number'] |
| | answer = create_fake_tool_call(clue_, clue_number) |
| | else: |
| | if boss.model_name == "claude-sonnet-4-5-20250929": |
| | llm_with_tools = boss.model.bind_tools(boss_agent_tools) |
| | else: |
| | llm_with_tools = boss.model.bind_tools(boss_agent_tools, tool_choice="ChooseWord") |
| | answer = await llm_with_tools.ainvoke(new_message) |
| |
|
| | logger.info(f"[RED BOSS ANSWER]: {answer}") |
| |
|
| | chat_entry, clue, clue_number, _ = self._format_chat_entry(boss, answer) |
| |
|
| | return { |
| | "messages": [answer], |
| | "current_role": "captain", |
| | "chat_history": state.get("chat_history", []) + [chat_entry], |
| | "clue": clue, |
| | "clue_number": clue_number, |
| | } |
| |
|
| | async def red_captain(self, state: State): |
| | """Red team captain coordinates guessing""" |
| |
|
| | logger.info("°°°"*50) |
| | logger.info(f"[RED CAPTAIN] State clue: {state['clue']}") |
| | captain = next((p for p in state["players"] if p.team == "red" and p.role == "captain"), None) |
| | agents = [p for p in state["players"] if p.team == "red" and p.role == "player"] |
| | if not captain: |
| | return state |
| |
|
| | board = state["board"] |
| |
|
| | available_words = ( |
| | board["red"] + |
| | board["blue"] + |
| | board["neutral"] + |
| | [board["killer"]] |
| | ) |
| |
|
| | random.shuffle(available_words) |
| |
|
| | logger.info(f"AVAILABLE WORDS: {available_words}") |
| | chat_history = state["chat_history"] |
| | |
| | |
| | |
| | |
| |
|
| | formatted_history, current_round_messages = self._split_chat_history(chat_history) |
| |
|
| | team_name = state["current_team"].upper() |
| | formatted_captain_system_prompt = captain_agent_system_prompt.format( |
| | team_name, |
| | agents[0].name, |
| | agents[1].name |
| | ) |
| |
|
| | new_message = [ |
| | { |
| | "role": "system", |
| | "content": formatted_captain_system_prompt |
| | }, |
| | { |
| | "role": "user", |
| | "content": f""" |
| | Consider the history of the game so far:\n[HISTORY]\n{formatted_history}\n[/END HISTORY]\n\n |
| | [CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n |
| | Here is the list of words on the board:\n{', '.join(available_words)}\n\n |
| | This is what your boss said: '{state['clue']}' {state['clue_number']}, suggest which words to guess. |
| | """ |
| | } |
| | ] |
| |
|
| | llm_with_tools = captain.model.bind_tools( |
| | captain_agent_tools, |
| | ) |
| |
|
| | if state['red_captain_is_called_counter'] > 4: |
| | logger.info("Creating fake answer to stop loop") |
| | answer = create_agent_fake_tool_call() |
| | else: |
| | logger.info(f"red_captain_is_called_counter: {state['red_captain_is_called_counter']}") |
| | answer = await llm_with_tools.ainvoke(new_message) |
| |
|
| | logger.info(f"[RED CAPTAIN ANSWER]: {answer}") |
| | logger.info("°°°"*50) |
| | chat_entry, _, _, guesses = self._format_chat_entry(captain, answer) |
| |
|
| | return { |
| | "messages": [answer], |
| | "chat_history": state.get("chat_history", []) + [chat_entry], |
| | "guesses": guesses |
| | } |
| |
|
| | async def red_agent_1(self, state: State): |
| | """Red team agent 1 discusses the clue""" |
| |
|
| | logger.info("---"*50) |
| | logger.info("[RED AGENT 1]") |
| | logger.info("MESSAGES") |
| | logger.info(state['messages']) |
| | logger.info("---"*20) |
| |
|
| | chat_history = state["chat_history"] |
| | |
| | |
| | |
| | |
| | formatted_history, current_round_messages = self._split_chat_history(chat_history) |
| |
|
| | captain = next((p for p in state["players"] if p.team == "red" and p.role == "captain"), None) |
| | agents = [p for p in state["players"] if p.team == "red" and p.role == "player"] |
| | if not agents: |
| | return state |
| |
|
| | agent = agents[0] |
| |
|
| | board = state["board"] |
| |
|
| | available_words = ( |
| | board["red"] + |
| | board["blue"] + |
| | board["neutral"] + |
| | [board["killer"]] |
| | ) |
| |
|
| | random.shuffle(available_words) |
| |
|
| | team_name = state["current_team"].upper() |
| | formatted_player_system_prompt = player_agent_system_prompt.format( |
| | team_name, |
| | captain.name, |
| | agents[1].name |
| | ) |
| |
|
| | new_message = [ |
| | { |
| | "role": "system", |
| | "content": formatted_player_system_prompt |
| | }, |
| | { |
| | "role": "user", |
| | "content": f""" |
| | [HISTORY]\n{formatted_history}\n[END HISTORY]\n\n |
| | [CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n |
| | Available words: {', '.join(available_words)} |
| | """ |
| | } |
| | ] |
| |
|
| | answer = await agent.model.ainvoke(new_message) |
| | chat_entry, _, _, _ = self._format_chat_entry(agent, answer) |
| |
|
| | logger.info(['RED AGENT 1 ANSWER']) |
| | logger.info(answer) |
| |
|
| | return { |
| | "messages": [answer], |
| | "chat_history": state.get("chat_history", []) + [chat_entry] |
| | } |
| |
|
| | async def red_agent_2(self, state: State): |
| | """Red team agent 2 discusses the clue""" |
| |
|
| | logger.info(f"[RED AGENT 2] State clue: {state['clue']}") |
| | chat_history = state["chat_history"] |
| | |
| | |
| | |
| | |
| | formatted_history, current_round_messages = self._split_chat_history(chat_history) |
| |
|
| | captain = next((p for p in state["players"] if p.team == "red" and p.role == "captain"), None) |
| | agents = [p for p in state["players"] if p.team == "red" and p.role == "player"] |
| | if len(agents) < 2: |
| | return state |
| |
|
| | agent = agents[1] |
| | board = state["board"] |
| |
|
| | available_words = ( |
| | board["red"] + |
| | board["blue"] + |
| | board["neutral"] + |
| | [board["killer"]] |
| | ) |
| |
|
| | random.shuffle(available_words) |
| |
|
| | team_name = state["current_team"].upper() |
| | formatted_player_system_prompt = player_agent_system_prompt.format( |
| | team_name, |
| | captain.name, |
| | agents[0].name |
| | ) |
| |
|
| | new_message = [ |
| | { |
| | "role": "system", |
| | "content": formatted_player_system_prompt |
| | }, |
| | { |
| | "role": "user", |
| | "content": f""" |
| | [HISTORY]\n{formatted_history}\n[END HISTORY]\n\n |
| | [CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n |
| | Available words: {', '.join(available_words)} |
| | """ |
| | } |
| | ] |
| |
|
| | answer = await agent.model.ainvoke(new_message) |
| | chat_entry, _, _, _ = self._format_chat_entry(agent, answer) |
| |
|
| | logger.info(['RED AGENT 2 ANSWER']) |
| | logger.info(answer) |
| |
|
| | return { |
| | "messages": [answer], |
| | "chat_history": state.get("chat_history", []) + [chat_entry] |
| | } |
| |
|
| | async def blue_boss(self, state: State): |
| | """Blue team boss gives a clue""" |
| | logger.info("[BLUE BOSS MOMENT]") |
| | chat_history = state["chat_history"] |
| | |
| | |
| | |
| | |
| | formatted_history, current_round_messages = self._split_chat_history(chat_history, viewer_role="boss") |
| |
|
| | boss = next((p for p in state["players"] if p.team == "blue" and p.role == "boss"), None) |
| | if not boss: |
| | return state |
| |
|
| | board = state["board"] |
| | team_name = state["current_team"].upper() |
| | formatted_boss_system_prompt = boss_agent_system_prompt.format(team_name) |
| | self.current_team = state["current_team"] |
| |
|
| | new_message = [ |
| | { |
| | "role": "system", |
| | "content": formatted_boss_system_prompt |
| | }, |
| | { |
| | "role": "user", |
| | "content": f""" |
| | Keep in mind the history of the game so far:\n |
| | [HISTORY]\n{formatted_history}\n[/END HISTORY]\n\n |
| | [CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n |
| | Here is the current board:\n |
| | Red words: {", ".join(board['red'])}\n |
| | Blue words: {", ".join(board['blue'])}\n |
| | Neutral words: {", ".join(board['neutral'])}\n |
| | Killer word: {board['killer']}\n\n |
| | Based on this board, provide a clue and a number of words that relate to that clue. |
| | """ |
| | } |
| | ] |
| |
|
| | if self.IS_HUMAN_PLAYING and boss.model_name == "Human brain": |
| | clue_ = state['human_clue'] |
| | clue_number = state['human_clue_number'] |
| | answer = create_fake_tool_call(clue_, clue_number) |
| | else: |
| | if boss.model_name == "claude-sonnet-4-5-20250929": |
| | llm_with_tools = boss.model.bind_tools(boss_agent_tools) |
| | else: |
| | llm_with_tools = boss.model.bind_tools(boss_agent_tools, tool_choice="ChooseWord") |
| | answer = await llm_with_tools.ainvoke(new_message) |
| |
|
| | logger.info(f"[BLUE BOSS ANSWER] : {answer}") |
| |
|
| | chat_entry, clue, clue_number, _ = self._format_chat_entry(boss, answer) |
| |
|
| | return { |
| | "messages": [answer], |
| | "current_role": "captain", |
| | "chat_history": state.get("chat_history", []) + [chat_entry], |
| | "clue": clue, |
| | "clue_number": clue_number, |
| | } |
| |
|
| | async def blue_captain(self, state: State): |
| | """Blue team captain coordinates guessing""" |
| |
|
| | logger.info("°°°"*50) |
| | logger.info(f"[BLUE CAPTAIN] State clue: {state['clue']}") |
| | captain = next((p for p in state["players"] if p.team == "blue" and p.role == "captain"), None) |
| | agents = [p for p in state["players"] if p.team == "blue" and p.role == "player"] |
| | if not captain: |
| | return state |
| |
|
| | board = state["board"] |
| | chat_history = state["chat_history"] |
| | |
| | |
| | |
| | |
| | formatted_history, current_round_messages = self._split_chat_history(chat_history) |
| |
|
| | available_words = ( |
| | board["red"] + |
| | board["blue"] + |
| | board["neutral"] + |
| | [board["killer"]] |
| | ) |
| |
|
| | random.shuffle(available_words) |
| |
|
| | team_name = state["current_team"].upper() |
| | formatted_captain_system_prompt = captain_agent_system_prompt.format( |
| | team_name, |
| | agents[0].name, |
| | agents[1].name |
| | ) |
| |
|
| | new_message = [ |
| | { |
| | "role": "system", |
| | "content": formatted_captain_system_prompt |
| | }, |
| | { |
| | "role": "user", |
| | "content": f""" |
| | Consider the history of the game so far:\n[HISTORY]\n{formatted_history}\n[/END HISTORY]\n\n |
| | [CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n |
| | Here is the list of words on the board:\n{', '.join(available_words)}\n\n |
| | This is what your boss said: '{state['clue']}' {state['clue_number']}, suggest which words to guess. |
| | """ |
| | } |
| | ] |
| |
|
| | llm_with_tools = captain.model.bind_tools( |
| | captain_agent_tools, |
| | ) |
| |
|
| | if state['blue_captain_is_called_counter'] > 4: |
| | logger.info("Creating fake answer to stop loop") |
| | answer = create_agent_fake_tool_call() |
| | else: |
| | logger.info(f"blue_captain_is_called_counter: {state['blue_captain_is_called_counter']}") |
| | answer = await llm_with_tools.ainvoke(new_message) |
| |
|
| | logger.info(f"[BLUE CAPTAIN ANSWER] : {answer}") |
| | logger.info("°°°"*50) |
| | chat_entry, _, _, guesses = self._format_chat_entry(captain, answer) |
| |
|
| | return { |
| | "messages": [answer], |
| | "chat_history": state.get("chat_history", []) + [chat_entry], |
| | "guesses": guesses |
| | } |
| |
|
| | async def blue_agent_1(self, state: State): |
| | """Blue team agent 1 discusses the clue""" |
| | logger.info("---"*50) |
| | logger.info("[BLUE AGENT 1]") |
| | chat_history = state["chat_history"] |
| | |
| | |
| | |
| | |
| | formatted_history, current_round_messages = self._split_chat_history(chat_history) |
| |
|
| | captain = next((p for p in state["players"] if p.team == "blue" and p.role == "captain"), None) |
| | agents = [p for p in state["players"] if p.team == "blue" and p.role == "player"] |
| | if not agents: |
| | return state |
| |
|
| | agent = agents[0] |
| | board = state["board"] |
| |
|
| | available_words = ( |
| | board["red"] + |
| | board["blue"] + |
| | board["neutral"] + |
| | [board["killer"]] |
| | ) |
| |
|
| | random.shuffle(available_words) |
| |
|
| | team_name = state["current_team"].upper() |
| | formatted_player_system_prompt = player_agent_system_prompt.format( |
| | team_name, |
| | captain.name, |
| | agents[1].name |
| | ) |
| |
|
| | new_message = [ |
| | { |
| | "role": "system", |
| | "content": formatted_player_system_prompt |
| | }, |
| | { |
| | "role": "user", |
| | "content": f""" |
| | [HISTORY]\n{formatted_history}\n[END HISTORY]\n\n |
| | [CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n |
| | Available words: {', '.join(available_words)} |
| | """ |
| | } |
| | ] |
| |
|
| | answer = await agent.model.ainvoke(new_message) |
| |
|
| | logger.info(['BLUE AGENT 1 ANSWER']) |
| | logger.info(answer) |
| | chat_entry, _, _, _ = self._format_chat_entry(agent, answer) |
| |
|
| | return { |
| | "messages": [answer], |
| | "chat_history": state.get("chat_history", []) + [chat_entry] |
| | } |
| |
|
| | async def blue_agent_2(self, state: State): |
| | """Blue team agent 2 discusses the clue""" |
| | logger.info("---"*50) |
| | logger.info("[BLUE AGENT 2]") |
| | chat_history = state["chat_history"] |
| | |
| | |
| | |
| | |
| | formatted_history, current_round_messages = self._split_chat_history(chat_history) |
| |
|
| | captain = next((p for p in state["players"] if p.team == "blue" and p.role == "captain"), None) |
| | agents = [p for p in state["players"] if p.team == "blue" and p.role == "player"] |
| | if not agents: |
| | return state |
| |
|
| | agent = agents[1] |
| | board = state["board"] |
| |
|
| | available_words = ( |
| | board["red"] + |
| | board["blue"] + |
| | board["neutral"] + |
| | [board["killer"]] |
| | ) |
| |
|
| | random.shuffle(available_words) |
| |
|
| | team_name = state["current_team"].upper() |
| | formatted_player_system_prompt = player_agent_system_prompt.format( |
| | team_name, |
| | captain.name, |
| | agents[0].name |
| | ) |
| |
|
| | new_message = [ |
| | { |
| | "role": "system", |
| | "content": formatted_player_system_prompt |
| | }, |
| | { |
| | "role": "user", |
| | "content": f""" |
| | [HISTORY]\n{formatted_history}\n[END HISTORY]\n\n |
| | [CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n |
| | Available words: {', '.join(available_words)} |
| | """ |
| | } |
| | ] |
| |
|
| | answer = await agent.model.ainvoke(new_message) |
| |
|
| | logger.info(['BLUE AGENT 2 ANSWER']) |
| | logger.info(answer) |
| | chat_entry, _, _, _ = self._format_chat_entry(agent, answer) |
| |
|
| | return { |
| | "messages": [answer], |
| | "chat_history": state.get("chat_history", []) + [chat_entry] |
| | } |
| |
|
| | def update_turn(self, state: State): |
| | """Update turn counter""" |
| | turn = state.get("turn", 1) |
| |
|
| | return { |
| | "turn": turn+1 |
| | } |
| |
|
| | def red_boss_is_called(self, state: State): |
| | """Update turn counter""" |
| | _counter = state.get("red_boss_is_called_counter", 1) |
| |
|
| | return { |
| | "red_boss_is_called_counter": _counter+1 |
| | } |
| |
|
| | def blue_boss_is_called(self, state: State): |
| | """Update turn counter""" |
| | _counter = state.get("blue_boss_is_called_counter", 1) |
| |
|
| | return { |
| | "blue_boss_is_called_counter": _counter+1 |
| | } |
| |
|
| | def red_captain_is_called(self, state: State): |
| | """Update turn counter""" |
| | _counter = state.get("red_captain_is_called_counter", 1) |
| |
|
| | return { |
| | "red_captain_is_called_counter": _counter+1 |
| | } |
| |
|
| | def blue_captain_is_called(self, state: State): |
| | """Update turn counter""" |
| | _counter = state.get("blue_captain_is_called_counter", 1) |
| |
|
| | return { |
| | "blue_captain_is_called_counter": _counter+1 |
| | } |
| |
|
| | def judge(self, state: State): |
| | """Evaluate guesses, update board, check win/lose conditions.""" |
| |
|
| | |
| | def check_win_condition(): |
| | """Returns (is_game_over, winner, win_message) tuple""" |
| | red_remaining = len(board.get("red", [])) |
| | blue_remaining = len(board.get("blue", [])) |
| |
|
| | if red_remaining == 0: |
| | return True, "red", (red_remaining, blue_remaining) |
| |
|
| | if blue_remaining == 0: |
| | return True, "blue", (red_remaining, blue_remaining) |
| |
|
| | return False, None, None |
| |
|
| | |
| | def create_multi_message_state(messages_content_list, next_team=None, switch_role=False, winner_and_score=None): |
| | """Creates state with multiple messages""" |
| | messages = [] |
| | chat_entries = [] |
| |
|
| | for content, title in messages_content_list: |
| | message = AIMessage( |
| | content=content, |
| | metadata={"title": title, "sender": "judge"} |
| | ) |
| | messages.append(message) |
| |
|
| | if title == "⚖️ Judge Decision": |
| | info = "chat_history" |
| | else: |
| | info = None |
| |
|
| | chat_entry, _, _, _ = self._format_chat_entry(None, message, info=info) |
| | chat_entries.append(chat_entry) |
| |
|
| | logger.info("****" * 50) |
| |
|
| | filtered_chat = self._filter_important_messages(state.get("chat_history", [])) |
| | logger.info("**"*50) |
| | logger.info("FILTERED CHAT") |
| | logger.info(filtered_chat) |
| | logger.info("**"*50) |
| |
|
| | end_state = { |
| | "messages": messages, |
| | "chat_history": filtered_chat + chat_entries, |
| | "board": board, |
| | "guesses": [], |
| | "history_guessed_words": history_guessed_words, |
| | "teams_reviewed": teams_reviewed, |
| | "end_round": end_round, |
| | "winner_and_score": winner_and_score |
| | } |
| |
|
| | if next_team: |
| | end_state.update({ |
| | "current_team": next_team, |
| | "current_role": "boss" if switch_role else state.get("current_role"), |
| | "turn": state.get("turn", 1) + 1, |
| | "next_team": next_team, |
| | }) |
| |
|
| | return end_state |
| |
|
| | |
| | def create_turn_end_messages(results_list, next_team): |
| | """Creates separate messages for results and turn transition""" |
| | team_emoji = "🔵" if next_team == "blue" else "🔴" |
| | results_text = "\n".join(results_list) |
| |
|
| | red_remaining = len(board.get("red", [])) |
| | blue_remaining = len(board.get("blue", [])) |
| |
|
| | |
| | results_message = results_text |
| |
|
| | |
| | transition_message = ( |
| | f"🔄 **TURN COMPLETE**\n\n" |
| | f"{team_emoji} **{next_team.upper()} TEAM'S TURN** now begins!\n\n" |
| | f"**Remaining Words:**\n" |
| | f"🔴 Red: {red_remaining}\n" |
| | f"🔵 Blue: {blue_remaining}" |
| | ) |
| | return [ |
| | (results_message, "⚖️ Judge Decision"), |
| | (transition_message, "🔄 Turn Transition") |
| | ] |
| |
|
| | |
| | def create_round_end_messages(results_list, next_team): |
| | """Creates separate messages for results and round transition""" |
| | team_emoji = "🔵" if next_team == "blue" else "🔴" |
| | results_text = "\n".join(results_list) |
| |
|
| | red_remaining = len(board.get("red", [])) |
| | blue_remaining = len(board.get("blue", [])) |
| |
|
| | |
| | results_message = results_text |
| |
|
| | |
| | transition_message = ( |
| | f"🎯 **ROUND COMPLETE!**\n\n" |
| | f"Both teams have played their turn.\n\n" |
| | f"{team_emoji} **{next_team.upper()} TEAM** starts the next round!\n\n" |
| | f"**Score Update:**\n" |
| | f"🔴 Red Team: {red_remaining} words remaining\n" |
| | f"🔵 Blue Team: {blue_remaining} words remaining\n\n" |
| | f"Let's keep going! 💪" |
| | ) |
| |
|
| | return [ |
| | (results_message, "⚖️ Judge Decision"), |
| | (transition_message, "🎯 Round Complete") |
| | ] |
| |
|
| | |
| | def create_game_over_messages(results_list, winner, scores, reason="normal"): |
| | """Creates separate messages for results and game over""" |
| | results_text = "\n".join(results_list) |
| | red_remaining, blue_remaining = scores |
| | winner_emoji = "🔴" if winner == "red" else "🔵" |
| |
|
| | |
| | results_message = results_text |
| |
|
| | |
| | if reason == "killer": |
| | loser = "red" if winner == "blue" else "blue" |
| | game_over_message = ( |
| | f"🏆 **GAME OVER!**\n\n" |
| | f"💀 {loser.upper()} team hit the KILLER WORD!\n\n" |
| | f"{winner_emoji} **{winner.upper()} TEAM WINS!** 🎉\n\n" |
| | f"**Final Score:**\n" |
| | f"🔴 Red: {red_remaining} words remaining\n" |
| | f"🔵 Blue: {blue_remaining} words remaining\n\n" |
| | f"Better luck next time! 😅" |
| | ) |
| | else: |
| | game_over_message = ( |
| | f"🏆 **GAME OVER!**\n\n" |
| | f"{winner_emoji} **{winner.upper()} TEAM WINS!** 🎉\n\n" |
| | f"All {winner} words have been found!\n\n" |
| | f"**Final Score:**\n" |
| | f"🔴 Red: {red_remaining} words remaining\n" |
| | f"🔵 Blue: {blue_remaining} words remaining\n\n" |
| | f"Congratulations to the {winner.title()} Team! 🥳" |
| | ) |
| |
|
| | return [ |
| | (results_message, "⚖️ Judge Decision"), |
| | (game_over_message, "🏆 Game Over") |
| | ] |
| |
|
| | logger.info("IS HUMAN PLAYING") |
| | logger.info(self.IS_HUMAN_PLAYING) |
| | logger.info("****" * 50) |
| | logger.info("[JUDGE]") |
| | logger.info("CHAT HISTORY") |
| | logger.info(state['chat_history']) |
| |
|
| | |
| | if state.get("turn", 0) == 0: |
| | logger.info("[JUDGE INITIALIZING GAME STATE]") |
| | team_emoji = "🔴" if state["current_team"] == "red" else "🔵" |
| |
|
| | |
| | game_start_content = ( |
| | f"🎮 **CODENAMES GAME STARTED!**\n\n" |
| | f"{team_emoji} **{state['current_team'].upper()} TEAM** goes first!\n\n" |
| | f"**Game Rules:**\n" |
| | f"- Teams alternate turns to guess words\n" |
| | f"- Match words to your team's color\n" |
| | f"- Avoid the opponent's words, neutral words, and the killer word ☠️\n" |
| | f"- First team to find all their words wins!\n\n" |
| | f"Good luck! 🍀" |
| | ) |
| |
|
| | message = AIMessage( |
| | content=game_start_content, |
| | metadata={"title": "🎮 Game Start", "sender": "judge"} |
| | ) |
| | logger.info(f"[JUDGE MESSAGE]: {message}") |
| | chat_entry, _, _, _ = self._format_chat_entry(None, message, info="Game start") |
| | filtered_chat = self._filter_important_messages(state.get("chat_history", [])) |
| | return { |
| | "messages": [message], |
| | "turn": 1, |
| | "chat_history": filtered_chat + [chat_entry], |
| | } |
| |
|
| | |
| | guesses = state.get("guesses", []) |
| | if not guesses: |
| | logger.info("[JUDGE] No guesses made.") |
| | return state |
| |
|
| | |
| | board = state["board"] |
| | self.board = board |
| | current_team = state["current_team"] |
| | other_team = "red" if current_team == "blue" else "blue" |
| | history_guessed_words = state.get("history_guessed_words", []) |
| | teams_reviewed = state.get('teams_reviewed', []) |
| | teams_reviewed.append(current_team) |
| |
|
| | if len(set(teams_reviewed)) == 2: |
| | end_round = True |
| | teams_reviewed = [] |
| | else: |
| | end_round = False |
| |
|
| | results = ["📋 **Turn Results:**"] |
| | logger.info(f"[JUDGE] Team {current_team.upper()} guesses: {guesses}") |
| |
|
| | |
| | for word in guesses: |
| | history_guessed_words.append(word) |
| | self.guessed_words = history_guessed_words |
| | logger.info(f"[JUDGE] Evaluating word: {word}") |
| |
|
| | if word == "STOP_TURN": |
| | results.append(f"🚧 **{word}** The {current_team.upper()} team stops the turn.") |
| | break |
| |
|
| | |
| | elif word in board.get(current_team, []): |
| | board[current_team].remove(word) |
| | self.board = board |
| | results.append(f"✅ **{word}** - Correct! ({current_team.upper()} team word)") |
| |
|
| | |
| | elif word in board.get(other_team, []): |
| | board[other_team].remove(word) |
| | self.board = board |
| | results.append(f"❌ **{word}** - Oh no! You selected a {other_team.upper()} team word!") |
| |
|
| | |
| | is_game_over, winner, scores = check_win_condition() |
| | if is_game_over: |
| | messages_list = create_game_over_messages(results, winner, scores) |
| | return create_multi_message_state(messages_list, winner_and_score=(winner, scores)) |
| |
|
| | |
| | if end_round: |
| | messages_list = create_round_end_messages(results, other_team) |
| | else: |
| | messages_list = create_turn_end_messages(results, other_team) |
| |
|
| | return create_multi_message_state(messages_list, next_team=other_team, switch_role=True) |
| |
|
| | |
| | elif word in board.get("neutral", []): |
| | board["neutral"].remove(word) |
| | self.board = board |
| | results.append(f"⚪ **{word}** - Neutral word. Turn ends!") |
| |
|
| | |
| | if end_round: |
| | messages_list = create_round_end_messages(results, other_team) |
| | else: |
| | messages_list = create_turn_end_messages(results, other_team) |
| |
|
| | return create_multi_message_state(messages_list, next_team=other_team, switch_role=True) |
| |
|
| | |
| | elif word == board.get("killer"): |
| | results.append(f"☠️ **{word}** - KILLER WORD! 💀") |
| | |
| | winner = other_team |
| | board["killer"] = None |
| | self.board = board |
| |
|
| | red_remaining = len(board.get("red", [])) |
| | blue_remaining = len(board.get("blue", [])) |
| | scores = (red_remaining, blue_remaining) |
| | messages_list = create_game_over_messages( |
| | results, winner, scores, reason="killer" |
| | ) |
| | return create_multi_message_state(messages_list, winner_and_score=(winner, scores, "killer")) |
| |
|
| | |
| | is_game_over, winner, scores = check_win_condition() |
| | if is_game_over: |
| | messages_list = create_game_over_messages(results, winner, scores) |
| | return create_multi_message_state(messages_list, winner_and_score=(winner, scores)) |
| |
|
| | |
| | if end_round: |
| | messages_list = create_round_end_messages(results, other_team) |
| | else: |
| | messages_list = create_turn_end_messages(results, other_team) |
| |
|
| | return create_multi_message_state(messages_list, next_team=other_team, switch_role=True) |
| |
|
def route_after_judge(self, state: State) -> str:
    """Choose the node that runs after the judge has evaluated a turn.

    Side effects: copies the board, chat history and winner information
    from ``state`` onto ``self.board``, ``self.chat_history`` and
    ``self.winners``.

    Args:
        state: Current graph state. Every key is read defensively, so a
            partially populated state does not raise ``KeyError``.

    Returns:
        ``END`` when the killer word was hit (``board["killer"]`` is
        present and ``None``), when the red or blue word list is empty or
        missing, or when ``end_round`` is truthy; otherwise
        ``"<team>_team"`` for ``next_team`` (falling back to
        ``current_team``, then ``"red"``).
    """
    separator = "***" * 50
    logger.info(separator)
    logger.info(f"HUMAN PLAYING? {self.IS_HUMAN_PLAYING}")
    logger.info(separator)

    logger.info("\n\n")
    logger.info(separator)
    logger.info("[ROUTING AFTER JUDGE]")

    board = state.get("board", {})
    logger.info("BOARD ROUTING")
    logger.info(board)

    self.board = board
    self.chat_history = state.get("chat_history", [])
    self.winners = state.get("winner_and_score")

    # The judge overwrites board["killer"] with None once the killer word
    # has been guessed. Test for the key first: the `{}` default above has
    # no "killer" entry and must not raise KeyError here.
    if "killer" in board and board["killer"] is None:
        logger.info("KILLER HIT, END GAME")
        logger.info(separator)
        return END

    # A team with no words left (or a board without that team) ends the game.
    if not all(board.get(key) for key in ("red", "blue")):
        logger.info("END GAME")
        logger.info(separator)
        return END

    if state.get("end_round"):
        logger.info("ROUND ENDS")
        logger.info(separator)
        return END

    next_team = state.get("next_team") or state.get("current_team", "red")
    return f"{next_team}_team"
| |
|
def boss_choice(self, state: State) -> str:
    """Route the latest message to the clue tool or to the boss fallback.

    Args:
        state: Graph state; ``state["messages"][-1]`` is inspected and
            ``state["current_team"]`` names the fallback route.

    Returns:
        ``"choose_word_tool"`` when the first tool call of the last message
        is ``ChooseWord``; otherwise ``"<current_team>_boss_is_called"``.
        The fallback covers both "no tool call at all" and "a tool call
        with an unrecognised name", so the router never returns ``None``.
    """
    logger.info("###"*50)
    logger.info("[BOSS CHOICE]")
    last_message = state["messages"][-1]

    tool_calls = getattr(last_message, "tool_calls", None)
    if tool_calls:
        logger.info("[TOOL CALL]")
        tool_name = tool_calls[0]["name"]
        logger.info(tool_name)
        if tool_name == "ChooseWord":
            return "choose_word_tool"

    # Single fallback for every path that did not select the clue tool.
    logger.warning("[WARNING] NESSUN TOOL E' STATO RICHIAMATO, TORNIAMO DAL BOSS")
    current_team = state['current_team']
    logger.info(f"[CURRENT TEAM: {current_team}]")
    return f"{current_team}_boss_is_called"
| |
|
def should_continue(self, state: State) -> str:
    """Route the latest message to a captain tool or to the captain fallback.

    Args:
        state: Graph state; ``state["messages"][-1]`` is inspected and
            ``state["current_team"]`` names the fallback route.

    Returns:
        ``"transfer_to_agent_1"``, ``"transfer_to_agent_2"`` or
        ``"final_choice"`` when the first tool call of the last message is
        ``Call_Agent_1``, ``Call_Agent_2`` or ``TeamFinalChoice``
        respectively; otherwise ``"<current_team>_captain_is_called"``.
        The fallback covers both "no tool call at all" and "a tool call
        with an unrecognised name", so the router never returns ``None``.
    """
    logger.info("###"*50)
    logger.info("[SHOULD CONTINUE]")
    last_message = state["messages"][-1]

    routes = {
        "Call_Agent_1": "transfer_to_agent_1",
        "Call_Agent_2": "transfer_to_agent_2",
        "TeamFinalChoice": "final_choice",
    }

    tool_calls = getattr(last_message, "tool_calls", None)
    if tool_calls:
        logger.info("[TOOL CALL]")
        tool_name = tool_calls[0]["name"]
        logger.info(tool_name)
        if tool_name in routes:
            return routes[tool_name]

    # Single fallback for every path that did not select a known tool.
    logger.warning("[WARNING] NESSUN TOOL E' STATO RICHIAMATO, TORNIAMO DAL CAPITANO")
    current_team = state['current_team']
    logger.info(f"[CURRENT TEAM: {current_team}]")
    return f"{current_team}_captain_is_called"
| |
|
| | def _convert_to_string(self, value): |
| | """Helper to convert any value to string""" |
| | if isinstance(value, str): |
| | return value |
| | elif isinstance(value, (dict, list)): |
| | return str(value) |
| | else: |
| | return str(value) |
| |
|
| | def _filter_important_messages(self, chat_history): |
| | """ |
| | Filter chat history to keep only important messages: |
| | - Boss choices (ChooseWord) |
| | - Captain choices (TeamFinalChoice) |
| | - All Judge messages |
| | """ |
| | return [ |
| | entry for entry in chat_history |
| | if entry.get("tool_name") in ["ChooseWord", "TeamFinalChoice"] |
| | or entry.get("sender_type") == "judge" and entry.get("info") == "chat_history" |
| | ] |
| |
|
| | def _format_chat_entry(self, player, message, info=None): |
| | """Helper to create a structured chat history entry""" |
| | tool_name = None |
| | clue = None |
| | clue_number = None |
| | guesses = [] |
| |
|
| | |
| | if hasattr(message, 'tool_calls') and message.tool_calls: |
| | tool_call = message.tool_calls[0] |
| | tool_name = tool_call.get('name', '') |
| | args = tool_call.get('args', {}) |
| |
|
| | if tool_name == 'Call_Agent_1': |
| | content = f"Called Agent 1: {self._convert_to_string(args.get('message', 'N/A'))}" |
| | elif tool_name == 'Call_Agent_2': |
| | content = f"Called Agent 2: {self._convert_to_string(args.get('message', 'N/A'))}" |
| | elif tool_name == 'ChooseWord': |
| | clue = self._convert_to_string(args.get('clue', 'N/A')) |
| | clue_number = args.get('clue_number', 'N/A') |
| | content = f"Clue: '{clue.upper()}' for {clue_number} word(s)" |
| | elif tool_name == 'TeamFinalChoice': |
| | guesses = args.get('guesses', []) |
| | content = f"Final choices: {', '.join(guesses)}" |
| | else: |
| | content = f"Used tool {tool_name}: {args}" |
| | elif hasattr(message, 'content') and message.content: |
| | content = message.content |
| | elif hasattr(message, 'text') and message.text: |
| | content = message.text |
| | else: |
| | |
| | |
| | |
| | model_provider = message.response_metadata.get('model_provider') |
| | if (model_provider in ["openai", "google_genai", "anthropic"] and isinstance(message.content, list)): |
| | for item in message.content: |
| | if not isinstance(item, dict): |
| | continue |
| |
|
| | item_type = item.get('type') |
| | |
| | if item_type == 'reasoning': |
| | content = item.get('summary', []) |
| |
|
| | |
| | elif item_type == 'thinking': |
| | |
| | content = item.get('thinking', []) |
| |
|
| | |
| | elif item_type == 'text': |
| | content = item.get('text', '') |
| | elif message.additional_kwargs.get('reasoning_content'): |
| | content = message.additional_kwargs.get('reasoning_content') |
| |
|
| | |
| | if player is None: |
| | return { |
| | "sender_type": "judge", |
| | "team": "", |
| | "role": "", |
| | "name": "JUDGE", |
| | "tool_name": None, |
| | "content": content, |
| | "info": info, |
| | }, clue, clue_number, guesses |
| |
|
| | return { |
| | "sender_type": "player", |
| | "team": player.team, |
| | "role": player.role, |
| | "name": player.name, |
| | "tool_name": tool_name, |
| | "content": content, |
| | "info": info, |
| | }, clue, clue_number, guesses |
| |
|
| | def _split_chat_history(self, chat_history, viewer_role=None): |
| | """ |
| | Splits chat_history into two parts: |
| | - formatted_history: all entries up to and including the last one with info == "chat_history" |
| | - current_round: all entries after that |
| | |
| | If viewer_role is specified, filters content appropriately: |
| | - 'captain': hides boss reasoning from current round |
| | - 'boss' or None: shows everything |
| | """ |
| |
|
| | last_index = None |
| |
|
| | |
| | for i, entry in enumerate(chat_history): |
| | if entry.get("info") == "chat_history": |
| | last_index = i |
| |
|
| | |
| | def format_entry(entry, hide_boss_reasoning=False): |
| | name = entry.get("name", "Unknown") |
| | team = entry.get("team", "N/A") |
| | role = entry.get("role", "N/A") |
| |
|
| | |
| | if hide_boss_reasoning and entry.get("role") == "boss" and entry.get("tool_name") is None: |
| | return f"[{name} ({team} {role})]: [boss reasoning is hidden]" |
| |
|
| | content = entry.get("content", "") |
| | return f"[{name} ({team} {role})]: {content}" |
| |
|
| | |
| | if last_index is None: |
| | formatted_history = "" |
| | current_round_entries = chat_history |
| | else: |
| | |
| | formatted_history = "\n".join( |
| | format_entry(e, hide_boss_reasoning=False) for e in chat_history[:last_index + 1] |
| | ) |
| | current_round_entries = chat_history[last_index + 1:] |
| |
|
| | |
| | hide_reasoning = (viewer_role != "boss") |
| | current_round = "\n".join( |
| | format_entry(e, hide_boss_reasoning=hide_reasoning) for e in current_round_entries |
| | ) |
| |
|
| | return formatted_history, current_round |
| |
|
| | async def stream_graph(self, input_message, messages, players, board, dropdown_clue_number, guessed_words, chat_history, is_human_playing, turn_): |
| | """Stream the graph execution.""" |
# Async generator. Builds a fresh input state, runs self.graph.astream(...) and,
# while handling the streamed chunks, yields the 5-tuple
#   (messages, self.guessed_words, self.board, self.chat_history, self.winners).
# `messages` is a copy of the caller's list; ChatMessage bubbles are appended to
# it or the last bubble's content is extended in place.
# NOTE(review): this function reads self.winners on every yield but does not
# assign it itself - confirm it is initialised before the first yield.
# NOTE(review): indentation is not recoverable in this view, so the nesting
# level of several trailing `yield` statements below is uncertain.
| |
|
| | logger.info(f"Human Clue: {input_message}") |
| | logger.info(f"Human Clue number: {dropdown_clue_number}") |
| | logger.info(f"Board: {board}") |
| | logger.info(f"Starting team: {board['starting_team']}") |
| | logger.info(f"Turn: {turn_}") |
| |
|
| | self.IS_HUMAN_PLAYING = is_human_playing |
| | self.board = board |
# Initial graph state; every key is a field of the State TypedDict.
# The same `board` object is used for both "original_board" and "board".
| | inputs = { |
| | "messages": [HumanMessage(content=input_message)] if input_message else [], |
| | "original_board": board, |
| | "board": board, |
| | "players": players, |
| | "current_team": board.get("starting_team", "red"), |
| | "current_role": "boss", |
| | "turn": turn_, |
| | "last_user_message": input_message or "", |
| | "guesses": [], |
| | "clue": None, |
| | "clue_number": None, |
| | "round_messages": [], |
| | "chat_history": chat_history, |
| | "human_clue": input_message, |
| | "human_clue_number": dropdown_clue_number, |
| | "teams_reviewed": [], |
| | "history_guessed_words": guessed_words, |
| | "end_round": False, |
| | "winner_and_score": None, |
| | "red_boss_is_called_counter": 0, |
| | "blue_boss_is_called_counter": 0, |
| | "red_captain_is_called_counter": 0, |
| | "blue_captain_is_called_counter": 0, |
| | } |
| |
|
# Streaming bookkeeping:
#   final_msg                     - text accumulated for the bubble being extended
#   previous_message_is_reasoning - set True after a reasoning/thinking chunk
#   latest_lang_node              - node name of the latest "messages" chunk
#   current_sender                - sender label written into bubble metadata
#   last_message_sender           - sender of the last appended bubble (None forces a new one)
#   last_reasoning_index          - `index` of the last reasoning summary item seen
| | final_msg = "" |
| | messages = list(messages) if messages else [] |
| | previous_message_is_reasoning = False |
| | latest_lang_node = "" |
| | current_sender = "" |
| | last_message_sender = None |
| | last_reasoning_index = None |
| |
|
# Each chunk is indexed as chunk[1] = stream mode name, chunk[2] = payload.
| | async for chunk in self.graph.astream( |
| | inputs, {"recursion_limit": 100}, |
| | stream_mode=["messages", "updates"], |
| | subgraphs=True, |
| | ): |
# "messages" mode: payload[0] is the message, payload[1]['langgraph_node'] the
# emitting node. A change of node resets the accumulation state.
| | if chunk[1] == "messages": |
| | msg = chunk[2][0] |
| | lang_node = chunk[2][1]['langgraph_node'] |
| | if latest_lang_node != lang_node: |
| | latest_lang_node = lang_node |
| | current_sender = lang_node |
| | final_msg = "" |
| | last_message_sender = None |
| | last_reasoning_index = None |
# Any other mode: the payload is a dict whose first key is taken as the sender.
| | else: |
| | agent_update = chunk[2] |
| | msg_from = next(iter(agent_update.keys())) |
| |
|
# For these node names only the sender is recorded; no message is taken
# from the update.
| | if msg_from in [ |
| | 'red_boss', 'red_captain', 'red_agent_1', 'red_agent_2', |
| | 'blue_boss', 'blue_captain', 'blue_agent_1', 'blue_agent_2', |
| | 'judge', 'final_choice' |
| | ]: |
| | current_sender = msg_from |
| | msg = None |
| | else: |
# Otherwise use the last message of the first update value; any failure is
# logged and the chunk is treated as carrying no message.
| | try: |
| | msg = next(iter(agent_update.values()))['messages'][-1] |
| | except Exception as e: |
| | logger.error(f"[EXCEPTION]: {e}") |
| | logger.error(f"[CHUNK]: {chunk}") |
| | logger.error(f"[Agent update]: {agent_update}") |
| | msg = None |
| |
|
| | 
| |
|
# AI message carrying tool calls: nothing is displayed, accumulation is reset.
| | if isinstance(msg, AIMessage) and msg.tool_calls: |
| | final_msg = "" |
| | last_message_sender = None |
| | last_reasoning_index = None |
| | yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
| |
|
# Tool results: for the known tools msg.content is parsed as JSON and its
# "update" dict supplies the bubble text; on JSONDecodeError the raw content
# is shown instead.
| | elif isinstance(msg, ToolMessage): |
| | logger.info(f"[TOOL MESSAGE: {msg}]") |
| | tool_name = msg.name |
| | if tool_name == "TeamFinalChoice": |
| | try: |
| | command_data = json.loads(msg.content) |
| | update_data = command_data.get("update", {}) |
| |
|
# NOTE(review): `guesses` is None when "update" has no "guesses" key;
# ", ".join(None) raises TypeError, which the except clause below
# (JSONDecodeError only) does not catch.
| | guesses = update_data.get("guesses") |
| |
|
| | new_message = ChatMessage( |
| | role="assistant", |
| | content="I made my final choices: " + ", ".join(guesses), |
| | metadata={ |
| | "title": "🧠 Guesses", |
| | "sender": f"{self.current_team}_captain" |
| | } |
| | ) |
| | except json.JSONDecodeError as e: |
| | logger.error(f"Error parsing tool message: {e}") |
| | new_message = ChatMessage( |
| | role="assistant", |
| | content=msg.content, |
| | metadata={ |
| | "title": "🧠 Guesses", |
| | "sender": f"{self.current_team}_captain" |
| | } |
| | ) |
| | elif tool_name == "ChooseWord": |
| | try: |
| | command_data = json.loads(msg.content) |
| | update_data = command_data.get("update", {}) |
| |
|
| | clue = update_data.get("clue") |
| | clue_number = update_data.get("clue_number") |
| |
|
| | logger.info(f"Clue: {clue}, Clue Number: {clue_number}") |
| |
|
| | new_message = ChatMessage( |
| | role="assistant", |
| | content=f"{clue}, {clue_number}", |
| | metadata={ |
| | "title": "🕵️♂️ Clue", |
| | "sender": f"{self.current_team}_boss" |
| | } |
| | ) |
| | except json.JSONDecodeError as e: |
| | logger.error(f"Error parsing tool message: {e}") |
| | new_message = ChatMessage( |
| | role="assistant", |
| | content=msg.content, |
| | metadata={ |
| | "title": "🕵️♂️ Clue", |
| | "sender": f"{self.current_team}_boss" |
| | } |
| | ) |
| | elif tool_name == "Call_Agent_1" or tool_name == "Call_Agent_2": |
| | if tool_name == "Call_Agent_1": |
| | title_ = "💭 Asking opinion of Agent 1" |
| | else: |
| | title_ = "💭 Asking opinion of Agent 2" |
| |
|
| | try: |
| | command_data = json.loads(msg.content) |
| | update_data = command_data.get("update", {}) |
| |
|
| | message = update_data.get("message") |
| |
|
| | new_message = ChatMessage( |
| | role="assistant", |
| | content=message, |
| | metadata={ |
| | "title": title_, |
| | "sender": f"{self.current_team}_captain" |
| | } |
| | ) |
| | except json.JSONDecodeError as e: |
| | logger.error(f"Error parsing tool message: {e}") |
| | new_message = ChatMessage( |
| | role="assistant", |
| | content=msg.content, |
| | metadata={ |
| | "title": title_, |
| | "sender": f"{self.current_team}_captain" |
| | } |
| | ) |
# Any other tool: show its raw content under a generic tool title.
| | else: |
| | new_message = ChatMessage( |
| | role="assistant", |
| | content=msg.content, |
| | metadata={ |
| | "title": f"""🛠️ {tool_name}""", |
| | "sender": current_sender |
| | } |
| | ) |
| |
|
# Append unless the new bubble compares equal to the last one already shown.
| | if not messages or messages[-1] != new_message: |
| | messages.append(new_message) |
| | last_message_sender = new_message.metadata.get("sender") |
| | 
| | 
| | 
| |
|
| | final_msg = "" |
| | yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
| |
|
# Streamed token chunks.
| | elif isinstance(msg, AIMessageChunk): |
| | 
| |
|
# List-shaped content from the listed providers is handled item by item,
# dispatching on the item's 'type': 'reasoning', 'thinking' or 'text'.
| | model_provider = msg.response_metadata.get('model_provider') |
| | if (model_provider in ["openai", "google_genai", "anthropic"] and isinstance(msg.content, list)): |
| | for item in msg.content: |
| | if not isinstance(item, dict): |
| | continue |
| |
|
| | item_type = item.get('type') |
| |
|
| | |
# 'reasoning': concatenate the text of all 'summary_text' entries in 'summary'.
| | if item_type == 'reasoning': |
| | reasoning_text = "" |
| | summary = item.get('summary', []) |
| |
|
| | for summary_item in summary: |
| | if isinstance(summary_item, dict) and summary_item.get('type') == 'summary_text': |
| | text = summary_item.get('text', '') |
| | item_index = summary_item.get('index') |
| | if text: |
| | reasoning_text += text |
| |
|
| | if reasoning_text: |
| | 
# A different summary index than the previous one starts a new paragraph.
| | if last_reasoning_index is not None and item_index != last_reasoning_index: |
| | reasoning_text = "\n\n" + reasoning_text |
| |
|
| | last_reasoning_index = item_index |
| |
|
| | |
# Start a new "Thinking..." bubble, or extend the current one.
| | if final_msg == "" or last_message_sender != current_sender or not previous_message_is_reasoning: |
| | final_msg = reasoning_text |
| | messages.append(ChatMessage( |
| | role="assistant", |
| | content=final_msg, |
| | metadata={ |
| | "title": "🧠 Thinking...", |
| | "sender": current_sender |
| | } |
| | )) |
| | last_message_sender = current_sender |
| | else: |
| | final_msg += reasoning_text |
| | if len(messages) > 0: |
| | messages[-1].content = final_msg |
| |
|
| | previous_message_is_reasoning = True |
| | yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
| |
|
| | |
# 'thinking': same bubble handling, text taken from the item's 'thinking' field.
| | elif item_type == 'thinking': |
| | 
| | reasoning_text = item.get('thinking', []) |
| |
|
| | if reasoning_text: |
| |
|
| | |
| | if final_msg == "" or last_message_sender != current_sender or not previous_message_is_reasoning: |
| | final_msg = reasoning_text |
| | messages.append(ChatMessage( |
| | role="assistant", |
| | content=final_msg, |
| | metadata={ |
| | "title": "🧠 Thinking...", |
| | "sender": current_sender |
| | } |
| | )) |
| | last_message_sender = current_sender |
| | else: |
| | final_msg += reasoning_text |
| | if len(messages) > 0: |
| | messages[-1].content = final_msg |
| |
|
| | previous_message_is_reasoning = True |
| | yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
| |
|
| | |
# 'text': answer text; the first text after reasoning opens an untitled bubble.
| | elif item_type == 'text': |
| | text_content = item.get('text', '') |
| |
|
| | if text_content: |
| | 
| | if previous_message_is_reasoning: |
| | final_msg = "" |
| | previous_message_is_reasoning = False |
| | last_message_sender = None |
| | last_reasoning_index = None |
| |
|
| | |
| | if final_msg == "" or last_message_sender != current_sender: |
| | final_msg = text_content |
| | messages.append(ChatMessage( |
| | role="assistant", |
| | content=final_msg, |
| | metadata={"sender": current_sender} |
| | )) |
| | last_message_sender = current_sender |
| | else: |
| | 
| | final_msg += text_content |
| | messages[-1].content = final_msg |
| |
|
| | yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
| |
|
# Reasoning delivered through additional_kwargs['reasoning_content'].
# NOTE(review): no yield is visible inside this branch before the next elif.
| | elif msg.additional_kwargs.get('reasoning_content'): |
| | reasoning_text = msg.additional_kwargs.get("reasoning_content") |
| | if reasoning_text: |
| | if final_msg == "" or last_message_sender != current_sender or not previous_message_is_reasoning: |
| | final_msg = reasoning_text |
| | messages.append(ChatMessage( |
| | role="assistant", |
| | content=final_msg, |
| | metadata={ |
| | "title": "🧠 Thinking...", |
| | "sender": current_sender |
| | } |
| | )) |
| | last_message_sender = current_sender |
| | else: |
| | final_msg += reasoning_text |
| | if len(messages) > 0: |
| | messages[-1].content = final_msg |
| |
|
| | previous_message_is_reasoning = True |
| |
|
# Plain string content: start or extend an untitled answer bubble.
| | elif msg.content and isinstance(msg.content, str): |
| | if previous_message_is_reasoning: |
| | final_msg = "" |
| | previous_message_is_reasoning = False |
| | last_message_sender = None |
| |
|
| | |
| | if final_msg == "" or last_message_sender != current_sender: |
| | final_msg = msg.content |
| | 
| | messages.append(ChatMessage( |
| | role="assistant", |
| | content=final_msg, |
| | metadata={"sender": current_sender} |
| | )) |
| | last_message_sender = current_sender |
| | else: |
| | 
| | final_msg += msg.content |
| | messages[-1].content = final_msg |
| |
|
| | yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
# NOTE(review): if this else pairs with the elif on 'reasoning_content' above,
# `reasoning` is always falsy here and the `if reasoning:` body cannot run.
| | else: |
| | reasoning = msg.additional_kwargs.get("reasoning_content") |
| | if reasoning: |
| | 
| | if final_msg == "" or last_message_sender != current_sender: |
| | final_msg = reasoning |
| | messages.append(ChatMessage( |
| | role="assistant", |
| | content=final_msg, |
| | metadata={ |
| | "title": """🧠 Thinking...""", |
| | "sender": current_sender |
| | } |
| | )) |
| | last_message_sender = current_sender |
| | else: |
| | if len(messages) > 0: |
| | final_msg += reasoning |
| | messages[-1].content = final_msg |
| |
|
| | previous_message_is_reasoning = True |
| |
|
| | 
| | 
| |
|
| | 
| | 
| | 
| | 
| | 
| | 
| | 
| | 
| | 
| | 
| | 
| | 
| |
|
| | yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
| |
|
# Complete (non-chunk) AI messages with non-blank content.
# NOTE(review): msg.content.strip() assumes content is a str here.
| | elif isinstance(msg, AIMessage): |
| | if msg.content and msg.content.strip(): |
| | logger.info("***"*50) |
| | logger.info(f"msg.content: {msg.content}") |
| | logger.info(f"msg: {msg}") |
| |
|
# NOTE(review): new_sender / new_title are assigned only when msg.metadata has
# a truthy "sender" / "title"; confirm both are always bound before the
# ChatMessage below uses them, and that msg.metadata exists on every AIMessage
# reaching this point (the second access is outside the try).
| | try: |
| | if msg.metadata.get("sender"): |
| | new_sender = msg.metadata.get("sender") |
| | current_sender = new_sender |
| | except Exception as e: |
| | logger.error(f"[EXCEPTION]: {e} - {msg}") |
| | if msg.metadata.get("title"): |
| | new_title = msg.metadata.get("title") |
| | messages.append(ChatMessage( |
| | role="assistant", |
| | content=msg.content, |
| | metadata={ |
| | "sender": new_sender, |
| | "title": new_title |
| | } |
| | )) |
| | last_message_sender = current_sender |
| | yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
| |
|
| | yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
| |
|
| |
|
| | |
| | |
| | |
| | |
| |
|