jebin2 commited on
Commit
d7c2fbc
Β·
1 Parent(s): 5f00d5a

fix: Remediate CodeQL security vulnerabilities

Browse files
github_workflow_app/app.py CHANGED
@@ -189,6 +189,9 @@ async def trigger_workflow(data: TriggerRequest):
189
 
190
  for key, value in inputs.items():
191
  if value:
 
 
 
192
  cmd.extend(['-f', f"{key}={value}"])
193
 
194
  try:
 
189
 
190
  for key, value in inputs.items():
191
  if value:
192
+ # Validate input key and value to prevent command injection
193
+ validate_safe_arg(key, "input key")
194
+ validate_safe_arg(str(value), "input value")
195
  cmd.extend(['-f', f"{key}={value}"])
196
 
197
  try:
social_media_publishers/app.py CHANGED
@@ -205,8 +205,8 @@ async def get_account_videos(
205
  result = publisher.get_uploaded_videos(account_id, limit, page_token, start_date, end_date)
206
  return result
207
  except Exception as e:
208
- logger.error(f"Get videos error: {e}")
209
- return JSONResponse(content={"error": str(e)}, status_code=500)
210
 
211
  @app.get("/api/{platform}/accounts/{account_id}/stats")
212
  async def get_account_stats(
@@ -224,8 +224,8 @@ async def get_account_stats(
224
  result = publisher.get_account_stats(account_id, start_date, end_date)
225
  return result
226
  except Exception as e:
227
- logger.error(f"Get stats error: {e}")
228
- return JSONResponse(content={"error": str(e)}, status_code=500)
229
 
230
  @app.get("/api/{platform}/accounts/{account_id}/creator_info")
231
  async def get_creator_info(
@@ -245,8 +245,8 @@ async def get_creator_info(
245
 
246
  return {} # Return empty for others
247
  except Exception as e:
248
- logger.error(f"Get creator info error: {e}")
249
- return JSONResponse(content={"error": str(e)}, status_code=500)
250
 
251
  @app.post("/api/publish")
252
  async def publish_video(
@@ -275,7 +275,9 @@ async def publish_video(
275
  # Save to temp
276
  temp_dir = os.path.join(PROJECT_ROOT, 'temp_uploads')
277
  os.makedirs(temp_dir, exist_ok=True)
278
- filename = f"{platform}_{os.urandom(4).hex()}_{file.filename}"
 
 
279
  file_path = os.path.join(temp_dir, filename)
280
 
281
  with open(file_path, "wb") as buffer:
@@ -334,7 +336,7 @@ async def publish_video(
334
 
335
  except Exception as e:
336
  logger.error(f"Publish Error: {e}", exc_info=True)
337
- return JSONResponse(content={'error': f"Request processing failed: {str(e)}"}, status_code=500)
338
  finally:
339
  # Cleanup
340
  if file_path and os.path.exists(file_path):
@@ -453,9 +455,8 @@ async def verify_app_review(
453
 
454
  except Exception as e:
455
  log(f"❌ Verification script error: {e}")
456
- import traceback
457
- traceback.print_exc()
458
- return JSONResponse(content={"success": False, "logs": logs, "error": str(e)}, status_code=500)
459
 
460
  @app.delete("/api/accounts/{filename}")
461
  async def delete_account(filename: str, platform: str = 'youtube'):
 
205
  result = publisher.get_uploaded_videos(account_id, limit, page_token, start_date, end_date)
206
  return result
207
  except Exception as e:
208
+ logger.error(f"Get videos error: {e}", exc_info=True)
209
+ return JSONResponse(content={"error": "An error occurred while fetching videos"}, status_code=500)
210
 
211
  @app.get("/api/{platform}/accounts/{account_id}/stats")
212
  async def get_account_stats(
 
224
  result = publisher.get_account_stats(account_id, start_date, end_date)
225
  return result
226
  except Exception as e:
227
+ logger.error(f"Get stats error: {e}", exc_info=True)
228
+ return JSONResponse(content={"error": "An error occurred while fetching stats"}, status_code=500)
229
 
230
  @app.get("/api/{platform}/accounts/{account_id}/creator_info")
231
  async def get_creator_info(
 
245
 
246
  return {} # Return empty for others
247
  except Exception as e:
248
+ logger.error(f"Get creator info error: {e}", exc_info=True)
249
+ return JSONResponse(content={"error": "An error occurred while fetching creator info"}, status_code=500)
250
 
251
  @app.post("/api/publish")
252
  async def publish_video(
 
275
  # Save to temp
276
  temp_dir = os.path.join(PROJECT_ROOT, 'temp_uploads')
277
  os.makedirs(temp_dir, exist_ok=True)
278
+ # Sanitize filename to prevent path traversal
279
+ safe_filename = os.path.basename(file.filename or "upload")
280
+ filename = f"{platform}_{os.urandom(4).hex()}_{safe_filename}"
281
  file_path = os.path.join(temp_dir, filename)
282
 
283
  with open(file_path, "wb") as buffer:
 
336
 
337
  except Exception as e:
338
  logger.error(f"Publish Error: {e}", exc_info=True)
339
+ return JSONResponse(content={'error': "An error occurred during publishing. Please try again."}, status_code=500)
340
  finally:
341
  # Cleanup
342
  if file_path and os.path.exists(file_path):
 
455
 
456
  except Exception as e:
457
  log(f"❌ Verification script error: {e}")
458
+ logger.error(f"Verify App Review Error: {e}", exc_info=True)
459
+ return JSONResponse(content={"success": False, "logs": logs, "error": "Verification encountered an error"}, status_code=500)
 
460
 
461
  @app.delete("/api/accounts/{filename}")
462
  async def delete_account(filename: str, platform: str = 'youtube'):
social_media_publishers/base.py CHANGED
@@ -2,6 +2,7 @@ import os
2
  import sys
3
  import json
4
  import glob
 
5
  from abc import ABC, abstractmethod
6
  from typing import Dict, List, Optional, Any
7
 
@@ -273,8 +274,9 @@ class SocialPublisher(ABC):
273
  print(f"⚠️ Unsafe content path rejected: {content_path}")
274
  return None
275
 
276
- if os.path.exists(content_path):
277
- return content_path
 
278
 
279
 
280
  # Case 4: GCS Download (existing logic)
 
2
  import sys
3
  import json
4
  import glob
5
+ import tempfile
6
  from abc import ABC, abstractmethod
7
  from typing import Dict, List, Optional, Any
8
 
 
274
  print(f"⚠️ Unsafe content path rejected: {content_path}")
275
  return None
276
 
277
+ normalized_path = os.path.abspath(content_path)
278
+ if os.path.exists(normalized_path):
279
+ return normalized_path
280
 
281
 
282
  # Case 4: GCS Download (existing logic)
social_media_publishers/facebook/publisher.py CHANGED
@@ -137,7 +137,13 @@ class FacebookPublisher(SocialPublisher):
137
  """
138
  Upload video to Facebook Page using resumable upload API.
139
  """
140
- file_size = os.path.getsize(video_path)
 
 
 
 
 
 
141
 
142
  # Step 1: Start upload session
143
  print(f"🎬 Starting Facebook video upload...")
@@ -166,7 +172,7 @@ class FacebookPublisher(SocialPublisher):
166
  chunk_size = 10 * 1024 * 1024 # 10MB chunks
167
  start_offset = 0
168
 
169
- with open(video_path, 'rb') as video_file:
170
  while start_offset < file_size:
171
  chunk = video_file.read(chunk_size)
172
  end_offset = start_offset + len(chunk)
 
137
  """
138
  Upload video to Facebook Page using resumable upload API.
139
  """
140
+ # Sanitize path to prevent traversal attacks
141
+ safe_path = os.path.abspath(video_path)
142
+ allowed_prefixes = [os.path.abspath(os.getcwd()), '/tmp', os.path.expanduser('~')]
143
+ if not any(safe_path.startswith(prefix) for prefix in allowed_prefixes):
144
+ return {"error": "Invalid video path: access denied"}
145
+
146
+ file_size = os.path.getsize(safe_path)
147
 
148
  # Step 1: Start upload session
149
  print(f"🎬 Starting Facebook video upload...")
 
172
  chunk_size = 10 * 1024 * 1024 # 10MB chunks
173
  start_offset = 0
174
 
175
+ with open(safe_path, 'rb') as video_file:
176
  while start_offset < file_size:
177
  chunk = video_file.read(chunk_size)
178
  end_offset = start_offset + len(chunk)
social_media_publishers/instagram/publisher.py CHANGED
@@ -140,6 +140,14 @@ class InstagramPublisher(SocialPublisher):
140
  is_url = content_source and (content_source.startswith('http://') or content_source.startswith('https://'))
141
  is_local = content_source and os.path.exists(content_source)
142
 
 
 
 
 
 
 
 
 
143
  video_url = None
144
 
145
  if is_url:
 
140
  is_url = content_source and (content_source.startswith('http://') or content_source.startswith('https://'))
141
  is_local = content_source and os.path.exists(content_source)
142
 
143
+ # Sanitize local path to prevent traversal attacks
144
+ if is_local:
145
+ safe_path = os.path.abspath(content_source)
146
+ allowed_prefixes = [os.path.abspath(os.getcwd()), '/tmp', os.path.expanduser('~')]
147
+ if not any(safe_path.startswith(prefix) for prefix in allowed_prefixes):
148
+ return {"error": "Invalid path: access denied"}
149
+ content_source = safe_path
150
+
151
  video_url = None
152
 
153
  if is_url:
social_media_publishers/threads/publisher.py CHANGED
@@ -122,6 +122,14 @@ class ThreadsPublisher(SocialPublisher):
122
  is_url = content_source and (content_source.startswith('http://') or content_source.startswith('https://'))
123
  is_local = content_source and os.path.exists(content_source)
124
 
 
 
 
 
 
 
 
 
125
  if is_local or is_url:
126
  # Check extension or MIME type logic?
127
  # For simplicity, assume video if typical extensions, else check.
 
122
  is_url = content_source and (content_source.startswith('http://') or content_source.startswith('https://'))
123
  is_local = content_source and os.path.exists(content_source)
124
 
125
+ # Sanitize local path to prevent traversal attacks
126
+ if is_local:
127
+ safe_path = os.path.abspath(content_source)
128
+ allowed_prefixes = [os.path.abspath(os.getcwd()), '/tmp', os.path.expanduser('~')]
129
+ if not any(safe_path.startswith(prefix) for prefix in allowed_prefixes):
130
+ return {"error": "Invalid path: access denied"}
131
+ content_source = safe_path
132
+
133
  if is_local or is_url:
134
  # Check extension or MIME type logic?
135
  # For simplicity, assume video if typical extensions, else check.
social_media_publishers/tiktok/publisher.py CHANGED
@@ -120,7 +120,13 @@ class TikTokPublisher(SocialPublisher):
120
  # Prepare content (handles GCS download if needed)
121
  local_path = self.prepare_content(content_path)
122
 
123
- if not os.path.exists(local_path):
 
 
 
 
 
 
124
  return {"error": f"Video file not found: {local_path}"}
125
 
126
  title = metadata.get('title', metadata.get('description', metadata.get('caption', '')))
@@ -146,7 +152,13 @@ class TikTokPublisher(SocialPublisher):
146
  Upload video to TikTok using the Content Posting API.
147
  3-step process: Initialize β†’ Upload β†’ Check Status
148
  """
149
- video_size = os.path.getsize(video_path)
 
 
 
 
 
 
150
  print(f"🎬 Uploading TikTok video: {title[:50]}... ({video_size / 1024 / 1024:.1f} MB)")
151
 
152
  # Step 1: Initialize upload
@@ -175,7 +187,7 @@ class TikTokPublisher(SocialPublisher):
175
  print(f"βœ… Upload initialized: {publish_id}")
176
 
177
  # Step 2: Upload video file
178
- upload_success = self._upload_file(upload_url, video_path)
179
  if not upload_success:
180
  return {"error": "Video upload failed"}
181
 
 
120
  # Prepare content (handles GCS download if needed)
121
  local_path = self.prepare_content(content_path)
122
 
123
+ # Sanitize path to prevent traversal attacks
124
+ safe_path = os.path.abspath(local_path) if local_path else None
125
+ allowed_prefixes = [os.path.abspath(os.getcwd()), '/tmp', os.path.expanduser('~')]
126
+ if not safe_path or not any(safe_path.startswith(prefix) for prefix in allowed_prefixes):
127
+ return {"error": "Invalid video path: access denied"}
128
+
129
+ if not os.path.exists(safe_path):
130
  return {"error": f"Video file not found: {local_path}"}
131
 
132
  title = metadata.get('title', metadata.get('description', metadata.get('caption', '')))
 
152
  Upload video to TikTok using the Content Posting API.
153
  3-step process: Initialize β†’ Upload β†’ Check Status
154
  """
155
+ # Sanitize path to prevent traversal attacks
156
+ safe_path = os.path.abspath(video_path)
157
+ allowed_prefixes = [os.path.abspath(os.getcwd()), '/tmp', os.path.expanduser('~')]
158
+ if not any(safe_path.startswith(prefix) for prefix in allowed_prefixes):
159
+ return {"error": "Invalid video path: access denied"}
160
+
161
+ video_size = os.path.getsize(safe_path)
162
  print(f"🎬 Uploading TikTok video: {title[:50]}... ({video_size / 1024 / 1024:.1f} MB)")
163
 
164
  # Step 1: Initialize upload
 
187
  print(f"βœ… Upload initialized: {publish_id}")
188
 
189
  # Step 2: Upload video file
190
+ upload_success = self._upload_file(upload_url, safe_path)
191
  if not upload_success:
192
  return {"error": "Video upload failed"}
193
 
social_media_publishers/youtube/publisher.py CHANGED
@@ -190,11 +190,18 @@ class YoutubePublisher(SocialPublisher):
190
  print(f"Uploading: {title}")
191
  print(f"File: {video_path}")
192
 
193
- if not os.path.exists(video_path):
 
 
 
 
 
 
 
194
  return {'error': f'Video file not found: {video_path}'}
195
 
196
  media = MediaFileUpload(
197
- video_path,
198
  chunksize=10 * 1024 * 1024, # 10MB chunks
199
  resumable=True,
200
  mimetype='video/*'
 
190
  print(f"Uploading: {title}")
191
  print(f"File: {video_path}")
192
 
193
+ # Sanitize path to prevent traversal attacks
194
+ # Normalize and validate against allowed directories
195
+ safe_path = os.path.abspath(video_path)
196
+ allowed_prefixes = [os.path.abspath(os.getcwd()), '/tmp', os.path.expanduser('~')]
197
+ if not any(safe_path.startswith(prefix) for prefix in allowed_prefixes):
198
+ return {'error': 'Invalid video path: access denied'}
199
+
200
+ if not os.path.exists(safe_path):
201
  return {'error': f'Video file not found: {video_path}'}
202
 
203
  media = MediaFileUpload(
204
+ safe_path,
205
  chunksize=10 * 1024 * 1024, # 10MB chunks
206
  resumable=True,
207
  mimetype='video/*'