Spaces:
Sleeping
Sleeping
| from agent_build_sdk.builder import AgentBuilder | |
| from agent_build_sdk.model.model import AgentResp, AgentReq, STATUS_DISTRIBUTION, STATUS_ROUND, STATUS_VOTE, \ | |
| STATUS_START, STATUS_VOTE_RESULT, STATUS_RESULT | |
| from agent_build_sdk.sdk.agent import BasicAgent | |
| from agent_build_sdk.sdk.agent import format_prompt | |
| from prompts import DESC_PROMPT, VOTE_PROMPT | |
| from agent_build_sdk.utils.logger import logger | |
| from openai import OpenAI | |
| import os | |
| class SpyAgent(BasicAgent): | |
| def perceive(self, req=AgentReq): | |
| logger.info("spy perceive: {}".format(req)) | |
| if req.status == STATUS_START: # 开始新的一局比赛 | |
| self.memory.clear() | |
| self.memory.set_variable("name", req.message) | |
| self.memory.append_history( | |
| '主持人: 女士们先生们,欢迎来到《谁是卧底》游戏!我们有一个由6名玩家组成的小组,在其中有一名卧底。让我们开始吧!每个人都会收到一张纸。其中5人的纸上拥有相同的单词,而卧底则会收到含义上相似的单词。我们将大多数人拿到的单词称为"公共词",将卧底拿到的单词称为"卧底词"。一旦你拿到了你的单词,首先需要根据其他人的发言判断自己是否拿到了卧底词。如果判断自己拿到了卧底词,请猜测公共词是什么,然后描述公共词来混淆视听,避免被投票淘汰。如果判断自己拿到了公共词,请思考如何巧妙地描述它而不泄露它,不能让卧底察觉,也要给同伴暗示。每人每轮用一句话描述自己拿到的词语,每个人的描述禁止重复,话中不能出现所持词语。每轮描述完毕,所有在场的人投票选出怀疑是卧底的那个人,得票数最多的人出局。卧底出局则游戏结束,若卧底未出局,游戏继续。现在游戏开始。') | |
| elif req.status == STATUS_DISTRIBUTION: # 分配单词 | |
| self.memory.set_variable("word", req.word) | |
| self.memory.append_history( | |
| '主持人: 你好,{},你分配到的单词是:{}'.format(self.memory.load_variable("name"), req.word)) | |
| elif req.status == STATUS_ROUND: # 发言环节 | |
| if req.name: | |
| # 其他玩家发言 | |
| self.memory.append_history(req.name + ': ' + req.message) | |
| else: | |
| # 主持人发言 | |
| self.memory.append_history('主持人: 现在进入第{}轮。'.format(str(req.round))) | |
| self.memory.append_history('主持人: 每个玩家描述自己分配到的单词。') | |
| elif req.status == STATUS_VOTE: # 投票环节 | |
| self.memory.append_history(req.name + ': ' + req.message) | |
| elif req.status == STATUS_VOTE_RESULT: # 投票环节 | |
| out_player = req.name if req.name else req.message | |
| if out_player: | |
| self.memory.append_history('主持人: 投票结果是:{}。'.format(out_player)) | |
| else: | |
| self.memory.append_history('主持人: 无人出局。') | |
| elif req.status == STATUS_RESULT: | |
| self.memory.append_history(req.message) | |
| else: | |
| raise NotImplementedError | |
| def interact(self, req=AgentReq) -> AgentResp: | |
| logger.info("spy interact: {}".format(req)) | |
| if req.status == STATUS_ROUND: | |
| prompt = format_prompt(DESC_PROMPT, | |
| {"name": self.memory.load_variable("name"), | |
| "word": self.memory.load_variable("word"), | |
| "history": "\n".join(self.memory.load_history()) | |
| }) | |
| logger.info("prompt:" + prompt) | |
| result = self.llm_caller(prompt) | |
| logger.info("spy interact result: {}".format(result)) | |
| return AgentResp(success=True, result=result, errMsg=None) | |
| elif req.status == STATUS_VOTE: | |
| self.memory.append_history('主持人: 到了投票的时候了。每个人,请指向你认为可能是卧底的人。') | |
| choices = [name for name in req.message.split(",") if name != self.memory.load_variable("name")] # 排除自己 | |
| self.memory.set_variable("choices", choices) | |
| prompt = format_prompt(VOTE_PROMPT, {"name": self.memory.load_variable("name"), | |
| "choices": choices, | |
| "history": "\n".join(self.memory.load_history()) | |
| }) | |
| logger.info("prompt:" + prompt) | |
| result = self.llm_caller(prompt) | |
| logger.info("spy interact result: {}".format(result)) | |
| return AgentResp(success=True, result=result, errMsg=None) | |
| else: | |
| raise NotImplementedError | |
| def llm_caller(self, prompt): | |
| client = OpenAI( | |
| api_key=os.getenv('API_KEY'), | |
| base_url=os.getenv('BASE_URL') | |
| ) | |
| completion = client.chat.completions.create( | |
| model=self.model_name, | |
| messages=[ | |
| {'role': 'system', 'content': 'You are a helpful assistant.'}, | |
| {'role': 'user', 'content': prompt} | |
| ], | |
| temperature=0 | |
| ) | |
| try: | |
| return completion.choices[0].message.content | |
| except Exception as e: | |
| print(e) | |
| return None | |
| if __name__ == '__main__': | |
| name = 'spy' | |
| agent_builder = AgentBuilder(name, agent=SpyAgent(name, model_name=os.getenv('MODEL_NAME'))) | |
| agent_builder.start() |