| | import gradio as gr |
| | import subprocess |
| | import os |
| | import uuid |
| | import json |
| |
|
| | |
| | if not os.path.exists("node_modules"): |
| | subprocess.run(["npm", "install"], check=True) |
| |
|
| | def get_video_dimensions(url): |
| | try: |
| | cmd = [ |
| | "ffprobe", "-v", "error", "-select_streams", "v:0", |
| | "-show_entries", "stream=width,height", "-of", "json", url |
| | ] |
| | result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) |
| | data = json.loads(result.stdout) |
| | return data['streams'][0]['width'], data['streams'][0]['height'] |
| | except: |
| | return 0, 0 |
| |
|
| | def run_scraper(mode, query, orientation="Any"): |
| | """Helper to run the Node.js scraper in specific mode""" |
| | try: |
| | |
| | cmd = ["node", "scraper.js", mode, query, orientation] |
| | result = subprocess.run(cmd, capture_output=True, text=True, timeout=240) |
| | urls = [line.strip() for line in result.stdout.split('\n') if line.strip().startswith("http")] |
| | return urls |
| | except Exception as e: |
| | print(f"Scraper Error ({mode}): {e}") |
| | return [] |
| |
|
| | def process_video(query, orientation, start_time, end_time, mute_audio, progress=gr.Progress()): |
| | if not query: |
| | return None, "β οΈ Enter a keyword." |
| | |
| | search_query = query if "video" in query.lower() else f"{query} video" |
| | |
| | |
| | progress(0.1, desc="π Searching Pinterest...") |
| | all_urls = run_scraper("pinterest", search_query) |
| | |
| | |
| | selected_url = None |
| | for i, url in enumerate(all_urls): |
| | progress(0.3 + (i/len(all_urls)*0.1), desc=f"Checking Pinterest {i+1}...") |
| | if orientation == "Any": selected_url = url; break |
| | w, h = get_video_dimensions(url) |
| | if orientation == "Portrait" and h > w: selected_url = url; break |
| | if orientation == "Landscape" and w > h: selected_url = url; break |
| | |
| | |
| | if not selected_url: |
| | progress(0.5, desc=f"β οΈ No {orientation} videos on Pinterest. Switching to Pexels Fallback...") |
| | |
| | |
| | pexels_urls = run_scraper("pexels", query, orientation) |
| | |
| | for i, url in enumerate(pexels_urls): |
| | progress(0.6 + (i/len(pexels_urls)*0.1), desc=f"Checking Pexels {i+1}...") |
| | |
| | if orientation == "Any": selected_url = url; break |
| | w, h = get_video_dimensions(url) |
| | if orientation == "Portrait" and h > w: selected_url = url; break |
| | if orientation == "Landscape" and w > h: selected_url = url; break |
| |
|
| | if not selected_url: |
| | return None, f"β Failed. Searched Pinterest & Pexels, but found no {orientation} videos." |
| |
|
| | |
| | progress(0.8, desc="βοΈ Processing Video...") |
| | output_filename = f"final_{uuid.uuid4().hex[:5]}.mp4" |
| | |
| | ffmpeg_cmd = [ |
| | "ffmpeg", |
| | "-ss", str(start_time), |
| | "-to", str(end_time) if end_time > 0 else "999", |
| | "-i", selected_url, |
| | "-vf", "unsharp=3:3:1.2:3:3:0.0,format=yuv420p", |
| | "-c:v", "libx264", |
| | "-crf", "18", |
| | "-preset", "veryfast", |
| | "-y", output_filename |
| | ] |
| |
|
| | if mute_audio: |
| | ffmpeg_cmd.insert(-1, "-an") |
| | else: |
| | ffmpeg_cmd.insert(-1, "-c:a") |
| | ffmpeg_cmd.insert(-1, "aac") |
| |
|
| | subprocess.run(ffmpeg_cmd, check=True) |
| | |
| | source_name = "Pinterest" if "pinimg" in selected_url else "Pexels" |
| | return output_filename, f"β
Success! Source: {source_name} ({orientation})" |
| |
|
| | |
| | with gr.Blocks(theme=gr.themes.Default()) as demo: |
| | gr.Markdown("# π Pinterest + Pexels (Auto-Fallback)") |
| | |
| | with gr.Row(): |
| | with gr.Column(): |
| | q = gr.Textbox(label="Topic", placeholder="e.g. Ocean Waves") |
| | o = gr.Dropdown(choices=["Any", "Portrait", "Landscape"], value="Portrait", label="Orientation") |
| | with gr.Row(): |
| | s = gr.Number(label="Start (sec)", value=0) |
| | e = gr.Number(label="End (sec)", value=5) |
| | mute = gr.Checkbox(label="Mute Audio", value=False) |
| | btn = gr.Button("π Search & Process", variant="primary") |
| | |
| | with gr.Column(): |
| | out_v = gr.Video(label="Result Video") |
| | out_s = gr.Textbox(label="Status") |
| |
|
| | btn.click(process_video, [q, o, s, e, mute], [out_v, out_s]) |
| |
|
| | if __name__ == "__main__": |
| | demo.launch() |