|
|
import streamlit as st |
|
|
from phi.agent import Agent |
|
|
from phi.model.google import Gemini |
|
|
from phi.tools.duckduckgo import DuckDuckGo |
|
|
from google.generativeai import upload_file, get_file |
|
|
import google.generativeai as genai |
|
|
import time |
|
|
from pathlib import Path |
|
|
import tempfile |
|
|
from dotenv import load_dotenv |
|
|
import os |
|
|
import yt_dlp |
|
|
import re |
|
|
import requests |
|
|
import json |
|
|
import subprocess |
|
|
import random |
|
|
|
|
|
# Load environment variables (python-dotenv) before anything reads them.
load_dotenv()

# Gemini credentials; the Google client is configured only when a key is set.
API_KEY = os.environ.get("GOOGLE_API_KEY")
if API_KEY:
    genai.configure(api_key=API_KEY)

# Browser-tab metadata first, then the visible page heading.
st.set_page_config(page_title="The Plug", page_icon="📹")
st.title("The Plug")
|
|
|
|
|
@st.cache_resource
def initialize_agent():
    """Build the agent that answers questions about a video.

    The function is wrapped in ``st.cache_resource``, so Streamlit caches the
    returned object rather than rebuilding it on every script run.

    Returns:
        A phi ``Agent`` named "Video AI Summarizer" configured with the
        ``gemini-2.0-flash-exp`` Gemini model, a DuckDuckGo tool and
        ``markdown=True``.
    """
    gemini_model = Gemini(id="gemini-2.0-flash-exp")
    search_tools = [DuckDuckGo()]
    agent = Agent(
        name="Video AI Summarizer",
        model=gemini_model,
        tools=search_tools,
        markdown=True,
    )
    return agent
|
|
|
|
|
def download_video(url):
    """Download the video at ``url`` into a temporary ``.mp4`` file.

    Several yt-dlp option sets ("strategies") are tried in order. Each attempt
    is preceded by a random 1-3 second pause, and a failed attempt is followed
    by a growing wait (capped at 15 seconds) before the next one. If every
    strategy fails, the ``youtube-dl`` command-line fallback and then the
    direct-download fallback are tried. Progress is reported through Streamlit
    status messages.

    Args:
        url: Address of the video page to download.

    Returns:
        str: Path of the downloaded file. The file is created with
        ``delete=False``, so the caller is responsible for removing it.

    Raises:
        Exception: When all yt-dlp strategies and both fallbacks fail. The
            message contains the last yt-dlp error and both fallback errors.
    """
    from urllib.parse import urlsplit

    # Reserve a path up front; yt-dlp and both fallbacks write to it.
    temp_video = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
    temp_video.close()
    output_path = temp_video.name

    proxy_url = os.getenv('HTTP_PROXY') or os.getenv('HTTPS_PROXY')

    desktop_user_agent = (
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    )
    mobile_user_agent = (
        'Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) '
        'AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 '
        'Mobile/15E148 Safari/604.1'
    )

    # Options shared by most strategies.
    # NOTE(review): several keys ('user_agent', 'referer', 'force_ipv4',
    # 'add_header') look like CLI flag names; confirm yt-dlp's Python API
    # honours them.
    base_opts = {
        'outtmpl': output_path,
        'quiet': False,
        'no_warnings': False,
        'nooverwrites': False,

        'user_agent': desktop_user_agent,
        'referer': 'https://www.youtube.com/',
        'http_headers': {
            'User-Agent': desktop_user_agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'Cache-Control': 'max-age=0',
        },

        'retries': 5,
        'fragment_retries': 5,
        # Exponential back-off between retries, capped at 120.
        'retry_sleep_functions': {
            'http': lambda n: min(120, 3 ** n),
            'fragment': lambda n: min(120, 3 ** n),
        },
        'extractor_retries': 5,
        'socket_timeout': 60,

        # NOTE(review): this turns off TLS certificate verification for the
        # download; confirm that is intentional.
        'nocheckcertificate': True,
        'ignoreerrors': False,
        'logtostderr': False,
        'sleep_interval': 2,
        'max_sleep_interval': 10,
        'sleep_interval_requests': 2,
        'sleep_interval_subtitles': 2,

        'geo_bypass': True,
        'geo_bypass_country': 'US',

        'force_ipv4': True,

        'add_header': [
            'Sec-CH-UA:"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
            'Sec-CH-UA-Mobile:?0',
            'Sec-CH-UA-Platform:"Linux"'
        ]
    }

    if proxy_url:
        base_opts['proxy'] = proxy_url
        # Proxy URLs may embed "user:password@"; display the host only so
        # credentials never reach the UI.
        try:
            proxy_host = urlsplit(proxy_url).hostname
        except ValueError:
            proxy_host = None
        st.info(f"🌐 Using proxy: {proxy_host or 'configured proxy'}")

    # The built-in proxy list is consulted only when no proxy is configured.
    free_proxies = _get_free_proxies() if not proxy_url else []

    base_strategies = [
        # 1. Smallest MP4 up to 720p.
        {**base_opts, 'format': 'worst[height<=720][ext=mp4]/worst[ext=mp4]/worst'},

        # 2. Explicit format codes.
        {**base_opts, 'format': '18/22/37/38/136+140/135+140/134+140', 'extract_flat': False},

        # 3. Mobile user agent with a reduced header set.
        {**base_opts,
         'user_agent': mobile_user_agent,
         'http_headers': {
             'User-Agent': mobile_user_agent,
             'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
             'Accept-Language': 'en-US,en;q=0.9',
             'Accept-Encoding': 'gzip, deflate, br',
         },
         'format': 'worst[ext=mp4]/worst'},

        # 4. Minimal configuration; does not inherit base_opts (so no proxy).
        {
            'outtmpl': output_path,
            'format': '18/worst',
            'quiet': True,
            'no_warnings': True,
            'user_agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
            'socket_timeout': 30,
            'retries': 2,
            'sleep_interval': 3,
            'force_ipv4': True,
        },

        # 5. Size-limited best format without the DASH manifest.
        {**base_opts,
         'format': 'best[filesize<50M]/worst',
         'prefer_free_formats': True,
         'youtube_include_dash_manifest': False,
         'extract_flat': False}
    ]

    download_strategies = list(base_strategies)
    if free_proxies:
        st.info(f"🔄 Adding proxy strategies with {len(free_proxies)} proxies...")
        download_strategies.extend(_add_proxy_strategies(base_strategies, free_proxies))

    last_error = None
    total_strategies = len(download_strategies)

    for attempt, strategy in enumerate(download_strategies):
        try:
            st.info(f"🔄 Download strategy {attempt + 1}/{total_strategies}: {_get_strategy_description(attempt)}")

            # Small random pause before each attempt.
            time.sleep(random.uniform(1, 3))

            with yt_dlp.YoutubeDL(strategy) as ydl:
                ydl.download([url])

            # yt-dlp returning without error is not enough: require a
            # non-empty file at the expected path.
            if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                file_size_mb = os.path.getsize(output_path) / (1024 * 1024)
                st.success(f"✅ Download successful! Strategy {attempt + 1} worked. File size: {file_size_mb:.2f} MB")
                return output_path
            raise Exception("Downloaded file is empty or doesn't exist")

        except Exception as e:
            last_error = e
            error_msg = str(e)

            if "403" in error_msg:
                st.warning(f"🚫 Strategy {attempt + 1} blocked (403 Forbidden)")
            elif "404" in error_msg:
                st.warning(f"❓ Strategy {attempt + 1} failed (404 Not Found)")
            elif "timeout" in error_msg.lower():
                st.warning(f"⏰ Strategy {attempt + 1} timed out")
            else:
                st.warning(f"❌ Strategy {attempt + 1} failed: {error_msg[:100]}...")

            # Best-effort removal of any partial file before the next attempt.
            try:
                Path(output_path).unlink(missing_ok=True)
            except OSError:
                pass

            if attempt < total_strategies - 1:
                wait_time = min(15, 3 * (attempt + 1))
                st.info(f"⏳ Waiting {wait_time} seconds before trying next strategy...")
                time.sleep(wait_time)

    st.warning("🔄 yt-dlp strategies failed. Trying alternative download methods...")

    # Fallback errors are stored in ordinary locals: a name bound by
    # "except ... as name" is unbound when its clause ends, so it cannot be
    # relied on when composing the final message.
    try:
        return _fallback_youtube_dl(url, output_path)
    except Exception as exc:
        youtube_dl_error = exc
        st.warning(f"📱 youtube-dl fallback failed: {str(exc)[:50]}...")

    try:
        return _fallback_direct_download(url, output_path)
    except Exception as exc:
        direct_error = exc
        st.error(f"🌐 Direct download fallback failed: {str(exc)[:50]}...")

    # Nothing worked: do not leave the reserved temp file behind.
    try:
        Path(output_path).unlink(missing_ok=True)
    except OSError:
        pass

    st.error(f"💥 All {total_strategies} yt-dlp strategies + 2 fallback methods failed!")
    raise Exception(
        f"Complete download failure. yt-dlp error: {str(last_error)}. "
        f"Fallback errors: youtube-dl={str(youtube_dl_error)}, direct={str(direct_error)}"
    ) from direct_error
|
|
|
|
|
def _fallback_youtube_dl(url, output_path):
    """Try to download ``url`` with the ``youtube-dl`` command-line tool.

    Args:
        url: Address of the video to download.
        output_path: File path handed to ``youtube-dl --output``.

    Returns:
        ``output_path`` when the process exits with status 0 and a non-empty
        file exists at that path.

    Raises:
        Exception: "youtube-dl not installed" when the executable cannot be
            found, "youtube-dl timeout" when the 180-second limit is hit, and
            "youtube-dl error: ..." for any other failure (including a
            non-zero exit status or a missing/empty output file).
    """
    st.info("🔄 Trying youtube-dl fallback...")

    command = [
        'youtube-dl',
        '--format', 'worst[ext=mp4]/worst',
        '--output', output_path,
        '--user-agent', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
        '--referer', 'https://www.youtube.com/',
        '--socket-timeout', '30',
        '--retries', '3',
        url
    ]

    try:
        completed = subprocess.run(command, capture_output=True, text=True, timeout=180)

        produced_file = (
            completed.returncode == 0
            and os.path.exists(output_path)
            and os.path.getsize(output_path) > 0
        )
        if not produced_file:
            # Caught by the generic handler below, which adds its own prefix.
            raise Exception(f"youtube-dl failed: {completed.stderr}")

        st.success("✅ youtube-dl fallback successful!")
        return output_path

    except FileNotFoundError:
        raise Exception("youtube-dl not installed")
    except subprocess.TimeoutExpired:
        raise Exception("youtube-dl timeout")
    except Exception as e:
        raise Exception(f"youtube-dl error: {str(e)}")
|
|
|
|
|
def _fallback_direct_download(url, output_path):
    """Last-resort fallback: look up the video through YouTube's oEmbed endpoint.

    The function extracts a YouTube video ID from ``url``, requests the oEmbed
    JSON for it and reports the title. It does not download anything: every
    code path ends in an exception, and ``output_path`` is never used.

    Args:
        url: Address of the video.
        output_path: Intended destination file (currently unused).

    Raises:
        Exception: Always, with the message "Direct download failed: ..."
            wrapping the underlying reason (no video ID found, non-200
            oEmbed response, request/JSON errors, or the final
            "needs video stream URL extraction" notice).
    """
    st.info("🌐 Trying direct download fallback...")

    try:
        id_patterns = (
            r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/shorts/)([^&\n?#]+)',
            r'youtube\.com/embed/([^&\n?#]+)',
        )

        # First pattern that matches wins; later patterns are not evaluated.
        found = next(
            (hit for hit in (re.search(pattern, url) for pattern in id_patterns) if hit),
            None,
        )
        if found is None:
            raise Exception("Could not extract video ID")
        video_id = found.group(1)

        info_url = f"https://www.youtube.com/oembed?url=https://www.youtube.com/watch?v={video_id}&format=json"
        request_headers = {
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
            'Accept': 'application/json',
        }

        response = requests.get(info_url, headers=request_headers, timeout=30)
        if response.status_code != 200:
            raise Exception(f"Could not get video info: HTTP {response.status_code}")

        info = response.json()
        st.info(f"📹 Found video: {info.get('title', 'Unknown title')}")

        # Stream-URL extraction is not implemented, so this path also fails.
        raise Exception("Direct download method needs video stream URL extraction")

    except Exception as e:
        raise Exception(f"Direct download failed: {str(e)}")
|
|
|
|
|
def _get_free_proxies(): |
|
|
"""Get a list of free proxy servers for fallback (basic implementation)""" |
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
|
|
|
return [ |
|
|
'http://proxy1.example.com:8080', |
|
|
'http://proxy2.example.com:8080', |
|
|
] |
|
|
except: |
|
|
return [] |
|
|
|
|
|
def _add_proxy_strategies(base_strategies, proxies): |
|
|
"""Add proxy-enabled versions of strategies""" |
|
|
proxy_strategies = [] |
|
|
for proxy in proxies[:2]: |
|
|
for strategy in base_strategies[:2]: |
|
|
proxy_strategy = strategy.copy() |
|
|
proxy_strategy['proxy'] = proxy |
|
|
proxy_strategies.append(proxy_strategy) |
|
|
return proxy_strategies |
|
|
|
|
|
def _get_strategy_description(attempt): |
|
|
"""Get human-readable description of download strategy""" |
|
|
descriptions = [ |
|
|
"Production-optimized (720p max)", |
|
|
"Specific format codes", |
|
|
"Mobile user agent", |
|
|
"Minimal configuration", |
|
|
"Alternative extractor", |
|
|
"Proxy method 1", |
|
|
"Proxy method 2", |
|
|
"Proxy method 3", |
|
|
"Proxy method 4" |
|
|
] |
|
|
return descriptions[attempt] if attempt < len(descriptions) else f"Strategy {attempt + 1}" |
|
|
|
|
|
def is_valid_url(url):
    """Check whether ``url`` points at one of the supported video platforms.

    The host name is parsed out of the URL and compared against the supported
    domains (YouTube, Instagram, TikTok, Twitter/X), including their
    subdomains such as ``www.`` or ``m.``. Matching the parsed host rather
    than searching the whole string rejects look-alikes (``notyoutube.com``),
    accidental substring hits (``dropbox.com`` contains ``x.com``) and
    unrelated sites that merely mention a supported domain in their path or
    query string.

    Args:
        url: URL text entered by the user. The scheme is optional; when
            present it must be ``http`` or ``https``.

    Returns:
        bool: ``True`` if the host belongs to a supported platform,
        otherwise ``False``.
    """
    from urllib.parse import urlsplit

    supported_domains = (
        'youtube.com',
        'youtu.be',
        'instagram.com',
        'instagr.am',
        'tiktok.com',
        'twitter.com',
        'x.com',
    )

    candidate = url.strip()
    # Without a scheme urlsplit would treat the host as part of the path, so
    # mark scheme-less input as a network location.
    if not re.match(r'[a-z][a-z0-9+.\-]*://', candidate, re.IGNORECASE):
        candidate = '//' + candidate

    try:
        parts = urlsplit(candidate)
        host = parts.hostname
    except ValueError:
        # Malformed input (e.g. an unbalanced IPv6 bracket).
        return False

    if parts.scheme not in ('', 'http', 'https') or not host:
        return False

    host = host.rstrip('.')
    return any(
        host == domain or host.endswith('.' + domain)
        for domain in supported_domains
    )
|
|
|
|
|
|
|
|
# Agent used by the chat page to answer questions about the video.
multimodal_Agent = initialize_agent()

# Seed session state on first use only; values already present are kept
# across Streamlit reruns. The mapping is rebuilt on every run, so the
# chat-history default is always a fresh list.
_session_defaults = {
    'video_path': None,
    'current_video_url': None,
    'current_video_file': None,
    'last_input_method': None,
    'current_page': "upload",
    'chat_history': [],
}
for _state_key, _state_default in _session_defaults.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
|
|
|
|
|
def cleanup_video_cache():
    """Delete the cached video file and reset the video-related session state.

    Removes the file referenced by ``st.session_state.video_path`` if it
    exists, then clears the stored path, URL and file name, switches the
    current page back to "upload" and empties the chat history.
    ``last_input_method`` is left as it is.
    """
    state = st.session_state

    cached_path = state.video_path
    if cached_path and os.path.exists(cached_path):
        Path(cached_path).unlink(missing_ok=True)

    state.video_path = None
    state.current_video_url = None
    state.current_video_file = None
    state.current_page = "upload"
    state.chat_history = []
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Page routing: "upload" collects a video (file or link); "chat" answers
# questions about it. st.session_state.current_page selects the page.
# ---------------------------------------------------------------------------
if st.session_state.current_page == "upload":

    st.subheader("Upload Your Video")

    input_method = st.radio(
        "Choose how to provide your video:",
        ["Upload Video File", "Paste Video Link"],
        help="Select how you want to provide the video for analysis"
    )

    # Switching between file upload and link input discards the cached video.
    if st.session_state.last_input_method != input_method:
        cleanup_video_cache()
        st.session_state.last_input_method = input_method

    video_path = None

    if input_method == "Upload Video File":

        video_file = st.file_uploader(
            "Upload a video file",
            type=['mp4', 'mov', 'avi'],
            help="Upload a video for AI analysis"
        )

        if video_file:
            # Write a new temp file only when the upload changed or the
            # cached copy is gone; otherwise reuse the cached path.
            if (st.session_state.current_video_file != video_file.name or
                    st.session_state.video_path is None or
                    not os.path.exists(st.session_state.video_path)):

                if st.session_state.video_path and os.path.exists(st.session_state.video_path):
                    Path(st.session_state.video_path).unlink(missing_ok=True)

                # NOTE(review): the copy always gets a ".mp4" suffix, even for
                # .mov/.avi uploads; confirm the upload API does not rely on
                # the extension.
                with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_video:
                    temp_video.write(video_file.read())
                    st.session_state.video_path = temp_video.name
                    st.session_state.current_video_file = video_file.name
                    st.session_state.current_video_url = None

            video_path = st.session_state.video_path

    else:

        video_url = st.text_input(
            "Paste video link",
            placeholder="https://youtube.com/watch?v=... or Instagram/TikTok/X link",
            help="Paste a video URL from YouTube, Instagram, TikTok, or X"
        )

        if video_url:
            if is_valid_url(video_url):
                # Download only when the link changed or the cached file is gone.
                if (st.session_state.current_video_url != video_url or
                        st.session_state.video_path is None or
                        not os.path.exists(st.session_state.video_path)):

                    if st.session_state.video_path and os.path.exists(st.session_state.video_path):
                        Path(st.session_state.video_path).unlink(missing_ok=True)

                    try:
                        with st.spinner("Downloading video..."):
                            st.session_state.video_path = download_video(video_url)
                            st.session_state.current_video_url = video_url
                            st.session_state.current_video_file = None
                    except Exception as e:
                        st.error(f"Error downloading video: {e}")
                        st.session_state.video_path = None
                        st.session_state.current_video_url = None

                video_path = st.session_state.video_path
            else:
                st.warning("Please enter a valid YouTube, Instagram, TikTok, or X video URL")

    # The chat page is offered only once a non-empty video file is on disk.
    if video_path and os.path.exists(video_path) and os.path.getsize(video_path) > 0:
        if st.button("Start Chat", type="primary", use_container_width=True):
            st.session_state.current_page = "chat"
            st.rerun()

elif st.session_state.current_page == "chat":

    video_path = st.session_state.video_path

    if not video_path or not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
        st.error("Video not found. Please upload a video first.")
        if st.button("Back to Upload"):
            st.session_state.current_page = "upload"
            st.rerun()
    else:

        col1, col2 = st.columns([3, 1])
        with col1:
            st.subheader("Chat with Your Video")
        with col2:
            if st.button("New Video", help="Upload a different video"):
                cleanup_video_cache()
                st.session_state.current_page = "upload"
                st.rerun()

        # Earlier question/answer pairs, oldest first.
        if st.session_state.chat_history:
            st.markdown("### Chat History")
            for past_query, past_answer in st.session_state.chat_history:
                with st.container():
                    st.markdown(f"**You:** {past_query}")
                    st.markdown(f"**AI:** {past_answer}")
                    st.divider()

        st.markdown("### Ask a Question")
        user_query = st.text_input(
            "What would you like to know about this video?",
            placeholder="Example: What is the main topic? Summarize the key points...",
            help="Ask any question about the video content.",
            key="chat_input"
        )

        if st.button("Send", type="primary", use_container_width=True):
            if not user_query.strip():
                st.warning("Please enter a question about the video.")
            else:
                try:
                    with st.spinner("Analyzing video and gathering insights..."):

                        # The video is uploaded again for every question.
                        processed_video = upload_file(video_path)

                        # Poll once per second until processing finishes.
                        while processed_video.state.name == "PROCESSING":
                            time.sleep(1)
                            processed_video = get_file(processed_video.name)

                        # A file that left PROCESSING may have failed; stop
                        # here rather than hand an unusable file to the model.
                        if processed_video.state.name == "FAILED":
                            raise ValueError("Video processing failed on the Gemini side.")

                        analysis_prompt = (
                            f"""
                            You are an expert video analyst. Analyze the uploaded video and respond to this query:

                            Query: {user_query}

                            Provide a comprehensive, insightful response that includes:
                            1. Direct analysis of the video content
                            2. Key insights and observations
                            3. Any supplementary context that would be helpful
                            4. Actionable takeaways

                            Be conversational and engaging while being thorough and accurate.
                            """
                        )

                        response = multimodal_Agent.run(analysis_prompt, videos=[processed_video])

                        st.session_state.chat_history.append((user_query, response.content))

                        # Rerun so the new exchange shows up in the history.
                        st.rerun()

                except Exception as error:
                    st.error(f"An error occurred during analysis: {error}")
|
|
|
|
|
|
|
|
# Page-wide CSS overrides: taller text inputs, radio options laid out in a
# row with spacing, and medium-weight radio labels.
_custom_css = """
    <style>
    .stTextInput input {
        min-height: 40px;
    }
    .stRadio [role="radiogroup"] {
        display: flex;
        gap: 20px;
    }
    .stRadio label {
        font-weight: 500;
    }
    </style>
    """
st.markdown(_custom_css, unsafe_allow_html=True)