Spaces:
Build error
Build error
| import os | |
| import uuid | |
| import yt_dlp | |
| import string | |
| import ffmpeg | |
| from glob import glob | |
| import streamlit as st | |
| from phi.agent import Agent | |
| from phi.model.google import Gemini | |
| from phi.tools.duckduckgo import DuckDuckGo | |
| from google.generativeai import upload_file, get_file | |
| import time | |
| import google.generativeai as genai | |
| from pathlib import Path | |
| import tempfile | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| # Page Configuration | |
| st.set_page_config( | |
| page_title="Generate the story of videos:", | |
| layout="wide" | |
| ) | |
| # Config | |
| if "videos" not in st.session_state: | |
| st.session_state["videos"] = "" | |
| if "downloaded" not in st.session_state: | |
| st.session_state["downloaded"] = [] | |
| # Callbacks | |
| def _remove_video(): | |
| if st.session_state["videos"]: | |
| try: | |
| os.remove(st.session_state["videos"]) | |
| st.session_state["videos"] = "" | |
| except FileNotFoundError: | |
| print("Couldn't delete file") | |
| def _remove_all_videos(): | |
| videos = ( | |
| glob(os.path.join(".", "data", "*.mp4")) | |
| + glob(os.path.join(".", "data", "*.webm")) | |
| + glob(os.path.join(".", "data", "*.mov")) | |
| + glob(os.path.join(".", "data", "*.m4a")) | |
| + glob(os.path.join(".", "data", "*.mkv")) | |
| ) | |
| for video in videos: | |
| try: | |
| os.remove(video) | |
| except FileNotFoundError: | |
| print("Couldn't delete file:", video) | |
| #Sidebar Content | |
| st.sidebar.text_input( | |
| "**Video URL**", | |
| key="url", | |
| placeholder="Enter Video URL", | |
| on_change=_remove_video, | |
| ) | |
| expander = st.sidebar.expander("Settings", expanded=False) | |
| # β Configure Gemini API | |
| API_KEY = expander.text_input("Google API Key", "AIzaSyB2uDj6IeuNszSuk80feW-eBgpIDH0WFSY") | |
| # β Check if API key is available | |
| if API_KEY: | |
| genai.configure(api_key=API_KEY) | |
| else: | |
| expander.error("No GOOGLE_API_KEY", icon="β οΈ") | |
| # β Load API key securely | |
| API_KEY = os.getenv("GOOGLE_API_KEY") | |
| #Safety Settings | |
| safety_settings = [ { "category": "HARM_CATEGORY_HARASSMENT", "threshold": "OFF" }, { "category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "OFF" }, { "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "OFF" }, { "category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "OFF" }, ] | |
| #Gemini Model | |
| model_id = expander.text_input("Gemini Model", "gemini-2.0-flash-lite") | |
| #Analysis prompt | |
| analysis_prompt = expander.text_area("Enter analysis", "watch entire video and describe") | |
| #Password | |
| expander.text_input("Video Password", key="video-password", placeholder="Enter Video Password") | |
| def initialize_agent(): | |
| return Agent( | |
| name="Video AI summarizer", | |
| model=Gemini(id=model_id), | |
| tools=[DuckDuckGo()], | |
| markdown=True, | |
| ) | |
| # Initialize the agent | |
| multimodal_Agent = initialize_agent() | |
| def download_video(url: str, output_path: str): | |
| """Downloads a video using yt-dlp.""" | |
| ydl_opts = { | |
| 'outtmpl': output_path, # Path where the video will be saved | |
| 'format': 'best', # Download the best quality available | |
| } | |
| try: | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| ydl.download([url]) | |
| return True, f"Video downloaded successfully to {output_path}" | |
| except Exception as e: | |
| return False, str(e) | |
| # --- Main Functions --- | |
| def download_video(url: str, save_path: str, **kwargs): | |
| video_id = url.split("/")[-1] | |
| if len(video_id) < 1: | |
| video_id = url.split("/")[-2] | |
| ydl_opts = { | |
| "outtmpl": f"{save_path}/{video_id}.%(ext)s", | |
| } | |
| for opt in kwargs.keys(): | |
| ydl_opts[opt] = kwargs[opt] | |
| print(ydl_opts) | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| ydl.download(url) | |
| videos = glob(os.path.join(".", "data", f"{video_id}.*")) | |
| print("videos found:", videos) | |
| for video in videos: | |
| if video_id in video: | |
| video_path = video | |
| converted_video_path = convert_video_to_mp4(video_path) | |
| return converted_video_path | |
| def convert_video_to_mp4(video_path): | |
| target_path = f"{os.path.splitext(video_path)[0]}.mp4" | |
| if not os.path.exists(target_path): # Convert only if target doesn't exist | |
| (ffmpeg.input(video_path).output(target_path).run(overwrite_output=True)) | |
| try: | |
| os.remove(video_path) | |
| except FileNotFoundError: | |
| print("Couldn't delete file:", video_path) | |
| return target_path | |
| if st.session_state["url"]: | |
| download = st.sidebar.button("**Load Video** π", use_container_width=True) | |
| if download: | |
| download_options = {} | |
| if st.session_state["video-password"]: | |
| download_options["videopassword"] = st.session_state["video-password"] | |
| video_path = download_video( | |
| st.session_state["url"], save_path=os.path.join(".", "data"), **download_options | |
| ) | |
| st.session_state["videos"] = video_path | |
| if st.session_state["videos"]: | |
| try: | |
| st.sidebar.video(st.session_state["videos"], loop=True) | |
| except Exception as e: | |
| st.write("Couldn't show video") | |
| with st.sidebar.expander("Options", expanded=False): | |
| st.button("Clear Video π₯", on_click=_remove_all_videos, type="secondary", help="Clear all downloaded videos", use_container_width=True,) | |
| try: | |
| video_file = open(st.session_state["videos"], "rb") | |
| st.download_button( | |
| "Download Video π", | |
| data=video_file, | |
| file_name=f"{st.session_state['videos'].lower().translate(str.maketrans('', '', string.punctuation)).replace(' ', '_')}.mp4", | |
| mime="video/mp4", | |
| type="secondary", | |
| use_container_width=True, | |
| ) | |
| except Exception as e: | |
| st.error("Failed downloading the video", icon="π") | |
| st.sidebar.write("**Title**:", st.session_state["videos"].split("/")[-1]) | |
| # --- Streamlit App --- | |
| # Main content | |
| if st.button('**Generate the story**', type="primary"): | |
| try: | |
| with st.spinner("Generating the story of the video"): | |
| # Upload and process the video | |
| processed_video = upload_file(st.session_state["videos"]) | |
| while processed_video.state.name == "PROCESSING": | |
| time.sleep(2) | |
| processed_video = get_file(processed_video.name) | |
| # AI agent processing | |
| response = multimodal_Agent.run(analysis_prompt, videos=[processed_video], safety_settings=safety_settings) | |
| st.subheader('Analysis Result') | |
| st.markdown(response.content) | |
| except Exception as error: | |
| st.error(f"An error occurred: {error}") | |