Spaces:
Sleeping
Sleeping
| ##STREAMLINK CODE | |
| import cv2 | |
| import streamlink | |
| import streamlit as st | |
| import time | |
| import tempfile | |
| import base64 | |
| import os | |
| from dotenv import load_dotenv | |
| from openai import OpenAI | |
| import assemblyai as aai | |
# Load environment variables from a local .env file.
load_dotenv()
aai.settings.api_key = os.getenv("ASSEMBLYAI_API_KEY")
# NOTE: the modern (>=1.0) OpenAI SDK ignores class-attribute assignment
# (`OpenAI.api_key = ...` is the legacy 0.x idiom). Pass the key to the
# client constructor instead; it also falls back to the OPENAI_API_KEY
# environment variable automatically.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
def extract_recent_frames(video_url, output_folder, duration=10, frames_per_second=1):
    """Sample frames from a stream and return them as base64 JPEG strings.

    Args:
        video_url: YouTube (or any streamlink-supported) URL.
        output_folder: Unused; kept for backward compatibility — frames are
            returned in memory, never written to disk. TODO: remove or use.
        duration: Wall-clock seconds to capture.
        frames_per_second: Target sampling rate.

    Returns:
        A list of base64-encoded JPEG strings ([] if nothing was captured),
        or None when the stream list cannot be retrieved (original contract).
    """
    streams = streamlink.streams(video_url)
    if not streams:
        st.error("Error: Unable to retrieve streams. Make sure the YouTube video URL is valid.")
        return
    stream_url = streams['best'].url
    cap = cv2.VideoCapture(stream_url)
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps or fps <= 0:
        # Live streams frequently report 0 FPS; assume a common default
        # rather than dividing by zero below.
        fps = 30.0
    # Clamp to >= 1 so the modulo never divides by zero when
    # frames_per_second exceeds the (reported) stream FPS.
    frame_interval = max(1, int(fps / frames_per_second))
    frame_count = 0
    start_time = time.time()
    extracted_frames = []
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                st.error("Error: Couldn't read frame.")
                break
            elapsed_time = time.time() - start_time
            if elapsed_time > duration:
                break
            if frame_count % frame_interval == 0:
                # Encode to JPEG and base64 so the frame can be displayed
                # (and sent to the vision API) without touching disk.
                ok, buffer = cv2.imencode(".jpg", frame)
                if ok:
                    extracted_frames.append(base64.b64encode(buffer).decode("utf-8"))
            frame_count += 1
    finally:
        # Always release the capture handle, even if an exception escapes.
        cap.release()
    return extracted_frames
def main():
    """Streamlit entry point: collect inputs, extract frames, show the grid."""
    st.title("Insightly Live Video Analysis")
    video_url = st.text_input("Enter YouTube Video URL:")
    duration = st.slider("Select Duration (seconds):", min_value=1, max_value=60, value=10)
    sample_rate = st.slider("Select Frames per Second:", min_value=1, max_value=10, value=1)
    # Nothing to do until the user asks for an extraction.
    if not st.button("Extract Frames"):
        return
    st.info("Extracting frames. Please wait...")
    frames = extract_recent_frames(video_url, "temp_frames", duration, sample_rate)
    if not frames:
        st.error("Failed to extract frames.")
        return
    st.success("Frames extracted successfully!")
    # Display frames in a grid format with frame description on click
    display_frame_grid(frames)
| #####################33 | |
def generate_description(base64_frames):
    """Ask the GPT-4 Vision model to describe a sequence of video frames.

    Only every 30th frame is sent (resized to 428px) to keep the request
    small. Returns the model's text response, or None on any API failure.
    """
    # Build a single user message: the instruction text followed by the
    # sampled images.
    message_content = [
        "1. Generate a description for this sequence of video frames in about 90 words. Return the following: 1. List of objects in the video 2. Any restrictive content or sensitive content and if so which frame.",
    ]
    message_content.extend(
        {"image": frame, "resize": 428} for frame in base64_frames[0::30]
    )
    try:
        response = client.chat.completions.create(
            model="gpt-4-vision-preview",
            messages=[{"role": "user", "content": message_content}],
            max_tokens=3000,
        )
        return response.choices[0].message.content
    except Exception as e:
        # Best-effort: log and signal failure to the caller with None.
        print(f"Error in generate_description: {e}")
        return None
| #########################################3333 | |
def display_frame_grid(extracted_frames):
    """Render base64-encoded JPEG frames in a Streamlit grid, three per row."""
    cols_per_row = 3
    total = len(extracted_frames)
    for row_start in range(0, total, cols_per_row):
        row_columns = st.columns(cols_per_row)
        for offset, column in enumerate(row_columns):
            frame_idx = row_start + offset
            if frame_idx >= total:
                # Last row may be partially filled.
                break
            with column:
                # st.image accepts raw JPEG bytes, so decode the base64 payload.
                jpeg_bytes = base64.b64decode(extracted_frames[frame_idx])
                st.image(jpeg_bytes, channels="BGR", caption=f'Frame {frame_idx + 1}', use_column_width=True, output_format="JPEG")
# Run the Streamlit app when this file is executed as a script.
if __name__ == "__main__":
    main()