# hub / app.py — lexicalspace's Hugging Face Space ("Update app.py", commit 408bf44, verified)
import ast
import os
import random
import threading
import time

import gradio as gr
import requests
import yt_dlp
from gradio_client import Client
# =====================================================
# CONFIG & GLOBALS
# =====================================================
# --- Downloader Config ---
# Hugging Face Space that serves proxy lists to the downloader engines.
PROXY_SPACE_URL = "lexicalspace/emergency"
OUTPUT_DIR = "downloads"
os.makedirs(OUTPUT_DIR, exist_ok=True)
MAX_CONCURRENCY = 6  # Gradio concurrency limit for the download button
MAX_QUEUE = 20  # Gradio request-queue cap
active_jobs = {}  # url -> "running"; blocks duplicate downloads of the same URL
job_lock = threading.Lock()  # guards active_jobs
# --- Traffic Bot Config ---
TRAFFIC_PROXY_SOURCE = "lexicalspace/emergency"
HISTORY_FILE = "used_proxies.txt"  # one proxy per line; listed proxies are never reused
traffic_active = False # Global flag to control the loop
traffic_log_history = [] # Store logs to display in UI
# Locks for Traffic Bot
file_lock = threading.Lock()  # guards HISTORY_FILE appends
traffic_log_lock = threading.Lock()  # guards traffic_log_history
# =====================================================
# TRAFFIC BOT LOGIC (Fixed)
# =====================================================
def log_traffic(msg):
    """Append a timestamped entry to the shared traffic-bot log buffer (thread-safe)."""
    stamp = time.strftime("%H:%M:%S")
    with traffic_log_lock:
        traffic_log_history.append(f"[{stamp}] {msg}")
        # Drop the oldest entries so the buffer never grows past 50 lines.
        while len(traffic_log_history) > 50:
            traffic_log_history.pop(0)
def get_traffic_logs():
    """Return the traffic log, newest entry first, as one newline-joined string."""
    with traffic_log_lock:
        snapshot = list(reversed(traffic_log_history))
    return "\n".join(snapshot)
def save_proxy_as_used(proxy):
    """Record *proxy* in HISTORY_FILE so it is never handed out again."""
    with file_lock, open(HISTORY_FILE, "a") as history:
        history.write(f"{proxy}\n")
def get_fresh_proxies(needed_count):
    """
    Fetch up to *needed_count* proxies that have not been used before.

    Handles the various shapes the proxy API can return (plain list,
    nested list, or a string representation of a list) to prevent
    'unhashable type' errors, then filters out anything already recorded
    in HISTORY_FILE.

    Returns a list of proxy strings — possibly shorter than requested,
    or empty on API failure.
    """
    # 1. Load history of already-burned proxies (ignore blank lines).
    used_proxies = set()
    if os.path.exists(HISTORY_FILE):
        with open(HISTORY_FILE, "r") as f:
            used_proxies = {line.strip() for line in f if line.strip()}
    # 2. Fetch from the Hugging Face proxy Space.
    try:
        client = Client(TRAFFIC_PROXY_SOURCE)
        result = client.predict(api_name="/get_proxies_api")
        all_proxies = []
        # Robust parsing: the API may return [['p1', 'p2']], a flat list,
        # or a string payload.
        if isinstance(result, (list, tuple)) and len(result) > 0 and isinstance(result[0], list):
            all_proxies = result[0]
        elif isinstance(result, str):
            # SECURITY FIX: the payload comes from a remote service, so it
            # must never be executed as code. literal_eval only parses
            # Python literals (lists/strings/numbers) and raises otherwise,
            # which the surrounding except handles.
            all_proxies = ast.literal_eval(result) if "[" in result else result.splitlines()
        elif isinstance(result, list):
            all_proxies = result
        # 3. Filter: keep only string proxies not seen before.
        fresh_proxies = [p for p in all_proxies if isinstance(p, str) and p not in used_proxies]
        if len(fresh_proxies) < needed_count:
            # Only log when completely dry to avoid spamming "0 fresh".
            if not fresh_proxies:
                log_traffic(f"⚠️ No fresh proxies found (History: {len(used_proxies)})")
            return fresh_proxies
        return fresh_proxies[:needed_count]
    except Exception as e:
        log_traffic(f"❌ Proxy API Error: {str(e)[:50]}")
        return []
def visitor_bot(proxy, target_url, dwell_time, thread_id):
    """Simulate one visit: request the page, dwell, request again, burn the proxy.

    A non-200 arrival burns the proxy immediately. Network errors are
    swallowed on purpose so failed bots do not flood the UI log.
    """
    routing = {"http": proxy, "https": proxy}
    try:
        # STEP 1: ARRIVAL
        arrival = requests.get(target_url, proxies=routing, timeout=15)
        if arrival.status_code != 200:
            save_proxy_as_used(proxy)
            return
        # STEP 2: DWELL — pretend the visitor is reading the page.
        time.sleep(dwell_time)
        # STEP 3: EXIT SIGNAL
        departure = requests.get(target_url, proxies=routing, timeout=15)
        if departure.status_code == 200:
            log_traffic(f"✅ [T{thread_id}] Session Complete.")
        # STEP 4: BURN PROXY so it is never reused.
        save_proxy_as_used(proxy)
    except Exception:
        # Deliberate best-effort: silence keeps the logs clean.
        pass
def traffic_manager(target_url, duration_mins, concurrency):
    """Run batches of visitor bots against *target_url* until the timer expires.

    Each cycle fetches up to *concurrency* fresh proxies, launches one
    visitor_bot thread per proxy, and waits for the whole batch before
    starting the next. The module-level `traffic_active` flag gates the
    loop; stop_traffic() flips it to end the run early.

    Returns a short status string.
    """
    global traffic_active
    if traffic_active:
        return "⚠️ Already Running"
    traffic_active = True
    # FIX: dropped the unused `start_time`/`remaining` locals (the cycle
    # log line that consumed them was commented out).
    end_time = time.time() + (duration_mins * 60)
    dwell_time = 65  # seconds each bot "reads" the page between its two requests
    log_traffic(f"🚀 STARTING: {duration_mins}m Timer | Target: {target_url}")
    while time.time() < end_time and traffic_active:
        proxies = get_fresh_proxies(concurrency)
        if not proxies:
            log_traffic("⏳ Waiting for fresh proxies (10s)...")
            time.sleep(10)
            continue
        log_traffic(f"⚡ Launching {len(proxies)} bots...")
        threads = []
        for i, proxy in enumerate(proxies):
            if not traffic_active:
                break
            t = threading.Thread(target=visitor_bot, args=(proxy, target_url, dwell_time, i + 1))
            threads.append(t)
            t.start()
            time.sleep(0.1)  # stagger launches slightly
        # Wait for this batch to finish before the next cycle.
        for t in threads:
            t.join()
    traffic_active = False
    log_traffic("🏁 TIMER ENDED. Traffic stopped.")
    return "Stopped"
def start_traffic_thread(url, mins, conc):
    """Launch traffic_manager in a background thread and return a status string."""
    if traffic_active:
        return "⚠️ Already Running"
    # FIX: daemon=True so a traffic run still sleeping/joining cannot keep
    # the whole process alive after the Gradio server shuts down.
    worker = threading.Thread(target=traffic_manager, args=(url, mins, conc), daemon=True)
    worker.start()
    return "✅ Traffic Started (Check Logs)"
def stop_traffic():
    """Ask the traffic loop to wind down once its current batch finishes."""
    global traffic_active
    # The manager loop polls this flag between cycles and between launches.
    traffic_active = False
    return "🛑 Stopping after current batch..."
# =====================================================
# DOWNLOADER LOGIC (Script A)
# =====================================================
def get_proxy_batch():
    """Return up to 6 shuffled `http://` proxy URLs from the proxy Space, or []."""
    try:
        client = Client(PROXY_SPACE_URL)
        result = client.predict(api_name="/get_proxies")
        proxies = result[0]
        if proxies:
            random.shuffle(proxies)
            return [f"http://{p}" for p in proxies[:6]]
    # FIX: bare `except:` also swallowed KeyboardInterrupt/SystemExit;
    # narrow to Exception while keeping the best-effort "no proxies" fallback.
    except Exception:
        pass
    return []
def get_best_proxy():
    """Return one random `http://` proxy URL from the proxy Space, or None."""
    try:
        client = Client(PROXY_SPACE_URL)
        result = client.predict(api_name="/get_proxies")
        proxies = result[0]
        if proxies:
            return f"http://{random.choice(proxies)}"
    # FIX: bare `except:` also swallowed KeyboardInterrupt/SystemExit;
    # narrow to Exception while keeping the best-effort None fallback.
    except Exception:
        pass
    return None
def core_download_v1(url, progress=gr.Progress()):
    """Stable downloader: tries each proxy in a batch with one random client.

    Returns (filepath | None, status message, joined log text). While a URL
    is downloading, further requests for the same URL are rejected.
    """
    engine = "V1 • Stable"
    if not url:
        return None, "⚠️ No URL", "Idle"
    with job_lock:
        if active_jobs.get(url) == "running":
            return None, "⛔ Already running", "Blocked"
        active_jobs[url] = "running"
    log = [f"🛡️ {engine}"]
    success, final_file = False, None
    try:
        proxy_pool = get_proxy_batch() or [None]  # [None] = direct connection
        for proxy in proxy_pool:
            strat = random.choice(["android", "ios", "web"])
            ydl_opts = {
                'format': 'bestaudio/best',
                'outtmpl': os.path.join(OUTPUT_DIR, '%(title)s.%(ext)s'),
                'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3'}],
                'noplaylist': True, 'quiet': True, 'retries': 3,
                'extractor_args': {'youtube': {'player_client': [strat]}},
            }
            if proxy:
                ydl_opts['proxy'] = proxy
            try:
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    info = ydl.extract_info(url, download=True)
                    final_file = os.path.splitext(ydl.prepare_filename(info))[0] + ".mp3"
                if os.path.exists(final_file):
                    success = True
                    break
            except Exception as e:
                log.append(f"❌ {str(e)[:50]}")
    finally:
        # FIX: release the job slot even if an unexpected error escapes the
        # per-proxy try, otherwise this URL stays "Blocked" forever.
        with job_lock:
            active_jobs.pop(url, None)
    return (final_file, "✅ Done", "\n".join(log)) if success else (None, "❌ Failed", "\n".join(log))
def core_download_v2(url, progress=gr.Progress()):
    """Fast downloader: one proxy, cycling through YouTube player clients.

    Returns (filepath | None, status message, joined log text).
    """
    engine = "V2 • Fast"
    if not url:
        return None, "⚠️ No URL", "Idle"
    log = [f"⚡ {engine}"]
    proxy = get_best_proxy()
    for strat in ("android", "ios", "web", "tv"):
        log.append(f"🚀 Client: {strat}")
        opts = {
            'format': 'bestaudio/best',
            'outtmpl': os.path.join(OUTPUT_DIR, '%(title)s.%(ext)s'),
            'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3'}],
            'noplaylist': True, 'quiet': True, 'retries': 5,
            'extractor_args': {'youtube': {'player_client': [strat]}},
        }
        if proxy:
            opts['proxy'] = proxy
        try:
            with yt_dlp.YoutubeDL(opts) as ydl:
                info = ydl.extract_info(url, download=True)
                candidate = os.path.splitext(ydl.prepare_filename(info))[0] + ".mp3"
            if os.path.exists(candidate):
                return candidate, "✅ Done", "\n".join(log)
        except Exception as e:
            log.append(f"❌ {str(e)[:50]}")
    return None, "❌ Failed", "\n".join(log)
def download_router(mode, url, progress=gr.Progress()):
    """Dispatch a download to engine V1 or V2 based on the selected mode."""
    if mode == "Auto (Smart)":
        # Heuristic: long URLs and playlist links go to the fast engine.
        use_v1 = len(url) <= 70 and "list=" not in url
    else:
        use_v1 = "V1" in mode
    engine = core_download_v1 if use_v1 else core_download_v2
    return engine(url, progress)
# =====================================================
# UI
# =====================================================
with gr.Blocks(title="UltraMax Suite") as app:
    with gr.Row():
        # --- SIDEBAR ---
        with gr.Column(scale=1, variant="panel"):
            gr.Markdown("### ⚙️ Engine Control")
            # Selected engine feeds download_router as its `mode` input.
            engine_mode = gr.Radio(
                ["Auto (Smart)", "Downloader V1", "Downloader V2"],
                value="Auto (Smart)", label="Download Engine"
            )
            gr.Markdown("---")
            gr.Markdown("### 🚀 Traffic Generator")
            tr_url = gr.Textbox(label="Target Blog URL", value="https://your-blog.com")
            tr_time = gr.Slider(minimum=1, maximum=60, value=20, step=1, label="Duration (Minutes)")
            tr_conc = gr.Slider(minimum=10, maximum=100, value=50, step=10, label="Concurrency")
            with gr.Row():
                tr_start = gr.Button("▶ Run", variant="primary", size="sm")
                tr_stop = gr.Button("⏹ Stop", variant="stop", size="sm")
            tr_status = gr.Textbox(label="Traffic Logs", lines=6, interactive=False)
            # Timer to refresh logs: ticks every 2s and re-renders the log buffer.
            log_refresher = gr.Timer(value=2)
        # --- MAIN CONTENT ---
        with gr.Column(scale=3):
            gr.Markdown("## 🎵 UltraMax Downloader")
            url_input = gr.Textbox(label="YouTube URL", placeholder="Paste link here")
            dl_btn = gr.Button("Download MP3", variant="primary")
            with gr.Row():
                file_out = gr.File(label="File Output")
                status = gr.Textbox(label="Status")
            logs = gr.Accordion("Debug Logs", open=False)
            with logs:
                log_box = gr.Textbox(show_label=False, lines=10)
    # --- EVENTS ---
    # Downloader Events
    dl_btn.click(
        fn=download_router,
        inputs=[engine_mode, url_input],
        outputs=[file_out, status, log_box],
        concurrency_limit=MAX_CONCURRENCY
    )
    # Traffic Bot Events
    # NOTE: start/stop handlers return status strings, but outputs=None,
    # so feedback reaches the user only through the log textbox below.
    tr_start.click(fn=start_traffic_thread, inputs=[tr_url, tr_time, tr_conc], outputs=None)
    tr_stop.click(fn=stop_traffic, inputs=None, outputs=None)
    log_refresher.tick(fn=get_traffic_logs, inputs=None, outputs=tr_status)
app.queue(max_size=MAX_QUEUE)
app.launch()