# lockseed / app.py
# Hugging Face Space header residue (author: klyfff, commit 22da5c3 "Update app.py")
# converted to comments so the file parses as Python.
import gradio as gr
import numpy as np
import random
import spaces
import traceback
import time
from diffusers import DiffusionPipeline
import torch
# Largest seed accepted (2**31 - 1); keeps seeds valid for torch.manual_seed.
MAX_SEED = np.iinfo(np.int32).max
# NOTE(review): MAX_IMAGE_SIZE is defined but unused in this file — the size
# dropdowns below hard-code their choices; likely a leftover from the template.
MAX_IMAGE_SIZE = 2048
def encode_prompt_sdxl(pipe, text, device):
    """Encode *text* through both SDXL text encoders.

    Returns a tuple ``(hidden, pooled)``: ``hidden`` is the concatenation of
    encoder-1's last hidden state with encoder-2's penultimate hidden state,
    and ``pooled`` is encoder-2's pooled projection.
    """
    def _tokenize(tokenizer):
        # Pad/truncate to each tokenizer's own max length, as diffusers does.
        return tokenizer(
            text, padding="max_length",
            max_length=tokenizer.model_max_length,
            truncation=True, return_tensors="pt",
        ).to(device)

    ids1 = _tokenize(pipe.tokenizer)
    ids2 = _tokenize(pipe.tokenizer_2)
    with torch.no_grad():
        enc1 = pipe.text_encoder(ids1.input_ids)
        enc2 = pipe.text_encoder_2(ids2.input_ids, output_hidden_states=True)
    # SDXL convention: concat hidden states along the feature axis.
    hidden = torch.cat([enc1[0], enc2.hidden_states[-2]], dim=-1)
    pooled = enc2[0]
    return hidden, pooled
class StepTimer:
    """Records wall-clock duration of each denoising step and prints a rolling ETA."""

    def __init__(self, total_steps):
        self.total_steps = total_steps  # expected number of steps
        self.step_times = []            # recorded per-step durations (seconds)
        self.start_time = None
        self.last_time = None

    def start(self):
        """Mark the beginning of the run; last_time starts equal to start_time."""
        now = time.time()
        self.start_time = now
        self.last_time = now

    def step(self, step_num):
        """Record completion of step *step_num* and print timing statistics."""
        if self.start_time is None:
            self.start()
        now = time.time()
        last = self.last_time
        # Skip zero-length intervals so the average never divides odd data.
        if last is not None and last != now:
            self.step_times.append(now - last)
        self.last_time = now
        if not self.step_times:
            return
        avg = sum(self.step_times) / len(self.step_times)
        remaining = max(0, self.total_steps - step_num)
        eta = avg * remaining
        elapsed = now - self.start_time
        print(f" Step {step_num}/{self.total_steps} | "
              f"{self.step_times[-1]:.2f}s | "
              f"Avg: {avg:.2f}s/step | "
              f"ETA: {eta:.1f}s | "
              f"Elapsed: {elapsed:.1f}s")

    def summary(self):
        """Return a one-line recap of the recorded steps."""
        if self.start_time is None or not self.step_times:
            return "No steps recorded"
        total = time.time() - self.start_time
        avg = sum(self.step_times) / len(self.step_times)
        return f"{len(self.step_times)} steps in {total:.1f}s ({avg:.2f}s/step)"
class WaveCollapseTracker:
    """Watches per-step latent deltas and freezes regions that stop changing.

    A spatial location counts as "settled" once its channel-mean absolute
    change between consecutive steps drops below ``epsilon``; its latent
    values are then stamped into a running snapshot and never overwritten.
    """

    def __init__(self, epsilon, timer):
        self.epsilon = epsilon          # settle threshold on mean |delta|
        self.timer = timer              # StepTimer-like object with .step()
        self.prev_latents = None        # latents from the previous step
        self.cumulative_mask = None     # bool mask of ever-settled locations
        self.snapshot = None            # frozen latent values

    def callback(self, pipe, step_index, timestep, callback_kwargs):
        """diffusers step-end callback; must return ``callback_kwargs``."""
        self.timer.step(step_index + 1)
        latents = callback_kwargs["latents"]
        previous = self.prev_latents
        if previous is not None:
            delta = (latents - previous).abs().mean(dim=1, keepdim=True)
            settled_now = delta < self.epsilon
            if self.cumulative_mask is None:
                # First comparison: seed the mask and snapshot.
                self.cumulative_mask = settled_now
                self.snapshot = latents.clone()
            else:
                frozen = settled_now & ~self.cumulative_mask
                if frozen.any():
                    # Stamp newly-frozen regions into the snapshot.
                    stamp = frozen.expand_as(latents).to(latents.dtype)
                    self.snapshot = self.snapshot * (1.0 - stamp) + latents * stamp
                self.cumulative_mask = self.cumulative_mask | settled_now
        self.prev_latents = latents.clone()
        return callback_kwargs
@spaces.GPU
def infer(
    prompt, negative_prompt, seed, randomize_seed,
    width, height, guidance_scale, num_inference_steps,
    auto_anti_prompt, epsilon,
    progress=gr.Progress(track_tqdm=True),
):
    """Run one SDXL generation with the wave-collapse snapshot blend.

    Returns a tuple ``(final PIL image, seed actually used, status string)``.
    Re-raises any exception after printing the traceback so Gradio surfaces it.
    """
    try:
        # Gradio may deliver numeric inputs as floats/strings; normalize to int.
        seed = int(seed)
        num_inference_steps = int(num_inference_steps)
        width = int(width)
        height = int(height)
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model_repo_id = "stabilityai/stable-diffusion-xl-base-1.0"
        torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
        if randomize_seed:
            seed = random.randint(0, MAX_SEED)
        t_start = time.time()
        # NOTE(review): the pipeline is re-instantiated on every call — presumably
        # acceptable under the ZeroGPU model where the weights are disk-cached;
        # confirm this is intentional rather than a per-request download.
        pipe = DiffusionPipeline.from_pretrained(
            model_repo_id, torch_dtype=torch_dtype
        )
        pipe = pipe.to(device)
        t_loaded = time.time()
        print(f"{'='*60}")
        print(f"[LockSeed] Pipeline loaded in {t_loaded - t_start:.1f}s")
        print(f"[LockSeed] Settings: steps={num_inference_steps}, cfg={guidance_scale}, "
              f"seed={seed}, size={width}x{height}")
        print(f"{'='*60}")
        generator = torch.Generator(device=device).manual_seed(seed)
        # --- The Genius Monkey-Patch ---
        # Globally replace torch.randn_like so every noise draw inside the
        # pipeline re-seeds the GLOBAL torch RNG with the same value first
        # (the "LockSeed"). Restored in the finally block below.
        _original_randn_like = torch.randn_like
        locked_seed = seed
        def _locked_randn_like(tensor, *args, **kwargs):
            # Intentionally resets the global RNG on every call.
            torch.manual_seed(locked_seed)
            return _original_randn_like(tensor, *args, **kwargs)
        timer = StepTimer(num_inference_steps)
        wave_tracker = WaveCollapseTracker(epsilon, timer)
        try:
            torch.randn_like = _locked_randn_like
            # We pass output_type="latent" so the pipeline stops before the VAE decode
            if auto_anti_prompt and prompt:
                # Mirror mode: negate the positive CLIP embeddings and feed
                # them directly as the negative conditioning (no token lookup).
                pos_hidden, pos_pooled = encode_prompt_sdxl(pipe, prompt, device)
                neg_hidden = -pos_hidden
                neg_pooled = -pos_pooled
                wave_tracker.timer.start()
                t_gen_start = time.time()
                pipeline_output = pipe(
                    prompt_embeds=pos_hidden,
                    negative_prompt_embeds=neg_hidden,
                    pooled_prompt_embeds=pos_pooled,
                    negative_pooled_prompt_embeds=neg_pooled,
                    guidance_scale=guidance_scale,
                    num_inference_steps=num_inference_steps,
                    width=width,
                    height=height,
                    generator=generator,
                    output_type="latent",
                    callback_on_step_end=wave_tracker.callback,
                )
            else:
                # Standard CFG path using the plain text prompts.
                wave_tracker.timer.start()
                t_gen_start = time.time()
                pipeline_output = pipe(
                    prompt=prompt,
                    negative_prompt=negative_prompt if negative_prompt else None,
                    guidance_scale=guidance_scale,
                    num_inference_steps=num_inference_steps,
                    width=width,
                    height=height,
                    generator=generator,
                    output_type="latent",
                    callback_on_step_end=wave_tracker.callback,
                )
            # NOTE(review): t_gen_start is recorded but never read — the status
            # line below measures Total from t_start instead.
            t_gen_end = time.time()
            final_latents = pipeline_output.images
            # --- Decode the Accumulated Wave Collapse Master Snapshot ---
            # Settled regions keep their frozen snapshot values; still-changing
            # regions fall back to the final denoised latents.
            if wave_tracker.snapshot is not None and wave_tracker.cumulative_mask is not None:
                unsettled = (~wave_tracker.cumulative_mask).expand_as(final_latents).to(final_latents.dtype)
                final_snapshot = (wave_tracker.snapshot * (1.0 - unsettled)) + (final_latents * unsettled)
            else:
                final_snapshot = final_latents
            with torch.no_grad():
                # Upcast to prevent VAE black-screen bug
                pipe.vae.to(dtype=torch.float32)
                final_snapshot_fp32 = (final_snapshot / pipe.vae.config.scaling_factor).to(torch.float32)
                collapse_tensor = pipe.vae.decode(final_snapshot_fp32, return_dict=False)[0]
                final_image = pipe.image_processor.postprocess(collapse_tensor, output_type="pil")[0]
            total_time = t_gen_end - t_start
            step_summary = timer.summary()
            status = f"{'CLIP Mirror ON' if auto_anti_prompt else 'Standard CFG'} | {step_summary} | Total: {total_time:.1f}s"
            return final_image, seed, status
        finally:
            # Always restore the global randn_like, even on failure.
            torch.randn_like = _original_randn_like
    except Exception as e:
        print(f"ERROR: {str(e)}")
        print(traceback.format_exc())
        raise
# --- Gradio UI: left column = controls, right column = output + status. ---
with gr.Blocks(theme=gr.themes.Monochrome()) as demo:
    gr.Markdown("# LockSeed Martin: Wave Collapse Sampler (Turing Morphogenesis)")
    gr.Markdown(
        "**Auto Anti-Prompt**: Mirrors your positive prompt's CLIP embedding "
        "and feeds the negated vector directly as the negative conditioning. "
        "No token lookup — pure vector negation."
    )
    with gr.Row():
        with gr.Column(scale=1):
            prompt = gr.Textbox(label="Prompt", lines=2)
            # Only consulted by infer() when the anti-prompt checkbox is off.
            negative_prompt = gr.Textbox(
                label="Negative Prompt (used when anti-prompt is off)",
                value="", lines=1,
            )
            auto_anti_prompt = gr.Checkbox(
                label="Auto Anti-Prompt (mirror CLIP)", value=True
            )
            with gr.Row():
                seed = gr.Number(label="Seed", value=935000922, precision=0)
                randomize_seed = gr.Checkbox(label="Randomize Seed", value=False)
            guidance_scale = gr.Slider(
                label="Guidance Scale", minimum=0, maximum=15, step=0.1, value=7
            )
            num_inference_steps = gr.Slider(
                label="Inference Steps", minimum=1, maximum=50, step=1, value=10
            )
            # Threshold for WaveCollapseTracker's "settled" test.
            epsilon = gr.Slider(
                label="Wave Collapse Epsilon", minimum=0.01, maximum=2.0, step=0.01, value=0.1
            )
            with gr.Row():
                width = gr.Dropdown(choices=[512, 768, 1024, 1536, 2048], value=1024, label="Width")
                height = gr.Dropdown(choices=[512, 768, 1024, 1536, 2048], value=1024, label="Height")
            generate_button = gr.Button("Initiate Morphogenesis", variant="primary")
        with gr.Column(scale=1):
            output_image = gr.Image(label="Final Generated Image")
            with gr.Row():
                output_seed = gr.Textbox(label="Used Seed", interactive=False)
                status_display = gr.Textbox(label="Generation Info", interactive=False)
    # Wire the button to infer(); input order must match infer's signature.
    generate_button.click(
        fn=infer,
        inputs=[
            prompt, negative_prompt, seed, randomize_seed,
            width, height, guidance_scale, num_inference_steps,
            auto_anti_prompt, epsilon
        ],
        outputs=[output_image, output_seed, status_display],
    )

if __name__ == "__main__":
    demo.launch()