basimazam commited on
Commit
fbfadc4
·
verified ·
1 Parent(s): 43a04a1

Upload SDG pipeline + classifier weights

Browse files
Files changed (5) hide show
  1. README.md +9 -52
  2. __init__.py +0 -1
  3. requirements.txt +8 -0
  4. safe_diffusion_guidance.py +120 -97
  5. utils/__init__.py +4 -1
README.md CHANGED
@@ -1,54 +1,11 @@
1
- ---
2
- library_name: diffusers
3
- pipeline_tag: text-to-image
4
- tags:
5
- - safety
6
- - classifier-guidance
7
- - stable-diffusion
8
- - plug-and-play
9
- license: apache-2.0
10
- ---
11
 
12
- # Safe Diffusion Guidance (SDG) — a plug-and-play safety layer for Stable Diffusion
 
 
 
13
 
14
- **Safe Diffusion Guidance (SDG)** is a *classifier-guided denoising* layer that steers the sampling trajectory away from unsafe content **without retraining** the base model.
15
- It works **standalone** with SD 1.4 / 1.5 / 2.1 and **composes** cleanly with ESD/UCE/SLD.
16
-
17
- - **Safety signal:** a 5-class mid-UNet feature classifier (classes: `gore, hate, medical, safe, sexual`) trained on (1280×8×8) features.
18
- - **Controls:** `safety_scale` (strength), `mid_fraction` (fraction of steps guided).
19
- - **Plug-in:** drop into any SD pipeline, or stack on top of ESD/UCE/SLD.
20
- - **No retraining:** small gradient nudges to latents during denoising.
21
-
22
- > **Note on metrics** (matching our paper): FID/KID are computed vs. _baseline model outputs_ rather than real images; baseline FID/KID are ≈0 by construction.
23
-
24
- ## Quickstart (SD 1.5)
25
-
26
- ```python
27
- import torch
28
- from diffusers import StableDiffusionPipeline
29
-
30
- # 1) Load base SD pipeline (disable default safety checker)
31
- base = StableDiffusionPipeline.from_pretrained(
32
- "runwayml/stable-diffusion-v1-5",
33
- torch_dtype=torch.float16,
34
- safety_checker=None
35
- ).to("cuda")
36
-
37
- # 2) Load SDG custom pipeline from Hub (this repo)
38
- sdg = StableDiffusionPipeline.from_pretrained(
39
- "your-org/safe-diffusion-guidance",
40
- custom_pipeline="safe_diffusion_guidance",
41
- torch_dtype=torch.float16
42
- ).to("cuda")
43
-
44
- img = sdg(
45
- base_pipe=base,
46
- prompt="portrait photograph, studio light, 85mm, realistic",
47
- num_inference_steps=50,
48
- guidance_scale=7.5,
49
- safety_scale=5.0, # strength: ~2–8 (Light→Strong)
50
- mid_fraction=1.0, # guide fraction of steps: 0.5, 0.8, 1.0
51
- safe_class_index=3 # index of 'safe' in [gore,hate,medical,safe,sexual]
52
- ).images[0]
53
-
54
- img.save("sdg_safe_output.png")
 
1
+ # Safe Diffusion Guidance (SDG)
 
 
 
 
 
 
 
 
 
2
 
3
+ Custom Diffusers pipeline that applies a mid-UNet safety classifier as guidance during denoising.
4
+ - Plug-and-play: works with any Stable Diffusion checkpoint (e.g., SD 1.5).
5
+ - No retraining needed; classifier runs on mid-UNet features.
6
+ - Tunable: `safety_scale`, `mid_fraction`, `safe_class_index`.
7
 
8
+ ## Install
9
+ ```bash
10
+ python -m venv .venv && source .venv/bin/activate
11
+ pip install -r requirements.txt
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
__init__.py CHANGED
@@ -1,3 +1,2 @@
1
- # __init__.py
2
  from .safe_diffusion_guidance import SafeDiffusionGuidance
3
  __all__ = ["SafeDiffusionGuidance"]
 
 
1
  from .safe_diffusion_guidance import SafeDiffusionGuidance
2
  __all__ = ["SafeDiffusionGuidance"]
requirements.txt ADDED
@@ -0,0 +1,8 @@
 
 
 
 
 
 
 
 
 
1
+ torch>=2.1
2
+ transformers>=4.41
3
+ diffusers>=0.30
4
+ accelerate>=0.30
5
+ safetensors>=0.4
6
+ huggingface_hub>=0.23
7
+ numpy
8
+ Pillow
safe_diffusion_guidance.py CHANGED
@@ -1,25 +1,30 @@
1
  # safe_diffusion_guidance.py
2
  import os
3
- import torch
4
  from typing import Optional, List
 
 
 
5
  from diffusers import DiffusionPipeline, StableDiffusionPipeline
6
  from diffusers.utils import BaseOutput
7
 
8
- # ---- Classifier (unchanged) ----
9
- import torch.nn as nn
10
 
11
- CLASS_NAMES = ['gore', 'hate', 'medical', 'safe', 'sexual']
 
12
 
13
  class SafetyClassifier1280(nn.Module):
 
 
 
 
14
  def __init__(self, num_classes: int = 5):
15
  super().__init__()
16
  self.pre = nn.AdaptiveAvgPool2d((8, 8))
17
  self.net = nn.Sequential(
18
  nn.Conv2d(1280, 512, 3, padding=1),
19
- nn.BatchNorm2d(512), nn.ReLU(inplace=True), nn.MaxPool2d(2),
20
  nn.Conv2d(512, 256, 3, padding=1),
21
- nn.BatchNorm2d(256), nn.ReLU(inplace=True), nn.MaxPool2d(2),
22
- nn.AdaptiveAvgPool2d(1), nn.Flatten(),
23
  nn.Linear(256, 128), nn.ReLU(inplace=True), nn.Dropout(0.3),
24
  nn.Linear(128, num_classes)
25
  )
@@ -28,39 +33,18 @@ class SafetyClassifier1280(nn.Module):
28
  @staticmethod
29
  def _init_weights(m):
30
  if isinstance(m, nn.Linear):
31
- nn.init.xavier_uniform_(m.weight); nn.init.zeros_(m.bias)
 
32
  elif isinstance(m, nn.Conv2d):
33
- nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
34
  if m.bias is not None: nn.init.zeros_(m.bias)
35
  elif isinstance(m, nn.BatchNorm2d):
36
  nn.init.ones_(m.weight); nn.init.zeros_(m.bias)
37
 
38
  def forward(self, x: torch.Tensor) -> torch.Tensor:
39
- x = self.pre(x)
40
  return self.net(x)
41
 
42
- # ---- NEW: robust path resolution for weights ----
43
- def _resolve_repo_path(rel_path: str) -> str:
44
- """Return an absolute path inside the cached repo; fallback to hf_hub_download."""
45
- here = os.path.dirname(__file__)
46
- local_path = os.path.join(here, rel_path)
47
- if os.path.exists(local_path):
48
- return local_path
49
- # Fallback: try hub download (works even if code is executed outside repo root)
50
- try:
51
- from huggingface_hub import hf_hub_download
52
- # Best effort to get repo id; default to your public repo if unknown
53
- repo_id = getattr(_resolve_repo_path, "_repo_id", None)
54
- if repo_id is None:
55
- # Diffusers stores name or path in internal dict sometimes:
56
- repo_id = getattr(SafeDiffusionGuidance, "__repo_id__", "basimazam/safe-diffusion-guidance")
57
- return hf_hub_download(repo_id=repo_id, filename=rel_path)
58
- except Exception as e:
59
- raise FileNotFoundError(
60
- f"Could not find classifier weights at '{rel_path}'. "
61
- f"Make sure the file exists in the repo, or pass `classifier_weights=...`. "
62
- f"Original error: {e}"
63
- )
64
 
65
  def load_classifier_1280(
66
  weights_path: str,
@@ -75,35 +59,66 @@ def load_classifier_1280(
75
  model.eval()
76
  return model
77
 
78
- def pick_weights_for_pipe(pipe) -> str:
79
- # Always use the standard path inside the repo
80
- return _resolve_repo_path("classifiers/safety_classifier_1280.pth")
81
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82
  class SDGOutput(BaseOutput):
83
- images: List
 
84
 
85
  class SafeDiffusionGuidance(DiffusionPipeline):
86
- """Pure custom pipeline; loads base SD lazily at runtime."""
 
 
 
87
 
88
- def __init__(self): # <-- IMPORTANT: no **kwargs
89
  super().__init__()
90
- self.base_pipe_ = None
91
- # Hint for the fallback downloader (optional)
92
- try:
93
- SafeDiffusionGuidance.__repo_id__ = self.config._name_or_path # diffusers sets this sometimes
94
- except Exception:
95
- pass
96
-
97
- def _ensure_base(self, base_pipe, base_model_id, torch_dtype):
98
  if base_pipe is not None:
99
  self.base_pipe_ = base_pipe
100
  return self.base_pipe_
101
  if self.base_pipe_ is None:
102
  self.base_pipe_ = StableDiffusionPipeline.from_pretrained(
103
- base_model_id, torch_dtype=torch_dtype, safety_checker=None
 
 
 
104
  ).to(self.device)
105
  return self.base_pipe_
106
 
 
107
  def __call__(
108
  self,
109
  prompt: str,
@@ -111,19 +126,21 @@ class SafeDiffusionGuidance(DiffusionPipeline):
111
  num_inference_steps: int = 50,
112
  guidance_scale: float = 7.5,
113
  safety_scale: float = 5.0,
114
- mid_fraction: float = 1.0,
115
- safe_class_index: int = 3,
116
  classifier_weights: Optional[str] = None,
117
  base_pipe: Optional[StableDiffusionPipeline] = None,
118
  base_model_id: str = "runwayml/stable-diffusion-v1-5",
119
  generator: Optional[torch.Generator] = None,
120
- **kwargs
121
  ) -> SDGOutput:
 
 
122
  base = self._ensure_base(base_pipe, base_model_id, torch_dtype=torch.float16)
123
  device = getattr(base, "_execution_device", base.device)
124
- dtype = base.unet.dtype
125
 
126
- # text embeds (CFG)
127
  tok = base.tokenizer
128
  max_len = tok.model_max_length
129
  txt = tok([prompt], padding="max_length", max_length=max_len, return_tensors="pt")
@@ -135,66 +152,72 @@ class SafeDiffusionGuidance(DiffusionPipeline):
135
  uncond = base.text_encoder(uncond_txt.input_ids.to(device)).last_hidden_state
136
  cond_embeds = torch.cat([uncond, cond], dim=0)
137
 
138
- # latents
139
  h = kwargs.pop("height", 512); w = kwargs.pop("width", 512)
140
- latents = torch.randn((1, base.unet.in_channels, h // 8, w // 8),
141
- device=device, generator=generator, dtype=dtype)
 
 
142
 
143
  base.scheduler.set_timesteps(num_inference_steps, device=device)
144
  timesteps = base.scheduler.timesteps
145
 
146
- # classifier (fp32) use provided path or default resolved path
147
- weights_file = classifier_weights or pick_weights_for_pipe(base)
148
- clf = load_classifier_1280(weights_file, device=device, dtype=torch.float32).eval()
149
 
150
- # mid-block hook
151
  mid = {}
152
  def hook(_, __, out): mid["feat"] = out[0] if isinstance(out, tuple) else out
153
  handle = base.unet.mid_block.register_forward_hook(hook)
154
 
155
- base_alpha = 1e-3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
156
 
157
- with torch.no_grad():
158
- for i, t in enumerate(timesteps):
 
 
 
 
 
 
 
 
 
159
  lat_in = base.scheduler.scale_model_input(latents, t)
160
  lat_cat = torch.cat([lat_in, lat_in], dim=0)
161
 
162
- do_guide = (i / len(timesteps)) <= mid_fraction and safety_scale > 0
163
- if do_guide:
164
- with torch.enable_grad():
165
- lg = latents.detach().clone().requires_grad_(True)
166
- lin = base.scheduler.scale_model_input(lg, t)
167
- lcat = torch.cat([lin, lin], dim=0)
168
-
169
- _ = base.unet(lcat, t, encoder_hidden_states=cond_embeds).sample
170
- feat = mid["feat"].detach().float()
171
- logits = clf(feat)
172
- probs = torch.softmax(logits, dim=-1)
173
- unsafe = 1.0 - probs[:, safe_class_index].mean()
174
-
175
- loss = safety_scale * unsafe
176
- loss.backward()
177
-
178
- alpha = base_alpha
179
- if hasattr(base.scheduler, "sigmas"):
180
- idx = min(i, len(base.scheduler.sigmas) - 1)
181
- alpha = base_alpha * float(base.scheduler.sigmas[idx])
182
- latents = (lg - alpha * lg.grad).detach()
183
-
184
- lat_in = base.scheduler.scale_model_input(latents, t)
185
- lat_cat = torch.cat([lat_in, lat_in], dim=0)
186
- noise_pred = base.unet(lat_cat, t, encoder_hidden_states=cond_embeds).sample
187
- else:
188
- noise_pred = base.unet(lat_cat, t, encoder_hidden_states=cond_embeds).sample
189
-
190
- n_uncond, n_text = noise_pred.chunk(2)
191
- noise = n_uncond + guidance_scale * (n_text - n_uncond)
192
- latents = base.scheduler.step(noise, t, latents).prev_sample
193
 
194
  handle.remove()
195
- with torch.no_grad():
196
- img = base.decode_latents(latents)
197
- img = base.image_processor.postprocess(img, output_type="pil")[0]
198
- return SDGOutput(images=[img])
 
 
199
 
200
  __all__ = ["SafeDiffusionGuidance"]
 
1
  # safe_diffusion_guidance.py
2
  import os
 
3
  from typing import Optional, List
4
+
5
+ import torch
6
+ import torch.nn as nn
7
  from diffusers import DiffusionPipeline, StableDiffusionPipeline
8
  from diffusers.utils import BaseOutput
9
 
 
 
10
 
11
+ # ----------------------------- Classifier ------------------------------------
12
+ CLASS_NAMES = ["gore", "hate", "medical", "safe", "sexual"]
13
 
14
  class SafetyClassifier1280(nn.Module):
15
+ """
16
+ Safety classifier for mid-UNet features of shape (B, 1280, H, W).
17
+ Robust to HxW via AdaptiveAvgPool2d((8,8)) before the head.
18
+ """
19
  def __init__(self, num_classes: int = 5):
20
  super().__init__()
21
  self.pre = nn.AdaptiveAvgPool2d((8, 8))
22
  self.net = nn.Sequential(
23
  nn.Conv2d(1280, 512, 3, padding=1),
24
+ nn.BatchNorm2d(512), nn.ReLU(inplace=True), nn.MaxPool2d(2), # 512x4x4
25
  nn.Conv2d(512, 256, 3, padding=1),
26
+ nn.BatchNorm2d(256), nn.ReLU(inplace=True), nn.MaxPool2d(2), # 256x2x2
27
+ nn.AdaptiveAvgPool2d(1), nn.Flatten(), # 256
28
  nn.Linear(256, 128), nn.ReLU(inplace=True), nn.Dropout(0.3),
29
  nn.Linear(128, num_classes)
30
  )
 
33
  @staticmethod
34
  def _init_weights(m):
35
  if isinstance(m, nn.Linear):
36
+ nn.init.xavier_uniform_(m.weight)
37
+ if m.bias is not None: nn.init.zeros_(m.bias)
38
  elif isinstance(m, nn.Conv2d):
39
+ nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
40
  if m.bias is not None: nn.init.zeros_(m.bias)
41
  elif isinstance(m, nn.BatchNorm2d):
42
  nn.init.ones_(m.weight); nn.init.zeros_(m.bias)
43
 
44
  def forward(self, x: torch.Tensor) -> torch.Tensor:
45
+ x = self.pre(x) # (B, 1280, 8, 8)
46
  return self.net(x)
47
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48
 
49
  def load_classifier_1280(
50
  weights_path: str,
 
59
  model.eval()
60
  return model
61
 
 
 
 
62
 
63
+ def _here(*paths: str) -> str:
64
+ return os.path.join(os.path.dirname(__file__), *paths)
65
+
66
+
67
def pick_weights_path() -> str:
    """Locate the safety-classifier weights file.

    Search order: the SDG_CLASSIFIER_WEIGHTS env var, then
    'classifiers/safety_classifier_1280.pth' and 'safety_classifier_1280.pth'
    next to this module, then the same two paths relative to the CWD.

    Returns the first existing path; raises FileNotFoundError otherwise.
    """
    module_dir = os.path.dirname(__file__)
    candidates = (
        os.getenv("SDG_CLASSIFIER_WEIGHTS", ""),
        os.path.join(module_dir, "classifiers", "safety_classifier_1280.pth"),
        os.path.join(module_dir, "safety_classifier_1280.pth"),
        "classifiers/safety_classifier_1280.pth",
        "safety_classifier_1280.pth",
    )
    found = next((p for p in candidates if p and os.path.exists(p)), None)
    if found is None:
        raise FileNotFoundError(
            "Safety-classifier weights not found. Place 'safety_classifier_1280.pth' "
            "in repo root or 'classifiers/' (or set SDG_CLASSIFIER_WEIGHTS, or pass "
            "`classifier_weights=...` to the call())."
        )
    return found
86
+
87
+
88
+ # ----------------------------- Pipeline --------------------------------------
89
class SDGOutput(BaseOutput):
    """Pipeline output container holding the generated PIL images."""
    images: List  # list of PIL Images
91
+
92
 
93
  class SafeDiffusionGuidance(DiffusionPipeline):
94
+ """
95
+ Minimal custom pipeline that loads a base Stable Diffusion pipeline on demand
96
+ and applies mid-UNet classifier-guided denoising for safety.
97
+ """
98
 
99
+ def __init__(self): # IMPORTANT: no **kwargs (diffusers inspects this)
100
  super().__init__()
101
+ self.base_pipe_ = None # lazy cache
102
+
103
+ def _ensure_base(
104
+ self,
105
+ base_pipe: Optional[StableDiffusionPipeline],
106
+ base_model_id: str,
107
+ torch_dtype: torch.dtype,
108
+ ) -> StableDiffusionPipeline:
109
  if base_pipe is not None:
110
  self.base_pipe_ = base_pipe
111
  return self.base_pipe_
112
  if self.base_pipe_ is None:
113
  self.base_pipe_ = StableDiffusionPipeline.from_pretrained(
114
+ base_model_id,
115
+ torch_dtype=torch_dtype,
116
+ safety_checker=None,
117
+ requires_safety_checker=False,
118
  ).to(self.device)
119
  return self.base_pipe_
120
 
121
+ @torch.no_grad()
122
  def __call__(
123
  self,
124
  prompt: str,
 
126
  num_inference_steps: int = 50,
127
  guidance_scale: float = 7.5,
128
  safety_scale: float = 5.0,
129
+ mid_fraction: float = 1.0, # 0..1 fraction of steps to guide
130
+ safe_class_index: int = 3, # "safe" in CLASS_NAMES
131
  classifier_weights: Optional[str] = None,
132
  base_pipe: Optional[StableDiffusionPipeline] = None,
133
  base_model_id: str = "runwayml/stable-diffusion-v1-5",
134
  generator: Optional[torch.Generator] = None,
135
+ **kwargs,
136
  ) -> SDGOutput:
137
+
138
+ # 1) prepare base SD
139
  base = self._ensure_base(base_pipe, base_model_id, torch_dtype=torch.float16)
140
  device = getattr(base, "_execution_device", base.device)
141
+ dtype = base.unet.dtype
142
 
143
+ # 2) text embeddings (classifier-free guidance)
144
  tok = base.tokenizer
145
  max_len = tok.model_max_length
146
  txt = tok([prompt], padding="max_length", max_length=max_len, return_tensors="pt")
 
152
  uncond = base.text_encoder(uncond_txt.input_ids.to(device)).last_hidden_state
153
  cond_embeds = torch.cat([uncond, cond], dim=0)
154
 
155
+ # 3) latents
156
  h = kwargs.pop("height", 512); w = kwargs.pop("width", 512)
157
+ latents = torch.randn(
158
+ (1, base.unet.in_channels, h // 8, w // 8),
159
+ device=device, generator=generator, dtype=dtype
160
+ )
161
 
162
  base.scheduler.set_timesteps(num_inference_steps, device=device)
163
  timesteps = base.scheduler.timesteps
164
 
165
+ # 4) classifier (run in fp32)
166
+ weights_file = classifier_weights or pick_weights_path()
167
+ clf = load_classifier_1280(weights_file, device=device, dtype=torch.float32)
168
 
169
+ # 5) mid-block hook
170
  mid = {}
171
  def hook(_, __, out): mid["feat"] = out[0] if isinstance(out, tuple) else out
172
  handle = base.unet.mid_block.register_forward_hook(hook)
173
 
174
+ base_alpha = 1e-3 # step size factor for safety update
175
+
176
+ # 6) denoising loop
177
+ for i, t in enumerate(timesteps):
178
+ # standard SD forward
179
+ lat_in = base.scheduler.scale_model_input(latents, t)
180
+ lat_cat = torch.cat([lat_in, lat_in], dim=0) # for CFG
181
+ do_guide = (i / len(timesteps)) <= mid_fraction and safety_scale > 0
182
+
183
+ if do_guide:
184
+ # safety gradient w.r.t latents
185
+ with torch.enable_grad():
186
+ lg = latents.detach().clone().requires_grad_(True)
187
+ lin = base.scheduler.scale_model_input(lg, t)
188
+ lcat = torch.cat([lin, lin], dim=0)
189
+
190
+ _ = base.unet(lcat, t, encoder_hidden_states=cond_embeds).sample
191
+ feat = mid["feat"].detach().float() # (B*2, 1280, H, W)
192
+ logits = clf(feat)
193
+ probs = torch.softmax(logits, dim=-1)
194
+ unsafe = 1.0 - probs[:, safe_class_index].mean() # encourage "safe"
195
 
196
+ loss = safety_scale * unsafe
197
+ loss.backward()
198
+
199
+ alpha = base_alpha
200
+ if hasattr(base.scheduler, "sigmas"): # DDIM/PNDM/… support
201
+ idx = min(i, len(base.scheduler.sigmas) - 1)
202
+ alpha = base_alpha * float(base.scheduler.sigmas[idx])
203
+
204
+ latents = (lg - alpha * lg.grad).detach()
205
+
206
+ # resume SD denoising with updated latents
207
  lat_in = base.scheduler.scale_model_input(latents, t)
208
  lat_cat = torch.cat([lat_in, lat_in], dim=0)
209
 
210
+ noise_pred = base.unet(lat_cat, t, encoder_hidden_states=cond_embeds).sample
211
+ n_uncond, n_text = noise_pred.chunk(2)
212
+ noise = n_uncond + guidance_scale * (n_text - n_uncond)
213
+ latents = base.scheduler.step(noise, t, latents).prev_sample
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
214
 
215
  handle.remove()
216
+
217
+ # 7) decode
218
+ img = base.decode_latents(latents)
219
+ pil = base.image_processor.postprocess(img, output_type="pil")[0]
220
+ return SDGOutput(images=[pil])
221
+
222
 
223
  __all__ = ["SafeDiffusionGuidance"]
utils/__init__.py CHANGED
@@ -1 +1,4 @@
1
- from .adaptive_classifiers import SafetyClassifier1280, load_classifier_1280, CLASS_NAMES
 
 
 
 
1
+ # Namespace init for utils
2
+ from .adaptive_classifiers import (
3
+ SafetyClassifier1280, load_classifier_1280, CLASS_NAMES
4
+ )