# image_moderation.py
# ==========================================
# 🔒 P0 security hardening: image content moderation module
# ==========================================
# Moderates images through the Tencent Cloud or Alibaba Cloud (Aliyun) APIs.
# Environment variables read by this module:
# - IMAGE_MODERATION_ENABLED: "true" to enable moderation (default: "false")
# - TENCENT_SECRET_ID / TENCENT_SECRET_KEY / TENCENT_REGION
# - ALIYUN_ACCESS_KEY_ID / ALIYUN_ACCESS_KEY_SECRET / ALIYUN_REGION
# The provider is not configured explicitly: it is picked per call from the
# remaining monthly free quota (Aliyun first, Tencent as fallback).
# ==========================================

import asyncio
import base64
import concurrent.futures
import hashlib
import hmac
import json
import logging
import os
import time
import uuid
from datetime import datetime, timezone
from email.utils import formatdate
from typing import Optional, Tuple

import httpx

logger = logging.getLogger("ComfyUI-Ranking.ImageModeration")

# ==========================================
# 🔧 Configuration
# ==========================================
MODERATION_ENABLED = os.environ.get("IMAGE_MODERATION_ENABLED", "false").lower() == "true"

# Tencent Cloud settings
TENCENT_SECRET_ID = os.environ.get("TENCENT_SECRET_ID", "")
TENCENT_SECRET_KEY = os.environ.get("TENCENT_SECRET_KEY", "")
TENCENT_REGION = os.environ.get("TENCENT_REGION", "ap-guangzhou")

# Aliyun settings
ALIYUN_ACCESS_KEY_ID = os.environ.get("ALIYUN_ACCESS_KEY_ID", "")
ALIYUN_ACCESS_KEY_SECRET = os.environ.get("ALIYUN_ACCESS_KEY_SECRET", "")
ALIYUN_REGION = os.environ.get("ALIYUN_REGION", "cn-shanghai")

# 💰 Free-quota bookkeeping (counters reset every calendar month)
ALIYUN_FREE_QUOTA = 3000  # Aliyun: 3000 free calls per month
TENCENT_FREE_QUOTA = 10000  # Tencent Cloud: 10000 free calls per month

# File that persists the per-month usage counters
QUOTA_FILE = "/tmp/image_moderation_quota.json"


def _load_quota() -> dict:
    """Load the usage counters for the current month.

    Returns:
        A dict with the keys ``month``, ``aliyun`` and ``tencent``. Fresh
        zeroed counters are returned when the quota file is missing,
        unreadable, malformed, or belongs to a previous month.
    """
    current_month = datetime.now().strftime("%Y-%m")
    fresh = {"month": current_month, "aliyun": 0, "tencent": 0}
    try:
        with open(QUOTA_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # Nothing recorded yet: not worth a log line.
        return fresh
    except (OSError, ValueError) as e:
        logger.debug(f"加载额度记录失败: {e}")
        return fresh

    # A record from another month (or a non-object JSON value) means "reset".
    if not isinstance(data, dict) or data.get("month") != current_month:
        return fresh

    # Guard against hand-edited / corrupted counters so that the arithmetic
    # and comparisons done by the callers cannot raise.
    for provider in ("aliyun", "tencent"):
        if not isinstance(data.get(provider), int):
            data[provider] = 0
    return data


def _save_quota(quota: dict):
    """Persist the usage counters; failures are logged, never raised."""
    try:
        with open(QUOTA_FILE, "w", encoding="utf-8") as f:
            json.dump(quota, f)
    except (OSError, TypeError, ValueError) as e:
        logger.warning(f"保存额度记录失败: {e}")


def _increment_quota(provider: str):
    """Add one call to ``provider``'s counter for this month and log the totals."""
    quota = _load_quota()
    quota[provider] = quota.get(provider, 0) + 1
    _save_quota(quota)
    logger.info(f"审核额度: {provider}={quota[provider]}, 阿里云剩余{ALIYUN_FREE_QUOTA - quota.get('aliyun', 0)}, 腾讯云剩余{TENCENT_FREE_QUOTA - quota.get('tencent', 0)}")


def _get_available_provider() -> Optional[str]:
    """Pick the moderation provider to use for the next call.

    Strategy: Aliyun first, Tencent Cloud as fallback, skip when both are
    exhausted.

    Returns:
        ``"aliyun"``, ``"tencent"``, or ``None`` when no provider is both
        fully configured and still within its monthly free quota.
    """
    quota = _load_quota()

    # A provider is usable only when BOTH halves of its credential pair are
    # set; otherwise the provider function would skip the check anyway.
    aliyun_configured = bool(ALIYUN_ACCESS_KEY_ID and ALIYUN_ACCESS_KEY_SECRET)
    tencent_configured = bool(TENCENT_SECRET_ID and TENCENT_SECRET_KEY)

    # 1️⃣ Prefer Aliyun
    if aliyun_configured and quota.get("aliyun", 0) < ALIYUN_FREE_QUOTA:
        return "aliyun"

    # 2️⃣ Aliyun exhausted or unavailable: fall back to Tencent Cloud
    if tencent_configured and quota.get("tencent", 0) < TENCENT_FREE_QUOTA:
        return "tencent"

    # 3️⃣ Nothing available
    return None


# ==========================================
# 📊 Moderation result type
# ==========================================
class ModerationResult:
    """Outcome of one image moderation request."""

    def __init__(self, passed: bool, label: str = "", confidence: float = 0.0,
                 suggestion: str = "pass", details: Optional[dict] = None):
        self.passed = passed  # whether the image is allowed
        self.label = label  # violation label (e.g. Porn, Terror, Ad)
        self.confidence = confidence  # confidence score, 0-100
        self.suggestion = suggestion  # pass / review / block
        self.details = details or {}  # raw response or skip/error information

    def __repr__(self) -> str:
        return (f"{type(self).__name__}(passed={self.passed!r}, label={self.label!r}, "
                f"confidence={self.confidence!r}, suggestion={self.suggestion!r})")

    def to_dict(self):
        """Return the result as a plain dict."""
        return {
            "passed": self.passed,
            "label": self.label,
            "confidence": self.confidence,
            "suggestion": self.suggestion,
            "details": self.details
        }


# ==========================================
# 🔵 Tencent Cloud image moderation
# ==========================================
def _tencent_sign_v3(secret_id: str, secret_key: str, host: str,
                     payload: str, timestamp: int) -> dict:
    """Build the request headers for a TC3-HMAC-SHA256 signed IMS call.

    Args:
        secret_id: Tencent Cloud SecretId.
        secret_key: Tencent Cloud SecretKey.
        host: API host name; it is part of the signed headers.
        payload: Exact JSON request body that will be sent.
        timestamp: Unix timestamp (seconds) used for the signature.

    Returns:
        Header dict including ``Authorization`` for ``ImageModerationSync``.
    """
    service = "ims"
    # The credential scope uses the UTC date of the timestamp.
    date = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime("%Y-%m-%d")

    # Canonical request
    http_request_method = "POST"
    canonical_uri = "/"
    canonical_querystring = ""
    ct = "application/json; charset=utf-8"
    canonical_headers = f"content-type:{ct}\nhost:{host}\nx-tc-action:imagemoderationsync\n"
    signed_headers = "content-type;host;x-tc-action"
    hashed_request_payload = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    canonical_request = (f"{http_request_method}\n{canonical_uri}\n{canonical_querystring}\n"
                         f"{canonical_headers}\n{signed_headers}\n{hashed_request_payload}")

    # String to sign
    algorithm = "TC3-HMAC-SHA256"
    credential_scope = f"{date}/{service}/tc3_request"
    hashed_canonical_request = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
    string_to_sign = f"{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical_request}"

    # Derive the signing key step by step (date -> service -> "tc3_request")
    def sign(key, msg):
        return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()

    secret_date = sign(("TC3" + secret_key).encode("utf-8"), date)
    secret_service = sign(secret_date, service)
    secret_signing = sign(secret_service, "tc3_request")
    signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()

    # Authorization header
    authorization = (f"{algorithm} Credential={secret_id}/{credential_scope}, "
                     f"SignedHeaders={signed_headers}, Signature={signature}")

    return {
        "Authorization": authorization,
        "Content-Type": ct,
        "Host": host,
        "X-TC-Action": "ImageModerationSync",
        "X-TC-Timestamp": str(timestamp),
        "X-TC-Version": "2020-07-13",
        "X-TC-Region": TENCENT_REGION
    }


async def moderate_image_tencent(image_content: bytes) -> ModerationResult:
    """Moderate an image with Tencent Cloud IMS.

    API docs: https://cloud.tencent.com/document/product/1125/53273

    Args:
        image_content: Raw image bytes.

    Returns:
        The moderation result. The function fails open: when credentials are
        missing, the API reports an error, or any exception occurs, the image
        is passed and ``details`` carries ``skipped`` or ``error``.
    """
    if not TENCENT_SECRET_ID or not TENCENT_SECRET_KEY:
        logger.warning("腾讯云审核未配置,跳过审核")
        # Marked as skipped so the caller does not count it as a real call.
        return ModerationResult(passed=True, suggestion="pass",
                                details={"skipped": True, "reason": "腾讯云审核未配置"})

    host = "ims.tencentcloudapi.com"
    timestamp = int(time.time())

    # The image is sent Base64-encoded inside the JSON body.
    file_content_base64 = base64.b64encode(image_content).decode("utf-8")

    payload = json.dumps({
        "BizType": "default",  # use the default moderation policy
        "FileContent": file_content_base64,
        "DataId": str(uuid.uuid4())
    })

    headers = _tencent_sign_v3(TENCENT_SECRET_ID, TENCENT_SECRET_KEY, host, payload, timestamp)

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(f"https://{host}", content=payload, headers=headers)
            result = resp.json()

        response = result.get("Response", {})

        if "Error" in response:
            logger.error(f"腾讯云审核API错误: {response['Error']}")
            return ModerationResult(passed=True, suggestion="pass", details={"error": response["Error"]})

        # Interpret the verdict
        suggestion = response.get("Suggestion", "Pass").lower()
        label = response.get("Label", "")
        score = response.get("Score", 0)

        passed = suggestion == "pass"

        logger.info(f"腾讯云审核结果: suggestion={suggestion}, label={label}, score={score}")

        return ModerationResult(
            passed=passed,
            label=label,
            confidence=score,
            suggestion=suggestion,
            details=response
        )

    except Exception as e:
        logger.error(f"腾讯云审核异常: {str(e)}")
        # Deliberate fail-open: a moderation outage must not block uploads.
        return ModerationResult(passed=True, suggestion="pass", details={"error": str(e)})


# ==========================================
# 🟠 Aliyun image moderation
# ==========================================
def _aliyun_sign(access_key_secret: str, string_to_sign: str) -> str:
    """Return the Base64 HMAC-SHA1 signature used by Aliyun ROA-style APIs.

    ROA signatures are keyed with the bare AccessKey secret; the trailing
    ``"&"`` belongs to the RPC-style signature only.
    """
    digest = hmac.new(
        access_key_secret.encode("utf-8"),
        string_to_sign.encode("utf-8"),
        hashlib.sha1
    ).digest()
    return base64.b64encode(digest).decode("utf-8")


def _build_aliyun_signature(method: str, headers: dict, uri: str, access_key_secret: str) -> str:
    """Build the Aliyun ROA-style signature for a request.

    Reference: https://help.aliyun.com/document_detail/315526.html

    The string to sign is::

        METHOD \\n Accept \\n Content-MD5 \\n Content-Type \\n Date \\n
        <sorted "x-acs-*" headers, one "name:value\\n" each> URI

    Args:
        method: HTTP verb, e.g. ``"POST"``.
        headers: Request headers; names are matched case-insensitively.
            Missing Accept / Content-MD5 / Content-Type / Date count as "".
        uri: Request path (plus query string, if any).
        access_key_secret: Aliyun AccessKey secret.

    Returns:
        The Base64-encoded signature.
    """
    lowered = {key.lower(): value for key, value in headers.items()}

    # 1. CanonicalizedHeaders: every x-acs-* header, sorted, newline-terminated
    acs_headers = sorted((k, v) for k, v in lowered.items() if k.startswith("x-acs-"))
    canonical_headers = "".join(f"{k}:{v}\n" for k, v in acs_headers)

    # 2. StringToSign: must include the Content-MD5 and Date that are sent
    standard_part = "\n".join([
        method,
        lowered.get("accept", ""),
        lowered.get("content-md5", ""),
        lowered.get("content-type", ""),
        lowered.get("date", ""),
    ])
    string_to_sign = f"{standard_part}\n{canonical_headers}{uri}"

    # 3. Sign
    return _aliyun_sign(access_key_secret, string_to_sign)


async def moderate_image_aliyun(image_content: bytes) -> ModerationResult:
    """Moderate an image with Aliyun Content Moderation (Green).

    API docs: https://help.aliyun.com/document_detail/70292.html

    Args:
        image_content: Raw image bytes.

    Returns:
        The moderation result. The function fails open: when credentials are
        missing, the API reports an error, or any exception occurs, the image
        is passed and ``details`` carries ``skipped`` or ``error``.
    """
    if not ALIYUN_ACCESS_KEY_ID or not ALIYUN_ACCESS_KEY_SECRET:
        logger.warning("阿里云审核未配置,跳过审核")
        # Marked as skipped so the caller does not count it as a real call.
        return ModerationResult(passed=True, suggestion="pass",
                                details={"skipped": True, "reason": "阿里云审核未配置"})

    # The image is sent Base64-encoded inside the JSON body.
    file_content_base64 = base64.b64encode(image_content).decode("utf-8")

    # Aliyun Content Moderation ("Green") endpoint
    endpoint = f"https://green.{ALIYUN_REGION}.aliyuncs.com"
    uri = "/green/image/scan"

    # Request body
    payload = {
        "scenes": ["porn", "terrorism", "ad"],  # pornography, terrorism, ads
        "tasks": [{
            "dataId": str(uuid.uuid4()),
            "content": file_content_base64,
            "type": "BASE64"
        }]
    }
    # Serialize once so the bytes that are hashed are the bytes that are sent.
    payload_bytes = json.dumps(payload).encode("utf-8")
    content_md5 = base64.b64encode(hashlib.md5(payload_bytes).digest()).decode("utf-8")

    # 🔒 Headers; Content-MD5, Date and every x-acs-* header are signed
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "Content-MD5": content_md5,
        "Date": formatdate(usegmt=True),  # RFC 1123 date in GMT
        "x-acs-version": "2018-05-09",
        "x-acs-signature-method": "HMAC-SHA1",
        "x-acs-signature-version": "1.0",
        "x-acs-signature-nonce": str(uuid.uuid4()),
    }

    # 🔒 Sign and attach the Authorization header
    signature = _build_aliyun_signature("POST", headers, uri, ALIYUN_ACCESS_KEY_SECRET)
    headers["Authorization"] = f"acs {ALIYUN_ACCESS_KEY_ID}:{signature}"

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(
                f"{endpoint}{uri}",
                content=payload_bytes,
                headers=headers
            )
            result = resp.json()

        # Request-level failure
        if result.get("code") != 200:
            logger.error(f"阿里云审核API错误: {result}")
            return ModerationResult(passed=True, suggestion="pass", details={"error": result})

        data = result.get("data", [])
        if not data:
            return ModerationResult(passed=True, suggestion="pass")

        task = data[0]
        # Task-level failure: this image was not actually checked.
        if task.get("code", 200) != 200:
            logger.error(f"阿里云审核API错误: {result}")
            return ModerationResult(passed=True, suggestion="pass", details={"error": result})

        # Keep the most severe verdict over all scenes (block > review > pass);
        # among equally severe ones keep the highest rate. Label and rate are
        # always taken from the same scene result.
        severity = {"review": 1, "block": 2}
        worst_level = 0
        max_rate = 0
        block_label = ""
        overall_suggestion = "pass"

        for scene_result in task.get("results", []):
            suggestion = scene_result.get("suggestion", "pass")
            level = severity.get(suggestion, 0)
            if level == 0:
                continue
            rate = scene_result.get("rate", 0)
            if level > worst_level or (level == worst_level and rate > max_rate):
                worst_level = level
                max_rate = rate
                overall_suggestion = suggestion
                block_label = f"{scene_result.get('scene', '')}:{scene_result.get('label', '')}"

        passed = overall_suggestion == "pass"

        logger.info(f"阿里云审核结果: suggestion={overall_suggestion}, label={block_label}, rate={max_rate}")

        return ModerationResult(
            passed=passed,
            label=block_label,
            # Aliyun's "rate" is documented as a 0-100 score already.
            confidence=max_rate,
            suggestion=overall_suggestion,
            details=result
        )

    except Exception as e:
        logger.error(f"阿里云审核异常: {str(e)}")
        # Deliberate fail-open: a moderation outage must not block uploads.
        return ModerationResult(passed=True, suggestion="pass", details={"error": str(e)})


# ==========================================
# 🎯 Unified entry point (quota-aware provider selection)
# ==========================================

# 🔒 Runtime configuration file
SYSTEM_CONFIG_FILE = "/tmp/system_config.json"


def _is_moderation_enabled() -> bool:
    """Tell whether image moderation is switched on.

    The ``image_moderation_enabled`` key of ``SYSTEM_CONFIG_FILE`` wins when
    the file is readable and contains it; otherwise the value derived from
    the ``IMAGE_MODERATION_ENABLED`` environment variable is used.
    """
    # 1️⃣ System configuration file first
    try:
        if os.path.exists(SYSTEM_CONFIG_FILE):
            with open(SYSTEM_CONFIG_FILE, "r") as f:
                config = json.load(f)
            if "image_moderation_enabled" in config:
                # Equality (not identity) on purpose: a JSON 1 counts as on.
                return config["image_moderation_enabled"] == True  # noqa: E712
    except Exception as e:
        logger.debug(f"读取系统配置失败: {e}")

    # 2️⃣ Fall back to the environment variable
    return MODERATION_ENABLED


async def moderate_image(image_content: bytes, file_ext: str = "") -> ModerationResult:
    """Moderate an image with whichever provider still has free quota.

    Strategy: Aliyun first (3000 calls/month), Tencent Cloud as fallback
    (10000 calls/month), skip moderation when both are exhausted.

    Args:
        image_content: Raw image bytes.
        file_ext: File extension, with or without a leading dot, in any
            case. ``json`` and ``mp4`` files are not moderated.

    Returns:
        The moderation result. When moderation is skipped the image is
        passed and ``details`` contains ``skipped`` and ``reason``; when a
        provider was called ``details["provider"]`` names it.
    """
    # Non-image uploads are never sent to the API.
    normalized_ext = (file_ext or "").lower().lstrip(".")
    if normalized_ext in ("json", "mp4"):
        return ModerationResult(passed=True, suggestion="pass", details={"skipped": True, "reason": "非图片文件"})

    # Moderation may be toggled at runtime.
    if not _is_moderation_enabled():
        logger.debug("图片审核未启用")
        return ModerationResult(passed=True, suggestion="pass", details={"skipped": True, "reason": "审核未启用"})

    # 💰 Choose the provider from the remaining free quota
    provider = _get_available_provider()

    if provider is None:
        logger.warning("本月免费审核额度已用完,跳过审核")
        return ModerationResult(passed=True, suggestion="pass", details={"skipped": True, "reason": "免费额度已用完"})

    # Run the check
    if provider == "aliyun":
        result = await moderate_image_aliyun(image_content)
    else:
        result = await moderate_image_tencent(image_content)

    # Only calls that really produced a verdict consume quota.
    if "error" not in result.details and "skipped" not in result.details:
        _increment_quota(provider)

    result.details["provider"] = provider
    return result


def moderate_image_sync(image_content: bytes, file_ext: str = "") -> ModerationResult:
    """Synchronous wrapper around :func:`moderate_image`.

    The coroutine runs on a private event loop, so the calling thread's
    current loop is never replaced or closed. If a loop is already running
    in the calling thread, the work is done on a short-lived worker thread;
    the caller (and therefore that running loop) blocks until it finishes.

    Args:
        image_content: Raw image bytes.
        file_ext: File extension, see :func:`moderate_image`.

    Returns:
        The moderation result.
    """
    def _run_in_private_loop() -> ModerationResult:
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(moderate_image(image_content, file_ext))
        finally:
            try:
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                loop.close()

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread: run directly.
        return _run_in_private_loop()

    # run_until_complete() cannot be nested inside a running loop.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(_run_in_private_loop).result()