broadfield-dev commited on
Commit
6bb98a3
·
verified ·
1 Parent(s): caa916f

Update keylock_component.py

Browse files
Files changed (1) hide show
  1. keylock_component.py +121 -56
keylock_component.py CHANGED
@@ -1,40 +1,41 @@
1
  import gradio as gr
2
- from PIL import Image, ImageOps
3
- import requests
4
- import io
5
- import json
6
- import logging
7
  import os
8
  import struct
 
 
9
  import tempfile
 
10
  from cryptography.hazmat.primitives import serialization
11
  from cryptography.hazmat.primitives.asymmetric import rsa, padding
12
  from cryptography.hazmat.primitives.ciphers.aead import AESGCM
13
  from cryptography.hazmat.primitives import hashes
14
- import numpy as np
15
 
16
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 
17
 
18
- # --- Backend Logic Class ---
19
  class AppServerLogic:
20
- """Contains all backend logic and mock data needed by the application."""
21
  def __init__(self):
22
  self.private_key_object = None
23
  self.public_key_pem = ""
24
- self.mock_user_db = {
25
- "demo-user": {"api_key": "sk-12345-abcde", "password": "password123"},
26
- "admin-user": {"api_key": "sk-67890-fghij", "password": "adminpass"}
27
- }
28
  self._initialize_keys()
29
 
30
  def _initialize_keys(self):
31
  key_pem = os.environ.get('KEYLOCK_PRIV_KEY')
32
  if not key_pem:
33
  pk = rsa.generate_private_key(public_exponent=65537, key_size=2048)
34
- key_pem = pk.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()).decode('utf-8')
 
 
 
 
35
  try:
36
  self.private_key_object = serialization.load_pem_private_key(key_pem.encode(), password=None)
37
- self.public_key_pem = self.private_key_object.public_key().public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo).decode('utf-8')
 
 
 
38
  except Exception as e:
39
  logging.error(f"Key initialization failed: {e}")
40
 
@@ -52,105 +53,169 @@ class AppServerLogic:
52
  crypto_payload = int(data_binary, 2).to_bytes(data_length, byteorder='big')
53
  offset = 4
54
  encrypted_aes_key_len = struct.unpack('>I', crypto_payload[:offset])[0]
55
- encrypted_aes_key = crypto_payload[offset:offset + encrypted_aes_key_len]; offset += encrypted_aes_key_len
56
- nonce = crypto_payload[offset:offset + 12]; offset += 12
 
 
57
  ciphertext = crypto_payload[offset:]
58
- recovered_aes_key = self.private_key_object.decrypt(encrypted_aes_key, padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
 
 
 
59
  payload = json.loads(AESGCM(recovered_aes_key).decrypt(nonce, ciphertext, None).decode())
60
  return {"status": "Success", "payload": payload}
61
  except Exception as e:
62
  return {"status": "Error", "message": f"Decryption Failed: {e}"}
63
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
  def generate_encrypted_image(self, payload_dict):
65
- """Creates a KeyLock image and returns its filepath for both preview and download."""
66
- try:
67
- response = requests.get("https://images.unsplash.com/photo-1506318137071-a8e063b4bec0?q=80&w=1200", timeout=10)
68
- img = ImageOps.fit(Image.open(io.BytesIO(response.content)).convert("RGB"), (600, 600))
69
- except:
70
- img = Image.new('RGB', (600, 600), color=(15, 23, 42))
71
-
72
  json_bytes = json.dumps(payload_dict).encode('utf-8')
73
  public_key = serialization.load_pem_public_key(self.public_key_pem.encode('utf-8'))
74
  aes_key, nonce = os.urandom(32), os.urandom(12)
75
  ciphertext = AESGCM(aes_key).encrypt(nonce, json_bytes, None)
76
- rsa_key = public_key.encrypt(aes_key, padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
 
 
 
77
  payload = struct.pack('>I', len(rsa_key)) + rsa_key + nonce + ciphertext
78
-
79
- pixel_data = np.array(img).ravel()
80
  binary_payload = ''.join(format(b, '08b') for b in struct.pack('>I', len(payload)) + payload)
81
  pixel_data[:len(binary_payload)] = (pixel_data[:len(binary_payload)] & 0xFE) | np.array(list(binary_payload), dtype=np.uint8)
82
- final_image = Image.fromarray(pixel_data.reshape(img.size[1], img.size[0], 3), 'RGB')
83
-
84
  with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
85
  final_image.save(f.name, "PNG")
86
- # Return the filepath for both the preview image and the download file.
87
  return f.name, f.name
88
 
89
  @staticmethod
90
  def generate_pem_keys():
91
  pk = rsa.generate_private_key(public_exponent=65537, key_size=2048)
92
- priv = pk.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()).decode()
93
- pub = pk.public_key().public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo).decode()
 
 
 
 
 
 
 
94
  return priv, pub
95
 
96
- # --- UI Component Class ---
97
  class KeylockDecoderComponent(gr.components.Component):
98
- """A Gradio Component that decodes a KeyLock image and provides advanced tools."""
99
  EVENTS = ["change"]
100
 
101
- def __init__(self, server_logic, **kwargs):
102
- self.server_logic = server_logic
103
- self.value = None
104
- super().__init__(**kwargs)
105
 
106
  def _format_message(self, result: dict | None) -> str:
107
- if not result or not result.get("status"): return "Upload a KeyLock image to auto-fill credentials."
 
108
  status = result["status"]
109
  if status == "Success":
110
- user = result.get("payload", {}).get("USER", "N/A")
111
- return f"<p style='color:green; font-weight:bold;'>✅ Success! Decoded credentials for '{user}'.</p>"
112
  else:
113
  message = result.get("message", "An unknown error occurred.")
114
  return f"<p style='color:red; font-weight:bold;'>❌ Error: {message}</p>"
115
 
116
- def preprocess(self, payload): return payload
117
- def postprocess(self, value): return value
118
- def api_info(self): return {"type": "object", "example": {"status": "Success", "payload": {"USER": "demo-user"}}}
 
 
 
 
 
119
 
120
  def render(self):
121
  value_state = gr.State(value=self.value)
122
-
123
  with gr.Column():
124
  image_input = gr.Image(label="KeyLock Image", type="pil", show_label=False)
125
  status_display = gr.Markdown(self._format_message(self.value))
126
-
127
  with gr.Accordion("Generate Encrypted Image", open=False):
128
- gr.Markdown("Create a test image using the site's public key.")
129
- payload_input = gr.JSON(label="Payload to Encrypt", value={"API_KEY": "sk-12345-abcde", "USER": "demo-user"})
130
  generate_img_button = gr.Button("Generate Image", variant="secondary")
131
- # This component now expects a filepath to display the uncorrupted PNG.
132
  generated_image_preview = gr.Image(label="Generated Image Preview", type="filepath", interactive=False)
133
  generated_file_download = gr.File(label="Download Uncorrupted PNG", interactive=False)
134
-
135
  with gr.Accordion("Create New Standalone Key Pair", open=False):
136
- gr.Markdown("Generate a new, random key pair for testing or other uses.")
137
  generate_keys_button = gr.Button("Generate Keys", variant="secondary")
138
  with gr.Row():
139
  output_private_key = gr.Code(label="Generated Private Key", language="python")
140
  output_public_key = gr.Code(label="Generated Public Key", language="python")
141
 
142
  def on_image_upload(image):
143
- if image is None: return None, "Upload a KeyLock image to auto-fill credentials."
 
144
  result_dict = self.server_logic.decode_payload(image)
 
145
  formatted_message = self._format_message(result_dict)
146
  return result_dict, formatted_message
147
 
148
- image_input.upload(fn=on_image_upload, inputs=[image_input], outputs=[value_state, status_display])
 
 
 
 
149
  generate_img_button.click(
150
  fn=self.server_logic.generate_encrypted_image,
151
  inputs=[payload_input],
152
  outputs=[generated_image_preview, generated_file_download]
153
  )
154
- generate_keys_button.click(fn=self.server_logic.generate_pem_keys, inputs=None, outputs=[output_private_key, output_public_key])
155
-
 
 
 
156
  return {"value": value_state}
 
1
  import gradio as gr
2
+ from PIL import Image, ImageDraw, ImageFont
3
+ import numpy as np
 
 
 
4
  import os
5
  import struct
6
+ import json
7
+ import random
8
  import tempfile
9
+ import logging
10
  from cryptography.hazmat.primitives import serialization
11
  from cryptography.hazmat.primitives.asymmetric import rsa, padding
12
  from cryptography.hazmat.primitives.ciphers.aead import AESGCM
13
  from cryptography.hazmat.primitives import hashes
 
14
 
15
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
16
+ PREFERRED_FONTS = ["Arial", "Helvetica", "DejaVu Sans", "Verdana", "Calibri", "sans-serif"]
17
 
 
18
  class AppServerLogic:
 
19
  def __init__(self):
20
  self.private_key_object = None
21
  self.public_key_pem = ""
 
 
 
 
22
  self._initialize_keys()
23
 
24
  def _initialize_keys(self):
25
  key_pem = os.environ.get('KEYLOCK_PRIV_KEY')
26
  if not key_pem:
27
  pk = rsa.generate_private_key(public_exponent=65537, key_size=2048)
28
+ key_pem = pk.private_bytes(
29
+ encoding=serialization.Encoding.PEM,
30
+ format=serialization.PrivateFormat.PKCS8,
31
+ encryption_algorithm=serialization.NoEncryption()
32
+ ).decode('utf-8')
33
  try:
34
  self.private_key_object = serialization.load_pem_private_key(key_pem.encode(), password=None)
35
+ self.public_key_pem = self.private_key_object.public_key().public_bytes(
36
+ encoding=serialization.Encoding.PEM,
37
+ format=serialization.PublicFormat.SubjectPublicKeyInfo
38
+ ).decode('utf-8')
39
  except Exception as e:
40
  logging.error(f"Key initialization failed: {e}")
41
 
 
53
  crypto_payload = int(data_binary, 2).to_bytes(data_length, byteorder='big')
54
  offset = 4
55
  encrypted_aes_key_len = struct.unpack('>I', crypto_payload[:offset])[0]
56
+ encrypted_aes_key = crypto_payload[offset:offset + encrypted_aes_key_len]
57
+ offset += encrypted_aes_key_len
58
+ nonce = crypto_payload[offset:offset + 12]
59
+ offset += 12
60
  ciphertext = crypto_payload[offset:]
61
+ recovered_aes_key = self.private_key_object.decrypt(
62
+ encrypted_aes_key,
63
+ padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
64
+ )
65
  payload = json.loads(AESGCM(recovered_aes_key).decrypt(nonce, ciphertext, None).decode())
66
  return {"status": "Success", "payload": payload}
67
  except Exception as e:
68
  return {"status": "Error", "message": f"Decryption Failed: {e}"}
69
+
70
+ @staticmethod
71
+ def _get_font(preferred_fonts, base_size):
72
+ for font_name in preferred_fonts:
73
+ try:
74
+ return ImageFont.truetype(font_name.lower() + ".ttf", base_size)
75
+ except IOError:
76
+ continue
77
+ return ImageFont.load_default(size=base_size)
78
+
79
+ @staticmethod
80
+ def _generate_starfield_image(w=800, h=800):
81
+ center_x, center_y = w / 2, h / 2
82
+ y_coords, x_coords = np.mgrid[0:h, 0:w]
83
+ distance = np.sqrt((x_coords - center_x)**2 + (y_coords - center_y)**2)
84
+ max_distance = np.sqrt(center_x**2 + center_y**2)
85
+ distance_norm = distance / max_distance
86
+ bg_center_color = np.array([20, 25, 40])
87
+ bg_outer_color = np.array([0, 0, 5])
88
+ gradient = bg_outer_color + (bg_center_color - bg_outer_color) * (1 - distance_norm[..., np.newaxis])
89
+ img = Image.fromarray(gradient.astype(np.uint8), 'RGB')
90
+ draw = ImageDraw.Draw(img)
91
+ for _ in range(int((w * h) / 200)):
92
+ x, y = random.randint(0, w - 1), random.randint(0, h - 1)
93
+ brightness = random.randint(30, 90)
94
+ draw.point((x, y), fill=(int(brightness * 0.9), int(brightness * 0.9), brightness))
95
+ star_colors = [(255, 255, 255), (220, 230, 255), (255, 240, 220)]
96
+ for _ in range(int((w * h) / 1000)):
97
+ x, y = random.randint(0, w - 1), random.randint(0, h - 1)
98
+ size = 0.5 + (2.5 * (random.random() ** 2))
99
+ brightness = 120 + (135 * (random.random() ** 1.5))
100
+ color = random.choice(star_colors)
101
+ final_color = tuple(int(c * (brightness / 255.0)) for c in color)
102
+ glow_size = size * 3
103
+ glow_color = tuple(int(c * 0.3) for c in final_color)
104
+ draw.ellipse([x - glow_size, y - glow_size, x + glow_size, y + glow_size], fill=glow_color)
105
+ draw.ellipse([x - size, y - size, x + size, y + size], fill=final_color)
106
+ return img
107
+
108
+ def _draw_overlay(self, image: Image.Image) -> Image.Image:
109
+ img_overlayed = image.copy().convert("RGBA")
110
+ draw = ImageDraw.Draw(img_overlayed, "RGBA")
111
+ width, height = img_overlayed.size
112
+ overlay_color = (10, 15, 30, 200)
113
+ title_color = (200, 220, 255)
114
+ font_bold = self._get_font(PREFERRED_FONTS, 30)
115
+ draw.rectangle([0, 20, width, 80], fill=overlay_color)
116
+ draw.text((width / 2, 50), "KeyLock Secure Data", fill=title_color, font=font_bold, anchor="ms")
117
+ final_image_rgb = Image.new("RGB", img_overlayed.size, (0, 0, 0))
118
+ final_image_rgb.paste(img_overlayed, (0, 0), img_overlayed)
119
+ return final_image_rgb
120
+
121
  def generate_encrypted_image(self, payload_dict):
122
+ base_image = self._generate_starfield_image()
123
+ image_with_overlay = self._draw_overlay(base_image)
 
 
 
 
 
124
  json_bytes = json.dumps(payload_dict).encode('utf-8')
125
  public_key = serialization.load_pem_public_key(self.public_key_pem.encode('utf-8'))
126
  aes_key, nonce = os.urandom(32), os.urandom(12)
127
  ciphertext = AESGCM(aes_key).encrypt(nonce, json_bytes, None)
128
+ rsa_key = public_key.encrypt(
129
+ aes_key,
130
+ padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
131
+ )
132
  payload = struct.pack('>I', len(rsa_key)) + rsa_key + nonce + ciphertext
133
+ pixel_data = np.array(image_with_overlay).ravel()
 
134
  binary_payload = ''.join(format(b, '08b') for b in struct.pack('>I', len(payload)) + payload)
135
  pixel_data[:len(binary_payload)] = (pixel_data[:len(binary_payload)] & 0xFE) | np.array(list(binary_payload), dtype=np.uint8)
136
+ final_image = Image.fromarray(pixel_data.reshape(image_with_overlay.size[1], image_with_overlay.size[0], 3), 'RGB')
 
137
  with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
138
  final_image.save(f.name, "PNG")
 
139
  return f.name, f.name
140
 
141
  @staticmethod
142
  def generate_pem_keys():
143
  pk = rsa.generate_private_key(public_exponent=65537, key_size=2048)
144
+ priv = pk.private_bytes(
145
+ encoding=serialization.Encoding.PEM,
146
+ format=serialization.PrivateFormat.PKCS8,
147
+ encryption_algorithm=serialization.NoEncryption()
148
+ ).decode()
149
+ pub = pk.public_key().public_bytes(
150
+ encoding=serialization.Encoding.PEM,
151
+ format=serialization.PublicFormat.SubjectPublicKeyInfo
152
+ ).decode()
153
  return priv, pub
154
 
 
155
  class KeylockDecoderComponent(gr.components.Component):
 
156
  EVENTS = ["change"]
157
 
158
+ def __init__(self, server_logic=None, **kwargs):
159
+ self.server_logic = server_logic or AppServerLogic()
160
+ self.value = None # Initialize value for change event
161
+ super().__init__(value=self.value, **kwargs)
162
 
163
  def _format_message(self, result: dict | None) -> str:
164
+ if not result or not result.get("status"):
165
+ return "Upload a KeyLock image to decode data."
166
  status = result["status"]
167
  if status == "Success":
168
+ return "<p style='color:green; font-weight:bold;'>✅ Decoding Success</p>"
 
169
  else:
170
  message = result.get("message", "An unknown error occurred.")
171
  return f"<p style='color:red; font-weight:bold;'>❌ Error: {message}</p>"
172
 
173
+ def preprocess(self, payload):
174
+ return payload
175
+
176
+ def postprocess(self, value):
177
+ return value
178
+
179
+ def api_info(self):
180
+ return {"type": "object", "example": {"status": "Success", "payload": {"USER": "demo-user", "PASSWORD": "password123"}}}
181
 
182
  def render(self):
183
  value_state = gr.State(value=self.value)
 
184
  with gr.Column():
185
  image_input = gr.Image(label="KeyLock Image", type="pil", show_label=False)
186
  status_display = gr.Markdown(self._format_message(self.value))
 
187
  with gr.Accordion("Generate Encrypted Image", open=False):
188
+ payload_input = gr.JSON(label="Payload to Encrypt", value={"USER": "demo-user", "PASSWORD": "password123"})
 
189
  generate_img_button = gr.Button("Generate Image", variant="secondary")
 
190
  generated_image_preview = gr.Image(label="Generated Image Preview", type="filepath", interactive=False)
191
  generated_file_download = gr.File(label="Download Uncorrupted PNG", interactive=False)
 
192
  with gr.Accordion("Create New Standalone Key Pair", open=False):
 
193
  generate_keys_button = gr.Button("Generate Keys", variant="secondary")
194
  with gr.Row():
195
  output_private_key = gr.Code(label="Generated Private Key", language="python")
196
  output_public_key = gr.Code(label="Generated Public Key", language="python")
197
 
198
  def on_image_upload(image):
199
+ if image is None:
200
+ return None, "Upload a KeyLock image to decode data."
201
  result_dict = self.server_logic.decode_payload(image)
202
+ self.value = result_dict # Update component value for change event
203
  formatted_message = self._format_message(result_dict)
204
  return result_dict, formatted_message
205
 
206
+ image_input.upload(
207
+ fn=on_image_upload,
208
+ inputs=[image_input],
209
+ outputs=[value_state, status_display]
210
+ )
211
  generate_img_button.click(
212
  fn=self.server_logic.generate_encrypted_image,
213
  inputs=[payload_input],
214
  outputs=[generated_image_preview, generated_file_download]
215
  )
216
+ generate_keys_button.click(
217
+ fn=self.server_logic.generate_pem_keys,
218
+ inputs=None,
219
+ outputs=[output_private_key, output_public_key]
220
+ )
221
  return {"value": value_state}