| import re, os, sys, subprocess, copy, traceback, logging |
|
|
| try: |
| from HTMLParser import HTMLParser |
| except ImportError: |
| from html.parser import HTMLParser |
| try: |
| from urllib import quote as _quote |
| quote = lambda n: _quote(n.encode('utf8', 'replace')) |
| except ImportError: |
| from urllib.parse import quote |
|
|
| import requests |
|
|
| from . import config |
|
|
# Shared logger for the whole package.
logger = logging.getLogger('itchat')

# Matches an emoji placeholder span and captures the 1-10 characters that
# follow the "emoji emoji" class prefix.
emojiRegex = re.compile(r'<span class="emoji emoji(.{1,10})"></span>')

# Parser instance kept only for its unescape() method; when the instance
# does not provide one, html.unescape is attached in its place.
htmlParser = HTMLParser()
if not hasattr(htmlParser, 'unescape'):
    import html
    htmlParser.unescape = html.unescape

# Probe whether stdout can encode the full-block character; BLOCK falls back
# to the ASCII string 'MM' when writing it raises UnicodeEncodeError.
b = u'\u2588'
try:
    sys.stdout.write(b + '\r')
    sys.stdout.flush()
    BLOCK = b
except UnicodeEncodeError:
    BLOCK = 'MM'

# Default record copied by struct_friend_info: text fields default to '',
# numeric fields to 0 and MemberList to an empty list.
friendInfoTemplate = {}
friendInfoTemplate.update(dict.fromkeys((
    'UserName', 'City', 'DisplayName', 'PYQuanPin', 'RemarkPYInitial', 'Province',
    'KeyWord', 'RemarkName', 'PYInitial', 'EncryChatRoomId', 'Alias', 'Signature',
    'NickName', 'RemarkPYQuanPin', 'HeadImgUrl',
), ''))
friendInfoTemplate.update(dict.fromkeys((
    'UniFriend', 'Sex', 'AppAccountFlag', 'VerifyFlag', 'ChatRoomId', 'HideInputBarFlag',
    'AttrStatus', 'SnsFlag', 'MemberCount', 'OwnerUin', 'ContactFlag', 'Uin',
    'StarFriend', 'Statues',
), 0))
friendInfoTemplate['MemberList'] = []
|
|
def clear_screen():
    ''' Clear the terminal through the shell
     * runs `cls` when config.OS is 'Windows', `clear` otherwise
    '''
    command = 'clear'
    if config.OS == 'Windows':
        command = 'cls'
    os.system(command)
|
|
def emoji_formatter(d, k):
    ''' Replace emoji placeholder spans in d[k] with the characters they encode
     * d[k] is rewritten in place, nothing is returned
     * some codes are first remapped to work around emoji mismatches caused by
       the wechat backend, e.g. :face with tears of joy: arrives as
       :cat face with tears of joy:
    '''
    spanTemplate = '<span class="emoji emoji%s"></span>'
    # received code -> code that is actually rendered
    remappedCodes = {
        '1f63c': '1f601', '1f639': '1f602', '1f63a': '1f603',
        '1f4ab': '1f616', '1f64d': '1f614', '1f63b': '1f60d',
        '1f63d': '1f618', '1f64e': '1f621', '1f63f': '1f622',
    }
    # captured length -> index at which the text is split into two code points
    splitIndex = {6: 2, 10: 5}

    def _remap_span(match):
        code = match.group(1)
        return spanTemplate % remappedCodes.get(code, code)

    def _span_to_text(match):
        code = match.group(1)
        cut = splitIndex.get(len(code))
        if cut is None:
            pieces = (code, )
        else:
            pieces = (code[:cut], code[cut:])
        # build one zero-padded \UXXXXXXXX escape per piece, then let the
        # unicode-escape codec turn the escapes into real characters
        escaped = ''.join('\\U%s' % piece.rjust(8, '0') for piece in pieces)
        return escaped.encode('utf8').decode('unicode-escape', 'replace')

    # The 1f450 span is expected without its closing '>', so one is appended.
    # NOTE(review): a span that is already well-formed ends up followed by a
    # stray '>' after this replace - confirm against current server output.
    repaired = d[k].replace('<span class="emoji emoji1f450"></span',
        '<span class="emoji emoji1f450"></span>')
    d[k] = emojiRegex.sub(_remap_span, repaired)
    d[k] = emojiRegex.sub(_span_to_text, d[k])
|
|
def msg_formatter(d, k):
    ''' Normalise the message text stored in d[k], in place
     * emoji spans are converted by emoji_formatter
     * '<br/>' becomes a newline
     * HTML entities are unescaped last
    '''
    emoji_formatter(d, k)
    withNewlines = d[k].replace('<br/>', '\n')
    d[k] = htmlParser.unescape(withNewlines)
|
|
def check_file(fileDir):
    ''' Check whether fileDir can be opened for reading
     * return True if open(fileDir) succeeds, False on any ordinary error
       (missing file, permission problem, invalid path value, ...)
     * KeyboardInterrupt / SystemExit are deliberately not swallowed
    '''
    try:
        with open(fileDir):
            return True
    except Exception:
        return False
|
|
def print_qr(fileDir):
    ''' Open the file at fileDir with an external program
     * config.OS 'Darwin' -> `open`, 'Linux' -> `xdg-open`
     * any other value falls back to os.startfile
    '''
    openers = {'Darwin': 'open', 'Linux': 'xdg-open'}
    opener = openers.get(config.OS)
    if opener is None:
        os.startfile(fileDir)
    else:
        subprocess.call([opener, fileDir])
|
|
def print_cmd_qr(qrText, white=BLOCK, black=' ', enableCmdQR=True):
    ''' Write qrText to stdout with its '0' / '1' characters substituted
     * '0' is replaced by white, '1' by black
     * abs(int(enableCmdQR)) is how many times white is repeated (0 counts as 1)
     * a negative enableCmdQR swaps white and black after the repetition
    '''
    requested = int(enableCmdQR)
    swapColours = requested < 0
    repeat = abs(requested) or 1
    white = white * repeat
    if swapColours:
        white, black = black, white
    out = sys.stdout
    # blank out whatever is on the current line before drawing
    out.write(' '*50 + '\r')
    out.flush()
    out.write(qrText.replace('0', white).replace('1', black))
    out.flush()
|
|
def struct_friend_info(knownInfo):
    ''' Build a full member dict from partial information
     * starts from a deep copy of friendInfoTemplate
     * every key of knownInfo (deep-copied) overrides the template value
    '''
    member = copy.deepcopy(friendInfoTemplate)
    member.update(copy.deepcopy(knownInfo).items())
    return member
|
|
def search_dict_list(l, key, value):
    ''' Search a list of dict
     * return the first dict whose .get(key) equals value
     * return None when no dict matches '''
    matches = (entry for entry in l if entry.get(key) == value)
    return next(matches, None)
|
|
def print_line(msg, oneLine = False):
    ''' Write msg to stdout without a trailing newline
     * oneLine=True: blank the current line and return the cursor to its
       start first, so msg overwrites what was there
     * oneLine=False: start a new line first
     * characters stdout cannot encode are replaced instead of raising
    '''
    if oneLine:
        sys.stdout.write(' '*40 + '\r')
        sys.stdout.flush()
    else:
        sys.stdout.write('\n')
    # The text is written to stdout, so stdout's encoding (not stdin's) decides
    # which characters are safe; this also works when sys.stdin is None.
    encoding = getattr(sys.stdout, 'encoding', None) or 'utf8'
    sys.stdout.write(msg.encode(encoding, 'replace').decode(encoding, 'replace'))
    sys.stdout.flush()
|
|
def test_connect(retryTime=5):
    ''' Check that config.BASE_URL can be reached
     * issues up to retryTime GET requests and returns True on the first one
       that completes without raising
     * returns False once the last attempt fails (its traceback is logged)
       or when retryTime is not positive
     * KeyboardInterrupt / SystemExit are not swallowed, so the retry loop
       can be interrupted
    '''
    for attempt in range(retryTime):
        try:
            requests.get(config.BASE_URL)
        except Exception:
            # only the final failure is reported
            if attempt == retryTime - 1:
                logger.error(traceback.format_exc())
                return False
        else:
            return True
    return False
|
|
def contact_deep_copy(core, contact):
    ''' Deep-copy contact while holding core.storageClass.updateLock
     * return the independent copy
    '''
    with core.storageClass.updateLock:
        snapshot = copy.deepcopy(contact)
    return snapshot
|
|
def get_image_postfix(data):
    ''' Guess an image file extension from the first 20 bytes of data
     * return 'gif', 'png' or 'jpg' for the first marker found, checked in
       that order
     * return '' when none of the markers is present
    '''
    header = data[:20]
    markers = ((b'GIF', 'gif'), (b'PNG', 'png'), (b'JFIF', 'jpg'))
    for marker, postfix in markers:
        if marker in header:
            return postfix
    return ''
|
|
def update_info_dict(oldInfoDict, newInfoDict):
    ''' Merge plain values of newInfoDict into oldInfoDict, in place
     * tuple / list / dict values are never copied over
     * a value replaces the old one when the old one is missing or None,
       or when the new value is not one of None, '', '0', 0
    '''
    for key, value in newInfoDict.items():
        if isinstance(value, (tuple, list, dict)):
            continue
        if oldInfoDict.get(key) is None or value not in (None, '', '0', 0):
            oldInfoDict[key] = value