import time
from typing import Optional

import aiohttp

from config import access_token_cache, WECHAT_APPID, WECHAT_SECRET, logger

# Token lifetime (seconds) assumed when a response carries no "expires_in".
_DEFAULT_EXPIRES_IN = 7200
# The cached token is treated as expired this many seconds (5 minutes) early,
# so a token that is about to lapse is never handed out.
_EXPIRY_MARGIN = 300
# Upper bound for one HTTP request. aiohttp expects a ClientTimeout object;
# passing a bare number as ``timeout=`` is deprecated.
_REQUEST_TIMEOUT = aiohttp.ClientTimeout(total=10)
# WeChat errcodes meaning the access_token itself was rejected
# (invalid credential / illegal token / token expired).
_STALE_TOKEN_ERRCODES = (40001, 40014, 42001)


def _cached_token() -> Optional[str]:
    """Return the cached access_token if it is set and not yet expired."""
    token = access_token_cache["token"]
    if token and time.time() < access_token_cache["expires_at"]:
        return token
    return None


def _store_token(payload: dict) -> str:
    """Cache the token found in *payload* and return it.

    The expiry is recorded ``_EXPIRY_MARGIN`` seconds before the lifetime
    reported by the API (or before ``_DEFAULT_EXPIRES_IN`` when the response
    has no ``expires_in``).
    """
    token = payload["access_token"]
    lifetime = payload.get("expires_in", _DEFAULT_EXPIRES_IN)
    access_token_cache["token"] = token
    access_token_cache["expires_at"] = time.time() + lifetime - _EXPIRY_MARGIN
    return token


def _redact_token(payload):
    """Return *payload* with any ``access_token`` value masked, for logging."""
    if isinstance(payload, dict) and "access_token" in payload:
        return {**payload, "access_token": "***"}
    return payload


async def get_access_token() -> Optional[str]:
    """Get a WeChat access_token via the stable_token endpoint (recommended).

    A cached token is returned while it is still valid; otherwise a new one is
    requested with ``force_refresh`` disabled and stored in
    ``access_token_cache``.

    Returns:
        The access_token, or ``None`` when the request fails, the response has
        no ``access_token`` field, or any exception occurs (the failure is
        logged, never raised).
    """
    cached = _cached_token()
    if cached:
        return cached

    url = "https://api.weixin.qq.com/cgi-bin/stable_token"
    data = {
        "grant_type": "client_credential",
        "appid": WECHAT_APPID,
        "secret": WECHAT_SECRET,
        "force_refresh": False,  # do not force a refresh
    }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                url, json=data, timeout=_REQUEST_TIMEOUT
            ) as response:
                if response.status != 200:
                    logger.error(f"Failed to request stable access_token: {response.status}")
                    return None

                # content_type=None: do not reject a JSON body that is served
                # with a non-JSON Content-Type header.
                result = await response.json(content_type=None)
                # Log a masked copy so the live credential never reaches the logs.
                logger.debug(f"getStableAccessToken response: {_redact_token(result)}")

                if "access_token" not in result:
                    logger.error(f"Failed to get stable access_token: {result}")
                    return None

                token = _store_token(result)
                expires_time = access_token_cache["expires_at"]
                logger.info(
                    f"成功获取 stable access_token expires_time={expires_time}"
                )
                return token
    except Exception as e:
        # Best-effort by design: callers treat None as "no token available".
        logger.error(f"Exception while getting stable access_token: {str(e)}")

    return None


async def get_access_token_old() -> Optional[str]:
    """Get a WeChat access_token via the legacy ``cgi-bin/token`` endpoint.

    Shares ``access_token_cache`` with :func:`get_access_token`.

    Returns:
        The access_token, or ``None`` on a non-200 status, a response without
        ``access_token``, or any exception (the failure is logged, never
        raised).
    """
    cached = _cached_token()
    if cached:
        return cached

    url = "https://api.weixin.qq.com/cgi-bin/token"
    params = {
        "grant_type": "client_credential",
        "appid": WECHAT_APPID,
        "secret": WECHAT_SECRET,
    }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                url, params=params, timeout=_REQUEST_TIMEOUT
            ) as response:
                if response.status != 200:
                    logger.error(f"Failed to get access_token, status={response.status}")
                    return None

                data = await response.json(content_type=None)
                if "access_token" not in data:
                    logger.error(f"Failed to get access_token, returned content: {data}")
                    return None

                token = _store_token(data)
                logger.info("Successfully obtained WeChat access_token...")
                return token
    except Exception as e:
        logger.error(f"Failed to get access_token: {str(e)}")
        return None


async def check_image_security(image_data: bytes) -> bool:
    """Check image content with WeChat's ``img_sec_check`` API.

    The check fails open: when no token is available, the request fails, the
    API reports an unrelated error, or an exception occurs, the image is
    treated as safe so that normal users are not blocked.

    Args:
        image_data: Raw image bytes; uploaded as the ``media`` form field with
            content type ``image/jpeg``.

    Returns:
        ``False`` only when the API reports errcode 87014 (risky content);
        ``True`` in every other case.
    """
    access_token = await get_access_token()
    if not access_token:
        logger.warning("Unable to get access_token, skipping security check")
        return True  # fail open when no token could be obtained

    url = f"https://api.weixin.qq.com/wxa/img_sec_check?access_token={access_token}"

    try:
        async with aiohttp.ClientSession() as session:
            # The WeChat API expects multipart/form-data.
            form = aiohttp.FormData()
            form.add_field("media", image_data, content_type="image/jpeg")

            async with session.post(
                url, data=form, timeout=_REQUEST_TIMEOUT
            ) as response:
                if response.status != 200:
                    logger.warning(f"Image security check request failed: {response.status}")
                    return True

                result = await response.json(content_type=None)
                logger.info(f"Checking image content safety...result={result}")

                errcode = result.get("errcode")
                if errcode == 0:
                    return True  # safe
                if errcode == 87014:
                    logger.warning("Image content contains illegal content...")
                    return False

                if errcode in _STALE_TOKEN_ERRCODES:
                    # The token was rejected: expire the cache so the next
                    # call fetches a fresh one instead of silently skipping
                    # checks until the cached expiry time passes.
                    access_token_cache["expires_at"] = 0

                logger.warning(f"Image security check returned error: {result}")
                return True  # other errors: fail open
    except Exception as e:
        logger.error(f"Image security check exception: {str(e)}")
        return True  # fail open on exceptions