spotify_service / app.py
Pranjal Pruthi
Add application files and dependencies
d6193e8
#app.py
import streamlit as st
import subprocess
import os
import glob
import time
import streamlit_antd_components as sac
import uuid
import shutil
import re
import threading
import psutil
from video_processor import process_video, get_status, get_error_details, status_dict, cancel_job, cleanup_cancelled_jobs
st.set_page_config(layout="wide")
# Cache the check_dependencies function
@st.cache_data
def check_dependencies():
dependencies_status = {}
# Check ffmpeg
ffmpeg_path = shutil.which('ffmpeg')
if ffmpeg_path:
dependencies_status['ffmpeg'] = "installed"
try:
result = subprocess.run(['ffmpeg', '-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
dependencies_status['ffmpeg_version'] = result.stdout.split('\n')[0]
except subprocess.CalledProcessError:
dependencies_status['ffmpeg'] = "error"
else:
dependencies_status['ffmpeg'] = "not accessible"
# Check yt-dlp and auto-editor
for dep in ['yt-dlp', 'auto-editor']:
try:
result = subprocess.run([dep, '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
dependencies_status[dep] = "installed"
dependencies_status[f"{dep}_version"] = result.stdout.split('\n')[0]
except FileNotFoundError:
dependencies_status[dep] = "not installed"
except subprocess.CalledProcessError:
dependencies_status[dep] = "not accessible"
return dependencies_status
def display_dependencies_status():
with st.sidebar:
st.header("Dependency Check")
dependencies_status = check_dependencies()
# Display ffmpeg status
if dependencies_status['ffmpeg'] == "installed":
st.success("ffmpeg is installed and accessible.")
st.text(dependencies_status['ffmpeg_version'])
elif dependencies_status['ffmpeg'] == "error":
st.warning("ffmpeg is found but there was an error running it.")
else:
st.error("ffmpeg is not accessible.")
# Display yt-dlp and auto-editor status
for dep in ['yt-dlp', 'auto-editor']:
if dependencies_status[dep] == "installed":
st.success(f"{dep} is installed and accessible.")
st.text(dependencies_status[f"{dep}_version"])
elif dependencies_status[dep] == "not installed":
st.error(f"{dep} is not installed.")
else:
st.error(f"{dep} is not accessible.")
# Add button to delete all output folders
if st.button("Delete All Output Folders"):
delete_all_output_folders()
# Display system statistics
st.header("System Statistics")
cpu_usage = psutil.cpu_percent(interval=1)
memory_info = psutil.virtual_memory()
disk_info = psutil.disk_usage('/')
st.text(f"CPU Usage: {cpu_usage}%")
st.text(f"Memory Usage: {memory_info.percent}%")
st.text(f"Total Memory: {memory_info.total / (1024 ** 3):.2f} GB")
st.text(f"Available Memory: {memory_info.available / (1024 ** 3):.2f} GB")
st.text(f"Disk Usage: {disk_info.percent}%")
st.text(f"Total Disk Space: {disk_info.total / (1024 ** 3):.2f} GB")
st.text(f"Free Disk Space: {disk_info.free / (1024 ** 3):.2f} GB")
def delete_all_output_folders():
output_folders = glob.glob("./output_folder_*")
for folder in output_folders:
shutil.rmtree(folder)
st.success("All output folders have been deleted.")
# Call this function when your app starts
display_dependencies_status()
def main():
st.title("⚡️ ISKM Spotify Service 🛎️")
sac.alert(label='🪷🌹🪷Hare Krishna 🪷🌹🪷 | 🙌dandwat 🙇 pranam🙌 | 🙏please accept my humble obeisances | 🫡 🪷🌹All glories Srila Prabhupada🌹🪷🫡 | 🎵 Welcome to ISKM Spotify Service! 🎶 | 🎧 Enter a YouTube URL to transform it into divine format 🙏 ✅ v0.0.2 Nanda Gopa 👶🏻 Stage | 🚧 Maintaince Mode', size='xs', radius=55, color='orange', banner=sac.Banner(play=True, direction='right', speed=51, pauseOnHover=True), icon=True, closable=True)
# Add instruction guide in an expander
with st.expander("📚 Instructions"):
st.markdown("""
### How to Use ISKM Spotify Service
1. **Enter YouTube URL**:
- Paste a valid YouTube video or playlist URL in the text box.
- The URL should start with https://www.youtube.com/ or https://youtu.be/.
2. **Process Video**:
- Click the "Process Video" button to start the conversion.
- The tool will download, trim, and process the video(s).
3. **Download**:
- Once processing is complete, download buttons will appear.
- For single videos, you'll see one download button.
- For playlists, you'll see multiple download buttons (one for each video).
4. **Dependency Check**:
- Check the sidebar to ensure all required dependencies are installed and accessible.
### Notes:
- Processing may take some time, especially for longer videos or playlists.
- Ensure you have a stable internet connection throughout the process.
- The tool will trim silence and unnecessary parts from the videos.
- Final output will be in MP4 format, optimized for audio quality.
If you encounter any issues, please check the dependency status in the sidebar or contact support.
""")
tabs = st.tabs(["Submit Job", "Check Status", "Cancel Job", "Job Table"])
with tabs[0]:
st.header("Submit Job")
url = st.text_input("Enter the YouTube URL:")
if st.button("Process Video"):
if url:
session_id = str(uuid.uuid4())
status_dict[session_id] = {"status": "Processing", "started_time": time.strftime("%Y-%m-%d %H:%M:%S"), "video_url": url, "link_submitted": url}
threading.Thread(target=process_video, args=(url, session_id)).start()
st.success(f"{session_id}")
st.info(f"Video processing started! Your session ID is ☝️ Use this ID to check the status.")
else:
st.warning("Please enter a YouTube URL.")
with tabs[1]:
st.header("Check Status")
session_id_input = st.text_input("Enter your session ID:")
if st.button("Check Status"):
if session_id_input:
status = get_status(session_id_input)
st.info(f"Status: {status}")
# Check if processing is completed and provide a download link
if "Processing completed!" in status:
file_path = status.split("File: ")[-1]
with open(file_path, "rb") as file:
btn = st.download_button(
label="Download Processed Video",
data=file,
file_name=os.path.basename(file_path),
mime="video/mp4"
)
elif "Error occurred" in status:
error_details = get_error_details(session_id_input)
st.error(f"Error Details: {error_details}")
else:
st.warning("Please enter a session ID.")
with tabs[2]:
st.header("Cancel Job")
cancel_session_id = st.text_input("Enter the session ID to cancel:")
if st.button("Cancel Job"):
if cancel_job(cancel_session_id):
st.success(f"Job {cancel_session_id} has been cancelled.")
else:
st.warning("Invalid session ID or job already completed.")
with tabs[3]:
st.header("Job Table")
cleanup_cancelled_jobs() # Clean up cancelled jobs before displaying the table
job_data = [{"Job ID": job_id, **details} for job_id, details in status_dict.items()]
st.table(job_data)
if __name__ == "__main__":
main()