import os
import subprocess
import csv
import shutil
import threading
import logging
import signal
import sys
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
# ================= Configuration =================
# OpenAI API key.
# SECURITY NOTE(review): a live-looking secret is hard-coded below and committed
# to source — it should be rotated and supplied via the environment. The
# environment variable now takes precedence; the literal is only a fallback,
# so existing behavior is preserved when the variable is unset.
OPENAI_API_KEY = os.environ.get(
    "OPENAI_API_KEY",
    "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",
)
# Directory containing the raw source repositories to document.
SOURCE_REPOS_DIR = Path("/home/weifengsun/tangou1/domain_code/src/workdir/repos_raw").resolve()
# Base directory for all generated output.
BASE_OUTPUT_DIR = Path("~/chemrepo").expanduser().resolve()
# Global failure log, shared by every worker thread.
GLOBAL_ERROR_LOG = BASE_OUTPUT_DIR / "failures.log"
# CSV file recording one row per processed project.
CSV_FILE = BASE_OUTPUT_DIR / "run.csv"
# Number of concurrent worker threads.
MAX_WORKERS = 256
# =================================================
# Export the key so the `repoagent` child processes inherit it.
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
# Make sure the base output directory exists before any worker runs.
BASE_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
# --- Global locks and state tracking ---
# Serializes appends to failures.log across worker threads.
error_log_lock = threading.Lock()
# Names of projects currently in flight (used for cleanup on Ctrl+C).
active_projects = set()
active_projects_lock = threading.Lock()
def add_active_project(name):
    """Register *name* as currently in flight (thread-safe)."""
    active_projects_lock.acquire()
    try:
        active_projects.add(name)
    finally:
        active_projects_lock.release()
def remove_active_project(name):
    """Drop *name* from the in-flight set; a missing name is a no-op."""
    active_projects_lock.acquire()
    try:
        active_projects.discard(name)
    finally:
        active_projects_lock.release()
def log_failure_globally(project_name, content, extra_info=""):
    """Append a failure record for *project_name* to the global failures log.

    The record is assembled up front and written in one call while holding
    error_log_lock, so concurrent workers never interleave their entries.
    """
    banner = "=" * 40
    pieces = [
        f"\n{banner}\n",
        f"PROJECT: {project_name}\n",
        f"TIME: {datetime.now()}\n",
        "STATUS: Failed/Interrupted\n",
        f"{banner}\n",
        content,
    ]
    if extra_info:
        pieces.append(f"\n[Details]: {extra_info}\n")
    pieces.append(f"\n{banner}\n")
    with error_log_lock, open(GLOBAL_ERROR_LOG, "a", encoding="utf-8") as g_log:
        g_log.write("".join(pieces))
def cleanup_project_folder(project_name):
    """Delete the output directory of *project_name*; a missing one is fine."""
    target = BASE_OUTPUT_DIR / project_name
    if not target.exists():
        return
    try:
        shutil.rmtree(target)
    except OSError as e:
        # Deletion is best-effort: report the problem but keep running.
        print(f"⚠️ Failed to delete directory {target}: {e}")
    else:
        print(f"🗑️ Deleted failed/interrupted directory: {target}")
def process_single_project(project_path):
    """Run RepoAgent on one repository folder and report the outcome.

    Args:
        project_path: pathlib.Path of the source repository to document.

    Returns:
        A dict with keys "project", "status" (one of "Skipped", "Success",
        "EmptyProject", "Failed"), "start_time" and "end_time" — one CSV row.
    """
    project_name = project_path.name
    start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Per-project output layout.
    project_out_dir = BASE_OUTPUT_DIR / project_name
    hp_dir = project_out_dir / "hp"
    mdp_dir = project_out_dir / "mdp"
    local_log_file = project_out_dir / "process.log"
    # --- 1. Resume support: skip projects whose output folders already exist. ---
    # NOTE(review): an existing-but-EMPTY mdp folder is also skipped; change this
    # check if empty projects should be retried.
    if hp_dir.exists() and mdp_dir.exists():
        return {
            "project": project_name,
            "status": "Skipped",
            "start_time": start_time,
            "end_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
    # Mark as in flight so an interrupt handler can clean this project up.
    add_active_project(project_name)
    project_out_dir.mkdir(parents=True, exist_ok=True)
    status = "Failed"
    python_error = None
    # try/finally guarantees removal from the active set even if the worker dies.
    try:
        with open(local_log_file, "w", encoding="utf-8") as log_f:
            try:
                log_f.write(f"[{datetime.now()}] Processing project: {project_name}\n")
                # --- 2. Ensure a .gitignore exists in the source repo. ---
                gitignore_path = project_path / ".gitignore"
                if not gitignore_path.exists():
                    gitignore_path.touch()
                    log_f.write(f"[{datetime.now()}] Created .gitignore file.\n")
                # --- 3. Build the RepoAgent command. ---
                cmd = [
                    "repoagent", "run",
                    "-m", "gpt-5.1-2025-11-13",
                    "-r", "1",
                    "-tp", str(project_path.absolute()),
                    "--print-hierarchy",
                    "-hp", str(hp_dir),
                    "-mdp", str(mdp_dir),
                ]
                log_f.write(f"[{datetime.now()}] Command: {' '.join(cmd)}\n")
                log_f.write(f"[{datetime.now()}] Starting RepoAgent...\n")
                log_f.flush()
                # --- 4. Execute; check=True raises CalledProcessError on failure. ---
                subprocess.run(cmd, stdout=log_f, stderr=subprocess.STDOUT, check=True)
                # --- 5. Success only counts if documentation was actually produced. ---
                has_docs = mdp_dir.exists() and any(mdp_dir.iterdir())
                if has_docs:
                    status = "Success"
                    log_f.write(f"\n[{datetime.now()}] Completed successfully.\n")
                else:
                    status = "EmptyProject"
                    log_f.write(f"\n[{datetime.now()}] Finished, but mdp folder is EMPTY. Marked as EmptyProject.\n")
            except Exception as e:
                status = "Failed"
                python_error = str(e)
                try:
                    log_f.write(f"\n[{datetime.now()}] ERROR: {python_error}\n")
                except OSError:
                    pass  # best-effort: the log file itself may be unwritable
                print(f"❌ Error processing {project_name}: {python_error}")
        # --- 6. On failure, archive the local log globally and clean up. ---
        if status == "Failed":
            failed_log_content = ""
            if local_log_file.exists():
                try:
                    with open(local_log_file, "r", encoding="utf-8", errors='ignore') as f:
                        failed_log_content = f.read()
                except OSError:
                    failed_log_content = "Read Error"
            log_failure_globally(project_name, failed_log_content, python_error)
            cleanup_project_folder(project_name)
    except Exception as e:
        # Last-resort guard so one broken project never kills the worker thread.
        # BUGFIX: the original `pass` swallowed these errors silently; report them.
        print(f"❌ Error processing {project_name}: {e}")
    finally:
        remove_active_project(project_name)
    return {
        "project": project_name,
        "status": status,
        "start_time": start_time,
        "end_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
def main():
    """Discover repositories and document them concurrently, recording results to CSV."""
    if not SOURCE_REPOS_DIR.exists():
        print(f"Error: Source directory {SOURCE_REPOS_DIR} does not exist.")
        return
    # CSV schema, matching the dicts returned by process_single_project().
    csv_headers = ["project", "status", "start_time", "end_time"]
    # Create the CSV (with header) on first run; later runs append.
    file_exists = CSV_FILE.exists()
    with open(CSV_FILE, mode='a', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=csv_headers)
        if not file_exists:
            writer.writeheader()
    # --- 1. Collect project folders, sorted alphabetically. ---
    projects = sorted([p for p in SOURCE_REPOS_DIR.iterdir() if p.is_dir()], key=lambda x: x.name)
    print(f"Found {len(projects)} projects (Sorted A-Z).\nOutput Dir: {BASE_OUTPUT_DIR}")
    print(f"Failures Log: {GLOBAL_ERROR_LOG}")
    print(f"Starting concurrent processing with {MAX_WORKERS} workers...\n")
    print(f"💡 Press Ctrl+C to stop. Interrupted projects will be cleaned up automatically.\n")
    # Deliberately not a `with` block: on Ctrl+C we need a non-blocking shutdown
    # with cancel_futures, which the context manager's exit would not provide.
    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
    try:
        future_to_project = {executor.submit(process_single_project, p): p for p in projects}
        with open(CSV_FILE, mode='a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=csv_headers)
            for future in as_completed(future_to_project):
                result = future.result()
                writer.writerow(result)
                f.flush()  # keep the CSV current even if the run is interrupted
                # Console status, including the EmptyProject outcome.
                if result["status"] == "Success":
                    print(f"✅ {result['project']} Finished.")
                elif result["status"] == "EmptyProject":
                    print(f"⚠️ {result['project']} Finished (Empty - No Docs Generated).")
                elif result["status"] == "Skipped":
                    print(f"⏭️ {result['project']} Skipped.")
                else:
                    print(f"❌ {result['project']} Failed.")
        # BUGFIX: release the worker threads on the normal path too; the
        # original never shut the executor down after a successful run.
        executor.shutdown(wait=True)
    except KeyboardInterrupt:
        print("\n\n🛑 KeyboardInterrupt detected! Stopping workers...")
        executor.shutdown(wait=False, cancel_futures=True)
        print("🧹 Cleaning up active incomplete projects...")
        # Snapshot under the lock, then clean up outside it.
        with active_projects_lock:
            projects_to_clean = list(active_projects)
        for proj_name in projects_to_clean:
            log_failure_globally(proj_name, "Process terminated by User (KeyboardInterrupt).")
            cleanup_project_folder(proj_name)
        print("Done. Exiting.")
        sys.exit(0)
    print(f"\nAll tasks completed. \nCSV: {CSV_FILE}")
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()