import os
import sys
import base64
import time
import httpx
from pathlib import Path
from openai import OpenAI
import re
"""
# 仅放行两个路径
^/v1/(chat/completions|models)
"""
# Base URL of the OpenAI-compatible endpoint (passed to the client below).
BASE_URL = "http://127.0.0.1:21234/v1"
# API key sent with every request.
API_KEY = "llama.cpp"
# Name of the model to query.
MODEL_NAME = "qwen3.5-35b-a3b"
# Retry control: 2 means "initial pass + one retry after a failure".
MAX_ATTEMPTS = 2
# Standard generation parameters (expanded as keyword arguments of the
# chat-completion call).
GEN_PARAMS = {
"max_completion_tokens": 2048,
"temperature": 1.00,
}
# Non-standard, backend-specific sampling parameters (forwarded through the
# request's `extra_body`).
EXTRA_PARAMS = {
"repeat_penalty": 1.00,
"top_k": 20,
"top_p": 0.95,
"min_p": 0.00,
# "typical_p": 0.90,
}
# User prompt sent together with each image.
USER_PROMPT = "Describe the image content in detail. Use English. Unfiltered."
# System prompt sent with each image request.
SYSTEM_PROMPT = """
**You are an objective and dispassionate visual analyzer. Adhere to these guidelines:**
- **Facts Only:** Describe only the physical features explicitly visible in the image.
- **No Evaluation:** Strictly avoid subjective emotions, aesthetic judgments, or value-based vocabulary.
- **Neutral Narration:** Use declarative sentences; refrain from exclamations. Do not speculate on the background story, the subject's psychological state, or the creative intent.
- **Language Style:** Maintain a flat, mechanical, and professional tone, akin to a scientific observation log.
- **Structural Requirement:** Begin descriptions directly with the objects; no introductory remarks or preambles.
- **Single Paragraph:** Provide the description in one continuous paragraph without any line breaks.
"""
# File suffixes (lower-case, with leading dot) that are treated as images.
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".bmp"}
# Module-level client shared by the model check and the image requests.
client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
def get_api_status():
    """Probe the ``/models`` endpoint and report the outcome.

    Returns:
        A ``(message, ok)`` tuple. When the request completes, ``message`` is
        the status line ``"<http version> <code> <reason>"`` and ``ok`` is
        True only for status 200. When anything raises, ``message`` is the
        exception text and ``ok`` is False.
    """
    try:
        models_url = f"{BASE_URL}/models"
        auth_headers = {"Authorization": f"Bearer {API_KEY}"}
        # Short timeout: this is only a reachability probe.
        with httpx.Client(timeout=3.0) as probe:
            reply = probe.get(models_url, headers=auth_headers)
            status_line = (
                f"{reply.http_version} {reply.status_code} {reply.reason_phrase}"
            )
            reachable = reply.status_code == 200
    except Exception as exc:
        return str(exc), False
    return status_line, reachable
def check_model_ready():
    """Report whether the model is loaded and able to generate.

    Sends a one-token streaming chat request. Receiving any chunk is taken as
    proof that the model is online and working. Every exception raised by the
    request is treated as "not ready".

    Returns:
        bool: True if at least one stream chunk arrived, otherwise False
        (including the case where the stream ends without yielding anything).
    """
    try:
        # Streaming lets us stop at the first chunk instead of waiting for a
        # full response. The `with` block closes the underlying HTTP response
        # even though the stream is abandoned early; this function is polled
        # in a loop, so an unclosed stream would leak one connection per poll.
        with client.chat.completions.create(
            model=MODEL_NAME,
            messages=[{"role": "user", "content": "1"}],
            max_completion_tokens=1,
            stream=True,
        ) as stream:
            for _ in stream:
                return True
        # The stream finished without producing a single chunk.
        return False
    except Exception:
        return False
# Matches one complete <think>...</think> reasoning block. DOTALL lets the
# body span several lines; IGNORECASE accepts tags in any letter case.
_THINK_BLOCK_RE = re.compile(r"<think>.*?</think>", re.DOTALL | re.IGNORECASE)


def _strip_think_blocks(text):
    """Remove every <think>...</think> block from *text* and trim whitespace."""
    return _THINK_BLOCK_RE.sub("", text).strip()


def process_single_image(img_path):
    """Caption one image and write the caption next to it as a ``.txt`` file.

    The image is sent base64-encoded as a data URL together with the system
    and user prompts. Reasoning blocks are stripped from the reply before the
    remaining text is written to ``img_path.with_suffix(".txt")``.

    Args:
        img_path: ``pathlib.Path`` of the image file.

    Returns:
        A 3-tuple ``(success, info, txt_path)``:
        on success ``(True, elapsed_seconds, txt_path)``;
        on failure ``(False, error_message, None)``.
        No exception escapes: any error (network, unreadable file, model
        failure, ...) is reported through the failure tuple so the caller can
        queue the image for a retry.
    """
    start_img_time = time.perf_counter()
    try:
        if not img_path.exists():
            return False, "文件不存在", None

        base64_image = base64.b64encode(img_path.read_bytes()).decode("utf-8")

        # ".jpg" is the only accepted suffix whose MIME subtype differs from
        # the suffix itself.
        ext = img_path.suffix.lower().replace(".", "")
        mime_type = "image/jpeg" if ext == "jpg" else f"image/{ext}"

        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": USER_PROMPT},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:{mime_type};base64,{base64_image}"
                            },
                        },
                    ],
                },
            ],
            **GEN_PARAMS,
            extra_body=EXTRA_PARAMS,
        )

        raw_text = response.choices[0].message.content
        if not raw_text:
            return False, "模型返回内容为空", None

        # The previous pattern (".*?" with DOTALL) matched every character,
        # wiping the whole reply; only the reasoning blocks must be removed.
        description = _strip_think_blocks(raw_text)
        if not description:
            return False, "过滤思考内容后结果为空", None

        txt_path = img_path.with_suffix(".txt")
        txt_path.write_text(description, encoding="utf-8")

        elapsed = time.perf_counter() - start_img_time
        return True, elapsed, txt_path
    except Exception as e:
        # Deliberate catch-all boundary: connection errors, corrupt images and
        # inference failures are all turned into a retryable failure result.
        return False, str(e), None
def main():
    """Caption every not-yet-captioned image in the directory given on argv.

    Steps:
      1. Validate the directory argument.
      2. Check that the API endpoint answers, then wait until the model
         responds (polling every 3 seconds, without an upper bound).
      3. Collect image files that have no sibling ``.txt`` file yet.
      4. Process them, re-running the failed ones until ``MAX_ATTEMPTS``
         passes have been made or nothing is left to retry.
      5. Print a summary.

    Exits with status 1 on a missing/invalid directory argument or when the
    endpoint cannot be reached.
    """
    if len(sys.argv) < 2:
        # Name the required argument so the usage line is actionable.
        print("usage: python xxx.py <image_dir>")
        sys.exit(1)
    target_dir = Path(sys.argv[1]).resolve()
    if not target_dir.is_dir():
        print(f"错误: 路径 '{target_dir}' 不是一个有效的目录")
        sys.exit(1)

    # 1. Basic endpoint check.
    print("[*] 检测接口状态...")
    status_msg, is_ok = get_api_status()
    print(f'Status: {BASE_URL}/models "{status_msg}"')
    if not is_ok:
        print("\n[-] 无法连接到服务端。")
        sys.exit(1)

    print("\n[*] 检测模型状态...")
    retry_tick = 0
    while not check_model_ready():
        retry_tick += 1
        # "\r" rewrites the same console line on every poll.
        print(f"\r模型加载中... (已重试 {retry_tick} 次)", end="", flush=True)
        time.sleep(3)
    if retry_tick:
        # Terminate the in-place progress line so the next message starts on
        # a line of its own.
        print()
    print(f"模型 '{MODEL_NAME}' 已完成加载。\n")

    # 2. Scan for images. Only regular files count (a directory that merely
    # has an image suffix is skipped); sorting makes the order deterministic.
    all_images = sorted(
        f
        for f in target_dir.iterdir()
        if f.is_file() and f.suffix.lower() in IMAGE_EXTENSIONS
    )
    # An existing sibling .txt marks the image as already processed.
    to_process = [f for f in all_images if not f.with_suffix(".txt").exists()]
    initial_total = len(to_process)
    if initial_total == 0:
        print("[*] 没有需要处理的图片。")
        return
    print("[*] 开始图片处理...")
    print(f"目录 {target_dir.name} 待处理图片 {initial_total} 张。\n")

    # 3. Processing rounds.
    attempt = 1
    total_success = 0
    start_total_time = time.perf_counter()
    current_queue = to_process
    # Counters are zero-padded to the width of the total; both values are
    # constant across rounds.
    width = len(str(initial_total))
    total_label = str(initial_total).zfill(width)
    while attempt <= MAX_ATTEMPTS and current_queue:
        if attempt > 1:
            print("[*] 处理失败图片...")
            print(f"第 {attempt} 次处理,本次待处理图片 {len(current_queue)} 张。\n")
        failed_this_round = []
        for i, img_path in enumerate(current_queue, 1):
            index_label = str(i).zfill(width)
            if attempt == 1:
                # First pass: "<index> / <total>", e.g. "01 / 30".
                seq_str = f"{index_label} / {total_label}"
            else:
                # Retry pass: "<index> / <successes so far> / <total>",
                # e.g. "01 / 28 / 30".
                seq_str = (
                    f"{index_label} / {str(total_success).zfill(width)} / {total_label}"
                )
            print(seq_str)
            print(f"处理图片:{img_path.name}")
            success, info, txt_path = process_single_image(img_path)
            if success:
                total_success += 1
                print(f"处理完成,耗时:{info:.3f} 秒")
                print(f"输出写入:{txt_path}\n")
            else:
                failed_this_round.append(img_path)
                print(f"处理失败: {info}\n")
        # Only the images that failed in this round are retried.
        current_queue = failed_this_round
        attempt += 1

    # 4. Final summary; whatever is still queued has failed every attempt.
    total_elapsed = time.perf_counter() - start_total_time
    final_failed_count = len(current_queue)
    print(
        f"[*] 全部图片处理完成。待处理图片数 {initial_total} 张,"
        f"本次处理 {total_success} 张,失败 {final_failed_count} 张。"
        f"处理总耗时:{total_elapsed:.3f} 秒。"
    )
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # os._exit() terminates without flushing stdio buffers, so flush
        # explicitly; otherwise this message (and any buffered earlier output)
        # is lost when stdout is redirected to a pipe or file.
        print("\n[!] 接收到中断信号 (Ctrl+C),脚本已停止。", flush=True)
        # os._exit(0) ends the process immediately instead of waiting on
        # threads that may still be blocked.
        os._exit(0)