| import re |
|
|
| from bridge.context import ContextType |
| from channel.chat_message import ChatMessage |
| from common.log import logger |
| from common.tmp_dir import TmpDir |
| from lib import itchat |
| from lib.itchat.content import * |
|
|
class WechatMessage(ChatMessage):
    """Chat message populated from a raw itchat message.

    The constructor classifies the itchat message into a ``ContextType``,
    stores its content (text, URL, note text, or a local path for media),
    and fills in the sender / receiver / peer identity fields.
    """

    def __init__(self, itchat_msg, is_group=False):
        """Build the message from ``itchat_msg``.

        Args:
            itchat_msg: Raw itchat message. It is read like a mapping
                (``itchat_msg["Type"]``, ``itchat_msg.get("Url")``) and, for
                media types, its ``download`` method is captured for later use.
            is_group: Whether the message comes from a group chat.

        Raises:
            NotImplementedError: If the message type is not handled, if a
                note message matches none of the known keywords, or if a note
                that should carry a double-quoted nickname has none.
        """
        super().__init__(itchat_msg)
        self.msg_id = itchat_msg["MsgId"]
        self.create_time = itchat_msg["CreateTime"]
        self.is_group = is_group

        msg_type = itchat_msg["Type"]
        if msg_type == TEXT:
            self.ctype = ContextType.TEXT
            self.content = itchat_msg["Text"]
        elif msg_type == VOICE:
            self._defer_download(itchat_msg, ContextType.VOICE)
        elif msg_type == PICTURE and itchat_msg["MsgType"] == 3:
            self._defer_download(itchat_msg, ContextType.IMAGE)
        elif msg_type == NOTE and itchat_msg["MsgType"] == 10000:
            note = itchat_msg["Content"]
            # Keyword order matters: "加入了群聊" must be tested before
            # "加入群聊" so each form picks its own quoted name.
            if is_group and "加入了群聊" in note:  # "... joined the group chat"
                self.ctype = ContextType.JOIN_GROUP
                self.content = note
                # The last quoted name is taken as the new member
                # (presumably an inviter is quoted first -- confirm).
                self.actual_user_nickname = self._quoted_nickname(note, -1)
            elif is_group and "加入群聊" in note:  # "... join the group chat"
                self.ctype = ContextType.JOIN_GROUP
                self.content = note
                self.actual_user_nickname = self._quoted_nickname(note, 0)
            elif is_group and "移出了群聊" in note:  # "... removed from the group chat"
                self.ctype = ContextType.EXIT_GROUP
                self.content = note
                self.actual_user_nickname = self._quoted_nickname(note, 0)
            elif "你已添加了" in note:  # "You have added ..."
                self.ctype = ContextType.ACCEPT_FRIEND
                self.content = note
            elif "拍了拍我" in note:  # "... patted me"
                self.ctype = ContextType.PATPAT
                self.content = note
                if is_group:
                    self.actual_user_nickname = self._quoted_nickname(note, 0)
            else:
                raise NotImplementedError("Unsupported note message: " + note)
        elif msg_type == ATTACHMENT:
            self._defer_download(itchat_msg, ContextType.FILE)
        elif msg_type == SHARING:
            self.ctype = ContextType.SHARING
            self.content = itchat_msg.get("Url")
        else:
            raise NotImplementedError(
                "Unsupported message type: Type:{} MsgType:{}".format(msg_type, itchat_msg["MsgType"])
            )

        self.from_user_id = itchat_msg["FromUserName"]
        self.to_user_id = itchat_msg["ToUserName"]

        # Identity of the logged-in account, as stored by itchat.
        user_id = itchat.instance.storageClass.userName
        nickname = itchat.instance.storageClass.nickName

        # Fill in every nickname that can be determined: first the sides that
        # are the logged-in account, then the sides that match the peer.
        if self.from_user_id == user_id:
            self.from_user_nickname = nickname
        if self.to_user_id == user_id:
            self.to_user_nickname = nickname
        try:
            # Any missing "User" / "UserName" / "NickName" key lands in the
            # KeyError fallback below; fields assigned before the failing
            # lookup keep their values.
            peer = itchat_msg["User"]
            peer_id = peer["UserName"]
            # True when the message is addressed to the peer and sender and
            # receiver differ.
            self.my_msg = self.to_user_id == peer_id and self.to_user_id != self.from_user_id
            self.other_user_id = peer_id
            self.other_user_nickname = peer["NickName"]
            if self.other_user_id == self.from_user_id:
                self.from_user_nickname = self.other_user_nickname
            if self.other_user_id == self.to_user_id:
                self.to_user_nickname = self.other_user_nickname
            own_entry = peer.get("Self")
            if own_entry:
                self.self_display_name = own_entry.get("DisplayName")
        except KeyError as e:
            # NOTE(review): if ``logger`` is a stdlib logging.Logger, ``warn``
            # is a deprecated alias of ``warning`` -- confirm and switch.
            logger.warn("[WX]get other_user_id failed: " + str(e))
            # Without peer info, the peer is whichever side is not us.
            if self.from_user_id == user_id:
                self.other_user_id = self.to_user_id
            else:
                self.other_user_id = self.from_user_id

        if self.is_group:
            self.is_at = itchat_msg["IsAt"]
            self.actual_user_id = itchat_msg["ActualUserName"]
            # For join / pat / exit notes the nickname was already extracted
            # from the note text above, so it is not overwritten here.
            if self.ctype not in (ContextType.JOIN_GROUP, ContextType.PATPAT, ContextType.EXIT_GROUP):
                self.actual_user_nickname = itchat_msg["ActualNickName"]

    def _defer_download(self, itchat_msg, ctype):
        """Record a media message whose payload is fetched later.

        ``content`` becomes ``TmpDir().path()`` with the message's
        ``FileName`` appended; ``_prepare_fn`` is set to a callable that asks
        itchat to download to whatever ``content`` holds when it is invoked.
        Nothing is downloaded here.
        """
        self.ctype = ctype
        self.content = TmpDir().path() + itchat_msg["FileName"]
        self._prepare_fn = lambda: itchat_msg.download(self.content)

    @staticmethod
    def _quoted_nickname(note, index):
        """Return the ``index``-th double-quoted substring of ``note``.

        Raises:
            NotImplementedError: If ``note`` contains no double-quoted
                substring. The same error the constructor uses for notes it
                cannot interpret is raised instead of a bare ``IndexError``.
        """
        quoted = re.findall(r"\"(.*?)\"", note)
        if not quoted:
            raise NotImplementedError("Unsupported note message: " + note)
        return quoted[index]
|
|