MarshallCN commited on
Commit
7c81274
·
1 Parent(s): fecd626

add targeted attack- classify all obj as truck

Browse files
Files changed (2) hide show
  1. app.py +7 -37
  2. attacks.py +202 -29
app.py CHANGED
@@ -25,37 +25,7 @@ SAMPLE_IMAGES = sorted([
25
  # Load ultralytics model (wrapper)
26
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
27
  yolom = YOLO(MODEL_PATH) # wrapper
28
- # yolom_c = YOLO(MODEL_PATH_C) # wrapper
29
- # put underlying module to eval on correct device might be needed in attacks functions
30
-
31
- # def run_detection_on_pil(img_pil: Image.Image, eval_model_state, conf: float = 0.45):
32
- # """
33
- # Use ultralytics wrapper predict to get a visualization image with boxes.
34
- # This is inference-only and does not require gradient.
35
- # """
36
- # # ultralytics accepts numpy array (H,W,3) in RGB, we pass it directly
37
- # img = np.array(img_pil)
38
- # # use model.predict with verbose=False to avoid prints
39
- # eva_model = yolom if eval_model_state == "yolom" else YOLO(MODEL_PATH_C)
40
- # res = eva_model.predict(source=img, conf=conf, imgsz=imgsz, save=False, verbose=False)
41
- # r = res[0]
42
- # im_out = img.copy()
43
- # # Boxes object may be empty
44
- # try:
45
- # boxes = r.boxes
46
- # for box in boxes:
47
- # xyxy = box.xyxy[0].cpu().numpy().astype(int)
48
- # x1, y1, x2, y2 = map(int, xyxy)
49
- # conf_score = float(box.conf[0].cpu().numpy())
50
- # cls_id = int(box.cls[0].cpu().numpy())
51
- # # label = f"{cls_id}:{conf_score:.2f}"
52
- # label = f"{names[cls_id]}:{conf_score:.2f}"
53
- # cv2.rectangle(im_out, (x1, y1), (x2, y2), (0,255,0), 2)
54
- # cv2.putText(im_out, label, (x1, max(10,y1-5)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1)
55
- # except Exception as e:
56
- # # if no boxes or structure unexpected, just return original
57
- # pass
58
- # return Image.fromarray(im_out)
59
  def iou(a, b):
60
  ax1, ay1, ax2, ay2 = a
61
  bx1, by1, bx2, by2 = b
@@ -158,9 +128,9 @@ def detect_and_attack(image, eval_model_state, attack_mode, eps, alpha, iters, c
158
 
159
  try:
160
  if attack_mode == "fgsm":
161
- adv_pil = attacks.fgsm_attack_on_detector(yolom, pil, eps=eps, device=device, imgsz=imgsz)
162
  elif attack_mode == "pgd":
163
- adv_pil = attacks.pgd_attack_on_detector(yolom, pil, eps=eps, alpha=alpha, iters=iters, device=device, imgsz=imgsz)
164
  else:
165
  adv_pil = attacks.demo_random_perturbation(pil, eps=eps)
166
  except Exception as ex:
@@ -177,7 +147,7 @@ if __name__ == "__main__":
177
  desc_html = (
178
  "Adversarial examples are generated locally using a "
179
  "<strong>client-side</strong> model’s gradients (white-box), then evaluated against the "
180
- "<strong>server-side aggregated (FedAvg) central model</strong>. "
181
  "If the perturbation transfers, it can "
182
  "degrade or alter the FedAvg model’s predictions on the same input image."
183
  )
@@ -232,7 +202,7 @@ if __name__ == "__main__":
232
  with gr.Row():
233
  eval_choice = gr.Dropdown(
234
  choices=[(f"Client model {MODEL_PATH}", "client"),
235
- (f"Central model {MODEL_PATH_C}", "central")],
236
  value="client", # ★ 初始值为合法 value
237
  label="Evaluation model"
238
  )
@@ -243,7 +213,7 @@ if __name__ == "__main__":
243
  def on_eval_change(val: str):
244
  if isinstance(val, (list, tuple)):
245
  val = val[0] if len(val) else "client"
246
- if val not in ("client", "central"):
247
  val = "client"
248
  model = "yolom" if val == "client" else "yolom_c"
249
  return gr.update(value=val), model
@@ -286,6 +256,6 @@ if __name__ == "__main__":
286
  show_error=True,
287
  )
288
  else:
289
- demo.launch(share=True)
290
 
291
 
 
25
  # Load ultralytics model (wrapper)
26
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
27
  yolom = YOLO(MODEL_PATH) # wrapper
28
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
  def iou(a, b):
30
  ax1, ay1, ax2, ay2 = a
31
  bx1, by1, bx2, by2 = b
 
128
 
129
  try:
130
  if attack_mode == "fgsm":
131
+ adv_pil = attacks.fgsm_attack_on_detector(yolom, pil, eps=eps, device=device, imgsz=imgsz, gt_xywh=GT_boxes)
132
  elif attack_mode == "pgd":
133
+ adv_pil = attacks.pgd_attack_on_detector(yolom, pil, eps=eps, alpha=alpha, iters=iters, device=device, imgsz=imgsz, gt_xywh=GT_boxes)
134
  else:
135
  adv_pil = attacks.demo_random_perturbation(pil, eps=eps)
136
  except Exception as ex:
 
147
  desc_html = (
148
  "Adversarial examples are generated locally using a "
149
  "<strong>client-side</strong> model’s gradients (white-box), then evaluated against the "
150
+ "<strong>server-side aggregated (FedAvg) global model</strong>. "
151
  "If the perturbation transfers, it can "
152
  "degrade or alter the FedAvg model’s predictions on the same input image."
153
  )
 
202
  with gr.Row():
203
  eval_choice = gr.Dropdown(
204
  choices=[(f"Client model {MODEL_PATH}", "client"),
205
+ (f"Global model {MODEL_PATH_C}", "global")],
206
  value="client", # ★ 初始值为合法 value
207
  label="Evaluation model"
208
  )
 
213
  def on_eval_change(val: str):
214
  if isinstance(val, (list, tuple)):
215
  val = val[0] if len(val) else "client"
216
+ if val not in ("client", "global"):
217
  val = "client"
218
  model = "yolom" if val == "client" else "yolom_c"
219
  return gr.update(value=val), model
 
256
  show_error=True,
257
  )
258
  else:
259
+ demo.launch()
260
 
261
 
attacks.py CHANGED
@@ -131,32 +131,173 @@ def get_torch_module_from_ultralytics(model) -> nn.Module:
131
  raise RuntimeError("无法找到底层 torch.nn.Module。请确保传入的是 ultralytics.YOLO 实例且能访问 model.model。")
132
 
133
  # ----- interpret raw model outputs to confidences -----
134
- def preds_to_confidence_sum(preds: torch.Tensor) -> torch.Tensor:
135
- """
136
- preds: tensor shape (batch, N_preds, C) or (batch, C, H, W) depending on model.
137
- We support the common YOLO format where last dim: [x,y,w,h,obj_conf, class_probs...]
138
- Returns scalar: sum of (obj_conf * max_class_prob) over batch and predictions.
139
- """
140
- if preds is None:
141
- raise ValueError("preds is None")
142
- # handle shape (batch, N_preds, C)
143
- if preds.ndim == 3:
144
- # assume last dim: 5 + num_classes
145
- if preds.shape[-1] < 6:
146
- # can't interpret
147
- raise RuntimeError(f"preds last dim too small ({preds.shape[-1]}). Expecting >=6.")
148
- obj_conf = preds[..., 4] # (batch, N)
149
- class_probs = preds[..., 5:] # (batch, N, num_cls)
150
- max_class, _ = class_probs.max(dim=-1) # (batch, N)
151
- conf = obj_conf * max_class
152
- return conf.sum()
153
- # some models output (batch, C, H, W) - flatten
154
- if preds.ndim == 4:
155
- # try to collapse so that last dim is class
156
- b, c, h, w = preds.shape
157
- flat = preds.view(b, c, -1).permute(0, 2, 1) # (batch, N, C)
158
- return preds_to_confidence_sum(flat)
159
- raise RuntimeError(f"Unhandled preds dimensionality: {preds.shape}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
160
 
161
  # ----- core attack implementations -----
162
  def fgsm_attack_on_detector(
@@ -165,7 +306,7 @@ def fgsm_attack_on_detector(
165
  eps: float = 0.03,
166
  device: Optional[torch.device] = None,
167
  imgsz: Optional[int] = None, # None=自动对齐到 stride 倍数;也可传 640
168
-
169
  ) -> Image.Image:
170
  """
171
  Perform a single-step FGSM on a detection model (white-box).
@@ -200,7 +341,21 @@ def fgsm_attack_on_detector(
200
  raise RuntimeError("模型 forward 返回了 tuple/list,但无法从中找到预测张量。")
201
  preds = tensor_pred
202
 
203
- loss = - preds_to_confidence_sum(preds)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
204
  loss.backward()
205
 
206
  # (d) FGSM 在 letterboxed 空间施扰
@@ -227,6 +382,7 @@ def pgd_attack_on_detector(
227
  iters: int = 10,
228
  device: Optional[torch.device] = None,
229
  imgsz: Optional[int] = None, # None=自动对齐到 stride 倍数;也可传 640
 
230
  ):
231
  """
232
  在 YOLO 的 letterbox 域做 PGD,
@@ -249,6 +405,9 @@ def pgd_attack_on_detector(
249
  x_lb_orig, meta = letterbox_tensor(x0, imgsz=imgsz, stride=s, fill=114/255.0) # [1,3,S,S]
250
  x = x_lb_orig.clone().detach().requires_grad_(True)
251
 
 
 
 
252
  for _ in range(iters):
253
  # 前向 + 反向(需要梯度)
254
  preds = net(x)
@@ -256,7 +415,21 @@ def pgd_attack_on_detector(
256
  preds = next((p for p in preds if isinstance(p, torch.Tensor) and p.ndim >= 3), None)
257
  if preds is None:
258
  raise RuntimeError("模型 forward 返回 tuple/list,但未找到预测张量。")
259
- loss = - preds_to_confidence_sum(preds) # 我们希望置信度总和下降 → 最小化
 
 
 
 
 
 
 
 
 
 
 
 
 
 
260
  loss.backward()
261
 
262
  # 更新步与投影(不记录计算图)
 
131
  raise RuntimeError("无法找到底层 torch.nn.Module。请确保传入的是 ultralytics.YOLO 实例且能访问 model.model。")
132
 
133
  # ----- interpret raw model outputs to confidences -----
134
+
135
+
136
+ def _ensure_bcn(preds):
137
+ assert preds.ndim == 3
138
+ B, C1, C2 = preds.shape
139
+ if C1 - 4 > 0 and C2 >= 1000: # [B, 4+nc, N]
140
+ return preds
141
+ if C2 - 4 > 0 and C1 >= 1000: # [B, N, 4+nc]
142
+ return preds.permute(0, 2, 1).contiguous()
143
+ return preds
144
+
145
+ def _xywh_to_xyxy(xywh):
146
+ x,y,w,h = xywh.unbind(-1)
147
+ return torch.stack([x-w/2, y-h/2, x+w/2, y+h/2], dim=-1)
148
+
149
+ def _xyxy_to_xywh(xyxy):
150
+ x1,y1,x2,y2 = xyxy.unbind(-1)
151
+ cx = (x1+x2)/2; cy = (y1+y2)/2
152
+ w = (x2-x1).clamp(min=0); h = (y2-y1).clamp(min=0)
153
+ return torch.stack([cx,cy,w,h], dim=-1)
154
+
155
+ def _map_xyxy_to_letterbox(xyxy_tensor, meta):
156
+ if meta is None:
157
+ return xyxy_tensor
158
+ r = meta.get('ratio', meta.get('scale', (1.0, 1.0)))
159
+ p = meta.get('pad', (0.0, 0.0))
160
+ if isinstance(r, (int, float)):
161
+ r = (float(r), float(r))
162
+ rx, ry = float(r[0]), float(r[1])
163
+ px, py = float(p[0]), float(p[1])
164
+ x1 = xyxy_tensor[:, 0] * rx + px
165
+ y1 = xyxy_tensor[:, 1] * ry + py
166
+ x2 = xyxy_tensor[:, 2] * rx + px
167
+ y2 = xyxy_tensor[:, 3] * ry + py
168
+ return torch.stack([x1, y1, x2, y2], dim=-1)
169
+
170
+ def _iou_xyxy(b_xyxy, g_xyxy):
171
+ N, M = b_xyxy.size(0), g_xyxy.size(0)
172
+ b = b_xyxy[:, None, :].expand(N, M, 4)
173
+ g = g_xyxy[None, :, :].expand(N, M, 4)
174
+ inter_x1 = torch.maximum(b[...,0], g[...,0])
175
+ inter_y1 = torch.maximum(b[...,1], g[...,1])
176
+ inter_x2 = torch.minimum(b[...,2], g[...,2])
177
+ inter_y2 = torch.minimum(b[...,3], g[...,3])
178
+ inter_w = (inter_x2 - inter_x1).clamp(min=0)
179
+ inter_h = (inter_y2 - inter_y1).clamp(min=0)
180
+ inter = inter_w * inter_h
181
+ area_b = (b[...,2]-b[...,0]).clamp(min=0) * (b[...,3]-b[...,1]).clamp(min=0)
182
+ area_g = (g[...,2]-g[...,0]).clamp(min=0) * (g[...,3]-g[...,1]).clamp(min=0)
183
+ return inter / (area_b + area_g - inter + 1e-9)
184
+
185
+ def _gt_list_to_xyxy_tensor(gt_list, device, meta=None):
186
+ if not gt_list:
187
+ return torch.empty(0, 4, device=device, dtype=torch.float32)
188
+ xyxy = torch.tensor([b['xyxy'] for b in gt_list], dtype=torch.float32, device=device)
189
+ return _map_xyxy_to_letterbox(xyxy, meta)
190
+
191
+ def preds_to_targeted_loss(
192
+ preds, # [B,4+nc,N] 或 [B,N,4+nc];类别部分最好是 logits
193
+ target_cls: int,
194
+ gt_xywh, # 这里直接支持 list[{'xyxy':..., 'cls':..., 'conf':...}]
195
+ topk: int = 20,
196
+ kappa: float = 0.1,
197
+ lambda_margin: float = 1.0,
198
+ lambda_keep: float = 0.2,
199
+ lambda_target: float = 0.0, # 新增:恢复 -p_t.mean() 这项
200
+ debug: bool = False,
201
+ meta: dict | None = None, # 若 GT 是原图坐标,传入 letterbox 的 meta
202
+ ):
203
+ preds = _ensure_bcn(preds)
204
+ B, C, N = preds.shape
205
+ nc = C - 4
206
+ assert 0 <= target_cls < nc
207
+
208
+ # 解析 GT(list -> tensor in letterbox coords)
209
+ gt_xyxy_lb = _gt_list_to_xyxy_tensor(gt_xywh, preds.device, meta=meta) # [M,4]
210
+
211
+ boxes_bxn4 = preds[:, :4, :].permute(0, 2, 1) # [B,N,4] (xywh, letterbox)
212
+ logits_bxcn = preds[:, 4:, :] # [B,nc,N]
213
+
214
+ # 若类别部分像概率(0~1),转为 logits
215
+ zmin, zmax = logits_bxcn.min().item(), logits_bxcn.max().item()
216
+ if 0.0 <= zmin and zmax <= 1.0:
217
+ p = logits_bxcn.clamp(1e-6, 1-1e-6)
218
+ logits_bxcn = torch.log(p) - torch.log1p(-p)
219
+
220
+ # 选与 GT 最相关的候选 idx(batch=0)
221
+ b_xyxy = _xywh_to_xyxy(boxes_bxn4[0]) # [N,4]
222
+ if gt_xyxy_lb.numel() > 0:
223
+ iou = _iou_xyxy(b_xyxy, gt_xyxy_lb) # [N,M]
224
+ best_per_gt = iou.argmax(dim=0) # [M]
225
+ idx = torch.unique(best_per_gt, sorted=False)
226
+ if idx.numel() < topk:
227
+ topvals = iou.max(dim=1).values
228
+ topidx2 = torch.topk(topvals, k=min(topk, N)).indices
229
+ idx = torch.unique(torch.cat([idx, topidx2], 0), sorted=False)[:topk]
230
+ else:
231
+ # 没 GT 就按当前最大类别置信度取 topk
232
+ z = logits_bxcn[0] # [nc,N]
233
+ pmax = z.softmax(dim=0).max(dim=0).values
234
+ idx = torch.topk(pmax, k=min(topk, N)).indices
235
+
236
+ if idx.numel() == 0:
237
+ idx = torch.arange(min(topk, N), device=preds.device)
238
+
239
+ # 取这些候选的类别 logits:[K,nc]
240
+ z = logits_bxcn[0, :, idx].T # [K,nc]
241
+
242
+ # 1) CW-style margin
243
+ mask = torch.ones(nc, device=z.device, dtype=torch.bool)
244
+ mask[target_cls] = False
245
+ z_t = z[:, target_cls]
246
+ z_oth = z[:, mask].max(dim=1).values
247
+ loss_margin = F.relu(kappa + z_oth - z_t).mean()
248
+
249
+ # 2) keep(KL >= 0)
250
+ with torch.no_grad():
251
+ p_clean = z.detach().softmax(dim=1)
252
+ logp_adv = z.log_softmax(dim=1)
253
+ loss_keep = F.kl_div(logp_adv, p_clean, reduction="batchmean")
254
+
255
+ # 3) 你的旧项:直接推高目标类 logit
256
+ loss_target = -z_t.mean()
257
+
258
+ loss = (
259
+ lambda_margin * loss_margin
260
+ + lambda_keep * loss_keep
261
+ + lambda_target * loss_target
262
+ )
263
+
264
+ if debug:
265
+ same_ratio = (z.argmax(dim=1) == target_cls).float().mean().item()
266
+ print(
267
+ f"[dbg] K={idx.numel()} nc={nc} target={target_cls} "
268
+ f"margin={loss_margin.item():.6f} keep={loss_keep.item():.6f} "
269
+ f"targ={loss_target.item():.6f} same_ratio={same_ratio:.3f} "
270
+ f"z_t_mean={z_t.mean().item():.3f} z_oth_mean={z_oth.mean().item():.3f}"
271
+ )
272
+ return loss
273
+
274
+
275
+ # def preds_to_confidence_sum(preds: torch.Tensor) -> torch.Tensor:
276
+ # """
277
+ # preds: tensor shape (batch, N_preds, C) or (batch, C, H, W) depending on model.
278
+ # We support the common YOLO format where last dim: [x,y,w,h,obj_conf, class_probs...]
279
+ # Returns scalar: sum of (obj_conf * max_class_prob) over batch and predictions.
280
+ # """
281
+ # if preds is None:
282
+ # raise ValueError("preds is None")
283
+ # # handle shape (batch, N_preds, C)
284
+ # if preds.ndim == 3:
285
+ # # assume last dim: 5 + num_classes
286
+ # if preds.shape[-1] < 6:
287
+ # # can't interpret
288
+ # raise RuntimeError(f"preds last dim too small ({preds.shape[-1]}). Expecting >=6.")
289
+ # obj_conf = preds[..., 4] # (batch, N)
290
+ # class_probs = preds[..., 5:] # (batch, N, num_cls)
291
+ # max_class, _ = class_probs.max(dim=-1) # (batch, N)
292
+ # conf = obj_conf * max_class
293
+ # return conf.sum()
294
+ # # some models output (batch, C, H, W) - flatten
295
+ # if preds.ndim == 4:
296
+ # # try to collapse so that last dim is class
297
+ # b, c, h, w = preds.shape
298
+ # flat = preds.view(b, c, -1).permute(0, 2, 1) # (batch, N, C)
299
+ # return preds_to_confidence_sum(flat)
300
+ # raise RuntimeError(f"Unhandled preds dimensionality: {preds.shape}")
301
 
302
  # ----- core attack implementations -----
303
  def fgsm_attack_on_detector(
 
306
  eps: float = 0.03,
307
  device: Optional[torch.device] = None,
308
  imgsz: Optional[int] = None, # None=自动对齐到 stride 倍数;也可传 640
309
+ gt_xywh: torch.Tensor | None = None # letterbox坐标系下的目标框(可选)
310
  ) -> Image.Image:
311
  """
312
  Perform a single-step FGSM on a detection model (white-box).
 
341
  raise RuntimeError("模型 forward 返回了 tuple/list,但无法从中找到预测张量。")
342
  preds = tensor_pred
343
 
344
+ target_cls = 2
345
+ loss = - preds_to_targeted_loss(
346
+ preds,
347
+ target_cls=target_cls,
348
+ gt_xywh=gt_xywh, # 直接传你的 list[dict]
349
+ topk=20,
350
+ kappa=0.1,
351
+ lambda_margin=1.0,
352
+ lambda_keep=0.2,
353
+ lambda_target=0.0, # 恢复 -p_t.mean() 的影响
354
+ debug=False,
355
+ meta=meta # 若 GT 是原图坐标,务必传 meta
356
+ )
357
+
358
+ # loss = - preds_to_confidence_sum(preds)
359
  loss.backward()
360
 
361
  # (d) FGSM 在 letterboxed 空间施扰
 
382
  iters: int = 10,
383
  device: Optional[torch.device] = None,
384
  imgsz: Optional[int] = None, # None=自动对齐到 stride 倍数;也可传 640
385
+ gt_xywh: torch.Tensor | None = None # letterbox坐标系下的目标框(可选)
386
  ):
387
  """
388
  在 YOLO 的 letterbox 域做 PGD,
 
405
  x_lb_orig, meta = letterbox_tensor(x0, imgsz=imgsz, stride=s, fill=114/255.0) # [1,3,S,S]
406
  x = x_lb_orig.clone().detach().requires_grad_(True)
407
 
408
+ targeted = True
409
+ sign = -1.0 if targeted else 1.0 # 定向取负号,非定向取正号
410
+ target_cls = 2
411
  for _ in range(iters):
412
  # 前向 + 反向(需要梯度)
413
  preds = net(x)
 
415
  preds = next((p for p in preds if isinstance(p, torch.Tensor) and p.ndim >= 3), None)
416
  if preds is None:
417
  raise RuntimeError("模型 forward 返回 tuple/list,但未找到预测张量。")
418
+
419
+ loss = - preds_to_targeted_loss(
420
+ preds,
421
+ target_cls=target_cls,
422
+ gt_xywh=gt_xywh, # 直接传你的 list[dict]
423
+ topk=20,
424
+ kappa=0.1,
425
+ lambda_margin=1.0,
426
+ lambda_keep=0.2,
427
+ lambda_target=0.0, # 恢复 -p_t.mean() 的影响
428
+ debug=False,
429
+ meta=meta # 若 GT 是原图坐标,务必传 meta
430
+ )
431
+
432
+ # loss = - preds_to_confidence_sum(preds) # 我们希望置信度总和下降 → 最小化
433
  loss.backward()
434
 
435
  # 更新步与投影(不记录计算图)