jebin2 commited on
Commit
9ed628b
Β·
1 Parent(s): d524fdc

Refactor: Unify configuration management and replace legacy content strategy loading

Browse files

- Unified config into src/config.py
- Removed legacy load_config.py and setup_config.py
- Updated consumers to use src/config.py
- Refactored publishers to use ContentStrategyLib instead of local CSVs
- Cleaned up imports in a2e_avatar.py

requirements.txt CHANGED
@@ -16,6 +16,7 @@ google-api-python-client==2.184.0
16
  google-auth-oauthlib==1.2.3
17
  librosa==0.11.0
18
  gspread
 
19
 
20
  # aiosignal==1.4.0
21
  # annotated-types==0.7.0
 
16
  google-auth-oauthlib==1.2.3
17
  librosa==0.11.0
18
  gspread
19
+ tomli; python_version < "3.11"
20
 
21
  # aiosignal==1.4.0
22
  # annotated-types==0.7.0
src/a2e_avatar.py CHANGED
@@ -13,7 +13,7 @@ from google_src import ai_studio_sdk
13
  import json_repair
14
  from data_holder import DataHolder
15
  from moviepy.editor import AudioFileClip
16
- from api_clients import APIClients
17
  import uuid
18
  import json
19
 
 
13
  import json_repair
14
  from data_holder import DataHolder
15
  from moviepy.editor import AudioFileClip
16
+
17
  import uuid
18
  import json
19
 
src/asset_manager/audio_lib.py CHANGED
@@ -10,9 +10,7 @@ from typing import Optional, List
10
  from utils import logger, clean_and_drop_empty
11
  from google_src.google_sheet import GoogleSheetReader
12
  from google_src import get_default_wrapper, GCloudWrapper
13
- import setup_config
14
-
15
-
16
  class AudioLib:
17
  """
18
  Singleton class that loads and manages audio library from Google Sheets.
@@ -72,7 +70,7 @@ class AudioLib:
72
  audio_df = googleSheetReader.get_filtered_dataframe()
73
 
74
  # Filter by beats timing if in beats_cut mode
75
- if setup_config.get_str("setup_type") == "beats_cut":
76
  audio_df = clean_and_drop_empty(audio_df, "Beats Timing(SS:FF) AT 25FPS")
77
 
78
  return clean_and_drop_empty(audio_df, "AUDIO_LINK")
 
10
  from utils import logger, clean_and_drop_empty
11
  from google_src.google_sheet import GoogleSheetReader
12
  from google_src import get_default_wrapper, GCloudWrapper
13
+ from config import get_str
 
 
14
  class AudioLib:
15
  """
16
  Singleton class that loads and manages audio library from Google Sheets.
 
70
  audio_df = googleSheetReader.get_filtered_dataframe()
71
 
72
  # Filter by beats timing if in beats_cut mode
73
+ if get_str("setup_type") == "beats_cut":
74
  audio_df = clean_and_drop_empty(audio_df, "Beats Timing(SS:FF) AT 25FPS")
75
 
76
  return clean_and_drop_empty(audio_df, "AUDIO_LINK")
src/automation.py CHANGED
@@ -13,6 +13,7 @@ from pathlib import Path
13
  from video_renderer import VideoRenderer
14
  from utils import logger
15
  import utils
 
16
  from moviepy.config import change_settings
17
  from moviepy.config import change_settings
18
  from google_src.gcs_utils import upload_file_to_gcs
@@ -30,7 +31,7 @@ import math
30
  import numpy as np
31
  from file_downloader import FileDownloader
32
  from data_holder import DataHolder
33
- import setup_config
34
  from asset_manager import get_asset_downloader, get_audio_lib, AssetProcessor
35
  from file_downloader import FileDownloader
36
 
@@ -65,7 +66,7 @@ class ContentAutomation:
65
  logger.info("\n🎭 STEP 1: Clean TTS Script")
66
  self.data_holder.tts_script = utils.clean_tts_script(tts_script)
67
 
68
- if setup_config.get_str("setup_type") in ["beats_cut", "hard_cut"]:
69
  return await self.execute_random_pipeline(content_strategy, tts_script)
70
 
71
  prompt_refer = content_strategy.get("gemini_prompt", "")
@@ -221,8 +222,8 @@ class ContentAutomation:
221
 
222
  beat_times = None
223
  try_next = False
224
- hard_cut_mode = setup_config.get_str("setup_type") == "hard_cut"
225
- hard_cut_mode_interval = setup_config.get_str("hard_cut_random_videos_interval", "0.5")
226
 
227
  if hard_cut_mode:
228
  # No beat detection needed, just download music once
@@ -283,7 +284,7 @@ class ContentAutomation:
283
  logger.info(f"[{idx}/{total}] Done")
284
 
285
 
286
- if setup_config.get_str("setup_type") == "hard_cut":
287
  # IMPORTANT: Pass filtered_beat_times, not beat_intervals!
288
  video_no_audio_path = await self.video_renderer.render_interval_video(
289
  float(hard_cut_mode_interval),
 
13
  from video_renderer import VideoRenderer
14
  from utils import logger
15
  import utils
16
+ from config import get_str
17
  from moviepy.config import change_settings
18
  from moviepy.config import change_settings
19
  from google_src.gcs_utils import upload_file_to_gcs
 
31
  import numpy as np
32
  from file_downloader import FileDownloader
33
  from data_holder import DataHolder
34
+
35
  from asset_manager import get_asset_downloader, get_audio_lib, AssetProcessor
36
  from file_downloader import FileDownloader
37
 
 
66
  logger.info("\n🎭 STEP 1: Clean TTS Script")
67
  self.data_holder.tts_script = utils.clean_tts_script(tts_script)
68
 
69
+ if get_str("setup_type") in ["beats_cut", "hard_cut"]:
70
  return await self.execute_random_pipeline(content_strategy, tts_script)
71
 
72
  prompt_refer = content_strategy.get("gemini_prompt", "")
 
222
 
223
  beat_times = None
224
  try_next = False
225
+ hard_cut_mode = get_str("setup_type") == "hard_cut"
226
+ hard_cut_mode_interval = get_str("hard_cut_random_videos_interval", "0.5")
227
 
228
  if hard_cut_mode:
229
  # No beat detection needed, just download music once
 
284
  logger.info(f"[{idx}/{total}] Done")
285
 
286
 
287
+ if get_str("setup_type") == "hard_cut":
288
  # IMPORTANT: Pass filtered_beat_times, not beat_intervals!
289
  video_no_audio_path = await self.video_renderer.render_interval_video(
290
  float(hard_cut_mode_interval),
src/{setup_config.py β†’ config.py} RENAMED
@@ -1,18 +1,22 @@
1
  """
2
- Setup Configuration Loader
3
 
4
- Loads TOML configuration from setup/<setup_name>/config.toml files.
5
- This allows different setups to be selected via SETUP_NAME env var,
6
- keeping secrets in .env and config in version-controlled TOML files.
7
  """
8
 
9
- import logging
10
  import os
11
  import sys
 
 
12
  from pathlib import Path
13
  from typing import Dict, Any, Optional
14
 
15
- # Use standalone logger to avoid heavy imports from utils
 
 
 
 
16
  logger = logging.getLogger(__name__)
17
 
18
  # Use tomllib (Python 3.11+) or fall back to tomli
@@ -24,11 +28,13 @@ else:
24
  except ImportError:
25
  tomllib = None
26
 
 
27
 
28
- # Cached config singleton
29
  _cached_config: Optional[Dict[str, Any]] = None
30
  _cached_setup_name: Optional[str] = None
 
31
 
 
32
 
33
  def get_setup_dir() -> Path:
34
  """Get the setup directory path."""
@@ -50,83 +56,9 @@ def list_available_setups() -> list[str]:
50
  return sorted(setups)
51
 
52
 
53
- def load_setup_config(setup_name: Optional[str] = None, force_reload: bool = False) -> Dict[str, Any]:
54
- """
55
- Load configuration from setup/<setup_name>/config.toml.
56
-
57
- Args:
58
- setup_name: Name of the setup folder. If None, uses SETUP_NAME env var.
59
- force_reload: If True, bypass cache and reload from file.
60
-
61
- Returns:
62
- Dictionary with flattened config values, with env var overrides applied.
63
-
64
- Raises:
65
- ValueError: If setup_name is not provided and SETUP_NAME env var is not set.
66
- FileNotFoundError: If the config.toml file doesn't exist.
67
- """
68
- global _cached_config, _cached_setup_name
69
-
70
- # Use env var if setup_name not provided
71
- if setup_name is None:
72
- setup_name = os.getenv("SETUP_NAME")
73
-
74
- if not setup_name:
75
- available = list_available_setups()
76
- raise ValueError(
77
- f"SETUP_NAME environment variable not set. "
78
- f"Available setups: {', '.join(available) if available else 'none found'}"
79
- )
80
-
81
- # Return cached config if same setup and not forcing reload
82
- if not force_reload and _cached_config is not None and _cached_setup_name == setup_name:
83
- return _cached_config
84
-
85
- # Check if tomllib is available
86
- if tomllib is None:
87
- raise ImportError(
88
- "TOML parsing requires Python 3.11+ or the 'tomli' package. "
89
- "Install with: pip install tomli"
90
- )
91
-
92
- # Load the TOML file
93
- setup_dir = get_setup_dir()
94
- config_path = setup_dir / setup_name / "config.toml"
95
-
96
- if not config_path.exists():
97
- available = list_available_setups()
98
- raise FileNotFoundError(
99
- f"Config file not found: {config_path}\n"
100
- f"Available setups: {', '.join(available) if available else 'none found'}"
101
- )
102
-
103
- logger.info(f"Loading setup config from: {config_path}")
104
-
105
- with open(config_path, "rb") as f:
106
- raw_config = tomllib.load(f)
107
-
108
- # Flatten the config and apply env var overrides
109
- config = _flatten_config(raw_config)
110
- config = _apply_env_overrides(config)
111
-
112
- # Cache the result
113
- _cached_config = config
114
- _cached_setup_name = setup_name
115
-
116
- logger.info(f"βœ“ Loaded setup config: {setup_name} ({len(config)} settings)")
117
-
118
- return config
119
-
120
-
121
  def _flatten_config(config: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
122
  """
123
  Flatten nested TOML config into a flat dictionary.
124
-
125
- Example:
126
- {"video": {"only_random_videos": true}}
127
- -> {"video.only_random_videos": true, "only_random_videos": true}
128
-
129
- Both nested key and flat key are provided for flexibility.
130
  """
131
  result = {}
132
 
@@ -150,10 +82,6 @@ def _flatten_config(config: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
150
  def _apply_env_overrides(config: Dict[str, Any]) -> Dict[str, Any]:
151
  """
152
  Apply environment variable overrides to config values.
153
-
154
- Env var names are mapped from config keys:
155
- only_random_videos -> ONLY_RANDOM_VIDEOS
156
- video.hard_cut_random_videos -> HARD_CUT_RANDOM_VIDEOS
157
  """
158
  result = config.copy()
159
 
@@ -209,67 +137,277 @@ def _apply_env_overrides(config: Dict[str, Any]) -> Dict[str, Any]:
209
  return result
210
 
211
 
212
- def get_config_value(key: str, default: Any = None) -> Any:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
213
  """
214
- Get a single config value by key.
 
215
 
216
- Args:
217
- key: Config key (e.g., "only_random_videos" or "video.only_random_videos")
218
- default: Default value if key not found
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
219
 
220
- Returns:
221
- Config value or default
 
 
222
  """
223
- config = load_setup_config()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
224
  return config.get(key, default)
225
 
 
 
 
226
 
227
  def get_bool(key: str, default: bool = False) -> bool:
228
- """Get a boolean config value."""
229
- value = get_config_value(key, default)
230
  if isinstance(value, bool):
231
  return value
232
  if isinstance(value, str):
233
  return value.lower() in ("true", "1", "yes")
234
  return bool(value)
235
 
236
-
237
  def get_int(key: str, default: int = 0) -> int:
238
- """Get an integer config value."""
239
- value = get_config_value(key, default)
240
  try:
241
  return int(value)
242
  except (ValueError, TypeError):
243
  return default
244
 
245
-
246
  def get_str(key: str, default: str = "") -> str:
247
- """Get a string config value."""
248
- value = get_config_value(key, default)
249
  return str(value) if value is not None else default
250
 
 
 
 
251
 
252
- # Convenience function for common pattern
253
- def is_enabled(key: str) -> bool:
254
- """Check if a feature flag is enabled."""
255
- return get_bool(key, False)
256
 
 
 
257
 
258
- # ------------------ CLI Usage ------------------
259
 
260
  if __name__ == "__main__":
261
- from dotenv import load_dotenv
262
- load_dotenv()
263
-
264
- print("\n=== Available Setups ===")
265
- for setup in list_available_setups():
266
- print(f" - {setup}")
267
-
268
- print("\n=== Loading Config ===")
269
  try:
270
- config = load_setup_config()
271
- print(f"\nLoaded config ({len(config)} keys):")
272
- for key, value in sorted(config.items()):
273
- print(f" {key}: {value}")
274
- except (ValueError, FileNotFoundError) as e:
275
- print(f"\nError: {e}")
 
 
 
 
 
 
 
 
 
 
 
1
  """
2
+ Unified Configuration Module
3
 
4
+ Combines functionality from load_config.py and setup_config.py to provide
5
+ a single source of truth for application configuration.
 
6
  """
7
 
 
8
  import os
9
  import sys
10
+ import json
11
+ import logging
12
  from pathlib import Path
13
  from typing import Dict, Any, Optional
14
 
15
+ from dotenv import load_dotenv
16
+ from google.auth import default
17
+
18
+ # Configure logging
19
+ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
20
  logger = logging.getLogger(__name__)
21
 
22
  # Use tomllib (Python 3.11+) or fall back to tomli
 
28
  except ImportError:
29
  tomllib = None
30
 
31
+ # ------------------ Singleton & Cache ------------------
32
 
 
33
  _cached_config: Optional[Dict[str, Any]] = None
34
  _cached_setup_name: Optional[str] = None
35
+ _config_initialized: bool = False
36
 
37
+ # ------------------ Setup Config Logic ------------------
38
 
39
  def get_setup_dir() -> Path:
40
  """Get the setup directory path."""
 
56
  return sorted(setups)
57
 
58
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59
  def _flatten_config(config: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
60
  """
61
  Flatten nested TOML config into a flat dictionary.
 
 
 
 
 
 
62
  """
63
  result = {}
64
 
 
82
  def _apply_env_overrides(config: Dict[str, Any]) -> Dict[str, Any]:
83
  """
84
  Apply environment variable overrides to config values.
 
 
 
 
85
  """
86
  result = config.copy()
87
 
 
137
  return result
138
 
139
 
140
+ def _load_setup_config_data(setup_name: Optional[str] = None) -> Dict[str, Any]:
141
+ """Helper to load and process the TOML configuration file."""
142
+ if setup_name is None:
143
+ setup_name = os.getenv("SETUP_NAME")
144
+
145
+ if not setup_name:
146
+ # No setup name was passed in and SETUP_NAME is not set in the environment.
147
+ # Unlike the old setup_config.load_setup_config (which raised ValueError here),
148
+ # return an empty config; required-key validation happens at application start.
149
+ return {}
150
+
151
+ setup_dir = get_setup_dir()
152
+ config_path = setup_dir / setup_name / "config.toml"
153
+
154
+ if not config_path.exists():
155
+ # Fallback or error handled by caller/validator
156
+ return {}
157
+
158
+ if tomllib is None:
159
+ logger.warning("TOML support needed but not available. Install 'tomli' for Python < 3.11")
160
+ return {}
161
+
162
+ try:
163
+ with open(config_path, "rb") as f:
164
+ raw_config = tomllib.load(f)
165
+
166
+ flat_config = _flatten_config(raw_config)
167
+ return _apply_env_overrides(flat_config)
168
+ except Exception as e:
169
+ logger.error(f"Error loading setup config: {e}")
170
+ return {}
171
+
172
+
173
+ # ------------------ GCP Auth Configuration ------------------
174
+
175
+ def _resolve_gcp_project_id() -> tuple[Optional[str], Optional[str]]:
176
+ """
177
+ Resolve GCP Project ID and auth method.
178
+ Returns (project_id, auth_method)
179
  """
180
+ gcp_project_id = None
181
+ auth_method = None
182
 
183
+ # 1. Service Account JSON (CI/CD)
184
+ gcp_creds_path = (os.getenv("GOOGLE_GHA_CREDS_PATH") or
185
+ os.getenv("CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE") or
186
+ os.getenv("GOOGLE_APPLICATION_CREDENTIALS"))
187
+
188
+ if gcp_creds_path:
189
+ # Set temp bucket env var side-effect (from original load_config)
190
+ os.environ["MY_TEMP_GCS_BUCKET"] = os.getenv("MY_TEMP_GCS_BUCKET", "")
191
+
192
+ try:
193
+ if Path(gcp_creds_path).exists():
194
+ logger.info(f"Loading GCP credentials from file: {gcp_creds_path}")
195
+ with open(gcp_creds_path, "r") as f:
196
+ creds_data = json.load(f)
197
+ gcp_project_id = creds_data.get("project_id")
198
+ auth_method = "service_account_file"
199
+ else:
200
+ try:
201
+ creds_data = json.loads(gcp_creds_path)
202
+ gcp_project_id = creds_data.get("project_id")
203
+ auth_method = "service_account_json"
204
+ except json.JSONDecodeError:
205
+ pass
206
+ except Exception as e:
207
+ logger.warning(f"Error processing GCP credentials path: {e}")
208
+
209
+ # 2. Workload Identity Federation / ADC
210
+ if not gcp_project_id:
211
+ try:
212
+ # This handles both WIF and ADC
213
+ creds, project = default()
214
+ if project:
215
+ gcp_project_id = project
216
+ if os.getenv("WORKLOAD_IDENTITY_PROVIDER"):
217
+ auth_method = "workload_identity_federation"
218
+ else:
219
+ auth_method = "adc"
220
+ except Exception:
221
+ pass
222
+
223
+ # 3. Environment Variables
224
+ if not gcp_project_id:
225
+ gcp_project_id = (
226
+ os.getenv("GOOGLE_CLOUD_PROJECT") or
227
+ os.getenv("GCP_PROJECT") or
228
+ os.getenv("GCLOUD_PROJECT") or
229
+ os.getenv("CLOUDSDK_CORE_PROJECT") or
230
+ os.getenv("CLOUDSDK_PROJECT") or
231
+ os.getenv("GCP_PROJECT_ID")
232
+ )
233
+ if gcp_project_id:
234
+ auth_method = "environment_variable"
235
+
236
+ # 4. GCloud Config
237
+ if not gcp_project_id:
238
+ try:
239
+ import subprocess
240
+ result = subprocess.run(
241
+ ["gcloud", "config", "get-value", "project"],
242
+ capture_output=True,
243
+ text=True,
244
+ timeout=5,
245
+ )
246
+ if result.returncode == 0:
247
+ pid = result.stdout.strip()
248
+ if pid and pid != "(unset)":
249
+ gcp_project_id = pid
250
+ auth_method = "gcloud_config"
251
+ except Exception:
252
+ pass
253
+
254
+ return gcp_project_id, auth_method
255
+
256
+
257
+ # ------------------ Main Load Function ------------------
258
+
259
+ def load_configuration(force_reload: bool = False) -> Dict[str, Any]:
260
+ """
261
+ Load configuration from all sources.
262
 
263
+ 1. Load .env
264
+ 2. Load Setup Config (TOML)
265
+ 3. Resolve GCP Project & Secrets
266
+ 4. Merge & Validate
267
  """
268
+ global _cached_config, _cached_setup_name, _config_initialized
269
+
270
+ setup_name = os.getenv("SETUP_NAME")
271
+
272
+ # Return cache if valid
273
+ if (_config_initialized and not force_reload and
274
+ _cached_config is not None and
275
+ _cached_setup_name == setup_name):
276
+ return _cached_config
277
+
278
+ load_dotenv()
279
+
280
+ # Load Setup Config (TOML)
281
+ # Note: We don't fail hard here if SETUP_NAME is missing, we just get empty setup config
282
+ # Validation happens later if critical keys are missing.
283
+ setup_config = _load_setup_config_data(setup_name)
284
+ if setup_config:
285
+ logger.info(f"βœ“ Loaded setup config: {setup_name}")
286
+
287
+ # Resolve GCP Project
288
+ gcp_project_id, auth_method = _resolve_gcp_project_id()
289
+ if gcp_project_id:
290
+ logger.info(f"βœ“ GCP Project ID: {gcp_project_id} ({auth_method})")
291
+
292
+ # Merge into final config
293
+ config = {
294
+ **setup_config,
295
+ "gemini_api_key": os.getenv("GEMINI_API_KEY"),
296
+ "runwayml_api_key": os.getenv("RUNWAYML_API_KEY"),
297
+ "gcs_bucket_name": os.getenv("GCS_BUCKET_NAME"),
298
+ "gcp_project_id": gcp_project_id,
299
+ "default_voice": setup_config.get("voice") or os.getenv("DEFAULT_VOICE", "en-US-Neural2-F"),
300
+ "auth_method": auth_method,
301
+ "setup_name": setup_name,
302
+ }
303
+
304
+ # Soft validation: required keys are not checked here so module import never fails;
305
+ # hard failures on missing keys should be raised at application start.
306
+ _cached_config = config
307
+ _cached_setup_name = setup_name
308
+ _config_initialized = True
309
+
310
+ return config
311
+
312
+
313
+ # ------------------ Public API ------------------
314
+
315
+ class ConfigProxy:
316
+ """
317
+ Singleton proxy to access configuration.
318
+ Lazily loads config on first access.
319
+ """
320
+ def __init__(self):
321
+ self._config = None
322
+
323
+ def _ensure_loaded(self):
324
+ if self._config is None:
325
+ self._config = load_configuration()
326
+
327
+ def get(self, key: str, default: Any = None) -> Any:
328
+ self._ensure_loaded()
329
+ return self._config.get(key, default)
330
+
331
+ def __getitem__(self, key: str) -> Any:
332
+ self._ensure_loaded()
333
+ return self._config[key]
334
+
335
+ def __contains__(self, key: str) -> bool:
336
+ self._ensure_loaded()
337
+ return key in self._config
338
+
339
+ def items(self):
340
+ self._ensure_loaded()
341
+ return self._config.items()
342
+
343
+ def set(self, key: str, value: Any):
344
+ """Set a configuration value."""
345
+ self._ensure_loaded()
346
+ self._config[key] = value
347
+
348
+ def __setitem__(self, key: str, value: Any):
349
+ self._ensure_loaded()
350
+ self._config[key] = value
351
+
352
+ def reload(self):
353
+ self._config = load_configuration(force_reload=True)
354
+
355
+ # Global singleton
356
+ config = ConfigProxy()
357
+
358
+
359
+ def get_config_value(key: str, default: Any = None) -> Any:
360
  return config.get(key, default)
361
 
362
+ def set_config_value(key: str, value: Any):
363
+ """Set a config value."""
364
+ config.set(key, value)
365
 
366
  def get_bool(key: str, default: bool = False) -> bool:
367
+ value = config.get(key, default)
 
368
  if isinstance(value, bool):
369
  return value
370
  if isinstance(value, str):
371
  return value.lower() in ("true", "1", "yes")
372
  return bool(value)
373
 
 
374
  def get_int(key: str, default: int = 0) -> int:
375
+ value = config.get(key, default)
 
376
  try:
377
  return int(value)
378
  except (ValueError, TypeError):
379
  return default
380
 
 
381
  def get_str(key: str, default: str = "") -> str:
382
+ value = config.get(key, default)
 
383
  return str(value) if value is not None else default
384
 
385
+ def set_str(key: str, value: str):
386
+ """Set a string config value."""
387
+ config.set(key, str(value))
388
 
 
 
 
 
389
 
390
+ def get_gcp_project_id() -> Optional[str]:
391
+ return config.get("gcp_project_id")
392
 
393
+ # ------------------ CLI Test ------------------
394
 
395
  if __name__ == "__main__":
396
+ print("\n=== Unified Config Test ===\n")
 
 
 
 
 
 
 
397
  try:
398
+ conf = load_configuration()
399
+ print("Configuration Loaded Successfully!")
400
+ print(f"Setup Name: {conf.get('setup_name')}")
401
+ print(f"Project ID: {conf.get('gcp_project_id')}")
402
+
403
+ # Check required keys
404
+ required = ["gemini_api_key", "runwayml_api_key", "gcs_bucket_name", "gcp_project_id"]
405
+ missing = [k for k in required if not conf.get(k)]
406
+
407
+ if missing:
408
+ print(f"\n[WARNING] Missing keys: {missing}")
409
+ else:
410
+ print("\nAll required keys present.")
411
+
412
+ except Exception as e:
413
+ print(f"\n[ERROR] {e}")
src/load_config.py DELETED
@@ -1,225 +0,0 @@
1
- import os
2
- import json
3
- from pathlib import Path
4
- from typing import Dict
5
-
6
- from dotenv import load_dotenv
7
- from google.auth import default
8
-
9
- from utils import logger
10
-
11
-
12
- def load_configuration() -> Dict:
13
- """
14
- Load configuration from environment variables with validation.
15
-
16
- Supports two authentication methods:
17
- 1. Service Account JSON (CI/CD): Extracts project ID from JSON file or string
18
- 2. Application Default Credentials (Local): Uses ADC and gcloud config
19
- """
20
- load_dotenv()
21
-
22
- gcp_project_id = None
23
- creds_data = None
24
- auth_method = None
25
-
26
- # Try multiple possible credential paths (CI/CD environments)
27
- gcp_creds_path = (
28
- os.getenv("GOOGLE_GHA_CREDS_PATH") or
29
- os.getenv("CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE") or
30
- os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
31
- )
32
-
33
- # Method 1: Try to load from service account JSON file/string
34
- if gcp_creds_path:
35
- try:
36
- os.environ["MY_TEMP_GCS_BUCKET"] = os.getenv("MY_TEMP_GCS_BUCKET", "")
37
-
38
- # Check if it's a file path that exists
39
- if Path(gcp_creds_path).exists():
40
- logger.info(f"Loading GCP credentials from file: {gcp_creds_path}")
41
- with open(gcp_creds_path, "r") as f:
42
- creds_data = json.load(f)
43
- auth_method = "service_account_file"
44
- else:
45
- # Try to parse as raw JSON string
46
- logger.info("Attempting to parse GCP credentials as JSON string")
47
- creds_data = json.loads(gcp_creds_path)
48
- auth_method = "service_account_json"
49
-
50
- if creds_data:
51
- gcp_project_id = creds_data.get("project_id")
52
- logger.info(f"βœ“ GCP Project ID loaded from service account: {gcp_project_id}")
53
-
54
- except json.JSONDecodeError as e:
55
- logger.warning(f"Could not parse GCP credentials as JSON. Error: {e}")
56
- except FileNotFoundError as e:
57
- logger.warning(f"GCP credentials file not found: {e}")
58
- except Exception as e:
59
- logger.error(f"Unexpected error loading GCP credentials: {e}")
60
-
61
- # Method 2: Check for Workload Identity Federation (GitHub Actions)
62
- if not gcp_project_id:
63
- wif_provider = os.getenv("WORKLOAD_IDENTITY_PROVIDER")
64
- wif_service_account = os.getenv("SERVICE_ACCOUNT_EMAIL")
65
-
66
- if wif_provider and wif_service_account:
67
- try:
68
- logger.info("Attempting to load project from Workload Identity Federation")
69
- # WIF credentials are automatically handled by google.auth.default()
70
- # when GOOGLE_APPLICATION_CREDENTIALS is not set
71
- creds, project = default()
72
-
73
- if project:
74
- gcp_project_id = project
75
- auth_method = "workload_identity_federation"
76
- logger.info(f"βœ“ GCP Project ID loaded from WIF: {gcp_project_id}")
77
- else:
78
- logger.debug("WIF credentials found but no project set")
79
- except Exception as e:
80
- logger.debug(f"Could not load from WIF: {e}")
81
- else:
82
- logger.debug("WIF environment variables not found")
83
-
84
- # Method 3: Try to get project from Application Default Credentials (ADC)
85
- if not gcp_project_id:
86
- try:
87
- logger.info("Attempting to load project from Application Default Credentials (ADC)")
88
- creds, project = default()
89
-
90
- if project:
91
- gcp_project_id = project
92
- auth_method = "adc"
93
- logger.info(f"βœ“ GCP Project ID loaded from ADC: {gcp_project_id}")
94
- else:
95
- logger.debug("ADC credentials found but no project set")
96
- except Exception as e:
97
- logger.debug(f"Could not load from ADC: {e}")
98
-
99
- # Method 4: Try environment variables
100
- if not gcp_project_id:
101
- gcp_project_id = (
102
- os.getenv("GOOGLE_CLOUD_PROJECT") or
103
- os.getenv("GCP_PROJECT") or
104
- os.getenv("GCLOUD_PROJECT") or
105
- os.getenv("CLOUDSDK_CORE_PROJECT") or
106
- os.getenv("CLOUDSDK_PROJECT") or
107
- os.getenv("GCP_PROJECT_ID")
108
- )
109
- if gcp_project_id:
110
- auth_method = "environment_variable"
111
- logger.info(f"βœ“ GCP Project ID loaded from environment: {gcp_project_id}")
112
-
113
- # Method 5: Try gcloud config as last resort
114
- if not gcp_project_id:
115
- try:
116
- import subprocess
117
- result = subprocess.run(
118
- ["gcloud", "config", "get-value", "project"],
119
- capture_output=True,
120
- text=True,
121
- timeout=5,
122
- )
123
- if result.returncode == 0:
124
- gcp_project_id = result.stdout.strip()
125
- if gcp_project_id and gcp_project_id != "(unset)":
126
- auth_method = "gcloud_config"
127
- logger.info(f"βœ“ GCP Project ID loaded from gcloud config: {gcp_project_id}")
128
- else:
129
- gcp_project_id = None
130
- except Exception as e:
131
- logger.debug(f"Could not load from gcloud config: {e}")
132
-
133
- # Build configuration dictionary
134
- # Start with setup config from TOML if available
135
- try:
136
- from setup_config import load_setup_config
137
- setup_config = load_setup_config()
138
- logger.info(f"βœ“ Loaded setup config: {setup_config.get('setup_type', 'unknown')}")
139
- except (ValueError, FileNotFoundError, ImportError) as e:
140
- logger.debug(f"Setup config not loaded (optional): {e}")
141
- setup_config = {}
142
-
143
- # Merge setup config with secrets from environment
144
- config = {
145
- **setup_config, # TOML config values (can be overridden below)
146
- "gemini_api_key": os.getenv("GEMINI_API_KEY"),
147
- "runwayml_api_key": os.getenv("RUNWAYML_API_KEY"),
148
- "gcs_bucket_name": os.getenv("GCS_BUCKET_NAME"),
149
- "gcp_project_id": gcp_project_id,
150
- "default_voice": setup_config.get("voice") or os.getenv("DEFAULT_VOICE", "en-US-Neural2-F"),
151
- "auth_method": auth_method, # Track how project was loaded
152
- }
153
-
154
- # Validate required keys
155
- required_keys = ["gemini_api_key", "runwayml_api_key", "gcs_bucket_name", "gcp_project_id"]
156
- missing_keys = [key for key in required_keys if not config.get(key)]
157
-
158
- if missing_keys:
159
- logger.error(f"Missing required configuration: {', '.join(missing_keys)}")
160
- logger.error("Configuration loading attempted via:")
161
- logger.error(" 1. Service account JSON file/string")
162
- logger.error(" 2. Workload Identity Federation (GitHub Actions)")
163
- logger.error(" 3. Application Default Credentials (ADC)")
164
- logger.error(" 4. Environment variables")
165
- logger.error(" 5. gcloud config")
166
- logger.error("")
167
- logger.error("Available environment variables:")
168
- for key in [
169
- "GOOGLE_GHA_CREDS_PATH",
170
- "CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE",
171
- "GOOGLE_APPLICATION_CREDENTIALS",
172
- "WORKLOAD_IDENTITY_PROVIDER",
173
- "SERVICE_ACCOUNT_EMAIL",
174
- "GOOGLE_CLOUD_PROJECT",
175
- "GCP_PROJECT",
176
- "GCP_PROJECT_ID",
177
- ]:
178
- logger.error(f" {key}: {os.getenv(key, 'NOT SET')}")
179
-
180
- logger.error("")
181
- logger.error("For local development with ADC:")
182
- logger.error(" 1. Run: gcloud config set project YOUR_PROJECT_ID")
183
- logger.error(" 2. Or set: export GCP_PROJECT_ID=YOUR_PROJECT_ID")
184
- logger.error(" 3. Ensure ADC is set up: gcloud auth application-default login")
185
- logger.error("")
186
- logger.error("For GitHub Actions with Workload Identity Federation:")
187
- logger.error(" 1. Set WORKLOAD_IDENTITY_PROVIDER in your workflow")
188
- logger.error(" 2. Set SERVICE_ACCOUNT_EMAIL in your workflow")
189
- logger.error(" 3. Or set GCP_PROJECT_ID directly in secrets")
190
-
191
- raise ValueError(
192
- f"Missing required configuration: {', '.join(missing_keys)}.\n"
193
- f"Please check your .env file, gcloud config, or GitHub secrets."
194
- )
195
-
196
- logger.info(f"βœ“ Configuration loaded successfully (auth method: {auth_method})")
197
- return config
198
-
199
-
200
- def get_gcp_project_id() -> str:
201
- """
202
- Quick helper to get just the GCP project ID.
203
- Useful when you only need the project ID without loading full config.
204
- """
205
- config = load_configuration()
206
- return config["gcp_project_id"]
207
-
208
-
209
- # ------------------ Usage Examples ------------------
210
-
211
- if __name__ == "__main__":
212
- try:
213
- from dotenv import load_dotenv
214
- load_dotenv()
215
- config = load_configuration()
216
- print("\nβœ“ Configuration loaded successfully!\n")
217
- print("Configuration:")
218
- for key, value in config.items():
219
- if "key" in key.lower() and value:
220
- # Mask API keys
221
- print(f" {key}: {value[:10]}...{value[-4:]}")
222
- else:
223
- print(f" {key}: {value}")
224
- except ValueError as e:
225
- print(f"\nβœ— Configuration error:\n{e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/main.py CHANGED
@@ -14,7 +14,7 @@ from automation import ContentAutomation
14
  from utils import logger
15
  import pandas as pd
16
  import warnings
17
- from load_config import load_configuration
18
 
19
 
20
 
 
14
  from utils import logger
15
  import pandas as pd
16
  import warnings
17
+ from config import load_configuration
18
 
19
 
20
 
src/process_csv.py CHANGED
@@ -3,7 +3,7 @@ import csv
3
  import os, time
4
  from pathlib import Path
5
  from datetime import datetime
6
- from load_config import load_configuration
7
  from main import (
8
  run_pipeline,
9
  )
@@ -15,7 +15,7 @@ import argparse
15
  import uuid
16
  from cleanup_manager import process_delete_entries
17
  from google_src.gcs_utils import list_gcs_files
18
- import setup_config
19
  from asset_manager import get_video_lib, get_audio_lib, get_asset_downloader, get_content_strategy_lib
20
 
21
  DATA_DIR = Path("data")
@@ -109,20 +109,14 @@ async def process_row(row, config: dict):
109
  automation = ContentAutomation(
110
  config, dataHolder
111
  )
112
-
113
  content_strategy = {
114
  "gemini_prompt": row.get("Gemini Imagen4 Ultra Prompt (specific)", ""),
115
  "runway_prompt": row.get("Runway Prompt Gen4 Turbo", ""),
116
  "runway_veo_prompt": row.get("Veo-3.1 Fast Prompt (Text-to-Video)", ""),
117
  "tts_script": tts_script,
118
- "captions": row.get("Captions", ""),
119
- "style": "commercial",
120
- "aspect_ratio": "9:16",
121
- "duration": 3,
122
- "brand": "Somira",
123
  }
124
 
125
- result = await run_pipeline(automation, content_strategy, tts_script)
126
  logger.info(f"βœ… Completed {tts_script[:20]}...: success={result.get('success', False)}")
127
  list_gcs_files()
128
  return result
@@ -353,7 +347,7 @@ Examples:
353
  config = load_configuration()
354
 
355
  await download_all_video(config)
356
- if os.getenv("ON_SCREEN_TEXT", "false").lower() != "true" and setup_config.get_str("setup_type") in ["beats_cut", "hard_cut"]:
357
  await create_plain_videos(config, commit=args.commit, job_index=job_index, total_jobs=total_jobs)
358
  else:
359
  await process_all_csvs(config, commit=args.commit, job_index=job_index, total_jobs=total_jobs)
 
3
  import os, time
4
  from pathlib import Path
5
  from datetime import datetime
6
+ from config import load_configuration, get_str
7
  from main import (
8
  run_pipeline,
9
  )
 
15
  import uuid
16
  from cleanup_manager import process_delete_entries
17
  from google_src.gcs_utils import list_gcs_files
18
+
19
  from asset_manager import get_video_lib, get_audio_lib, get_asset_downloader, get_content_strategy_lib
20
 
21
  DATA_DIR = Path("data")
 
109
  automation = ContentAutomation(
110
  config, dataHolder
111
  )
 
112
  content_strategy = {
113
  "gemini_prompt": row.get("Gemini Imagen4 Ultra Prompt (specific)", ""),
114
  "runway_prompt": row.get("Runway Prompt Gen4 Turbo", ""),
115
  "runway_veo_prompt": row.get("Veo-3.1 Fast Prompt (Text-to-Video)", ""),
116
  "tts_script": tts_script,
 
 
 
 
 
117
  }
118
 
119
+ result = await run_pipeline(automation, content_strategy)
120
  logger.info(f"βœ… Completed {tts_script[:20]}...: success={result.get('success', False)}")
121
  list_gcs_files()
122
  return result
 
347
  config = load_configuration()
348
 
349
  await download_all_video(config)
350
+ if os.getenv("ON_SCREEN_TEXT", "false").lower() != "true" and get_str("setup_type") in ["beats_cut", "hard_cut"]:
351
  await create_plain_videos(config, commit=args.commit, job_index=job_index, total_jobs=total_jobs)
352
  else:
353
  await process_all_csvs(config, commit=args.commit, job_index=job_index, total_jobs=total_jobs)
src/social_media_publishers/instagram_publisher.py CHANGED
@@ -17,10 +17,8 @@ from dotenv import load_dotenv
17
  sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
18
 
19
  from pathlib import Path
20
- from load_config import load_configuration
21
- from main import (
22
- load_content_strategies
23
- )
24
  import hashlib
25
  from google_src.gcs_utils import find_and_download_gcs_file, upload_file_to_gcs
26
 
@@ -153,14 +151,13 @@ async def main():
153
  config = load_configuration()
154
  scheduler = InstagramPublisher()
155
 
156
- csv_files = sorted(DATA_DIR.glob("content_strategies*.csv"))
 
157
  all_rows = []
158
 
159
- for csv_file in csv_files:
160
- print(f"πŸ“‚ Reading {csv_file.name}")
161
- df = load_content_strategies(str(csv_file))
162
- for i, row in df.iterrows():
163
- all_rows.append((csv_file.name, row.to_dict()))
164
 
165
  print(f"πŸ“ˆ Found {len(all_rows)} reels to upload")
166
 
 
17
  sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
18
 
19
  from pathlib import Path
20
+ from config import load_configuration
21
+ from asset_manager.content_strategy_lib import get_content_strategy_lib
 
 
22
  import hashlib
23
  from google_src.gcs_utils import find_and_download_gcs_file, upload_file_to_gcs
24
 
 
151
  config = load_configuration()
152
  scheduler = InstagramPublisher()
153
 
154
+ content_lib = get_content_strategy_lib()
155
+ df = content_lib.get_strategies()
156
  all_rows = []
157
 
158
+ worksheet_name = os.getenv("CONTENT_STRATEGY_GSHEET_WORKSHEET", "Instagram_Upload")
159
+ for i, row in df.iterrows():
160
+ all_rows.append((worksheet_name, row.to_dict()))
 
 
161
 
162
  print(f"πŸ“ˆ Found {len(all_rows)} reels to upload")
163
 
src/social_media_publishers/publisher.py CHANGED
@@ -16,8 +16,8 @@ import time
16
  from pathlib import Path
17
  import hashlib
18
 
19
- from load_config import load_configuration
20
- from main import load_content_strategies
21
  from google_src.gcs_utils import find_and_download_gcs_file, upload_file_to_gcs
22
 
23
  # Import individual platform publishers
@@ -124,13 +124,18 @@ async def run_publisher(media: str, commit=False):
124
  sys.exit(1)
125
 
126
  # Load CSVs
127
- csv_files = sorted(DATA_DIR.glob("content_strategies*.csv"))
 
 
 
 
 
 
 
128
  all_rows = []
129
- for csv_file in csv_files:
130
- print(f"πŸ“‚ Reading {csv_file.name}")
131
- df = load_content_strategies(str(csv_file))
132
- for i, row in df.iterrows():
133
- all_rows.append((csv_file.name, row.to_dict()))
134
 
135
  print(f"πŸ“ˆ Total rows to process: {len(all_rows)}")
136
 
 
16
  from pathlib import Path
17
  import hashlib
18
 
19
+ from config import load_configuration
20
+ from asset_manager.content_strategy_lib import get_content_strategy_lib
21
  from google_src.gcs_utils import find_and_download_gcs_file, upload_file_to_gcs
22
 
23
  # Import individual platform publishers
 
124
  sys.exit(1)
125
 
126
  # Load CSVs
127
+ # Load strategies from Google Sheet
128
+ content_lib = get_content_strategy_lib()
129
+ df = content_lib.get_strategies()
130
+
131
+ if df.empty:
132
+ print("❌ No content strategies found in Google Sheet!")
133
+ sys.exit(1)
134
+
135
  all_rows = []
136
+ worksheet_name = os.getenv("CONTENT_STRATEGY_GSHEET_WORKSHEET", "Unknown_Worksheet")
137
+ for i, row in df.iterrows():
138
+ all_rows.append((worksheet_name, row.to_dict()))
 
 
139
 
140
  print(f"πŸ“ˆ Total rows to process: {len(all_rows)}")
141
 
src/social_media_publishers/tiktok_publisher.py CHANGED
@@ -18,10 +18,8 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
18
 
19
  from dotenv import load_dotenv
20
  from pathlib import Path
21
- from load_config import load_configuration
22
- from main import (
23
- load_content_strategies
24
- )
25
  import hashlib
26
  from google_src.gcs_utils import find_and_download_gcs_file
27
 
@@ -182,17 +180,16 @@ class TikTokPublisher:
182
  # ===========================================================
183
  async def main():
184
  try:
185
- config = load_configuration()
186
  config = load_configuration()
187
  scheduler = TikTokPublisher()
188
 
189
- csv_files = sorted(DATA_DIR.glob("content_strategies*.csv"))
 
 
190
  all_rows = []
191
- for csv_file in csv_files:
192
- print(f"πŸ“‚ Reading: {csv_file.name}")
193
- df = load_content_strategies(str(csv_file))
194
- for i, row in df.iterrows():
195
- all_rows.append((csv_file.name, row.to_dict()))
196
 
197
  print(f"πŸ“ˆ Found {len(all_rows)} TikTok videos to upload.")
198
 
 
18
 
19
  from dotenv import load_dotenv
20
  from pathlib import Path
21
+ from config import load_configuration
22
+ from asset_manager.content_strategy_lib import get_content_strategy_lib
 
 
23
  import hashlib
24
  from google_src.gcs_utils import find_and_download_gcs_file
25
 
 
180
  # ===========================================================
181
  async def main():
182
  try:
 
183
  config = load_configuration()
184
  scheduler = TikTokPublisher()
185
 
186
+ content_lib = get_content_strategy_lib()
187
+ df = content_lib.get_strategies()
188
+
189
  all_rows = []
190
+ worksheet_name = os.getenv("CONTENT_STRATEGY_GSHEET_WORKSHEET", "TikTok_Upload")
191
+ for i, row in df.iterrows():
192
+ all_rows.append((worksheet_name, row.to_dict()))
 
 
193
 
194
  print(f"πŸ“ˆ Found {len(all_rows)} TikTok videos to upload.")
195
 
src/social_media_publishers/youtube_publisher.py CHANGED
@@ -20,10 +20,8 @@ from datetime import datetime, timedelta
20
  # Add parent directory to path to allow importing modules from src
21
  sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
22
 
23
- from load_config import load_configuration
24
- from main import (
25
- load_content_strategies
26
- )
27
  from pathlib import Path
28
  from dotenv import load_dotenv
29
  import hashlib
@@ -307,12 +305,14 @@ async def main():
307
  # Initialize and upload
308
  scheduler = YouTubePublisher()
309
 
310
- csv_files = sorted(DATA_DIR.glob("content_strategies*.csv"))
 
 
 
311
  all_rows = []
312
- for csv_file in csv_files:
313
- df = load_content_strategies(str(csv_file))
314
- for i, row in df.iterrows():
315
- all_rows.append((csv_file.name, row.to_dict()))
316
 
317
  for idx, (csv_name, row) in enumerate(all_rows):
318
  tts_script = row.get("TTS Script (AI Avatar)", "").strip()
 
20
  # Add parent directory to path to allow importing modules from src
21
  sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
22
 
23
+ from config import load_configuration
24
+ from asset_manager.content_strategy_lib import get_content_strategy_lib
 
 
25
  from pathlib import Path
26
  from dotenv import load_dotenv
27
  import hashlib
 
305
  # Initialize and upload
306
  scheduler = YouTubePublisher()
307
 
308
+ # Load strategies
309
+ content_lib = get_content_strategy_lib()
310
+ df = content_lib.get_strategies()
311
+
312
  all_rows = []
313
+ worksheet_name = os.getenv("CONTENT_STRATEGY_GSHEET_WORKSHEET", "YouTube_Upload")
314
+ for i, row in df.iterrows():
315
+ all_rows.append((worksheet_name, row.to_dict()))
 
316
 
317
  for idx, (csv_name, row) in enumerate(all_rows):
318
  tts_script = row.get("TTS Script (AI Avatar)", "").strip()