picpocket / wx_access_token.py
chenchaoyun
fix
d11ff01
from typing import Optional
import aiohttp
from config import access_token_cache, WECHAT_APPID, WECHAT_SECRET, logger
async def get_access_token() -> Optional[str]:
    """Return a WeChat stable access_token, reusing the cached one when valid.

    The token is read from and written to the shared ``access_token_cache``
    dict (keys ``"token"`` and ``"expires_at"``). When the cached token is
    missing or past its expiry, a new one is requested from the
    ``/cgi-bin/stable_token`` endpoint with ``force_refresh`` disabled.

    :return: The access token, or ``None`` when the request fails, the
        response status is not 200, the response carries no
        ``access_token`` field, or any exception is raised on the way.
    """
    import time

    # Serve from the cache while the stored token has not reached its expiry.
    if access_token_cache["token"] and time.time() < access_token_cache["expires_at"]:
        return access_token_cache["token"]

    url = "https://api.weixin.qq.com/cgi-bin/stable_token"
    data = {
        "grant_type": "client_credential",
        "appid": WECHAT_APPID,
        "secret": WECHAT_SECRET,
        "force_refresh": False,  # Do not force the server to rotate the token.
    }
    # Bound the whole request so a stalled connection cannot block callers;
    # 10 seconds matches the timeout used for the image security check.
    timeout = aiohttp.ClientTimeout(total=10)

    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(url, json=data) as response:
                if response.status != 200:
                    logger.error(
                        f"Failed to request stable access_token: {response.status}"
                    )
                    return None

                result = await response.json()

                # The response body contains the live token: mask it before
                # logging so the credential never ends up in log files.
                if isinstance(result, dict) and "access_token" in result:
                    loggable = {**result, "access_token": "***"}
                else:
                    loggable = result
                logger.debug(f"getStableAccessToken response: {loggable}")

                if "access_token" not in result:
                    logger.error(f"Failed to get stable access_token: {result}")
                    return None

                access_token_cache["token"] = result["access_token"]
                # Expire the cache entry 300 seconds before the reported
                # lifetime (7200 seconds when the field is absent) so a token
                # that is about to lapse is never handed out.
                access_token_cache["expires_at"] = (
                    time.time() + result.get("expires_in", 7200) - 300
                )
                expires_time = access_token_cache["expires_at"]
                logger.info(
                    f"成功获取 stable access_token expires_time={expires_time}"
                )
                return result["access_token"]
    except Exception as e:
        # repr() rather than str(): timeout errors stringify to an empty
        # message, which would leave this log line without any detail.
        logger.error(f"Exception while getting stable access_token: {e!r}")
    return None
async def get_access_token_old() -> Optional[str]:
    """Return a WeChat access_token via the ``/cgi-bin/token`` endpoint.

    Reads from and writes to the same ``access_token_cache`` dict as
    ``get_access_token`` (keys ``"token"`` and ``"expires_at"``), so a valid
    cached token is returned without any network call.

    :return: The access token, or ``None`` when the response status is not
        200, the response carries no ``access_token`` field, or any
        exception is raised while requesting it.
    """
    import time

    # Serve from the cache while the stored token has not reached its expiry.
    if access_token_cache["token"] and time.time() < access_token_cache["expires_at"]:
        return access_token_cache["token"]

    url = "https://api.weixin.qq.com/cgi-bin/token"
    params = {
        "grant_type": "client_credential",
        "appid": WECHAT_APPID,
        "secret": WECHAT_SECRET,
    }
    # Bound the whole request so a stalled connection cannot block callers;
    # 10 seconds matches the timeout used for the image security check.
    timeout = aiohttp.ClientTimeout(total=10)

    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, params=params) as response:
                if response.status != 200:
                    logger.error(
                        f"Failed to get access_token, status={response.status}"
                    )
                    return None

                data = await response.json()
                if "access_token" not in data:
                    logger.error(
                        f"Failed to get access_token, returned content: {data}"
                    )
                    return None

                access_token_cache["token"] = data["access_token"]
                # Expire the cache entry 5 minutes before the reported
                # lifetime (7200 seconds when the field is absent).
                access_token_cache["expires_at"] = (
                    time.time() + data.get("expires_in", 7200) - 300
                )
                logger.info("Successfully obtained WeChat access_token...")
                return data["access_token"]
    except Exception as e:
        # repr() rather than str(): timeout errors stringify to an empty
        # message, which would leave this log line without any detail.
        logger.error(f"Failed to get access_token: {e!r}")
        return None
async def check_image_security(image_data: bytes) -> bool:
    """Check an image with WeChat's ``img_sec_check`` content-safety endpoint.

    The check is deliberately fail-open so that problems with the token or
    the remote service do not block regular users: every failure path
    (no access token, non-200 status, unexpected ``errcode``, exception)
    returns ``True``.

    :param image_data: Raw image bytes; uploaded as the ``media`` form field
        with content type ``image/jpeg``.
    :return: ``False`` only when the API answers with ``errcode`` 87014
        (logged as illegal content); ``True`` in every other case.
    """
    access_token = await get_access_token()
    if not access_token:
        logger.warning("Unable to get access_token, skipping security check")
        return True  # Fail open: no token means the check cannot run.

    url = f"https://api.weixin.qq.com/wxa/img_sec_check?access_token={access_token}"
    # aiohttp expects a ClientTimeout object here; a bare number is the
    # legacy form of this argument.
    timeout = aiohttp.ClientTimeout(total=10)

    try:
        async with aiohttp.ClientSession() as session:
            # The endpoint takes the image as multipart/form-data. The
            # filename is given explicitly so the field is always sent as a
            # file part instead of relying on aiohttp's implicit fallback
            # for bytes payloads.
            form = aiohttp.FormData()
            form.add_field(
                "media",
                image_data,
                filename="media",
                content_type="image/jpeg",
            )
            async with session.post(url, data=form, timeout=timeout) as response:
                if response.status != 200:
                    logger.warning(
                        f"Image security check request failed: {response.status}"
                    )
                    return True

                result = await response.json()
                logger.info(f"Checking image content safety...result={result}")

                errcode = result.get("errcode")
                if errcode == 0:
                    return True  # Image passed the check.
                if errcode == 87014:
                    logger.warning("Image content contains illegal content...")
                    return False
                # Any other error code is treated as "could not check".
                logger.warning(f"Image security check returned error: {result}")
                return True
    except Exception as e:
        # repr() rather than str(): timeout errors stringify to an empty
        # message, which would leave this log line without any detail.
        logger.error(f"Image security check exception: {e!r}")
        return True  # Fail open on unexpected errors.