Spaces:
Paused
Paused
| import yt_dlp | |
| import os | |
| import gradio as gr | |
| import random | |
| import time | |
| import threading | |
| import requests | |
| from gradio_client import Client | |
| # ===================================================== | |
| # CONFIG & GLOBALS | |
| # ===================================================== | |
| # --- Downloader Config --- | |
| PROXY_SPACE_URL = "lexicalspace/emergency" | |
| OUTPUT_DIR = "downloads" | |
| os.makedirs(OUTPUT_DIR, exist_ok=True) | |
| MAX_CONCURRENCY = 6 | |
| MAX_QUEUE = 20 | |
| active_jobs = {} | |
| job_lock = threading.Lock() | |
| # --- Traffic Bot Config --- | |
| TRAFFIC_PROXY_SOURCE = "lexicalspace/emergency" | |
| HISTORY_FILE = "used_proxies.txt" | |
| traffic_active = False # Global flag to control the loop | |
| traffic_log_history = [] # Store logs to display in UI | |
| # Locks for Traffic Bot | |
| file_lock = threading.Lock() | |
| traffic_log_lock = threading.Lock() | |
| # ===================================================== | |
| # TRAFFIC BOT LOGIC (Fixed) | |
| # ===================================================== | |
| def log_traffic(msg): | |
| """Thread-safe logging for the traffic bot""" | |
| with traffic_log_lock: | |
| timestamp = time.strftime("%H:%M:%S") | |
| entry = f"[{timestamp}] {msg}" | |
| traffic_log_history.append(entry) | |
| # Keep log size manageable | |
| if len(traffic_log_history) > 50: | |
| traffic_log_history.pop(0) | |
| def get_traffic_logs(): | |
| """Returns current logs joined as string""" | |
| with traffic_log_lock: | |
| return "\n".join(reversed(traffic_log_history)) | |
| def save_proxy_as_used(proxy): | |
| with file_lock: | |
| with open(HISTORY_FILE, "a") as f: | |
| f.write(f"{proxy}\n") | |
| def get_fresh_proxies(needed_count): | |
| """ | |
| Fetches proxies and handles various API return formats | |
| (Strings, Lists, or Nested Lists) to prevent 'unhashable type' errors. | |
| """ | |
| # 1. Load History | |
| used_proxies = set() | |
| if os.path.exists(HISTORY_FILE): | |
| with open(HISTORY_FILE, "r") as f: | |
| used_proxies = set(line.strip() for line in f) | |
| # 2. Fetch from Hugging Face | |
| try: | |
| client = Client(TRAFFIC_PROXY_SOURCE) | |
| result = client.predict(api_name="/get_proxies_api") | |
| all_proxies = [] | |
| # --- FIX START: ROBUST PARSING --- | |
| # Handle case where result is a nested list like [['p1', 'p2']] | |
| if isinstance(result, (list, tuple)) and len(result) > 0 and isinstance(result[0], list): | |
| all_proxies = result[0] | |
| # Handle case where result is a string representation | |
| elif isinstance(result, str): | |
| all_proxies = eval(result) if "[" in result else result.splitlines() | |
| # Handle case where result is a simple list | |
| elif isinstance(result, list): | |
| all_proxies = result | |
| # --- FIX END --- | |
| # 3. Filter (Ensure p is a string before checking set membership) | |
| fresh_proxies = [] | |
| for p in all_proxies: | |
| if isinstance(p, str) and p not in used_proxies: | |
| fresh_proxies.append(p) | |
| if len(fresh_proxies) < needed_count: | |
| # Only log if significantly low to avoid spamming "0 fresh" | |
| if len(fresh_proxies) == 0: | |
| log_traffic(f"⚠️ No fresh proxies found (History: {len(used_proxies)})") | |
| return fresh_proxies | |
| return fresh_proxies[:needed_count] | |
| except Exception as e: | |
| log_traffic(f"❌ Proxy API Error: {str(e)[:50]}") | |
| return [] | |
| def visitor_bot(proxy, target_url, dwell_time, thread_id): | |
| proxies = {"http": proxy, "https": proxy} | |
| try: | |
| # STEP 1: ARRIVAL | |
| # log_traffic(f"➡️ [T{thread_id}] Arriving...") | |
| r1 = requests.get(target_url, proxies=proxies, timeout=15) | |
| if r1.status_code != 200: | |
| save_proxy_as_used(proxy) | |
| return | |
| # STEP 2: DWELL | |
| time.sleep(dwell_time) | |
| # STEP 3: EXIT SIGNAL | |
| r2 = requests.get(target_url, proxies=proxies, timeout=15) | |
| if r2.status_code == 200: | |
| log_traffic(f"✅ [T{thread_id}] Session Complete.") | |
| # STEP 4: BURN PROXY | |
| save_proxy_as_used(proxy) | |
| except Exception: | |
| pass # Silently fail to keep logs clean | |
| def traffic_manager(target_url, duration_mins, concurrency): | |
| global traffic_active | |
| if traffic_active: | |
| return "⚠️ Already Running" | |
| traffic_active = True | |
| start_time = time.time() | |
| end_time = start_time + (duration_mins * 60) | |
| dwell_time = 65 | |
| log_traffic(f"🚀 STARTING: {duration_mins}m Timer | Target: {target_url}") | |
| while time.time() < end_time and traffic_active: | |
| remaining = int((end_time - time.time()) / 60) | |
| # log_traffic(f"🔄 Cycle Start (Time left: ~{remaining}m)") | |
| proxies = get_fresh_proxies(concurrency) | |
| if not proxies: | |
| log_traffic("⏳ Waiting for fresh proxies (10s)...") | |
| time.sleep(10) | |
| continue | |
| log_traffic(f"⚡ Launching {len(proxies)} bots...") | |
| threads = [] | |
| for i, proxy in enumerate(proxies): | |
| if not traffic_active: break | |
| t = threading.Thread(target=visitor_bot, args=(proxy, target_url, dwell_time, i+1)) | |
| threads.append(t) | |
| t.start() | |
| time.sleep(0.1) | |
| # Wait for this batch to finish | |
| for t in threads: | |
| t.join() | |
| traffic_active = False | |
| log_traffic("🏁 TIMER ENDED. Traffic stopped.") | |
| return "Stopped" | |
| def start_traffic_thread(url, mins, conc): | |
| if traffic_active: | |
| return "⚠️ Already Running" | |
| threading.Thread(target=traffic_manager, args=(url, mins, conc)).start() | |
| return "✅ Traffic Started (Check Logs)" | |
| def stop_traffic(): | |
| global traffic_active | |
| traffic_active = False | |
| return "🛑 Stopping after current batch..." | |
| # ===================================================== | |
| # DOWNLOADER LOGIC (Script A) | |
| # ===================================================== | |
| def get_proxy_batch(): | |
| try: | |
| client = Client(PROXY_SPACE_URL) | |
| result = client.predict(api_name="/get_proxies") | |
| proxies = result[0] | |
| if proxies: | |
| random.shuffle(proxies) | |
| return [f"http://{p}" for p in proxies[:6]] | |
| except: | |
| pass | |
| return [] | |
| def get_best_proxy(): | |
| try: | |
| client = Client(PROXY_SPACE_URL) | |
| result = client.predict(api_name="/get_proxies") | |
| proxies = result[0] | |
| if proxies: | |
| return f"http://{random.choice(proxies)}" | |
| except: | |
| pass | |
| return None | |
| def core_download_v1(url, progress=gr.Progress()): | |
| engine = "V1 • Stable" | |
| if not url: return None, "⚠️ No URL", "Idle" | |
| with job_lock: | |
| if active_jobs.get(url) == "running": | |
| return None, "⛔ Already running", "Blocked" | |
| active_jobs[url] = "running" | |
| log = [f"🛡️ {engine}"] | |
| proxy_pool = get_proxy_batch() or [None] | |
| success, final_file = False, None | |
| for proxy in proxy_pool: | |
| strat = random.choice(["android", "ios", "web"]) | |
| ydl_opts = { | |
| 'format': 'bestaudio/best', | |
| 'outtmpl': os.path.join(OUTPUT_DIR, '%(title)s.%(ext)s'), | |
| 'postprocessors': [{'key': 'FFmpegExtractAudio','preferredcodec': 'mp3'}], | |
| 'noplaylist': True, 'quiet': True, 'retries': 3, | |
| 'extractor_args': {'youtube': {'player_client': [strat]}}, | |
| } | |
| if proxy: ydl_opts['proxy'] = proxy | |
| try: | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| info = ydl.extract_info(url, download=True) | |
| final_file = os.path.splitext(ydl.prepare_filename(info))[0] + ".mp3" | |
| if os.path.exists(final_file): | |
| success = True; break | |
| except Exception as e: log.append(f"❌ {str(e)[:50]}") | |
| with job_lock: active_jobs.pop(url, None) | |
| return (final_file, "✅ Done", "\n".join(log)) if success else (None, "❌ Failed", "\n".join(log)) | |
| def core_download_v2(url, progress=gr.Progress()): | |
| engine = "V2 • Fast" | |
| if not url: return None, "⚠️ No URL", "Idle" | |
| log = [f"⚡ {engine}"] | |
| proxy = get_best_proxy() | |
| strategies = ["android", "ios", "web", "tv"] | |
| for strat in strategies: | |
| log.append(f"🚀 Client: {strat}") | |
| ydl_opts = { | |
| 'format': 'bestaudio/best', | |
| 'outtmpl': os.path.join(OUTPUT_DIR, '%(title)s.%(ext)s'), | |
| 'postprocessors': [{'key': 'FFmpegExtractAudio','preferredcodec': 'mp3'}], | |
| 'noplaylist': True, 'quiet': True, 'retries': 5, | |
| 'extractor_args': {'youtube': {'player_client': [strat]}}, | |
| } | |
| if proxy: ydl_opts['proxy'] = proxy | |
| try: | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| info = ydl.extract_info(url, download=True) | |
| final_file = os.path.splitext(ydl.prepare_filename(info))[0] + ".mp3" | |
| if os.path.exists(final_file): return final_file, "✅ Done", "\n".join(log) | |
| except Exception as e: log.append(f"❌ {str(e)[:50]}") | |
| return None, "❌ Failed", "\n".join(log) | |
| def download_router(mode, url, progress=gr.Progress()): | |
| if mode == "Auto (Smart)": | |
| chosen = "V2" if (len(url) > 70 or "list=" in url) else "V1" | |
| else: | |
| chosen = "V1" if "V1" in mode else "V2" | |
| return core_download_v1(url, progress) if chosen == "V1" else core_download_v2(url, progress) | |
| # ===================================================== | |
| # UI | |
| # ===================================================== | |
| with gr.Blocks(title="UltraMax Suite") as app: | |
| with gr.Row(): | |
| # --- SIDEBAR --- | |
| with gr.Column(scale=1, variant="panel"): | |
| gr.Markdown("### ⚙️ Engine Control") | |
| engine_mode = gr.Radio( | |
| ["Auto (Smart)", "Downloader V1", "Downloader V2"], | |
| value="Auto (Smart)", label="Download Engine" | |
| ) | |
| gr.Markdown("---") | |
| gr.Markdown("### 🚀 Traffic Generator") | |
| tr_url = gr.Textbox(label="Target Blog URL", value="https://your-blog.com") | |
| tr_time = gr.Slider(minimum=1, maximum=60, value=20, step=1, label="Duration (Minutes)") | |
| tr_conc = gr.Slider(minimum=10, maximum=100, value=50, step=10, label="Concurrency") | |
| with gr.Row(): | |
| tr_start = gr.Button("▶ Run", variant="primary", size="sm") | |
| tr_stop = gr.Button("⏹ Stop", variant="stop", size="sm") | |
| tr_status = gr.Textbox(label="Traffic Logs", lines=6, interactive=False) | |
| # Timer to refresh logs | |
| log_refresher = gr.Timer(value=2) | |
| # --- MAIN CONTENT --- | |
| with gr.Column(scale=3): | |
| gr.Markdown("## 🎵 UltraMax Downloader") | |
| url_input = gr.Textbox(label="YouTube URL", placeholder="Paste link here") | |
| dl_btn = gr.Button("Download MP3", variant="primary") | |
| with gr.Row(): | |
| file_out = gr.File(label="File Output") | |
| status = gr.Textbox(label="Status") | |
| logs = gr.Accordion("Debug Logs", open=False) | |
| with logs: | |
| log_box = gr.Textbox(show_label=False, lines=10) | |
| # --- EVENTS --- | |
| # Downloader Events | |
| dl_btn.click( | |
| fn=download_router, | |
| inputs=[engine_mode, url_input], | |
| outputs=[file_out, status, log_box], | |
| concurrency_limit=MAX_CONCURRENCY | |
| ) | |
| # Traffic Bot Events | |
| tr_start.click(fn=start_traffic_thread, inputs=[tr_url, tr_time, tr_conc], outputs=None) | |
| tr_stop.click(fn=stop_traffic, inputs=None, outputs=None) | |
| log_refresher.tick(fn=get_traffic_logs, inputs=None, outputs=tr_status) | |
| app.queue(max_size=MAX_QUEUE) | |
| app.launch() |