jebin2 commited on
Commit
2d331ea
Β·
1 Parent(s): 00594f4

feat: Integrate ImageKit.io as a fallback for media uploads in social media publishers with new utilities, configuration, and tests.

Browse files
requirements.txt CHANGED
@@ -30,3 +30,4 @@ fal-client
30
  xai-sdk
31
  json5
32
  xdk
 
 
30
  xai-sdk
31
  json5
32
  xdk
33
+ imagekitio==4.0.0
social_media_publishers/instagram/publisher.py CHANGED
@@ -170,18 +170,24 @@ class InstagramPublisher(SocialPublisher):
170
 
171
  video_url = upload_result.get("public_url")
172
  if not video_url:
173
- return {"error": "Failed to get public URL for video"}
174
-
175
- return {"error": "Failed to get public URL for video"}
176
 
177
  logger.info(f"πŸ”— Public URL: {video_url}")
178
 
179
- except ImportError as e:
180
- logger.error(f"❌ Import Error for GCS: {e}")
181
- return {"error": f"gcs_utils not available: {e}"}
182
  except Exception as e:
183
- logger.error(f"❌ Failed to upload video: {e}", exc_info=True)
184
- return {"error": f"Failed to upload video: {e}"}
 
 
 
 
 
 
 
 
 
 
 
185
  else:
186
  return {"error": f"Invalid content source: {content_source}"}
187
 
@@ -198,7 +204,17 @@ class InstagramPublisher(SocialPublisher):
198
  caption = "\n\n".join(parts)
199
  share_to_feed = metadata.get('share_to_feed', True)
200
 
201
- return self._upload_reel(video_url, caption, share_to_feed)
 
 
 
 
 
 
 
 
 
 
202
 
203
  def _upload_reel(self, video_url: str, caption: str, share_to_feed: bool = True) -> Dict[str, Any]:
204
  """
 
170
 
171
  video_url = upload_result.get("public_url")
172
  if not video_url:
173
+ raise ValueError("Failed to get public URL from GCS upload result")
 
 
174
 
175
  logger.info(f"πŸ”— Public URL: {video_url}")
176
 
 
 
 
177
  except Exception as e:
178
+ logger.warning(f"⚠️ GCS Upload failed, attempting ImageKit as last resort: {e}")
179
+ try:
180
+ from src.imagekit_utils import upload_file_to_imagekit
181
+ ik_result = upload_file_to_imagekit(content_source)
182
+ if ik_result and ik_result.get("url"):
183
+ video_url = ik_result["url"]
184
+ metadata["_imagekit_file_id"] = ik_result["file_id"]
185
+ logger.info(f"πŸ”— ImageKit Public URL: {video_url}")
186
+ else:
187
+ return {"error": f"Both GCS and ImageKit uploads failed: {e}"}
188
+ except Exception as ik_error:
189
+ logger.error(f"❌ ImageKit upload also failed: {ik_error}")
190
+ return {"error": f"Failed to upload video to any provider: {e} | {ik_error}"}
191
  else:
192
  return {"error": f"Invalid content source: {content_source}"}
193
 
 
204
  caption = "\n\n".join(parts)
205
  share_to_feed = metadata.get('share_to_feed', True)
206
 
207
+ result = self._upload_reel(video_url, caption, share_to_feed)
208
+
209
+ # Cleanup ImageKit if used
210
+ if metadata.get("_imagekit_file_id"):
211
+ try:
212
+ from src.imagekit_utils import delete_file_from_imagekit
213
+ delete_file_from_imagekit(metadata["_imagekit_file_id"])
214
+ except Exception as cleanup_error:
215
+ logger.warning(f"⚠️ Failed to cleanup ImageKit file: {cleanup_error}")
216
+
217
+ return result
218
 
219
  def _upload_reel(self, video_url: str, caption: str, share_to_feed: bool = True) -> Dict[str, Any]:
220
  """
social_media_publishers/threads/publisher.py CHANGED
@@ -167,7 +167,7 @@ class ThreadsPublisher(SocialPublisher):
167
 
168
  public_url = upload_result.get("public_url")
169
  if not public_url:
170
- return {"error": "Failed to get public URL for media"}
171
 
172
  if media_type == "VIDEO":
173
  video_url = public_url
@@ -177,9 +177,22 @@ class ThreadsPublisher(SocialPublisher):
177
  print(f"πŸ”— Public URL: {public_url}")
178
 
179
  except Exception as e:
180
- import traceback
181
- traceback.print_exc()
182
- return {"error": f"Failed to upload media: {e}"}
 
 
 
 
 
 
 
 
 
 
 
 
 
183
 
184
  # Step 1: Create Container
185
  # Endpoint: POST /<user_id>/threads
@@ -241,6 +254,13 @@ class ThreadsPublisher(SocialPublisher):
241
  if 'id' in pub_result:
242
  media_id = pub_result['id']
243
  print(f"βœ… Published! Thread ID: {media_id}")
 
 
 
 
 
 
 
244
  # Construct URL (Threads URLs are usually https://www.threads.net/@user/post/ID)
245
  # But the ID returned is Graph ID. We might need permalink field.
246
  return {
@@ -250,6 +270,13 @@ class ThreadsPublisher(SocialPublisher):
250
  'url': f"https://www.threads.net/post/{media_id}" # Approximation
251
  }
252
  else:
 
 
 
 
 
 
 
253
  error_msg = pub_result.get('error', {}).get('message', 'Publishing failed')
254
  print(f"❌ Publish error: {error_msg}")
255
  return {'error': error_msg, 'full_response': pub_result}
 
167
 
168
  public_url = upload_result.get("public_url")
169
  if not public_url:
170
+ raise ValueError("Failed to get public URL from GCS upload result")
171
 
172
  if media_type == "VIDEO":
173
  video_url = public_url
 
177
  print(f"πŸ”— Public URL: {public_url}")
178
 
179
  except Exception as e:
180
+ print(f"⚠️ GCS Upload failed, attempting ImageKit as last resort: {e}")
181
+ try:
182
+ from src.imagekit_utils import upload_file_to_imagekit
183
+ ik_result = upload_file_to_imagekit(content_source)
184
+ if ik_result and ik_result.get("url"):
185
+ if media_type == "VIDEO":
186
+ video_url = ik_result["url"]
187
+ else:
188
+ image_url = ik_result["url"]
189
+ metadata["_imagekit_file_id"] = ik_result["file_id"]
190
+ print(f"πŸ”— ImageKit Public URL: {ik_result['url']}")
191
+ else:
192
+ return {"error": f"Both GCS and ImageKit uploads failed: {e}"}
193
+ except Exception as ik_error:
194
+ print(f"❌ ImageKit upload also failed: {ik_error}")
195
+ return {"error": f"Failed to upload media to any provider: {e} | {ik_error}"}
196
 
197
  # Step 1: Create Container
198
  # Endpoint: POST /<user_id>/threads
 
254
  if 'id' in pub_result:
255
  media_id = pub_result['id']
256
  print(f"βœ… Published! Thread ID: {media_id}")
257
+ # Cleanup ImageKit if used
258
+ if metadata.get("_imagekit_file_id"):
259
+ try:
260
+ from src.imagekit_utils import delete_file_from_imagekit
261
+ delete_file_from_imagekit(metadata["_imagekit_file_id"])
262
+ except Exception as cleanup_error:
263
+ print(f"⚠️ Failed to cleanup ImageKit file: {cleanup_error}")
264
  # Construct URL (Threads URLs are usually https://www.threads.net/@user/post/ID)
265
  # But the ID returned is Graph ID. We might need permalink field.
266
  return {
 
270
  'url': f"https://www.threads.net/post/{media_id}" # Approximation
271
  }
272
  else:
273
+ # Cleanup ImageKit even on failure if it was uploaded
274
+ if metadata.get("_imagekit_file_id"):
275
+ try:
276
+ from src.imagekit_utils import delete_file_from_imagekit
277
+ delete_file_from_imagekit(metadata["_imagekit_file_id"])
278
+ except Exception as cleanup_error:
279
+ print(f"⚠️ Failed to cleanup ImageKit file: {cleanup_error}")
280
  error_msg = pub_result.get('error', {}).get('message', 'Publishing failed')
281
  print(f"❌ Publish error: {error_msg}")
282
  return {'error': error_msg, 'full_response': pub_result}
src/config.py CHANGED
@@ -168,6 +168,12 @@ def load_configuration(force_reload: bool = False) -> Dict[str, Any]:
168
  # X (Twitter)
169
  "x_client_id": os.getenv("X_CLIENT_ID"),
170
  "x_client_secret": os.getenv("X_CLIENT_SECRET"),
 
 
 
 
 
 
171
  }
172
 
173
  # On-screen CTA options
 
168
  # X (Twitter)
169
  "x_client_id": os.getenv("X_CLIENT_ID"),
170
  "x_client_secret": os.getenv("X_CLIENT_SECRET"),
171
+
172
+ # ImageKit
173
+ "imagekit_public_key": os.getenv("IMAGEKIT_PUBLIC_KEY"),
174
+ "imagekit_private_key": os.getenv("IMAGEKIT_PRIVATE_KEY"),
175
+ "imagekit_id": os.getenv("IMAGEKIT_ID"),
176
+ "imagekit_url_endpoint": os.getenv("IMAGEKIT_URL_ENDPOINT") or (f"https://ik.imagekit.io/{os.getenv('IMAGEKIT_ID')}/" if os.getenv("IMAGEKIT_ID") else None),
177
  }
178
 
179
  # On-screen CTA options
src/imagekit_utils.py ADDED
@@ -0,0 +1,79 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ ImageKit.io utilities for temporary file hosting.
3
+ Used as a fallback when GCS is unavailable.
4
+ """
5
+ import os
6
+ from typing import Optional, Dict, Any
7
+ from imagekitio import ImageKit
8
+ from src.config import get_config_value
9
+ from src.utils import logger
10
+
11
+ def get_imagekit_client() -> Optional[ImageKit]:
12
+ """Initialize and return ImageKit client using config values."""
13
+ private_key = get_config_value("imagekit_private_key")
14
+
15
+ if not private_key:
16
+ logger.warning("⚠️ ImageKit private key missing. Cannot initialize client.")
17
+ return None
18
+
19
+ return ImageKit(
20
+ private_key=private_key
21
+ )
22
+
23
+ def upload_file_to_imagekit(local_path: str, filename: Optional[str] = None) -> Optional[Dict[str, Any]]:
24
+ """
25
+ Upload a local file to ImageKit.
26
+ Returns a dict with 'url' and 'file_id' or None if failed.
27
+ """
28
+ client = get_imagekit_client()
29
+ if not client:
30
+ return None
31
+
32
+ if not os.path.exists(local_path):
33
+ logger.error(f"❌ Local file not found for ImageKit upload: {local_path}")
34
+ return None
35
+
36
+ if not filename:
37
+ filename = os.path.basename(local_path)
38
+
39
+ try:
40
+ from imagekitio import file_from_path
41
+ logger.info(f"πŸ“€ Uploading {filename} to ImageKit...")
42
+
43
+ # Use v5 SDK upload method
44
+ upload_result = client.files.upload(
45
+ file=file_from_path(local_path),
46
+ file_name=filename,
47
+ folder="/temp_social_uploads",
48
+ use_unique_file_name=True
49
+ )
50
+
51
+ if upload_result and upload_result.url:
52
+ logger.info(f"βœ… ImageKit upload successful: {upload_result.url}")
53
+ return {
54
+ "url": upload_result.url,
55
+ "file_id": upload_result.file_id,
56
+ "storage_type": "imagekit"
57
+ }
58
+ else:
59
+ logger.error(f"❌ ImageKit upload failed: No URL in response")
60
+ return None
61
+
62
+ except Exception as e:
63
+ logger.error(f"❌ ImageKit upload error: {e}")
64
+ return None
65
+
66
+ def delete_file_from_imagekit(file_id: str) -> bool:
67
+ """Delete a file from ImageKit by file_id."""
68
+ client = get_imagekit_client()
69
+ if not client:
70
+ return False
71
+
72
+ try:
73
+ logger.info(f"πŸ—‘οΈ Deleting file from ImageKit: {file_id}")
74
+ # Use v5 SDK delete method
75
+ client.files.delete(file_id)
76
+ return True
77
+ except Exception as e:
78
+ logger.warning(f"⚠️ Failed to delete from ImageKit ({file_id}): {e}")
79
+ return False
tests/test_imagekit.py ADDED
@@ -0,0 +1,41 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Test script to verify ImageKit integration.
3
+ Run this script after setting IMAGEKIT_PUBLIC_KEY, IMAGEKIT_PRIVATE_KEY, and IMAGEKIT_ID.
4
+ """
5
+ import os
6
+ import sys
7
+
8
+ # Add project root to path
9
+ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
10
+
11
+ from src.imagekit_utils import upload_file_to_imagekit, delete_file_from_imagekit
12
+ from src.utils import logger
13
+
14
+ def test_imagekit_flow():
15
+ # Create a small dummy file
16
+ test_file = "test_imagekit_upload.txt"
17
+ with open(test_file, "w") as f:
18
+ f.write("This is a test file for ImageKit integration.")
19
+
20
+ try:
21
+ # Test Upload
22
+ result = upload_file_to_imagekit(test_file)
23
+ if result and result.get("url"):
24
+ print(f"βœ… Upload successful! URL: {result['url']}")
25
+ print(f"File ID: {result['file_id']}")
26
+
27
+ # Test Delete
28
+ success = delete_file_from_imagekit(result['file_id'])
29
+ if success:
30
+ print("βœ… Delete successful!")
31
+ else:
32
+ print("❌ Delete failed!")
33
+ else:
34
+ print("❌ Upload failed! Check your credentials and logs.")
35
+
36
+ finally:
37
+ if os.path.exists(test_file):
38
+ os.remove(test_file)
39
+
40
+ if __name__ == "__main__":
41
+ test_imagekit_flow()