jebin2 committed on
Commit
ea33c8c
·
1 Parent(s): 659fbdb

refactor: create asset_manager module with singleton classes

Browse files

- Add VideoLib singleton for video library loading from GSheet
- Add AudioLib singleton for audio library and music selection
- Add AssetDownloader singleton for centralized downloads
- Add AssetProcessor for AI video selection via Gemini
- Simplify AssetSelector to thin wrapper using singletons
- Add public HTTP/HTTPS URL support in FileDownloader
- Fix video processing order: remove_black_padding -> resize_video
- Remove SHARED_ASSET_SELECTOR (singletons handle caching)
- Use FileDownloader directly instead of wrapper methods

src/asset_manager/__init__.py ADDED
@@ -0,0 +1,20 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Asset Manager Module
3
+
4
+ Provides singleton classes for managing video/audio libraries and asset downloads.
5
+ """
6
+
7
+ from .video_lib import get_video_lib, VideoLib
8
+ from .audio_lib import get_audio_lib, AudioLib
9
+ from .asset_downloader import get_asset_downloader, AssetDownloader
10
+ from .asset_processor import AssetProcessor
11
+
12
+ __all__ = [
13
+ "get_video_lib",
14
+ "VideoLib",
15
+ "get_audio_lib",
16
+ "AudioLib",
17
+ "get_asset_downloader",
18
+ "AssetDownloader",
19
+ "AssetProcessor",
20
+ ]
src/asset_manager/asset_downloader.py ADDED
@@ -0,0 +1,230 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ AssetDownloader - Singleton class for downloading video and visual assets
3
+ Uses FileDownloader for all download operations.
4
+ """
5
+
6
+ import asyncio
7
+ from pathlib import Path
8
+ from typing import Dict, List, Optional, Any
9
+
10
+ from utils import logger, is_valid_video, resize_video, remove_black_padding
11
+ from file_downloader import FileDownloader
12
+ from .video_lib import get_video_lib, VideoLib
13
+
14
+
15
class AssetDownloader:
    """
    Downloads library videos and per-job visual assets through FileDownloader.

    Library videos are downloaded once and cached on the instance; the
    module-level ``get_asset_downloader()`` accessor shares one instance.

    Usage:
        downloader = get_asset_downloader()
        videos = await downloader.download_all_videos()
        await downloader.download_all_visual_assets(data_holder)
    """

    def __init__(self):
        self.file_downloader = FileDownloader()
        # Cache of {"url", "local_path"} dicts filled by download_all_videos().
        self._downloaded_videos: List[Dict] = []
        self._videos_downloaded = False
        logger.info("✓ AssetDownloader initialized")

    @property
    def downloaded_videos(self) -> List[Dict]:
        """Return the cached list of downloaded library videos."""
        return self._downloaded_videos

    async def download_all_videos(self, video_lib: Optional["VideoLib"] = None) -> List[Dict]:
        """
        Download every library video once and cache the result.

        Args:
            video_lib: VideoLib instance to read from; the singleton is used
                only when this is None.

        Returns:
            List of dicts with 'url' and 'local_path' keys, one per video that
            downloaded and passed is_valid_video().
        """
        # An empty cache is not treated as "done", so a run that found no
        # usable videos is retried on the next call.
        if self._videos_downloaded and self._downloaded_videos:
            logger.info("✅ All videos already downloaded — returning cached.")
            return self._downloaded_videos

        # Compare against None explicitly: VideoLib defines __len__, so an
        # empty library passed by the caller is falsy and `video_lib or ...`
        # would silently replace it with the singleton.
        if video_lib is None:
            video_lib = get_video_lib()

        download_path = "testData/video_for_workflow"
        Path(download_path).mkdir(parents=True, exist_ok=True)

        videos = []
        for _, row in video_lib.video_library.iterrows():
            url = str(row.get("VIDEO_LINK", "")).strip()
            if not url:
                continue

            local_path = self.file_downloader.safe_download(url=url)
            if not local_path or not is_valid_video(local_path):
                logger.warning(f"⚠️ Skipped invalid video: {url}")
                continue

            # Process video: first remove padding, then resize.
            try:
                remove_black_padding(local_path, overwrite=True)
                resize_video(local_path, overwrite=True)
            except Exception as e:
                # Best effort: the downloaded file is still registered below.
                logger.warning(f"⚠️ Could not process {local_path}: {e}")

            videos.append({
                "url": url,
                "local_path": str(local_path),
            })

        self._downloaded_videos = videos
        self._videos_downloaded = True
        logger.info(f"✅ Downloaded {len(videos)} library videos")

        return videos

    async def download_all_visual_assets(
        self,
        data_holder: Any,
        api_clients: Any = None,  # Kept for backward compatibility, not used
        resize: bool = True
    ) -> None:
        """
        Download every visual asset referenced by ``data_holder.visual_assets``.

        Each successful download stores its local path back into the asset's
        own dict. Individual failures are logged and do not abort the others.

        Args:
            data_holder: Object exposing a ``visual_assets`` mapping.
            api_clients: Deprecated, kept for backward compatibility.
            resize: Whether to resize hook and selected videos after download.
                Entries from "all_videos" are never resized here.
        """
        assets = data_holder.visual_assets
        download_tasks = []

        # A missing or None hook video means there is simply nothing to fetch;
        # indexing assets["hook_video"] directly would raise instead.
        hook_video = assets.get("hook_video") or {}
        if hook_video.get("video_url"):
            download_tasks.append(
                self._download_file(
                    hook_video["video_url"], "hook_video.mp4",
                    hook_video, "local_path", resize=resize
                )
            )

        # VEO library videos
        veo_video_data = hook_video.get("veo_video_data") or {}
        if veo_video_data.get("video_url"):
            download_tasks.append(
                self._download_file(
                    veo_video_data["video_url"], "veo_hook_url.mp4",
                    veo_video_data, "local_path",
                    resize=resize, remove_padding=True
                )
            )

        # Selected library videos and their optional alternates.
        for i, video in enumerate(assets.get("selected_videos") or []):
            if video.get("url"):
                download_tasks.append(
                    self._download_file(
                        video["url"], f"library_video_{i}.mp4",
                        video, "local_path", resize=resize
                    )
                )
            if video.get("alternate_url"):
                download_tasks.append(
                    self._download_file(
                        video["alternate_url"], f"library_all_video_alternate_url_{i}.mp4",
                        video, "alternate_url_local_path", resize=resize
                    )
                )

        # Library videos from all_videos that do not have a local copy yet.
        for i, video in enumerate(assets.get("all_videos") or []):
            if video.get("url") and not video.get("local_path", None):
                download_tasks.append(
                    self._download_file(
                        video["url"], f"library_all_video_{i}.mp4",
                        video, "local_path"
                    )
                )

        if not download_tasks:
            return

        # return_exceptions=True keeps one failed download from cancelling the rest.
        results = await asyncio.gather(*download_tasks, return_exceptions=True)
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logger.error(f"❌ Download task {i} failed: {result}")

    async def _download_file(
        self,
        url: str,
        filename: str,
        target_dict: Dict,
        key: str = "local_path",
        resize: bool = False,
        remove_padding: bool = False
    ) -> Optional[str]:
        """
        Download one file via FileDownloader and record its path in target_dict.

        Args:
            url: URL handed to ``FileDownloader.safe_download``.
            filename: Label used in log messages only.
            target_dict: Dict that receives the local path.
            key: Key to set in target_dict (default: 'local_path').
            resize: Whether to resize the video after download.
            remove_padding: Whether to remove black padding after download
                (applied before resizing).

        Returns:
            Local path of the downloaded file as a string.

        Raises:
            Exception: If the download yields no path or post-processing
                fails; the error is logged and re-raised.
        """
        try:
            local_path = self.file_downloader.safe_download(url=url)

            if not local_path:
                raise Exception(f"Download returned None for {url}")

            if remove_padding:
                remove_black_padding(str(local_path), overwrite=True)
            if resize:
                resize_video(str(local_path), overwrite=True)

            target_dict[key] = str(local_path)
            logger.info(f"✓ Downloaded {filename}")
            return str(local_path)
        except Exception as e:
            logger.error(f"❌ Failed to download {filename}: {e}")
            raise

    def reset(self) -> None:
        """Forget the cached library videos (useful for testing)."""
        self._downloaded_videos = []
        self._videos_downloaded = False
        logger.info("🔄 AssetDownloader reset")
208
+
209
+
210
# Shared AssetDownloader instance, created lazily by get_asset_downloader().
_asset_downloader: Optional[AssetDownloader] = None


def get_asset_downloader() -> AssetDownloader:
    """
    Return the shared AssetDownloader, creating it on first use.

    Returns:
        AssetDownloader: The singleton instance
    """
    global _asset_downloader
    if _asset_downloader is not None:
        return _asset_downloader
    _asset_downloader = AssetDownloader()
    return _asset_downloader
225
+
226
+
227
def reset_asset_downloader() -> None:
    """Drop the shared AssetDownloader so the next accessor call builds a new one (useful for testing)."""
    global _asset_downloader
    _asset_downloader = None
src/asset_manager/asset_processor.py ADDED
@@ -0,0 +1,177 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ AssetProcessor - Handles video selection and processing using AI
3
+ """
4
+
5
+ import json
6
+ import re
7
+ import random
8
+ from typing import List, Dict, Optional, Tuple
9
+
10
+ import pandas as pd
11
+ import json_repair
12
+ from moviepy.editor import VideoFileClip
13
+
14
+ import gemini_sdk
15
+ from utils import logger
16
+ from .video_lib import get_video_lib
17
+
18
+
19
class AssetProcessor:
    """
    Handles video selection and processing using AI (Gemini).

    Usage:
        processor = AssetProcessor(data_holder)
        videos = await processor.select_videos(tts_script, timed_transcript)
    """

    # Library column holding the URL that Gemini's answers are matched against.
    # NOTE(review): VideoLib filters on "VIDEO_LINK" — confirm both columns
    # exist in the sheet and that all_videos[*]["url"] uses this column's values.
    _URL_COLUMN = "Video URL (No Audio)"

    def __init__(self, data_holder):
        self.data_holder = data_holder
        self._video_lib = get_video_lib()

    @property
    def video_library(self) -> pd.DataFrame:
        """Get video library from singleton"""
        return self._video_lib.video_library

    def _parse_duration(self, duration_str: str) -> int:
        """
        Parse a duration value into whole seconds.

        The first number found in the text is used and truncated to an int;
        missing, empty or number-free values yield 0.
        """
        try:
            if pd.isna(duration_str) or duration_str == "":
                return 0

            duration_str = str(duration_str).lower().strip()
            numbers = re.findall(r"(\d+\.?\d*)", duration_str)
            if numbers:
                return int(float(numbers[0]))

            return 0
        except (ValueError, TypeError) as e:
            logger.warning(f"Failed to parse duration '{duration_str}': {e}")
            return 0

    async def select_videos(self, tts_script: str, timed_transcript, max_duration: int = 12) -> List[Dict]:
        """
        Select videos for a TTS script using Gemini.

        Args:
            tts_script: Script text sent to the model.
            timed_transcript: Forwarded to the Gemini analysis step.
            max_duration: Accepted for interface compatibility; not used by
                this method.

        Returns:
            List of selected video dicts with integer-parsed string durations.

        Raises:
            Exception: If the analysis fails or selects nothing.
        """
        try:
            logger.info(f"🤖 AI video selection for script: {tts_script[:300]}...")

            selected_videos = await self._analyze_with_gemini(
                tts_script=tts_script,
                timed_transcript=timed_transcript
            )

            if not selected_videos:
                raise Exception("⚠️ AI selection failed")

            for video in selected_videos:
                if isinstance(video.get("duration"), str):
                    video["duration"] = self._parse_duration(video["duration"])

            total_duration = sum(int(v.get("duration", 0)) for v in selected_videos)
            logger.info(f"✓ Selected {len(selected_videos)} videos, total: {total_duration}s")

            return selected_videos

        except Exception as e:
            logger.error(f"❌ Video selection failed: {e}")
            raise

    def _find_library_video(self, video_url):
        """Return the first library row whose URL column equals video_url, or None."""
        library = self.video_library
        matches = library[library[self._URL_COLUMN] == video_url]
        if matches.empty:
            return None
        return matches.iloc[0]

    async def _analyze_with_gemini(self, tts_script: str, timed_transcript) -> List[Dict]:
        """
        Ask Gemini to pick library videos for the script.

        Model output is untrusted: items whose URL is not in the library are
        skipped with a warning instead of aborting the whole selection.

        Returns:
            List of dicts describing each accepted selection.

        Raises:
            Exception: On prompt-file, SDK or response-shape errors (logged
                and re-raised).
        """
        # Bound up front so the JSON error handler can always log it.
        response_text = ""
        try:
            video_context = await self.prepare_video_context()
            with open("src/prompt/best_matches_two_video_tracking.md", "r", encoding="utf-8") as file:
                system_prompt = file.read()

            model_input = f"""SYSTEM INSTRUCTION::
{system_prompt}


USER PROMPT:
TTS Script: {tts_script}
Video Options: {video_context}
"""
            response = gemini_sdk.generate(model_input)

            response_text = response.strip()

            selection = json_repair.loads(response_text)
            if not isinstance(selection, list):
                raise ValueError(
                    f"Expected a JSON list from Gemini, got {type(selection).__name__}"
                )

            selected = []
            for item in selection:
                video_index = item["video_index"]
                # NOTE(review): prepare_video_context numbers options from
                # index+1, so a 1-based answer equal to len() is rejected
                # here — confirm what the prompt asks the model to return.
                if not video_index < len(self.video_library):
                    continue

                video = self._find_library_video(item["video_url"])
                if video is None:
                    logger.warning(
                        f"⚠️ Gemini returned a video URL that is not in the library: {item['video_url']}"
                    )
                    continue

                entry = {
                    "url": video.get(self._URL_COLUMN, video.get("url", "")),
                    "alternate_url": None,
                    "alternate_url_local_path": None,
                    "video_summary": video.get("Full Video Description Summary"),
                    "tts_script_segment": item["tts_script_segment"],
                    "duration": video.get("duration", 0),
                    "reason": item["reason"],
                    "alignment": video.get("Video Alignment with the TTS Script", video.get("alignment", "")),
                    "energy": video.get("energy_score", 0),
                }

                # The alternate belongs to this entry only; resolve it before
                # appending so it can never attach to a previous selection.
                if "alternate_video_index" in item:
                    alternate = self._find_library_video(item.get("alternate_video_url"))
                    if alternate is not None:
                        entry["alternate_url"] = alternate.get(self._URL_COLUMN, alternate.get("url", ""))

                selected.append(entry)

            logger.info(f"✓ Gemini selected {len(selected)}")
            return selected

        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse Gemini JSON response: {e}")
            logger.debug(f"Raw response: {response_text[:500]}")
            raise
        except Exception as e:
            logger.error(f"Gemini analysis failed: {e}")
            import traceback
            traceback.print_exc()
            raise

    async def prepare_video_context(self) -> str:
        """
        Build the numbered video-options text sent to the model.

        Side effect: every entry of ``visual_assets["all_videos"]`` gets its
        "duration" overwritten with the duration read from its local file
        (rounded to 2 decimals), or 0 when there is no local file or it
        cannot be read.

        Returns:
            One line per library row, joined with newlines.
        """
        all_videos = self.data_holder.visual_assets["all_videos"]

        # Update durations using actual local files
        for video in all_videos:
            local_path = video.get("local_path")
            if not local_path:
                video["duration"] = 0
                continue
            try:
                with VideoFileClip(local_path) as clip:
                    video["duration"] = round(clip.duration, 2)
            except Exception as e:
                logger.warning(f"⚠️ Error reading duration for {local_path}: {e}")
                video["duration"] = 0

        # One pass to index durations by URL; setdefault keeps the first
        # entry per URL, matching a first-match scan.
        duration_by_url = {}
        for video in all_videos:
            if "url" in video:
                duration_by_url.setdefault(video["url"], video.get("duration", 0))

        usage_count = self.data_holder.video_usage_count
        lines = []
        for i, row in self.video_library.iterrows():
            url = row.get(self._URL_COLUMN)
            summary = row.get("Full Video Description Summary", row.get("description", ""))
            alignment = row.get("Video Alignment with the TTS Script", row.get("alignment", ""))
            lines.append(
                f"{i+1}. {url} - "
                f"{summary} - "
                f"{duration_by_url.get(url, 0)}s - "
                f"Alignment: {alignment} - "
                f"Usage Count: {usage_count.get(url, 0)}"
            )

        return "\n".join(lines)

    def select_random_videos(self, count: int) -> List[str]:
        """
        Pick ``count`` random local paths from the downloaded videos.

        When fewer videos with a local path exist, all of them are returned
        (in random order) and a warning is logged.
        """
        all_videos = self.data_holder.visual_assets.get("all_videos", [])
        available_videos = [v for v in all_videos if v.get("local_path")]

        if len(available_videos) < count:
            logger.warning(f"⚠️ Not enough videos to select {count} random videos. Selecting {len(available_videos)} instead.")
            count = len(available_videos)

        selected_videos = random.sample(available_videos, count)

        return [v["local_path"] for v in selected_videos]
src/asset_manager/audio_lib.py ADDED
@@ -0,0 +1,205 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ AudioLib - Singleton class for managing audio library from Google Sheets
3
+ """
4
+
5
+ import os
6
+ import re
7
+ import pandas as pd
8
+ from typing import Optional, List
9
+
10
+ from utils import logger, clean_and_drop_empty
11
+ from google_sheet_reader import GoogleSheetReader
12
+ from google_src import get_default_wrapper, GCloudWrapper
13
+ import setup_config
14
+
15
+
16
class AudioLib:
    """
    Loads the audio library from Google Sheets and hands out background
    music sequentially, together with beat timings.

    Usage:
        audio_lib = get_audio_lib()
        music_url = audio_lib.select_background_music()
        beats = audio_lib.get_audio_beats(music_url)
    """

    # Sheet column holding comma-separated "SS:FF" beat positions.
    _BEATS_COLUMN = "Beats Timing(SS:FF) AT 25FPS"
    # Frame rate used to convert the FF part of a beat token into seconds.
    _BEATS_FPS = 25.0

    def __init__(self, gcloud_wrapper: Optional["GCloudWrapper"] = None, initial_audio_index: int = 0):
        """
        Load the library and position the selection index.

        Raises:
            ValueError: If the loaded library has no rows.
        """
        self._gcloud_wrapper = gcloud_wrapper or get_default_wrapper()
        self._audio_library: pd.DataFrame = self._load_from_gsheet()

        if len(self._audio_library) == 0:
            raise ValueError("Audio library is empty! Check AUDIO_LIBRARY_GSHEET_WORKSHEET env var and Google Sheet access.")

        self._current_audio_index = initial_audio_index % len(self._audio_library)
        logger.info(f"✓ AudioLib initialized with {len(self._audio_library)} audio tracks, starting at index {self._current_audio_index}")

    @property
    def audio_library(self) -> pd.DataFrame:
        """Get the audio library DataFrame"""
        return self._audio_library

    @property
    def current_audio_index(self) -> int:
        """Get current audio index"""
        return self._current_audio_index

    @current_audio_index.setter
    def current_audio_index(self, value: int) -> None:
        """Set current audio index (wraps around); ignored when the library is empty."""
        if len(self._audio_library) > 0:
            self._current_audio_index = value % len(self._audio_library)

    def _load_from_gsheet(self, account_id: str = "test_data") -> pd.DataFrame:
        """
        Load audio library from Google Sheet.

        Any failure is logged and reported as an empty DataFrame rather than
        raised.

        Args:
            account_id: Which account to use ('final_data' or 'test_data')

        Returns:
            Rows with a non-empty AUDIO_LINK (and, in "beats_cut" setup mode,
            non-empty beat timings); empty DataFrame on error.
        """
        try:
            worksheet_name = os.getenv("AUDIO_LIBRARY_GSHEET_WORKSHEET")
            if not worksheet_name:
                logger.error("AUDIO_LIBRARY_GSHEET_WORKSHEET env var not set!")
                return pd.DataFrame()

            logger.info(f"Loading audio library using account: {account_id}")
            googleSheetReader = GoogleSheetReader(
                worksheet_name=worksheet_name,
                gcloud_wrapper=self._gcloud_wrapper,
                account_id=account_id,
            )
            audio_df = googleSheetReader.get_filtered_dataframe()

            # Filter by beats timing if in beats_cut mode
            if setup_config.get_str("setup_type") == "beats_cut":
                audio_df = clean_and_drop_empty(audio_df, self._BEATS_COLUMN)

            return clean_and_drop_empty(audio_df, "AUDIO_LINK")
        except Exception as e:
            # Classify by message text so the log points at the likely cause.
            error_msg = str(e) if str(e) else type(e).__name__
            if "403" in error_msg or "permission" in error_msg.lower() or "forbidden" in error_msg.lower():
                logger.error(f"❌ PERMISSION ERROR loading audio library: {error_msg}")
                logger.error("Share the Google Sheet with the service account email as Editor!")
            elif "404" in error_msg or "not found" in error_msg.lower():
                logger.error(f"❌ WORKSHEET NOT FOUND: '{os.getenv('AUDIO_LIBRARY_GSHEET_WORKSHEET')}'")
            else:
                logger.error(f"Failed to load audio library from Google Sheet: {error_msg}")
            return pd.DataFrame()

    def inc_audio_index(self) -> None:
        """Increment current audio index (wraps around)"""
        # Go through the setter so wrap-around lives in exactly one place.
        self.current_audio_index = self._current_audio_index + 1

    def select_background_music(self) -> str:
        """
        Select background music SEQUENTIALLY (not random).
        Each call advances the index so the next call returns the next track.

        Returns:
            URL of the selected audio track, or "" if the library is empty
        """
        if self._audio_library.empty:
            logger.error("❌ Audio library is empty")
            return ""

        selected = self._audio_library.iloc[self._current_audio_index]["AUDIO_LINK"]

        logger.info(
            f"🎵 Selected background music #{self._current_audio_index + 1}/{len(self._audio_library)}: {selected}"
        )

        # Advance for the next call (loops back to the start if needed).
        self.inc_audio_index()

        return selected

    @staticmethod
    def _parse_beat_token(token: str) -> Optional[float]:
        """Convert one "SS:FF" token to seconds (2 decimals), or None if malformed."""
        token = token.strip()
        if ":" not in token:
            return None
        sec, frame = token.split(":", 1)
        try:
            return round(int(sec) + (int(frame) / AudioLib._BEATS_FPS), 2)
        except ValueError:
            return None

    def get_audio_beats(self, audio_link: str) -> Optional[List[float]]:
        """
        Look up the beat timings of a track and convert
        SS:FF (25 FPS) → seconds (float)

        Example:
            "01:12" → 1 + 12/25 = 1.48

        Tokens without a colon are ignored. Tokens with a colon that are not
        integer pairs are skipped with a warning, so one bad cell entry does
        not discard the valid beats around it.

        Args:
            audio_link: URL of the audio track

        Returns:
            List of beat times in seconds, or None if the track or its beat
            data is missing, no token parses, or an error occurs
        """
        try:
            if self._audio_library.empty:
                logger.error("Audio library is empty")
                return None

            # Find matching row
            row = self._audio_library.loc[
                self._audio_library["AUDIO_LINK"] == audio_link
            ]

            if row.empty:
                logger.error(f"No audio entry found for: {audio_link}")
                return None

            beats_raw = row.iloc[0][self._BEATS_COLUMN]

            if pd.isna(beats_raw) or not str(beats_raw).strip():
                logger.warning(f"No beat data for audio: {audio_link}")
                return None

            beats: List[float] = []

            for token in str(beats_raw).split(","):
                token = token.strip()

                if ":" not in token:
                    continue

                beat = self._parse_beat_token(token)
                if beat is None:
                    logger.warning(f"Skipping malformed beat token '{token}' for audio: {audio_link}")
                    continue

                beats.append(beat)

            return beats if beats else None

        except Exception as e:
            logger.error(
                f"Failed to compute audio beats map for {audio_link}: {e}"
            )
            return None

    def reset_audio_index(self) -> None:
        """Reset audio index to start from beginning (useful for batch processing)"""
        self._current_audio_index = 0
        logger.info("🔄 Reset background music index to 0")

    def __len__(self) -> int:
        """Return the number of tracks in the library."""
        return len(self._audio_library)
180
+
181
+
182
# Shared AudioLib instance, created lazily by get_audio_lib().
_audio_lib: Optional[AudioLib] = None


def get_audio_lib(initial_audio_index: int = 0) -> AudioLib:
    """
    Return the shared AudioLib, building it on the first call.

    Args:
        initial_audio_index: Starting index for audio selection; only the
            call that creates the instance uses it

    Returns:
        AudioLib: The singleton instance
    """
    global _audio_lib
    if _audio_lib is not None:
        return _audio_lib
    _audio_lib = AudioLib(initial_audio_index=initial_audio_index)
    return _audio_lib
200
+
201
+
202
def reset_audio_lib() -> None:
    """Drop the shared AudioLib so the next accessor call builds a new one (useful for testing)."""
    global _audio_lib
    _audio_lib = None
src/asset_manager/video_lib.py ADDED
@@ -0,0 +1,109 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ VideoLib - Singleton class for managing video library from Google Sheets
3
+ """
4
+
5
+ import os
6
+ import pandas as pd
7
+ from typing import Optional, List, Dict
8
+
9
+ from utils import logger, clean_and_drop_empty
10
+ from google_sheet_reader import GoogleSheetReader
11
+ from google_src import get_default_wrapper, GCloudWrapper
12
+
13
+
14
class VideoLib:
    """
    Loads the video library from Google Sheets and exposes simple lookups.

    Usage:
        video_lib = get_video_lib()
        for url in video_lib.get_video_urls():
            print(url)
    """

    def __init__(self, gcloud_wrapper: Optional["GCloudWrapper"] = None):
        self._gcloud_wrapper = gcloud_wrapper or get_default_wrapper()
        # May be empty: load failures are logged, not raised.
        self._video_library: pd.DataFrame = self._load_from_gsheet()
        logger.info(f"✓ VideoLib initialized with {len(self._video_library)} videos")

    @property
    def video_library(self) -> pd.DataFrame:
        """Get the video library DataFrame"""
        return self._video_library

    def _load_from_gsheet(self, account_id: str = "test_data") -> pd.DataFrame:
        """
        Load video library from Google Sheet.

        Any failure is logged and reported as an empty DataFrame rather than
        raised.

        Args:
            account_id: Which account to use ('final_data' or 'test_data')

        Returns:
            Rows with a non-empty VIDEO_LINK; empty DataFrame on error.
        """
        try:
            worksheet_name = os.getenv("VIDEO_LIBRARY_GSHEET_WORKSHEET")
            if not worksheet_name:
                logger.error("VIDEO_LIBRARY_GSHEET_WORKSHEET env var not set!")
                return pd.DataFrame()

            logger.info(f"Loading video library using account: {account_id}")
            googleSheetReader = GoogleSheetReader(
                worksheet_name=worksheet_name,
                gcloud_wrapper=self._gcloud_wrapper,
                account_id=account_id,
            )
            video_df = googleSheetReader.get_filtered_dataframe()
            return clean_and_drop_empty(video_df, "VIDEO_LINK")
        except Exception as e:
            # Classify by message text so the log points at the likely cause.
            error_msg = str(e) if str(e) else type(e).__name__
            if "403" in error_msg or "permission" in error_msg.lower() or "forbidden" in error_msg.lower():
                logger.error(f"❌ PERMISSION ERROR loading video library: {error_msg}")
                logger.error("Share the Google Sheet with the service account email as Editor!")
            elif "404" in error_msg or "not found" in error_msg.lower():
                logger.error(f"❌ WORKSHEET NOT FOUND: '{os.getenv('VIDEO_LIBRARY_GSHEET_WORKSHEET')}'")
            else:
                logger.error(f"Failed to load video library from Google Sheet: {error_msg}")
            return pd.DataFrame()

    def get_video_urls(self) -> List[str]:
        """
        Get list of all video URLs.

        Returns:
            Stripped, non-empty VIDEO_LINK values in row order. Missing cells
            (None/NaN) are skipped instead of raising.
        """
        if self._video_library.empty or "VIDEO_LINK" not in self._video_library.columns:
            return []

        urls: List[str] = []
        for value in self._video_library["VIDEO_LINK"]:
            # Calling .strip() on the raw cell fails for non-string cells,
            # so check for missing values first and normalise once via str().
            if pd.isna(value):
                continue
            url = str(value).strip()
            if url:
                urls.append(url)
        return urls

    def get_video_by_url(self, url: str) -> Optional[Dict]:
        """
        Get video data by URL.

        Returns:
            The first row whose VIDEO_LINK equals ``url`` exactly, as a dict,
            or None when the library is empty or nothing matches.
        """
        if self._video_library.empty:
            return None
        matches = self._video_library[self._video_library["VIDEO_LINK"] == url]
        if matches.empty:
            return None
        return matches.iloc[0].to_dict()

    def __len__(self) -> int:
        """Return the number of rows in the library."""
        return len(self._video_library)
87
+
88
+
89
# Shared VideoLib instance, created lazily by get_video_lib().
_video_lib: Optional[VideoLib] = None


def get_video_lib() -> VideoLib:
    """
    Return the shared VideoLib, building it on the first call.

    Returns:
        VideoLib: The singleton instance
    """
    global _video_lib
    if _video_lib is not None:
        return _video_lib
    _video_lib = VideoLib()
    return _video_lib
104
+
105
+
106
def reset_video_lib() -> None:
    """Drop the shared VideoLib so the next accessor call builds a new one (useful for testing)."""
    global _video_lib
    _video_lib = None
src/asset_selector.py CHANGED
@@ -1,387 +1,79 @@
 
 
 
 
 
1
  import pandas as pd
2
- import utils
3
- import json
4
  from typing import List, Dict, Optional, Tuple
5
  from utils import logger
6
- import os
7
- import re
8
- import json_repair
9
- from moviepy.editor import VideoFileClip, AudioFileClip
10
  from data_holder import DataHolder
11
 
12
- import gemini_sdk
13
- from google_sheet_reader import GoogleSheetReader
14
- from google_src import GCloudWrapper, GCloudAccount, get_default_wrapper
15
- import setup_config
16
 
17
 
18
  class AssetSelector:
19
- def __init__(self, config: Dict, data_holder: DataHolder = None, gcloud_wrapper: GCloudWrapper = None):
 
 
 
 
 
 
 
 
20
  self.config = config
21
  self.data_holder = data_holder
22
 
23
- # Setup GCloud wrapper with two accounts: final_data and test_data
24
- self._gcloud_wrapper = gcloud_wrapper or get_default_wrapper()
25
-
26
- self.video_library = self._load_video_library_from_gsheet()
27
- self.audio_library = self._load_audio_library_from_gsheet()
28
-
29
- # Track current background music index for sequential selection
30
- self.current_audio_index = 0 if "current_audio_index" not in self.config else self.config["current_audio_index"]
31
-
32
- if len(self.audio_library) == 0:
33
- raise ValueError("Audio library is empty! Check AUDIO_LIBRARY_GSHEET_WORKSHEET env var and Google Sheet access.")
34
 
35
- self.current_audio_index = (self.current_audio_index) % len(self.audio_library)
36
-
37
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
38
 
39
  def inc_audio_index(self):
40
- """Increment and save current audio index"""
41
- self.current_audio_index = (self.current_audio_index + 1) % len(self.audio_library)
42
- self.config["current_audio_index"] = self.current_audio_index
43
-
44
- def _parse_duration(self, duration_str: str) -> int:
45
- """Parse duration from various string formats to integer seconds"""
46
- try:
47
- if pd.isna(duration_str) or duration_str == "":
48
- return 0
49
-
50
- duration_str = str(duration_str).lower().strip()
51
- numbers = re.findall(r"(\d+\.?\d*)", duration_str)
52
- if numbers:
53
- return int(float(numbers[0]))
54
-
55
- return 0
56
- except (ValueError, TypeError) as e:
57
- logger.warning(f"Failed to parse duration '{duration_str}': {e}")
58
- return 0
59
 
60
  def get_audio_beats(self, audio_link: str) -> Optional[List[float]]:
61
- """
62
- Load audio beats timing from audio_library and convert
63
- SS:FF (25 FPS) → seconds (float)
64
-
65
- Example:
66
- "01:12" → 1 + 12/25 = 1.48
67
- """
68
- try:
69
- if self.audio_library.empty:
70
- logger.error("Audio library is empty")
71
- return None
72
-
73
- # Find matching row
74
- row = self.audio_library.loc[
75
- self.audio_library["AUDIO_LINK"] == audio_link
76
- ]
77
-
78
- if row.empty:
79
- logger.error(f"No audio entry found for: {audio_link}")
80
- return None
81
-
82
- beats_raw = row.iloc[0]["Beats Timing(SS:FF) AT 25FPS"]
83
-
84
- if pd.isna(beats_raw) or not str(beats_raw).strip():
85
- logger.warning(f"No beat data for audio: {audio_link}")
86
- return None
87
-
88
- beats: List[float] = []
89
-
90
- for token in str(beats_raw).split(","):
91
- token = token.strip()
92
-
93
- if ":" not in token:
94
- continue
95
-
96
- sec, frame = token.split(":", 1)
97
-
98
- beats.append(
99
- round(int(sec) + (int(frame) / 25.0), 2)
100
- )
101
-
102
- return beats if beats else None
103
-
104
- except Exception as e:
105
- logger.error(
106
- f"Failed to compute audio beats map for {audio_link}: {e}"
107
- )
108
- return None
109
-
110
- def _load_audio_library_from_gsheet(self, account_id: str = "test_data") -> pd.DataFrame:
111
- """
112
- Load audio library from Google Sheet.
113
-
114
- Args:
115
- account_id: Which account to use ('final_data' or 'test_data')
116
- """
117
- try:
118
- worksheet_name = os.getenv("AUDIO_LIBRARY_GSHEET_WORKSHEET")
119
- if not worksheet_name:
120
- logger.error("AUDIO_LIBRARY_GSHEET_WORKSHEET env var not set!")
121
- return pd.DataFrame()
122
-
123
- logger.info(f"Loading audio library using account: {account_id}")
124
- googleSheetReader = GoogleSheetReader(
125
- worksheet_name=worksheet_name,
126
- gcloud_wrapper=self._gcloud_wrapper,
127
- account_id=account_id,
128
- )
129
- audio_df = googleSheetReader.get_filtered_dataframe()
130
- if setup_config.get_str("setup_type") == "beats_cut":
131
- audio_df = utils.clean_and_drop_empty(audio_df, "Beats Timing(SS:FF) AT 25FPS")
132
- return utils.clean_and_drop_empty(audio_df, "AUDIO_LINK")
133
- except Exception as e:
134
- error_msg = str(e) if str(e) else type(e).__name__
135
- if "403" in error_msg or "permission" in error_msg.lower() or "forbidden" in error_msg.lower():
136
- logger.error(f"❌ PERMISSION ERROR loading audio library: {error_msg}")
137
- logger.error("Share the Google Sheet with the service account email as Editor!")
138
- elif "404" in error_msg or "not found" in error_msg.lower():
139
- logger.error(f"❌ WORKSHEET NOT FOUND: '{os.getenv('AUDIO_LIBRARY_GSHEET_WORKSHEET')}'")
140
- else:
141
- logger.error(f"Failed to load audio library from Google Sheet: {error_msg}")
142
- return pd.DataFrame()
143
-
144
- def _load_video_library_from_gsheet(self, account_id: str = "test_data") -> pd.DataFrame:
145
- """
146
- Load video library from Google Sheet.
147
-
148
- Args:
149
- account_id: Which account to use ('final_data' or 'test_data')
150
- """
151
- try:
152
- worksheet_name = os.getenv("VIDEO_LIBRARY_GSHEET_WORKSHEET")
153
- if not worksheet_name:
154
- logger.error("VIDEO_LIBRARY_GSHEET_WORKSHEET env var not set!")
155
- return pd.DataFrame()
156
-
157
- logger.info(f"Loading video library using account: {account_id}")
158
- googleSheetReader = GoogleSheetReader(
159
- worksheet_name=worksheet_name,
160
- gcloud_wrapper=self._gcloud_wrapper,
161
- account_id=account_id,
162
- )
163
- video_df = googleSheetReader.get_filtered_dataframe()
164
- return utils.clean_and_drop_empty(video_df, "VIDEO_LINK")
165
- except Exception as e:
166
- error_msg = str(e) if str(e) else type(e).__name__
167
- if "403" in error_msg or "permission" in error_msg.lower() or "forbidden" in error_msg.lower():
168
- logger.error(f"❌ PERMISSION ERROR loading video library: {error_msg}")
169
- logger.error("Share the Google Sheet with the service account email as Editor!")
170
- elif "404" in error_msg or "not found" in error_msg.lower():
171
- logger.error(f"❌ WORKSHEET NOT FOUND: '{os.getenv('VIDEO_LIBRARY_GSHEET_WORKSHEET')}'")
172
- else:
173
- logger.error(f"Failed to load video library from Google Sheet: {error_msg}")
174
- return pd.DataFrame()
175
-
176
- async def select_videos(self, tts_script, timed_transcript, max_duration: int = 12) -> Tuple[List[Dict], str]:
177
- """Select videos using AI analysis of TTS script"""
178
- try:
179
- logger.info(f"🤖 AI video selection for script: {tts_script[:300]}...")
180
-
181
- selected_videos = await self._analyze_with_gemini(
182
- tts_script=tts_script,
183
- timed_transcript=timed_transcript
184
- )
185
-
186
- if not selected_videos:
187
- logger.warning("⚠️ AI selection failed, using fallback")
188
- selected_videos = self._fallback_selection(tts_script, max_duration)
189
-
190
- for video in selected_videos:
191
- if isinstance(video.get("duration"), str):
192
- video["duration"] = self._parse_duration(video["duration"])
193
-
194
- total_duration = sum(int(v.get("duration", 0)) for v in selected_videos)
195
- logger.info(f"✓ Selected {len(selected_videos)} videos, total: {total_duration}s")
196
 
197
- return selected_videos
198
-
199
- except Exception as e:
200
- logger.error(f"❌ Video selection failed: {e}")
201
- raise
202
- # return self._fallback_selection(self.data_holder.tts_script, max_duration)
203
-
204
- def _parse_energy_score(self, energy_score_str: str) -> int:
205
- """Parse energy score from string format to integer"""
206
- try:
207
- if pd.isna(energy_score_str) or energy_score_str == "":
208
- return 0
209
-
210
- match = re.search(r"(\d+)\s*out of\s*\d+", str(energy_score_str))
211
- if match:
212
- return int(match.group(1))
213
-
214
- numbers = re.findall(r"\d+", str(energy_score_str))
215
- if numbers:
216
- return int(numbers[0])
217
-
218
- return 0
219
- except (ValueError, TypeError) as e:
220
- logger.warning(f"Failed to parse energy score '{energy_score_str}': {e}")
221
- return 0
222
-
223
- async def _analyze_with_gemini(self, tts_script, timed_transcript) -> List[Dict]:
224
- """Use Gemini API for contextual video selection"""
225
- try:
226
- video_context = await self.prepare_video_context()
227
- # with open("src/prompt/best_matches_video.md", "r", encoding="utf-8") as file:
228
- # with open("src/prompt/best_matches_video_with_timestamp.md", "r", encoding="utf-8") as file:
229
- with open("src/prompt/best_matches_two_video_tracking.md", "r", encoding="utf-8") as file:
230
- system_prompt = file.read()
231
-
232
- model_input = f"""SYSTEM INSTRUCTION::
233
- {system_prompt}
234
-
235
-
236
- USER PROMPT:
237
- TTS Script: {tts_script}
238
- Video Options: {video_context}
239
- """
240
- response = gemini_sdk.generate(model_input)
241
-
242
- response_text = response.strip()
243
-
244
- selection = json_repair.loads(response_text)
245
-
246
- selected = []
247
- for item in selection:
248
- video_index = item["video_index"]
249
- if video_index < len(self.video_library):
250
- video_row = self.video_library[self.video_library["Video URL (No Audio)"] == item["video_url"]]
251
- video = video_row.iloc[0]
252
- selected.append(
253
- {
254
- "url": video.get("Video URL (No Audio)", video.get("url", "")),
255
- "alternate_url": None,
256
- "alternate_url_local_path": None,
257
- "video_summary": video.get("Full Video Description Summary"),
258
- "tts_script_segment":item["tts_script_segment"],
259
- "duration": video.get("duration", 0),
260
- "reason": item["reason"],
261
- "alignment": video.get("Video Alignment with the TTS Script", video.get("alignment", "")),
262
- "energy": video.get("energy_score", 0),
263
- }
264
- )
265
- if "alternate_video_index" in item:
266
- video_row = self.video_library[self.video_library["Video URL (No Audio)"] == item["alternate_video_url"]]
267
- video = video_row.iloc[0]
268
- selected[-1]["alternate_url"] = video.get("Video URL (No Audio)", video.get("url", ""))
269
-
270
- logger.info(f"✓ Gemini selected {len(selected)}")
271
- return selected
272
-
273
- except json.JSONDecodeError as e:
274
- logger.error(f"Failed to parse Gemini JSON response: {e}")
275
- logger.debug(f"Raw response: {response_text[:500]}")
276
- raise
277
- except Exception as e:
278
- logger.error(f"Gemini analysis failed: {e}")
279
- import traceback
280
- traceback.print_exc()
281
- raise
282
-
283
- async def prepare_video_context(self):
284
- # STEP 3: Update durations using actual local files
285
- for video in self.data_holder.visual_assets["all_videos"]:
286
- local_path = video.get("local_path")
287
- if local_path:
288
- try:
289
- with VideoFileClip(local_path) as clip:
290
- video["duration"] = round(clip.duration, 2)
291
- except Exception as e:
292
- logger.warning(f"⚠️ Error reading duration for {local_path}: {e}")
293
- video["duration"] = 0
294
- else:
295
- video["duration"] = 0
296
-
297
- # STEP 4: Form video_context string (using actual durations)
298
- video_context = "\n".join(
299
- [
300
- f"{i+1}. {row.get('Video URL (No Audio)')} - "
301
- f"{row.get('Full Video Description Summary', row.get('description', ''))} - "
302
- f"{next((v.get('duration', 0) for v in self.data_holder.visual_assets['all_videos'] if v['url'] == row.get('Video URL (No Audio)')), 0)}s - "
303
- f"Alignment: {row.get('Video Alignment with the TTS Script', row.get('alignment', ''))} - "
304
- f"Usage Count: {self.data_holder.video_usage_count.get(row.get('Video URL (No Audio)'), 0)}"
305
- for i, row in self.video_library.iterrows()
306
- ]
307
- )
308
-
309
- return video_context
310
-
311
-
312
- def _fallback_selection(self, tts_script: str, max_duration: int) -> List[Dict]:
313
- """Fallback selection based on keyword matching"""
314
- script_lower = tts_script.lower()
315
- selected = []
316
- total_duration = 0
317
-
318
- fallback_videos = [
319
- {
320
- "url": "https://storage.googleapis.com/somira/Somira%20Massager.mp4",
321
- "duration": 2,
322
- "reason": "Product showcase",
323
- "alignment": "product",
324
- "energy": 5,
325
- },
326
- {
327
- "url": "https://storage.googleapis.com/somira/FemaleWomenPuttingOnNeckMassagerr.mp4",
328
- "duration": 2,
329
- "reason": "Usage demonstration",
330
- "alignment": "usage",
331
- "energy": 35,
332
- },
333
- {
334
- "url": "https://storage.googleapis.com/somira/PersonEnjoyingTheNeckMassager.mp4",
335
- "duration": 1.5,
336
- "reason": "User satisfaction",
337
- "alignment": "satisfaction",
338
- "energy": 40,
339
- },
340
- ]
341
-
342
- for video in fallback_videos:
343
- if total_duration + video["duration"] <= max_duration:
344
- selected.append(video)
345
- total_duration += video["duration"]
346
-
347
- return selected[:3]
348
 
349
  def select_background_music(self) -> str:
350
- """
351
- Select background music SEQUENTIALLY (not random)
352
- Each call increments the index to ensure different music for each video
353
- """
354
- if self.audio_library.empty:
355
- logger.error("❌ Audio library is empty")
356
- return ""
357
-
358
- # Select current index
359
- selected = self.audio_library.iloc[self.current_audio_index]["AUDIO_LINK"]
360
-
361
- logger.info(
362
- f"🎵 Selected background music #{self.current_audio_index + 1}/{len(self.audio_library)}: {selected}"
363
- )
364
-
365
- # Increment index for next call (loop back to start if needed)
366
- self.current_audio_index = (self.current_audio_index + 1) % len(self.audio_library)
367
-
368
  return selected
369
 
370
  def reset_audio_index(self):
371
- """Reset audio index to start from beginning (useful for batch processing)"""
372
- self.current_audio_index = 0
373
- logger.info("🔄 Reset background music index to 0")
374
 
375
  def select_random_videos(self, count: int) -> List[str]:
376
- import random
377
-
378
- all_videos = self.data_holder.visual_assets.get("all_videos", [])
379
- available_videos = [v for v in all_videos if v.get("local_path")]
380
-
381
- if len(available_videos) < count:
382
- logger.warning(f"⚠️ Not enough videos to select {count} random videos. Selecting {len(available_videos)} instead.")
383
- count = len(available_videos)
384
-
385
- selected_videos = random.sample(available_videos, count)
386
-
387
- return [v["local_path"] for v in selected_videos]
 
1
+ """
2
+ AssetSelector - Thin wrapper for backward compatibility
3
+ Use asset_manager classes directly for new code.
4
+ """
5
+
6
  import pandas as pd
 
 
7
  from typing import List, Dict, Optional, Tuple
8
  from utils import logger
 
 
 
 
9
  from data_holder import DataHolder
10
 
11
+ from asset_manager import get_video_lib, get_audio_lib, AssetProcessor
 
 
 
12
 
13
 
14
class AssetSelector:
    """
    Backward-compatibility facade over the asset_manager module.

    Every method forwards to one of:
    - get_video_lib() for the video library
    - get_audio_lib() for the audio library and music selection
    - AssetProcessor(data_holder) for video selection

    After each audio-index change made through this class, the value is
    also written to ``config["current_audio_index"]``.
    New code should call the asset_manager classes directly.
    """

    def __init__(self, config: Dict, data_holder: DataHolder = None, gcloud_wrapper=None):
        # NOTE: gcloud_wrapper is accepted but not used anywhere in this class.
        self.config = config
        self.data_holder = data_holder

        # Library objects come from the asset_manager accessors.
        self._video_lib = get_video_lib()
        start_index = config.get("current_audio_index", 0)
        self._audio_lib = get_audio_lib(start_index)
        # Set the index explicitly in addition to passing it to the accessor.
        self._audio_lib.current_audio_index = start_index

        # The processor needs a data_holder; without one it is created lazily.
        self._processor = AssetProcessor(data_holder) if data_holder else None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _get_processor(self):
        """Return the AssetProcessor, building it on first use if needed."""
        if not self._processor:
            self._processor = AssetProcessor(self.data_holder)
        return self._processor

    def _store_audio_index(self):
        """Copy the audio library's current index into the config dict."""
        self.config["current_audio_index"] = self._audio_lib.current_audio_index

    # ------------------------------------------------------------------
    # Library views
    # ------------------------------------------------------------------

    @property
    def video_library(self) -> pd.DataFrame:
        """The ``video_library`` attribute of the video library object."""
        return self._video_lib.video_library

    @property
    def audio_library(self) -> pd.DataFrame:
        """The ``audio_library`` attribute of the audio library object."""
        return self._audio_lib.audio_library

    @property
    def current_audio_index(self) -> int:
        """The audio library's current index."""
        return self._audio_lib.current_audio_index

    @current_audio_index.setter
    def current_audio_index(self, value: int):
        self._audio_lib.current_audio_index = value
        self.config["current_audio_index"] = value

    # ------------------------------------------------------------------
    # Audio
    # ------------------------------------------------------------------

    def inc_audio_index(self):
        """Advance the audio library's index and record it in config."""
        self._audio_lib.inc_audio_index()
        self._store_audio_index()

    def get_audio_beats(self, audio_link: str) -> Optional[List[float]]:
        """Forward to the audio library's get_audio_beats for ``audio_link``."""
        return self._audio_lib.get_audio_beats(audio_link)

    def select_background_music(self) -> str:
        """Ask the audio library for a track and record its index in config."""
        selected = self._audio_lib.select_background_music()
        self._store_audio_index()
        return selected

    def reset_audio_index(self):
        """Reset the audio library's index and store 0 in config."""
        self._audio_lib.reset_audio_index()
        self.config["current_audio_index"] = 0

    # ------------------------------------------------------------------
    # Video
    # ------------------------------------------------------------------

    async def select_videos(self, tts_script, timed_transcript, max_duration: int = 12) -> List[Dict]:
        """Delegate to AssetProcessor.select_videos with the same arguments."""
        processor = self._get_processor()
        return await processor.select_videos(tts_script, timed_transcript, max_duration)

    def select_random_videos(self, count: int) -> List[str]:
        """Delegate to AssetProcessor.select_random_videos."""
        return self._get_processor().select_random_videos(count)
 
 
 
 
 
 
 
 
src/automation.py CHANGED
@@ -27,6 +27,8 @@ import numpy as np
27
  from file_downloader import FileDownloader
28
  from data_holder import DataHolder
29
  import setup_config
 
 
30
 
31
  class ContentAutomation:
32
  def __init__(self, config: Dict[str, Any], data_holder: DataHolder = None, asset_selector: 'AssetSelector' = None, api_clients: 'APIClients' = None):
@@ -37,6 +39,7 @@ class ContentAutomation:
37
  self.video_renderer = VideoRenderer(config, self.data_holder)
38
  # Reuse provided asset_selector or create new one
39
  self.asset_selector = asset_selector or AssetSelector(config, self.data_holder)
 
40
  self.file_downloader = FileDownloader()
41
  self.pipeline_start_time = None
42
 
@@ -78,11 +81,17 @@ class ContentAutomation:
78
  "speaking_rate": 1.2
79
  }
80
  self.data_holder.visual_assets["a2e_video_url"] = video_url
81
- await self._download_to_local(audio_url, f'a2e_tts.mp3', self.data_holder.visual_assets["tts_audio"])
 
 
 
82
  with AudioFileClip(self.data_holder.visual_assets["tts_audio"]["local_path"]) as audio:
83
  self.data_holder.visual_assets["tts_audio"]["duration"] = audio.duration
84
 
85
- await self._download_to_local(video_url, f'a2e_video.mp4', self.data_holder.visual_assets, "a2e_video_local_path")
 
 
 
86
  # await self.api_clients.upload_to_temp_gcs(self.data_holder.visual_assets["tts_audio"]["local_path"], "audio")
87
  # await self.api_clients.upload_to_temp_gcs(self.data_holder.visual_assets["a2e_video_local_path"], "video")
88
  else:
@@ -94,7 +103,8 @@ class ContentAutomation:
94
  await self.create_audio()
95
 
96
  logger.info("\n STEP 4: Download all the video assets.")
97
- await self._download_all_video()
 
98
 
99
  # STEP 3: Generate visual assets
100
  logger.info("\n📦 STEP 3: Generate Visual Assets")
@@ -106,7 +116,7 @@ class ContentAutomation:
106
 
107
  # STEP 2: Download ALL visual assets with proper error handling
108
  logger.info("\n⬇️ STEP 4: Download Visual Assets")
109
- await self._download_all_visual_assets()
110
 
111
  # STEP 3: Render video WITHOUT audio (natural speed)
112
  logger.info("\n🎬 STEP 5: Render Video (Natural Speed, No Audio)")
@@ -135,9 +145,9 @@ class ContentAutomation:
135
 
136
  logger.info("\n🎵 STEP 7: Background Music")
137
  self.data_holder.visual_assets["background_music_url"] = self.asset_selector.select_background_music()
138
- await self._download_to_local(
139
- self.data_holder.visual_assets["background_music_url"], "background_music.mp3", self.data_holder.visual_assets, "background_music_local"
140
- )
141
 
142
  # STEP 7: Add audio to video
143
  logger.info("\n🔊 STEP 8: Add Audio to Video")
@@ -198,7 +208,8 @@ class ContentAutomation:
198
 
199
  async def execute_random_pipeline(self, content_strategy: Dict[str, str], tts_script: str) -> Dict[str, Any]:
200
  try:
201
- await self._download_all_video()
 
202
 
203
  music_duration = None
204
 
@@ -350,9 +361,9 @@ class ContentAutomation:
350
  self.asset_selector.inc_audio_index()
351
 
352
  self.data_holder.visual_assets["background_music_url"] = self.asset_selector.select_background_music()
353
- await self._download_to_local(
354
- self.data_holder.visual_assets["background_music_url"], "background_music.mp3", self.data_holder.visual_assets, "background_music_local"
355
- )
356
 
357
  async def create_audio(self):
358
  try_again = False
@@ -393,42 +404,6 @@ class ContentAutomation:
393
 
394
  return tts_audio, timed_words
395
 
396
- async def _download_all_video(self):
397
- all_videos = self.data_holder.visual_assets.get("all_videos", [])
398
-
399
- # ✅ Skip downloading if all have local_path
400
- if all_videos and all(v.get("local_path") for v in all_videos):
401
- logger.info("✅ All videos already have local_path — skipping download.")
402
- return
403
-
404
- download_path = "testData/video_for_workflow"
405
- Path(download_path).mkdir(parents=True, exist_ok=True)
406
-
407
- videos = []
408
- for _, row in self.asset_selector.video_library.iterrows():
409
- url = str(row.get("VIDEO_LINK", "")).strip()
410
- if not url:
411
- continue
412
-
413
- local_path = self.file_downloader.safe_download(url=url)
414
- if not local_path or not utils.is_valid_video(local_path):
415
- continue
416
-
417
- # Resize and remove padding (handle potential errors)
418
- try:
419
- utils.resize_video(local_path, overwrite=True)
420
- utils.remove_black_padding(local_path, overwrite=True)
421
- except Exception as e:
422
- logger.warning(f"⚠️ Could not process {local_path}: {e}")
423
- # Continue anyway - video is still usable
424
-
425
- videos.append({
426
- "url": url,
427
- "local_path": str(local_path),
428
- })
429
-
430
- self.data_holder.visual_assets["all_videos"] = videos
431
-
432
  async def _generate_visual_assets_parallel(self, content_strategy: Dict) -> Dict:
433
  """Generate visual assets in parallel (hook video + library videos)"""
434
  tasks = {
@@ -493,96 +468,6 @@ class ContentAutomation:
493
  traceback.print_exc()
494
  raise
495
 
496
- async def _download_all_visual_assets(self):
497
- """Download ALL visual assets with proper error handling"""
498
- download_tasks = []
499
-
500
- # Download hook video with explicit local_path assignment
501
- assets = self.data_holder.visual_assets
502
- if assets.get("hook_video") and assets["hook_video"].get("video_url"):
503
- hook_url = assets["hook_video"]["video_url"]
504
- download_tasks.append(
505
- self._download_with_fallback(hook_url, "hook_video.mp4", assets["hook_video"], "local_path", resize=True)
506
- )
507
- # VEO library videos
508
- if assets["hook_video"].get("veo_video_data") and assets["hook_video"].get("veo_video_data").get("video_url"):
509
- veo_hook_url = assets["hook_video"]["veo_video_data"]["video_url"]
510
- download_tasks.append(
511
- self._download_with_fallback(veo_hook_url, "veo_hook_url.mp4", assets["hook_video"]["veo_video_data"], "local_path", resize=True, remove_black_padding=True)
512
- )
513
-
514
- # Download library videos
515
- for i, video in enumerate(assets.get("selected_videos", [])):
516
- if video.get("url"):
517
- download_tasks.append(
518
- self._download_with_fallback(video["url"], f"library_video_{i}.mp4", video, "local_path", resize=True)
519
- )
520
- if video.get("alternate_url"):
521
- download_tasks.append(
522
- self._download_with_fallback(video["alternate_url"], f"library_all_video_alternate_url_{i}.mp4", video, "alternate_url_local_path", resize=True)
523
- )
524
-
525
- # Download library videos
526
- for i, video in enumerate(assets.get("all_videos", [])):
527
- if video.get("url") and not video.get("local_path", None):
528
- download_tasks.append(
529
- self._download_with_fallback(video["url"], f"library_all_video_{i}.mp4", video, "local_path")
530
- )
531
-
532
- # Wait for all downloads to complete
533
- if download_tasks:
534
- results = await asyncio.gather(*download_tasks, return_exceptions=True)
535
-
536
- # Check for failures
537
- for i, result in enumerate(results):
538
- if isinstance(result, Exception):
539
- logger.error(f"❌ Download task {i} failed: {result}")
540
-
541
- # Verify all required assets have local_path
542
- self._verify_assets_downloaded(assets)
543
-
544
- async def _download_with_fallback(self, url: str, filename: str, target_dict: Dict, key: str = "local_path", resize: bool = False, remove_black_padding: bool = False):
545
- """Download file with fallback to ensure local_path is always set"""
546
- try:
547
- local_path = await self.api_clients.download_file(url, filename)
548
- if remove_black_padding:
549
- utils.remove_black_padding(local_path, overwrite=True)
550
- if resize:
551
- utils.resize_video(local_path, overwrite=True)
552
- target_dict[key] = local_path
553
- logger.info(f"✓ Downloaded {filename}")
554
- return local_path
555
- except Exception as e:
556
- logger.error(f"❌ Failed to download {filename}: {e}")
557
- raise
558
-
559
- def _verify_assets_downloaded(self, assets: Dict):
560
- """Verify that all required assets have local_path"""
561
- missing_assets = []
562
-
563
- # Check hook video
564
- if assets.get("hook_video") and not assets["hook_video"].get("local_path"):
565
- missing_assets.append("hook_video")
566
-
567
- # Check library videos
568
- for i, video in enumerate(assets.get("selected_videos", [])):
569
- if not video.get("local_path"):
570
- missing_assets.append(f"library_video_{i}")
571
-
572
- if missing_assets:
573
- logger.warning(f"⚠️ Missing local_path for: {', '.join(missing_assets)}")
574
- # Don't raise exception here, let the pipeline continue with fallbacks
575
-
576
- async def _download_to_local(self, url: str, filename: str, target_dict: Dict, key: str = "local_path"):
577
- """Download file from URL and store local path in target dictionary"""
578
- try:
579
- local_path = await self.api_clients.download_file(url, filename)
580
- target_dict[key] = local_path
581
- logger.info(f"✓ Downloaded {filename}")
582
- except Exception as e:
583
- logger.error(f"❌ Failed to download {filename}: {e}")
584
- raise
585
-
586
  async def health_check(self) -> Dict[str, bool]:
587
  """Comprehensive health check of all components"""
588
  logger.info("🏥 Running comprehensive health check...")
 
27
  from file_downloader import FileDownloader
28
  from data_holder import DataHolder
29
  import setup_config
30
+ from asset_manager import get_asset_downloader
31
+ from file_downloader import FileDownloader
32
 
33
  class ContentAutomation:
34
  def __init__(self, config: Dict[str, Any], data_holder: DataHolder = None, asset_selector: 'AssetSelector' = None, api_clients: 'APIClients' = None):
 
39
  self.video_renderer = VideoRenderer(config, self.data_holder)
40
  # Reuse provided asset_selector or create new one
41
  self.asset_selector = asset_selector or AssetSelector(config, self.data_holder)
42
+ self.asset_downloader = get_asset_downloader()
43
  self.file_downloader = FileDownloader()
44
  self.pipeline_start_time = None
45
 
 
81
  "speaking_rate": 1.2
82
  }
83
  self.data_holder.visual_assets["a2e_video_url"] = video_url
84
+ # Download audio using file_downloader directly
85
+ local_path = self.file_downloader.safe_download(audio_url)
86
+ if local_path:
87
+ self.data_holder.visual_assets["tts_audio"]["local_path"] = str(local_path)
88
  with AudioFileClip(self.data_holder.visual_assets["tts_audio"]["local_path"]) as audio:
89
  self.data_holder.visual_assets["tts_audio"]["duration"] = audio.duration
90
 
91
+ # Download video using file_downloader directly
92
+ local_path = self.file_downloader.safe_download(video_url)
93
+ if local_path:
94
+ self.data_holder.visual_assets["a2e_video_local_path"] = str(local_path)
95
  # await self.api_clients.upload_to_temp_gcs(self.data_holder.visual_assets["tts_audio"]["local_path"], "audio")
96
  # await self.api_clients.upload_to_temp_gcs(self.data_holder.visual_assets["a2e_video_local_path"], "video")
97
  else:
 
103
  await self.create_audio()
104
 
105
  logger.info("\n STEP 4: Download all the video assets.")
106
+ videos = await self.asset_downloader.download_all_videos()
107
+ self.data_holder.visual_assets["all_videos"] = videos
108
 
109
  # STEP 3: Generate visual assets
110
  logger.info("\n📦 STEP 3: Generate Visual Assets")
 
116
 
117
  # STEP 2: Download ALL visual assets with proper error handling
118
  logger.info("\n⬇️ STEP 4: Download Visual Assets")
119
+ await self.asset_downloader.download_all_visual_assets(self.data_holder)
120
 
121
  # STEP 3: Render video WITHOUT audio (natural speed)
122
  logger.info("\n🎬 STEP 5: Render Video (Natural Speed, No Audio)")
 
145
 
146
  logger.info("\n🎵 STEP 7: Background Music")
147
  self.data_holder.visual_assets["background_music_url"] = self.asset_selector.select_background_music()
148
+ local_path = self.file_downloader.safe_download(self.data_holder.visual_assets["background_music_url"])
149
+ if local_path:
150
+ self.data_holder.visual_assets["background_music_local"] = str(local_path)
151
 
152
  # STEP 7: Add audio to video
153
  logger.info("\n🔊 STEP 8: Add Audio to Video")
 
208
 
209
  async def execute_random_pipeline(self, content_strategy: Dict[str, str], tts_script: str) -> Dict[str, Any]:
210
  try:
211
+ videos = await self.asset_downloader.download_all_videos()
212
+ self.data_holder.visual_assets["all_videos"] = videos
213
 
214
  music_duration = None
215
 
 
361
  self.asset_selector.inc_audio_index()
362
 
363
  self.data_holder.visual_assets["background_music_url"] = self.asset_selector.select_background_music()
364
+ local_path = self.file_downloader.safe_download(self.data_holder.visual_assets["background_music_url"])
365
+ if local_path:
366
+ self.data_holder.visual_assets["background_music_local"] = str(local_path)
367
 
368
  async def create_audio(self):
369
  try_again = False
 
404
 
405
  return tts_audio, timed_words
406
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
407
  async def _generate_visual_assets_parallel(self, content_strategy: Dict) -> Dict:
408
  """Generate visual assets in parallel (hook video + library videos)"""
409
  tasks = {
 
468
  traceback.print_exc()
469
  raise
470
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
471
  async def health_check(self) -> Dict[str, bool]:
472
  """Comprehensive health check of all components"""
473
  logger.info("🏥 Running comprehensive health check...")
src/file_downloader.py CHANGED
@@ -159,12 +159,14 @@ class FileDownloader:
159
 
160
  def _detect_url_type(self, url: str) -> str:
161
  """
162
- Detect URL type: 'drive', 'gcs', or 'unknown'
163
  """
164
  if "drive.google.com" in url:
165
  return "drive"
166
  elif url.startswith("gs://") or "storage.googleapis.com" in url or "storage.cloud.google.com" in url:
167
  return "gcs"
 
 
168
  else:
169
  return "unknown"
170
 
@@ -338,6 +340,55 @@ class FileDownloader:
338
  logger.info("Downloaded from GCS to %s", output_path)
339
  return output_path
340
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
341
  def safe_download(self, url: str, output_path: Path | None = None, account_id: str | None = None) -> Path | None:
342
  """
343
  Safe download wrapper to handle exceptions.
@@ -400,8 +451,11 @@ class FileDownloader:
400
  return self.download_from_gcs(bucket_name, blob_name, output_path=output_path, public=False, account_id=account_id)
401
  raise
402
 
 
 
 
403
  else:
404
- raise ValueError(f"Unknown URL type. Expected Google Drive or GCS URL, got: {url}")
405
 
406
  # ------------------ Batch download ------------------
407
 
 
159
 
160
  def _detect_url_type(self, url: str) -> str:
161
  """
162
+ Detect URL type: 'drive', 'gcs', 'public', or 'unknown'
163
  """
164
  if "drive.google.com" in url:
165
  return "drive"
166
  elif url.startswith("gs://") or "storage.googleapis.com" in url or "storage.cloud.google.com" in url:
167
  return "gcs"
168
+ elif url.startswith("http://") or url.startswith("https://"):
169
+ return "public"
170
  else:
171
  return "unknown"
172
 
 
340
  logger.info("Downloaded from GCS to %s", output_path)
341
  return output_path
342
 
343
def download_from_url(
    self,
    url: str,
    output_path: Path | None = None,
    filename: str | None = None,
    timeout: float | tuple[float, float] | None = (10, 120),
) -> Path:
    """
    Download a file from a regular HTTP/HTTPS URL.

    The payload is streamed into a sibling "<name>.part" file and renamed to
    its final name only after the transfer has finished. An interrupted or
    failed download therefore never leaves a truncated file behind that a
    later run with ``skip_existing`` would mistake for a complete one.

    Args:
        url: Public HTTP/HTTPS URL
        output_path: Full path where to save the file (optional)
        filename: Filename to use if output_path not specified (optional)
        timeout: Forwarded to ``requests.get``; either one number or a
            ``(connect, read)`` tuple in seconds. ``None`` waits forever.

    Returns:
        Path to the downloaded file

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-success HTTP status.
        OSError: If the target file cannot be written or renamed.
    """
    logger.info("Downloading from public URL: %s", url)

    if output_path is None:
        if filename is None:
            # Derive the name from the last (URL-decoded) path segment.
            from urllib.parse import urlparse, unquote
            filename = Path(unquote(urlparse(url).path)).name or "downloaded_file"
        output_path = self.temp_dir / filename

    # Check if file already exists
    if self.skip_existing and output_path.exists():
        logger.info("File already exists, skipping download: %s", output_path)
        return output_path

    output_path.parent.mkdir(parents=True, exist_ok=True)
    partial_path = output_path.with_name(output_path.name + ".part")

    try:
        # The context manager releases the streamed connection even on error;
        # the timeout keeps a stalled server from blocking the caller forever.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with partial_path.open("wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
        # Publish the file only once it is complete.
        partial_path.replace(output_path)
    finally:
        # No-op after a successful rename; removes the leftover on failure.
        partial_path.unlink(missing_ok=True)

    logger.info("Downloaded from URL to %s", output_path)
    return output_path
+
392
  def safe_download(self, url: str, output_path: Path | None = None, account_id: str | None = None) -> Path | None:
393
  """
394
  Safe download wrapper to handle exceptions.
 
451
  return self.download_from_gcs(bucket_name, blob_name, output_path=output_path, public=False, account_id=account_id)
452
  raise
453
 
454
+ elif url_type == "public":
455
+ return self.download_from_url(url, output_path=output_path)
456
+
457
  else:
458
+ raise ValueError(f"Unknown URL type. Expected Google Drive, GCS, or public HTTP URL, got: {url}")
459
 
460
  # ------------------ Batch download ------------------
461
 
src/process_csv.py CHANGED
@@ -12,17 +12,15 @@ from automation import ContentAutomation
12
  from api_clients import APIClients
13
  from utils import logger
14
  from data_holder import DataHolder
15
- from asset_selector import AssetSelector
16
  from google_sheet_reader import GoogleSheetReader
17
  import argparse
18
  import uuid
19
  from cleanup_manager import process_delete_entries
20
  from google_src.gcs_utils import list_gcs_files
21
  import setup_config
 
22
 
23
  DATA_DIR = Path("data")
24
- ALL_VIDEO_FILE_INFO = None
25
- SHARED_ASSET_SELECTOR = None # Shared instance to avoid redundant sheet loads
26
  SHARED_API_CLIENTS = None # Shared instance to avoid redundant GCS/TTS client initialization
27
 
28
 
@@ -99,7 +97,7 @@ def log_progress_to_gsheet(tts_script: str, result: dict, job_index: int, commit
99
 
100
  async def process_row(row, config: dict):
101
  """Process one CSV row using the main pipeline."""
102
- global ALL_VIDEO_FILE_INFO, SHARED_ASSET_SELECTOR, SHARED_API_CLIENTS
103
  tts_script = row.get("TTS Script (AI Avatar)", "")
104
  if os.getenv("ON_SCREEN_TEXT", "false").lower() == "true":
105
  tts_script = row.get("On-Screen Text", "").strip()
@@ -107,18 +105,17 @@ async def process_row(row, config: dict):
107
  logger.info(f"▶️ Executing: {tts_script}...")
108
 
109
  dataHolder = DataHolder()
110
- dataHolder.visual_assets["all_videos"] = ALL_VIDEO_FILE_INFO
 
 
111
 
112
- # Update shared instances with current dataHolder before use
113
- if SHARED_ASSET_SELECTOR:
114
- SHARED_ASSET_SELECTOR.data_holder = dataHolder
115
  if SHARED_API_CLIENTS:
116
  SHARED_API_CLIENTS.data_holder = dataHolder
117
 
118
- # Reuse shared AssetSelector and APIClients to avoid redundant initialization
119
  automation = ContentAutomation(
120
  config, dataHolder,
121
- asset_selector=SHARED_ASSET_SELECTOR,
122
  api_clients=SHARED_API_CLIENTS
123
  )
124
 
@@ -141,41 +138,24 @@ async def process_row(row, config: dict):
141
 
142
 
143
  async def download_all_video(config: dict):
144
- """Download all library videos once and cache them. Creates shared instances."""
145
- global ALL_VIDEO_FILE_INFO, SHARED_ASSET_SELECTOR, SHARED_API_CLIENTS
146
-
147
- if ALL_VIDEO_FILE_INFO is None:
148
- logger.info("📥 Pre-downloading all library videos...")
149
-
150
- # Create the shared AssetSelector once - this loads video/audio libraries from sheets
151
- if SHARED_ASSET_SELECTOR is None:
152
- SHARED_ASSET_SELECTOR = AssetSelector(config)
153
-
154
- video_urls = [
155
- row.get("Video URL (No Audio)", "").strip()
156
- for _, row in SHARED_ASSET_SELECTOR.video_library.iterrows()
157
- if row.get("Video URL (No Audio)", "").strip()
158
- ]
159
 
 
 
 
 
 
 
 
 
 
160
  dataHolder = DataHolder()
161
- dataHolder.visual_assets["all_videos"] = [{"url": url} for url in video_urls]
162
-
163
- # Create the shared APIClients once - this initializes GCS/TTS clients
164
- if SHARED_API_CLIENTS is None:
165
- SHARED_API_CLIENTS = APIClients(config, dataHolder)
166
-
167
- # Pass the shared instances to avoid creating new ones
168
- automation = ContentAutomation(
169
- config, dataHolder,
170
- asset_selector=SHARED_ASSET_SELECTOR,
171
- api_clients=SHARED_API_CLIENTS
172
- )
173
- await automation._download_all_visual_assets()
174
-
175
- ALL_VIDEO_FILE_INFO = dataHolder.visual_assets.get("all_videos", [])
176
- logger.info(f"✓ Downloaded {len(ALL_VIDEO_FILE_INFO)} library videos")
177
-
178
- return ALL_VIDEO_FILE_INFO
179
 
180
  async def process_all_csvs(config, commit=False, job_index=None, total_jobs=None):
181
  """Process all CSVs in data directory."""
 
12
  from api_clients import APIClients
13
  from utils import logger
14
  from data_holder import DataHolder
 
15
  from google_sheet_reader import GoogleSheetReader
16
  import argparse
17
  import uuid
18
  from cleanup_manager import process_delete_entries
19
  from google_src.gcs_utils import list_gcs_files
20
  import setup_config
21
+ from asset_manager import get_video_lib, get_audio_lib, get_asset_downloader
22
 
23
  DATA_DIR = Path("data")
 
 
24
  SHARED_API_CLIENTS = None # Shared instance to avoid redundant GCS/TTS client initialization
25
 
26
 
 
97
 
98
  async def process_row(row, config: dict):
99
  """Process one CSV row using the main pipeline."""
100
+ global SHARED_API_CLIENTS
101
  tts_script = row.get("TTS Script (AI Avatar)", "")
102
  if os.getenv("ON_SCREEN_TEXT", "false").lower() == "true":
103
  tts_script = row.get("On-Screen Text", "").strip()
 
105
  logger.info(f"▶️ Executing: {tts_script}...")
106
 
107
  dataHolder = DataHolder()
108
+ # Get downloaded videos from singleton
109
+ asset_downloader = get_asset_downloader()
110
+ dataHolder.visual_assets["all_videos"] = asset_downloader.downloaded_videos
111
 
112
+ # Update shared APIClients with current dataHolder
 
 
113
  if SHARED_API_CLIENTS:
114
  SHARED_API_CLIENTS.data_holder = dataHolder
115
 
116
+ # AssetSelector uses singletons internally, no need to share
117
  automation = ContentAutomation(
118
  config, dataHolder,
 
119
  api_clients=SHARED_API_CLIENTS
120
  )
121
 
 
138
 
139
 
140
  async def download_all_video(config: dict):
141
+ """Download all library videos once using singletons."""
142
+ global SHARED_API_CLIENTS
 
 
 
 
 
 
 
 
 
 
 
 
 
143
 
144
+ # Get the asset downloader singleton
145
+ asset_downloader = get_asset_downloader()
146
+
147
+ # Download all videos using the singleton
148
+ logger.info("📥 Pre-downloading all library videos...")
149
+ videos = await asset_downloader.download_all_videos()
150
+
151
+ # Create the shared APIClients once
152
+ if SHARED_API_CLIENTS is None:
153
  dataHolder = DataHolder()
154
+ dataHolder.visual_assets["all_videos"] = videos
155
+ SHARED_API_CLIENTS = APIClients(config, dataHolder)
156
+
157
+ logger.info(f"✓ Downloaded {len(videos)} library videos")
158
+ return videos
 
 
 
 
 
 
 
 
 
 
 
 
 
159
 
160
  async def process_all_csvs(config, commit=False, job_index=None, total_jobs=None):
161
  """Process all CSVs in data directory."""