import yt_dlp import os import gradio as gr import random import time import threading import requests from gradio_client import Client # ===================================================== # CONFIG & GLOBALS # ===================================================== # --- Downloader Config --- PROXY_SPACE_URL = "lexicalspace/emergency" OUTPUT_DIR = "downloads" os.makedirs(OUTPUT_DIR, exist_ok=True) MAX_CONCURRENCY = 6 MAX_QUEUE = 20 active_jobs = {} job_lock = threading.Lock() # --- Traffic Bot Config --- TRAFFIC_PROXY_SOURCE = "lexicalspace/emergency" HISTORY_FILE = "used_proxies.txt" traffic_active = False # Global flag to control the loop traffic_log_history = [] # Store logs to display in UI # Locks for Traffic Bot file_lock = threading.Lock() traffic_log_lock = threading.Lock() # ===================================================== # TRAFFIC BOT LOGIC (Fixed) # ===================================================== def log_traffic(msg): """Thread-safe logging for the traffic bot""" with traffic_log_lock: timestamp = time.strftime("%H:%M:%S") entry = f"[{timestamp}] {msg}" traffic_log_history.append(entry) # Keep log size manageable if len(traffic_log_history) > 50: traffic_log_history.pop(0) def get_traffic_logs(): """Returns current logs joined as string""" with traffic_log_lock: return "\n".join(reversed(traffic_log_history)) def save_proxy_as_used(proxy): with file_lock: with open(HISTORY_FILE, "a") as f: f.write(f"{proxy}\n") def get_fresh_proxies(needed_count): """ Fetches proxies and handles various API return formats (Strings, Lists, or Nested Lists) to prevent 'unhashable type' errors. """ # 1. Load History used_proxies = set() if os.path.exists(HISTORY_FILE): with open(HISTORY_FILE, "r") as f: used_proxies = set(line.strip() for line in f) # 2. Fetch from Hugging Face try: client = Client(TRAFFIC_PROXY_SOURCE) result = client.predict(api_name="/get_proxies_api") all_proxies = [] # --- FIX START: ROBUST PARSING --- # Handle case where result is a nested list like [['p1', 'p2']] if isinstance(result, (list, tuple)) and len(result) > 0 and isinstance(result[0], list): all_proxies = result[0] # Handle case where result is a string representation elif isinstance(result, str): all_proxies = eval(result) if "[" in result else result.splitlines() # Handle case where result is a simple list elif isinstance(result, list): all_proxies = result # --- FIX END --- # 3. Filter (Ensure p is a string before checking set membership) fresh_proxies = [] for p in all_proxies: if isinstance(p, str) and p not in used_proxies: fresh_proxies.append(p) if len(fresh_proxies) < needed_count: # Only log if significantly low to avoid spamming "0 fresh" if len(fresh_proxies) == 0: log_traffic(f"⚠️ No fresh proxies found (History: {len(used_proxies)})") return fresh_proxies return fresh_proxies[:needed_count] except Exception as e: log_traffic(f"❌ Proxy API Error: {str(e)[:50]}") return [] def visitor_bot(proxy, target_url, dwell_time, thread_id): proxies = {"http": proxy, "https": proxy} try: # STEP 1: ARRIVAL # log_traffic(f"➡️ [T{thread_id}] Arriving...") r1 = requests.get(target_url, proxies=proxies, timeout=15) if r1.status_code != 200: save_proxy_as_used(proxy) return # STEP 2: DWELL time.sleep(dwell_time) # STEP 3: EXIT SIGNAL r2 = requests.get(target_url, proxies=proxies, timeout=15) if r2.status_code == 200: log_traffic(f"✅ [T{thread_id}] Session Complete.") # STEP 4: BURN PROXY save_proxy_as_used(proxy) except Exception: pass # Silently fail to keep logs clean def traffic_manager(target_url, duration_mins, concurrency): global traffic_active if traffic_active: return "⚠️ Already Running" traffic_active = True start_time = time.time() end_time = start_time + (duration_mins * 60) dwell_time = 65 log_traffic(f"🚀 STARTING: {duration_mins}m Timer | Target: {target_url}") while time.time() < end_time and traffic_active: remaining = int((end_time - time.time()) / 60) # log_traffic(f"🔄 Cycle Start (Time left: ~{remaining}m)") proxies = get_fresh_proxies(concurrency) if not proxies: log_traffic("⏳ Waiting for fresh proxies (10s)...") time.sleep(10) continue log_traffic(f"⚡ Launching {len(proxies)} bots...") threads = [] for i, proxy in enumerate(proxies): if not traffic_active: break t = threading.Thread(target=visitor_bot, args=(proxy, target_url, dwell_time, i+1)) threads.append(t) t.start() time.sleep(0.1) # Wait for this batch to finish for t in threads: t.join() traffic_active = False log_traffic("🏁 TIMER ENDED. Traffic stopped.") return "Stopped" def start_traffic_thread(url, mins, conc): if traffic_active: return "⚠️ Already Running" threading.Thread(target=traffic_manager, args=(url, mins, conc)).start() return "✅ Traffic Started (Check Logs)" def stop_traffic(): global traffic_active traffic_active = False return "🛑 Stopping after current batch..." # ===================================================== # DOWNLOADER LOGIC (Script A) # ===================================================== def get_proxy_batch(): try: client = Client(PROXY_SPACE_URL) result = client.predict(api_name="/get_proxies") proxies = result[0] if proxies: random.shuffle(proxies) return [f"http://{p}" for p in proxies[:6]] except: pass return [] def get_best_proxy(): try: client = Client(PROXY_SPACE_URL) result = client.predict(api_name="/get_proxies") proxies = result[0] if proxies: return f"http://{random.choice(proxies)}" except: pass return None def core_download_v1(url, progress=gr.Progress()): engine = "V1 • Stable" if not url: return None, "⚠️ No URL", "Idle" with job_lock: if active_jobs.get(url) == "running": return None, "⛔ Already running", "Blocked" active_jobs[url] = "running" log = [f"🛡️ {engine}"] proxy_pool = get_proxy_batch() or [None] success, final_file = False, None for proxy in proxy_pool: strat = random.choice(["android", "ios", "web"]) ydl_opts = { 'format': 'bestaudio/best', 'outtmpl': os.path.join(OUTPUT_DIR, '%(title)s.%(ext)s'), 'postprocessors': [{'key': 'FFmpegExtractAudio','preferredcodec': 'mp3'}], 'noplaylist': True, 'quiet': True, 'retries': 3, 'extractor_args': {'youtube': {'player_client': [strat]}}, } if proxy: ydl_opts['proxy'] = proxy try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=True) final_file = os.path.splitext(ydl.prepare_filename(info))[0] + ".mp3" if os.path.exists(final_file): success = True; break except Exception as e: log.append(f"❌ {str(e)[:50]}") with job_lock: active_jobs.pop(url, None) return (final_file, "✅ Done", "\n".join(log)) if success else (None, "❌ Failed", "\n".join(log)) def core_download_v2(url, progress=gr.Progress()): engine = "V2 • Fast" if not url: return None, "⚠️ No URL", "Idle" log = [f"⚡ {engine}"] proxy = get_best_proxy() strategies = ["android", "ios", "web", "tv"] for strat in strategies: log.append(f"🚀 Client: {strat}") ydl_opts = { 'format': 'bestaudio/best', 'outtmpl': os.path.join(OUTPUT_DIR, '%(title)s.%(ext)s'), 'postprocessors': [{'key': 'FFmpegExtractAudio','preferredcodec': 'mp3'}], 'noplaylist': True, 'quiet': True, 'retries': 5, 'extractor_args': {'youtube': {'player_client': [strat]}}, } if proxy: ydl_opts['proxy'] = proxy try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=True) final_file = os.path.splitext(ydl.prepare_filename(info))[0] + ".mp3" if os.path.exists(final_file): return final_file, "✅ Done", "\n".join(log) except Exception as e: log.append(f"❌ {str(e)[:50]}") return None, "❌ Failed", "\n".join(log) def download_router(mode, url, progress=gr.Progress()): if mode == "Auto (Smart)": chosen = "V2" if (len(url) > 70 or "list=" in url) else "V1" else: chosen = "V1" if "V1" in mode else "V2" return core_download_v1(url, progress) if chosen == "V1" else core_download_v2(url, progress) # ===================================================== # UI # ===================================================== with gr.Blocks(title="UltraMax Suite") as app: with gr.Row(): # --- SIDEBAR --- with gr.Column(scale=1, variant="panel"): gr.Markdown("### ⚙️ Engine Control") engine_mode = gr.Radio( ["Auto (Smart)", "Downloader V1", "Downloader V2"], value="Auto (Smart)", label="Download Engine" ) gr.Markdown("---") gr.Markdown("### 🚀 Traffic Generator") tr_url = gr.Textbox(label="Target Blog URL", value="https://your-blog.com") tr_time = gr.Slider(minimum=1, maximum=60, value=20, step=1, label="Duration (Minutes)") tr_conc = gr.Slider(minimum=10, maximum=100, value=50, step=10, label="Concurrency") with gr.Row(): tr_start = gr.Button("▶ Run", variant="primary", size="sm") tr_stop = gr.Button("⏹ Stop", variant="stop", size="sm") tr_status = gr.Textbox(label="Traffic Logs", lines=6, interactive=False) # Timer to refresh logs log_refresher = gr.Timer(value=2) # --- MAIN CONTENT --- with gr.Column(scale=3): gr.Markdown("## 🎵 UltraMax Downloader") url_input = gr.Textbox(label="YouTube URL", placeholder="Paste link here") dl_btn = gr.Button("Download MP3", variant="primary") with gr.Row(): file_out = gr.File(label="File Output") status = gr.Textbox(label="Status") logs = gr.Accordion("Debug Logs", open=False) with logs: log_box = gr.Textbox(show_label=False, lines=10) # --- EVENTS --- # Downloader Events dl_btn.click( fn=download_router, inputs=[engine_mode, url_input], outputs=[file_out, status, log_box], concurrency_limit=MAX_CONCURRENCY ) # Traffic Bot Events tr_start.click(fn=start_traffic_thread, inputs=[tr_url, tr_time, tr_conc], outputs=None) tr_stop.click(fn=stop_traffic, inputs=None, outputs=None) log_refresher.tick(fn=get_traffic_logs, inputs=None, outputs=tr_status) app.queue(max_size=MAX_QUEUE) app.launch()