| | import torch |
| | import torch.nn as nn |
| | import sys |
| | import os |
| | import json |
| |
|
| | from collections import OrderedDict |
| | project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) |
| | sys.path.append(project_root) |
| |
|
| | from transformers import AutoConfig |
| | from InternVL.modeling_intern_vit import InternVisionModel |
| | from .perceiver_resampler import PerceiverResampler, MLP |
| | from config.configu import device, VIT_MODEL_PATH, MLP1_PATH, TOK_EMBEDDING_PATH, TOKENIZER_PATH,NORM_TOK_EMBEDDING_PATH,NORM_PARAMS_PATH |
| |
|
| |
|
def load_json(pth):
    """Load a JSON file and return its parsed contents."""
    with open(pth, 'r', encoding='utf-8') as fp:
        return json.load(fp)
def load_vision_model(location='cpu'):
    """Build an InternViT vision tower and load its pretrained weights.

    Args:
        location: map_location passed to torch.load for the checkpoint.

    Returns:
        InternVisionModel moved to the configured `device`, in bfloat16.
    """
    vit_config = AutoConfig.from_pretrained(TOKENIZER_PATH, trust_remote_code=True).vision_config
    vision_model = InternVisionModel(vit_config).to(device).to(torch.bfloat16)
    state_dict = torch.load(VIT_MODEL_PATH, weights_only=True, map_location=location)
    # NOTE(review): load_state_dict defaults to strict=True, which raises on any
    # missing/unexpected key — so the diagnostic prints below can only run when
    # both key lists are empty (effectively dead code). Confirm whether
    # strict=False was intended.
    incompatible_keys = vision_model.load_state_dict(state_dict)
    if incompatible_keys.unexpected_keys:
        print(f"Unexpected keys: {incompatible_keys.unexpected_keys}")
    if incompatible_keys.missing_keys:
        print(f"Missing keys: {incompatible_keys.missing_keys}")
    print("vision model已加载")
    return vision_model
| |
|
def load_mlp1(downsample_ratio, vit_hidden_size=1024, llm_hidden_size=4096,location='cpu'):
    """Build the vision-to-LLM projection MLP and load its pretrained weights.

    Args:
        downsample_ratio: spatial downsample factor; the input width is
            vit_hidden_size * int(1/downsample_ratio)**2 (pixel-shuffle style).
        vit_hidden_size: hidden size of the vision tower.
        llm_hidden_size: hidden size of the language model.
        location: map_location for torch.load.

    Returns:
        nn.Sequential projector on `device`, in bfloat16.
    """
    in_features = vit_hidden_size * int(1 / downsample_ratio) ** 2
    projector = nn.Sequential(
        nn.LayerNorm(in_features),
        nn.Linear(in_features, llm_hidden_size),
        nn.GELU(),
        nn.Linear(llm_hidden_size, llm_hidden_size),
    ).to(device).to(torch.bfloat16)
    state = torch.load(MLP1_PATH, weights_only=True, map_location=location)
    projector.load_state_dict(state)
    print("mlp1已加载")
    return projector
| |
|
def load_tok_embeddings(path=TOK_EMBEDDING_PATH,vocab_size=92553, llm_hidden_size=4096,location='cpu'):
    """Build the token-embedding table and load pretrained weights from *path*.

    Args:
        path: checkpoint file holding the embedding state dict.
        vocab_size: number of rows in the embedding table.
        llm_hidden_size: embedding dimension.
        location: map_location for torch.load.

    Returns:
        nn.Embedding on `device`, in bfloat16 (padding index 2).
    """
    embeddings = nn.Embedding(vocab_size, llm_hidden_size, padding_idx=2)
    embeddings = embeddings.to(device).to(torch.bfloat16)
    embeddings.load_state_dict(torch.load(path, weights_only=True, map_location=location))
    print("tok_embedding已加载")
    return embeddings
| |
|
| |
|
| |
|
def load_normed_tok_embeddings(vocab_size=92553, llm_hidden_size=4096,load_checkboard=False,location="cpu"):
    """Build the normalized token-embedding table and load its weights.

    Args:
        vocab_size: number of rows in the embedding table.
        llm_hidden_size: embedding dimension.
        load_checkboard: when True, additionally load the normalization
            parameters stored at NORM_PARAMS_PATH and return them too.
        location: map_location for torch.load.

    Returns:
        The embedding module, or (embedding, norm_params) when
        load_checkboard is True.
    """
    tok_embeddings = nn.Embedding(vocab_size, llm_hidden_size, padding_idx=2).to(device).to(torch.bfloat16)
    tok_embeddings.load_state_dict(torch.load(NORM_TOK_EMBEDDING_PATH, weights_only=True, map_location=location))
    print("norm tok_embedding已加载")
    if load_checkboard:
        # Fix: honor `location` here as well — previously this load had no
        # map_location, so params saved on GPU failed to load on a CPU-only
        # host (inconsistent with every other loader in this module).
        checkboard_norm = torch.load(NORM_PARAMS_PATH, map_location=location)
        print("归一化参数(mu, sigma)已加载")
        return tok_embeddings, checkboard_norm
    return tok_embeddings
| |
|
| |
|
def load_tokenizer():
    """Return the project tokenizer loaded from TOKENIZER_PATH."""
    from transformers import AutoTokenizer
    return AutoTokenizer.from_pretrained(TOKENIZER_PATH, trust_remote_code=True)
| |
|
def load_perceiver_resampler(path=None, num_layers=4, checkpoint=None):
    """Build a PerceiverResampler and optionally load weights into it.

    Args:
        path: optional checkpoint file, read when `checkpoint` is not given.
        num_layers: depth of the resampler.
        checkpoint: optional pre-loaded checkpoint; either a dict containing
            'model_state_dict' or a raw state-dict-like object.

    Returns:
        The model on `device`, in bfloat16 (freshly initialized when neither
        `path` nor `checkpoint` is supplied — callers such as
        load_pretrained_resampler rely on this).

    Raises:
        FileNotFoundError: when a dict checkpoint has no 'model_state_dict'
            key (exception type kept for backward compatibility).
    """
    model = PerceiverResampler(dim=4096, depth=num_layers).to(device).to(torch.bfloat16)
    # Fix: use `is None` identity checks (PEP 8) instead of `== None`, and
    # guard the load so that calling with neither path nor checkpoint returns
    # a fresh model instead of crashing in load_state_dict(None).
    if checkpoint is None and path is not None:
        checkpoint = torch.load(path)
    if path is not None:
        print(f"Load from {path}")
    if checkpoint is not None:
        if isinstance(checkpoint, dict):
            if 'model_state_dict' in checkpoint:
                model.load_state_dict(checkpoint['model_state_dict'])
            else:
                raise FileNotFoundError("no key model_state_dict in ckpt")
        else:
            model.load_state_dict(checkpoint)
    print(f"Model has a parameter scale of {sum(p.numel() for p in model.parameters())/1e9:.3f} B.")
    return model
| |
|
def load_mlp(path=None):
    """Build an MLP head (dim=256) on `device` in bfloat16, optionally loading
    weights from *path*."""
    model = MLP(dim=256).to(device).to(torch.bfloat16)
    if path is not None:
        model.load_state_dict(torch.load(path))
    n_params = sum(p.numel() for p in model.parameters())
    print(f"Model has a parameter scale of {n_params/1e9:.3f} B.")
    return model
| |
|
def load_perceiver_resampler_2(model_path, num_layers=4,device=None):
    """Build a PerceiverResampler and load a (possibly DDP-saved) checkpoint.

    Args:
        model_path: path to a torch checkpoint — either a raw state dict or a
            dict wrapping it under 'model_state_dict'; 'module.'-prefixed keys
            (from DistributedDataParallel) are stripped before loading.
        num_layers: depth of the resampler.
        device: target device; auto-selected (cuda if available) when None.

    Returns:
        The model on `device`, cast to bfloat16.
    """
    # Fix: `is None` identity check instead of `== None`.
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model = PerceiverResampler(dim=4096, depth=num_layers)

    # Fix: the checkpoint was previously loaded twice — once with
    # map_location='cpu' and then again without it, discarding the first read
    # and risking a CUDA deserialization error on CPU-only hosts. Load once.
    # NOTE(review): weights_only=False is preserved because the checkpoint may
    # contain non-tensor training metadata; only use with trusted files.
    state_dict = torch.load(model_path, map_location='cpu', weights_only=False)
    if 'model_state_dict' in state_dict:
        state_dict = state_dict['model_state_dict']

    # Strip the 'module.' prefix that DistributedDataParallel prepends.
    new_state_dict = OrderedDict()
    for key, value in state_dict.items():
        new_key = key[len('module.'):] if key.startswith('module.') else key
        new_state_dict[new_key] = value

    # to_empty() materializes (uninitialized) parameters on the target device;
    # every parameter is then overwritten by load_state_dict.
    model = model.to_empty(device=device)
    model.load_state_dict(new_state_dict)
    model = model.to(torch.bfloat16)
    return model
| |
|
def load_pretrained_resampler(checkpoint_path, num_layers=6):
    """Build a PerceiverResampler and load pretrained weights from a file.

    Handles three checkpoint layouts: a raw state dict, a raw state dict with
    DDP 'module.' key prefixes, and a dict wrapping the state dict under a
    'model' key (with or without prefixes).

    Args:
        checkpoint_path: path to the checkpoint file.
        num_layers: depth of the resampler.

    Returns:
        The loaded model.
    """
    model = load_perceiver_resampler(num_layers=num_layers)
    checkpoint = torch.load(checkpoint_path, map_location=device)

    # Unwrap a {'model': state_dict} container when present.
    state_dict = checkpoint['model'] if 'model' in checkpoint else checkpoint

    # Fix: the DDP branches previously passed the state dict through
    # unchanged, so load_state_dict() failed on the 'module.' prefix (and a
    # prefix-free checkpoint raised KeyError probing checkpoint['model']).
    if any(key.startswith('module.') for key in state_dict):
        print("load ddp Perceiver Resampler....")
        state_dict = OrderedDict(
            (key[len('module.'):] if key.startswith('module.') else key, value)
            for key, value in state_dict.items()
        )
    else:
        print("load Perseiver Resampler ...")
    model.load_state_dict(state_dict)
    return model
| |
|
def load_optimizer(optimizer, path, resume):
    """Restore optimizer state from a checkpoint when resuming training.

    Args:
        optimizer: the optimizer to restore into.
        path: checkpoint file that may contain 'optimizer_state_dict'.
        resume: when False, the optimizer is returned untouched.

    Returns:
        The (possibly restored) optimizer.
    """
    if not resume:
        return optimizer

    ckpt = torch.load(path)
    if 'optimizer_state_dict' not in ckpt:
        return optimizer

    raw_state = ckpt['optimizer_state_dict']
    # Drop any DDP 'module.' prefix from the top-level keys before loading.
    cleaned = OrderedDict(
        (k[len('module.'):] if k.startswith('module.') else k, v)
        for k, v in raw_state.items()
    )
    optimizer.load_state_dict(cleaned)
    return optimizer
| |
|
def load_scheduler(scheduler, path, resume):
    """Restore LR-scheduler state from a checkpoint when resuming training.

    Args:
        scheduler: the scheduler to restore into.
        path: checkpoint file that may contain 'scheduler_state_dict'.
        resume: when False, the scheduler is returned untouched.

    Returns:
        The (possibly restored) scheduler.
    """
    if not resume:
        return scheduler

    ckpt = torch.load(path)
    if 'scheduler_state_dict' not in ckpt:
        return scheduler

    raw_state = ckpt['scheduler_state_dict']
    # Drop any DDP 'module.' prefix from the top-level keys before loading.
    cleaned = OrderedDict(
        (k[len('module.'):] if k.startswith('module.') else k, v)
        for k, v in raw_state.items()
    )
    scheduler.load_state_dict(cleaned)
    return scheduler
| |
|
| | import numpy as np |
| | from tqdm import tqdm |
| | import torch |
| | import torch.nn as nn |
| | import torch.optim as optim |
| | from torch.utils.data import DataLoader, Dataset, random_split |
class BoundingBoxDataset(Dataset):
    """Minimal paired (input, target) dataset for bounding-box ordering."""

    def __init__(self, data, targets):
        self.data = data
        self.targets = targets

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.targets[idx]
| |
|
class Transformer(nn.Module):
    """Encoder-only Transformer: linear embedding, encoder stack, linear head.

    Args:
        input_dim: feature size of each input token.
        model_dim: hidden size of the encoder.
        num_heads: attention heads per encoder layer.
        num_layers: number of encoder layers.
        output_dim: feature size of each output token.
        norms: when True, apply a final LayerNorm after the encoder stack.
    """

    def __init__(self, input_dim:int, model_dim:int, num_heads:int, num_layers:int,output_dim:int,norms=True):
        super().__init__()
        self.embedding = nn.Linear(input_dim, model_dim)
        if norms:
            self.layer_norm = nn.LayerNorm(model_dim)
        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=model_dim, nhead=num_heads, batch_first=True
        )
        final_norm = self.layer_norm if norms else None
        self.transformer_encoder = nn.TransformerEncoder(
            self.encoder_layer, num_layers=num_layers, norm=final_norm
        )
        self.decoder = nn.Linear(model_dim, output_dim)

    def forward(self, x):
        hidden = self.embedding(x)
        hidden = self.transformer_encoder(hidden)
        return self.decoder(hidden)
| | |
class OrderFormer:
    """Wrapped ordering model: data loading, training, evaluation, and
    inference for sorting bounding boxes with an encoder-only Transformer."""

    def __init__(self, model_path=None,max_nums=300,input_dim=4, model_dim=256, num_heads=8, num_layers=4, output_dim=1,device=torch.device("cuda"),label_name="turn",norm=False):
        # to_empty() leaves parameters uninitialized on `device`; weights are
        # expected to come from `model_path` (or from subsequent training).
        self.model = Transformer(input_dim, model_dim, num_heads, num_layers, output_dim,norms=norm).to_empty(device=device)
        if isinstance(model_path,str):
            self.model.load_state_dict(torch.load(model_path))

        self.device=device
        self.max_nums=max_nums      # max boxes per sample (pad length)
        self.input_dim=input_dim    # 4 coordinates: [x1, y1, x2, y2]
        self.label_name=label_name  # JSON key holding each box's order index

    def _get_all_jsons(self,folder_path):
        """Return the paths of all JSON files directly inside *folder_path*."""
        files = os.listdir(folder_path)
        # NOTE(review): paths are built by plain string concatenation, so this
        # relies on folder_path ending with a path separator — confirm callers.
        json_files = [folder_path+f for f in files if os.path.isfile(os.path.join(folder_path, f)) and f.endswith('json')]
        return json_files

    def _preprocess(self,datas):
        """Convert one labelme-format annotation dict into training pairs.

        Args:
            datas: dict with 'shapes', 'imageHeight', 'imageWidth'
                (labelme data format).

        Returns:
            [[[x1, y1, x2, y2], label], ...] with coordinates normalized by
            image size to [0, 1] and translated so min x / min y become 0.
        """
        data=datas['shapes']
        h=datas['imageHeight']
        w=datas['imageWidth']
        example=[]
        X=[]
        Y=[]
        L=[]
        for obj in data:
            l=obj[self.label_name]
            p=obj['points']
            # p[0] and p[1] are the two corner points of the box.
            X.extend([p[0][0]/w,p[1][0]/w])
            Y.extend([p[0][1]/h,p[1][1]/h])
            L.append(l)
        xmin=min(X)
        ymin=min(Y)
        # Translate so the smallest coordinate becomes the origin.
        X=np.array(X)-xmin
        Y=np.array(Y)-ymin
        for i in range(len(L)):
            coord=[X[2*i],Y[2*i],X[2*i+1],Y[2*i+1]]
            example.append([coord,L[i]])
        return example

    def _sort_boxes(self,boxes):
        """Sort boxes by squared distance of their center from (0, 0) so the
        model always receives a unique, deterministic input permutation.

        boxes = [[[x1, y1, x2, y2], label], ...]; `label` may be the order
        label, or the original bbox so order->bbox can be recovered later.
        """
        return sorted(boxes,key=lambda x:((x[0][0]+x[0][2])/2)**2+((x[0][1]+x[0][3])/2)**2)

    def _load_data(self,path,device=torch.device("cuda"),name='turn'):
        """Build padded input/label tensors from a folder of labelme JSONs.

        Args:
            path: folder containing the JSON (and image) files.
            device: unused — immediately overridden by self.device below;
                kept for backward compatibility.
            name: unused — self.label_name is used instead.

        Returns:
            (inputs, labels, originNs): inputs of shape
            (N, max_nums, input_dim), labels of shape (N, max_nums, 1),
            plus the true (unpadded) box count of each sample.
        """
        max_nums=self.max_nums
        device=self.device
        all_jsons=self._get_all_jsons(path)
        raw=[]
        for j in all_jsons:
            datas=load_json(j)
            example=self._preprocess(datas)
            raw.append(example)
        transformed_inputs=[]
        transformed_labels=[]
        originNs=[]
        for item in raw:
            item=self._sort_boxes(item)
            originNs.append(len(item))
            lst=[]
            ls=[]
            for x in item:
                lst.extend(x[0])
                ls.append(int(x[1]))
            # Zero-pad every sample up to max_nums boxes.
            lst.extend([0]*self.input_dim*(max_nums-len(item)))
            ls.extend([0]*(max_nums-len(item)))
            transformed_inputs.append(lst)
            transformed_labels.append(ls)
        return torch.tensor(transformed_inputs,dtype=torch.float32).reshape((-1,max_nums,self.input_dim)).to(device),torch.tensor(transformed_labels,dtype=torch.float32).reshape((-1,self.max_nums,1)).to(device),originNs

    def _decode(self,output,N,batch_size=1):
        """Decode raw model scores into 1-based ranks for the first N
        (non-padding) boxes via a double argsort."""
        new_output=output.reshape((batch_size,-1))[:,:N]
        sorted_indices = torch.argsort(new_output, dim=1)
        ranks = torch.argsort(sorted_indices, dim=1)
        return ranks + 1

    def _get_acc(self,tensor1, tensor2):
        """Return the fraction of positions where two same-shape tensors are
        element-wise equal."""
        assert tensor1.shape == tensor2.shape, "Tensors must have the same shape"

        equal_mask = tensor1 == tensor2

        equal_count = torch.sum(equal_mask).item()
        total_elements = torch.numel(tensor1)

        proportion_equal = equal_count / total_elements

        return proportion_equal

    def train(self, path,batch_size=4,lr=0.0002,weight_decay=0,epochs=1000,verbose=True):
        """Train on the JSON dataset under *path*; saves the checkpoint with
        the lowest mean epoch loss to 'best.pth'."""
        if verbose:
            print("Loading dataset...")
        data,labels,_=self._load_data(path=path,device=self.device,name=self.label_name)

        optimizer = optim.AdamW(self.model.parameters(), lr=lr,weight_decay=weight_decay,amsgrad=True)

        scheduler=optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2, eta_min=1e-6)
        criterion=torch.nn.MSELoss()

        dataset = BoundingBoxDataset(data, labels)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
        min_loss=float("inf")
        if verbose:
            print("Start training...")
        for epoch in range(epochs):
            losses=0
            for batch_idx,(inputs, y) in enumerate(tqdm((dataloader))):
                optimizer.zero_grad()
                outputs = self.model(inputs)

                loss = criterion(outputs, y)
                loss.backward()
                losses+=loss.item()
                # NOTE(review): scheduler.step() runs before optimizer.step();
                # PyTorch recommends the opposite order — confirm intentional.
                scheduler.step(epoch + batch_idx / len(dataloader))
                optimizer.step()

            if verbose:
                print(f"Epoch {epoch+1}/{epochs}, Loss: {losses/len(dataloader)}")
            if losses/len(dataloader)<min_loss:
                min_loss=losses/len(dataloader)
                if verbose:
                    print("Saving best model...")
                torch.save(self.model.state_dict(),'best.pth')

    def eval(self, path,verbose=False):
        """Evaluate on the dataset under *path*; prints the mean MSE loss and
        the mean per-box rank accuracy (reported as "mAP")."""
        testdata,testlabels,Ns=self._load_data(path=path,device=self.device,name=self.label_name)
        dataset = BoundingBoxDataset(testdata, testlabels)
        testloader=DataLoader(dataset,batch_size=1,shuffle=False)

        self.model.eval()
        losses=0
        mAP=0
        if verbose:
            print("Evaluation...")
        criterion = nn.MSELoss()
        for i,(inputs, y) in enumerate(testloader):
            outputs = self.model(inputs)
            pred= self._decode(outputs,Ns[i])
            # Compare only against the first Ns[i] (non-padding) labels.
            gt=y.reshape((1,-1))[:,:Ns[i]]
            loss = criterion(pred, gt)
            acc=self._get_acc(pred,gt)
            if verbose:
                print("Pred:",pred)
                print("GT:",gt)
                print("loss= ",loss.item())
                print("acc= ",acc,'\n')
            losses+=loss.item()

            mAP+=acc
        print(f"Test MSELoss= {losses/len(testloader):.4f}\nTest mAP= {mAP/len(testloader):.4f}")

    def predict(self,datas,jpg_path=None,save_path=None,verbose=False):
        """Predict the ordering of a single labelme-format sample.

        Returns a dict mapping predicted order (1-based) to the original
        pixel-space box; when verbose with jpg_path/save_path set, also saves
        a visualization image.
        """
        if save_path:
            os.makedirs(save_path,exist_ok=True)
        import time
        st=time.time()
        data=datas['shapes']
        h=datas['imageHeight']
        w=datas['imageWidth']
        example=[]
        X=[]
        Y=[]
        Ls=[]
        for obj in data:
            p=obj['points']
            # Keep the raw pixel-space box so it can be returned alongside
            # the predicted order.
            flat_p=[p[0][0],p[0][1],p[1][0],p[1][1]]
            Ls.append(flat_p)
            X.extend([p[0][0]/w,p[1][0]/w])
            Y.extend([p[0][1]/h,p[1][1]/h])
        xmin=min(X)
        ymin=min(Y)
        # Same normalization/translation as _preprocess.
        X=np.array(X)-xmin
        Y=np.array(Y)-ymin
        for i in range(len(data)):
            coord=[X[2*i],Y[2*i],X[2*i+1],Y[2*i+1]]
            example.append([coord,Ls[i]])
        example=self._sort_boxes(example)
        inputs=[]
        labels=[]
        for coord in example:
            inputs.extend(coord[0])
            labels.append(coord[1])
        inputs.extend([0]*self.input_dim*(self.max_nums-len(example)))

        # NOTE(review): inference builds bfloat16 inputs while training used
        # float32 — confirm the deployed weights' dtype matches.
        x=torch.tensor(inputs,dtype=torch.bfloat16).reshape((-1,self.max_nums,self.input_dim)).to(self.device)

        mstart=time.time()
        self.model.eval()
        y=self.model(x)
        mtime=time.time()-mstart
        pred=self._decode(y,len(example)).squeeze().tolist()
        results={}
        if isinstance(pred,int):
            # Single-box sample: squeeze().tolist() yields a bare int.
            pred=[pred]
        for p,l in zip(pred,labels):
            results[p]=l

        post_start=time.time()
        results=self.postprocess(dict(sorted(results.items(), key=lambda item: item[0])),w,h,save_path,jpg_path)
        ptime=time.time()-post_start
        if verbose:
            print(f"Using {time.time()-st:.3f}s to sort boxes,with {mtime:.3f}s on OrderFormer inference,{ptime:.3f}s on postprocess.")
        if verbose and isinstance(jpg_path,str) and isinstance(save_path,str):
            import cv2
            frame = cv2.imread(jpg_path)

            # Draw each box with its predicted order at the box center.
            for idx ,points in results.items():
                x1, y1, x2, y2 = int(points[0]), int(points[1]), int(points[2]), int(points[3])
                cv2.rectangle(frame, (x1, y1), (x2, y2), thickness=2,color=(255,0,0),lineType=cv2.LINE_AA)
                label_position = ((x1+x2)//2,(y1+y2)//2)
                cv2.putText(frame, str(idx), label_position, cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA)
            name=jpg_path.split("/")[-1]
            cv2.imwrite(save_path+"ordered_"+name,frame)

        return dict(sorted(results.items(), key=lambda item: item[0]))

    def postprocess(self,results,width,height,save_dir,jpg_path,vis=True,max_iters=5):
        """Slide over consecutive triples of boxes in predicted order and
        locally reorder triples that appear to sit on one text line.

        NOTE(review): `save_dir`, `jpg_path`, `vis`, and `max_iters` are
        accepted but never used in this body — confirm before relying on them.
        """
        def ordered_permute(b1,b2,b3):
            # Decide a permutation of three boxes: when their vertical centers
            # are within the smallest box height and their areas are similar
            # (ratio > 0.7), order by x-center; otherwise keep incoming order.
            ws=[b1[2]-b1[0],b2[2]-b2[0],b3[2]-b3[0]]
            hs=[b1[3]-b1[1],b2[3]-b2[1],b3[3]-b3[1]]
            c1=[(b1[0]+b1[2])/2,(b1[1]+b1[3])/2]
            c2=[(b2[0]+b2[2])/2,(b2[1]+b2[3])/2]
            c3=[(b3[0]+b3[2])/2,(b3[1]+b3[3])/2]
            s=[ws[0]*hs[0],ws[1]*hs[1],ws[2]*hs[2]]
            if max(abs(c1[1]-c2[1]),abs(c1[1]-c3[1]),abs(c2[1]-c3[1]))<min(hs) and min(s)/max(s)>0.7:
                # NOTE(review): with reverse=True below this sorts by
                # DESCENDING x-center (right-to-left) — confirm direction.
                c=[c1[0],c2[0],c3[0]]

            else:
                # Sentinel values whose descending sort yields indices
                # [0, 1, 2], i.e. the original order is preserved.
                c=[3,2,1]
            indexed_c = list(enumerate(c))

            sorted_by_value = sorted(indexed_c, key=lambda x: x[1],reverse=True)

            sorted_indices = [index for index, value in sorted_by_value]

            return sorted_indices
        index=list(results.keys())
        # Normalize boxes back to [0, 1] for the same-line heuristic.
        boxes=[[item[0]/width,item[1]/height,item[2]/width,item[3]/height] for item in list(results.values())]
        for i in range(len(index)-2):
            now=boxes[i]
            next_1=boxes[i+1]
            next_2=boxes[i+2]
            order=ordered_permute(now,next_1,next_2)

            # NOTE(review): keys j, j+1, j+2 are assumed to be consecutive
            # 1-based integers matching positions i, i+1, i+2 — confirm
            # `results` always has consecutive integer keys starting at 1.
            j=i+1
            boxes[i],boxes[i+1],boxes[i+2]=boxes[i+order[0]],boxes[i+order[1]],boxes[i+order[2]]
            results[j],results[j+1],results[j+2]=results[j+order[0]],results[j+order[1]],results[j+order[2]]

        return results
| | |
def load_orderformer(path,
                     max_num=50,
                     input_dim=4,
                     output_dim=1,
                     model_dim=256,
                     num_layers=4,
                     num_heads=8,
                     ):
    """Construct an OrderFormer with the given hyper-parameters and load its
    weights from *path* (label key 'turn', no final encoder LayerNorm)."""
    return OrderFormer(
        model_path=path,
        max_nums=max_num,
        input_dim=input_dim,
        output_dim=output_dim,
        model_dim=model_dim,
        num_heads=num_heads,
        num_layers=num_layers,
        label_name='turn',
        norm=False,
    )