ComfyUI-Ranking-API / image_moderation.py
ZHIWEI666's picture
版本更新
c60e0ef verified
# image_moderation.py
# ==========================================
# 🔒 P0安全优化:图片内容审核模块
# ==========================================
# 支持腾讯云/阿里云图片审核API
# 环境变量配置:
# - IMAGE_MODERATION_PROVIDER: "tencent" 或 "aliyun" (默认: tencent)
# - TENCENT_SECRET_ID / TENCENT_SECRET_KEY
# - ALIYUN_ACCESS_KEY_ID / ALIYUN_ACCESS_KEY_SECRET
# ==========================================
import os
import base64
import json
import logging
import hmac
import hashlib
import time
import uuid
from datetime import datetime
from typing import Optional, Tuple
import httpx
logger = logging.getLogger("ComfyUI-Ranking.ImageModeration")
# ==========================================
# 🔧 配置
# ==========================================
MODERATION_ENABLED = os.environ.get("IMAGE_MODERATION_ENABLED", "false").lower() == "true"
# 腾讯云配置
TENCENT_SECRET_ID = os.environ.get("TENCENT_SECRET_ID", "")
TENCENT_SECRET_KEY = os.environ.get("TENCENT_SECRET_KEY", "")
TENCENT_REGION = os.environ.get("TENCENT_REGION", "ap-guangzhou")
# 阿里云配置
ALIYUN_ACCESS_KEY_ID = os.environ.get("ALIYUN_ACCESS_KEY_ID", "")
ALIYUN_ACCESS_KEY_SECRET = os.environ.get("ALIYUN_ACCESS_KEY_SECRET", "")
ALIYUN_REGION = os.environ.get("ALIYUN_REGION", "cn-shanghai")
# 💰 免费额度管理(每月重置)
ALIYUN_FREE_QUOTA = 3000 # 阿里云每月免费 3000 次
TENCENT_FREE_QUOTA = 10000 # 腾讯云每月免费 10000 次
# 额度记录文件
QUOTA_FILE = "/tmp/image_moderation_quota.json"
def _load_quota() -> dict:
"""加载当月额度使用记录"""
current_month = datetime.now().strftime("%Y-%m")
try:
if os.path.exists(QUOTA_FILE):
with open(QUOTA_FILE, "r") as f:
data = json.load(f)
# 检查是否是当月记录,否则重置
if data.get("month") != current_month:
return {"month": current_month, "aliyun": 0, "tencent": 0}
return data
except Exception as e:
logger.debug(f"加载额度记录失败: {e}")
return {"month": current_month, "aliyun": 0, "tencent": 0}
def _save_quota(quota: dict):
"""保存额度使用记录"""
try:
with open(QUOTA_FILE, "w") as f:
json.dump(quota, f)
except Exception as e:
logger.warning(f"保存额度记录失败: {e}")
def _increment_quota(provider: str):
"""增加使用次数"""
quota = _load_quota()
quota[provider] = quota.get(provider, 0) + 1
_save_quota(quota)
logger.info(f"审核额度: {provider}={quota[provider]}, 阿里云剩余{ALIYUN_FREE_QUOTA - quota.get('aliyun', 0)}, 腾讯云剩余{TENCENT_FREE_QUOTA - quota.get('tencent', 0)}")
def _get_available_provider() -> str:
"""
获取可用的审核服务商
策略:阿里云优先 → 腾讯云兆底 → 都用完则跳过
"""
quota = _load_quota()
# 1️⃣ 优先阿里云
if quota.get("aliyun", 0) < ALIYUN_FREE_QUOTA and ALIYUN_ACCESS_KEY_ID:
return "aliyun"
# 2️⃣ 阿里云用完,切换腾讯云
if quota.get("tencent", 0) < TENCENT_FREE_QUOTA and TENCENT_SECRET_ID:
return "tencent"
# 3️⃣ 两家都用完,返回 None
return None
# ==========================================
# 📊 审核结果类型
# ==========================================
class ModerationResult:
"""图片审核结果"""
def __init__(self, passed: bool, label: str = "", confidence: float = 0.0,
suggestion: str = "pass", details: dict = None):
self.passed = passed # 是否通过
self.label = label # 违规标签(如 Porn, Terror, Ad)
self.confidence = confidence # 置信度 0-100
self.suggestion = suggestion # pass/review/block
self.details = details or {} # 详细信息
def to_dict(self):
return {
"passed": self.passed,
"label": self.label,
"confidence": self.confidence,
"suggestion": self.suggestion,
"details": self.details
}
# ==========================================
# 🔵 腾讯云图片审核
# ==========================================
def _tencent_sign_v3(secret_id: str, secret_key: str, host: str,
payload: str, timestamp: int) -> dict:
"""腾讯云 TC3-HMAC-SHA256 签名"""
service = "ims"
date = datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%d")
# 规范请求
http_request_method = "POST"
canonical_uri = "/"
canonical_querystring = ""
ct = "application/json; charset=utf-8"
canonical_headers = f"content-type:{ct}\nhost:{host}\nx-tc-action:imagemoderationsync\n"
signed_headers = "content-type;host;x-tc-action"
hashed_request_payload = hashlib.sha256(payload.encode("utf-8")).hexdigest()
canonical_request = (f"{http_request_method}\n{canonical_uri}\n{canonical_querystring}\n"
f"{canonical_headers}\n{signed_headers}\n{hashed_request_payload}")
# 待签名字符串
algorithm = "TC3-HMAC-SHA256"
credential_scope = f"{date}/{service}/tc3_request"
hashed_canonical_request = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
string_to_sign = f"{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical_request}"
# 计算签名
def sign(key, msg):
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
secret_date = sign(("TC3" + secret_key).encode("utf-8"), date)
secret_service = sign(secret_date, service)
secret_signing = sign(secret_service, "tc3_request")
signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
# 构建 Authorization
authorization = (f"{algorithm} Credential={secret_id}/{credential_scope}, "
f"SignedHeaders={signed_headers}, Signature={signature}")
return {
"Authorization": authorization,
"Content-Type": ct,
"Host": host,
"X-TC-Action": "ImageModerationSync",
"X-TC-Timestamp": str(timestamp),
"X-TC-Version": "2020-07-13",
"X-TC-Region": TENCENT_REGION
}
async def moderate_image_tencent(image_content: bytes) -> ModerationResult:
"""
腾讯云图片审核
API文档: https://cloud.tencent.com/document/product/1125/53273
"""
if not TENCENT_SECRET_ID or not TENCENT_SECRET_KEY:
logger.warning("腾讯云审核未配置,跳过审核")
return ModerationResult(passed=True, suggestion="pass")
host = "ims.tencentcloudapi.com"
timestamp = int(time.time())
# Base64 编码图片
file_content_base64 = base64.b64encode(image_content).decode("utf-8")
payload = json.dumps({
"BizType": "default", # 使用默认策略
"FileContent": file_content_base64,
"DataId": str(uuid.uuid4())
})
headers = _tencent_sign_v3(TENCENT_SECRET_ID, TENCENT_SECRET_KEY, host, payload, timestamp)
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(f"https://{host}", content=payload, headers=headers)
result = resp.json()
response = result.get("Response", {})
if "Error" in response:
logger.error(f"腾讯云审核API错误: {response['Error']}")
return ModerationResult(passed=True, suggestion="pass",
details={"error": response["Error"]})
# 解析审核结果
suggestion = response.get("Suggestion", "Pass").lower()
label = response.get("Label", "")
score = response.get("Score", 0)
passed = suggestion == "pass"
logger.info(f"腾讯云审核结果: suggestion={suggestion}, label={label}, score={score}")
return ModerationResult(
passed=passed,
label=label,
confidence=score,
suggestion=suggestion,
details=response
)
except Exception as e:
logger.error(f"腾讯云审核异常: {str(e)}")
# 审核服务异常时默认放行,避免影响正常业务
return ModerationResult(passed=True, suggestion="pass",
details={"error": str(e)})
# ==========================================
# 🟠 阿里云图片审核
# ==========================================
def _aliyun_sign(access_key_secret: str, string_to_sign: str) -> str:
"""阿里云 ROA 签名"""
signature = hmac.new(
(access_key_secret + "&").encode("utf-8"),
string_to_sign.encode("utf-8"),
hashlib.sha1
).digest()
return base64.b64encode(signature).decode("utf-8")
def _build_aliyun_signature(method: str, headers: dict, uri: str, access_key_secret: str) -> str:
"""
构建阿里云 ROA 风格签名
参考: https://help.aliyun.com/document_detail/315526.html
"""
# 1. 构建 CanonicalizedHeaders
acs_headers = {}
for key, value in headers.items():
lower_key = key.lower()
if lower_key.startswith("x-acs-"):
acs_headers[lower_key] = value
sorted_headers = sorted(acs_headers.items())
canonical_headers = "\n".join([f"{k}:{v}" for k, v in sorted_headers])
# 2. 构建 StringToSign
content_type = headers.get("Content-Type", "")
accept = headers.get("Accept", "")
string_to_sign = f"{method}\n{accept}\n\n{content_type}\n\n{canonical_headers}\n{uri}"
# 3. 计算签名
return _aliyun_sign(access_key_secret, string_to_sign)
async def moderate_image_aliyun(image_content: bytes) -> ModerationResult:
"""
阿里云图片审核
API文档: https://help.aliyun.com/document_detail/70292.html
"""
if not ALIYUN_ACCESS_KEY_ID or not ALIYUN_ACCESS_KEY_SECRET:
logger.warning("阿里云审核未配置,跳过审核")
return ModerationResult(passed=True, suggestion="pass")
# Base64 编码图片
file_content_base64 = base64.b64encode(image_content).decode("utf-8")
# 使用阿里云内容安全 API (绿网)
endpoint = f"https://green.{ALIYUN_REGION}.aliyuncs.com"
uri = "/green/image/scan"
# 构建请求体
payload = {
"scenes": ["porn", "terrorism", "ad"], # 鉴黄、暴恐、广告
"tasks": [{
"dataId": str(uuid.uuid4()),
"content": file_content_base64,
"type": "BASE64"
}]
}
payload_str = json.dumps(payload)
# 签名参数
timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
nonce = str(uuid.uuid4())
# 🔒 正确构建头部(包含签名所需字段)
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
"x-acs-version": "2018-05-09",
"x-acs-signature-method": "HMAC-SHA1",
"x-acs-signature-version": "1.0",
"x-acs-signature-nonce": nonce,
"Date": timestamp,
}
# 🔒 计算签名
signature = _build_aliyun_signature("POST", headers, uri, ALIYUN_ACCESS_KEY_SECRET)
# 添加 Authorization 头部
headers["Authorization"] = f"acs {ALIYUN_ACCESS_KEY_ID}:{signature}"
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
f"{endpoint}{uri}",
content=payload_str,
headers=headers
)
result = resp.json()
# 解析审核结果
if result.get("code") != 200:
logger.error(f"阿里云审核API错误: {result}")
return ModerationResult(passed=True, suggestion="pass",
details={"error": result})
data = result.get("data", [])
if not data:
return ModerationResult(passed=True, suggestion="pass")
task_result = data[0].get("results", [])
# 检查所有场景的审核结果
max_rate = 0
block_label = ""
overall_suggestion = "pass"
for scene_result in task_result:
scene = scene_result.get("scene", "")
suggestion = scene_result.get("suggestion", "pass")
rate = scene_result.get("rate", 0)
label = scene_result.get("label", "")
if suggestion == "block":
overall_suggestion = "block"
if rate > max_rate:
max_rate = rate
block_label = f"{scene}:{label}"
elif suggestion == "review" and overall_suggestion != "block":
overall_suggestion = "review"
if rate > max_rate:
max_rate = rate
block_label = f"{scene}:{label}"
passed = overall_suggestion == "pass"
logger.info(f"阿里云审核结果: suggestion={overall_suggestion}, label={block_label}, rate={max_rate}")
return ModerationResult(
passed=passed,
label=block_label,
confidence=max_rate * 100,
suggestion=overall_suggestion,
details=result
)
except Exception as e:
logger.error(f"阿里云审核异常: {str(e)}")
# 审核服务异常时默认放行
return ModerationResult(passed=True, suggestion="pass",
details={"error": str(e)})
# ==========================================
# 🎯 统一审核入口(智能额度管理)
# ==========================================
# 🔒 动态配置检查
SYSTEM_CONFIG_FILE = "/tmp/system_config.json"
def _is_moderation_enabled() -> bool:
"""
检查图片审核是否启用
优先检查系统配置文件,其次检查环境变量
"""
# 1️⃣ 优先检查系统配置文件(管理员动态设置)
try:
if os.path.exists(SYSTEM_CONFIG_FILE):
with open(SYSTEM_CONFIG_FILE, "r") as f:
config = json.load(f)
if "image_moderation_enabled" in config:
return config["image_moderation_enabled"] == True
except Exception as e:
logger.debug(f"读取系统配置失败: {e}")
# 2️⃣ 回退到环境变量
return MODERATION_ENABLED
async def moderate_image(image_content: bytes, file_ext: str = "") -> ModerationResult:
"""
统一图片审核入口
策略:阿里云优先(3000次/月) → 腾讯云兆底(10000次/月) → 都用完则跳过
Args:
image_content: 图片二进制内容
file_ext: 文件扩展名(用于判断是否需要审核)
Returns:
ModerationResult: 审核结果
"""
# 跳过非图片文件
if file_ext.lower() in ["json", "mp4"]:
return ModerationResult(passed=True, suggestion="pass",
details={"skipped": True, "reason": "非图片文件"})
# 检查是否启用审核(支持动态配置)
if not _is_moderation_enabled():
logger.debug("图片审核未启用")
return ModerationResult(passed=True, suggestion="pass",
details={"skipped": True, "reason": "审核未启用"})
# 💰 智能选择服务商(基于免费额度)
provider = _get_available_provider()
if provider is None:
logger.warning("本月免费审核额度已用完,跳过审核")
return ModerationResult(passed=True, suggestion="pass",
details={"skipped": True, "reason": "免费额度已用完"})
# 执行审核
if provider == "aliyun":
result = await moderate_image_aliyun(image_content)
else:
result = await moderate_image_tencent(image_content)
# 记录使用次数(只有审核成功才计数)
if "error" not in result.details:
_increment_quota(provider)
result.details["provider"] = provider
return result
def moderate_image_sync(image_content: bytes, file_ext: str = "") -> ModerationResult:
"""
同步版本的图片审核(用于非异步上下文)
"""
import asyncio
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(moderate_image(image_content, file_ext))