Video-Analysis / streamlit_app.py
CB
Update streamlit_app.py
a190913 verified
raw
history blame
6.8 kB
import os
import uuid
import yt_dlp
import string
import ffmpeg
from glob import glob
import streamlit as st
from phi.agent import Agent
from phi.model.google import Gemini
from phi.tools.duckduckgo import DuckDuckGo
from google.generativeai import upload_file, get_file
import time
import google.generativeai as genai
from pathlib import Path
import tempfile
from dotenv import load_dotenv
load_dotenv()
# Page Configuration
st.set_page_config(
page_title="Generate the story of videos:",
layout="wide"
)
# Config
if "videos" not in st.session_state:
st.session_state["videos"] = ""
if "downloaded" not in st.session_state:
st.session_state["downloaded"] = []
# Callbacks
def _remove_video():
if st.session_state["videos"]:
try:
os.remove(st.session_state["videos"])
st.session_state["videos"] = ""
except FileNotFoundError:
print("Couldn't delete file")
def _remove_all_videos():
videos = (
glob(os.path.join(".", "data", "*.mp4"))
+ glob(os.path.join(".", "data", "*.webm"))
+ glob(os.path.join(".", "data", "*.mov"))
+ glob(os.path.join(".", "data", "*.m4a"))
+ glob(os.path.join(".", "data", "*.mkv"))
)
for video in videos:
try:
os.remove(video)
except FileNotFoundError:
print("Couldn't delete file:", video)
#Sidebar Content
st.sidebar.text_input(
"**Video URL**",
key="url",
placeholder="Enter Video URL",
on_change=_remove_video,
)
expander = st.sidebar.expander("Settings", expanded=False)
# βœ… Configure Gemini API
API_KEY = expander.text_input("Google API Key", "AIzaSyB2uDj6IeuNszSuk80feW-eBgpIDH0WFSY")
# βœ… Check if API key is available
if API_KEY:
genai.configure(api_key=API_KEY)
else:
expander.error("No GOOGLE_API_KEY", icon="⚠️")
# βœ… Load API key securely
API_KEY = os.getenv("GOOGLE_API_KEY")
#Safety Settings
safety_settings = [ { "category": "HARM_CATEGORY_HARASSMENT", "threshold": "OFF" }, { "category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "OFF" }, { "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "OFF" }, { "category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "OFF" }, ]
#Gemini Model
model_id = expander.text_input("Gemini Model", "gemini-2.0-flash-lite")
#Analysis prompt
analysis_prompt = expander.text_area("Enter analysis", "watch entire video and describe")
#Password
expander.text_input("Video Password", key="video-password", placeholder="Enter Video Password")
@st.cache_resource
def initialize_agent():
return Agent(
name="Video AI summarizer",
model=Gemini(id=model_id),
tools=[DuckDuckGo()],
markdown=True,
)
# Initialize the agent
multimodal_Agent = initialize_agent()
def download_video(url: str, output_path: str):
"""Downloads a video using yt-dlp."""
ydl_opts = {
'outtmpl': output_path, # Path where the video will be saved
'format': 'best', # Download the best quality available
}
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
return True, f"Video downloaded successfully to {output_path}"
except Exception as e:
return False, str(e)
# --- Main Functions ---
def download_video(url: str, save_path: str, **kwargs):
video_id = url.split("/")[-1]
if len(video_id) < 1:
video_id = url.split("/")[-2]
ydl_opts = {
"outtmpl": f"{save_path}/{video_id}.%(ext)s",
}
for opt in kwargs.keys():
ydl_opts[opt] = kwargs[opt]
print(ydl_opts)
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download(url)
videos = glob(os.path.join(".", "data", f"{video_id}.*"))
print("videos found:", videos)
for video in videos:
if video_id in video:
video_path = video
converted_video_path = convert_video_to_mp4(video_path)
return converted_video_path
def convert_video_to_mp4(video_path):
target_path = f"{os.path.splitext(video_path)[0]}.mp4"
if not os.path.exists(target_path): # Convert only if target doesn't exist
(ffmpeg.input(video_path).output(target_path).run(overwrite_output=True))
try:
os.remove(video_path)
except FileNotFoundError:
print("Couldn't delete file:", video_path)
return target_path
if st.session_state["url"]:
download = st.sidebar.button("**Load Video** πŸš€", use_container_width=True)
if download:
download_options = {}
if st.session_state["video-password"]:
download_options["videopassword"] = st.session_state["video-password"]
video_path = download_video(
st.session_state["url"], save_path=os.path.join(".", "data"), **download_options
)
st.session_state["videos"] = video_path
if st.session_state["videos"]:
try:
st.sidebar.video(st.session_state["videos"], loop=True)
except Exception as e:
st.write("Couldn't show video")
with st.sidebar.expander("Options", expanded=False):
st.button("Clear Video πŸ”₯", on_click=_remove_all_videos, type="secondary", help="Clear all downloaded videos", use_container_width=True,)
try:
video_file = open(st.session_state["videos"], "rb")
st.download_button(
"Download Video πŸ“",
data=video_file,
file_name=f"{st.session_state['videos'].lower().translate(str.maketrans('', '', string.punctuation)).replace(' ', '_')}.mp4",
mime="video/mp4",
type="secondary",
use_container_width=True,
)
except Exception as e:
st.error("Failed downloading the video", icon="πŸ˜”")
st.sidebar.write("**Title**:", st.session_state["videos"].split("/")[-1])
# --- Streamlit App ---
# Main content
if st.button('**Generate the story**', type="primary"):
try:
with st.spinner("Generating the story of the video"):
# Upload and process the video
processed_video = upload_file(st.session_state["videos"])
while processed_video.state.name == "PROCESSING":
time.sleep(2)
processed_video = get_file(processed_video.name)
# AI agent processing
response = multimodal_Agent.run(analysis_prompt, videos=[processed_video], safety_settings=safety_settings)
st.subheader('Analysis Result')
st.markdown(response.content)
except Exception as error:
st.error(f"An error occurred: {error}")