|
|
import asyncio |
|
|
import base64 |
|
|
import json |
|
|
import os |
|
|
import re |
|
|
import sys |
|
|
import shutil |
|
|
from datetime import datetime, timedelta |
|
|
from urllib.parse import urlencode, urlparse, urlunparse, parse_qsl |
|
|
|
|
|
import requests |
|
|
|
|
|
|
|
|
if sys.platform.startswith('win'):
    # Force UTF-8 console output on Windows so the Chinese/emoji log lines in
    # this module do not raise UnicodeEncodeError under legacy code pages.
    import codecs

    def _force_utf8_stream(stream):
        """Return ``stream`` switched to UTF-8, or unchanged if that is impossible.

        ``reconfigure`` keeps the original TextIOWrapper (and its line buffering);
        the old detach()+codecs wrapper is only used as a fallback. The stream may
        be None (e.g. pythonw.exe has no console), in which case calling
        ``detach()`` used to crash the whole module at import time.
        """
        if stream is None:
            return stream
        if hasattr(stream, 'reconfigure'):
            stream.reconfigure(encoding='utf-8')
            return stream
        if hasattr(stream, 'detach'):
            return codecs.getwriter('utf-8')(stream.detach())
        return stream

    sys.stdout = _force_utf8_stream(sys.stdout)
    sys.stderr = _force_utf8_stream(sys.stderr)
|
|
|
|
|
from src.config import ( |
|
|
AI_DEBUG_MODE, |
|
|
IMAGE_DOWNLOAD_HEADERS, |
|
|
IMAGE_SAVE_DIR, |
|
|
TASK_IMAGE_DIR_PREFIX, |
|
|
MODEL_NAME, |
|
|
NTFY_TOPIC_URL, |
|
|
GOTIFY_URL, |
|
|
GOTIFY_TOKEN, |
|
|
BARK_URL, |
|
|
PCURL_TO_MOBILE, |
|
|
WX_BOT_URL, |
|
|
TELEGRAM_BOT_TOKEN, |
|
|
TELEGRAM_CHAT_ID, |
|
|
WEBHOOK_URL, |
|
|
WEBHOOK_METHOD, |
|
|
WEBHOOK_HEADERS, |
|
|
WEBHOOK_CONTENT_TYPE, |
|
|
WEBHOOK_QUERY_PARAMETERS, |
|
|
WEBHOOK_BODY, |
|
|
ENABLE_RESPONSE_FORMAT, |
|
|
client, |
|
|
) |
|
|
from src.utils import convert_goofish_link, retry_on_failure |
|
|
|
|
|
|
|
|
def safe_print(text):
    """Print ``text`` without letting console-encoding problems escape.

    A normal ``print`` is tried first. If the console encoding cannot represent
    some characters, the text is re-encoded with the current ``sys.stdout``
    encoding using ``errors='replace'``: every representable character is kept
    and only the unrepresentable ones become ``?``. If even that fails, a fixed
    placeholder line is printed; errors from that last attempt are swallowed so
    logging never crashes the caller.

    Args:
        text: Any object. For the fallback path it is converted with ``str()``,
            which is what ``print`` itself displays.
    """
    try:
        print(text)
        return
    except UnicodeEncodeError:
        pass

    try:
        encoding = getattr(sys.stdout, 'encoding', None) or 'ascii'
        # The previous fallback used .encode('ascii', errors='ignore'), which
        # dropped *all* non-ASCII text (whole Chinese messages) and raised
        # AttributeError for non-str arguments.
        print(str(text).encode(encoding, errors='replace').decode(encoding))
    except Exception:
        try:
            print("[输出包含无法显示的字符]")
        except UnicodeEncodeError:
            # Even the placeholder cannot be encoded on this console.
            pass
|
|
|
|
|
|
|
|
@retry_on_failure(retries=2, delay=3)
async def _download_single_image(url, save_path):
    """Download one image from ``url`` to ``save_path`` (wrapped by retry_on_failure).

    The entire HTTP exchange, including streaming the body to disk, runs in the
    default thread-pool executor, so the event loop is never blocked by network
    or file I/O. Data is first written to ``<save_path>.part`` and renamed into
    place only after the body was fully received. An interrupted download
    therefore never leaves a truncated file at ``save_path``, which
    ``download_all_images`` would otherwise treat as "already downloaded".

    Returns:
        ``save_path`` on success.

    Raises:
        Whatever ``requests`` / file I/O raises (e.g. HTTPError from
        ``raise_for_status``); retry handling is up to the decorator.
    """
    tmp_path = f"{save_path}.part"

    def _fetch_to_disk():
        try:
            # Using the response as a context manager releases the pooled
            # connection even when raise_for_status() or a write fails.
            with requests.get(url, headers=IMAGE_DOWNLOAD_HEADERS, timeout=20, stream=True) as response:
                response.raise_for_status()
                with open(tmp_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
            os.replace(tmp_path, save_path)
        except Exception:
            # Remove the partial file so a retry starts from scratch.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
        return save_path

    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _fetch_to_disk)
|
|
|
|
|
|
|
|
async def download_all_images(product_id, image_urls, task_name="default"):
    """Download all images of one product into the task's own image directory.

    Images whose target file already exists are skipped. Each task uses the
    directory ``IMAGE_SAVE_DIR/<TASK_IMAGE_DIR_PREFIX><task_name>``, which keeps
    tasks isolated from one another.

    Args:
        product_id: Used as part of every saved file name.
        image_urls: Iterable of URLs. Entries that are not strings or do not
            start with ``http`` (after stripping) are ignored.
        task_name: Name of the owning task (selects the sub-directory).

    Returns:
        List of local paths, one per image that exists on disk afterwards.
        Failures are logged and skipped, never raised.
    """
    if not image_urls:
        return []

    # Scraped URL lists may contain None or other junk. Previously a single
    # non-str entry crashed the whole call at `url.strip()`.
    urls = [
        url.strip()
        for url in image_urls
        if isinstance(url, str) and url.strip().startswith('http')
    ]
    if not urls:
        return []

    task_image_dir = os.path.join(IMAGE_SAVE_DIR, f"{TASK_IMAGE_DIR_PREFIX}{task_name}")
    os.makedirs(task_image_dir, exist_ok=True)

    saved_paths = []
    total_images = len(urls)
    for index, url in enumerate(urls, start=1):
        try:
            # Drop a '.heic' suffix and everything after it, so the file is
            # named after the part of the URL that precedes the '.heic' suffix.
            clean_url = url.split('.heic')[0] if '.heic' in url else url
            # Take the name from the URL *path*. A raw basename() broke when the
            # query string contained '/' or the URL carried a '#fragment'.
            file_name_base = os.path.basename(urlparse(clean_url).path)
            file_name = f"product_{product_id}_{index}_{file_name_base}"
            # Strip characters that are illegal in Windows file names.
            file_name = re.sub(r'[\\/*?:"<>|]', "", file_name)
            if not os.path.splitext(file_name)[1]:
                file_name += ".jpg"

            save_path = os.path.join(task_image_dir, file_name)

            if os.path.exists(save_path):
                safe_print(f" [图片] 图片 {index}/{total_images} 已存在,跳过下载: {os.path.basename(save_path)}")
                saved_paths.append(save_path)
                continue

            safe_print(f" [图片] 正在下载图片 {index}/{total_images}: {url}")
            if await _download_single_image(url, save_path):
                safe_print(f" [图片] 图片 {index}/{total_images} 已成功下载到: {os.path.basename(save_path)}")
                saved_paths.append(save_path)
        except Exception as e:
            safe_print(f" [图片] 处理图片 {url} 时发生错误,已跳过此图: {e}")

    return saved_paths
|
|
|
|
|
|
|
|
def cleanup_task_images(task_name):
    """Remove the temporary image directory that belongs to ``task_name``.

    The directory is ``IMAGE_SAVE_DIR/<TASK_IMAGE_DIR_PREFIX><task_name>``.
    Every outcome (missing directory, deleted, deletion error) is reported via
    ``safe_print``; nothing is raised.
    """
    target_dir = os.path.join(IMAGE_SAVE_DIR, f"{TASK_IMAGE_DIR_PREFIX}{task_name}")

    if not os.path.exists(target_dir):
        safe_print(f" [清理] 任务 '{task_name}' 的临时图片目录不存在: {target_dir}")
        return

    try:
        shutil.rmtree(target_dir)
        safe_print(f" [清理] 已删除任务 '{task_name}' 的临时图片目录: {target_dir}")
    except Exception as err:
        safe_print(f" [清理] 删除任务 '{task_name}' 的临时图片目录时出错: {err}")
|
|
|
|
|
|
|
|
def cleanup_ai_logs(logs_dir: str, keep_days: int = 1) -> None:
    """Delete AI request logs in ``logs_dir`` that are older than ``keep_days`` days.

    Only files ending in ``.log`` whose first 15 characters parse as
    ``%Y%m%d_%H%M%S`` are considered. Their age comes from that timestamp, not
    from file metadata. Other files are left untouched. Errors are reported via
    ``safe_print`` and never raised.

    Args:
        logs_dir: Directory that holds the log files.
        keep_days: Files with a timestamp older than now minus this many days
            are removed.
    """
    try:
        cutoff = datetime.now() - timedelta(days=keep_days)
        filenames = os.listdir(logs_dir)
    except Exception as e:
        safe_print(f" [日志] 清理AI日志时出错: {e}")
        return

    for filename in filenames:
        if not filename.endswith(".log"):
            continue
        try:
            timestamp = datetime.strptime(filename[:15], "%Y%m%d_%H%M%S")
        except ValueError:
            continue
        if timestamp >= cutoff:
            continue
        try:
            os.remove(os.path.join(logs_dir, filename))
        except OSError as e:
            # Previously one undeletable entry (locked file, directory named
            # *.log, ...) aborted the cleanup of every remaining file.
            safe_print(f" [日志] 清理AI日志时出错: {e}")
|
|
|
|
|
|
|
|
def encode_image_to_base64(image_path):
    """Return the contents of a local image file as a Base64 text string.

    Returns None when ``image_path`` is empty/None, does not exist, or cannot be
    read. A read error is reported via ``safe_print``.
    """
    if image_path and os.path.exists(image_path):
        try:
            with open(image_path, "rb") as handle:
                raw_bytes = handle.read()
            return base64.b64encode(raw_bytes).decode('utf-8')
        except Exception as err:
            safe_print(f"编码图片时出错: {err}")
    return None
|
|
|
|
|
|
|
|
def validate_ai_response_format(parsed_response):
    """Check that a parsed AI response has the structure callers rely on.

    Requirements:
      * the response is a JSON object (dict);
      * it contains ``prompt_version``, ``is_recommended``, ``reason``,
        ``risk_tags`` and ``criteria_analysis``;
      * ``criteria_analysis`` is a non-empty dict containing ``seller_type``;
      * ``is_recommended`` is a bool and ``risk_tags`` is a list.

    A warning is printed for the first failed check.

    Returns:
        True if every check passes, otherwise False.
    """
    # json.loads may legally return a list/str/number. Without this guard a str
    # response passed the `in` checks below as substring tests and then crashed
    # on `.get`, and a number raised TypeError.
    if not isinstance(parsed_response, dict):
        safe_print(f" [AI分析] 警告:响应不是JSON对象(实际类型: {type(parsed_response).__name__})")
        return False

    required_fields = (
        "prompt_version",
        "is_recommended",
        "reason",
        "risk_tags",
        "criteria_analysis",
    )
    for field in required_fields:
        if field not in parsed_response:
            safe_print(f" [AI分析] 警告:响应缺少必需字段 '{field}'")
            return False

    criteria_analysis = parsed_response.get("criteria_analysis", {})
    if not isinstance(criteria_analysis, dict) or not criteria_analysis:
        safe_print(" [AI分析] 警告:criteria_analysis必须是非空字典")
        return False

    if "seller_type" not in criteria_analysis:
        safe_print(" [AI分析] 警告:criteria_analysis缺少必需字段 'seller_type'")
        return False

    if not isinstance(parsed_response.get("is_recommended"), bool):
        safe_print(" [AI分析] 警告:is_recommended字段不是布尔类型")
        return False

    if not isinstance(parsed_response.get("risk_tags"), list):
        safe_print(" [AI分析] 警告:risk_tags字段不是列表类型")
        return False

    return True
|
|
|
|
|
|
|
|
async def _run_in_default_executor(func):
    """Run the blocking callable ``func`` in the default executor and await its result.

    ``requests`` is synchronous, so every notification HTTP call goes through here
    to keep the event loop responsive.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func)


async def _notify_ntfy(notification_title, message):
    """Send an urgent notification to ``NTFY_TOPIC_URL``; failures are only logged."""
    try:
        safe_print(f" -> 正在发送 ntfy 通知到: {NTFY_TOPIC_URL}")
        response = await _run_in_default_executor(
            lambda: requests.post(
                NTFY_TOPIC_URL,
                data=message.encode('utf-8'),
                headers={
                    "Title": notification_title.encode('utf-8'),
                    "Priority": "urgent",
                    "Tags": "bell,vibration"
                },
                timeout=10
            )
        )
        # The HTTP status used to be ignored, so a 4xx/5xx from ntfy was still
        # reported as a successful send.
        response.raise_for_status()
        safe_print(" -> ntfy 通知发送成功。")
    except Exception as e:
        safe_print(f" -> 发送 ntfy 通知失败: {e}")


async def _notify_gotify(notification_title, message):
    """Post a priority-5 message to the Gotify server; failures are only logged."""
    try:
        safe_print(f" -> 正在发送 Gotify 通知到: {GOTIFY_URL}")
        # (None, value) tuples make requests send plain multipart form fields.
        payload = {
            'title': (None, notification_title),
            'message': (None, message),
            'priority': (None, '5')
        }
        # rstrip avoids a '//message' path when GOTIFY_URL ends with '/'.
        gotify_url_with_token = f"{GOTIFY_URL.rstrip('/')}/message?token={GOTIFY_TOKEN}"
        response = await _run_in_default_executor(
            lambda: requests.post(gotify_url_with_token, files=payload, timeout=10)
        )
        response.raise_for_status()
        safe_print(" -> Gotify 通知发送成功。")
    except requests.exceptions.RequestException as e:
        safe_print(f" -> 发送 Gotify 通知失败: {e}")
    except Exception as e:
        safe_print(f" -> 发送 Gotify 通知时发生未知错误: {e}")


async def _notify_bark(product_data, notification_title, message, target_link):
    """Push a time-sensitive Bark notification that opens ``target_link`` and shows the item image."""
    try:
        safe_print(" -> 正在发送 Bark 通知...")
        bark_payload = {
            "title": notification_title,
            "body": message,
            "level": "timeSensitive",
            "group": "闲鱼监控",
            "url": target_link,
        }
        # Prefer the explicit main image, else the first image of the gallery.
        main_image = product_data.get('商品主图链接')
        if not main_image:
            image_list = product_data.get('商品图片列表', [])
            if image_list:
                main_image = image_list[0]
        if main_image:
            bark_payload['icon'] = main_image

        headers = {"Content-Type": "application/json; charset=utf-8"}
        response = await _run_in_default_executor(
            lambda: requests.post(BARK_URL, json=bark_payload, headers=headers, timeout=10)
        )
        response.raise_for_status()
        safe_print(" -> Bark 通知发送成功。")
    except requests.exceptions.RequestException as e:
        safe_print(f" -> 发送 Bark 通知失败: {e}")
    except Exception as e:
        safe_print(f" -> 发送 Bark 通知时发生未知错误: {e}")


def _build_wecom_markdown(notification_title, message):
    """Render ``message`` (one ``label: value`` per line) as WeCom-bot markdown."""
    parts = [f"## {notification_title}\n\n"]
    for line in message.split('\n'):
        if line.startswith(('手机端链接:', '电脑端链接:', '链接:')):
            # Every prefix above contains ':', so this split always yields two parts.
            label, url = line.split(':', 1)
            url = url.strip()
            if url and url != '#':
                parts.append(f"- **{label}:** [{url}]({url})\n")
            else:
                parts.append(f"- **{label}:** 暂无链接\n")
        elif line:
            parts.append(f"- {line}\n")
        else:
            parts.append("\n")
    return "".join(parts)


async def _notify_wecom(notification_title, message):
    """Send a markdown message to the WeCom (企业微信) group bot; failures are only logged."""
    payload = {
        "msgtype": "markdown",
        "markdown": {
            "content": _build_wecom_markdown(notification_title, message)
        }
    }
    try:
        safe_print(f" -> 正在发送企业微信通知到: {WX_BOT_URL}")
        headers = {"Content-Type": "application/json"}
        response = await _run_in_default_executor(
            lambda: requests.post(WX_BOT_URL, json=payload, headers=headers, timeout=10)
        )
        response.raise_for_status()
        result = response.json()
        safe_print(f" -> 企业微信通知发送成功。响应: {result}")
    except requests.exceptions.RequestException as e:
        safe_print(f" -> 发送企业微信通知失败: {e}")
    except Exception as e:
        safe_print(f" -> 发送企业微信通知时发生未知错误: {e}")


async def _notify_telegram(title, price, reason, link, mobile_link):
    """Send an HTML-formatted Telegram message via the Bot API; failures are only logged.

    ``mobile_link`` is None when PCURL_TO_MOBILE is disabled.
    """
    from html import escape

    try:
        safe_print(" -> 正在发送 Telegram 通知...")
        telegram_api_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"

        # parse_mode=HTML: user-controlled text (title, price, reason, links)
        # must be entity-escaped, otherwise characters like '<', '&' or a quote
        # in a link make Telegram reject the whole message.
        telegram_message = "🚨 <b>新推荐!</b>\n\n"
        telegram_message += f"<b>{escape(title[:50])}...</b>\n\n"
        telegram_message += f"💰 价格: {escape(str(price))}\n"
        telegram_message += f"📝 原因: {escape(str(reason))}\n"
        if mobile_link is not None:
            telegram_message += f"📱 <a href='{escape(str(mobile_link))}'>手机端链接</a>\n"
        telegram_message += f"💻 <a href='{escape(str(link))}'>电脑端链接</a>"

        telegram_payload = {
            "chat_id": TELEGRAM_CHAT_ID,
            "text": telegram_message,
            "parse_mode": "HTML",
            "disable_web_page_preview": False
        }
        headers = {"Content-Type": "application/json"}
        response = await _run_in_default_executor(
            lambda: requests.post(telegram_api_url, json=telegram_payload, headers=headers, timeout=10)
        )
        response.raise_for_status()
        result = response.json()
        if result.get("ok"):
            safe_print(" -> Telegram 通知发送成功。")
        else:
            safe_print(f" -> Telegram 通知发送失败: {result.get('description', '未知错误')}")
    except requests.exceptions.RequestException as e:
        safe_print(f" -> 发送 Telegram 通知失败: {e}")
    except Exception as e:
        safe_print(f" -> 发送 Telegram 通知时发生未知错误: {e}")


def _merge_webhook_query_params(url, params):
    """Return ``url`` with the key/value pairs of the ``params`` dict merged into its query string."""
    url_parts = list(urlparse(url))
    query = dict(parse_qsl(url_parts[4]))
    query.update(params)
    url_parts[4] = urlencode(query)
    return urlunparse(url_parts)


async def _notify_webhook(notification_title, message):
    """Call the user-configured generic webhook (GET or POST); failures are only logged."""
    try:
        safe_print(f" -> 正在发送通用 Webhook 通知到: {WEBHOOK_URL}")

        def replace_placeholders(template_str):
            """Substitute ${title}/{{title}} and ${content}/{{content}} in a JSON template."""
            if not template_str:
                return ""
            # json.dumps(...)[1:-1] yields the JSON-escaped string body (no outer
            # quotes), so the result stays valid JSON inside a quoted template.
            safe_title = json.dumps(notification_title, ensure_ascii=False)[1:-1]
            safe_content = json.dumps(message, ensure_ascii=False)[1:-1]
            return (
                template_str
                .replace("${title}", safe_title)
                .replace("${content}", safe_content)
                .replace("{{title}}", safe_title)
                .replace("{{content}}", safe_content)
            )

        headers = {}
        if WEBHOOK_HEADERS:
            try:
                headers = json.loads(WEBHOOK_HEADERS)
            except json.JSONDecodeError:
                safe_print(" -> [警告] Webhook 请求头格式错误,请检查 .env 中的 WEBHOOK_HEADERS。")

        if WEBHOOK_METHOD not in ("GET", "POST"):
            safe_print(f" -> [警告] 不支持的 WEBHOOK_METHOD: {WEBHOOK_METHOD}。")
            return

        final_url = WEBHOOK_URL
        if WEBHOOK_QUERY_PARAMETERS:
            try:
                params = json.loads(replace_placeholders(WEBHOOK_QUERY_PARAMETERS))
                final_url = _merge_webhook_query_params(final_url, params)
            except json.JSONDecodeError:
                safe_print(" -> [警告] Webhook 查询参数格式错误,请检查 .env 中的 WEBHOOK_QUERY_PARAMETERS。")

        if WEBHOOK_METHOD == "GET":
            response = await _run_in_default_executor(
                lambda: requests.get(final_url, headers=headers, timeout=15)
            )
        else:
            data = None
            json_payload = None
            if WEBHOOK_BODY:
                body_str = replace_placeholders(WEBHOOK_BODY)
                # Header names are case-insensitive; don't override any
                # user-supplied Content-Type regardless of its spelling.
                has_content_type = any(str(key).lower() == 'content-type' for key in headers)
                try:
                    if WEBHOOK_CONTENT_TYPE == "JSON":
                        json_payload = json.loads(body_str)
                        if not has_content_type:
                            headers['Content-Type'] = 'application/json; charset=utf-8'
                    elif WEBHOOK_CONTENT_TYPE == "FORM":
                        data = json.loads(body_str)
                        if not has_content_type:
                            headers['Content-Type'] = 'application/x-www-form-urlencoded'
                    else:
                        safe_print(f" -> [警告] 不支持的 WEBHOOK_CONTENT_TYPE: {WEBHOOK_CONTENT_TYPE}。")
                except json.JSONDecodeError:
                    safe_print(" -> [警告] Webhook 请求体格式错误,请检查 .env 中的 WEBHOOK_BODY。")

            response = await _run_in_default_executor(
                lambda: requests.post(final_url, headers=headers, json=json_payload, data=data, timeout=15)
            )

        response.raise_for_status()
        safe_print(f" -> Webhook 通知发送成功。状态码: {response.status_code}")
    except requests.exceptions.RequestException as e:
        safe_print(f" -> 发送 Webhook 通知失败: {e}")
    except Exception as e:
        safe_print(f" -> 发送 Webhook 通知时发生未知错误: {e}")


@retry_on_failure(retries=3, delay=5)
async def send_ntfy_notification(product_data, reason):
    """Send a "recommended item" notification to every configured channel.

    Despite its name this function fans out, in order, to ntfy, Gotify, Bark,
    the WeCom bot, Telegram and a generic webhook, each only when its settings
    are present. Each channel handles and logs its own errors, so one failing
    channel does not prevent the others from being tried.

    Args:
        product_data: Dict with keys such as '商品标题', '当前售价', '商品链接',
            and optionally '商品主图链接' / '商品图片列表' (used by Bark).
        reason: The AI's explanation of why the item is recommended.
    """
    if not NTFY_TOPIC_URL and not WX_BOT_URL and not (GOTIFY_URL and GOTIFY_TOKEN) and not BARK_URL and not (TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID) and not WEBHOOK_URL:
        safe_print("警告:未在 .env 文件中配置任何通知服务 (NTFY_TOPIC_URL, WX_BOT_URL, GOTIFY_URL/TOKEN, BARK_URL, TELEGRAM_BOT_TOKEN/CHAT_ID, WEBHOOK_URL),跳过通知。")
        return

    title = product_data.get('商品标题', 'N/A')
    # A key that is present but None used to crash on title[:30] below.
    title = 'N/A' if title is None else str(title)
    price = product_data.get('当前售价', 'N/A')
    link = product_data.get('商品链接', '#')

    # Convert once; Bark and Telegram reuse the same mobile link.
    mobile_link = convert_goofish_link(link) if PCURL_TO_MOBILE else None
    if PCURL_TO_MOBILE:
        message = f"价格: {price}\n原因: {reason}\n手机端链接: {mobile_link}\n电脑端链接: {link}"
    else:
        message = f"价格: {price}\n原因: {reason}\n链接: {link}"

    notification_title = f"🚨 新推荐! {title[:30]}..."

    if NTFY_TOPIC_URL:
        await _notify_ntfy(notification_title, message)

    if GOTIFY_URL and GOTIFY_TOKEN:
        await _notify_gotify(notification_title, message)

    if BARK_URL:
        await _notify_bark(
            product_data,
            notification_title,
            message,
            mobile_link if PCURL_TO_MOBILE else link,
        )

    if WX_BOT_URL:
        await _notify_wecom(notification_title, message)

    if TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID:
        await _notify_telegram(title, price, reason, link, mobile_link)

    if WEBHOOK_URL:
        await _notify_webhook(notification_title, message)
|
|
|
|
|
|
|
|
def _sniff_image_mime(base64_image, default="image/jpeg"):
    """Guess an image MIME type from the magic bytes at the start of a Base64 payload.

    Downloaded files can be PNG/WebP/GIF even when their name ends in ``.jpg``.
    Declaring them all as ``image/jpeg`` may be rejected by providers that check
    the declared media type. Falls back to ``default`` for unknown formats.
    """
    try:
        # 24 Base64 characters decode to 18 bytes, enough for every signature below.
        head = base64.b64decode(base64_image[:24])
    except ValueError:  # binascii.Error is a ValueError subclass
        return default
    if head.startswith(b'\xff\xd8\xff'):
        return 'image/jpeg'
    if head.startswith(b'\x89PNG\r\n\x1a\n'):
        return 'image/png'
    if head[:6] in (b'GIF87a', b'GIF89a'):
        return 'image/gif'
    if head[:4] == b'RIFF' and head[8:12] == b'WEBP':
        return 'image/webp'
    return default


def _build_ai_user_content(image_paths, text_prompt):
    """Build the multimodal ``content`` list: one data-URL entry per readable image, then the text."""
    content = []
    for path in image_paths or []:
        base64_image = encode_image_to_base64(path)
        if base64_image:
            mime_type = _sniff_image_mime(base64_image)
            content.append(
                {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}})
    content.append({"type": "text", "text": text_prompt})
    return content


def _save_ai_request_log(product_data, item_info, product_id, image_count):
    """Write a small JSON summary of this AI request to logs/ai/<timestamp>.log (best effort).

    Also prunes logs older than one day. Any error is printed and swallowed.
    """
    try:
        logs_dir = os.path.join("logs", "ai")
        os.makedirs(logs_dir, exist_ok=True)
        cleanup_ai_logs(logs_dir, keep_days=1)

        # The file name starts with the timestamp format cleanup_ai_logs parses.
        current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
        log_filepath = os.path.join(logs_dir, f"{current_time}.log")

        task_name = product_data.get("任务名称") or product_data.get("任务名") or "unknown"
        log_payload = {
            "timestamp": current_time,
            "task_name": task_name,
            "product_id": product_id,
            "title": item_info.get("商品标题", "无"),
            "image_count": image_count,
        }
        log_content = json.dumps(log_payload, ensure_ascii=False)

        with open(log_filepath, 'w', encoding='utf-8') as f:
            f.write(log_content)

        safe_print(f" [日志] AI分析请求已保存到: {log_filepath}")
    except Exception as e:
        safe_print(f" [日志] 保存AI分析日志时出错: {e}")


def _extract_json_object(raw_text):
    """Strip Markdown code fences and return the outermost ``{...}`` span, or None if absent."""
    cleaned = raw_text.strip()
    if cleaned.startswith('```json'):
        cleaned = cleaned[7:]
    if cleaned.startswith('```'):
        cleaned = cleaned[3:]
    if cleaned.endswith('```'):
        cleaned = cleaned[:-3]
    cleaned = cleaned.strip()

    start = cleaned.find('{')
    end = cleaned.rfind('}')
    if start != -1 and end > start:
        return cleaned[start:end + 1]
    return None


@retry_on_failure(retries=3, delay=5)
async def get_ai_analysis(product_data, image_paths=None, prompt_text=""):
    """Send the full product JSON plus its images to the AI model and return its verdict.

    Each call makes up to three model requests. A strictly parsed and validated
    JSON response is returned immediately. Otherwise the raw text is stripped of
    code fences and the outermost ``{...}`` is parsed. If validation still fails
    on the last attempt, that last (unvalidated) result is returned anyway.

    Args:
        product_data: Product dict; '商品信息' holds '商品ID' / '商品标题'.
        image_paths: Optional iterable of local image file paths.
        prompt_text: Analysis instructions appended after the product JSON.

    Returns:
        The parsed response dict, or None when the client is not initialised or
        ``prompt_text`` is empty.

    Raises:
        The last exception (API error or json.JSONDecodeError) when every
        attempt fails.
    """
    if not client:
        safe_print(" [AI分析] 错误:AI客户端未初始化,跳过分析。")
        return None

    item_info = product_data.get('商品信息', {})
    product_id = item_info.get('商品ID', 'N/A')
    image_count = len(image_paths or [])

    safe_print(f"\n [AI分析] 开始分析商品 #{product_id} (含 {image_count} 张图片)...")
    safe_print(f" [AI分析] 标题: {item_info.get('商品标题', '无')}")

    if not prompt_text:
        safe_print(" [AI分析] 错误:未提供AI分析所需的prompt文本。")
        return None

    product_details_json = json.dumps(product_data, ensure_ascii=False, indent=2)

    if AI_DEBUG_MODE:
        safe_print("\n--- [AI DEBUG] ---")
        safe_print("--- PRODUCT DATA (JSON) ---")
        safe_print(product_details_json)
        safe_print("--- PROMPT TEXT (完整内容) ---")
        safe_print(prompt_text)
        safe_print("-------------------\n")

    combined_text_prompt = f"""请基于你的专业知识和我的要求,分析以下完整的商品JSON数据:

```json
{product_details_json}
```

{prompt_text}
"""

    messages = [{"role": "user", "content": _build_ai_user_content(image_paths, combined_text_prompt)}]

    _save_ai_request_log(product_data, item_info, product_id, image_count)

    max_retries = 3
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            # Retries use an even lower temperature to get more deterministic JSON.
            current_temperature = 0.1 if attempt == 0 else 0.05

            from src.config import get_ai_request_params

            request_params = {
                "model": MODEL_NAME,
                "messages": messages,
                "temperature": current_temperature,
                "max_tokens": 4000
            }
            if ENABLE_RESPONSE_FORMAT:
                request_params["response_format"] = {"type": "json_object"}

            response = await client.chat.completions.create(
                **get_ai_request_params(**request_params)
            )

            # Some OpenAI-compatible backends return the content directly
            # instead of a ChatCompletion object.
            if hasattr(response, 'choices'):
                ai_response_content = response.choices[0].message.content
            else:
                ai_response_content = response

            if AI_DEBUG_MODE:
                safe_print(f"\n--- [AI DEBUG] 第{attempt + 1}次尝试 ---")
                safe_print("--- RAW AI RESPONSE ---")
                safe_print(ai_response_content)
                safe_print("---------------------\n")

            try:
                parsed_response = json.loads(ai_response_content)
            except json.JSONDecodeError:
                # Not raw JSON: retry after stripping fences / surrounding prose.
                safe_print(f" [AI分析] 第{attempt + 1}次尝试JSON解析失败,尝试清理响应内容...")
                json_str = _extract_json_object(ai_response_content)
                if json_str is None:
                    safe_print(f" [AI分析] 第{attempt + 1}次尝试无法在响应中找到有效的JSON对象")
                    if is_last_attempt:
                        raise json.JSONDecodeError("No valid JSON object found", ai_response_content, 0)
                    safe_print(f" [AI分析] 准备第{attempt + 2}次重试...")
                    continue

                try:
                    parsed_response = json.loads(json_str)
                except json.JSONDecodeError as e:
                    safe_print(f" [AI分析] 第{attempt + 1}次尝试清理后JSON解析仍然失败: {e}")
                    if is_last_attempt:
                        raise
                    safe_print(f" [AI分析] 准备第{attempt + 2}次重试...")
                    continue

                if validate_ai_response_format(parsed_response):
                    safe_print(f" [AI分析] 第{attempt + 1}次尝试清理后成功")
                    return parsed_response
                if is_last_attempt:
                    safe_print(" [AI分析] 所有重试完成,使用清理后的结果")
                    return parsed_response
                safe_print(f" [AI分析] 准备第{attempt + 2}次重试...")
                continue
            else:
                if validate_ai_response_format(parsed_response):
                    safe_print(f" [AI分析] 第{attempt + 1}次尝试成功,响应格式验证通过")
                    return parsed_response
                safe_print(f" [AI分析] 第{attempt + 1}次尝试格式验证失败")
                if is_last_attempt:
                    safe_print(" [AI分析] 所有重试完成,使用最后一次结果")
                    return parsed_response
                safe_print(f" [AI分析] 准备第{attempt + 2}次重试...")
                continue

        except Exception as e:
            # Also reached by the JSON errors re-raised above on the last attempt.
            safe_print(f" [AI分析] 第{attempt + 1}次尝试AI调用失败: {e}")
            if is_last_attempt:
                raise
            safe_print(f" [AI分析] 准备第{attempt + 2}次重试...")

    # Unreachable in practice: every attempt above returns, continues or raises.
    return None
|
|
|