Spaces:
Paused
Paused
| import os | |
| import sys | |
| from collections import namedtuple | |
| from tqdm import tqdm | |
| import numpy as np | |
| # 定义坐标范围结构体 | |
| AxisRange = namedtuple('AxisRange', ['min_val', 'max_val']) | |
| def validate_file(path): | |
| """验证输入文件是否存在且可读""" | |
| if not os.path.isfile(path): | |
| print(f"错误:输入文件不存在 {path}") | |
| sys.exit(1) | |
| if not os.access(path, os.R_OK): | |
| print(f"错误:无法读取文件 {path}") | |
| sys.exit(1) | |
| def validate_dir(path): | |
| """验证目录是否存在且可访问""" | |
| if not os.path.isdir(path): | |
| print(f"错误:输入目录不存在 {path}") | |
| sys.exit(1) | |
| if not os.access(path, os.R_OK): | |
| print(f"错误:无法访问目录 {path}") | |
| sys.exit(1) | |
| def get_axis_ranges(lines): | |
| """ | |
| 获取XYZ三轴的坐标范围 | |
| 返回: {'x': AxisRange, 'y': AxisRange, 'z': AxisRange} | |
| """ | |
| axes = {'x': [], 'y': [], 'z': []} | |
| for line in lines: | |
| if line.startswith('1 ') and len(line.split()) >= 15: | |
| parts = line.split() | |
| try: | |
| axes['x'].append(float(parts[2])) | |
| axes['y'].append(float(parts[3])) | |
| axes['z'].append(float(parts[4])) | |
| except (IndexError, ValueError) as e: | |
| print(f"警告:跳过格式错误行 - {line.strip()} | 错误: {str(e)}") | |
| continue | |
| return { | |
| axis: AxisRange(min(vals), max(vals)) | |
| for axis, vals in axes.items() if vals | |
| } | |
| def process_single_file(input_path, output_path): | |
| """ | |
| 处理单个.ldr文件 | |
| 返回: (是否成功, 原始范围, 新范围) | |
| """ | |
| try: | |
| with open(input_path, 'r') as f: | |
| lines = f.readlines() | |
| except Exception as e: | |
| print(f"\n读取文件失败: {input_path} | 错误: {str(e)}") | |
| return False, None, None | |
| # 获取原始坐标范围 | |
| original_ranges = get_axis_ranges(lines) | |
| print("\n原始坐标范围:") | |
| for axis in ['x', 'y', 'z']: | |
| if axis in original_ranges: | |
| print(f"{axis.upper()}: [{original_ranges[axis].min_val}, {original_ranges[axis].max_val}]") | |
| # 第一遍:旋转所有坐标并计算最大负偏移量 | |
| rotated_coords = [] | |
| for line in lines: | |
| if line.startswith('1 ') and len(line.split()) >= 15: | |
| try: | |
| parts = line.split() | |
| # 旋转180度绕Z轴:x和y取反 | |
| x = -float(parts[2]) | |
| y = -float(parts[3]) | |
| z = float(parts[4]) # Z保持不变 | |
| rotated_coords.append((x, y, z)) | |
| except Exception as e: | |
| print(f"坐标转换出错: {line.strip()} | 错误: {str(e)}") | |
| continue | |
| if not rotated_coords: | |
| print("错误: 没有找到有效零件坐标") | |
| return False, None, None | |
| # 计算需要平移的量(将所有坐标变为非负数) | |
| min_x = min(coord[0] for coord in rotated_coords) | |
| min_y = min(coord[1] for coord in rotated_coords) | |
| offset_x = -min_x if min_x < 0 else 0 | |
| offset_y = -min_y if min_y < 0 else 0 | |
| # 第二遍:应用变换 | |
| new_lines = [] | |
| transformed_coords = [] | |
| for line in lines: | |
| if line.startswith('1 ') and len(line.split()) >= 15: | |
| try: | |
| parts = line.split() | |
| # 旋转180度绕Z轴:x和y取反 | |
| x = -float(parts[2]) | |
| y = -float(parts[3]) | |
| z = float(parts[4]) | |
| # 平移使所有坐标为正 | |
| x += offset_x | |
| y += offset_y | |
| # 记录变换后坐标用于检查 | |
| transformed_coords.append((x, y, z)) | |
| parts[2] = str(x) | |
| parts[3] = str(y) | |
| # 处理旋转矩阵:R_z180 @ R_original | |
| a, b, c, d, e, f, g, h, i = map(float, parts[5:14]) | |
| parts[5:14] = map(str, [-a, -b, -c, -d, -e, -f, g, h, i]) | |
| line = ' '.join(parts) + '\n' | |
| except Exception as e: | |
| print(f"处理行时出错: {line.strip()} | 错误: {str(e)}") | |
| continue | |
| new_lines.append(line) | |
| # 计算变换后的坐标范围 | |
| if transformed_coords: | |
| new_ranges = { | |
| 'x': type('', (), {'min_val': min(c[0] for c in transformed_coords), | |
| 'max_val': max(c[0] for c in transformed_coords)}), | |
| 'y': type('', (), {'min_val': min(c[1] for c in transformed_coords), | |
| 'max_val': max(c[1] for c in transformed_coords)}), | |
| 'z': type('', (), {'min_val': min(c[2] for c in transformed_coords), | |
| 'max_val': max(c[2] for c in transformed_coords)}) | |
| } | |
| else: | |
| new_ranges = None | |
| # 检查坐标范围变化是否符合预期 | |
| if original_ranges and new_ranges: | |
| print("\n变换后坐标范围:") | |
| for axis in ['x', 'y', 'z']: | |
| if axis in new_ranges: | |
| print(f"{axis.upper()}: [{new_ranges[axis].min_val}, {new_ranges[axis].max_val}]") | |
| # 预期检查 | |
| expected_x_min = -original_ranges['x'].max_val + offset_x | |
| expected_x_max = -original_ranges['x'].min_val + offset_x | |
| expected_y_min = -original_ranges['y'].max_val + offset_y | |
| expected_y_max = -original_ranges['y'].min_val + offset_y | |
| x_valid = (abs(new_ranges['x'].min_val - expected_x_min) < 1e-6 and | |
| abs(new_ranges['x'].max_val - expected_x_max) < 1e-6) | |
| y_valid = (abs(new_ranges['y'].min_val - expected_y_min) < 1e-6 and | |
| abs(new_ranges['y'].max_val - expected_y_max) < 1e-6) | |
| z_valid = (abs(new_ranges['z'].min_val - original_ranges['z'].min_val) < 1e-6 and | |
| abs(new_ranges['z'].max_val - original_ranges['z'].max_val) < 1e-6) | |
| # print("\n坐标范围检查结果:") | |
| # print(f"X轴范围 {'符合' if x_valid else '不符合'}预期: " | |
| # f"实际[{new_ranges['x'].min_val}, {new_ranges['x'].max_val}] vs " | |
| # f"预期[{expected_x_min}, {expected_x_max}]") | |
| # print(f"Y轴范围 {'符合' if y_valid else '不符合'}预期: " | |
| # f"实际[{new_ranges['y'].min_val}, {new_ranges['y'].max_val}] vs " | |
| # f"预期[{expected_y_min}, {expected_y_max}]") | |
| # print(f"Z轴范围 {'符合' if z_valid else '不符合'}预期: " | |
| # f"实际[{new_ranges['z'].min_val}, {new_ranges['z'].max_val}] vs " | |
| # f"原始[{original_ranges['z'].min_val}, {original_ranges['z'].max_val}]") | |
| try: | |
| os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True) | |
| with open(output_path, 'w') as f: | |
| f.writelines(new_lines) | |
| return True, original_ranges, new_ranges | |
| except Exception as e: | |
| print(f"\n写入文件失败: {output_path} | 错误: {str(e)}") | |
| return False, None, None | |
| def inverse_process_single_file(input_path, output_path): | |
| """ | |
| 对process_single_file处理后的.ldr文件进行逆变换 | |
| 返回: (是否成功, 原始范围, 新范围) | |
| """ | |
| try: | |
| with open(input_path, 'r') as f: | |
| lines = f.readlines() | |
| except Exception as e: | |
| print(f"\n读取文件失败: {input_path} | 错误: {str(e)}") | |
| return False, None, None | |
| # 获取当前坐标范围(即process_single_file处理后的范围) | |
| current_ranges = get_axis_ranges(lines) | |
| print("\n当前坐标范围:") | |
| for axis in ['x', 'y', 'z']: | |
| if axis in current_ranges: | |
| print(f"{axis.upper()}: [{current_ranges[axis].min_val}, {current_ranges[axis].max_val}]") | |
| # 第一遍:获取所有坐标,计算逆平移所需的偏移量 | |
| coords = [] | |
| for line in lines: | |
| if line.startswith('1 ') and len(line.split()) >= 15: | |
| try: | |
| parts = line.split() | |
| x = float(parts[2]) | |
| y = float(parts[3]) | |
| z = float(parts[4]) | |
| coords.append((x, y, z)) | |
| except Exception as e: | |
| print(f"坐标提取出错: {line.strip()} | 错误: {str(e)}") | |
| continue | |
| if not coords: | |
| print("错误: 没有找到有效零件坐标") | |
| return False, None, None | |
| # 计算逆平移的偏移量(这与原始变换的offset_x和offset_y相同) | |
| # 因为原始变换确保了min_x和min_y为0 | |
| offset_x = coords[0][0] if len(coords) > 0 else 0 | |
| offset_y = coords[0][1] if len(coords) > 0 else 0 | |
| for x, y, z in coords: | |
| if x < offset_x: | |
| offset_x = x | |
| if y < offset_y: | |
| offset_y = y | |
| # 第二遍:应用逆变换 | |
| new_lines = [] | |
| transformed_coords = [] | |
| for line in lines: | |
| if line.startswith('1 ') and len(line.split()) >= 15: | |
| try: | |
| parts = line.split() | |
| # 先逆平移 | |
| x = float(parts[2]) - offset_x | |
| y = float(parts[3]) - offset_y | |
| z = float(parts[4]) | |
| # 再逆旋转(同样是Z轴180度旋转,x和y取反) | |
| x = -x | |
| y = -y | |
| # 记录变换后坐标用于检查 | |
| transformed_coords.append((x, y, z)) | |
| parts[2] = str(x) | |
| parts[3] = str(y) | |
| # 逆处理旋转矩阵 | |
| a, b, c, d, e, f, g, h, i = map(float, parts[5:14]) | |
| # 逆旋转矩阵处理(撤销 R_z180 @ R_original 操作) | |
| parts[5:14] = map(str, [-a, -b, -c, -d, -e, -f, g, h, i]) | |
| line = ' '.join(parts) + '\n' | |
| except Exception as e: | |
| print(f"处理行时出错: {line.strip()} | 错误: {str(e)}") | |
| continue | |
| new_lines.append(line) | |
| # 计算逆变换后的坐标范围 | |
| if transformed_coords: | |
| original_ranges = { | |
| 'x': type('', (), {'min_val': min(c[0] for c in transformed_coords), | |
| 'max_val': max(c[0] for c in transformed_coords)}), | |
| 'y': type('', (), {'min_val': min(c[1] for c in transformed_coords), | |
| 'max_val': max(c[1] for c in transformed_coords)}), | |
| 'z': type('', (), {'min_val': min(c[2] for c in transformed_coords), | |
| 'max_val': max(c[2] for c in transformed_coords)}) | |
| } | |
| else: | |
| original_ranges = None | |
| # 显示逆变换后的坐标范围 | |
| if original_ranges: | |
| print("\n逆变换后坐标范围:") | |
| for axis in ['x', 'y', 'z']: | |
| if axis in original_ranges: | |
| print(f"{axis.upper()}: [{original_ranges[axis].min_val}, {original_ranges[axis].max_val}]") | |
| try: | |
| os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True) | |
| with open(output_path, 'w') as f: | |
| f.writelines(new_lines) | |
| return True, current_ranges, original_ranges | |
| except Exception as e: | |
| print(f"\n写入文件失败: {output_path} | 错误: {str(e)}") | |
| return False, None, None | |
| def generate_output_filename(input_filename): | |
| """生成带_flip后缀的输出文件名""" | |
| base, ext = os.path.splitext(input_filename) | |
| return f"{base}_flip{ext}" | |
| def batch_process(input_dir, output_dir): | |
| """ | |
| 批量处理目录中的所有.ldr文件 | |
| """ | |
| validate_dir(input_dir) | |
| os.makedirs(output_dir, exist_ok=True) | |
| ldr_files = [ | |
| f for f in os.listdir(input_dir) | |
| if f.lower().endswith('.ldr') | |
| ] | |
| if not ldr_files: | |
| print(f"警告:目录中没有找到.ldr文件 - {input_dir}") | |
| return | |
| print(f"\n开始批量处理 {len(ldr_files)} 个文件...") | |
| success_count = 0 | |
| for filename in tqdm(ldr_files, desc="处理进度"): | |
| input_path = os.path.join(input_dir, filename) | |
| output_filename = generate_output_filename(filename) # 使用新文件名生成函数 | |
| output_path = os.path.join(output_dir, output_filename) | |
| success, _, _ = process_single_file(input_path, output_path) | |
| if success: | |
| success_count += 1 | |
| print("\n处理完成!") | |
| print(f"成功处理文件: {success_count}/{len(ldr_files)}") | |
| print(f"输出目录: {output_dir}") | |
| if __name__ == "__main__": | |
| input_dir = "/public/home/wangshuo/gap/assembly/cubedit/outputs/test_drp_r512_wrongcond" | |
| output_dir = "/public/home/wangshuo/gap/assembly/cubedit/outputs/test_drp_r512_wrongcond" | |
| batch_process(input_dir, output_dir) |