"""Batch-caption the images of a directory through an OpenAI-compatible endpoint.

For every supported image in the target directory that has no sibling
``.txt`` file yet, the image is sent to the configured chat-completions
endpoint and the returned description is written next to the image.
Images that fail are retried until ``MAX_ATTEMPTS`` passes have been made.
"""

import base64
import os
import re
import sys
import time
from pathlib import Path

import httpx
from openai import OpenAI

# Only two paths are allowed through:
#   ^/v1/(chat/completions|models)

# Endpoint base URL.
BASE_URL = "http://127.0.0.1:21234/v1"
# API key, sent as the bearer token.
API_KEY = "llama.cpp"
# Target model.
MODEL_NAME = "qwen3.5-35b-a3b"
# Retry control: 2 means "first pass + one retry of whatever failed".
MAX_ATTEMPTS = 2

# Standard generation parameters.
GEN_PARAMS = {
    "max_completion_tokens": 2048,
    "temperature": 1.00,
}

# Backend-specific parameters, forwarded through ``extra_body``.
EXTRA_PARAMS = {
    "repeat_penalty": 1.00,
    "top_k": 20,
    "top_p": 0.95,
    "min_p": 0.00,
    # "typical_p": 0.90,
}

# User prompt.
USER_PROMPT = "Describe the image content in detail. Use English. Unfiltered."

# System prompt.
SYSTEM_PROMPT = """
**You are an objective and dispassionate visual analyzer. Adhere to these guidelines:**
- **Facts Only:** Describe only the physical features explicitly visible in the image.
- **No Evaluation:** Strictly avoid subjective emotions, aesthetic judgments, or value-based vocabulary.
- **Neutral Narration:** Use declarative sentences; refrain from exclamations. Do not speculate on the background story, the subject's psychological state, or the creative intent.
- **Language Style:** Maintain a flat, mechanical, and professional tone, akin to a scientific observation log.
- **Structural Requirement:** Begin descriptions directly with the objects; no introductory remarks or preambles.
- **Single Paragraph:** Provide the description in one continuous paragraph without any line breaks.
"""

# Supported image suffixes (compared against the lower-cased suffix).
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".bmp"}

# Matches a <think>...</think> block including its content. DOTALL lets "."
# cross line breaks; IGNORECASE tolerates differently-cased tags.
# NOTE(review): the tag name is reconstructed from the "thinking content"
# error message below -- confirm it matches what the model actually emits.
_THINK_BLOCK_RE = re.compile(r"<think>.*?</think>", re.DOTALL | re.IGNORECASE)

client = OpenAI(api_key=API_KEY, base_url=BASE_URL)


def get_api_status():
    """Probe ``{BASE_URL}/models`` and report the basic endpoint status.

    Returns:
        A ``(message, ok)`` tuple. ``message`` is the HTTP status line of the
        response, or the exception text when the request failed. ``ok`` is
        True only for an HTTP 200 response.
    """
    try:
        with httpx.Client(timeout=3.0) as http_client:
            resp = http_client.get(
                f"{BASE_URL}/models",
                headers={"Authorization": f"Bearer {API_KEY}"},
            )
    except Exception as e:
        return str(e), False

    status_line = f"{resp.http_version} {resp.status_code} {resp.reason_phrase}"
    return status_line, resp.status_code == 200


def check_model_ready():
    """Return True when the model answers a minimal streamed completion.

    Streaming is used so the check can stop at the first chunk instead of
    waiting for a complete response. Any exception, or a stream that yields
    nothing, is reported as "not ready".
    """
    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[{"role": "user", "content": "1"}],
            max_completion_tokens=1,
            stream=True,
        )
        for _ in response:
            # A single chunk is enough: the model is loaded and generating.
            return True
    except Exception:
        return False
    # The stream ended without producing any chunk.
    return False


def _strip_thinking(text):
    """Remove ``<think>...</think>`` blocks from *text* and trim whitespace."""
    return _THINK_BLOCK_RE.sub("", text).strip()


def process_single_image(img_path):
    """Describe one image and write the result to a sibling ``.txt`` file.

    Args:
        img_path: ``pathlib.Path`` of the image to process.

    Returns:
        A ``(success, info, txt_path)`` tuple. On success ``info`` is the
        elapsed time in seconds and ``txt_path`` the file that was written.
        On failure ``info`` is an error message and ``txt_path`` is None.
    """
    start_img_time = time.perf_counter()
    try:
        if not img_path.exists():
            return False, "文件不存在", None

        base64_image = base64.b64encode(img_path.read_bytes()).decode("utf-8")

        ext = img_path.suffix.lower().replace(".", "")
        # "jpg" is not a registered subtype; the others map one-to-one.
        mime_type = "image/jpeg" if ext == "jpg" else f"image/{ext}"

        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": USER_PROMPT},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:{mime_type};base64,{base64_image}"
                            },
                        },
                    ],
                },
            ],
            **GEN_PARAMS,
            extra_body=EXTRA_PARAMS,
        )

        description = response.choices[0].message.content
        if not description:
            return False, "模型返回内容为空", None

        # Drop the model's reasoning block so only the description remains.
        description = _strip_thinking(description)
        if not description:
            return False, "过滤思考内容后结果为空", None

        txt_path = img_path.with_suffix(".txt")
        with open(txt_path, "w", encoding="utf-8") as f:
            f.write(description)

        elapsed = time.perf_counter() - start_img_time
        return True, elapsed, txt_path
    except Exception as e:
        # Deliberate catch-all so one bad image never aborts the batch:
        # connection errors, unreadable files, inference failures, etc.
        return False, str(e), None


def main():
    """Run the batch: check the endpoint, wait for the model, caption images.

    Expects the target directory as the first command-line argument and
    exits with status 1 on a usage error, an invalid directory, or an
    unreachable endpoint.
    """
    if len(sys.argv) < 2:
        print("usage: python xxx.py <image_dir>")
        sys.exit(1)

    target_dir = Path(sys.argv[1]).resolve()
    if not target_dir.is_dir():
        print(f"错误: 路径 '{target_dir}' 不是一个有效的目录")
        sys.exit(1)

    # 1. Basic endpoint check.
    print("[*] 检测接口状态...")
    status_msg, is_ok = get_api_status()
    print(f'Status: {BASE_URL}/models "{status_msg}"')
    if not is_ok:
        print("\n[-] 无法连接到服务端。")
        sys.exit(1)

    print("\n[*] 检测模型状态...")
    retry_tick = 0
    # Poll until the model answers; there is intentionally no upper bound.
    while not check_model_ready():
        retry_tick += 1
        print(f"\r模型加载中... (已重试 {retry_tick} 次)", end="", flush=True)
        time.sleep(3)
    print(f"模型 '{MODEL_NAME}' 已完成加载。\n")

    # 2. Scan for images that do not have a description file yet.
    all_images = [
        f
        for f in target_dir.iterdir()
        # Skip directories that merely carry an image-like suffix.
        if f.is_file() and f.suffix.lower() in IMAGE_EXTENSIONS
    ]
    to_process = [f for f in all_images if not f.with_suffix(".txt").exists()]
    initial_total = len(to_process)

    if initial_total == 0:
        print("[*] 没有需要处理的图片。")
        return

    print("[*] 开始图片处理...")
    print(f"目录 {target_dir.name} 待处理图片 {initial_total} 张。\n")

    # 3. Processing loop: each pass handles what the previous pass failed.
    attempt = 1
    total_success = 0
    start_total_time = time.perf_counter()
    current_queue = to_process
    width = len(str(initial_total))

    while attempt <= MAX_ATTEMPTS and current_queue:
        if attempt > 1:
            print("[*] 处理失败图片...")
            print(f"第 {attempt} 次处理,本次待处理图片 {len(current_queue)} 张。\n")

        failed_this_round = []

        for i, img_path in enumerate(current_queue, 1):
            if attempt == 1:
                # First pass: "01 / 30"
                seq_str = f"{i:0{width}d} / {initial_total:0{width}d}"
            else:
                # Retry pass: "01 / 28 / 30" (index / successes so far / total)
                seq_str = (
                    f"{i:0{width}d} / {total_success:0{width}d}"
                    f" / {initial_total:0{width}d}"
                )

            print(seq_str)
            print(f"处理图片:{img_path.name}")

            success, info, txt_path = process_single_image(img_path)

            if success:
                total_success += 1
                print(f"处理完成,耗时:{info:.3f} 秒")
                print(f"输出写入:{txt_path}\n")
            else:
                failed_this_round.append(img_path)
                print(f"处理失败: {info}\n")

        # Queue the failures for the next pass.
        current_queue = failed_this_round
        attempt += 1

    # 4. Final summary.
    total_elapsed = time.perf_counter() - start_total_time
    final_failed_count = len(current_queue)

    print(
        f"[*] 全部图片处理完成。待处理图片数 {initial_total} 张,"
        f"本次处理 {total_success} 张,失败 {final_failed_count} 张。"
        f"处理总耗时:{total_elapsed:.3f} 秒。"
    )


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n[!] 接收到中断信号 (Ctrl+C),脚本已停止。")
        # os._exit terminates immediately, even if a blocked thread would
        # otherwise keep the interpreter alive.
        os._exit(0)