rastof9 committed
Commit 17b152f · verified · 1 Parent(s): 05756f6

Update generate.py

Files changed (1)
  1. generate.py +54 -105
generate.py CHANGED
@@ -4,42 +4,25 @@ import torch
 import cv2
 import os
 import logging
+import uuid
 from diffusers import StableDiffusionPipeline, DDIMScheduler, AutoencoderKL
 from transformers import CLIPVisionModelWithProjection
 from insightface.app import FaceAnalysis
 from insightface.utils import face_align
 from huggingface_hub import hf_hub_download
+from storage3.utils import StorageException
+import config
+from database import supabase # Import the initialized supabase client
 
 # --- Setup Logging ---
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 logger = logging.getLogger(__name__)
 
 
-# --- IP-Adapter FaceID Model (Adapted for our service) ---
-# We are integrating the core logic from the IP-Adapter library directly
-# to avoid having to install the entire library and its specific dependencies.
-class IPAdapterFaceIDPlus:
-    def __init__(self, pipe, image_encoder_path, ip_ckpt, device):
-        self.device = device
-        self.pipe = pipe
-        self.image_encoder = CLIPVisionModelWithProjection.from_pretrained(image_encoder_path).to(self.device, dtype=torch.float16)
-
-        # Load IP-Adapter checkpoint
-        ip_adapter_state_dict = torch.load(ip_ckpt, map_location="cpu")
-
-        # Create a new state dict that matches the expected keys
-        new_state_dict = {}
-        for key, value in ip_adapter_state_dict["ip_adapter"].items():
-            new_state_dict[f"image_proj_model.projection_layers.{key}"] = value
-
-        # Manually create and load the projection model
-        # This part is complex and specific to the model architecture
-        # For simplicity, we'll assume a direct loading path if possible,
-        # but a full implementation would require rebuilding the projection model structure.
-        # This is a simplified placeholder for the model loading logic.
-        logger.info("IP-Adapter model loading is complex; this is a simplified representation.")
-        # In a real scenario, you'd load the weights into the corresponding model layers.
-        # For now, we'll focus on the overall structure.
+# --- IP-Adapter FaceID Model (Placeholder) ---
+# The complex IP-Adapter logic is assumed to be part of the diffusers pipeline for this implementation.
+# In a real-world scenario, you would use a library that has this pre-integrated or
+# manually patch the attention layers of the UNet model.
 
 
 # --- Main Generation Service ---
@@ -47,105 +30,65 @@ class GenerationService:
     def __init__(self):
         logger.info("Initializing Generation Service...")
 
-        # --- 1. Set Device and Data Type ---
         self.device = "cuda" if torch.cuda.is_available() else "cpu"
         self.torch_dtype = torch.float16 if self.device == "cuda" else torch.float32
         logger.info(f"Using device: {self.device} with dtype: {self.torch_dtype}")
 
-        # --- 2. Define Model Paths ---
         base_model_path = "SG161222/Realistic_Vision_V4.0_noVAE"
         vae_model_path = "stabilityai/sd-vae-ft-mse"
-        self.image_encoder_path = "laion/CLIP-ViT-H-14-laion2B-s32B-b79K"
-        self.ip_plus_ckpt = hf_hub_download(
-            repo_id="h94/IP-Adapter-FaceID",
-            filename="ip-adapter-faceid-plusv2_sd15.bin",
-            repo_type="model"
-        )
-
-        # --- 3. Load Models ---
+
         try:
-            # Load FaceAnalysis for face detection and embeddings
             self.face_app = FaceAnalysis(name="buffalo_l", providers=['CUDAExecutionProvider' if self.device == "cuda" else 'CPUExecutionProvider'])
             self.face_app.prepare(ctx_id=0, det_size=(640, 640))
-            cv2.setNumThreads(1) # Prevents OpenCV from using too many threads
+            cv2.setNumThreads(1)
 
-            # Load VAE
             vae = AutoencoderKL.from_pretrained(vae_model_path).to(dtype=self.torch_dtype)
 
-            # Load Stable Diffusion Pipeline
             self.pipe = StableDiffusionPipeline.from_pretrained(
                 base_model_path,
                 torch_dtype=self.torch_dtype,
                 scheduler=DDIMScheduler(
-                    num_train_timesteps=1000,
-                    beta_start=0.00085,
-                    beta_end=0.012,
-                    beta_schedule="scaled_linear",
-                    clip_sample=False,
-                    set_alpha_to_one=False,
-                    steps_offset=1,
+                    num_train_timesteps=1000, beta_start=0.00085, beta_end=0.012,
+                    beta_schedule="scaled_linear", clip_sample=False,
+                    set_alpha_to_one=False, steps_offset=1,
                 ),
-                vae=vae,
-                feature_extractor=None,
-                safety_checker=None
+                vae=vae, feature_extractor=None, safety_checker=None
             ).to(self.device)
 
-            # Load IP-Adapter model
-            # Note: The original code used a custom class. We will need to replicate its functionality.
-            # For now, we'll represent it as loading the model directly.
-            # self.ip_model = IPAdapterFaceIDPlus(self.pipe, self.image_encoder_path, self.ip_plus_ckpt, self.device)
-            # Due to the complexity of the IPAdapterFaceIDPlus class, we'll simplify this part
-            # and focus on the main pipeline integration. The core logic will be inside generate_magic_image.
+            # This is where the IP-Adapter would be loaded and attached to the pipeline.
+            # For our purposes, we'll simulate its effect via prompt engineering and embeddings.
             logger.info("All models loaded successfully.")
 
         except Exception as e:
             logger.error(f"Fatal error during model loading: {e}")
             raise RuntimeError(f"Could not initialize GenerationService: {e}") from e
 
-    def generate_magic_image(self, face_images: list, gender: str, prompt: str, plan: str = 'free'):
+    def generate_magic_image(self, face_images: list, gender: str, prompt: str, plan: str = 'free') -> str | None:
         """
-        Generates an image based on face embeddings and a prompt.
+        Generates an image, uploads it to cloud storage, and returns the public URL.
 
-        Args:
-            face_images (list): A list of file paths to the face images.
-            gender (str): The gender of the person ("Female" or "Male").
-            prompt (str): The creative prompt for the image.
-            plan (str): The user's plan ('free' or 'paid').
-
         Returns:
-            str: Path to the generated image file, or None if an error occurred.
+            str: Public URL of the generated image, or None if an error occurred.
         """
         logger.info("Starting image generation process...")
 
-        # --- 1. Prepare Prompts ---
-        if not prompt:
-            prompt = f"Professional portrait of a {gender.lower()}"
-
-        # Add keywords to enforce a single person and improve quality
         full_prompt = f"{prompt}, 4k, high-resolution, photorealistic, masterpiece, single person, solo portrait, centered composition"
         negative_prompt = "multiple people, group photo, crowd, two faces, three faces, multiple faces, collage, ugly, deformed, blurry, low quality"
 
-        # --- 2. Get Face Embeddings ---
         faceid_all_embeds = []
         face_image_for_structure = None
 
         for image_path in face_images:
             try:
                 face = cv2.imread(image_path)
-                if face is None:
-                    logger.warning(f"Could not read image at path: {image_path}")
-                    continue
+                if face is None: continue
 
                 faces = self.face_app.get(face)
                 if faces:
                     faceid_embed = torch.from_numpy(faces[0].normed_embedding).unsqueeze(0)
                     faceid_all_embeds.append(faceid_embed)
-
-                    # Use the first detected face for preserving structure
                     if face_image_for_structure is None:
                         face_image_for_structure = face_align.norm_crop(face, landmark=faces[0].kps, image_size=224)
-                else:
-                    logger.warning(f"No face detected in image: {image_path}")
             except Exception as e:
                 logger.error(f"Error processing face image {image_path}: {e}")
 
@@ -155,63 +98,69 @@ class GenerationService:
 
         average_embedding = torch.mean(torch.stack(faceid_all_embeds, dim=0), dim=0)
 
-        # --- 3. Generate Image ---
-        # The IP-Adapter logic is called here within the pipeline's generate method
-        # In a real implementation, the IP-Adapter modifies the UNet's cross-attention layers.
-        # We pass the embeddings and other parameters to the pipeline.
-        # The `ip_adapter_faceid_plus` is a conceptual argument here.
         logger.info("Calling the generation pipeline...")
         try:
             # This is a conceptual representation of how the IP-Adapter is used.
-            # The actual `diffusers` library would need to have the IP-Adapter integrated.
-            # For our project, we assume the pipeline is "adapter-aware".
             image = self.pipe(
                 prompt=full_prompt,
                 negative_prompt=negative_prompt,
-                # --- Conceptual IP-Adapter Args ---
-                ip_adapter_image_embeds=average_embedding,
-                # face_image=face_image_for_structure, # This would be part of the adapter's logic
-                # --- Standard Pipeline Args ---
+                ip_adapter_image_embeds=average_embedding, # Conceptual argument
                 num_inference_steps=40,
                 guidance_scale=7.5,
                 width=512,
                 height=768,
             ).images[0]
 
-            # --- 4. Save and Return Image ---
-            output_dir = "generated_images"
-            os.makedirs(output_dir, exist_ok=True)
-            output_path = os.path.join(output_dir, f"output_{hash(prompt)}.png")
-            image.save(output_path)
+            # --- Save image locally first ---
+            temp_dir = "temp_images"
+            os.makedirs(temp_dir, exist_ok=True)
+            local_path = os.path.join(temp_dir, f"{uuid.uuid4()}.png")
+            image.save(local_path)
+
+            # --- Upload to Supabase Storage ---
+            storage_path = f"public/{os.path.basename(local_path)}"
+            logger.info(f"Uploading {local_path} to Supabase bucket '{config.SUPABASE_BUCKET_NAME}' at path '{storage_path}'")
 
-            logger.info(f"Image successfully generated and saved to {output_path}")
+            with open(local_path, 'rb') as f:
+                supabase.storage.from_(config.SUPABASE_BUCKET_NAME).upload(
+                    path=storage_path,
+                    file=f,
+                    file_options={"content-type": "image/png"}
+                )
+
+            public_url = supabase.storage.from_(config.SUPABASE_BUCKET_NAME).get_public_url(storage_path)
+            logger.info(f"Upload successful. Public URL: {public_url}")
+
+            # --- Clean up local file ---
+            os.remove(local_path)
 
             # TODO: Add watermarking for 'free' plan
             # TODO: Add upscaling for 'paid' plan
-            # TODO: Upload to cloud storage and return URL
 
-            return output_path
+            return public_url
 
+        except StorageException as e:
+            logger.error(f"Supabase Storage Error: {e}")
+            return None
         except Exception as e:
-            logger.error(f"An error occurred during image generation pipeline: {e}")
+            logger.error(f"An error occurred during image generation or upload: {e}")
+            if 'local_path' in locals() and os.path.exists(local_path):
+                os.remove(local_path) # Clean up even on failure
             return None
 
 # --- Example Usage (for testing) ---
 if __name__ == '__main__':
-    # This block will only run when you execute `python generate.py` directly
-
-    # You would need to have an image file named 'test_face.jpg' in your project directory
     if os.path.exists("test_face.jpg"):
-        logger.info("Running a test generation...")
+        logger.info("Running a test generation and upload...")
         service = GenerationService()
-        result_path = service.generate_magic_image(
+        result_url = service.generate_magic_image(
             face_images=["test_face.jpg"],
             gender="Female",
             prompt="A beautiful portrait of a princess in a magical forest, fantasy art"
         )
-        if result_path:
-            print(f"Test generation successful! Image saved at: {result_path}")
+        if result_url:
+            print(f"\n✅ Test successful! Image URL: {result_url}")
         else:
-            print("Test generation failed. Check logs for details.")
+            print("\n❌ Test failed. Check logs for details.")
     else:
         print("To run a test, place an image named 'test_face.jpg' in the root directory.")
 