Spaces:
Running
Running
| # image_moderation.py | |
| # ========================================== | |
| # 🔒 P0安全优化:图片内容审核模块 | |
| # ========================================== | |
| # 支持腾讯云/阿里云图片审核API | |
| # 环境变量配置: | |
| # - IMAGE_MODERATION_PROVIDER: "tencent" 或 "aliyun" (默认: tencent) | |
| # - TENCENT_SECRET_ID / TENCENT_SECRET_KEY | |
| # - ALIYUN_ACCESS_KEY_ID / ALIYUN_ACCESS_KEY_SECRET | |
| # ========================================== | |
| import os | |
| import base64 | |
| import json | |
| import logging | |
| import hmac | |
| import hashlib | |
| import time | |
| import uuid | |
| from datetime import datetime | |
| from typing import Optional, Tuple | |
| import httpx | |
| logger = logging.getLogger("ComfyUI-Ranking.ImageModeration") | |
| # ========================================== | |
| # 🔧 配置 | |
| # ========================================== | |
| MODERATION_ENABLED = os.environ.get("IMAGE_MODERATION_ENABLED", "false").lower() == "true" | |
| # 腾讯云配置 | |
| TENCENT_SECRET_ID = os.environ.get("TENCENT_SECRET_ID", "") | |
| TENCENT_SECRET_KEY = os.environ.get("TENCENT_SECRET_KEY", "") | |
| TENCENT_REGION = os.environ.get("TENCENT_REGION", "ap-guangzhou") | |
| # 阿里云配置 | |
| ALIYUN_ACCESS_KEY_ID = os.environ.get("ALIYUN_ACCESS_KEY_ID", "") | |
| ALIYUN_ACCESS_KEY_SECRET = os.environ.get("ALIYUN_ACCESS_KEY_SECRET", "") | |
| ALIYUN_REGION = os.environ.get("ALIYUN_REGION", "cn-shanghai") | |
| # 💰 免费额度管理(每月重置) | |
| ALIYUN_FREE_QUOTA = 3000 # 阿里云每月免费 3000 次 | |
| TENCENT_FREE_QUOTA = 10000 # 腾讯云每月免费 10000 次 | |
| # 额度记录文件 | |
| QUOTA_FILE = "/tmp/image_moderation_quota.json" | |
| def _load_quota() -> dict: | |
| """加载当月额度使用记录""" | |
| current_month = datetime.now().strftime("%Y-%m") | |
| try: | |
| if os.path.exists(QUOTA_FILE): | |
| with open(QUOTA_FILE, "r") as f: | |
| data = json.load(f) | |
| # 检查是否是当月记录,否则重置 | |
| if data.get("month") != current_month: | |
| return {"month": current_month, "aliyun": 0, "tencent": 0} | |
| return data | |
| except Exception as e: | |
| logger.debug(f"加载额度记录失败: {e}") | |
| return {"month": current_month, "aliyun": 0, "tencent": 0} | |
| def _save_quota(quota: dict): | |
| """保存额度使用记录""" | |
| try: | |
| with open(QUOTA_FILE, "w") as f: | |
| json.dump(quota, f) | |
| except Exception as e: | |
| logger.warning(f"保存额度记录失败: {e}") | |
| def _increment_quota(provider: str): | |
| """增加使用次数""" | |
| quota = _load_quota() | |
| quota[provider] = quota.get(provider, 0) + 1 | |
| _save_quota(quota) | |
| logger.info(f"审核额度: {provider}={quota[provider]}, 阿里云剩余{ALIYUN_FREE_QUOTA - quota.get('aliyun', 0)}, 腾讯云剩余{TENCENT_FREE_QUOTA - quota.get('tencent', 0)}") | |
| def _get_available_provider() -> str: | |
| """ | |
| 获取可用的审核服务商 | |
| 策略:阿里云优先 → 腾讯云兆底 → 都用完则跳过 | |
| """ | |
| quota = _load_quota() | |
| # 1️⃣ 优先阿里云 | |
| if quota.get("aliyun", 0) < ALIYUN_FREE_QUOTA and ALIYUN_ACCESS_KEY_ID: | |
| return "aliyun" | |
| # 2️⃣ 阿里云用完,切换腾讯云 | |
| if quota.get("tencent", 0) < TENCENT_FREE_QUOTA and TENCENT_SECRET_ID: | |
| return "tencent" | |
| # 3️⃣ 两家都用完,返回 None | |
| return None | |
| # ========================================== | |
| # 📊 审核结果类型 | |
| # ========================================== | |
| class ModerationResult: | |
| """图片审核结果""" | |
| def __init__(self, passed: bool, label: str = "", confidence: float = 0.0, | |
| suggestion: str = "pass", details: dict = None): | |
| self.passed = passed # 是否通过 | |
| self.label = label # 违规标签(如 Porn, Terror, Ad) | |
| self.confidence = confidence # 置信度 0-100 | |
| self.suggestion = suggestion # pass/review/block | |
| self.details = details or {} # 详细信息 | |
| def to_dict(self): | |
| return { | |
| "passed": self.passed, | |
| "label": self.label, | |
| "confidence": self.confidence, | |
| "suggestion": self.suggestion, | |
| "details": self.details | |
| } | |
| # ========================================== | |
| # 🔵 腾讯云图片审核 | |
| # ========================================== | |
| def _tencent_sign_v3(secret_id: str, secret_key: str, host: str, | |
| payload: str, timestamp: int) -> dict: | |
| """腾讯云 TC3-HMAC-SHA256 签名""" | |
| service = "ims" | |
| date = datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%d") | |
| # 规范请求 | |
| http_request_method = "POST" | |
| canonical_uri = "/" | |
| canonical_querystring = "" | |
| ct = "application/json; charset=utf-8" | |
| canonical_headers = f"content-type:{ct}\nhost:{host}\nx-tc-action:imagemoderationsync\n" | |
| signed_headers = "content-type;host;x-tc-action" | |
| hashed_request_payload = hashlib.sha256(payload.encode("utf-8")).hexdigest() | |
| canonical_request = (f"{http_request_method}\n{canonical_uri}\n{canonical_querystring}\n" | |
| f"{canonical_headers}\n{signed_headers}\n{hashed_request_payload}") | |
| # 待签名字符串 | |
| algorithm = "TC3-HMAC-SHA256" | |
| credential_scope = f"{date}/{service}/tc3_request" | |
| hashed_canonical_request = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest() | |
| string_to_sign = f"{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical_request}" | |
| # 计算签名 | |
| def sign(key, msg): | |
| return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest() | |
| secret_date = sign(("TC3" + secret_key).encode("utf-8"), date) | |
| secret_service = sign(secret_date, service) | |
| secret_signing = sign(secret_service, "tc3_request") | |
| signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest() | |
| # 构建 Authorization | |
| authorization = (f"{algorithm} Credential={secret_id}/{credential_scope}, " | |
| f"SignedHeaders={signed_headers}, Signature={signature}") | |
| return { | |
| "Authorization": authorization, | |
| "Content-Type": ct, | |
| "Host": host, | |
| "X-TC-Action": "ImageModerationSync", | |
| "X-TC-Timestamp": str(timestamp), | |
| "X-TC-Version": "2020-07-13", | |
| "X-TC-Region": TENCENT_REGION | |
| } | |
| async def moderate_image_tencent(image_content: bytes) -> ModerationResult: | |
| """ | |
| 腾讯云图片审核 | |
| API文档: https://cloud.tencent.com/document/product/1125/53273 | |
| """ | |
| if not TENCENT_SECRET_ID or not TENCENT_SECRET_KEY: | |
| logger.warning("腾讯云审核未配置,跳过审核") | |
| return ModerationResult(passed=True, suggestion="pass") | |
| host = "ims.tencentcloudapi.com" | |
| timestamp = int(time.time()) | |
| # Base64 编码图片 | |
| file_content_base64 = base64.b64encode(image_content).decode("utf-8") | |
| payload = json.dumps({ | |
| "BizType": "default", # 使用默认策略 | |
| "FileContent": file_content_base64, | |
| "DataId": str(uuid.uuid4()) | |
| }) | |
| headers = _tencent_sign_v3(TENCENT_SECRET_ID, TENCENT_SECRET_KEY, host, payload, timestamp) | |
| try: | |
| async with httpx.AsyncClient(timeout=30.0) as client: | |
| resp = await client.post(f"https://{host}", content=payload, headers=headers) | |
| result = resp.json() | |
| response = result.get("Response", {}) | |
| if "Error" in response: | |
| logger.error(f"腾讯云审核API错误: {response['Error']}") | |
| return ModerationResult(passed=True, suggestion="pass", | |
| details={"error": response["Error"]}) | |
| # 解析审核结果 | |
| suggestion = response.get("Suggestion", "Pass").lower() | |
| label = response.get("Label", "") | |
| score = response.get("Score", 0) | |
| passed = suggestion == "pass" | |
| logger.info(f"腾讯云审核结果: suggestion={suggestion}, label={label}, score={score}") | |
| return ModerationResult( | |
| passed=passed, | |
| label=label, | |
| confidence=score, | |
| suggestion=suggestion, | |
| details=response | |
| ) | |
| except Exception as e: | |
| logger.error(f"腾讯云审核异常: {str(e)}") | |
| # 审核服务异常时默认放行,避免影响正常业务 | |
| return ModerationResult(passed=True, suggestion="pass", | |
| details={"error": str(e)}) | |
| # ========================================== | |
| # 🟠 阿里云图片审核 | |
| # ========================================== | |
| def _aliyun_sign(access_key_secret: str, string_to_sign: str) -> str: | |
| """阿里云 ROA 签名""" | |
| signature = hmac.new( | |
| (access_key_secret + "&").encode("utf-8"), | |
| string_to_sign.encode("utf-8"), | |
| hashlib.sha1 | |
| ).digest() | |
| return base64.b64encode(signature).decode("utf-8") | |
| def _build_aliyun_signature(method: str, headers: dict, uri: str, access_key_secret: str) -> str: | |
| """ | |
| 构建阿里云 ROA 风格签名 | |
| 参考: https://help.aliyun.com/document_detail/315526.html | |
| """ | |
| # 1. 构建 CanonicalizedHeaders | |
| acs_headers = {} | |
| for key, value in headers.items(): | |
| lower_key = key.lower() | |
| if lower_key.startswith("x-acs-"): | |
| acs_headers[lower_key] = value | |
| sorted_headers = sorted(acs_headers.items()) | |
| canonical_headers = "\n".join([f"{k}:{v}" for k, v in sorted_headers]) | |
| # 2. 构建 StringToSign | |
| content_type = headers.get("Content-Type", "") | |
| accept = headers.get("Accept", "") | |
| string_to_sign = f"{method}\n{accept}\n\n{content_type}\n\n{canonical_headers}\n{uri}" | |
| # 3. 计算签名 | |
| return _aliyun_sign(access_key_secret, string_to_sign) | |
| async def moderate_image_aliyun(image_content: bytes) -> ModerationResult: | |
| """ | |
| 阿里云图片审核 | |
| API文档: https://help.aliyun.com/document_detail/70292.html | |
| """ | |
| if not ALIYUN_ACCESS_KEY_ID or not ALIYUN_ACCESS_KEY_SECRET: | |
| logger.warning("阿里云审核未配置,跳过审核") | |
| return ModerationResult(passed=True, suggestion="pass") | |
| # Base64 编码图片 | |
| file_content_base64 = base64.b64encode(image_content).decode("utf-8") | |
| # 使用阿里云内容安全 API (绿网) | |
| endpoint = f"https://green.{ALIYUN_REGION}.aliyuncs.com" | |
| uri = "/green/image/scan" | |
| # 构建请求体 | |
| payload = { | |
| "scenes": ["porn", "terrorism", "ad"], # 鉴黄、暴恐、广告 | |
| "tasks": [{ | |
| "dataId": str(uuid.uuid4()), | |
| "content": file_content_base64, | |
| "type": "BASE64" | |
| }] | |
| } | |
| payload_str = json.dumps(payload) | |
| # 签名参数 | |
| timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") | |
| nonce = str(uuid.uuid4()) | |
| # 🔒 正确构建头部(包含签名所需字段) | |
| headers = { | |
| "Accept": "application/json", | |
| "Content-Type": "application/json", | |
| "x-acs-version": "2018-05-09", | |
| "x-acs-signature-method": "HMAC-SHA1", | |
| "x-acs-signature-version": "1.0", | |
| "x-acs-signature-nonce": nonce, | |
| "Date": timestamp, | |
| } | |
| # 🔒 计算签名 | |
| signature = _build_aliyun_signature("POST", headers, uri, ALIYUN_ACCESS_KEY_SECRET) | |
| # 添加 Authorization 头部 | |
| headers["Authorization"] = f"acs {ALIYUN_ACCESS_KEY_ID}:{signature}" | |
| try: | |
| async with httpx.AsyncClient(timeout=30.0) as client: | |
| resp = await client.post( | |
| f"{endpoint}{uri}", | |
| content=payload_str, | |
| headers=headers | |
| ) | |
| result = resp.json() | |
| # 解析审核结果 | |
| if result.get("code") != 200: | |
| logger.error(f"阿里云审核API错误: {result}") | |
| return ModerationResult(passed=True, suggestion="pass", | |
| details={"error": result}) | |
| data = result.get("data", []) | |
| if not data: | |
| return ModerationResult(passed=True, suggestion="pass") | |
| task_result = data[0].get("results", []) | |
| # 检查所有场景的审核结果 | |
| max_rate = 0 | |
| block_label = "" | |
| overall_suggestion = "pass" | |
| for scene_result in task_result: | |
| scene = scene_result.get("scene", "") | |
| suggestion = scene_result.get("suggestion", "pass") | |
| rate = scene_result.get("rate", 0) | |
| label = scene_result.get("label", "") | |
| if suggestion == "block": | |
| overall_suggestion = "block" | |
| if rate > max_rate: | |
| max_rate = rate | |
| block_label = f"{scene}:{label}" | |
| elif suggestion == "review" and overall_suggestion != "block": | |
| overall_suggestion = "review" | |
| if rate > max_rate: | |
| max_rate = rate | |
| block_label = f"{scene}:{label}" | |
| passed = overall_suggestion == "pass" | |
| logger.info(f"阿里云审核结果: suggestion={overall_suggestion}, label={block_label}, rate={max_rate}") | |
| return ModerationResult( | |
| passed=passed, | |
| label=block_label, | |
| confidence=max_rate * 100, | |
| suggestion=overall_suggestion, | |
| details=result | |
| ) | |
| except Exception as e: | |
| logger.error(f"阿里云审核异常: {str(e)}") | |
| # 审核服务异常时默认放行 | |
| return ModerationResult(passed=True, suggestion="pass", | |
| details={"error": str(e)}) | |
| # ========================================== | |
| # 🎯 统一审核入口(智能额度管理) | |
| # ========================================== | |
| # 🔒 动态配置检查 | |
| SYSTEM_CONFIG_FILE = "/tmp/system_config.json" | |
| def _is_moderation_enabled() -> bool: | |
| """ | |
| 检查图片审核是否启用 | |
| 优先检查系统配置文件,其次检查环境变量 | |
| """ | |
| # 1️⃣ 优先检查系统配置文件(管理员动态设置) | |
| try: | |
| if os.path.exists(SYSTEM_CONFIG_FILE): | |
| with open(SYSTEM_CONFIG_FILE, "r") as f: | |
| config = json.load(f) | |
| if "image_moderation_enabled" in config: | |
| return config["image_moderation_enabled"] == True | |
| except Exception as e: | |
| logger.debug(f"读取系统配置失败: {e}") | |
| # 2️⃣ 回退到环境变量 | |
| return MODERATION_ENABLED | |
| async def moderate_image(image_content: bytes, file_ext: str = "") -> ModerationResult: | |
| """ | |
| 统一图片审核入口 | |
| 策略:阿里云优先(3000次/月) → 腾讯云兆底(10000次/月) → 都用完则跳过 | |
| Args: | |
| image_content: 图片二进制内容 | |
| file_ext: 文件扩展名(用于判断是否需要审核) | |
| Returns: | |
| ModerationResult: 审核结果 | |
| """ | |
| # 跳过非图片文件 | |
| if file_ext.lower() in ["json", "mp4"]: | |
| return ModerationResult(passed=True, suggestion="pass", | |
| details={"skipped": True, "reason": "非图片文件"}) | |
| # 检查是否启用审核(支持动态配置) | |
| if not _is_moderation_enabled(): | |
| logger.debug("图片审核未启用") | |
| return ModerationResult(passed=True, suggestion="pass", | |
| details={"skipped": True, "reason": "审核未启用"}) | |
| # 💰 智能选择服务商(基于免费额度) | |
| provider = _get_available_provider() | |
| if provider is None: | |
| logger.warning("本月免费审核额度已用完,跳过审核") | |
| return ModerationResult(passed=True, suggestion="pass", | |
| details={"skipped": True, "reason": "免费额度已用完"}) | |
| # 执行审核 | |
| if provider == "aliyun": | |
| result = await moderate_image_aliyun(image_content) | |
| else: | |
| result = await moderate_image_tencent(image_content) | |
| # 记录使用次数(只有审核成功才计数) | |
| if "error" not in result.details: | |
| _increment_quota(provider) | |
| result.details["provider"] = provider | |
| return result | |
| def moderate_image_sync(image_content: bytes, file_ext: str = "") -> ModerationResult: | |
| """ | |
| 同步版本的图片审核(用于非异步上下文) | |
| """ | |
| import asyncio | |
| try: | |
| loop = asyncio.get_event_loop() | |
| except RuntimeError: | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| return loop.run_until_complete(moderate_image(image_content, file_ext)) | |