"""Build a hierarchical authorship tree (HAT) from a pairwise similarity matrix.

Pipeline: read a whitespace-separated similarity matrix, symmetrize it,
run agglomerative clustering (SciPy ``linkage``), then greedily merge
dendrogram levels by maximizing the silhouette score, and finally export
the resulting tree as an edge list plus a colour-coded table (PDF).
"""

import argparse
import random
from collections import deque
from pathlib import Path
from typing import Iterable, Optional

import matplotlib.cm as cm
import matplotlib.pyplot as plt
import numpy as np
from scipy.cluster.hierarchy import dendrogram, linkage
from scipy.spatial.distance import euclidean, squareform
from sklearn.metrics import silhouette_score

__all__ = ["build_argument_parser", "main", "read_similarity_matrix", "gen_tree"]


def read_similarity_matrix(file_path: Path):
    """Read a labelled similarity matrix from a text file.

    Expected layout: first line is the whitespace-separated column names;
    each following line is ``<row-label> <v1> <v2> ...``. The row label is
    discarded (rows are assumed to be in the same order as the header).

    Returns:
        (names, similarity_matrix): list of names and an (n, n) float array.
    """
    with file_path.open('r', encoding='utf-8') as f:
        lines = f.readlines()
    names = lines[0].strip().split()
    matrix = []
    for line in lines[1:]:
        # Skip blank/trailing lines so a final newline does not crash parsing.
        if not line.strip():
            continue
        row = line.strip().split()[1:]  # drop the row label
        matrix.append([float(x) for x in row])
    similarity_matrix = np.array(matrix)
    return names, similarity_matrix


class TreeNode:
    """A node of the authorship tree.

    Attributes:
        name: leaf label, or ``None`` for internal nodes.
        children: indices (ints) into the shared ``nodes`` list — NOT
            TreeNode objects.
        value: merge distance taken from the linkage matrix (0 for leaves).
        split: whether this subtree may still be re-partitioned; set to
            False by ``merge_tree`` when the subtree is frozen as one cluster.
    """

    def __init__(self, name=None):
        self.name = name
        self.children = []
        self.value = 0
        self.split = True

    def add_child(self, child):
        """Append a child *index* (int) to this node."""
        self.children.append(child)


def build_tree(Z, names):
    """Convert a SciPy linkage matrix into a flat list of TreeNodes.

    Leaves occupy indices ``0..len(names)-1``; each linkage row appends one
    internal node whose ``value`` is the merge distance. The last element is
    the root.
    """
    nodes = [TreeNode(name) for name in names]
    for link in Z:
        node = TreeNode()
        node.value = link[2]
        node.add_child(int(link[0]))
        node.add_child(int(link[1]))
        nodes.append(node)
    return nodes


def find_best_thold(node_idx, nodes, distance_matrix, min_socre=0, max_socre=1):
    """Search for the cut threshold under ``node_idx`` maximizing silhouette.

    Scans 50 thresholds in ``[min_socre, max_socre] * node.value`` (parameter
    names keep the historical misspelling of "score" for backward
    compatibility). ``distance_matrix`` must be the precomputed distance
    submatrix of the subtree's leaves, ordered by ascending leaf index.

    Returns:
        (best_threshold, best_score); ``best_score`` is -1 when no threshold
        produced a valid clustering (1 < k < n).
    """
    node = nodes[node_idx]
    threshold_range = np.linspace(min_socre * node.value, max_socre * node.value, 50)
    silhouette_scores = []
    for threshold in threshold_range:
        labels, _ = gen_label_from_node(node_idx, nodes, threshold)
        # Sort by leaf node index so labels align with the rows of
        # ``distance_matrix`` (which is indexed by sorted leaf indices).
        labels = sorted(labels, key=lambda x: x[1])
        labels = [x[0] for x in labels]
        n_clusters = len(np.unique(labels))
        if 1 < n_clusters < len(distance_matrix):
            score = silhouette_score(distance_matrix, labels, metric='precomputed')
        else:
            score = -1  # silhouette is undefined for k=1 or k=n
        silhouette_scores.append(score)
    best_threshold_idx = np.argmax(silhouette_scores)
    best_threshold = threshold_range[best_threshold_idx]
    best_score = silhouette_scores[best_threshold_idx]
    return best_threshold, best_score


def gen_label_from_node(node_idx, nodes, thd, now_label=0):
    """Assign cluster labels to the leaves under ``node_idx`` by cutting at ``thd``.

    Children of a node whose merge value exceeds ``thd`` get distinct label
    ranges; below the threshold the whole subtree shares labels.

    Returns:
        (pairs, next_label) where ``pairs`` is a list of
        ``(label, leaf_node_index)`` tuples.
    """
    node = nodes[node_idx]
    if len(node.children) == 0:
        return [(now_label, node_idx)], now_label
    if node.value > thd:
        # Cut here: each child subtree starts a fresh label.
        label_list = []
        for child in node.children:
            now_label_list, now_label = gen_label_from_node(child, nodes, thd, now_label)
            now_label += 1
            label_list += now_label_list
        return label_list, now_label
    else:
        # Below the threshold: children share the running label counter.
        label_list = []
        for child in node.children:
            now_label_list, now_label = gen_label_from_node(child, nodes, thd, now_label)
            label_list += now_label_list
        return label_list, now_label


def find_new_root(node_idx, nodes, thd):
    """Return the indices of the topmost nodes whose value is <= ``thd``."""
    node = nodes[node_idx]
    if node.value <= thd:
        return [node_idx]
    new_root = []
    for child in node.children:
        new_root += find_new_root(child, nodes, thd)
    return new_root


def get_leaf(node_idx, nodes):
    """Return all leaf indices in the subtree rooted at ``node_idx``."""
    node = nodes[node_idx]
    if len(node.children) == 0:
        return [node_idx]
    leaf_list = []
    for child in node.children:
        leaf_list += get_leaf(child, nodes)
    return leaf_list


def merge_tree(node_idx, nodes, distance_matrix, deep=0, end_thd=0.25):
    """Recursively flatten the binary dendrogram into an n-ary tree in place.

    At each node, pick the silhouette-optimal cut and make the cut roots the
    node's direct children; stop splitting when the merge distance drops to
    ``end_thd`` or the depth reaches 5, freezing the subtree (``split=False``).
    """
    if len(nodes[node_idx].children) == 0:
        return
    print(f"Node {node_idx}: Value: {nodes[node_idx].value}, Depth: {deep}")
    if nodes[node_idx].value <= end_thd or deep >= 5:
        nodes[node_idx].children = get_leaf(node_idx, nodes)
        nodes[node_idx].split = False
        return
    # Distance submatrix over the subtree's leaves, sorted by index so it
    # matches the label ordering produced in find_best_thold.
    leaf_list = np.array(sorted(get_leaf(node_idx, nodes)))
    new_distance_matrix = distance_matrix[leaf_list][:, leaf_list]
    best_threshold, best_score = find_best_thold(node_idx, nodes, new_distance_matrix, min_socre=0)
    if best_score == -1:
        # No valid cut found: flatten to leaves. NOTE(review): ``split`` is
        # deliberately left True here (unlike the end_thd branch) — confirm
        # this asymmetry is intended.
        nodes[node_idx].children = get_leaf(node_idx, nodes)
        return
    new_root = find_new_root(node_idx, nodes, best_threshold)
    nodes[node_idx].children = new_root
    for child in new_root:
        merge_tree(child, nodes, distance_matrix, deep=deep + 1, end_thd=end_thd)


def merge_dict(a, b):
    """Merge dict-of-lists ``b`` into ``a`` (in place), concatenating lists."""
    for key in b.keys():
        if key in a.keys():
            a[key] += b[key]
        else:
            a[key] = b[key]
    return a


def update_tree(node_idx, nodes, edge_list, fa=-1, deep=0):
    """Collect edges and per-depth leaf groupings of the merged tree.

    Appends ``(parent_idx, node_idx, names)`` tuples to ``edge_list``
    (``names`` is non-empty only for leaves / frozen subtrees), and returns a
    dict mapping depth -> list of leaf-index groups at that depth.
    """
    node = nodes[node_idx]
    if len(node.children) == 0:
        edge_list.append((fa, node_idx, [nodes[node_idx].name]))
        return {deep: [[node_idx]]}
    if node.split == False:
        # Frozen subtree: emit a single edge carrying all leaf names.
        leafs = get_leaf(node_idx, nodes)
        edge_list.append((fa, node_idx, [nodes[idx].name for idx in leafs]))
        return {deep: [leafs]}
    edge_list.append((fa, node_idx, []))
    new_tree = {}
    for child in node.children:
        new_tree = merge_dict(
            new_tree,
            update_tree(child, nodes, edge_list, node_idx, deep=deep + 1),
        )
    if deep not in new_tree.keys():
        new_tree[deep] = []
    new_tree[deep].append(get_leaf(node_idx, nodes))
    return new_tree


def color_distance(c1, c2):
    return euclidean(c1[:3], c2[:3])  # only consider the RGB components


def ensure_color_diversity(colors, min_distance=0.2):
    """Shuffle ``colors`` (in place) and swap entries so neighbours differ.

    Uses the global ``random`` state; adjacent colours closer than
    ``min_distance`` in RGB space are swapped with a later distinct colour.
    """
    random.shuffle(colors)
    for i in range(1, len(colors)):
        if color_distance(colors[i], colors[i - 1]) < min_distance:
            for j in range(i + 1, len(colors)):
                if color_distance(colors[i], colors[j]) > min_distance:
                    colors[i], colors[j] = colors[j], colors[i]
                    break
    return colors


def draw_table(new_tree, names, max_deep=3, save_path='fig/E/test.pdf'):
    """Render the per-depth clustering as a colour-coded table and save a PDF.

    ``new_tree`` maps depth -> list of index groups (output of
    ``reid_tree_dict``); each table column is one depth level and cells in
    the same group share a colour.
    """
    base_list = new_tree[0][0]
    data = [base_list]
    # cm.get_cmap(name, lut) was removed in matplotlib 3.9; fall back to the
    # registry-based API there.
    try:
        base_cmap = cm.get_cmap('tab20c', 2048)
    except AttributeError:
        import matplotlib
        base_cmap = matplotlib.colormaps['tab20c'].resampled(2048)
    cmap = [base_cmap(i) for i in range(2048)]
    cmap = ensure_color_diversity(cmap)
    cell_colours = [['#FFDDC1' for _ in base_list]]
    color_start = 0
    for i in range(1, max_deep + 1):
        if i not in new_tree.keys():
            print(f"Level {i} not in new_tree")
            continue
        data.append([names[base] for base in base_list])
        color_list = []
        for k, base in enumerate(base_list):
            color_id = -1
            for j in range(len(new_tree[i])):
                if base in new_tree[i][j]:
                    color_id = j
                    break
            if color_id == -1:
                # Not grouped at this depth: inherit the previous level's colour.
                color_list.append(cell_colours[-1][k])
            else:
                color_list.append(cmap[color_start + color_id])
        cell_colours.append(color_list)
        color_start += len(new_tree[i])  # keep colours unique across levels
    data = list(zip(*data))
    cell_colours = list(zip(*cell_colours))
    columns = ['Node ID'] + ['Level {}'.format(i) for i in range(1, max_deep + 1)]
    plt.figure(figsize=(30, 40))
    table = plt.table(cellText=data, colLabels=columns, loc='center', cellLoc='center',
                      colColours=['#f5f5f5'] * len(columns), cellColours=cell_colours)
    table.auto_set_column_width([0, 1])
    plt.axis('off')
    plt.savefig(save_path, format='pdf', bbox_inches='tight', pad_inches=0.01)
    plt.close()  # release the figure; avoids accumulating open figures


def fix_asymmetry(matrix):
    """Symmetrize a (possibly asymmetric) similarity matrix by averaging."""
    matrix = (matrix + matrix.T) / 2
    return matrix


def rename(edge):
    """Relabel node ids with consecutive ints, leaves-first (Kahn-style BFS).

    Edges are ``(parent, child, names)``; nodes with a non-empty name list
    seed the queue, and a parent is visited once all its children are done.
    The virtual root ``-1`` keeps its id.
    """
    cnt = 0
    reid = {}
    du = {}          # remaining unvisited children per parent
    edge_dict = {}   # child id -> its edge tuple
    queue = deque()  # deque: O(1) popleft vs O(n) list.pop(0)
    for e in edge:
        du[e[0]] = du.get(e[0], 0) + 1
        edge_dict[e[1]] = e
        if e[2] != []:
            queue.append(e[1])
    while queue:
        now = queue.popleft()
        if now == -1:
            reid[now] = -1
            continue
        if now not in reid.keys():
            reid[now] = cnt
            cnt += 1
        now_edge = edge_dict[now]
        du[now_edge[0]] -= 1
        if du[now_edge[0]] == 0:
            queue.append(now_edge[0])
    new_edge = [(reid[x[0]], reid[x[1]], x[2]) for x in edge]
    return new_edge


def save_edge(edge, save_path):
    """Write edges as ``<child> <parent> <comma-joined-names-or-none>`` lines."""
    with open(save_path, 'w', encoding='utf-8') as f:
        for e in edge:
            if e[2]:
                name_str = ','.join(e[2])
            else:
                name_str = 'none'
            f.write(f"{e[1]} {e[0]} {name_str}\n")


def filter_class(names, similarity_matrix):
    """Randomly subsample classes with hand-tuned keep probabilities.

    Uses the global ``random`` state; keep rates depend on substrings in the
    class name ('extend'/'polish'/'translate'/'paraphrase', 'B', 'human').
    """
    choose_idx = []
    for i in range(len(names)):
        if 'extend' not in names[i] and 'polish' not in names[i] and\
                'translate' not in names[i] and 'paraphrase' not in names[i]:
            if 'B' in names[i] or 'human' in names[i]:
                choose_idx.append(i)
            else:
                if random.random() < 0.3:
                    choose_idx.append(i)
        elif 'human' in names[i]:
            if random.random() < 0.3:
                choose_idx.append(i)
        elif random.random() < 0.15:
            choose_idx.append(i)
    new_names = [names[i] for i in choose_idx]
    choose_idx = np.array(choose_idx)
    new_similarity_matrix = similarity_matrix[choose_idx][:, choose_idx]
    return new_names, new_similarity_matrix


def filter(names, similarity_matrix, filter_human=False, filter_llm=False, filter_mix=False):
    """Select rows/columns by class kind.

    NOTE: the name shadows the builtin ``filter``; kept for backward
    compatibility. ``filter_human`` drops the exact 'human' class,
    ``filter_llm`` drops names not containing 'human', ``filter_mix`` drops
    human-derived mixes (contain 'human' but are not exactly 'human').
    """
    choose_idx = []
    for i in range(len(names)):
        if names[i] == 'human' and filter_human:
            continue
        if filter_llm and 'human' not in names[i]:
            continue
        if filter_mix and 'human' in names[i] and names[i] != 'human':
            continue
        choose_idx.append(i)
    new_names = [names[i] for i in choose_idx]
    choose_idx = np.array(choose_idx)
    new_similarity_matrix = similarity_matrix[choose_idx][:, choose_idx]
    return new_names, new_similarity_matrix


def reid_tree_dict(tree_dict, nodes, names):
    """Replace node indices in ``tree_dict`` with indices into ``names``.

    Mutates both ``tree_dict`` and ``names`` in place (unknown names are
    appended to ``names``). Returns ``tree_dict``.
    """
    name_to_index = {name: idx for idx, name in enumerate(names)}
    for deep, values in tree_dict.items():
        rename_now = []
        for list_ in values:
            now_list = []
            for idx in list_:
                name = nodes[idx].name
                if name not in name_to_index:
                    name_to_index[name] = len(names)
                    names.append(name)
                name_idx = name_to_index[name]
                now_list.append(name_idx)
            rename_now.append(now_list)
        tree_dict[deep] = rename_now
    return tree_dict


def gen_tree(similarity_matrix, names, opt):
    """Cluster a similarity matrix and return the merged node list.

    Converts similarity to distance (1 - s, zero diagonal), runs weighted
    linkage, optionally saves a dendrogram PDF, then collapses the binary
    tree with ``merge_tree``.
    """
    distance_matrix = 1 - similarity_matrix
    np.fill_diagonal(distance_matrix, 0)
    condensed_distance_matrix = squareform(distance_matrix)
    Z = linkage(condensed_distance_matrix, method='weighted')  # alternative methods include 'single', 'complete', or 'ward'
    if opt.save_drg:
        plt.figure(figsize=(30, 47))
        dendrogram(Z, labels=names, orientation='right', leaf_font_size=16)  # rotate the dendrogram so the root is on the right
        plt.savefig(opt.dendrogram_path, format='pdf', bbox_inches='tight')
        plt.close()  # release the figure after saving
    nodes = build_tree(Z, names)
    merge_tree(len(nodes) - 1, nodes, distance_matrix, end_thd=opt.end_score)
    return nodes


def chage_tree_priori1(nodes):
    """Priori 1: attach a fresh 'human' leaf beside the existing LLM tree root.

    (Function name keeps the historical 'chage' misspelling for
    backward compatibility.)
    """
    human_node = TreeNode(name='human')
    root = TreeNode()
    # Indices computed BEFORE appending: len(nodes) will be the human leaf,
    # len(nodes)-1 is the current root.
    root.add_child(len(nodes))
    root.add_child(len(nodes) - 1)
    nodes.append(human_node)
    nodes.append(root)
    return nodes


def chage_tree_priori2(human_nodes, llm_nodes):
    """Priori 2: join separate human and LLM trees under a new root."""
    root = TreeNode()
    root.add_child(len(human_nodes) - 1)
    root.add_child(len(human_nodes) + len(llm_nodes) - 1)
    # Shift LLM child indices past the human block.
    for i in range(len(llm_nodes)):
        llm_nodes[i].children = [len(human_nodes) + x for x in llm_nodes[i].children]
    nodes = human_nodes + llm_nodes
    nodes.append(root)
    return nodes


def chage_tree_priori3(co_nodes, llm_nodes):
    """Priori 3: join co-written and LLM trees plus a fresh 'human' leaf."""
    human_node = TreeNode(name='human')
    root = TreeNode()
    root.add_child(len(co_nodes) + len(llm_nodes))      # the human leaf (appended first)
    root.add_child(len(co_nodes) - 1)                   # co-written subtree root
    root.add_child(len(co_nodes) + len(llm_nodes) - 1)  # LLM subtree root
    for i in range(len(llm_nodes)):
        llm_nodes[i].children = [len(co_nodes) + x for x in llm_nodes[i].children]
    nodes = co_nodes + llm_nodes
    nodes.append(human_node)
    nodes.append(root)
    return nodes


def randmo_filter(names, similarity_matrix):
    """Randomly subsample machine classes and normalize LLaMA names.

    (Function name keeps the historical 'randmo' misspelling for backward
    compatibility.) Always keeps 'human' entries; drops some model families
    outright and subsamples others via the global ``random`` state.
    """
    choose_idx = []
    for i in range(len(names)):
        if 'human' in names[i]:
            choose_idx.append(i)
        elif 'fair' in names[i] or 'pplm' in names[i] or 'gpt2-pytorch' in names[i] or ' transfo' in names[i] or 'ctrl' in names[i]:
            continue
        elif 'xlnet' in names[i] or 'grover' in names[i]:
            if random.random() < 0.07:
                choose_idx.append(i)
        elif random.random() < 0.22:
            choose_idx.append(i)
    new_names = []
    for i in choose_idx:
        if names[i].startswith('7B') or names[i].startswith('13B') or names[i].startswith('30B') or names[i].startswith('65B'):
            new_names.append('LLaMA_' + names[i])
        else:
            new_names.append(names[i])
    choose_idx = np.array(choose_idx)
    new_similarity_matrix = similarity_matrix[choose_idx][:, choose_idx]
    return new_names, new_similarity_matrix


def ishuman(name):
    return ('human' in name)


def ismachine(name):
    return ('machine' in name or 'rephrase' in name)


def get_llm(x):
    """Map a raw class name to its canonical LLM identifier; raise on unknown."""
    if 'gpt-3.5-turbo' in x:
        return 'gpt-3.5-turbo'
    elif 'gpt-4o' in x:
        return 'gpt-4o'
    elif 'llama-3.3-70b' in x:
        return 'llama-3.3-70b'
    elif 'gemini-1.5-pro' in x:
        return 'gemini-1.5-pro'
    elif 'claude-3-5-sonnet' in x:
        return 'claude-3-5-sonnet'
    elif 'qwen2.5-72b' in x:
        return 'qwen2.5-72b'
    else:
        raise ValueError(f"Invalid class name: {x}")


def get_name(name):
    """Normalize a raw '<source>_<postprocess>' class name to a display name."""
    name = name.split('_')
    assert len(name) == 2
    if ishuman(name[0]):
        if name[1] == 'humanize:human' or name[1] == 'human':
            return 'human'
        elif name[1] == 'humanize:tool':
            return 'human_humanize_tool'
        else:
            llm_name = get_llm(name[1])
            return f'human_rephrase_{llm_name}'
    elif ismachine(name[0]):
        llm_name = get_llm(name[0])
        if name[1] == 'humanize:human' or name[1] == 'human':
            return f'{llm_name}_humanize_human'
        elif name[1] == 'humanize:tool':
            return f'{llm_name}_humanize_tool'
        elif 'humanize:' in name[1]:
            llm_name2 = get_llm(name[1])
            return f'{llm_name}_humanize_{llm_name2}'
        else:
            return llm_name


def clear_names(names):
    """Apply ``get_name`` to every entry."""
    new_names = []
    for name in names:
        new_names.append(get_name(name))
    return new_names


def build_argument_parser() -> argparse.ArgumentParser:
    """Build the CLI parser for the HAT construction script."""
    parser = argparse.ArgumentParser(
        description="Construct the HAT tree from a similarity matrix.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument('--file-path', type=Path, required=True,
                        help='Input similarity matrix text file.')
    parser.add_argument('--priori', type=int, default=1, choices=[0, 1, 2, 3])
    parser.add_argument('--save-txt-path', type=Path, required=True,
                        help='Destination path for the tree definition.')
    parser.add_argument('--save-table-path', type=Path, required=True,
                        help='Destination path for the visualised table.')
    parser.add_argument('--dendrogram-path', type=Path, default=None,
                        help='Optional path for the dendrogram PDF when saved.')
    parser.add_argument('--save-drg', action='store_true',
                        help='Persist the dendrogram PDF alongside the tree.')
    parser.add_argument('--no-save-drg', dest='save_drg', action='store_false')
    parser.set_defaults(save_drg=True)
    parser.add_argument('--save-max-dep', type=int, default=5)
    parser.add_argument('--end-score', type=float, default=0.1)
    parser.add_argument('--randmo-filter', action='store_true',
                        help='Randomly subsample similarity entries.')
    return parser


def main(argv: Optional[Iterable[str]] = None) -> None:
    """CLI entry point: read, cluster, and export the HAT tree artifacts."""
    parser = build_argument_parser()
    opt = parser.parse_args(argv)
    names, similarity_matrix = read_similarity_matrix(opt.file_path)
    if opt.save_drg:
        if opt.dendrogram_path is None:
            # Default: put the dendrogram next to the table output.
            opt.dendrogram_path = opt.save_table_path.with_name(
                f"{opt.save_table_path.stem}_dendrogram.pdf"
            )
        opt.dendrogram_path.parent.mkdir(parents=True, exist_ok=True)
    else:
        opt.dendrogram_path = None
    similarity_matrix = fix_asymmetry(similarity_matrix)
    if opt.randmo_filter:
        names, similarity_matrix = randmo_filter(names, similarity_matrix)
    # names = clear_names(names)
    if opt.priori == 1:
        # LLM-only tree, with 'human' attached as a sibling of its root.
        llm_names, llm_similarity_matrix = filter(names, similarity_matrix, filter_human=True)
        nodes = gen_tree(llm_similarity_matrix, llm_names, opt)
        nodes = chage_tree_priori1(nodes)
    elif opt.priori == 2:
        # Separate human and LLM trees joined under one root.
        human_names, human_similarity_matrix = filter(names, similarity_matrix, filter_llm=True)
        human_nodes = gen_tree(human_similarity_matrix, human_names, opt)
        llm_names, llm_similarity_matrix = filter(names, similarity_matrix, filter_human=True, filter_mix=True)
        llm_nodes = gen_tree(llm_similarity_matrix, llm_names, opt)
        nodes = chage_tree_priori2(human_nodes, llm_nodes)
    elif opt.priori == 3:
        # Co-written + LLM trees plus a standalone 'human' leaf.
        co_names, co_similarity_matrix = filter(names, similarity_matrix, filter_llm=True, filter_human=True)
        co_nodes = gen_tree(co_similarity_matrix, co_names, opt)
        llm_names, llm_similarity_matrix = filter(names, similarity_matrix, filter_human=True, filter_mix=True)
        llm_nodes = gen_tree(llm_similarity_matrix, llm_names, opt)
        nodes = chage_tree_priori3(co_nodes, llm_nodes)
    elif opt.priori == 0:
        nodes = gen_tree(similarity_matrix, names, opt)
    else:
        raise ValueError("Invalid value for --priori. Choose from 0, 1, 2, or 3.")
    edge = []
    tree_dict = update_tree(len(nodes) - 1, nodes, edge)
    edge = rename(edge)
    opt.save_txt_path.parent.mkdir(parents=True, exist_ok=True)
    opt.save_table_path.parent.mkdir(parents=True, exist_ok=True)
    save_edge(edge, opt.save_txt_path)
    tree_dict = reid_tree_dict(tree_dict, nodes, names)
    draw_table(tree_dict, names, max_deep=opt.save_max_dep, save_path=opt.save_table_path)


if __name__ == "__main__":
    main()