File size: 5,254 Bytes
7a6cb13
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from typing import Optional

import aiohttp

from config import access_token_cache, WECHAT_APPID, WECHAT_SECRET, logger


async def get_access_token() -> Optional[str]:
    """获取微信 stable access_token (推荐方式)"""
    import time

    # 检查缓存是否有效
    if access_token_cache["token"] and time.time() < access_token_cache["expires_at"]:
        return access_token_cache["token"]
    # 使用新的 getStableAccessToken 接口
    url = "https://api.weixin.qq.com/cgi-bin/stable_token"
    data = {
        "grant_type": "client_credential",
        "appid": WECHAT_APPID,
        "secret": WECHAT_SECRET,
        "force_refresh": False,  # 是否强制刷新
    }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=data) as response:
                if response.status == 200:
                    result = await response.json()
                    logger.debug(f"getStableAccessToken response: {result}")
                    if "access_token" in result:
                        access_token_cache["token"] = result["access_token"]
                        access_token_cache["expires_at"] = (
                            time.time() + result.get("expires_in", 7200) - 300
                        )
                        expires_time = access_token_cache["expires_at"]
                        logger.info(
                            f"成功获取 stable access_token expires_time={expires_time}"
                        )
                        return result["access_token"]
                    else:
                        logger.error(f"Failed to get stable access_token: {result}")
                else:
                    logger.error(f"Failed to request stable access_token: {response.status}")
    except Exception as e:
        logger.error(f"Exception while getting stable access_token: {str(e)}")

    return None


async def get_access_token_old() -> Optional[str]:
    """获取微信 access_token"""
    import time

    # 检查缓存是否有效
    if access_token_cache["token"] and time.time() < access_token_cache["expires_at"]:
        return access_token_cache["token"]
    # 获取新的 access_token
    url = "https://api.weixin.qq.com/cgi-bin/token"
    params = {
        "grant_type": "client_credential",
        "appid": WECHAT_APPID,
        "secret": WECHAT_SECRET,
    }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    if "access_token" in data:
                        access_token_cache["token"] = data["access_token"]
                        access_token_cache["expires_at"] = (
                            time.time() + data.get("expires_in", 7200) - 300
                        )  # 提前5分钟过期
                        logger.info("Successfully obtained WeChat access_token...")
                        return data["access_token"]
                    else:
                        logger.error(f"Failed to get access_token, returned content: {data}")
                        return None
                else:
                    logger.error(f"Failed to get access_token, status={response.status}")
                    return None
    except Exception as e:
        logger.error(f"Failed to get access_token: {str(e)}")

    return None


async def check_image_security(image_data: bytes) -> bool:
    """
    检测图片内容安全
    :param image_data: 图片二进制数据
    :return: True表示安全,False表示有风险
    """
    access_token = await get_access_token()
    if not access_token:
        logger.warning("Unable to get access_token, skipping security check")
        return True  # 获取token失败时允许继续,避免影响正常用户
    url = f"https://api.weixin.qq.com/wxa/img_sec_check?access_token={access_token}"
    try:
        async with aiohttp.ClientSession() as session:
            # 微信API要求使用 multipart/form-data 格式
            data = aiohttp.FormData()
            data.add_field("media", image_data, content_type="image/jpeg")
            async with session.post(url, data=data, timeout=10) as response:
                if response.status == 200:
                    result = await response.json()
                    logger.info(f"Checking image content safety...result={result}")
                    if result.get("errcode") == 0:
                        return True  # 安全
                    elif result.get("errcode") == 87014:
                        logger.warning("Image content contains illegal content...")
                        return False
                    else:
                        logger.warning(f"Image security check returned error: {result}")
                        return True  # 其他错误时允许继续
                else:
                    logger.warning(f"Image security check request failed: {response.status}")
                    return True
    except Exception as e:
        logger.error(f"Image security check exception: {str(e)}")
        return True  # 异常时允许继续,避免影响正常用户