video / cli.py
kines9661's picture
Upload 4 files
5f5acc5 verified
Raw
History Blame Contribute Delete
23.4 kB
import os
import re
import sys
import time
import json
import random
import argparse
import subprocess
import requests
try:
from rich.console import Console
from rich.prompt import Prompt
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
from rich.table import Table
from rich.text import Text
except ImportError:
import subprocess
subprocess.check_call([sys.executable, "-m", "pip", "install", "rich", "-q"])
from rich.console import Console
from rich.prompt import Prompt
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
from rich.table import Table
from rich.text import Text
console = Console()
# ─────────────────────────────────────────────────────────
# Proxy Rotator
# ─────────────────────────────────────────────────────────
class ProxyRotator:
def __init__(self, proxy_file):
self.proxies = []
self.current_index = 0
self._load(proxy_file)
def _load(self, filename):
try:
with open(filename, 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
parts = line.split(':')
if len(parts) == 4:
# IP:PORT:USER:PASS
proxy_url = f"http://{parts[2]}:{parts[3]}@{parts[0]}:{parts[1]}"
self.proxies.append(proxy_url)
elif len(parts) == 2:
self.proxies.append(f"http://{parts[0]}:{parts[1]}")
random.shuffle(self.proxies)
console.print(f"[bold green]βœ” Loaded {len(self.proxies)} proxies![/bold green]")
except Exception as e:
console.print(f"[bold red]βœ– Failed to load proxies: {e}[/bold red]")
def next(self):
if not self.proxies:
return None
proxy = self.proxies[self.current_index % len(self.proxies)]
self.current_index += 1
return proxy
def display_current(self, proxy):
if proxy:
safe = proxy.split('@')[-1] if '@' in proxy else proxy
console.print(f"[dim cyan]πŸ”„ Proxy:[/dim cyan] [bold white]{safe}[/bold white]")
# ─────────────────────────────────────────────────────────
# VeoAI Engine
# ─────────────────────────────────────────────────────────
class VeoEngine:
AJAX_URL = "https://veoaifree.com/wp-admin/admin-ajax.php"
VIDEO_BASE = "https://veoaifree.com/video/uploads/"
MODEL_CONFIGS = {
"veo": {
"name": "Veo 3.1",
"page_url": "https://veoaifree.com/veo-video-generator/",
},
"grok": {
"name": "Grok 4.5",
"page_url": "https://veoaifree.com/grok-ai-video-generator/",
},
}
DEFAULT_MODEL = "veo"
PAGE_URL = MODEL_CONFIGS[DEFAULT_MODEL]["page_url"]
MODEL_NAME = MODEL_CONFIGS[DEFAULT_MODEL]["name"]
HEADERS = {
"accept": "*/*",
"accept-language": "en-US,en;q=0.7",
"content-type": "application/x-www-form-urlencoded; charset=UTF-8",
"sec-ch-ua": '"Brave";v="147", "Not.A/Brand";v="8", "Chromium";v="147"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Linux"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"sec-gpc": "1",
"x-requested-with": "XMLHttpRequest",
}
def __init__(self, proxy_rotator, model_key=DEFAULT_MODEL):
self.rotator = proxy_rotator
self.model_key = self.normalize_model_key(model_key)
self.model_config = self.MODEL_CONFIGS[self.model_key]
self.MODEL_NAME = self.model_config["name"]
self.PAGE_URL = self.model_config["page_url"]
self.session = None
self.nonce = None
self._build_session()
@classmethod
def normalize_model_key(cls, model_key):
model_key = (model_key or cls.DEFAULT_MODEL).strip().lower()
aliases = {
"veo": "veo",
"veo 3.1": "veo",
"grok": "grok",
"grok 4.5": "grok",
}
model_key = aliases.get(model_key, model_key)
if model_key not in cls.MODEL_CONFIGS:
raise ValueError(f"Unknown model '{model_key}'. Available models: {', '.join(cls.MODEL_CONFIGS)}")
return model_key
@classmethod
def model_choices(cls):
return list(cls.MODEL_CONFIGS.keys())
def _build_session(self):
self.session = requests.Session()
self.session.headers.update(self.HEADERS)
self.session.headers["referer"] = self.PAGE_URL
self.session.proxies = {} # direct connection, no proxy
def rotate(self):
console.print("[bold dark_orange]↻ Rotating to next proxy...[/bold dark_orange]")
self._build_session()
def _rotate_poll_proxy(self):
"""No-op β€” running direct, no proxies to rotate."""
pass
# ── Step 1: Grab nonce from the page ──
def fetch_nonce(self):
with console.status("[bold green]⟳ Fetching session nonce...", spinner="bouncingBar"):
res = self.session.get(self.PAGE_URL, timeout=20)
res.raise_for_status()
# Primary pattern
matches = re.findall(r'nonce["\']?\s*[:=]\s*["\']([a-f0-9]{10})["\']', res.text)
if matches:
self.nonce = matches[0]
console.print(f"[bold green]βœ” Nonce locked:[/bold green] [white]{self.nonce}[/white]")
return
# Fallback β€” grab any 10-char hex that looks like a WP nonce
fallback = re.findall(r'"([a-f0-9]{10})"', res.text)
if fallback:
self.nonce = fallback[0]
console.print(f"[bold yellow]⚠ Guessed nonce:[/bold yellow] [white]{self.nonce}[/white]")
return
raise Exception("Nonce not found in page HTML β€” proxy might be blocked.")
# ── Step 2: Enhance prompt via their AI ──
def enhance_prompt(self, raw_prompt):
with console.status("[bold magenta]✦ Enhancing prompt...", spinner="dots12"):
payload = {
"action": "veo_video_generator",
"nonce": self.nonce,
"prompt": raw_prompt,
"model": self.MODEL_NAME,
"modelName": self.MODEL_NAME,
"actionType": "main-prompt-generation",
}
res = self.session.post(self.AJAX_URL, data=payload, timeout=30)
res.raise_for_status()
console.print(f"[dim] Raw enhance response: {res.text[:200]}[/dim]")
try:
data = res.json()
if isinstance(data, dict):
enhanced = str(data.get("data", "")).strip()
else:
enhanced = res.text.strip()
except:
enhanced = res.text.strip()
if not enhanced or len(enhanced) < 5:
raise Exception(f"Enhancement failed. Response: {res.text[:150]}")
console.print(Panel(enhanced, title="[bold magenta]✦ Enhanced Prompt[/bold magenta]", border_style="magenta"))
return enhanced
# ── Step 3: Queue video generation ──
def generate_video(self, enhanced_prompt, aspect_ratio="16:9", audio_mode="auto"):
ratio_map = {
"16:9": "VIDEO_ASPECT_RATIO_LANDSCAPE",
"9:16": "VIDEO_ASPECT_RATIO_PORTRAIT",
}
api_ratio = ratio_map.get(aspect_ratio, "VIDEO_ASPECT_RATIO_LANDSCAPE")
with console.status(f"[bold blue]⚑ Queueing video generation ({aspect_ratio})...", spinner="earth"):
payload = {
"action": "veo_video_generator",
"nonce": self.nonce,
"prompt": enhanced_prompt,
"model": self.MODEL_NAME,
"modelName": self.MODEL_NAME,
"totalVariations": "1",
"aspectRatio": api_ratio,
"actionType": "full-video-generate",
}
if audio_mode == "try":
payload.update({
"generateAudio": "true",
"enableAudio": "true",
"withAudio": "true",
"audio": "true",
})
res = self.session.post(self.AJAX_URL, data=payload, timeout=30)
res.raise_for_status()
console.print(f"[dim] Raw generate response: {res.text[:300]}[/dim]")
scene_data = None
try:
data = res.json()
if isinstance(data, dict):
scene_data = data.get("data") or data.get("sceneData")
if isinstance(scene_data, dict):
scene_data = scene_data.get("sceneData") or scene_data.get("id")
except:
pass
if not scene_data:
# Brute-extract any 6-8 digit number
nums = re.findall(r'(\d{6,8})', res.text)
if nums:
scene_data = nums[0]
if not scene_data:
raise Exception(f"Could not extract scene ID. Response: {res.text[:200]}")
scene_data = str(scene_data)
console.print(f"[bold blue]βœ” Queued! Scene ID:[/bold blue] [white]{scene_data}[/white]")
return scene_data
# ── Step 4: Wait for video ──
# Reverse-engineered from logic.js:
# - Browser waits 85s after generate before first poll
# - Empty response = still rendering, retry in 20s
# - Non-empty response = raw video URL (plain text, not JSON)
# - Site does: url.replace("videos/", "video/") on the result
def wait_for_video(self, scene_data):
payload = {
"action": "veo_video_generator",
"nonce": self.nonce,
"sceneData": scene_data,
"actionType": "final-video-results",
}
# ── Phase 1: Initial wait (85s, matching the browser's setTimeout) ──
console.print(f"\n[bold cyan]⏳ Server is rendering your video...[/bold cyan]")
console.print(f"[dim] (Waiting 85s before first check β€” this matches how the site works)[/dim]\n")
with Progress(
SpinnerColumn(),
TextColumn("[bold cyan]{task.description}"),
BarColumn(bar_width=40),
TextColumn("[bold]{task.percentage:>3.0f}%"),
TimeElapsedColumn(),
console=console,
) as progress:
task = progress.add_task("Rendering", total=85)
for i in range(85):
time.sleep(1)
progress.update(task, advance=1)
# ── Phase 2: Poll every 20s until we get a non-empty response ──
max_attempts = 15 # 15 Γ— 20s = 5 minutes max polling
for attempt in range(1, max_attempts + 1):
try:
console.print(f"[cyan] β†’ Checking for video (attempt {attempt}/{max_attempts})...[/cyan]")
res = self.session.post(self.AJAX_URL, data=payload, timeout=120)
body = res.text.strip()
console.print(f"[dim] ← Status: {res.status_code} | Size: {len(body)} bytes[/dim]")
if body == "":
# Empty = still processing (confirmed from site JS)
console.print(f"[yellow] ⏸ Still rendering... checking again in 20s[/yellow]")
time.sleep(20)
continue
# Non-empty response β€” extract the video URL
console.print(f"[dim] ← Body: {body[:500]}[/dim]")
video_url = self._extract_video_url(body, scene_data)
if video_url:
return video_url
# Got a non-empty response but couldn't parse a URL β€” log it and retry
console.print(f"[yellow] ⚠ Got response but no video URL found. Retrying in 20s...[/yellow]")
time.sleep(20)
except requests.exceptions.Timeout:
console.print(f"[yellow] ⏸ Request timed out (>120s). Retrying...[/yellow]")
time.sleep(5)
except Exception as e:
console.print(f"[red] βœ– Poll error: {e}[/red]")
time.sleep(10)
raise Exception("Could not retrieve video after all poll attempts.")
def _extract_video_url(self, response_text, scene_data):
text = response_text.strip()
# The site's JS does: url.replace("videos/", "video/")
text = text.replace("videos/", "video/")
# Method 1: Response IS a direct URL (most common from this site)
if text.startswith("http") and (".mp4" in text or "video" in text):
console.print(f"[bold bright_green]βœ” Got video URL![/bold bright_green]")
return text
# Method 2: URL embedded in a longer response
mp4_matches = re.findall(r'https?://[^\s<"\'\\\]]+\.mp4', text)
if mp4_matches:
url = mp4_matches[0].replace("videos/", "video/")
console.print(f"[bold bright_green]βœ” Extracted video URL from response![/bold bright_green]")
return url
# Method 3: Filename pattern β†’ construct full URL
filename_matches = re.findall(r'(video_\d+_\d+\.mp4)', text)
if filename_matches:
url = f"{self.VIDEO_BASE}{filename_matches[0]}"
console.print(f"[bold bright_green]βœ” Constructed video URL from filename![/bold bright_green]")
return url
# Method 4: Try JSON parsing for nested data
try:
data = json.loads(text)
data_str = json.dumps(data)
mp4_matches = re.findall(r'https?://[^\s<"\'\\\]]+\.mp4', data_str)
if mp4_matches:
return mp4_matches[0].replace("videos/", "video/")
if isinstance(data, dict):
inner = data.get("data", "")
if isinstance(inner, str) and inner.strip():
inner = inner.strip().replace("videos/", "video/")
if inner.startswith("http"):
return inner
if "mp4" in inner:
return f"{self.VIDEO_BASE}{inner}"
except (json.JSONDecodeError, ValueError):
pass
return None
# ── Step 5: Download the mp4 ──
def download(self, url, filename):
with console.status(f"[bold yellow]⬇ Downloading video...", spinner="arrow3"):
headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
"Referer": self.PAGE_URL,
"Range": "bytes=0-",
}
res = requests.get(url, stream=True, headers=headers, timeout=60)
if res.status_code not in (200, 206):
raise Exception(f"Download failed: HTTP {res.status_code}")
total = int(res.headers.get("content-length", 0))
downloaded = 0
# Use progress bar for download
with Progress(
SpinnerColumn(),
TextColumn("[bold yellow]{task.description}"),
BarColumn(bar_width=40),
TextColumn("[bold]{task.percentage:>3.0f}%"),
TimeElapsedColumn(),
console=console,
) as progress:
task = progress.add_task("Downloading", total=total or None)
with open(filename, "wb") as f:
for chunk in res.iter_content(chunk_size=65536):
if chunk:
f.write(chunk)
downloaded += len(chunk)
progress.update(task, advance=len(chunk))
size_mb = os.path.getsize(filename) / (1024 * 1024)
console.print(f"[bold yellow]βœ” Saved:[/bold yellow] [green]{filename}[/green] ({size_mb:.1f} MB)")
return filename
def has_audio_track(self, filename):
ffprobe_result = self._has_audio_track_with_ffprobe(filename)
if ffprobe_result is not None:
return ffprobe_result
try:
with open(filename, "rb") as f:
data = f.read()
return b"soun" in data and any(codec in data for codec in (b"mp4a", b"aac", b"ac-3", b"ec-3", b"Opus", b"alac"))
except OSError:
return False
def _has_audio_track_with_ffprobe(self, filename):
try:
result = subprocess.run(
[
"ffprobe",
"-v",
"error",
"-select_streams",
"a",
"-show_entries",
"stream=codec_type",
"-of",
"csv=p=0",
filename,
],
capture_output=True,
text=True,
timeout=20,
)
except (FileNotFoundError, subprocess.SubprocessError):
return None
if result.returncode != 0:
return None
return "audio" in result.stdout.lower()
# ─────────────────────────────────────────────────────────
# Main CLI Flow
# ─────────────────────────────────────────────────────────
def run_with_retry(engine, raw_prompt, aspect_ratio, max_retries=20, audio_mode="auto", require_audio=False, prompt_mode="raw"):
for attempt in range(1, max_retries + 1):
try:
console.rule(f"[bold]Attempt {attempt}/{max_retries}", style="dim")
engine.fetch_nonce()
if prompt_mode == "enhanced":
final_prompt = engine.enhance_prompt(raw_prompt)
else:
final_prompt = raw_prompt.strip()
console.print(Panel(final_prompt, title="[bold cyan]✦ Generation Prompt (Raw)[/bold cyan]", border_style="cyan"))
scene_id = engine.generate_video(final_prompt, aspect_ratio, audio_mode=audio_mode)
video_url = engine.wait_for_video(scene_id)
if video_url:
console.print(f"\n[bold bright_green]🎬 Video URL: {video_url}[/bold bright_green]\n")
filename = f"veo_video_{scene_id}.mp4"
saved_file = engine.download(video_url, filename)
has_audio = engine.has_audio_track(saved_file)
audio_status = "Audio track detected" if has_audio else "No audio track detected"
audio_style = "bright_green" if has_audio else "yellow"
console.print(f"[bold {audio_style}]πŸ”Š {audio_status}[/bold {audio_style}]")
if require_audio and not has_audio:
raise Exception("Audio track is required, but the downloaded video has no detectable audio stream.")
console.print(Panel(
f"[bold bright_green]ALL DONE![/bold bright_green]\n"
f"Video saved as: [white]{filename}[/white]\n"
f"Audio: [white]{audio_status}[/white]",
border_style="green"
))
return True
except Exception as e:
console.print(f"\n[bold red]βœ– Failed:[/bold red] {e}")
if attempt < max_retries:
console.print(f"[dark_orange] Rotating proxy and retrying...[/dark_orange]\n")
engine.rotate()
time.sleep(2)
else:
console.print("[bold red]All retries exhausted.[/bold red]")
return False
def parse_args():
parser = argparse.ArgumentParser(description="Generate a video from a text prompt using the VeoAI CLI flow.")
parser.add_argument("--prompt", help="Prompt to generate the video from. If omitted, interactive input is used.")
parser.add_argument(
"--aspect-ratio",
choices=["16:9", "9:16"],
default=None,
help="Video aspect ratio. If omitted, interactive input is used.",
)
parser.add_argument(
"--model",
choices=VeoEngine.model_choices(),
default=VeoEngine.DEFAULT_MODEL,
help="Model/page to use: 'veo' uses Veo 3.1; 'grok' uses Grok 4.5.",
)
parser.add_argument(
"--max-retries",
type=int,
default=20,
help="Maximum number of generation attempts before giving up.",
)
parser.add_argument(
"--prompt-mode",
choices=["raw", "enhanced"],
default="raw",
help="Prompt mode. 'raw' sends your exact prompt to generation; 'enhanced' rewrites it before generation.",
)
parser.add_argument(
"--audio-mode",
choices=["auto", "try", "none"],
default="auto",
help="Audio handling mode. 'try' sends experimental audio flags to the generator; 'auto' uses default API behavior.",
)
parser.add_argument(
"--require-audio",
action="store_true",
help="Fail the run if the downloaded MP4 has no detectable audio track.",
)
return parser.parse_args()
def main():
args = parse_args()
os.system("clear" if os.name == "posix" else "cls")
console.print(Panel.fit(
"[bold red]πŸ”₯ VEO AI VIDEO GENERATOR πŸ”₯[/bold red]\n"
"[dim white]Proxy-Rotated Β· Auto-Retry Β· Fully Automated[/dim white]",
border_style="bright_red",
))
proxy_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "proxies-res-any-1777564385571.txt")
rotator = ProxyRotator(proxy_file)
engine = VeoEngine(rotator, model_key=args.model)
console.print(f"[dim]Model: {engine.MODEL_NAME}[/dim]")
console.print(f"[dim]Referrer: {engine.PAGE_URL}[/dim]")
raw_prompt = args.prompt or Prompt.ask("\n[bold bright_white]Enter your prompt[/bold bright_white]")
if args.aspect_ratio:
aspect_ratio = args.aspect_ratio
else:
ratio_choice = Prompt.ask(
"[bold bright_white]Aspect Ratio[/bold bright_white]",
choices=["1", "2"],
default="1",
)
aspect_ratio = "9:16" if ratio_choice == "2" else "16:9"
console.print(f"[dim]Selected: {aspect_ratio}[/dim]")
console.print(f"[dim]Prompt mode: {args.prompt_mode}[/dim]\n")
run_with_retry(
engine,
raw_prompt,
aspect_ratio,
max_retries=args.max_retries,
audio_mode=args.audio_mode,
require_audio=args.require_audio,
prompt_mode=args.prompt_mode,
)
if __name__ == "__main__":
main()