# Page header captured with the source (not code): Spaces: Paused
import time
from typing import Optional

import aiohttp

from config import access_token_cache, WECHAT_APPID, WECHAT_SECRET, logger
async def get_access_token() -> Optional[str]:
    """Return a WeChat stable access_token (the recommended way to get one).

    The token stored in ``access_token_cache`` is reused while its cached
    expiry time is still in the future. Otherwise a new token is requested
    from the ``stable_token`` endpoint (without forcing a refresh) and the
    cache is updated, with the expiry set five minutes earlier than the
    lifetime reported in the response.

    Returns:
        The access token, or ``None`` when the request fails, the endpoint
        answers with a non-200 status, or the response carries no token.
        Request and parsing errors are logged, not raised.
    """
    # Serve from the cache while it is still valid.
    cached_token = access_token_cache["token"]
    if cached_token and time.time() < access_token_cache["expires_at"]:
        return cached_token

    # getStableAccessToken endpoint.
    url = "https://api.weixin.qq.com/cgi-bin/stable_token"
    payload = {
        "grant_type": "client_credential",
        "appid": WECHAT_APPID,
        "secret": WECHAT_SECRET,
        "force_refresh": False,  # do not force a refresh
    }
    # Bound the call explicitly instead of relying on the client's default timeout.
    timeout = aiohttp.ClientTimeout(total=10)

    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(url, json=payload) as response:
                if response.status != 200:
                    logger.error(f"Failed to request stable access_token: {response.status}")
                    return None
                # content_type=None: decode the body as JSON even when the
                # server labels it with a non-JSON MIME type.
                result = await response.json(content_type=None)

        token = result.get("access_token") if isinstance(result, dict) else None
        if not token:
            logger.error(f"Failed to get stable access_token: {result}")
            return None

        # Keep the live credential out of the logs.
        redacted = {**result, "access_token": "***"}
        logger.debug(f"getStableAccessToken response: {redacted}")

        # Compute the expiry first so a malformed "expires_in" cannot leave
        # the cache half-updated (new token paired with a stale expiry).
        expires_time = time.time() + result.get("expires_in", 7200) - 300
        access_token_cache["token"] = token
        access_token_cache["expires_at"] = expires_time
        logger.info(f"成功获取 stable access_token expires_time={expires_time}")
        return token
    except Exception as e:
        # Best-effort boundary: callers treat None as "no token available".
        logger.error(f"Exception while getting stable access_token: {str(e)}")
        return None
async def get_access_token_old() -> Optional[str]:
    """Return a WeChat access_token from the classic ``/cgi-bin/token`` endpoint.

    Reads and writes the same ``access_token_cache`` entry as
    ``get_access_token``: the cached token is returned while its expiry time
    is in the future, otherwise a new one is fetched and cached with the
    expiry set five minutes earlier than the reported lifetime.

    Returns:
        The access token, or ``None`` when the request fails, the endpoint
        answers with a non-200 status, or the response carries no token.
        Request and parsing errors are logged, not raised.
    """
    # Serve from the cache while it is still valid.
    cached_token = access_token_cache["token"]
    if cached_token and time.time() < access_token_cache["expires_at"]:
        return cached_token

    # Fetch a new access_token.
    url = "https://api.weixin.qq.com/cgi-bin/token"
    params = {
        "grant_type": "client_credential",
        "appid": WECHAT_APPID,
        "secret": WECHAT_SECRET,
    }
    # Bound the call explicitly instead of relying on the client's default timeout.
    timeout = aiohttp.ClientTimeout(total=10)

    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, params=params) as response:
                if response.status != 200:
                    logger.error(f"Failed to get access_token, status={response.status}")
                    return None
                # content_type=None: decode the body as JSON even when the
                # server labels it with a non-JSON MIME type.
                data = await response.json(content_type=None)

        token = data.get("access_token") if isinstance(data, dict) else None
        if not token:
            logger.error(f"Failed to get access_token, returned content: {data}")
            return None

        # Expire five minutes early; compute before touching the cache so a
        # malformed "expires_in" cannot leave it half-updated.
        expires_at = time.time() + data.get("expires_in", 7200) - 300
        access_token_cache["token"] = token
        access_token_cache["expires_at"] = expires_at
        logger.info("Successfully obtained WeChat access_token...")
        return token
    except Exception as e:
        # Best-effort boundary: callers treat None as "no token available".
        logger.error(f"Failed to get access_token: {str(e)}")
        return None
async def check_image_security(image_data: bytes) -> bool:
    """Check an image against the WeChat image content-security endpoint.

    The check is deliberately fail-open: any problem with obtaining a token,
    reaching the endpoint, or interpreting its answer lets the image through
    so that ordinary users are not blocked by an outage.

    Args:
        image_data: Raw binary content of the image.

    Returns:
        ``False`` only when the endpoint answers with errcode 87014 (the
        image was flagged as risky). ``True`` when the endpoint reports
        errcode 0, and also in every failure case described above.
    """
    access_token = await get_access_token()
    if not access_token:
        logger.warning("Unable to get access_token, skipping security check")
        return True  # fail open when no token is available

    url = "https://api.weixin.qq.com/wxa/img_sec_check"
    # Pass the token as a query parameter so the client handles URL encoding.
    query = {"access_token": access_token}
    # Use the ClientTimeout structure rather than a bare number for the timeout.
    timeout = aiohttp.ClientTimeout(total=10)

    try:
        # The endpoint expects multipart/form-data.
        form = aiohttp.FormData()
        # NOTE(review): a filename is supplied so the part is sent as a file
        # upload; some servers are assumed to ignore nameless parts. Confirm
        # against the API.
        form.add_field(
            "media",
            image_data,
            filename="image.jpg",
            content_type="image/jpeg",
        )
        async with aiohttp.ClientSession() as session:
            async with session.post(url, params=query, data=form, timeout=timeout) as response:
                if response.status != 200:
                    logger.warning(f"Image security check request failed: {response.status}")
                    return True

                # content_type=None: decode the body as JSON even when the
                # server labels it with a non-JSON MIME type.
                result = await response.json(content_type=None)
                logger.info(f"Checking image content safety...result={result}")

                errcode = result.get("errcode")
                if errcode == 0:
                    return True  # safe
                if errcode == 87014:
                    logger.warning("Image content contains illegal content...")
                    return False

                logger.warning(f"Image security check returned error: {result}")
                return True  # any other error code: let the image through
    except Exception as e:
        logger.error(f"Image security check exception: {str(e)}")
        return True  # fail open on unexpected errors