| import time, re, io |
| import json, copy |
| import logging |
|
|
| from .. import config, utils |
| from ..components.contact import accept_friend |
| from ..returnvalues import ReturnValue |
| from ..storage import contact_change |
| from ..utils import update_info_dict |
|
|
# module-level logger, registered under the name 'itchat'
logger = logging.getLogger('itchat')
|
|
def load_contact(core):
    ''' attach the contact functions of this module to core as attributes
     * each function is stored under its own name (core.update_chatroom, ...)
    '''
    bindings = (
        ('update_chatroom', update_chatroom),
        ('update_friend', update_friend),
        ('get_contact', get_contact),
        ('get_friends', get_friends),
        ('get_chatrooms', get_chatrooms),
        ('get_mps', get_mps),
        ('set_alias', set_alias),
        ('set_pinned', set_pinned),
        ('accept_friend', accept_friend),
        ('get_head_img', get_head_img),
        ('create_chatroom', create_chatroom),
        ('set_chatroom_name', set_chatroom_name),
        ('delete_member_from_chatroom', delete_member_from_chatroom),
        ('add_member_into_chatroom', add_member_into_chatroom),
    )
    for attrName, fn in bindings:
        setattr(core, attrName, fn)
|
|
def update_chatroom(self, userName, detailedMember=False):
    ''' fetch chatroom info with webwxbatchgetcontact and merge it into local storage
     * userName: one chatroom UserName or a list of them
     * detailedMember: if True, the members of every returned chatroom are
       fetched again in batches and replace the chatroom's MemberList
     * returns a ReturnValue with Ret -1001 if the response has no ContactList
     * otherwise returns what storageClass.search_chatrooms gives for the
       chatroom, or a list of those results if more than one was returned
    '''
    if not isinstance(userName, list):
        userName = [userName]

    def batch_get_contact(queryList):
        # one POST to webwxbatchgetcontact, returns the decoded JSON reply
        url = '%s/webwxbatchgetcontact?type=ex&r=%s' % (
            self.loginInfo['url'], int(time.time()))
        headers = {
            'ContentType': 'application/json; charset=UTF-8',
            'User-Agent': config.USER_AGENT, }
        data = {
            'BaseRequest': self.loginInfo['BaseRequest'],
            'Count': len(queryList),
            'List': queryList, }
        r = self.s.post(url, data=json.dumps(data), headers=headers)
        return json.loads(r.content.decode('utf8', 'replace'))

    chatroomList = batch_get_contact([{
        'UserName': u,
        'ChatRoomId': '', } for u in userName]).get('ContactList')
    if not chatroomList:
        return ReturnValue({'BaseResponse': {
            'ErrMsg': 'No chatroom found',
            'Ret': -1001, }})

    if detailedMember:
        MAX_GET_NUMBER = 50
        for chatroom in chatroomList:
            members = chatroom['MemberList']
            totalMemberList = []
            # step through the members MAX_GET_NUMBER at a time; stepping by
            # range avoids a trailing request with an empty list when the
            # member count is an exact multiple of the batch size (or zero)
            for start in range(0, len(members), MAX_GET_NUMBER):
                batch = members[start:start + MAX_GET_NUMBER]
                totalMemberList += batch_get_contact([{
                    'UserName': member['UserName'],
                    'EncryChatRoomId': chatroom['EncryChatRoomId']}
                    for member in batch])['ContactList']
            chatroom['MemberList'] = totalMemberList

    update_local_chatrooms(self, chatroomList)
    r = [self.storageClass.search_chatrooms(userName=c['UserName'])
        for c in chatroomList]
    return r if 1 < len(r) else r[0]
|
|
def update_friend(self, userName):
    ''' fetch friend info with webwxbatchgetcontact and merge it into local storage
     * userName: one friend UserName or a list of them
     * returns what storageClass.search_friends gives for the friend if
       exactly one contact came back, otherwise a list of those results
       (an empty list if the response carried no ContactList)
    '''
    if not isinstance(userName, list):
        userName = [userName]
    url = '%s/webwxbatchgetcontact?type=ex&r=%s' % (
        self.loginInfo['url'], int(time.time()))
    headers = {
        'ContentType': 'application/json; charset=UTF-8',
        'User-Agent': config.USER_AGENT, }
    data = {
        'BaseRequest': self.loginInfo['BaseRequest'],
        'Count': len(userName),
        'List': [{
            'UserName': u,
            'EncryChatRoomId': '', } for u in userName], }
    response = self.s.post(url, data=json.dumps(data), headers=headers)
    # a reply without ContactList would otherwise hand None to the loops below
    friendList = json.loads(
        response.content.decode('utf8', 'replace')).get('ContactList') or []

    update_local_friends(self, friendList)
    r = [self.storageClass.search_friends(userName=f['UserName'])
        for f in friendList]
    return r if len(r) != 1 else r[0]
|
|
@contact_change
def update_local_chatrooms(core, l):
    '''
        merge a list of chatroom dicts into core.chatroomList
        every dict needs 'UserName' and 'MemberList'
        return a System message dict whose Text lists the given chatroom UserNames
    '''
    for chatroom in l:
        # run the emoji formatter over the display names of room and members
        utils.emoji_formatter(chatroom, 'NickName')
        for member in chatroom['MemberList']:
            for key in ('NickName', 'DisplayName', 'RemarkName'):
                if key in member:
                    utils.emoji_formatter(member, key)
        # merge into the stored chatroom, or store it if it is new
        oldChatroom = utils.search_dict_list(
            core.chatroomList, 'UserName', chatroom['UserName'])
        if oldChatroom:
            update_info_dict(oldChatroom, chatroom)
            # members are merged one by one so existing entries are updated
            # in place and unknown ones are appended
            memberList = chatroom.get('MemberList', [])
            oldMemberList = oldChatroom['MemberList']
            if memberList:
                for member in memberList:
                    oldMember = utils.search_dict_list(
                        oldMemberList, 'UserName', member['UserName'])
                    if oldMember:
                        update_info_dict(oldMember, member)
                    else:
                        oldMemberList.append(member)
        else:
            core.chatroomList.append(chatroom)
            # look the room up again so the stored object is the one edited below
            oldChatroom = utils.search_dict_list(
                core.chatroomList, 'UserName', chatroom['UserName'])
        # drop stored members that are missing from a non-empty incoming list
        if len(chatroom['MemberList']) != len(oldChatroom['MemberList']) and \
                chatroom['MemberList']:
            # set lookup keeps the pruning linear in the number of members
            existsUserNames = {member['UserName']
                for member in chatroom['MemberList']}
            storedMembers = oldChatroom['MemberList']
            # delete from the back so the remaining indexes stay valid
            for i in reversed(range(len(storedMembers))):
                if storedMembers[i]['UserName'] not in existsUserNames:
                    del storedMembers[i]
        # - update OwnerUin from the owner's member entry (0 if not found)
        if oldChatroom.get('ChatRoomOwner') and oldChatroom.get('MemberList'):
            owner = utils.search_dict_list(oldChatroom['MemberList'],
                'UserName', oldChatroom['ChatRoomOwner'])
            oldChatroom['OwnerUin'] = (owner or {}).get('Uin', 0)
        # - IsAdmin is only decided when a non-zero OwnerUin is known
        if 'OwnerUin' in oldChatroom and oldChatroom['OwnerUin'] != 0:
            oldChatroom['IsAdmin'] = \
                oldChatroom['OwnerUin'] == int(core.loginInfo['wxuin'])
        else:
            oldChatroom['IsAdmin'] = None
        # - Self is our own member entry, or a copy of the login user
        newSelf = utils.search_dict_list(oldChatroom['MemberList'],
            'UserName', core.storageClass.userName)
        oldChatroom['Self'] = newSelf or copy.deepcopy(core.loginInfo['User'])
    return {
        'Type': 'System',
        'Text': [chatroom['UserName'] for chatroom in l],
        'SystemInfo': 'chatrooms',
        'FromUserName': core.storageClass.userName,
        'ToUserName': core.storageClass.userName, }
|
|
@contact_change
def update_local_friends(core, l):
    '''
        merge a list of friend or mp dicts into the local contact
        known UserNames are updated in place, unknown ones are deep-copied
        into core.mpList if bit 8 of VerifyFlag is set, else core.memberList
    '''
    # snapshot of both lists, taken once before the loop
    knownContacts = core.memberList + core.mpList
    for friend in l:
        for key in ('NickName', 'DisplayName', 'RemarkName'):
            if key in friend:
                utils.emoji_formatter(friend, key)
        stored = utils.search_dict_list(
            knownContacts, 'UserName', friend['UserName'])
        if stored is not None:
            update_info_dict(stored, friend)
            continue
        stored = copy.deepcopy(friend)
        if (stored['VerifyFlag'] & 8) == 0:
            core.memberList.append(stored)
        else:
            core.mpList.append(stored)
|
|
@contact_change
def update_local_uin(core, msg):
    '''
        content contains uins and StatusNotifyUserName contains username
        they are in same order, so they are paired together one by one

        known contacts without a Uin get it filled in, unknown contacts are
        fetched from the server first and stored with the uin
        return a System message dict whose Text lists the changed usernames
    '''
    usernameChangedList = []
    r = {
        'Type': 'System',
        'Text': usernameChangedList,
        'SystemInfo': 'uins', }
    match = re.search('<username>([^<]*?)<', msg['Content'])
    if not match:
        logger.debug('No uins in 51 message')
        logger.debug(msg['Content'])
        return r
    uins = match.group(1).split(',')
    usernames = msg['StatusNotifyUserName'].split(',')
    if not 0 < len(uins) == len(usernames):
        logger.debug('Wrong length of uins & usernames: %s, %s',
            len(uins), len(usernames))
        return r

    def fetch_unlocked(updateFn, username):
        # the release/acquire pair shows updateLock is held on entry
        # (presumably taken by contact_change); the fetch must run without it
        # and the lock has to be taken back even if the fetch raises
        core.storageClass.updateLock.release()
        try:
            updateFn(core, username)
        finally:
            core.storageClass.updateLock.acquire()

    for uin, username in zip(uins, usernames):
        if '@' not in username:
            continue
        # rebuilt every round because the lists may have grown meanwhile
        fullContact = core.memberList + core.chatroomList + core.mpList
        userDicts = utils.search_dict_list(fullContact,
            'UserName', username)
        if userDicts:
            if userDicts.get('Uin', 0) == 0:
                userDicts['Uin'] = uin
                usernameChangedList.append(username)
                logger.debug('Uin fetched: %s, %s', username, uin)
            elif userDicts['Uin'] != uin:
                logger.debug('Uin changed: %s, %s', userDicts['Uin'], uin)
            continue
        if '@@' in username:
            fetch_unlocked(update_chatroom, username)
            newChatroomDict = utils.search_dict_list(
                core.chatroomList, 'UserName', username)
            if newChatroomDict is None:
                newChatroomDict = utils.struct_friend_info({
                    'UserName': username,
                    'Uin': uin,
                    'Self': copy.deepcopy(core.loginInfo['User'])})
                core.chatroomList.append(newChatroomDict)
            else:
                newChatroomDict['Uin'] = uin
        else:
            # '@' is known to be in username at this point
            fetch_unlocked(update_friend, username)
            newFriendDict = utils.search_dict_list(
                core.memberList, 'UserName', username)
            if newFriendDict is None:
                newFriendDict = utils.struct_friend_info({
                    'UserName': username,
                    'Uin': uin, })
                core.memberList.append(newFriendDict)
            else:
                newFriendDict['Uin'] = uin
        usernameChangedList.append(username)
        logger.debug('Uin fetched: %s, %s', username, uin)
    return r
|
|
def get_contact(self, update=False):
    ''' get the chatrooms of the contact list
     * update=False: return a deep copy of the locally stored chatrooms
     * update=True: page through webwxgetcontact until Seq comes back as 0,
       merge chatrooms and other contacts into local storage and return a
       deep copy of the chatrooms found in the reply
    '''
    if not update:
        return utils.contact_deep_copy(self, self.chatroomList)

    def _get_contact(seq=0):
        # fetch one page, returns (next seq, members of this page)
        url = '%s/webwxgetcontact?r=%s&seq=%s&skey=%s' % (self.loginInfo['url'],
            int(time.time()), seq, self.loginInfo['skey'])
        headers = {
            'ContentType': 'application/json; charset=UTF-8',
            'User-Agent': config.USER_AGENT, }
        try:
            r = self.s.get(url, headers=headers)
        except Exception:
            # best effort: refresh the known chatrooms one by one instead;
            # Exception (not bare except) lets Ctrl-C / SystemExit through
            logger.info('Failed to fetch contact, that may because of the amount of your chatrooms')
            for chatroom in self.get_chatrooms():
                self.update_chatroom(chatroom['UserName'], detailedMember=True)
            return 0, []
        j = json.loads(r.content.decode('utf-8', 'replace'))
        # a page without MemberList counts as an empty page
        return j.get('Seq', 0), j.get('MemberList') or []

    seq, memberList = 0, []
    while True:
        seq, batchMemberList = _get_contact(seq)
        memberList.extend(batchMemberList)
        if seq == 0:
            break
    chatroomList, otherList = [], []
    for m in memberList:
        if m['Sex'] != 0:
            otherList.append(m)
        elif '@@' in m['UserName']:
            chatroomList.append(m)
        elif '@' in m['UserName']:
            # entries without '@' in UserName are dropped
            otherList.append(m)
    if chatroomList:
        update_local_chatrooms(self, chatroomList)
    if otherList:
        update_local_friends(self, otherList)
    return utils.contact_deep_copy(self, chatroomList)
|
|
def get_friends(self, update=False):
    ''' return a deep copy of self.memberList
     * update=True refreshes the contact through get_contact first
    '''
    if update:
        self.get_contact(update=True)
    friends = self.memberList
    return utils.contact_deep_copy(self, friends)
|
|
def get_chatrooms(self, update=False, contactOnly=False):
    ''' return chatrooms
     * contactOnly=True: return the result of get_contact(update=True)
     * otherwise: return a deep copy of self.chatroomList, refreshed through
       get_contact first if update is set
    '''
    if contactOnly:
        return self.get_contact(update=True)
    if update:
        self.get_contact(True)
    chatrooms = self.chatroomList
    return utils.contact_deep_copy(self, chatrooms)
|
|
def get_mps(self, update=False):
    ''' return a deep copy of self.mpList
     * update=True refreshes the contact through get_contact first
    '''
    if update:
        self.get_contact(update=True)
    mps = self.mpList
    return utils.contact_deep_copy(self, mps)
|
|
def set_alias(self, userName, alias):
    ''' post a RemarkName for a friend to webwxoplog (CmdId 2)
     * returns a ReturnValue with Ret -1001 if userName is not in self.memberList
     * on a truthy ReturnValue the local RemarkName is updated as well
    '''
    friend = utils.search_dict_list(self.memberList, 'UserName', userName)
    if friend is None:
        return ReturnValue({'BaseResponse': {
            'Ret': -1001, }})
    url = '%s/webwxoplog?lang=%s&pass_ticket=%s' % (
        self.loginInfo['url'], 'zh_CN', self.loginInfo['pass_ticket'])
    payload = {
        'UserName': userName,
        'CmdId': 2,
        'RemarkName': alias,
        'BaseRequest': self.loginInfo['BaseRequest'], }
    # ensure_ascii=False keeps non-ascii aliases as utf8 in the body
    body = json.dumps(payload, ensure_ascii=False).encode('utf8')
    rawResponse = self.s.post(url, body,
        headers={'User-Agent': config.USER_AGENT})
    result = ReturnValue(rawResponse=rawResponse)
    if result:
        friend['RemarkName'] = alias
    return result
|
|
def set_pinned(self, userName, isPinned=True):
    ''' post the pinned state of a contact to webwxoplog (CmdId 3)
     * isPinned is sent as int(isPinned) in the OP field
     * returns the ReturnValue built from the raw response
    '''
    url = '%s/webwxoplog?pass_ticket=%s' % (
        self.loginInfo['url'], self.loginInfo['pass_ticket'])
    payload = {
        'UserName': userName,
        'CmdId': 3,
        'OP': int(isPinned),
        'BaseRequest': self.loginInfo['BaseRequest'], }
    rawResponse = self.s.post(url, json=payload,
        headers={'User-Agent': config.USER_AGENT})
    return ReturnValue(rawResponse=rawResponse)
|
|
def accept_friend(self, userName, v4= '', autoUpdate=True):
    ''' post a verification (Opcode 3) for userName to webwxverifyuser
     * v4: sent as VerifyUserTicket
     * autoUpdate: if set, update_friend(userName) is called after the post
     * returns the ReturnValue built from the raw response
    '''
    url = '%s/webwxverifyuser?r=%s&pass_ticket=%s' % (
        self.loginInfo['url'], int(time.time()), self.loginInfo['pass_ticket'])
    verifyUser = {
        'Value': userName,
        'VerifyUserTicket': v4, }
    payload = {
        'BaseRequest': self.loginInfo['BaseRequest'],
        'Opcode': 3,
        'VerifyUserListSize': 1,
        'VerifyUserList': [verifyUser],
        'VerifyContent': '',
        'SceneListCount': 1,
        'SceneList': [33],
        'skey': self.loginInfo['skey'], }
    headers = {
        'ContentType': 'application/json; charset=UTF-8',
        'User-Agent': config.USER_AGENT, }
    body = json.dumps(payload, ensure_ascii=False).encode('utf8', 'replace')
    rawResponse = self.s.post(url, headers=headers, data=body)
    if autoUpdate:
        self.update_friend(userName)
    return ReturnValue(rawResponse=rawResponse)
|
|
def get_head_img(self, userName=None, chatroomUserName=None, picDir=None):
    ''' get head image
     * if you want to get chatroom header: only set chatroomUserName
     * if you want to get friend header: only set userName
     * if you want to get chatroom member header: set both
     * returns a ReturnValue with Ret -1001 if the friend / chatroom is not
       found in local storage
     * picDir=None: returns the image as bytes
     * otherwise the image is written to picDir and a ReturnValue with
       Ret 0 and the image PostFix is returned
    '''
    params = {
        'userName': userName or chatroomUserName or self.storageClass.userName,
        'skey': self.loginInfo['skey'],
        'type': 'big', }
    url = '%s/webwxgeticon' % self.loginInfo['url']
    if chatroomUserName is None:
        infoDict = self.storageClass.search_friends(userName=userName)
        if infoDict is None:
            return ReturnValue({'BaseResponse': {
                'ErrMsg': 'No friend found',
                'Ret': -1001, }})
    elif userName is None:
        url = '%s/webwxgetheadimg' % self.loginInfo['url']
    else:
        chatroom = self.storageClass.search_chatrooms(userName=chatroomUserName)
        # the lookup result has to be checked, chatroomUserName itself can
        # never be None in this branch
        if chatroom is None:
            return ReturnValue({'BaseResponse': {
                'ErrMsg': 'No chatroom found',
                'Ret': -1001, }})
        # prefer EncryChatRoomId, fall back to UserName if missing or empty
        encryChatroomId = None
        if 'EncryChatRoomId' in chatroom:
            encryChatroomId = chatroom['EncryChatRoomId']
        params['chatroomid'] = encryChatroomId or chatroom['UserName']
    headers = { 'User-Agent' : config.USER_AGENT}
    r = self.s.get(url, params=params, stream=True, headers=headers)
    content = b''.join(r.iter_content(1024))
    if picDir is None:
        return content
    with open(picDir, 'wb') as f:
        f.write(content)
    # the first 20 bytes are handed to utils.get_image_postfix
    return ReturnValue({'BaseResponse': {
        'ErrMsg': 'Successfully downloaded',
        'Ret': 0, },
        'PostFix': utils.get_image_postfix(content[:20]), })
|
|
def create_chatroom(self, memberList, topic=''):
    ''' post a chatroom creation request to webwxcreatechatroom
     * memberList: a string of UserNames joined with ',' (it is split on ',')
     * topic: sent as Topic
     * returns the ReturnValue built from the raw response
    '''
    url = '%s/webwxcreatechatroom?pass_ticket=%s&r=%s' % (
        self.loginInfo['url'], self.loginInfo['pass_ticket'], int(time.time()))
    memberNames = memberList.split(',')
    payload = {
        'BaseRequest': self.loginInfo['BaseRequest'],
        'MemberCount': len(memberNames),
        'MemberList': [{'UserName': name} for name in memberNames],
        'Topic': topic, }
    headers = {
        'content-type': 'application/json; charset=UTF-8',
        'User-Agent': config.USER_AGENT, }
    body = json.dumps(payload, ensure_ascii=False).encode('utf8', 'ignore')
    rawResponse = self.s.post(url, headers=headers, data=body)
    return ReturnValue(rawResponse=rawResponse)
|
|
def set_chatroom_name(self, chatroomUserName, name):
    ''' post a new topic for a chatroom to webwxupdatechatroom (fun=modtopic)
     * chatroomUserName: sent as ChatRoomName
     * name: sent as NewTopic
     * returns the ReturnValue built from the raw response
    '''
    url = '%s/webwxupdatechatroom?fun=modtopic&pass_ticket=%s' % (
        self.loginInfo['url'], self.loginInfo['pass_ticket'])
    payload = {
        'BaseRequest': self.loginInfo['BaseRequest'],
        'ChatRoomName': chatroomUserName,
        'NewTopic': name, }
    headers = {
        'content-type': 'application/json; charset=UTF-8',
        'User-Agent': config.USER_AGENT, }
    body = json.dumps(payload, ensure_ascii=False).encode('utf8', 'ignore')
    rawResponse = self.s.post(url, headers=headers, data=body)
    return ReturnValue(rawResponse=rawResponse)
|
|
def delete_member_from_chatroom(self, chatroomUserName, memberList):
    ''' post a member removal to webwxupdatechatroom (fun=delmember)
     * memberList: a list of dicts, their UserName values are joined with ','
     * returns the ReturnValue built from the raw response
    '''
    url = '%s/webwxupdatechatroom?fun=delmember&pass_ticket=%s' % (
        self.loginInfo['url'], self.loginInfo['pass_ticket'])
    memberNames = [member['UserName'] for member in memberList]
    payload = {
        'BaseRequest': self.loginInfo['BaseRequest'],
        'ChatRoomName': chatroomUserName,
        'DelMemberList': ','.join(memberNames), }
    headers = {
        'content-type': 'application/json; charset=UTF-8',
        'User-Agent': config.USER_AGENT, }
    rawResponse = self.s.post(url, data=json.dumps(payload), headers=headers)
    return ReturnValue(rawResponse=rawResponse)
|
|
def add_member_into_chatroom(self, chatroomUserName, memberList,
        useInvitation=False):
    ''' add or invite member into chatroom
     * members get in either by direct add (fun=addmember) or by invitation
       (fun=invitemember)
     * if useInvitation is not set, invitation is still used once the
       chatroom has more members than loginInfo['InviteStartCount']
     * memberList is sent as given under AddMemberList / InviteMemberList
     * returns the ReturnValue built from the raw response
    '''
    if not useInvitation:
        # use the stored chatroom, fetch it if storage has none
        chatroom = self.storageClass.search_chatrooms(userName=chatroomUserName) \
            or self.update_chatroom(chatroomUserName)
        useInvitation = \
            len(chatroom['MemberList']) > self.loginInfo['InviteStartCount']
    fun, memberKeyName = ('invitemember', 'InviteMemberList') \
        if useInvitation else ('addmember', 'AddMemberList')
    url = '%s/webwxupdatechatroom?fun=%s&pass_ticket=%s' % (
        self.loginInfo['url'], fun, self.loginInfo['pass_ticket'])
    payload = {
        'BaseRequest': self.loginInfo['BaseRequest'],
        'ChatRoomName': chatroomUserName,
        memberKeyName: memberList, }
    headers = {
        'content-type': 'application/json; charset=UTF-8',
        'User-Agent': config.USER_AGENT, }
    rawResponse = self.s.post(url, data=json.dumps(payload), headers=headers)
    return ReturnValue(rawResponse=rawResponse)
|
|