Spaces:
Running on Zero
| import gradio as gr | |
| import numpy as np | |
| import random | |
| import spaces | |
| import traceback | |
| import time | |
| from diffusers import DiffusionPipeline | |
| import torch | |
# Largest seed a user may supply (fits in a signed 32-bit int, the range
# accepted by torch.Generator.manual_seed on all backends).
MAX_SEED = np.iinfo(np.int32).max
# Upper bound for requested image width/height, matching the UI dropdowns.
MAX_IMAGE_SIZE = 2048
def encode_prompt_sdxl(pipe, text, device):
    """Encode *text* through both SDXL text encoders.

    Mirrors the standard SDXL conditioning path used by
    ``StableDiffusionXLPipeline.encode_prompt``: both encoders contribute
    their *penultimate* hidden layer (``hidden_states[-2]``, skipping the
    final layer norm), concatenated along the feature axis, plus the pooled
    projection from the second encoder.

    Args:
        pipe: Loaded SDXL ``DiffusionPipeline`` (supplies both tokenizers
            and text encoders).
        text: Prompt string to encode.
        device: Device to move the tokenized input ids to.

    Returns:
        Tuple ``(hidden, pooled)``: concatenated per-token hidden states
        and the pooled text embedding from encoder 2.
    """
    tok1 = pipe.tokenizer(
        text, padding="max_length",
        max_length=pipe.tokenizer.model_max_length,
        truncation=True, return_tensors="pt",
    ).to(device)
    tok2 = pipe.tokenizer_2(
        text, padding="max_length",
        max_length=pipe.tokenizer_2.model_max_length,
        truncation=True, return_tensors="pt",
    ).to(device)
    with torch.no_grad():
        # Request hidden states from BOTH encoders so we can take the
        # penultimate layer of each, matching diffusers' default
        # (clip_skip=None) behavior. Using encoder-1's final output would
        # pass through its final layer norm and diverge from the
        # conditioning distribution the UNet was trained on.
        out1 = pipe.text_encoder(tok1.input_ids, output_hidden_states=True)
        out2 = pipe.text_encoder_2(tok2.input_ids, output_hidden_states=True)
    hidden = torch.cat([out1.hidden_states[-2], out2.hidden_states[-2]], dim=-1)
    # For CLIPTextModelWithProjection, index 0 is the pooled text embedding.
    pooled = out2[0]
    return hidden, pooled
class StepTimer:
    """Wall-clock timer for denoise steps with a running ETA printout."""

    def __init__(self, total_steps):
        self.total_steps = total_steps
        self.step_times = []    # duration of each completed step (seconds)
        self.start_time = None  # set by start(); None until the run begins
        self.last_time = None   # timestamp of the most recent step boundary

    def start(self):
        """Mark the beginning of the run."""
        self.start_time = time.time()
        self.last_time = self.start_time

    def step(self, step_num):
        """Record completion of *step_num* and print per-step timing stats."""
        if self.start_time is None:
            self.start()
        now = time.time()
        # Ignore zero-length intervals so the average stays meaningful.
        if self.last_time is not None and self.last_time != now:
            self.step_times.append(now - self.last_time)
        self.last_time = now
        if not self.step_times:
            return
        avg = sum(self.step_times) / len(self.step_times)
        remaining = max(0, self.total_steps - step_num)
        eta = avg * remaining
        elapsed = now - self.start_time
        print(f" Step {step_num}/{self.total_steps} | "
              f"{self.step_times[-1]:.2f}s | "
              f"Avg: {avg:.2f}s/step | "
              f"ETA: {eta:.1f}s | "
              f"Elapsed: {elapsed:.1f}s")

    def summary(self):
        """Return a one-line report of the run; safe before any steps."""
        if self.start_time is None or not self.step_times:
            return "No steps recorded"
        total = time.time() - self.start_time
        avg = sum(self.step_times) / len(self.step_times)
        return f"{len(self.step_times)} steps in {total:.1f}s ({avg:.2f}s/step)"
class WaveCollapseTracker:
    """Tracks spatial latent deltas to build a 'frozen' snapshot (Turing Morphogenesis)."""

    def __init__(self, epsilon, timer):
        self.epsilon = epsilon          # per-pixel delta threshold for "settled"
        self.timer = timer              # object exposing .step(step_num)
        self.prev_latents = None        # latents seen at the previous callback
        self.cumulative_mask = None     # bool mask of pixels that ever settled
        self.snapshot = None            # latent values captured at settle time

    def callback(self, pipe, step_index, timestep, callback_kwargs):
        """Diffusers ``callback_on_step_end`` hook; returns kwargs unchanged."""
        self.timer.step(step_index + 1)
        current = callback_kwargs["latents"]
        previous, self.prev_latents = self.prev_latents, current.clone()
        if previous is None:
            # First call: nothing to diff against yet.
            return callback_kwargs
        # Per-pixel mean absolute change across the channel dimension.
        change = (current - previous).abs().mean(dim=1, keepdim=True)
        settled_now = change < self.epsilon
        if self.cumulative_mask is None:
            # First measurable step seeds both mask and snapshot.
            self.cumulative_mask = settled_now
            self.snapshot = current.clone()
        else:
            fresh = settled_now & ~self.cumulative_mask
            if fresh.any():
                # Stamp freshly settled pixels into the snapshot; already
                # frozen pixels keep their earlier values.
                weight = fresh.expand_as(current).to(current.dtype)
                self.snapshot = (self.snapshot * (1.0 - weight)) + (current * weight)
            self.cumulative_mask = self.cumulative_mask | settled_now
        return callback_kwargs
@spaces.GPU  # required on ZeroGPU Spaces: without it no GPU is allocated to this call
def infer(
    prompt, negative_prompt, seed, randomize_seed,
    width, height, guidance_scale, num_inference_steps,
    auto_anti_prompt, epsilon,
    progress=gr.Progress(track_tqdm=True),
):
    """Run one SDXL generation with the wave-collapse snapshot decode.

    Args:
        prompt: Positive prompt text.
        negative_prompt: Plain negative prompt (used only when
            ``auto_anti_prompt`` is off).
        seed / randomize_seed: Base seed, optionally replaced by a random one.
        width, height: Output size in pixels.
        guidance_scale, num_inference_steps: Standard CFG / sampler settings.
        auto_anti_prompt: When True, the positive CLIP embeddings are negated
            and fed as the negative conditioning ("CLIP mirror").
        epsilon: Settling threshold passed to WaveCollapseTracker.
        progress: Gradio progress tracker (tqdm-linked).

    Returns:
        Tuple ``(PIL image, used seed, status string)``.

    Raises:
        Re-raises any pipeline exception after logging its traceback.
    """
    try:
        # Gradio may deliver numeric inputs as floats; normalize to int.
        seed = int(seed)
        num_inference_steps = int(num_inference_steps)
        width = int(width)
        height = int(height)
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model_repo_id = "stabilityai/stable-diffusion-xl-base-1.0"
        torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
        if randomize_seed:
            seed = random.randint(0, MAX_SEED)
        t_start = time.time()
        # NOTE(review): the pipeline is (re)loaded on every call — weights come
        # from the local HF cache after the first run, which is the usual
        # pattern for ZeroGPU functions.
        pipe = DiffusionPipeline.from_pretrained(
            model_repo_id, torch_dtype=torch_dtype
        )
        pipe = pipe.to(device)
        t_loaded = time.time()
        print(f"{'='*60}")
        print(f"[LockSeed] Pipeline loaded in {t_loaded - t_start:.1f}s")
        print(f"[LockSeed] Settings: steps={num_inference_steps}, cfg={guidance_scale}, "
              f"seed={seed}, size={width}x{height}")
        print(f"{'='*60}")
        generator = torch.Generator(device=device).manual_seed(seed)
        # --- The Genius Monkey-Patch ---
        # Force every torch.randn_like() drawn inside the denoise loop to use
        # the same locked seed. This is a process-global patch; it is restored
        # in the inner `finally` below no matter how the pipeline exits.
        _original_randn_like = torch.randn_like
        locked_seed = seed

        def _locked_randn_like(tensor, *args, **kwargs):
            torch.manual_seed(locked_seed)
            return _original_randn_like(tensor, *args, **kwargs)

        timer = StepTimer(num_inference_steps)
        wave_tracker = WaveCollapseTracker(epsilon, timer)
        try:
            torch.randn_like = _locked_randn_like
            # output_type="latent" stops the pipeline before the VAE decode so
            # we can decode the accumulated wave-collapse snapshot ourselves.
            if auto_anti_prompt and prompt:
                # CLIP mirror: negate the positive embeddings and use them as
                # the negative conditioning (no token lookup).
                pos_hidden, pos_pooled = encode_prompt_sdxl(pipe, prompt, device)
                neg_hidden = -pos_hidden
                neg_pooled = -pos_pooled
                timer.start()
                t_gen_start = time.time()
                pipeline_output = pipe(
                    prompt_embeds=pos_hidden,
                    negative_prompt_embeds=neg_hidden,
                    pooled_prompt_embeds=pos_pooled,
                    negative_pooled_prompt_embeds=neg_pooled,
                    guidance_scale=guidance_scale,
                    num_inference_steps=num_inference_steps,
                    width=width,
                    height=height,
                    generator=generator,
                    output_type="latent",
                    callback_on_step_end=wave_tracker.callback,
                )
            else:
                timer.start()
                t_gen_start = time.time()
                pipeline_output = pipe(
                    prompt=prompt,
                    negative_prompt=negative_prompt if negative_prompt else None,
                    guidance_scale=guidance_scale,
                    num_inference_steps=num_inference_steps,
                    width=width,
                    height=height,
                    generator=generator,
                    output_type="latent",
                    callback_on_step_end=wave_tracker.callback,
                )
            t_gen_end = time.time()
            # Previously t_gen_start was assigned but never read; report it.
            print(f"[LockSeed] Denoise loop finished in {t_gen_end - t_gen_start:.1f}s")
            final_latents = pipeline_output.images
            # --- Decode the Accumulated Wave Collapse Master Snapshot ---
            # Settled pixels come from the frozen snapshot; pixels that never
            # settled fall back to the pipeline's final latents.
            if wave_tracker.snapshot is not None and wave_tracker.cumulative_mask is not None:
                unsettled = (~wave_tracker.cumulative_mask).expand_as(final_latents).to(final_latents.dtype)
                final_snapshot = (wave_tracker.snapshot * (1.0 - unsettled)) + (final_latents * unsettled)
            else:
                final_snapshot = final_latents
            with torch.no_grad():
                # Upcast to prevent VAE black-screen bug (fp16 SDXL VAE overflow).
                pipe.vae.to(dtype=torch.float32)
                final_snapshot_fp32 = (final_snapshot / pipe.vae.config.scaling_factor).to(torch.float32)
                collapse_tensor = pipe.vae.decode(final_snapshot_fp32, return_dict=False)[0]
            final_image = pipe.image_processor.postprocess(collapse_tensor, output_type="pil")[0]
            total_time = t_gen_end - t_start
            step_summary = timer.summary()
            status = f"{'CLIP Mirror ON' if auto_anti_prompt else 'Standard CFG'} | {step_summary} | Total: {total_time:.1f}s"
            return final_image, seed, status
        finally:
            # Always undo the global monkey-patch, even on failure.
            torch.randn_like = _original_randn_like
    except Exception as e:
        print(f"ERROR: {str(e)}")
        print(traceback.format_exc())
        raise
# --- Gradio UI: controls on the left, outputs on the right ---
with gr.Blocks(theme=gr.themes.Monochrome()) as demo:
    gr.Markdown("# LockSeed Martin: Wave Collapse Sampler (Turing Morphogenesis)")
    gr.Markdown(
        "**Auto Anti-Prompt**: Mirrors your positive prompt's CLIP embedding "
        "and feeds the negated vector directly as the negative conditioning. "
        "No token lookup — pure vector negation."
    )
    with gr.Row():
        # Left column: all generation controls.
        with gr.Column(scale=1):
            prompt = gr.Textbox(label="Prompt", lines=2)
            # Only consulted by infer() when the anti-prompt checkbox is off.
            negative_prompt = gr.Textbox(
                label="Negative Prompt (used when anti-prompt is off)",
                value="", lines=1,
            )
            auto_anti_prompt = gr.Checkbox(
                label="Auto Anti-Prompt (mirror CLIP)", value=True
            )
            with gr.Row():
                # precision=0 keeps the seed an integer in the UI.
                seed = gr.Number(label="Seed", value=935000922, precision=0)
                randomize_seed = gr.Checkbox(label="Randomize Seed", value=False)
            guidance_scale = gr.Slider(
                label="Guidance Scale", minimum=0, maximum=15, step=0.1, value=7
            )
            num_inference_steps = gr.Slider(
                label="Inference Steps", minimum=1, maximum=50, step=1, value=10
            )
            # Settling threshold forwarded to WaveCollapseTracker.
            epsilon = gr.Slider(
                label="Wave Collapse Epsilon", minimum=0.01, maximum=2.0, step=0.01, value=0.1
            )
            with gr.Row():
                width = gr.Dropdown(choices=[512, 768, 1024, 1536, 2048], value=1024, label="Width")
                height = gr.Dropdown(choices=[512, 768, 1024, 1536, 2048], value=1024, label="Height")
            generate_button = gr.Button("Initiate Morphogenesis", variant="primary")
        # Right column: generated image plus run metadata.
        with gr.Column(scale=1):
            output_image = gr.Image(label="Final Generated Image")
            with gr.Row():
                output_seed = gr.Textbox(label="Used Seed", interactive=False)
                status_display = gr.Textbox(label="Generation Info", interactive=False)
    # Input order must match infer()'s positional signature exactly.
    generate_button.click(
        fn=infer,
        inputs=[
            prompt, negative_prompt, seed, randomize_seed,
            width, height, guidance_scale, num_inference_steps,
            auto_anti_prompt, epsilon
        ],
        outputs=[output_image, output_seed, status_display],
    )

if __name__ == "__main__":
    demo.launch()