rastof9 commited on
Commit
12ee826
·
verified ·
1 Parent(s): fac681b

Update generate.py

Browse files
Files changed (1) hide show
  1. generate.py +54 -28
generate.py CHANGED
@@ -1,7 +1,7 @@
1
  # generate.py
2
- # --- VERSION 9 (with Watermarking) ---
3
 
4
- print("--- RUNNING GENERATE.PY VERSION 9 (with Watermarking) ---")
5
 
6
  import torch
7
  import cv2
@@ -16,8 +16,12 @@ from insightface.app import FaceAnalysis
16
  from insightface.utils import face_align
17
  from huggingface_hub import hf_hub_download
18
  from storage3.utils import StorageException
 
 
 
 
19
  import config
20
- import utils # <-- IMPORT THE NEW UTILS FILE
21
  from database import supabase
22
 
23
  # --- Setup Logging ---
@@ -38,6 +42,7 @@ class GenerationService:
38
  vae_model_path = "stabilityai/sd-vae-ft-mse"
39
 
40
  try:
 
41
  self.face_app = FaceAnalysis(name="buffalo_l", providers=['CUDAExecutionProvider' if self.device == "cuda" else 'CPUExecutionProvider'])
42
  self.face_app.prepare(ctx_id=0, det_size=(640, 640))
43
  cv2.setNumThreads(1)
@@ -55,12 +60,46 @@ class GenerationService:
55
  vae=vae, feature_extractor=None, safety_checker=None
56
  ).to(self.device)
57
 
 
 
 
 
 
 
 
 
 
 
 
 
58
  logger.info("All models loaded successfully.")
59
 
60
  except Exception as e:
61
  logger.error(f"Fatal error during model loading: {e}")
62
  raise RuntimeError(f"Could not initialize GenerationService: {e}") from e
63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
  def generate_magic_image(self, face_images: list, gender: str, prompt: str, plan: str = 'free') -> str | None:
65
  logger.info(f"Starting image generation process for a user on the '{plan}' plan.")
66
 
@@ -68,12 +107,10 @@ class GenerationService:
68
  negative_prompt = "multiple people, group photo, crowd, two faces, three faces, multiple faces, collage, ugly, deformed, blurry, low quality"
69
 
70
  faceid_all_embeds = []
71
-
72
  for image_path in face_images:
73
  try:
74
  face = cv2.imread(image_path)
75
  if face is None: continue
76
-
77
  faces = self.face_app.get(face)
78
  if faces:
79
  faceid_embed = torch.from_numpy(faces[0].normed_embedding).unsqueeze(0)
@@ -94,43 +131,32 @@ class GenerationService:
94
  final_embedding = torch.cat([negative_embedding, positive_embedding], dim=0)
95
 
96
  output = self.pipe(
97
- prompt=full_prompt,
98
- negative_prompt=negative_prompt,
99
- ip_adapter_image_embeds=[final_embedding],
100
- num_inference_steps=40,
101
- guidance_scale=7.5,
102
- width=512,
103
- height=768,
104
  )
105
 
106
- if isinstance(output, StableDiffusionPipelineOutput):
107
- image = output.images[0]
108
- else:
109
- image = output[0][0]
110
 
111
  temp_dir = "temp_images"
112
  os.makedirs(temp_dir, exist_ok=True)
113
  local_path = os.path.join(temp_dir, f"{uuid.uuid4()}.png")
114
  image.save(local_path)
115
 
116
- # --- APPLY WATERMARK FOR FREE USERS ---
117
  if plan == 'free':
118
  utils.add_watermark(local_path, "@MagicFaceBot")
 
 
 
119
 
120
  # --- Upload to Supabase Storage ---
121
  storage_path = f"public/{os.path.basename(local_path)}"
122
- logger.info(f"Uploading {local_path} to Supabase bucket '{config.SUPABASE_BUCKET_NAME}' at path '{storage_path}'")
123
-
124
  with open(local_path, 'rb') as f:
125
  supabase.storage.from_(config.SUPABASE_BUCKET_NAME).upload(
126
- path=storage_path,
127
- file=f,
128
- file_options={"content-type": "image/png"}
129
  )
130
-
131
  public_url = supabase.storage.from_(config.SUPABASE_BUCKET_NAME).get_public_url(storage_path)
132
- logger.info(f"Upload successful. Public URL: {public_url}")
133
-
134
  os.remove(local_path)
135
 
136
  return public_url
@@ -152,12 +178,12 @@ if __name__ == '__main__':
152
  face_images=["test_face.jpg"],
153
  gender="Female",
154
  prompt="A beautiful portrait of a princess in a magical forest, fantasy art",
155
- plan='free' # <-- Test the 'free' plan to see the watermark
156
  )
157
  if result_url:
158
  print(f"\n✅ Test successful! Image URL: {result_url}")
159
- print("Check the image at the URL to see if it has a watermark.")
160
  else:
161
- print(f"\n❌ Test failed. Please check the new traceback logs for details.")
162
  else:
163
  print("To run a test, place an image named 'test_face.jpg' in the root directory.")
 
1
  # generate.py
2
+ # --- VERSION 10 (with Upscaling) ---
3
 
4
+ print("--- RUNNING GENERATE.PY VERSION 10 (with Upscaling) ---")
5
 
6
  import torch
7
  import cv2
 
16
  from insightface.utils import face_align
17
  from huggingface_hub import hf_hub_download
18
  from storage3.utils import StorageException
19
+ from realesrgan.archs.srvgg_arch import SRVGGNetCompact
20
+ from gfpgan import GFPGANer
21
+ from basicsr.utils.download_util import load_file_from_url
22
+
23
  import config
24
+ import utils
25
  from database import supabase
26
 
27
  # --- Setup Logging ---
 
42
  vae_model_path = "stabilityai/sd-vae-ft-mse"
43
 
44
  try:
45
+ # --- AI Models ---
46
  self.face_app = FaceAnalysis(name="buffalo_l", providers=['CUDAExecutionProvider' if self.device == "cuda" else 'CPUExecutionProvider'])
47
  self.face_app.prepare(ctx_id=0, det_size=(640, 640))
48
  cv2.setNumThreads(1)
 
60
  vae=vae, feature_extractor=None, safety_checker=None
61
  ).to(self.device)
62
 
63
+ # --- Upscaler Model ---
64
+ logger.info("Loading Real-ESRGAN upscaler model...")
65
+ model_path = os.path.join('weights', 'realesrgan-x4plus.pth')
66
+ if not os.path.exists(model_path):
67
+ model_url = 'https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.0/RealESRGAN_x4plus.pth'
68
+ load_file_from_url(url=model_url, model_dir=os.path.join('weights'), progress=True)
69
+
70
+ self.upsampler = SRVGGNetCompact(num_in_ch=3, num_out_ch=3, num_feat=64, num_conv=32, upscale=4, act_type='prelu')
71
+ self.upsampler.load_state_dict(torch.load(model_path)['params_ema'])
72
+ self.upsampler.to(self.device)
73
+ logger.info("Upscaler model loaded.")
74
+
75
  logger.info("All models loaded successfully.")
76
 
77
  except Exception as e:
78
  logger.error(f"Fatal error during model loading: {e}")
79
  raise RuntimeError(f"Could not initialize GenerationService: {e}") from e
80
 
81
+ def _upscale_image(self, image_path: str) -> str:
82
+ """Upscales an image using Real-ESRGAN."""
83
+ try:
84
+ img = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
85
+ img = img.astype('float32') / 255.
86
+ img = torch.from_numpy(img).permute(2, 0, 1).unsqueeze(0).to(self.device)
87
+
88
+ with torch.no_grad():
89
+ output = self.upsampler(img)
90
+
91
+ output_img = output.squeeze().permute(1, 2, 0).cpu().numpy()
92
+ output_img = (output_img * 255.0).round().astype('uint8')
93
+
94
+ # Save the upscaled image back to the same path
95
+ cv2.imwrite(image_path, output_img)
96
+ logger.info(f"Successfully upscaled image: {image_path}")
97
+ return image_path
98
+ except Exception as e:
99
+ logger.error(f"Failed to upscale image {image_path}: {e}")
100
+ return image_path # Return original path on failure
101
+
102
+
103
  def generate_magic_image(self, face_images: list, gender: str, prompt: str, plan: str = 'free') -> str | None:
104
  logger.info(f"Starting image generation process for a user on the '{plan}' plan.")
105
 
 
107
  negative_prompt = "multiple people, group photo, crowd, two faces, three faces, multiple faces, collage, ugly, deformed, blurry, low quality"
108
 
109
  faceid_all_embeds = []
 
110
  for image_path in face_images:
111
  try:
112
  face = cv2.imread(image_path)
113
  if face is None: continue
 
114
  faces = self.face_app.get(face)
115
  if faces:
116
  faceid_embed = torch.from_numpy(faces[0].normed_embedding).unsqueeze(0)
 
131
  final_embedding = torch.cat([negative_embedding, positive_embedding], dim=0)
132
 
133
  output = self.pipe(
134
+ prompt=full_prompt, negative_prompt=negative_prompt,
135
+ ip_adapter_image_embeds=[final_embedding], num_inference_steps=40,
136
+ guidance_scale=7.5, width=512, height=768,
 
 
 
 
137
  )
138
 
139
+ image = output.images[0] if isinstance(output, StableDiffusionPipelineOutput) else output[0][0]
 
 
 
140
 
141
  temp_dir = "temp_images"
142
  os.makedirs(temp_dir, exist_ok=True)
143
  local_path = os.path.join(temp_dir, f"{uuid.uuid4()}.png")
144
  image.save(local_path)
145
 
146
+ # --- FEATURE TIER LOGIC ---
147
  if plan == 'free':
148
  utils.add_watermark(local_path, "@MagicFaceBot")
149
+ else:
150
+ # Upscale for paid users
151
+ self._upscale_image(local_path)
152
 
153
  # --- Upload to Supabase Storage ---
154
  storage_path = f"public/{os.path.basename(local_path)}"
 
 
155
  with open(local_path, 'rb') as f:
156
  supabase.storage.from_(config.SUPABASE_BUCKET_NAME).upload(
157
+ path=storage_path, file=f, file_options={"content-type": "image/png"}
 
 
158
  )
 
159
  public_url = supabase.storage.from_(config.SUPABASE_BUCKET_NAME).get_public_url(storage_path)
 
 
160
  os.remove(local_path)
161
 
162
  return public_url
 
178
  face_images=["test_face.jpg"],
179
  gender="Female",
180
  prompt="A beautiful portrait of a princess in a magical forest, fantasy art",
181
+ plan='paid' # <-- Test the 'paid' plan to see the upscaling
182
  )
183
  if result_url:
184
  print(f"\n✅ Test successful! Image URL: {result_url}")
185
+ print("Check the image at the URL. It should be high-resolution and have no watermark.")
186
  else:
187
+ print(f"\n❌ Test failed. Please check the logs for details.")
188
  else:
189
  print("To run a test, place an image named 'test_face.jpg' in the root directory.")