teoat commited on
Commit
00a2734
·
verified ·
1 Parent(s): ab521b6

Upload core/cdn.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. core/cdn.py +49 -15
core/cdn.py CHANGED
@@ -59,13 +59,15 @@ class CDNManager:
59
  with open(self.private_key_path, "rb") as f:
60
  self.private_key = rsa.PrivateKey.load_pkcs1(f.read())
61
 
62
- print(f"Loaded private key for {self.provider} signing")
63
 
64
  except Exception as e:
65
- print(f"Failed to load private key: {e}")
66
  self.enable_signing = False
67
 
68
- def get_asset_url(self, asset_path: str, signed: bool = False, expiry_seconds: int = 3600) -> str:
 
 
69
  """
70
  Get the full CDN URL for an asset with optional signing.
71
  Falls back to local path if CDN is not configured.
@@ -107,7 +109,14 @@ class CDNManager:
107
  # Create the policy
108
  expires = int(time.time()) + expiry_seconds
109
 
110
- policy = {"Statement": [{"Resource": url, "Condition": {"DateLessThan": {"AWS:EpochTime": expires}}}]}
 
 
 
 
 
 
 
111
 
112
  # Convert policy to JSON and base64
113
  policy_json = json.dumps(policy, separators=(",", ":"))
@@ -118,13 +127,17 @@ class CDNManager:
118
  signature_b64 = base64.b64encode(signature).decode("utf-8")
119
 
120
  # Construct signed URL
121
- params = {"Policy": policy_b64, "Signature": signature_b64, "Key-Pair-Id": self.key_pair_id}
 
 
 
 
122
 
123
  signed_url = f"{base_url}?{urlencode(params)}"
124
  return signed_url
125
 
126
  except Exception as e:
127
- print(f"CloudFront signing failed: {e}")
128
  return url
129
 
130
  def _sign_cloudflare_url(self, url: str, expiry_seconds: int) -> str:
@@ -136,14 +149,16 @@ class CDNManager:
136
  token_data = f"{url}{expires}"
137
 
138
  # Create HMAC signature
139
- signature = hmac.new(self.private_key, token_data.encode("utf-8"), hashlib.sha256).hexdigest()
 
 
140
 
141
  # Construct signed URL
142
  signed_url = f"{url}?expires={expires}&signature={signature}"
143
  return signed_url
144
 
145
  except Exception as e:
146
- print(f"Cloudflare signing failed: {e}")
147
  return url
148
 
149
  def _sign_generic_url(self, url: str, expiry_seconds: int) -> str:
@@ -156,7 +171,11 @@ class CDNManager:
156
 
157
  # Create HMAC signature
158
  signature = hmac.new(
159
- self.private_key if isinstance(self.private_key, bytes) else str(self.private_key).encode(),
 
 
 
 
160
  message.encode("utf-8"),
161
  hashlib.sha256,
162
  ).hexdigest()
@@ -166,14 +185,16 @@ class CDNManager:
166
  return signed_url
167
 
168
  except Exception as e:
169
- print(f"Generic signing failed: {e}")
170
  return url
171
 
172
  def validate_signed_url(self, signed_url: str) -> bool:
173
  """Validate that a signed URL is properly signed and not expired"""
174
  try:
175
  parsed = urlparse(signed_url)
176
- query_params = dict(param.split("=") for param in parsed.query.split("&") if "=" in param)
 
 
177
 
178
  expires = int(query_params.get("expires", 0))
179
  signature = query_params.get("signature", "")
@@ -188,7 +209,11 @@ class CDNManager:
188
 
189
  # Verify signature
190
  expected_signature = hmac.new(
191
- self.private_key if isinstance(self.private_key, bytes) else str(self.private_key).encode(),
 
 
 
 
192
  message.encode("utf-8"),
193
  hashlib.sha256,
194
  ).hexdigest()
@@ -204,7 +229,9 @@ class CDNManager:
204
  "signing_enabled": self.enable_signing,
205
  "provider": self.provider,
206
  "private_key_loaded": self.private_key is not None,
207
- "key_pair_id": self.key_pair_id is not None if self.provider == "cloudfront" else None,
 
 
208
  "cdn_url": self.cdn_url,
209
  }
210
 
@@ -233,7 +260,9 @@ cdn_service = create_cdn_manager()
233
  # Utility functions for CDN operations
234
  def generate_signed_asset_url(asset_path: str, expiry_seconds: int = 3600) -> str:
235
  """Generate a signed URL for a protected asset"""
236
- return cdn_service.get_asset_url(asset_path, signed=True, expiry_seconds=expiry_seconds)
 
 
237
 
238
 
239
  def validate_asset_access(signed_url: str) -> bool:
@@ -244,7 +273,12 @@ def validate_asset_access(signed_url: str) -> bool:
244
  def get_cdn_health_status() -> dict[str, Any]:
245
  """Get comprehensive CDN health and configuration status"""
246
  status = cdn_service.get_signing_status()
247
- status.update({"cdn_reachable": _test_cdn_connectivity(), "signing_functional": _test_signing_functionality()})
 
 
 
 
 
248
  return status
249
 
250
 
 
59
  with open(self.private_key_path, "rb") as f:
60
  self.private_key = rsa.PrivateKey.load_pkcs1(f.read())
61
 
62
+ print(f"Loaded private key for {self.provider} signing")
63
 
64
  except Exception as e:
65
+ logger.error(f"Failed to load private key: {e}")
66
  self.enable_signing = False
67
 
68
+ def get_asset_url(
69
+ self, asset_path: str, signed: bool = False, expiry_seconds: int = 3600
70
+ ) -> str:
71
  """
72
  Get the full CDN URL for an asset with optional signing.
73
  Falls back to local path if CDN is not configured.
 
109
  # Create the policy
110
  expires = int(time.time()) + expiry_seconds
111
 
112
+ policy = {
113
+ "Statement": [
114
+ {
115
+ "Resource": url,
116
+ "Condition": {"DateLessThan": {"AWS:EpochTime": expires}},
117
+ }
118
+ ]
119
+ }
120
 
121
  # Convert policy to JSON and base64
122
  policy_json = json.dumps(policy, separators=(",", ":"))
 
127
  signature_b64 = base64.b64encode(signature).decode("utf-8")
128
 
129
  # Construct signed URL
130
+ params = {
131
+ "Policy": policy_b64,
132
+ "Signature": signature_b64,
133
+ "Key-Pair-Id": self.key_pair_id,
134
+ }
135
 
136
  signed_url = f"{base_url}?{urlencode(params)}"
137
  return signed_url
138
 
139
  except Exception as e:
140
+ logger.error(f"CloudFront signing failed: {e}")
141
  return url
142
 
143
  def _sign_cloudflare_url(self, url: str, expiry_seconds: int) -> str:
 
149
  token_data = f"{url}{expires}"
150
 
151
  # Create HMAC signature
152
+ signature = hmac.new(
153
+ self.private_key, token_data.encode("utf-8"), hashlib.sha256
154
+ ).hexdigest()
155
 
156
  # Construct signed URL
157
  signed_url = f"{url}?expires={expires}&signature={signature}"
158
  return signed_url
159
 
160
  except Exception as e:
161
+ logger.error(f"Cloudflare signing failed: {e}")
162
  return url
163
 
164
  def _sign_generic_url(self, url: str, expiry_seconds: int) -> str:
 
171
 
172
  # Create HMAC signature
173
  signature = hmac.new(
174
+ (
175
+ self.private_key
176
+ if isinstance(self.private_key, bytes)
177
+ else str(self.private_key).encode()
178
+ ),
179
  message.encode("utf-8"),
180
  hashlib.sha256,
181
  ).hexdigest()
 
185
  return signed_url
186
 
187
  except Exception as e:
188
+ logger.error(f"Generic signing failed: {e}")
189
  return url
190
 
191
  def validate_signed_url(self, signed_url: str) -> bool:
192
  """Validate that a signed URL is properly signed and not expired"""
193
  try:
194
  parsed = urlparse(signed_url)
195
+ query_params = dict(
196
+ param.split("=") for param in parsed.query.split("&") if "=" in param
197
+ )
198
 
199
  expires = int(query_params.get("expires", 0))
200
  signature = query_params.get("signature", "")
 
209
 
210
  # Verify signature
211
  expected_signature = hmac.new(
212
+ (
213
+ self.private_key
214
+ if isinstance(self.private_key, bytes)
215
+ else str(self.private_key).encode()
216
+ ),
217
  message.encode("utf-8"),
218
  hashlib.sha256,
219
  ).hexdigest()
 
229
  "signing_enabled": self.enable_signing,
230
  "provider": self.provider,
231
  "private_key_loaded": self.private_key is not None,
232
+ "key_pair_id": (
233
+ self.key_pair_id is not None if self.provider == "cloudfront" else None
234
+ ),
235
  "cdn_url": self.cdn_url,
236
  }
237
 
 
260
  # Utility functions for CDN operations
261
  def generate_signed_asset_url(asset_path: str, expiry_seconds: int = 3600) -> str:
262
  """Generate a signed URL for a protected asset"""
263
+ return cdn_service.get_asset_url(
264
+ asset_path, signed=True, expiry_seconds=expiry_seconds
265
+ )
266
 
267
 
268
  def validate_asset_access(signed_url: str) -> bool:
 
273
  def get_cdn_health_status() -> dict[str, Any]:
274
  """Get comprehensive CDN health and configuration status"""
275
  status = cdn_service.get_signing_status()
276
+ status.update(
277
+ {
278
+ "cdn_reachable": _test_cdn_connectivity(),
279
+ "signing_functional": _test_signing_functionality(),
280
+ }
281
+ )
282
  return status
283
 
284