CB commited on
Commit
ce1817c
·
verified ·
1 Parent(s): 396c8af

Update streamlit_app.py

Browse files
Files changed (1) hide show
  1. streamlit_app.py +156 -180
streamlit_app.py CHANGED
@@ -1,215 +1,191 @@
1
  import os
2
- import uuid
3
- import yt_dlp
4
  import string
5
- import ffmpeg
6
- from glob import glob
7
- import streamlit as st
8
- from phi.agent import Agent
9
- from phi.model.google import Gemini
10
- from phi.tools.duckduckgo import DuckDuckGo
11
- from google.generativeai import upload_file, get_file
12
  import time
13
- import google.generativeai as genai
14
  from pathlib import Path
15
- import tempfile
16
 
 
 
 
17
  from dotenv import load_dotenv
 
18
  load_dotenv()
19
 
20
- # Page Configuration
21
- st.set_page_config(
22
- page_title="Generate the story of videos:",
23
- layout="wide"
24
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
 
26
- # Config
27
- if "videos" not in st.session_state:
28
- st.session_state["videos"] = ""
29
 
30
- if "downloaded" not in st.session_state:
31
- st.session_state["downloaded"] = []
 
32
 
 
 
 
 
 
 
 
33
 
34
- # Callbacks
35
- def _remove_video():
36
- if st.session_state["videos"]:
37
- try:
38
- os.remove(st.session_state["videos"])
39
- st.session_state["videos"] = ""
40
- except FileNotFoundError:
41
- print("Couldn't delete file")
42
-
43
-
44
- def _remove_all_videos():
45
- videos = (
46
- glob(os.path.join(".", "data", "*.mp4"))
47
- + glob(os.path.join(".", "data", "*.webm"))
48
- + glob(os.path.join(".", "data", "*.mov"))
49
- + glob(os.path.join(".", "data", "*.m4a"))
50
- + glob(os.path.join(".", "data", "*.mkv"))
51
- )
52
- for video in videos:
 
 
 
53
  try:
54
- os.remove(video)
55
  except FileNotFoundError:
56
- print("Couldn't delete file:", video)
57
-
58
- #Sidebar Content
59
- st.sidebar.text_input(
60
- "**Video URL**",
61
- key="url",
62
- placeholder="Enter Video URL",
63
- on_change=_remove_video,
64
- )
65
-
66
- expander = st.sidebar.expander("Settings", expanded=False)
67
- # ✅ Configure Gemini API
68
- API_KEY = expander.text_input("Google API Key", "AIzaSyB2uDj6IeuNszSuk80feW-eBgpIDH0WFSY")
69
- # ✅ Check if API key is available
70
- if API_KEY:
71
- genai.configure(api_key=API_KEY)
72
-
73
- else:
74
- expander.error("No GOOGLE_API_KEY", icon="⚠️")
75
- # ✅ Load API key securely
76
- API_KEY = os.getenv("GOOGLE_API_KEY")
77
-
78
- #Safety Settings
79
- safety_settings = [ { "category": "HARM_CATEGORY_HARASSMENT", "threshold": "OFF" }, { "category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "OFF" }, { "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "OFF" }, { "category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "OFF" }, ]
80
 
81
- #Gemini Model
82
- model_id = expander.text_input("Gemini Model", "gemini-2.0-flash-lite")
 
83
 
84
- #Analysis prompt
85
- analysis_prompt = expander.text_area("Enter analysis", "watch entire video and describe")
 
86
 
87
- #Password
88
- expander.text_input("Video Password", key="video-password", placeholder="Enter Video Password")
 
 
89
 
90
- @st.cache_resource
91
- def initialize_agent():
92
- return Agent(
93
- name="Video AI summarizer",
94
- model=Gemini(id=model_id),
95
- tools=[DuckDuckGo()],
96
- markdown=True,
97
- )
98
-
99
- # Initialize the agent
100
- multimodal_Agent = initialize_agent()
101
-
102
- def download_video(url: str, output_path: str):
103
- """Downloads a video using yt-dlp."""
104
- ydl_opts = {
105
- 'outtmpl': output_path, # Path where the video will be saved
106
- 'format': 'best', # Download the best quality available
107
- }
108
-
109
  try:
110
  with yt_dlp.YoutubeDL(ydl_opts) as ydl:
111
  ydl.download([url])
112
- return True, f"Video downloaded successfully to {output_path}"
113
  except Exception as e:
114
- return False, str(e)
115
-
116
- # --- Main Functions ---
117
- def download_video(url: str, save_path: str, **kwargs):
118
- video_id = url.split("/")[-1]
119
-
120
- if len(video_id) < 1:
121
- video_id = url.split("/")[-2]
122
-
123
- ydl_opts = {
124
- "outtmpl": f"{save_path}/{video_id}.%(ext)s",
125
- }
126
- for opt in kwargs.keys():
127
- ydl_opts[opt] = kwargs[opt]
128
- print(ydl_opts)
129
 
130
- with yt_dlp.YoutubeDL(ydl_opts) as ydl:
131
- ydl.download(url)
 
 
132
 
133
- videos = glob(os.path.join(".", "data", f"{video_id}.*"))
 
134
 
135
- print("videos found:", videos)
136
- for video in videos:
137
- if video_id in video:
138
- video_path = video
139
- converted_video_path = convert_video_to_mp4(video_path)
140
- return converted_video_path
141
 
142
-
143
- def convert_video_to_mp4(video_path):
144
- target_path = f"{os.path.splitext(video_path)[0]}.mp4"
145
- if not os.path.exists(target_path): # Convert only if target doesn't exist
146
- (ffmpeg.input(video_path).output(target_path).run(overwrite_output=True))
147
- try:
148
- os.remove(video_path)
149
- except FileNotFoundError:
150
- print("Couldn't delete file:", video_path)
151
- return target_path
152
-
153
- if st.session_state["url"]:
154
- download = st.sidebar.button("**Load Video** 🚀", use_container_width=True)
155
- if download:
156
- download_options = {}
157
- if st.session_state["video-password"]:
158
- download_options["videopassword"] = st.session_state["video-password"]
159
- video_path = download_video(
160
- st.session_state["url"], save_path=os.path.join(".", "data"), **download_options
161
- )
162
  st.session_state["videos"] = video_path
 
 
163
 
 
164
  if st.session_state["videos"]:
165
-
166
  try:
167
  st.sidebar.video(st.session_state["videos"], loop=st.session_state.get("loop_video", True))
168
- except Exception as e:
169
- st.write("Couldn't show video")
170
-
171
  with st.sidebar.expander("Options", expanded=False):
172
- #Loop checkbox
173
- loop_checkbox = st.checkbox("Enable Loop", value=st.session_state.get("loop_video", True), key="loop_checkbox")
174
- st.session_state["loop_video"] = loop_checkbox # Update session state based on checkbox
 
 
 
 
 
 
 
175
 
176
- #Clear button
177
- st.button("Clear Video 🔥", on_click=_remove_all_videos, type="secondary", help="Clear all downloaded videos", use_container_width=True,)
178
-
179
  try:
180
- video_file = open(st.session_state["videos"], "rb")
181
- st.download_button(
182
- "Download Video 📁",
183
- data=video_file,
184
- file_name=f"{st.session_state['videos'].lower().translate(str.maketrans('', '', string.punctuation)).replace(' ', '_')}.mp4",
185
- mime="video/mp4",
186
- type="secondary",
187
- use_container_width=True,
188
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
189
  except Exception as e:
190
- st.error("Failed downloading the video", icon="😔")
191
-
192
- st.write("**Title**:", st.session_state["videos"].split("/")[-1])
193
- # --- Streamlit App ---
194
-
195
- # Main content
196
- if st.button('**Generate the story**', type="primary"):
197
-
198
- try:
199
- with st.spinner("Generating the story of the video"):
200
- # Upload and process the video
201
- processed_video = upload_file(st.session_state["videos"])
202
-
203
- while processed_video.state.name == "PROCESSING":
204
- time.sleep(2)
205
- processed_video = get_file(processed_video.name)
206
-
207
- # AI agent processing
208
- response = multimodal_Agent.run(analysis_prompt, videos=[processed_video], safety_settings=safety_settings)
209
-
210
- st.subheader('Analysis Result')
211
- st.markdown(response.content)
212
-
213
- except Exception as error:
214
- st.error(f"An error occurred: {error}")
215
-
 
1
  import os
 
 
2
  import string
 
 
 
 
 
 
 
3
  import time
4
+ from glob import glob
5
  from pathlib import Path
 
6
 
7
+ import yt_dlp
8
+ import ffmpeg
9
+ import streamlit as st
10
  from dotenv import load_dotenv
11
+
12
  load_dotenv()
13
 
14
+ # Optional imports for Phi/Gemini and Google genai
15
+ try:
16
+ from phi.agent import Agent
17
+ from phi.model.google import Gemini
18
+ from phi.tools.duckduckgo import DuckDuckGo
19
+ PHI_AVAILABLE = True
20
+ except Exception:
21
+ PHI_AVAILABLE = False
22
+
23
+ try:
24
+ import google.generativeai as genai
25
+ from google.generativeai import upload_file, get_file
26
+ GENAI_AVAILABLE = True
27
+ except Exception:
28
+ GENAI_AVAILABLE = False
29
+
30
+ # Page config
31
+ st.set_page_config(page_title="Generate the story of videos:", layout="wide")
32
+
33
+ DATA_DIR = Path("./data")
34
+ DATA_DIR.mkdir(exist_ok=True)
35
+
36
+ # Session state defaults
37
+ st.session_state.setdefault("videos", "")
38
+ st.session_state.setdefault("downloaded", [])
39
+ st.session_state.setdefault("loop_video", True)
40
+
41
+ # Sidebar UI (all controls live here)
42
+ st.sidebar.header("Video Input")
43
+ st.sidebar.text_input("Video URL (or local .mp4 path)", key="url", placeholder="Enter Video URL or path")
44
+
45
+ settings_exp = st.sidebar.expander("Settings", expanded=False)
46
+ env_api_key = os.getenv("GOOGLE_API_KEY", "")
47
+ API_KEY = settings_exp.text_input("Google API Key", value=env_api_key, placeholder="Set GOOGLE_API_KEY in .env or enter here")
48
+
49
+ if API_KEY and GENAI_AVAILABLE:
50
+ genai.configure(api_key=API_KEY)
51
 
52
+ if not API_KEY:
53
+ settings_exp.warning("No Google API key provided. Uploading/processing will not work.", icon="⚠️")
 
54
 
55
+ model_id = settings_exp.text_input("Gemini Model", "gemini-2.0-flash-lite")
56
+ analysis_prompt = settings_exp.text_area("Enter analysis", "watch entire video and describe")
57
+ settings_exp.text_input("Video Password", key="video-password", placeholder="Enter Video Password (if needed)")
58
 
59
+ # Safety settings (kept but optional)
60
+ safety_settings = [
61
+ {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "OFF"},
62
+ {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "OFF"},
63
+ {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "OFF"},
64
+ {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "OFF"},
65
+ ]
66
 
67
+ # Initialize phi agent (cached)
68
+ @st.cache_resource
69
+ def initialize_agent(model_id: str):
70
+ if not PHI_AVAILABLE:
71
+ return None
72
+ return Agent(name="Video AI summarizer", model=Gemini(id=model_id), tools=[DuckDuckGo()], markdown=True)
73
+
74
+ multimodal_Agent = None
75
+ if PHI_AVAILABLE:
76
+ multimodal_Agent = initialize_agent(model_id)
77
+
78
+ def sanitize_filename(path_str: str):
79
+ name = Path(path_str).name
80
+ name = name.lower().translate(str.maketrans("", "", string.punctuation)).replace(" ", "_")
81
+ return name
82
+
83
+ def convert_video_to_mp4(video_path: str) -> str:
84
+ target_path = str(Path(video_path).with_suffix(".mp4"))
85
+ if os.path.exists(target_path):
86
+ return target_path
87
+ try:
88
+ ffmpeg.input(video_path).output(target_path).run(overwrite_output=True, quiet=True)
89
  try:
90
+ os.remove(video_path)
91
  except FileNotFoundError:
92
+ pass
93
+ except Exception as e:
94
+ st.error(f"FFmpeg conversion failed: {e}")
95
+ raise
96
+ return target_path
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97
 
98
+ def download_video_ytdlp(url: str, save_dir: str, video_password: str = None) -> str:
99
+ if not url:
100
+ raise ValueError("No URL provided")
101
 
102
+ # If it's a local file path and exists, just return it (convert if needed)
103
+ if os.path.exists(url) and os.path.isfile(url):
104
+ return convert_video_to_mp4(url)
105
 
106
+ outtmpl = os.path.join(save_dir, "%(id)s.%(ext)s")
107
+ ydl_opts = {"outtmpl": outtmpl, "format": "best"}
108
+ if video_password:
109
+ ydl_opts["videopassword"] = video_password
110
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
111
  try:
112
  with yt_dlp.YoutubeDL(ydl_opts) as ydl:
113
  ydl.download([url])
 
114
  except Exception as e:
115
+ raise RuntimeError(f"yt-dlp download failed: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
116
 
117
+ video_id = url.rstrip("/").split("/")[-1] or url.rstrip("/").split("/")[-2]
118
+ matches = glob(os.path.join(save_dir, f"{video_id}.*"))
119
+ if not matches:
120
+ matches = sorted(glob(os.path.join(save_dir, "*")), key=os.path.getmtime, reverse=True)[:1]
121
 
122
+ if not matches:
123
+ raise FileNotFoundError("Downloaded video not found")
124
 
125
+ return convert_video_to_mp4(matches[0])
 
 
 
 
 
126
 
127
+ # Sidebar actions
128
+ if st.sidebar.button("Load Video", use_container_width=True):
129
+ try:
130
+ video_password = st.session_state.get("video-password", "")
131
+ video_path = download_video_ytdlp(st.session_state.get("url", ""), str(DATA_DIR), video_password)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
132
  st.session_state["videos"] = video_path
133
+ except Exception as e:
134
+ st.sidebar.error(f"Failed to load video: {e}")
135
 
136
+ # Preview & options in sidebar
137
  if st.session_state["videos"]:
 
138
  try:
139
  st.sidebar.video(st.session_state["videos"], loop=st.session_state.get("loop_video", True))
140
+ except Exception:
141
+ st.sidebar.write("Couldn't preview video")
142
+
143
  with st.sidebar.expander("Options", expanded=False):
144
+ loop_checkbox = st.checkbox("Enable Loop", value=st.session_state.get("loop_video", True))
145
+ st.session_state["loop_video"] = loop_checkbox
146
+
147
+ if st.button("Clear Video(s)"):
148
+ for f in glob(str(DATA_DIR / "*")):
149
+ try:
150
+ os.remove(f)
151
+ except Exception:
152
+ pass
153
+ st.session_state["videos"] = ""
154
 
 
 
 
155
  try:
156
+ with open(st.session_state["videos"], "rb") as video_file:
157
+ st.download_button(
158
+ "Download Video",
159
+ data=video_file,
160
+ file_name=sanitize_filename(st.session_state["videos"]),
161
+ mime="video/mp4",
162
+ use_container_width=True,
163
+ )
164
+ except Exception:
165
+ st.sidebar.error("Failed to prepare download")
166
+
167
+ st.sidebar.write("Title:", Path(st.session_state["videos"]).name)
168
+
169
+ # Main area action button
170
+ if st.button("Generate the story", type="primary"):
171
+ if not st.session_state.get("videos"):
172
+ st.error("No video loaded. Use 'Load Video' in the sidebar.")
173
+ elif not GENAI_AVAILABLE or not API_KEY:
174
+ st.error("Google generative API not configured. Provide API key in Settings.")
175
+ elif not PHI_AVAILABLE or multimodal_Agent is None:
176
+ st.error("Phi/Gemini agent not available in this environment.")
177
+ else:
178
+ try:
179
+ with st.spinner("Uploading video to Google for processing..."):
180
+ processed_video = upload_file(st.session_state["videos"])
181
+ while getattr(processed_video, "state", None) and processed_video.state.name == "PROCESSING":
182
+ time.sleep(2)
183
+ processed_video = get_file(processed_video.name)
184
+
185
+ with st.spinner("Running Gemini analysis..."):
186
+ response = multimodal_Agent.run(analysis_prompt, videos=[processed_video], safety_settings=safety_settings)
187
+
188
+ st.subheader("Analysis Result")
189
+ st.markdown(response.content if hasattr(response, "content") else str(response))
190
  except Exception as e:
191
+ st.error(f"An error occurred: {e}")