Spaces:
Paused
Paused
| import streamlit as st | |
| from huggingface_hub import HfApi | |
| import os | |
| import json | |
| from datetime import datetime | |
| import cv2 | |
| import random | |
| from PIL import Image | |
| import string | |
| import subprocess | |
| import glob | |
| import shutil | |
| from groq import Groq | |
| import tempfile | |
| from pydub import AudioSegment | |
# API clients for Hugging Face and Groq; both credentials are read from
# environment variables.
hf_api = HfApi(token=os.environ.get("HF_API_TOKEN"))
groq_client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
def generate_random_string(length=4):
    """Return ``length`` lowercase ASCII letters chosen at random.

    Letters are drawn with replacement via ``random.choices``.
    """
    alphabet = string.ascii_lowercase
    picked = random.choices(alphabet, k=length)
    return "".join(picked)
def add_random_to_filename(filename):
    """Insert ``-<random letters>`` between a filename's stem and extension.

    The stem/extension split is done with ``os.path.splitext`` and the
    random part comes from ``generate_random_string()`` with its default
    length.
    """
    stem, extension = os.path.splitext(filename)
    suffix = generate_random_string()
    return "{}-{}{}".format(stem, suffix, extension)
def extract_thumbnail(video_path, thumbnail_path):
    """Save one randomly chosen frame of a video as an image file.

    Args:
        video_path: Path of the video file to read.
        thumbnail_path: Destination path passed to ``cv2.imwrite``.

    Returns:
        True if a frame was read and written, False otherwise (including
        when the video cannot be opened).
    """
    video = cv2.VideoCapture(video_path)
    try:
        if not video.isOpened():
            return False
        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        # A reported frame count of zero (or less) would make
        # random.randint(0, -1) raise ValueError, so only seek when there is
        # a usable range; otherwise just try to read the first frame.
        if total_frames > 0:
            random_frame = random.randint(0, total_frames - 1)
            video.set(cv2.CAP_PROP_POS_FRAMES, random_frame)
        success, frame = video.read()
        if not success:
            return False
        # imwrite reports failure through its return value; pass that on
        # instead of claiming success unconditionally.
        return bool(cv2.imwrite(thumbnail_path, frame))
    finally:
        # Release the capture even if reading or writing raised.
        video.release()
def save_custom_thumbnail(thumbnail_file, thumbnail_path):
    """Save a user-supplied thumbnail image to ``thumbnail_path``.

    Args:
        thumbnail_file: Anything ``PIL.Image.open`` accepts (a path or a
            binary file-like object).
        thumbnail_path: Destination path; its extension selects the output
            format.

    Returns:
        True once the image has been written. Errors from Pillow
        (unreadable image, unwritable path) propagate to the caller.
    """
    with Image.open(thumbnail_file) as img:
        extension = os.path.splitext(thumbnail_path)[1].lower()
        # JPEG has no alpha channel or palette: saving e.g. an RGBA or P
        # mode PNG straight to a .jpg path raises OSError in Pillow, so
        # convert those modes to RGB first (transparency is discarded).
        if extension in (".jpg", ".jpeg") and img.mode not in ("RGB", "L", "CMYK"):
            img = img.convert("RGB")
        img.save(thumbnail_path)
    return True
def get_video_length(video_path):
    """Return the video's duration in whole seconds.

    The duration is the reported frame count divided by the reported
    frames-per-second, truncated to an int. When the reported FPS is not
    positive the result is 0.
    """
    capture = cv2.VideoCapture(video_path)
    frames_per_second = capture.get(cv2.CAP_PROP_FPS)
    frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    capture.release()

    if frames_per_second > 0:
        return int(frame_count / frames_per_second)
    return 0
def generate_metadata(video_name, title, description, uploader, file_location, thumbnail_location, subtitle_location, duration):
    """Build the metadata record stored alongside an uploaded video.

    The record carries the supplied values plus an ``uploadTimestamp``
    (local ``datetime.now()`` in ISO format) and ``views``/``likes``
    counters initialised to zero.
    """
    uploaded_at = datetime.now().isoformat()
    record = {
        "fileName": video_name,
        "title": title,
        "description": description,
        "uploader": uploader,
        "uploadTimestamp": uploaded_at,
        "fileLocation": file_location,
        "thumbnailLocation": thumbnail_location,
        "subtitleLocation": subtitle_location,
        "duration": duration,
        "views": 0,
        "likes": 0,
    }
    return record
def update_index_file(new_metadata_path):
    """Rebuild ``metadata/video-index.json`` and upload it to the Space.

    The Space repository is cloned (with LFS smudging skipped) into a
    private temporary directory, every ``metadata/*-index.json`` file in the
    clone is listed, the entry for ``new_metadata_path`` is added if the
    clone does not contain it yet, and the resulting list is written as
    JSON and uploaded through ``hf_api``.

    Args:
        new_metadata_path: Path (only its basename is used) of the metadata
            file that was just uploaded.

    Raises:
        subprocess.CalledProcessError: If ``git clone`` fails.
        Exception: Whatever ``hf_api.upload_file`` raises on failure.
    """
    repo_id = "vericudebuget/ok4231"
    repo_url = f"https://huggingface.co/spaces/{repo_id}"
    base_url = f"{repo_url}/raw/main/metadata/"
    index_name = "video-index.json"

    def read_upload_timestamp(metadata_file, fallback):
        """Return the file's recorded ``uploadTimestamp``, else ``fallback``."""
        try:
            with open(metadata_file, encoding="utf-8") as fh:
                recorded = json.load(fh).get("uploadTimestamp")
        except (OSError, ValueError, AttributeError):
            # Unreadable, not valid JSON, or not a JSON object.
            return fallback
        if isinstance(recorded, str) and recorded:
            return recorded
        return fallback

    # A private temporary directory (instead of a fixed "temp_repo" in the
    # working directory) keeps concurrent sessions from deleting each
    # other's clone; it is removed automatically on exit.
    with tempfile.TemporaryDirectory(prefix="temp_repo_") as work_dir:
        clone_dir = os.path.join(work_dir, "repo")

        # Argument list + env instead of a shell string: no shell parsing
        # and no dependence on a POSIX shell for the VAR=value prefix.
        subprocess.run(
            ["git", "clone", repo_url, clone_dir],
            check=True,
            env={**os.environ, "GIT_LFS_SKIP_SMUDGE": "1"},
        )

        metadata_dir = os.path.join(clone_dir, "metadata")
        now = datetime.now().isoformat()

        # url -> timestamp; a dict also de-duplicates the new entry.
        entries = {}
        for metadata_file in glob.glob(os.path.join(metadata_dir, "*-index.json")):
            file_name = os.path.basename(metadata_file)
            # The glob pattern also matches the index file itself; it must
            # not be listed as if it were a video's metadata.
            if file_name == index_name:
                continue
            # Use the upload time recorded in the metadata so that the sort
            # below reflects upload order; stamping every entry with "now"
            # would make the ordering meaningless.
            entries[base_url + file_name] = read_upload_timestamp(metadata_file, now)

        new_url = base_url + os.path.basename(new_metadata_path)
        entries.setdefault(new_url, now)

        # Latest first. ISO-8601 strings produced by datetime.isoformat()
        # sort chronologically as plain strings.
        paths = [{"url": url, "timestamp": stamp} for url, stamp in entries.items()]
        paths.sort(key=lambda entry: entry["timestamp"], reverse=True)

        index_path = os.path.join(metadata_dir, index_name)
        os.makedirs(metadata_dir, exist_ok=True)
        with open(index_path, "w", encoding="utf-8") as fh:
            json.dump(paths, fh, indent=2)

        hf_api.upload_file(
            path_or_fileobj=index_path,
            path_in_repo=f"metadata/{index_name}",
            repo_id=repo_id,
            repo_type="space",
        )
def create_subtitles(video_path):  # Renamed from generate_subtitles
    """Create WebVTT subtitles for a video using Groq's audio translation.

    The video's audio is exported to a temporary mono MP3 (128 kbps), sent
    to ``groq_client.audio.translations.create`` with the
    ``whisper-large-v3`` model, and the returned segments are rendered as
    WebVTT cues.

    Args:
        video_path: Path of the video (or audio) file to process.

    Returns:
        The complete WebVTT document as a string.

    Raises:
        Exception: Errors from pydub/ffmpeg or from the Groq client
            propagate; the temporary MP3 is removed in every case.
    """

    def format_timestamp(seconds):
        """Render a number of seconds as ``HH:MM:SS.mmm``."""
        # Round to whole milliseconds *before* splitting into fields.
        # Formatting ``seconds % 60`` with ``:06.3f`` can round up to
        # "60.000", which is not a valid timestamp.
        total_ms = int(round(float(seconds) * 1000))
        hours, remainder = divmod(total_ms, 3600 * 1000)
        minutes, remainder = divmod(remainder, 60 * 1000)
        whole_seconds, millis = divmod(remainder, 1000)
        return f"{hours:02}:{minutes:02}:{whole_seconds:02}.{millis:03}"

    # Only the path is needed; close our handle so the exporter can write
    # to it freely.
    temp_audio = tempfile.NamedTemporaryFile(suffix='.mp3', delete=False)
    temp_audio.close()
    try:
        # Convert video to mono 128kbps MP3
        # NOTE(review): long videos can produce a large MP3 at this bitrate;
        # confirm against the upload size limit of the Groq endpoint.
        audio = AudioSegment.from_file(video_path)
        audio = audio.set_channels(1).set_frame_rate(44100).set_sample_width(2)
        audio.export(temp_audio.name, format='mp3', bitrate='128k')

        # Generate subtitles using Groq
        with open(temp_audio.name, 'rb') as audio_file:
            translation = groq_client.audio.translations.create(
                file=(temp_audio.name, audio_file.read()),
                model="whisper-large-v3",
                response_format="verbose_json",
                temperature=0.0
            )
    finally:
        # Clean up the temp file even when export or the API call failed.
        os.unlink(temp_audio.name)

    # Generate VTT content: one cue per segment, joined once at the end.
    cues = [
        f"{format_timestamp(segment['start'])} --> {format_timestamp(segment['end'])}\n"
        f"{segment['text'].strip()}\n\n"
        for segment in translation.segments
    ]
    return "WEBVTT\n\n" + "".join(cues)
def upload_video_to_hf(video_file, original_video_name, title, description, uploader, should_generate_subs=False, custom_thumbnail=None):
    """Upload a video, its thumbnail, optional subtitles and metadata to the Space.

    Steps: write the upload to a private temporary directory, produce a
    thumbnail (custom or a random frame), optionally generate and upload
    subtitles, upload the video and thumbnail, write and upload the
    metadata JSON, then refresh the video index.

    Args:
        video_file: Binary file-like object holding the video.
        original_video_name: Original file name; a random suffix is added.
        title: Video title stored in the metadata.
        description: Video description stored in the metadata.
        uploader: Uploader name stored in the metadata.
        should_generate_subs: Whether to try generating subtitles. They are
            only generated for videos of at most one hour whose audio is
            not near-silent.
        custom_thumbnail: Optional image file to use instead of a frame.

    Returns:
        The metadata dict, or None if the thumbnail could not be produced.

    Raises:
        Exception: Upload or index-update failures propagate. Subtitle
            failures do not; they are reported with ``st.warning``.
    """
    repo_id = "vericudebuget/ok4231"
    max_subtitle_seconds = 3600  # 1 hour in seconds
    silence_threshold_dbfs = -90

    def upload_to_space(local_path, path_in_repo):
        """Upload one local file to the Space repository."""
        hf_api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=path_in_repo,
            repo_id=repo_id,
            repo_type="space",
        )

    # A private temporary directory per call: a shared fixed "temp" folder
    # would be deleted by whichever concurrent upload finishes first.
    with tempfile.TemporaryDirectory(prefix="upload_") as temp_dir:
        # The name comes from the client; keep only its final component so
        # it cannot point outside temp_dir or the repo folders.
        safe_name = os.path.basename(original_video_name.replace("\\", "/")) or "video"
        video_name = add_random_to_filename(safe_name)
        video_path = os.path.join(temp_dir, video_name)
        base_name = os.path.splitext(video_name)[0]
        thumbnail_name = f"{base_name}_thumb.jpg"
        thumbnail_path = os.path.join(temp_dir, thumbnail_name)
        json_name = f"{base_name}-index.json"
        json_path = os.path.join(temp_dir, json_name)

        # Rewind when possible so an already-read stream is not written out
        # as an empty file, then stream it to disk.
        if getattr(video_file, "seekable", lambda: False)():
            video_file.seek(0)
        with open(video_path, "wb") as f:
            shutil.copyfileobj(video_file, f)

        if custom_thumbnail:
            thumbnail_extracted = save_custom_thumbnail(custom_thumbnail, thumbnail_path)
        else:
            thumbnail_extracted = extract_thumbnail(video_path, thumbnail_path)
        if not thumbnail_extracted:
            st.error("Failed to process thumbnail")
            return None

        video_length = get_video_length(video_path)

        # Generate and upload subtitles if requested and video is not too long
        subtitle_location = ""
        if should_generate_subs:
            if video_length > max_subtitle_seconds:
                # Tell the user instead of skipping silently.
                st.warning("Subtitles were not generated because the video is longer than one hour.")
            else:
                try:
                    # Decoding the audio lives inside the try (and only runs
                    # when subtitles are wanted) so a video without a
                    # decodable audio track does not abort the whole upload.
                    audio_dBFS = AudioSegment.from_file(video_path).dBFS
                    if audio_dBFS >= silence_threshold_dbfs:  # skip near-silent audio
                        vtt_content = create_subtitles(video_path)  # Using renamed function
                        subtitle_name = f"{base_name}.vtt"
                        subtitle_path = os.path.join(temp_dir, subtitle_name)
                        # WebVTT files are UTF-8; do not rely on the
                        # platform's default encoding.
                        with open(subtitle_path, 'w', encoding='utf-8') as f:
                            f.write(vtt_content)
                        uploaded_location = f"subtitles/{subtitle_name}"
                        upload_to_space(subtitle_path, uploaded_location)
                        # Record the location only after the upload worked,
                        # so the metadata never points at a missing file.
                        subtitle_location = uploaded_location
                except Exception as e:
                    # Best effort: subtitles are optional, the upload goes on.
                    st.warning(f"Failed to generate subtitles: {str(e)}")

        # Upload video and thumbnail
        video_location = f"videos/{video_name}"
        upload_to_space(video_path, video_location)
        thumbnail_location = f"thumbnails/{thumbnail_name}"
        upload_to_space(thumbnail_path, thumbnail_location)

        # Generate and upload metadata
        metadata = generate_metadata(video_name, title, description, uploader, video_location, thumbnail_location, subtitle_location, video_length)
        with open(json_path, "w") as f:
            json.dump(metadata, f, indent=2)
        metadata_location = f"metadata/{json_name}"
        upload_to_space(json_path, metadata_location)

        update_index_file(metadata_location)
        return metadata
# Streamlit app interface: pick a video, fill in its details, then upload.
st.title("Upload your video")
st.markdown("---")
uploaded_video = st.file_uploader("Choose video file", type=["mp4", "avi", "mov", "webm", "mkv"])
if uploaded_video:
    with st.form("video_details"):
        st.write("Video Details")
        title = st.text_input("Title", placeholder="Enter video title")
        description = st.text_area("Description", placeholder="Enter video description")
        uploader = st.text_input("Uploader Name", placeholder="Enter your name")

        # Write the upload to a temporary file to measure its duration.
        # The file is closed (and therefore flushed) before it is probed,
        # and removed afterwards even if probing raises.
        temp_suffix = os.path.splitext(uploaded_video.name)[1] or '.mp4'
        with tempfile.NamedTemporaryFile(delete=False, suffix=temp_suffix) as temp_video:
            temp_video.write(uploaded_video.getvalue())
        try:
            video_duration = get_video_length(temp_video.name)
        finally:
            os.unlink(temp_video.name)  # Clean up temp file

        # Subtitle generation toggle, disabled if video is longer than 2 hours
        # NOTE(review): upload_video_to_hf only generates subtitles for
        # videos of at most 3600 seconds, so between one and two hours this
        # toggle and the warning below promise subtitles that are not
        # produced - confirm the intended limit.
        subs_disabled = video_duration > 7200
        should_generate_subs = st.toggle("Generate Subtitles. - If enabled, the subtitles will automatically be translated into English.", disabled=subs_disabled, value=True)
        if subs_disabled:
            # A disabled toggle still returns its default (True); treat a
            # disabled toggle as "off".
            should_generate_subs = False

        if video_duration > 1180:
            st.warning("Hey there! Just wanted to warn you that uploading pirated movies is not allowed.")
        if video_duration > 3600 and should_generate_subs:
            st.warning("Warning, for videos longer than an hour, generating subtitles will take some time! Please wait :)")
        if video_duration > 7000:
            st.warning("Now that's a long video. It will take a long time to upload. Make sure you have the right uploader details!")

        custom_thumbnail = st.file_uploader("Upload custom thumbnail (optional)", type=["jpg", "jpeg", "png"])
        submit_button = st.form_submit_button("Upload Video")

    if submit_button:
        if not title or not uploader:
            st.error("Please fill in the title and uploader name.")
        else:
            with st.spinner("Uploading video, generating thumbnail and metadata... This may take some time. Please wait."):
                metadata = upload_video_to_hf(
                    uploaded_video,
                    uploaded_video.name,
                    title,
                    description,
                    uploader,
                    should_generate_subs,
                    custom_thumbnail
                )
            # upload_video_to_hf returns None when the thumbnail failed; it
            # has already shown an error in that case.
            if metadata:
                st.success("Upload completed successfully!")
                st.json(metadata)
else:
    st.info("Please upload a video file to begin.")