|
|
from typing import Optional |
|
|
|
|
|
import aiohttp |
|
|
|
|
|
from config import access_token_cache, WECHAT_APPID, WECHAT_SECRET, logger |
|
|
|
|
|
|
|
|
async def get_access_token() -> Optional[str]:
    """Get the WeChat stable access_token (the recommended way).

    A token held in ``access_token_cache`` is returned while the current time
    is before its ``expires_at`` value. Otherwise a new token is requested from
    the ``cgi-bin/stable_token`` endpoint with ``force_refresh`` set to False,
    and stored in the cache with an expiry 300 seconds earlier than the
    reported ``expires_in`` (7200 seconds is assumed when the field is absent).

    Returns:
        The access token, or ``None`` when the request raises, the HTTP status
        is not 200, or the response carries no ``access_token``. Failures are
        logged, never raised.
    """
    import time

    cached_token = access_token_cache["token"]
    if cached_token and time.time() < access_token_cache["expires_at"]:
        return cached_token

    url = "https://api.weixin.qq.com/cgi-bin/stable_token"
    data = {
        "grant_type": "client_credential",
        "appid": WECHAT_APPID,
        "secret": WECHAT_SECRET,
        "force_refresh": False,
    }
    # Bound the whole request explicitly so a stalled connection fails fast
    # instead of waiting out the client's default timeout.
    request_timeout = aiohttp.ClientTimeout(total=10)

    try:
        async with aiohttp.ClientSession(timeout=request_timeout) as session:
            async with session.post(url, json=data) as response:
                if response.status != 200:
                    logger.error(
                        f"Failed to request stable access_token: {response.status}"
                    )
                    return None

                # content_type=None: parse the body as JSON even when the
                # server labels it with a non-JSON Content-Type.
                result = await response.json(content_type=None)

        has_token = isinstance(result, dict) and "access_token" in result

        # Never write the token itself to the log; mask it in the debug dump.
        loggable = {**result, "access_token": "***"} if has_token else result
        logger.debug(f"getStableAccessToken response: {loggable}")

        if not has_token:
            logger.error(f"Failed to get stable access_token: {result}")
            return None

        access_token_cache["token"] = result["access_token"]
        # Expire the cache entry 300 s early so a token close to its real
        # expiry is not handed out.
        access_token_cache["expires_at"] = (
            time.time() + result.get("expires_in", 7200) - 300
        )
        expires_time = access_token_cache["expires_at"]
        logger.info(f"成功获取 stable access_token expires_time={expires_time}")
        return result["access_token"]
    except Exception as e:
        # Include the exception type: some errors (e.g. timeouts) have an
        # empty message, which would otherwise leave the log line blank.
        logger.error(
            f"Exception while getting stable access_token: {type(e).__name__}: {e}"
        )

    return None
|
|
|
|
|
|
|
|
async def get_access_token_old() -> Optional[str]:
    """Get a WeChat access_token from the classic ``cgi-bin/token`` endpoint.

    The token in ``access_token_cache`` is returned while the current time is
    before its ``expires_at`` value. Otherwise a new token is requested, cached
    with an expiry 300 seconds short of the reported ``expires_in`` (7200 when
    the field is absent), and returned.

    Returns:
        The access token, or ``None`` (after logging) on a non-200 status, a
        response without ``access_token``, or any exception.
    """
    import time

    cached = access_token_cache["token"]
    if cached and time.time() < access_token_cache["expires_at"]:
        return cached

    endpoint = "https://api.weixin.qq.com/cgi-bin/token"
    query = dict(
        grant_type="client_credential",
        appid=WECHAT_APPID,
        secret=WECHAT_SECRET,
    )

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(endpoint, params=query) as response:
                # Guard clauses: bail out on each failure, success path last.
                if response.status != 200:
                    logger.error(f"Failed to get access_token, status={response.status}")
                    return None

                payload = await response.json()
                if "access_token" not in payload:
                    logger.error(f"Failed to get access_token, returned content: {payload}")
                    return None

                access_token_cache["token"] = payload["access_token"]
                access_token_cache["expires_at"] = (
                    time.time() + payload.get("expires_in", 7200) - 300
                )
                logger.info("Successfully obtained WeChat access_token...")
                return payload["access_token"]
    except Exception as exc:
        logger.error(f"Failed to get access_token: {str(exc)}")

    return None
|
|
|
|
|
|
|
|
async def check_image_security(image_data: bytes) -> bool:
    """Check image content safety through WeChat's ``img_sec_check`` endpoint.

    The check is best-effort and fails open: when no access token is
    available, the request fails or raises, or the API answers with an
    unexpected error code, the image is reported as safe and the problem is
    only logged.

    :param image_data: raw image bytes, uploaded as the ``media`` form field
        and always declared as ``image/jpeg``
    :return: ``True`` when the image is safe or could not be checked,
        ``False`` only when the API reports errcode 87014
    """
    access_token = await get_access_token()
    if not access_token:
        logger.warning("Unable to get access_token, skipping security check")
        return True

    url = f"https://api.weixin.qq.com/wxa/img_sec_check?access_token={access_token}"
    # aiohttp deprecates bare numbers as timeouts; use a ClientTimeout.
    request_timeout = aiohttp.ClientTimeout(total=10)

    try:
        async with aiohttp.ClientSession() as session:
            form = aiohttp.FormData()
            # For bytes values aiohttp falls back to the field name as the
            # filename (a deprecated implicit behaviour); state it explicitly
            # so the upload stays a file part.
            form.add_field(
                "media", image_data, filename="media", content_type="image/jpeg"
            )
            async with session.post(url, data=form, timeout=request_timeout) as response:
                if response.status != 200:
                    logger.warning(
                        f"Image security check request failed: {response.status}"
                    )
                    return True

                # content_type=None: parse the body as JSON even when the
                # server labels it with a non-JSON Content-Type. The strict
                # check would raise an error whose message includes the
                # request URL, and this URL carries the access token.
                result = await response.json(content_type=None)

        logger.info(f"Checking image content safety...result={result}")

        errcode = result.get("errcode")
        if errcode == 0:
            return True
        if errcode == 87014:
            logger.warning("Image content contains illegal content...")
            return False

        # 40001 / 40014 / 42001 are WeChat's codes for an invalid or expired
        # access_token (per the global return-code list; confirm if the API
        # changes). Expire the cached entry so the next call fetches a new
        # token instead of failing open until the cache runs out. Skip this
        # if the cache already holds a different, newer token.
        if errcode in (40001, 40014, 42001) and access_token_cache["token"] == access_token:
            access_token_cache["expires_at"] = 0

        logger.warning(f"Image security check returned error: {result}")
        return True
    except Exception as e:
        # Include the exception type: timeouts have an empty message, which
        # would otherwise leave the log line blank.
        logger.error(f"Image security check exception: {type(e).__name__}: {e}")
        return True
|
|
|