Spaces:
Runtime error
Runtime error
| #app.py | |
| import streamlit as st | |
| import subprocess | |
| import os | |
| import glob | |
| import time | |
| import streamlit_antd_components as sac | |
| import uuid | |
| import shutil | |
| import re | |
| import threading | |
| import psutil | |
| from video_processor import process_video, get_status, get_error_details, status_dict, cancel_job, cleanup_cancelled_jobs | |
# Use the wide page layout. Streamlit expects set_page_config to be called
# before any other st.* command, so keep this ahead of every rendering call.
st.set_page_config(layout="wide")
# Probe the external command-line tools this app relies on (results are not cached).
def check_dependencies():
    """Probe the external command-line tools the app shells out to.

    Returns:
        dict: One status string per tool, plus a ``'<tool>_version'`` entry
        (first line of the tool's version output) whenever the version
        command ran successfully.

        * ``'ffmpeg'``: ``"installed"``, ``"error"`` (found on PATH but the
          version command could not be run or exited non-zero) or
          ``"not accessible"`` (not found on PATH).
        * ``'yt-dlp'`` / ``'auto-editor'``: ``"installed"``,
          ``"not installed"`` (executable not found) or
          ``"not accessible"`` (found but could not be run successfully).
    """
    dependencies_status = {}

    # ffmpeg is located on PATH first, then asked for its version banner.
    if shutil.which('ffmpeg'):
        try:
            result = subprocess.run(
                ['ffmpeg', '-version'],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=True,
            )
        except (OSError, subprocess.CalledProcessError):
            # OSError covers launch failures (e.g. the binary vanished or
            # lost its execute bit between the lookup and the call).
            dependencies_status['ffmpeg'] = "error"
        else:
            dependencies_status['ffmpeg'] = "installed"
            dependencies_status['ffmpeg_version'] = result.stdout.split('\n')[0]
    else:
        dependencies_status['ffmpeg'] = "not accessible"

    # yt-dlp and auto-editor are probed directly through their --version flag.
    for dep in ('yt-dlp', 'auto-editor'):
        try:
            result = subprocess.run(
                [dep, '--version'],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=True,
            )
        except FileNotFoundError:
            dependencies_status[dep] = "not installed"
        except (OSError, subprocess.CalledProcessError):
            # Must come after FileNotFoundError (an OSError subclass): the
            # file exists but cannot be executed, or it exited non-zero.
            dependencies_status[dep] = "not accessible"
        else:
            dependencies_status[dep] = "installed"
            dependencies_status[f"{dep}_version"] = result.stdout.split('\n')[0]

    return dependencies_status
def display_dependencies_status():
    """Render the dependency report, cleanup button and system statistics.

    Reads the status dict produced by ``check_dependencies()`` and shows one
    message per tool, then offers a button that calls
    ``delete_all_output_folders()``, and finally prints CPU, memory and disk
    figures obtained from ``psutil``.
    """
    bytes_per_gib = 1024 ** 3

    # NOTE(review): the whole body is assumed to live inside the sidebar
    # context; confirm against the deployed layout.
    with st.sidebar:
        st.header("Dependency Check")
        report = check_dependencies()

        # ffmpeg has three possible states, each with its own message level.
        ffmpeg_state = report['ffmpeg']
        if ffmpeg_state == "installed":
            st.success("ffmpeg is installed and accessible.")
            st.text(report['ffmpeg_version'])
        elif ffmpeg_state == "error":
            st.warning("ffmpeg is found but there was an error running it.")
        else:
            st.error("ffmpeg is not accessible.")

        # yt-dlp and auto-editor share the same reporting rules.
        for tool in ('yt-dlp', 'auto-editor'):
            tool_state = report[tool]
            if tool_state == "installed":
                st.success(f"{tool} is installed and accessible.")
                st.text(report[f"{tool}_version"])
                continue
            if tool_state == "not installed":
                st.error(f"{tool} is not installed.")
            else:
                st.error(f"{tool} is not accessible.")

        # Maintenance action: remove every generated output folder.
        if st.button("Delete All Output Folders"):
            delete_all_output_folders()

        st.header("System Statistics")
        # cpu_percent(interval=1) samples over one second before returning.
        cpu = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        stat_lines = (
            f"CPU Usage: {cpu}%",
            f"Memory Usage: {memory.percent}%",
            f"Total Memory: {memory.total / bytes_per_gib:.2f} GB",
            f"Available Memory: {memory.available / bytes_per_gib:.2f} GB",
            f"Disk Usage: {disk.percent}%",
            f"Total Disk Space: {disk.total / bytes_per_gib:.2f} GB",
            f"Free Disk Space: {disk.free / bytes_per_gib:.2f} GB",
        )
        for stat_line in stat_lines:
            st.text(stat_line)
def delete_all_output_folders():
    """Delete every ``./output_folder_*`` directory and report the outcome.

    Entries matching the pattern that are not directories are skipped, since
    ``shutil.rmtree`` would raise on them. A folder that cannot be removed no
    longer aborts the run: the remaining folders are still processed and the
    failures are listed in an error message. The success message is shown
    only when nothing failed.
    """
    failures = []
    for folder in glob.glob("./output_folder_*"):
        if not os.path.isdir(folder):
            continue
        try:
            shutil.rmtree(folder)
        except OSError as exc:
            failures.append(f"{folder}: {exc}")

    if failures:
        st.error("Some output folders could not be deleted:\n" + "\n".join(failures))
    else:
        st.success("All output folders have been deleted.")
# Render the dependency report and system statistics. This call sits at
# module level, so it runs on every execution of the script and before
# main() is reached further down.
display_dependencies_status()
# Markdown shown inside the "Instructions" expander on the main page.
_INSTRUCTIONS_MARKDOWN = """
### How to Use ISKM Spotify Service
1. **Enter YouTube URL**:
- Paste a valid YouTube video or playlist URL in the text box.
- The URL should start with https://www.youtube.com/ or https://youtu.be/.
2. **Process Video**:
- Click the "Process Video" button to start the conversion.
- The tool will download, trim, and process the video(s).
3. **Download**:
- Once processing is complete, download buttons will appear.
- For single videos, you'll see one download button.
- For playlists, you'll see multiple download buttons (one for each video).
4. **Dependency Check**:
- Check the sidebar to ensure all required dependencies are installed and accessible.
### Notes:
- Processing may take some time, especially for longer videos or playlists.
- Ensure you have a stable internet connection throughout the process.
- The tool will trim silence and unnecessary parts from the videos.
- Final output will be in MP4 format, optimized for audio quality.
If you encounter any issues, please check the dependency status in the sidebar or contact support.
"""


def _render_submit_tab():
    """Render the "Submit Job" tab and start a worker thread on request."""
    st.header("Submit Job")
    url = st.text_input("Enter the YouTube URL:")
    if not st.button("Process Video"):
        return
    if not url:
        st.warning("Please enter a YouTube URL.")
        return

    session_id = str(uuid.uuid4())
    # Register the job before the thread starts so it is visible right away.
    status_dict[session_id] = {
        "status": "Processing",
        "started_time": time.strftime("%Y-%m-%d %H:%M:%S"),
        "video_url": url,
        "link_submitted": url,
    }
    threading.Thread(target=process_video, args=(url, session_id)).start()
    st.success(session_id)
    st.info("Video processing started! Your session ID is ☝️ Use this ID to check the status.")


def _render_status_tab():
    """Render the "Check Status" tab, offering a download once a job is done."""
    st.header("Check Status")
    # Strip stray whitespace that tends to come along with a pasted ID.
    session_id_input = st.text_input("Enter your session ID:").strip()
    if not st.button("Check Status"):
        return
    if not session_id_input:
        st.warning("Please enter a session ID.")
        return

    status = get_status(session_id_input)
    st.info(f"Status: {status}")

    if "Processing completed!" in status:
        # The status text carries the output path after the "File: " marker.
        file_path = status.split("File: ")[-1]
        try:
            processed_file = open(file_path, "rb")
        except OSError as exc:
            # The file may have been removed since the job finished (for
            # example through the "Delete All Output Folders" button).
            st.error(f"Processed file is not available: {exc}")
        else:
            with processed_file:
                st.download_button(
                    label="Download Processed Video",
                    data=processed_file,
                    file_name=os.path.basename(file_path),
                    mime="video/mp4",
                )
    elif "Error occurred" in status:
        error_details = get_error_details(session_id_input)
        st.error(f"Error Details: {error_details}")


def _render_cancel_tab():
    """Render the "Cancel Job" tab."""
    st.header("Cancel Job")
    cancel_session_id = st.text_input("Enter the session ID to cancel:").strip()
    if not st.button("Cancel Job"):
        return
    if cancel_job(cancel_session_id):
        st.success(f"Job {cancel_session_id} has been cancelled.")
    else:
        st.warning("Invalid session ID or job already completed.")


def _render_job_table_tab():
    """Render the "Job Table" tab listing every known job."""
    st.header("Job Table")
    cleanup_cancelled_jobs()  # Clean up cancelled jobs before displaying the table
    # Iterate over a snapshot: jobs are registered from this file and the
    # worker threads started in the Submit tab presumably update the same
    # dict, so its size may change while the table is being built.
    job_data = [
        {"Job ID": job_id, **details}
        for job_id, details in list(status_dict.items())
    ]
    st.table(job_data)


def main():
    """Build the main page: title, banner, instructions and the four job tabs.

    The tabs let the user submit a YouTube URL for background processing,
    look up a job's status (with a download button once it has completed),
    cancel a job, and view a table of all jobs held in ``status_dict``.
    """
    st.title("⚡️ ISKM Spotify Service 🛎️")
    sac.alert(
        label='🪷🌹🪷Hare Krishna 🪷🌹🪷 | 🙌dandwat 🙇 pranam🙌 | 🙏please accept my humble obeisances | 🫡 🪷🌹All glories Srila Prabhupada🌹🪷🫡 | 🎵 Welcome to ISKM Spotify Service! 🎶 | 🎧 Enter a YouTube URL to transform it into divine format 🙏 ✅ v0.0.2 Nanda Gopa 👶🏻 Stage | 🚧 Maintaince Mode',
        size='xs',
        radius=55,
        color='orange',
        banner=sac.Banner(play=True, direction='right', speed=51, pauseOnHover=True),
        icon=True,
        closable=True,
    )

    # Add instruction guide in an expander
    with st.expander("📚 Instructions"):
        st.markdown(_INSTRUCTIONS_MARKDOWN)

    tabs = st.tabs(["Submit Job", "Check Status", "Cancel Job", "Job Table"])
    with tabs[0]:
        _render_submit_tab()
    with tabs[1]:
        _render_status_tab()
    with tabs[2]:
        _render_cancel_tab()
    with tabs[3]:
        _render_job_table_tab()
# Build the main page only when this file is executed as a script.
if __name__ == "__main__":
    main()