import asyncio
import base64
import functools
import html
import json
import os
import re
import shutil
import sys
from datetime import datetime, timedelta
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse

import requests

# Force UTF-8 on stdout/stderr so Chinese/emoji log output does not fail on the
# Windows console's default code page.
if sys.platform.startswith('win'):
    import codecs
    sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
    sys.stderr = codecs.getwriter('utf-8')(sys.stderr.detach())

from src.config import (
    AI_DEBUG_MODE,
    IMAGE_DOWNLOAD_HEADERS,
    IMAGE_SAVE_DIR,
    TASK_IMAGE_DIR_PREFIX,
    MODEL_NAME,
    NTFY_TOPIC_URL,
    GOTIFY_URL,
    GOTIFY_TOKEN,
    BARK_URL,
    PCURL_TO_MOBILE,
    WX_BOT_URL,
    TELEGRAM_BOT_TOKEN,
    TELEGRAM_CHAT_ID,
    WEBHOOK_URL,
    WEBHOOK_METHOD,
    WEBHOOK_HEADERS,
    WEBHOOK_CONTENT_TYPE,
    WEBHOOK_QUERY_PARAMETERS,
    WEBHOOK_BODY,
    ENABLE_RESPONSE_FORMAT,
    client,
)
from src.utils import convert_goofish_link, retry_on_failure


def safe_print(text):
    """Print *text*, degrading gracefully when the console cannot encode it.

    On UnicodeEncodeError the text is re-printed with every non-ASCII
    character dropped; if even that fails, a fixed placeholder is printed.
    """
    try:
        print(text)
    except UnicodeEncodeError:
        try:
            print(str(text).encode('ascii', errors='ignore').decode('ascii'))
        except Exception:
            print("[输出包含无法显示的字符]")


async def _run_blocking(func, *args, **kwargs):
    """Run a blocking callable (e.g. a `requests` call) in the default executor."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


# ---------------------------------------------------------------------------
# Image download
# ---------------------------------------------------------------------------

def _fetch_image_to_file(url, save_path):
    """Blocking helper: stream *url* into *save_path* and return *save_path*.

    The body is written to ``<save_path>.part`` and atomically renamed on
    success, so an interrupted download never leaves a truncated file at
    *save_path* (which the caller would otherwise treat as already downloaded).

    Raises:
        requests.exceptions.RequestException: on network / HTTP errors.
        OSError: on file-system errors.
    """
    tmp_path = f"{save_path}.part"
    try:
        # `with` closes the streamed response and releases its connection.
        with requests.get(url, headers=IMAGE_DOWNLOAD_HEADERS, timeout=20, stream=True) as response:
            response.raise_for_status()
            with open(tmp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        os.replace(tmp_path, save_path)
    finally:
        # Only still present if something failed before the rename.
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass
    return save_path


@retry_on_failure(retries=2, delay=3)
async def _download_single_image(url, save_path):
    """Download one image to *save_path* (retried by `retry_on_failure`).

    The whole request *and* the file write run in the executor, so the event
    loop is never blocked on network reads or disk I/O.
    """
    return await _run_blocking(_fetch_image_to_file, url, save_path)


async def download_all_images(product_id, image_urls, task_name="default"):
    """Download all images of one product into a per-task directory.

    Images already present on disk are reused instead of being downloaded
    again. A failure on one image is logged and that image is skipped.

    Args:
        product_id: Product identifier, embedded in each saved file name.
        image_urls: Iterable of image URLs; entries that are not strings or do
            not start with ``http`` are ignored.
        task_name: Task name; each task gets its own sub-directory
            ``IMAGE_SAVE_DIR/<TASK_IMAGE_DIR_PREFIX><task_name>``.

    Returns:
        List of local paths of the images that are available.
    """
    if not image_urls:
        return []

    # Each task gets an isolated image directory.
    task_image_dir = os.path.join(IMAGE_SAVE_DIR, f"{TASK_IMAGE_DIR_PREFIX}{task_name}")
    os.makedirs(task_image_dir, exist_ok=True)

    urls = [
        url.strip()
        for url in image_urls
        if isinstance(url, str) and url.strip().startswith('http')
    ]
    if not urls:
        return []

    saved_paths = []
    total_images = len(urls)
    for i, url in enumerate(urls):
        try:
            # Drop anything after a ".heic" marker so the saved name keeps a
            # plain image suffix.
            clean_url = url.split('.heic')[0] if '.heic' in url else url
            file_name_base = os.path.basename(clean_url).split('?')[0]
            file_name = f"product_{product_id}_{i + 1}_{file_name_base}"
            # Strip characters that are invalid in file names on common OSes.
            file_name = re.sub(r'[\\/*?:"<>|]', "", file_name)
            if not os.path.splitext(file_name)[1]:
                file_name += ".jpg"

            save_path = os.path.join(task_image_dir, file_name)

            if os.path.exists(save_path):
                safe_print(f" [图片] 图片 {i + 1}/{total_images} 已存在,跳过下载: {os.path.basename(save_path)}")
                saved_paths.append(save_path)
                continue

            safe_print(f" [图片] 正在下载图片 {i + 1}/{total_images}: {url}")
            if await _download_single_image(url, save_path):
                safe_print(f" [图片] 图片 {i + 1}/{total_images} 已成功下载到: {os.path.basename(save_path)}")
                saved_paths.append(save_path)
        except Exception as e:
            safe_print(f" [图片] 处理图片 {url} 时发生错误,已跳过此图: {e}")

    return saved_paths


def cleanup_task_images(task_name):
    """Delete the image directory belonging to *task_name*, if it exists.

    Errors are logged, not raised.
    """
    task_image_dir = os.path.join(IMAGE_SAVE_DIR, f"{TASK_IMAGE_DIR_PREFIX}{task_name}")
    if not os.path.exists(task_image_dir):
        safe_print(f" [清理] 任务 '{task_name}' 的临时图片目录不存在: {task_image_dir}")
        return
    try:
        shutil.rmtree(task_image_dir)
        safe_print(f" [清理] 已删除任务 '{task_name}' 的临时图片目录: {task_image_dir}")
    except Exception as e:
        safe_print(f" [清理] 删除任务 '{task_name}' 的临时图片目录时出错: {e}")


def cleanup_ai_logs(logs_dir: str, keep_days: int = 1) -> None:
    """Delete ``*.log`` files in *logs_dir* older than *keep_days* days.

    A file's age is taken from the ``%Y%m%d_%H%M%S`` timestamp in the first
    15 characters of its name; files whose names do not start with such a
    timestamp are left untouched. Errors are logged, not raised.
    """
    try:
        cutoff = datetime.now() - timedelta(days=keep_days)
        for filename in os.listdir(logs_dir):
            if not filename.endswith(".log"):
                continue
            try:
                timestamp = datetime.strptime(filename[:15], "%Y%m%d_%H%M%S")
            except ValueError:
                continue
            if timestamp < cutoff:
                os.remove(os.path.join(logs_dir, filename))
    except Exception as e:
        safe_print(f" [日志] 清理AI日志时出错: {e}")


def encode_image_to_base64(image_path):
    """Return the Base64 text of the file at *image_path*, or None.

    None is returned when the path is empty, the file does not exist, or
    reading it fails (the error is logged).
    """
    if not image_path or not os.path.exists(image_path):
        return None
    try:
        with open(image_path, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode('utf-8')
    except Exception as e:
        safe_print(f"编码图片时出错: {e}")
        return None


def validate_ai_response_format(parsed_response):
    """Check that a parsed AI reply has the structure the pipeline expects.

    Required: a JSON object with top-level keys ``prompt_version``,
    ``is_recommended`` (bool), ``reason``, ``risk_tags`` (list) and
    ``criteria_analysis`` (a non-empty dict containing ``seller_type``).
    The first problem found is logged as a warning.

    Returns:
        True if the structure is valid, otherwise False.
    """
    # json.loads may legitimately produce a list, string, number or None;
    # none of those can be used downstream.
    if not isinstance(parsed_response, dict):
        safe_print(" [AI分析] 警告:响应不是JSON对象")
        return False

    required_fields = [
        "prompt_version",
        "is_recommended",
        "reason",
        "risk_tags",
        "criteria_analysis",
    ]
    for field in required_fields:
        if field not in parsed_response:
            safe_print(f" [AI分析] 警告:响应缺少必需字段 '{field}'")
            return False

    criteria_analysis = parsed_response.get("criteria_analysis", {})
    if not isinstance(criteria_analysis, dict) or not criteria_analysis:
        safe_print(" [AI分析] 警告:criteria_analysis必须是非空字典")
        return False

    # seller_type is required for every product.
    if "seller_type" not in criteria_analysis:
        safe_print(" [AI分析] 警告:criteria_analysis缺少必需字段 'seller_type'")
        return False

    if not isinstance(parsed_response.get("is_recommended"), bool):
        safe_print(" [AI分析] 警告:is_recommended字段不是布尔类型")
        return False

    if not isinstance(parsed_response.get("risk_tags"), list):
        safe_print(" [AI分析] 警告:risk_tags字段不是列表类型")
        return False

    return True


# ---------------------------------------------------------------------------
# Notifications
# ---------------------------------------------------------------------------

async def _send_ntfy(notification_title, message):
    """Push *message* to the ntfy topic at NTFY_TOPIC_URL with urgent priority."""
    try:
        safe_print(f" -> 正在发送 ntfy 通知到: {NTFY_TOPIC_URL}")
        response = await _run_blocking(
            requests.post,
            NTFY_TOPIC_URL,
            data=message.encode('utf-8'),
            headers={
                "Title": notification_title.encode('utf-8'),
                "Priority": "urgent",
                "Tags": "bell,vibration",
            },
            timeout=10,
        )
        # Without this, a 4xx/5xx from the server would be reported as success.
        response.raise_for_status()
        safe_print(" -> ntfy 通知发送成功。")
    except Exception as e:
        safe_print(f" -> 发送 ntfy 通知失败: {e}")


async def _send_gotify(notification_title, message):
    """Push *message* to the Gotify server at GOTIFY_URL using GOTIFY_TOKEN."""
    try:
        safe_print(f" -> 正在发送 Gotify 通知到: {GOTIFY_URL}")
        # Gotify accepts multipart/form-data; a (None, value) tuple makes
        # requests send each entry as a plain form field rather than a file.
        payload = {
            'title': (None, notification_title),
            'message': (None, message),
            'priority': (None, '5'),
        }
        # rstrip avoids a double slash when GOTIFY_URL ends with '/'.
        gotify_url_with_token = f"{GOTIFY_URL.rstrip('/')}/message?token={GOTIFY_TOKEN}"
        response = await _run_blocking(
            requests.post, gotify_url_with_token, files=payload, timeout=10
        )
        response.raise_for_status()
        safe_print(" -> Gotify 通知发送成功。")
    except requests.exceptions.RequestException as e:
        safe_print(f" -> 发送 Gotify 通知失败: {e}")
    except Exception as e:
        safe_print(f" -> 发送 Gotify 通知时发生未知错误: {e}")


async def _send_bark(notification_title, message, link, product_data):
    """Push *message* to BARK_URL, with a tap-through URL and product icon."""
    try:
        safe_print(" -> 正在发送 Bark 通知...")
        bark_payload = {
            "title": notification_title,
            "body": message,
            "level": "timeSensitive",
            "group": "闲鱼监控",
        }
        bark_payload["url"] = convert_goofish_link(link) if PCURL_TO_MOBILE else link

        # Use the main image as the icon, falling back to the first listed image.
        main_image = product_data.get('商品主图链接')
        if not main_image:
            image_list = product_data.get('商品图片列表', [])
            if image_list:
                main_image = image_list[0]
        if main_image:
            bark_payload['icon'] = main_image

        headers = {"Content-Type": "application/json; charset=utf-8"}
        response = await _run_blocking(
            requests.post, BARK_URL, json=bark_payload, headers=headers, timeout=10
        )
        response.raise_for_status()
        safe_print(" -> Bark 通知发送成功。")
    except requests.exceptions.RequestException as e:
        safe_print(f" -> 发送 Bark 通知失败: {e}")
    except Exception as e:
        safe_print(f" -> 发送 Bark 通知时发生未知错误: {e}")


def _build_wx_markdown(notification_title, message):
    """Render the plain-text notification as WeCom (企业微信) markdown.

    Link lines (``手机端链接:``, ``电脑端链接:``, ``链接:``) become clickable
    markdown links; a missing link (empty or ``#``) is shown as ``暂无链接``.
    Every other non-empty line becomes a bullet.
    """
    markdown_content = f"## {notification_title}\n\n"
    for line in message.split('\n'):
        if line.startswith(('手机端链接:', '电脑端链接:', '链接:')):
            label, url = line.split(':', 1)
            url = url.strip()
            if url and url != '#':
                markdown_content += f"- **{label}:** [{url}]({url})\n"
            else:
                markdown_content += f"- **{label}:** 暂无链接\n"
        elif line:
            markdown_content += f"- {line}\n"
        else:
            markdown_content += "\n"
    return markdown_content


async def _send_wx_bot(notification_title, message):
    """Post a markdown message to the WeCom group robot at WX_BOT_URL."""
    try:
        payload = {
            "msgtype": "markdown",
            "markdown": {"content": _build_wx_markdown(notification_title, message)},
        }
        safe_print(f" -> 正在发送企业微信通知到: {WX_BOT_URL}")
        headers = {"Content-Type": "application/json"}
        response = await _run_blocking(
            requests.post, WX_BOT_URL, json=payload, headers=headers, timeout=10
        )
        response.raise_for_status()
        result = response.json()
        safe_print(f" -> 企业微信通知发送成功。响应: {result}")
    except requests.exceptions.RequestException as e:
        safe_print(f" -> 发送企业微信通知失败: {e}")
    except Exception as e:
        safe_print(f" -> 发送企业微信通知时发生未知错误: {e}")


def _telegram_link(url, label):
    """Return an HTML anchor for *url*, or just *label* when there is no usable URL."""
    if url and url != '#':
        return f'<a href="{html.escape(str(url), quote=True)}">{label}</a>'
    return label


def _build_telegram_html(title, price, reason, link, mobile_link=None):
    """Build the Telegram message body for ``parse_mode=HTML``.

    All dynamic text is HTML-escaped: Telegram rejects the whole message if
    it contains an unescaped ``<``, ``>`` or ``&``. The title is truncated
    *before* escaping so an entity is never cut in half.
    """
    parts = [
        "<b>🚨 新推荐!</b>\n\n",
        f"<b>{html.escape(str(title)[:50])}...</b>\n\n",
        f"💰 价格: {html.escape(str(price))}\n",
        f"📝 原因: {html.escape(str(reason))}\n",
    ]
    if mobile_link is not None:
        parts.append(f"📱 {_telegram_link(mobile_link, '手机端链接')}\n")
    parts.append(f"💻 {_telegram_link(link, '电脑端链接')}")
    return "".join(parts)


async def _send_telegram(title, price, reason, link):
    """Send an HTML-formatted message via the Telegram Bot API sendMessage call."""
    try:
        safe_print(" -> 正在发送 Telegram 通知...")
        telegram_api_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
        mobile_link = convert_goofish_link(link) if PCURL_TO_MOBILE else None
        telegram_payload = {
            "chat_id": TELEGRAM_CHAT_ID,
            "text": _build_telegram_html(title, price, reason, link, mobile_link),
            "parse_mode": "HTML",
            "disable_web_page_preview": False,
        }
        headers = {"Content-Type": "application/json"}
        response = await _run_blocking(
            requests.post, telegram_api_url, json=telegram_payload, headers=headers, timeout=10
        )
        response.raise_for_status()
        result = response.json()
        if result.get("ok"):
            safe_print(" -> Telegram 通知发送成功。")
        else:
            safe_print(f" -> Telegram 通知发送失败: {result.get('description', '未知错误')}")
    except requests.exceptions.RequestException as e:
        safe_print(f" -> 发送 Telegram 通知失败: {e}")
    except Exception as e:
        safe_print(f" -> 发送 Telegram 通知时发生未知错误: {e}")


def _render_webhook_template(template_str, title, content):
    """Substitute title/content placeholders in a webhook template string.

    Both the legacy ``${title}``/``${content}`` and the ``{{title}}``/
    ``{{content}}`` placeholder syntaxes are supported. Values are
    JSON-string-escaped (without the surrounding quotes), so newlines and
    quotes in them cannot break a JSON template. Returns "" for an empty
    template.
    """
    if not template_str:
        return ""
    safe_title = json.dumps(title, ensure_ascii=False)[1:-1]
    safe_content = json.dumps(content, ensure_ascii=False)[1:-1]
    return (
        template_str.replace("${title}", safe_title)
        .replace("${content}", safe_content)
        .replace("{{title}}", safe_title)
        .replace("{{content}}", safe_content)
    )


def _append_query_params(url, params):
    """Return *url* with *params* merged into its query string (params win on clashes)."""
    url_parts = list(urlparse(url))
    # keep_blank_values preserves existing "?key=" parameters instead of dropping them.
    query = dict(parse_qsl(url_parts[4], keep_blank_values=True))
    query.update(params)
    url_parts[4] = urlencode(query)
    return urlunparse(url_parts)


def _has_header(headers, name):
    """Case-insensitively test whether *headers* already defines *name*."""
    lowered = name.lower()
    return any(str(key).lower() == lowered for key in headers)


async def _send_webhook(notification_title, message):
    """Send the notification to the generic webhook configured via WEBHOOK_* settings.

    Supports GET (placeholders only in WEBHOOK_QUERY_PARAMETERS) and POST
    (query parameters plus a JSON or FORM body from WEBHOOK_BODY). Malformed
    JSON in any WEBHOOK_* setting is logged as a warning and that part is
    skipped.
    """
    try:
        safe_print(f" -> 正在发送通用 Webhook 通知到: {WEBHOOK_URL}")

        def render(template_str):
            return _render_webhook_template(template_str, notification_title, message)

        headers = {}
        if WEBHOOK_HEADERS:
            try:
                headers = json.loads(WEBHOOK_HEADERS)
            except json.JSONDecodeError:
                safe_print(" -> [警告] Webhook 请求头格式错误,请检查 .env 中的 WEBHOOK_HEADERS。")

        if WEBHOOK_METHOD not in ("GET", "POST"):
            safe_print(f" -> [警告] 不支持的 WEBHOOK_METHOD: {WEBHOOK_METHOD}。")
            return

        final_url = WEBHOOK_URL
        if WEBHOOK_QUERY_PARAMETERS:
            try:
                params = json.loads(render(WEBHOOK_QUERY_PARAMETERS))
                final_url = _append_query_params(final_url, params)
            except json.JSONDecodeError:
                safe_print(" -> [警告] Webhook 查询参数格式错误,请检查 .env 中的 WEBHOOK_QUERY_PARAMETERS。")

        if WEBHOOK_METHOD == "GET":
            response = await _run_blocking(requests.get, final_url, headers=headers, timeout=15)
        else:
            data = None
            json_payload = None
            if WEBHOOK_BODY:
                body_str = render(WEBHOOK_BODY)
                try:
                    if WEBHOOK_CONTENT_TYPE == "JSON":
                        json_payload = json.loads(body_str)
                        if not _has_header(headers, 'Content-Type'):
                            headers['Content-Type'] = 'application/json; charset=utf-8'
                    elif WEBHOOK_CONTENT_TYPE == "FORM":
                        # requests url-encodes a dict passed as data=.
                        data = json.loads(body_str)
                        if not _has_header(headers, 'Content-Type'):
                            headers['Content-Type'] = 'application/x-www-form-urlencoded'
                    else:
                        safe_print(f" -> [警告] 不支持的 WEBHOOK_CONTENT_TYPE: {WEBHOOK_CONTENT_TYPE}。")
                except json.JSONDecodeError:
                    safe_print(" -> [警告] Webhook 请求体格式错误,请检查 .env 中的 WEBHOOK_BODY。")
            response = await _run_blocking(
                requests.post, final_url, headers=headers, json=json_payload, data=data, timeout=15
            )

        response.raise_for_status()
        safe_print(f" -> Webhook 通知发送成功。状态码: {response.status_code}")
    except requests.exceptions.RequestException as e:
        safe_print(f" -> 发送 Webhook 通知失败: {e}")
    except Exception as e:
        safe_print(f" -> 发送 Webhook 通知时发生未知错误: {e}")


@retry_on_failure(retries=3, delay=5)
async def send_ntfy_notification(product_data, reason):
    """Notify every configured channel about a recommended product.

    Despite the historical name, this fans out to ntfy, Gotify, Bark, WeCom
    robot, Telegram and the generic webhook, in that order, for whichever of
    them are configured. Each channel logs and swallows its own errors, so
    one failing channel does not stop the others.

    Args:
        product_data: Product dict; reads '商品标题', '当前售价', '商品链接'
            and (for Bark) '商品主图链接' / '商品图片列表'.
        reason: Recommendation reason text included in the message.
    """
    if not any((
        NTFY_TOPIC_URL,
        WX_BOT_URL,
        GOTIFY_URL and GOTIFY_TOKEN,
        BARK_URL,
        TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID,
        WEBHOOK_URL,
    )):
        safe_print("警告:未在 .env 文件中配置任何通知服务 (NTFY_TOPIC_URL, WX_BOT_URL, GOTIFY_URL/TOKEN, BARK_URL, TELEGRAM_BOT_TOKEN/CHAT_ID, WEBHOOK_URL),跳过通知。")
        return

    title = product_data.get('商品标题', 'N/A')
    price = product_data.get('当前售价', 'N/A')
    link = product_data.get('商品链接', '#')
    if PCURL_TO_MOBILE:
        mobile_link = convert_goofish_link(link)
        message = f"价格: {price}\n原因: {reason}\n手机端链接: {mobile_link}\n电脑端链接: {link}"
    else:
        message = f"价格: {price}\n原因: {reason}\n链接: {link}"

    notification_title = f"🚨 新推荐! {title[:30]}..."

    if NTFY_TOPIC_URL:
        await _send_ntfy(notification_title, message)
    if GOTIFY_URL and GOTIFY_TOKEN:
        await _send_gotify(notification_title, message)
    if BARK_URL:
        await _send_bark(notification_title, message, link, product_data)
    if WX_BOT_URL:
        await _send_wx_bot(notification_title, message)
    if TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID:
        await _send_telegram(title, price, reason, link)
    if WEBHOOK_URL:
        await _send_webhook(notification_title, message)


# ---------------------------------------------------------------------------
# AI analysis
# ---------------------------------------------------------------------------

def _build_ai_messages(product_details_json, prompt_text, image_paths):
    """Build the chat `messages` list: all readable images first, then the text prompt.

    Images are sent as ``data:image/jpeg;base64,...`` URLs; paths that cannot
    be read are skipped.
    """
    combined_text_prompt = f"""请基于你的专业知识和我的要求,分析以下完整的商品JSON数据:

```json
{product_details_json}
```

{prompt_text}
"""
    user_content_list = []
    for path in image_paths or []:
        base64_image = encode_image_to_base64(path)
        if base64_image:
            user_content_list.append(
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}
            )
    user_content_list.append({"type": "text", "text": combined_text_prompt})
    return [{"role": "user", "content": user_content_list}]


def _save_ai_request_log(product_data, item_info, product_id, image_paths):
    """Write a short JSON summary of this AI request under ``logs/ai`` (best-effort).

    Old logs are pruned first via `cleanup_ai_logs`. Any error is logged and
    swallowed so that logging never prevents the analysis itself.
    """
    try:
        logs_dir = os.path.join("logs", "ai")
        os.makedirs(logs_dir, exist_ok=True)
        cleanup_ai_logs(logs_dir, keep_days=1)

        current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Suffix with the product id so concurrent analyses within the same
        # second do not overwrite each other; cleanup_ai_logs only parses the
        # leading 15-character timestamp, so pruning still works.
        safe_id = re.sub(r'[^0-9A-Za-z_-]', '', str(product_id)) or "unknown"
        log_filepath = os.path.join(logs_dir, f"{current_time}_{safe_id}.log")

        task_name = product_data.get("任务名称") or product_data.get("任务名") or "unknown"
        log_payload = {
            "timestamp": current_time,
            "task_name": task_name,
            "product_id": product_id,
            "title": item_info.get("商品标题", "无"),
            "image_count": len(image_paths or []),
        }
        with open(log_filepath, 'w', encoding='utf-8') as f:
            f.write(json.dumps(log_payload, ensure_ascii=False))
        safe_print(f" [日志] AI分析请求已保存到: {log_filepath}")
    except Exception as e:
        safe_print(f" [日志] 保存AI分析日志时出错: {e}")


def _strip_markdown_fence(text):
    """Strip a leading ```json / ``` fence, a trailing ``` fence, and surrounding whitespace."""
    cleaned = text.strip()
    if cleaned.startswith('```json'):
        cleaned = cleaned[7:]
    if cleaned.startswith('```'):
        cleaned = cleaned[3:]
    if cleaned.endswith('```'):
        cleaned = cleaned[:-3]
    return cleaned.strip()


def _parse_ai_json(content, attempt):
    """Decode the model's reply, salvaging a JSON object embedded in extra text.

    First tries the raw content; on failure, strips Markdown code fences and
    parses the span from the first '{' to the last '}'.

    Returns:
        ``(parsed, was_cleaned)`` — the decoded value and whether the
        clean-up fallback was needed.

    Raises:
        json.JSONDecodeError: if no parseable JSON object is found. The
            reason is logged before raising.
    """
    if not isinstance(content, str):
        safe_print(f" [AI分析] 第{attempt + 1}次尝试无法在响应中找到有效的JSON对象")
        raise json.JSONDecodeError("AI response content is not a string", str(content), 0)

    try:
        return json.loads(content), False
    except json.JSONDecodeError:
        safe_print(f" [AI分析] 第{attempt + 1}次尝试JSON解析失败,尝试清理响应内容...")

    cleaned_content = _strip_markdown_fence(content)
    json_start_index = cleaned_content.find('{')
    json_end_index = cleaned_content.rfind('}')
    if json_start_index == -1 or json_end_index <= json_start_index:
        safe_print(f" [AI分析] 第{attempt + 1}次尝试无法在响应中找到有效的JSON对象")
        raise json.JSONDecodeError("No valid JSON object found", content, 0)

    try:
        return json.loads(cleaned_content[json_start_index:json_end_index + 1]), True
    except json.JSONDecodeError as e:
        safe_print(f" [AI分析] 第{attempt + 1}次尝试清理后JSON解析仍然失败: {e}")
        raise


@retry_on_failure(retries=3, delay=5)
async def get_ai_analysis(product_data, image_paths=None, prompt_text=""):
    """Send the full product JSON plus its images to the AI model and parse the verdict.

    Up to three attempts are made; retries use a lower temperature. A reply
    that parses but fails `validate_ai_response_format` is retried, and on
    the final attempt is still returned if it is at least a JSON object.

    Args:
        product_data: Product dict; its '商品信息' sub-dict supplies the id/title
            used in logs, and the whole dict is embedded in the prompt.
        image_paths: Optional list of local image paths to attach.
        prompt_text: Analysis instructions appended after the product JSON.

    Returns:
        The parsed reply dict, or None if the client is not initialised or no
        prompt text was given.

    Raises:
        json.JSONDecodeError: if the final attempt's reply contains no
            parseable JSON object.
        ValueError: if the final attempt's reply is valid JSON but not an object.
        Exception: whatever the AI client raised on the final attempt.
    """
    if not client:
        safe_print(" [AI分析] 错误:AI客户端未初始化,跳过分析。")
        return None

    item_info = product_data.get('商品信息', {})
    product_id = item_info.get('商品ID', 'N/A')

    safe_print(f"\n [AI分析] 开始分析商品 #{product_id} (含 {len(image_paths or [])} 张图片)...")
    safe_print(f" [AI分析] 标题: {item_info.get('商品标题', '无')}")

    if not prompt_text:
        safe_print(" [AI分析] 错误:未提供AI分析所需的prompt文本。")
        return None

    product_details_json = json.dumps(product_data, ensure_ascii=False, indent=2)

    if AI_DEBUG_MODE:
        safe_print("\n--- [AI DEBUG] ---")
        safe_print("--- PRODUCT DATA (JSON) ---")
        safe_print(product_details_json)
        safe_print("--- PROMPT TEXT (完整内容) ---")
        safe_print(prompt_text)
        safe_print("-------------------\n")

    messages = _build_ai_messages(product_details_json, prompt_text, image_paths)
    _save_ai_request_log(product_data, item_info, product_id, image_paths)

    # Imported once here (not per attempt); kept local as in the original module.
    from src.config import get_ai_request_params

    max_retries = 3
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1

        # 1) Call the model. Retries use a lower temperature for more deterministic output.
        try:
            request_params = {
                "model": MODEL_NAME,
                "messages": messages,
                "temperature": 0.1 if attempt == 0 else 0.05,
                "max_tokens": 4000,
            }
            # response_format is only sent when enabled; not every backend supports it.
            if ENABLE_RESPONSE_FORMAT:
                request_params["response_format"] = {"type": "json_object"}

            response = await client.chat.completions.create(
                **get_ai_request_params(**request_params)
            )
            # Some OpenAI-compatible backends hand back the content as a bare string.
            if hasattr(response, 'choices'):
                ai_response_content = response.choices[0].message.content
            else:
                ai_response_content = response
        except Exception as e:
            safe_print(f" [AI分析] 第{attempt + 1}次尝试AI调用失败: {e}")
            if is_last_attempt:
                raise
            safe_print(f" [AI分析] 准备第{attempt + 2}次重试...")
            continue

        if AI_DEBUG_MODE:
            safe_print(f"\n--- [AI DEBUG] 第{attempt + 1}次尝试 ---")
            safe_print("--- RAW AI RESPONSE ---")
            safe_print(ai_response_content)
            safe_print("---------------------\n")

        # 2) Decode JSON (the reason for any failure is logged inside _parse_ai_json).
        try:
            parsed_response, was_cleaned = _parse_ai_json(ai_response_content, attempt)
        except json.JSONDecodeError:
            if is_last_attempt:
                raise
            safe_print(f" [AI分析] 准备第{attempt + 2}次重试...")
            continue

        # 3) Validate the structure.
        if validate_ai_response_format(parsed_response):
            if was_cleaned:
                safe_print(f" [AI分析] 第{attempt + 1}次尝试清理后成功")
            else:
                safe_print(f" [AI分析] 第{attempt + 1}次尝试成功,响应格式验证通过")
            return parsed_response

        if not was_cleaned:
            safe_print(f" [AI分析] 第{attempt + 1}次尝试格式验证失败")
        if not is_last_attempt:
            safe_print(f" [AI分析] 准备第{attempt + 2}次重试...")
            continue

        # Final attempt: fall back to the imperfect reply, but never hand
        # callers something that is not a dict.
        if not isinstance(parsed_response, dict):
            raise ValueError("AI响应不是有效的JSON对象")
        if was_cleaned:
            safe_print(" [AI分析] 所有重试完成,使用清理后的结果")
        else:
            safe_print(" [AI分析] 所有重试完成,使用最后一次结果")
        return parsed_response