import os
import re
import sys
import time
import json
import random
import argparse
import subprocess
import requests

try:
    from rich.console import Console
    from rich.prompt import Prompt
    from rich.panel import Panel
    from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
    from rich.table import Table
    from rich.text import Text
except ImportError:
    # rich is not installed: install it with the running interpreter's pip,
    # then repeat the same imports.
    import subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "rich", "-q"])
    from rich.console import Console
    from rich.prompt import Prompt
    from rich.panel import Panel
    from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
    from rich.table import Table
    from rich.text import Text

console = Console()


# ─────────────────────────────────────────────────────────
# Proxy Rotator
# ─────────────────────────────────────────────────────────
class ProxyRotator:
    """Round-robin source of proxy URLs loaded from a text file.

    Each usable line of the file is either ``IP:PORT`` or
    ``IP:PORT:USER:PASS``. Lines with any other number of ``:``-separated
    fields, blank lines, and lines starting with ``#`` are ignored. The
    resulting list is shuffled once after a successful load.
    """

    def __init__(self, proxy_file):
        """Load proxies from *proxy_file*; a load failure leaves the list empty or partial."""
        self.proxies = []
        self.current_index = 0
        self._load(proxy_file)

    def _load(self, filename):
        """Parse *filename* into ``self.proxies`` and report the outcome on the console.

        Read errors are reported rather than raised, so the caller can carry
        on without proxies.
        """
        try:
            with open(filename, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    # A comment such as "# host:port" would otherwise split
                    # into two fields and be mistaken for a proxy.
                    if not line or line.startswith("#"):
                        continue
                    parts = line.split(":")
                    if len(parts) == 4:
                        # IP:PORT:USER:PASS
                        host, port, user, password = parts
                        self.proxies.append(f"http://{user}:{password}@{host}:{port}")
                    elif len(parts) == 2:
                        host, port = parts
                        self.proxies.append(f"http://{host}:{port}")
        except (OSError, UnicodeDecodeError) as e:
            console.print(f"[bold red]✖ Failed to load proxies: {e}[/bold red]")
            return
        random.shuffle(self.proxies)
        console.print(f"[bold green]✔ Loaded {len(self.proxies)} proxies![/bold green]")

    def next(self):
        """Return the next proxy URL, cycling through the list; ``None`` if no proxies are loaded."""
        if not self.proxies:
            return None
        proxy = self.proxies[self.current_index % len(self.proxies)]
        self.current_index += 1
        return proxy

    def display_current(self, proxy):
        """Print *proxy* without its ``user:pass@`` prefix; do nothing for a falsy value."""
        if proxy:
            # Everything after the last "@" is host:port (or the whole URL
            # when there are no credentials).
            safe = proxy.rpartition("@")[2]
            console.print(f"[dim cyan]🔄 Proxy:[/dim cyan] [bold white]{safe}[/bold white]")


# ─────────────────────────────────────────────────────────
# VeoAI Engine
# ─────────────────────────────────────────────────────────
class VeoEngine:
    """Client for the veoaifree.com video-generation AJAX flow.

    The flow is: fetch a nonce from the model page, optionally enhance the
    prompt, queue a generation, poll for the resulting video URL, download
    the MP4 and check it for an audio track.
    """

    AJAX_URL = "https://veoaifree.com/wp-admin/admin-ajax.php"
    VIDEO_BASE = "https://veoaifree.com/video/uploads/"
    MODEL_CONFIGS = {
        "veo": {
            "name": "Veo 3.1",
            "page_url": "https://veoaifree.com/veo-video-generator/",
        },
        "grok": {
            "name": "Grok 4.5",
            "page_url": "https://veoaifree.com/grok-ai-video-generator/",
        },
    }
    DEFAULT_MODEL = "veo"
    PAGE_URL = MODEL_CONFIGS[DEFAULT_MODEL]["page_url"]
    MODEL_NAME = MODEL_CONFIGS[DEFAULT_MODEL]["name"]
    HEADERS = {
        "accept": "*/*",
        "accept-language": "en-US,en;q=0.7",
        "content-type": "application/x-www-form-urlencoded; charset=UTF-8",
        "sec-ch-ua": '"Brave";v="147", "Not.A/Brand";v="8", "Chromium";v="147"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Linux"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "sec-gpc": "1",
        "x-requested-with": "XMLHttpRequest",
    }

    # Absolute .mp4 URL embedded anywhere in a response body.
    _MP4_URL_RE = re.compile(r'https?://[^\s<"\'\\\]]+\.mp4')

    def __init__(self, proxy_rotator, model_key=DEFAULT_MODEL):
        """Select the model configuration and open an HTTP session.

        Args:
            proxy_rotator: Stored on ``self.rotator``; not consulted by the
                session, which currently connects directly.
            model_key: Model key or alias accepted by ``normalize_model_key``.

        Raises:
            ValueError: If *model_key* does not name a known model.
        """
        self.rotator = proxy_rotator
        self.model_key = self.normalize_model_key(model_key)
        self.model_config = self.MODEL_CONFIGS[self.model_key]
        # Instance attributes shadow the class-level defaults for this model.
        self.MODEL_NAME = self.model_config["name"]
        self.PAGE_URL = self.model_config["page_url"]
        self.session = None
        self.nonce = None
        self._build_session()

    @classmethod
    def normalize_model_key(cls, model_key):
        """Map a model key or display name (any case, padded) to a ``MODEL_CONFIGS`` key.

        A falsy *model_key* selects ``DEFAULT_MODEL``.

        Raises:
            ValueError: If the normalized key is not in ``MODEL_CONFIGS``.
        """
        model_key = (model_key or cls.DEFAULT_MODEL).strip().lower()
        aliases = {
            "veo": "veo",
            "veo 3.1": "veo",
            "grok": "grok",
            "grok 4.5": "grok",
        }
        model_key = aliases.get(model_key, model_key)
        if model_key not in cls.MODEL_CONFIGS:
            raise ValueError(
                f"Unknown model '{model_key}'. Available models: {', '.join(cls.MODEL_CONFIGS)}"
            )
        return model_key

    @classmethod
    def model_choices(cls):
        """Return the list of valid model keys."""
        return list(cls.MODEL_CONFIGS)

    def _build_session(self):
        """Replace ``self.session`` with a fresh session carrying the browser-like headers."""
        # Close the session being replaced so its pooled connections are
        # released instead of lingering after every rotate().
        previous = getattr(self, "session", None)
        if previous is not None:
            previous.close()
        self.session = requests.Session()
        self.session.headers.update(self.HEADERS)
        self.session.headers["referer"] = self.PAGE_URL
        self.session.proxies = {}  # direct connection, no proxy

    def rotate(self):
        """Start over with a fresh session.

        The session is built with an empty proxy map, so despite the message
        printed here no proxy from ``self.rotator`` is applied.
        """
        console.print("[bold dark_orange]↻ Rotating to next proxy...[/bold dark_orange]")
        self._build_session()

    def _rotate_poll_proxy(self):
        """No-op — running direct, no proxies to rotate."""
        pass

    # ── Step 1: Grab nonce from the page ──
    def fetch_nonce(self):
        """Download the model page and store the nonce found in it on ``self.nonce``.

        Raises:
            requests.HTTPError: If the page request returns an error status.
            RuntimeError: If no nonce-like value is present in the HTML.
        """
        with console.status("[bold green]⟳ Fetching session nonce...", spinner="bouncingBar"):
            res = self.session.get(self.PAGE_URL, timeout=20)
            res.raise_for_status()

        # Primary pattern
        match = re.search(r'nonce["\']?\s*[:=]\s*["\']([a-f0-9]{10})["\']', res.text)
        if match:
            self.nonce = match.group(1)
            console.print(f"[bold green]✔ Nonce locked:[/bold green] [white]{self.nonce}[/white]")
            return

        # Fallback — grab any 10-char hex that looks like a WP nonce
        fallback = re.search(r'"([a-f0-9]{10})"', res.text)
        if fallback:
            self.nonce = fallback.group(1)
            console.print(f"[bold yellow]⚠ Guessed nonce:[/bold yellow] [white]{self.nonce}[/white]")
            return

        raise RuntimeError("Nonce not found in page HTML — proxy might be blocked.")

    # ── Step 2: Enhance prompt via their AI ──
    def enhance_prompt(self, raw_prompt):
        """Send *raw_prompt* to the site's prompt-enhancement action and return the result.

        Raises:
            requests.HTTPError: If the request returns an error status.
            RuntimeError: If the enhanced text is empty or shorter than 5 characters.
        """
        with console.status("[bold magenta]✦ Enhancing prompt...", spinner="dots12"):
            payload = {
                "action": "veo_video_generator",
                "nonce": self.nonce,
                "prompt": raw_prompt,
                "model": self.MODEL_NAME,
                "modelName": self.MODEL_NAME,
                "actionType": "main-prompt-generation",
            }
            res = self.session.post(self.AJAX_URL, data=payload, timeout=30)
            res.raise_for_status()

        console.print(f"[dim] Raw enhance response: {res.text[:200]}[/dim]")
        try:
            data = res.json()
        except ValueError:
            # Body is not JSON: treat the plain text as the enhanced prompt.
            enhanced = res.text.strip()
        else:
            if isinstance(data, dict):
                enhanced = str(data.get("data", "")).strip()
            else:
                enhanced = res.text.strip()

        if not enhanced or len(enhanced) < 5:
            raise RuntimeError(f"Enhancement failed. Response: {res.text[:150]}")
        console.print(Panel(enhanced, title="[bold magenta]✦ Enhanced Prompt[/bold magenta]", border_style="magenta"))
        return enhanced

    # ── Step 3: Queue video generation ──
    def generate_video(self, enhanced_prompt, aspect_ratio="16:9", audio_mode="auto"):
        """Queue a video generation and return its scene ID as a string.

        Args:
            enhanced_prompt: Prompt text sent to the generator.
            aspect_ratio: ``"16:9"`` or ``"9:16"``; anything else is sent as landscape.
            audio_mode: ``"try"`` adds experimental audio flags to the request;
                any other value sends none.

        Raises:
            requests.HTTPError: If the request returns an error status.
            RuntimeError: If no scene ID can be extracted from the response.
        """
        ratio_map = {
            "16:9": "VIDEO_ASPECT_RATIO_LANDSCAPE",
            "9:16": "VIDEO_ASPECT_RATIO_PORTRAIT",
        }
        api_ratio = ratio_map.get(aspect_ratio, "VIDEO_ASPECT_RATIO_LANDSCAPE")
        with console.status(f"[bold blue]⚡ Queueing video generation ({aspect_ratio})...", spinner="earth"):
            payload = {
                "action": "veo_video_generator",
                "nonce": self.nonce,
                "prompt": enhanced_prompt,
                "model": self.MODEL_NAME,
                "modelName": self.MODEL_NAME,
                "totalVariations": "1",
                "aspectRatio": api_ratio,
                "actionType": "full-video-generate",
            }
            if audio_mode == "try":
                payload.update({
                    "generateAudio": "true",
                    "enableAudio": "true",
                    "withAudio": "true",
                    "audio": "true",
                })
            res = self.session.post(self.AJAX_URL, data=payload, timeout=30)
            res.raise_for_status()

        console.print(f"[dim] Raw generate response: {res.text[:300]}[/dim]")
        scene_data = None
        try:
            data = res.json()
        except ValueError:
            # Not JSON; fall through to the digit scan below.
            data = None
        if isinstance(data, dict):
            scene_data = data.get("data") or data.get("sceneData")
            if isinstance(scene_data, dict):
                scene_data = scene_data.get("sceneData") or scene_data.get("id")

        if not scene_data:
            # Brute-extract any 6-8 digit number
            nums = re.findall(r'(\d{6,8})', res.text)
            if nums:
                scene_data = nums[0]

        if not scene_data:
            raise RuntimeError(f"Could not extract scene ID. Response: {res.text[:200]}")
        scene_data = str(scene_data)
        console.print(f"[bold blue]✔ Queued! Scene ID:[/bold blue] [white]{scene_data}[/white]")
        return scene_data

    # ── Step 4: Wait for video ──
    # Reverse-engineered from logic.js:
    # - Browser waits 85s after generate before first poll
    # - Empty response = still rendering, retry in 20s
    # - Non-empty response = raw video URL (plain text, not JSON)
    # - Site does: url.replace("videos/", "video/") on the result
    def wait_for_video(self, scene_data, initial_wait=85, poll_interval=20, max_attempts=15):
        """Wait for the queued scene to render and return its video URL.

        Args:
            scene_data: Scene ID returned by ``generate_video``.
            initial_wait: Whole seconds to wait before the first poll.
            poll_interval: Seconds between polls while no URL is available.
            max_attempts: Number of polls before giving up.

        Raises:
            RuntimeError: If no video URL is obtained within *max_attempts* polls.
        """
        payload = {
            "action": "veo_video_generator",
            "nonce": self.nonce,
            "sceneData": scene_data,
            "actionType": "final-video-results",
        }

        # ── Phase 1: Initial wait (85s by default, matching the browser's setTimeout) ──
        console.print("\n[bold cyan]⏳ Server is rendering your video...[/bold cyan]")
        console.print(
            f"[dim] (Waiting {initial_wait}s before first check — this matches how the site works)[/dim]\n"
        )
        if initial_wait > 0:
            with Progress(
                SpinnerColumn(),
                TextColumn("[bold cyan]{task.description}"),
                BarColumn(bar_width=40),
                TextColumn("[bold]{task.percentage:>3.0f}%"),
                TimeElapsedColumn(),
                console=console,
            ) as progress:
                task = progress.add_task("Rendering", total=initial_wait)
                for _ in range(initial_wait):
                    time.sleep(1)
                    progress.update(task, advance=1)

        # ── Phase 2: Poll until we get a non-empty response with a URL ──
        # Defaults: 15 × 20s = 5 minutes max polling
        for attempt in range(1, max_attempts + 1):
            console.print(f"[cyan] → Checking for video (attempt {attempt}/{max_attempts})...[/cyan]")
            try:
                res = self.session.post(self.AJAX_URL, data=payload, timeout=120)
            except requests.exceptions.Timeout:
                console.print("[yellow] ⏸ Request timed out (>120s). Retrying...[/yellow]")
                delay = 5
            except requests.exceptions.RequestException as e:
                console.print(f"[red] ✖ Poll error: {e}[/red]")
                delay = 10
            else:
                body = res.text.strip()
                console.print(f"[dim] ← Status: {res.status_code} | Size: {len(body)} bytes[/dim]")
                if not body:
                    # Empty = still processing (confirmed from site JS)
                    console.print(
                        f"[yellow] ⏸ Still rendering... checking again in {poll_interval}s[/yellow]"
                    )
                else:
                    # Non-empty response — extract the video URL
                    console.print(f"[dim] ← Body: {body[:500]}[/dim]")
                    video_url = self._extract_video_url(body, scene_data)
                    if video_url:
                        return video_url
                    # Got a non-empty response but couldn't parse a URL — log it and retry
                    console.print(
                        f"[yellow] ⚠ Got response but no video URL found. Retrying in {poll_interval}s...[/yellow]"
                    )
                delay = poll_interval

            # Nothing follows the final attempt, so do not sleep after it.
            if attempt < max_attempts:
                time.sleep(delay)

        raise RuntimeError("Could not retrieve video after all poll attempts.")

    def _extract_video_url(self, response_text, scene_data):
        """Return a video URL found in *response_text*, or ``None`` if there is none.

        *scene_data* is accepted for interface compatibility and is not used.
        """
        text = response_text.strip()
        # The site's JS does: url.replace("videos/", "video/")
        text = text.replace("videos/", "video/")

        # Method 1: Response IS a direct URL (most common from this site)
        if text.startswith("http") and (".mp4" in text or "video" in text):
            console.print("[bold bright_green]✔ Got video URL![/bold bright_green]")
            return text

        # Method 2: URL embedded in a longer response
        embedded = self._MP4_URL_RE.search(text)
        if embedded:
            url = embedded.group(0).replace("videos/", "video/")
            console.print("[bold bright_green]✔ Extracted video URL from response![/bold bright_green]")
            return url

        # Method 3: Filename pattern → construct full URL
        filename_match = re.search(r'(video_\d+_\d+\.mp4)', text)
        if filename_match:
            url = f"{self.VIDEO_BASE}{filename_match.group(1)}"
            console.print("[bold bright_green]✔ Constructed video URL from filename![/bold bright_green]")
            return url

        # Method 4: Try JSON parsing for nested data
        try:
            data = json.loads(text)
        except ValueError:  # json.JSONDecodeError is a ValueError
            return None

        # Re-serialising the decoded JSON yields plain "/" where the raw body
        # had "\/", so URLs that Method 2 could not match are visible here.
        reserialised = self._MP4_URL_RE.search(json.dumps(data))
        if reserialised:
            return reserialised.group(0).replace("videos/", "video/")
        if isinstance(data, dict):
            inner = data.get("data", "")
            if isinstance(inner, str) and inner.strip():
                inner = inner.strip().replace("videos/", "video/")
                if inner.startswith("http"):
                    return inner
                if "mp4" in inner:
                    return f"{self.VIDEO_BASE}{inner}"
        return None

    # ── Step 5: Download the mp4 ──
    def download(self, url, filename):
        """Stream *url* into *filename*, showing progress, and return *filename*.

        Raises:
            RuntimeError: If the server answers with a status other than 200 or 206.
        """
        headers = {
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
            "Referer": self.PAGE_URL,
            "Range": "bytes=0-",
        }
        # The status spinner covers only the request itself; it is closed
        # before the progress bar starts so the two live displays never overlap.
        with console.status("[bold yellow]⬇ Downloading video...", spinner="arrow3"):
            res = requests.get(url, stream=True, headers=headers, timeout=60)

        # Using the response as a context manager releases the streamed
        # connection on success and on error.
        with res:
            if res.status_code not in (200, 206):
                raise RuntimeError(f"Download failed: HTTP {res.status_code}")
            total = int(res.headers.get("content-length", 0))

            # Use progress bar for download
            with Progress(
                SpinnerColumn(),
                TextColumn("[bold yellow]{task.description}"),
                BarColumn(bar_width=40),
                TextColumn("[bold]{task.percentage:>3.0f}%"),
                TimeElapsedColumn(),
                console=console,
            ) as progress:
                task = progress.add_task("Downloading", total=total or None)
                with open(filename, "wb") as f:
                    for chunk in res.iter_content(chunk_size=65536):
                        if chunk:
                            f.write(chunk)
                            progress.update(task, advance=len(chunk))

        size_mb = os.path.getsize(filename) / (1024 * 1024)
        console.print(f"[bold yellow]✔ Saved:[/bold yellow] [green]{filename}[/green] ({size_mb:.1f} MB)")
        return filename

    def has_audio_track(self, filename):
        """Report whether *filename* appears to contain an audio stream.

        ffprobe's answer is used when it is available; otherwise the file's
        bytes are scanned for the ``soun`` marker together with one of several
        audio codec tags. An unreadable file counts as having no audio.
        """
        ffprobe_result = self._has_audio_track_with_ffprobe(filename)
        if ffprobe_result is not None:
            return ffprobe_result
        try:
            with open(filename, "rb") as f:
                data = f.read()
        except OSError:
            return False
        codec_tags = (b"mp4a", b"aac", b"ac-3", b"ec-3", b"Opus", b"alac")
        return b"soun" in data and any(codec in data for codec in codec_tags)

    def _has_audio_track_with_ffprobe(self, filename):
        """Ask ffprobe for audio streams; return ``None`` when ffprobe is missing or fails."""
        try:
            result = subprocess.run(
                [
                    "ffprobe",
                    "-v", "error",
                    "-select_streams", "a",
                    "-show_entries", "stream=codec_type",
                    "-of", "csv=p=0",
                    filename,
                ],
                capture_output=True,
                text=True,
                timeout=20,
            )
        except (FileNotFoundError, subprocess.SubprocessError):
            return None
        if result.returncode != 0:
            return None
        return "audio" in result.stdout.lower()


# ─────────────────────────────────────────────────────────
# Main CLI Flow
# ─────────────────────────────────────────────────────────
def run_with_retry(engine, raw_prompt, aspect_ratio, max_retries=20, audio_mode="auto",
                   require_audio=False, prompt_mode="raw"):
    """Run the full generate-and-download flow, retrying on any failure.

    Args:
        engine: ``VeoEngine`` used for every step.
        raw_prompt: Prompt text as entered by the user.
        aspect_ratio: Passed through to ``engine.generate_video``.
        max_retries: Maximum number of attempts.
        audio_mode: Passed through to ``engine.generate_video``.
        require_audio: Treat a download without a detectable audio track as a failure.
        prompt_mode: ``"enhanced"`` rewrites the prompt first; any other value
            sends the stripped raw prompt.

    Returns:
        ``True`` once a video has been saved, ``False`` if no attempt succeeded.
    """
    for attempt in range(1, max_retries + 1):
        try:
            console.rule(f"[bold]Attempt {attempt}/{max_retries}", style="dim")
            engine.fetch_nonce()
            if prompt_mode == "enhanced":
                final_prompt = engine.enhance_prompt(raw_prompt)
            else:
                final_prompt = raw_prompt.strip()
                console.print(Panel(
                    final_prompt,
                    title="[bold cyan]✦ Generation Prompt (Raw)[/bold cyan]",
                    border_style="cyan",
                ))

            scene_id = engine.generate_video(final_prompt, aspect_ratio, audio_mode=audio_mode)
            video_url = engine.wait_for_video(scene_id)
            if video_url:
                console.print(f"\n[bold bright_green]🎬 Video URL: {video_url}[/bold bright_green]\n")
                filename = f"veo_video_{scene_id}.mp4"
                saved_file = engine.download(video_url, filename)
                has_audio = engine.has_audio_track(saved_file)
                audio_status = "Audio track detected" if has_audio else "No audio track detected"
                audio_style = "bright_green" if has_audio else "yellow"
                console.print(f"[bold {audio_style}]🔊 {audio_status}[/bold {audio_style}]")
                if require_audio and not has_audio:
                    raise RuntimeError(
                        "Audio track is required, but the downloaded video has no detectable audio stream."
                    )
                console.print(Panel(
                    f"[bold bright_green]ALL DONE![/bold bright_green]\n"
                    f"Video saved as: [white]{filename}[/white]\n"
                    f"Audio: [white]{audio_status}[/white]",
                    border_style="green"
                ))
                return True
        except Exception as e:
            # Retry boundary: any failure in the attempt is reported and the
            # next attempt starts with a fresh session.
            console.print(f"\n[bold red]✖ Failed:[/bold red] {e}")
            if attempt < max_retries:
                console.print("[dark_orange] Rotating proxy and retrying...[/dark_orange]\n")
                engine.rotate()
                time.sleep(2)
            else:
                console.print("[bold red]All retries exhausted.[/bold red]")
    return False


def parse_args(argv=None):
    """Parse command-line options.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="Generate a video from a text prompt using the VeoAI CLI flow.")
    parser.add_argument("--prompt", help="Prompt to generate the video from. If omitted, interactive input is used.")
    parser.add_argument(
        "--aspect-ratio",
        choices=["16:9", "9:16"],
        default=None,
        help="Video aspect ratio. If omitted, interactive input is used.",
    )
    parser.add_argument(
        "--model",
        choices=VeoEngine.model_choices(),
        default=VeoEngine.DEFAULT_MODEL,
        help="Model/page to use: 'veo' uses Veo 3.1; 'grok' uses Grok 4.5.",
    )
    parser.add_argument(
        "--max-retries",
        type=int,
        default=20,
        help="Maximum number of generation attempts before giving up.",
    )
    parser.add_argument(
        "--prompt-mode",
        choices=["raw", "enhanced"],
        default="raw",
        help="Prompt mode. 'raw' sends your exact prompt to generation; 'enhanced' rewrites it before generation.",
    )
    parser.add_argument(
        "--audio-mode",
        choices=["auto", "try", "none"],
        default="auto",
        help="Audio handling mode. 'try' sends experimental audio flags to the generator; 'auto' uses default API behavior.",
    )
    parser.add_argument(
        "--require-audio",
        action="store_true",
        help="Fail the run if the downloaded MP4 has no detectable audio track.",
    )
    return parser.parse_args(argv)


def main():
    """Interactive/CLI entry point; returns 0 when a video was saved, 1 otherwise."""
    args = parse_args()
    os.system("clear" if os.name == "posix" else "cls")
    console.print(Panel.fit(
        "[bold red]🔥 VEO AI VIDEO GENERATOR 🔥[/bold red]\n"
        "[dim white]Proxy-Rotated · Auto-Retry · Fully Automated[/dim white]",
        border_style="bright_red",
    ))

    proxy_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "proxies-res-any-1777564385571.txt")
    rotator = ProxyRotator(proxy_file)
    engine = VeoEngine(rotator, model_key=args.model)
    console.print(f"[dim]Model: {engine.MODEL_NAME}[/dim]")
    console.print(f"[dim]Referrer: {engine.PAGE_URL}[/dim]")

    raw_prompt = args.prompt or Prompt.ask("\n[bold bright_white]Enter your prompt[/bold bright_white]")
    if args.aspect_ratio:
        aspect_ratio = args.aspect_ratio
    else:
        ratio_choice = Prompt.ask(
            "[bold bright_white]Aspect Ratio[/bold bright_white]",
            choices=["1", "2"],
            default="1",
        )
        aspect_ratio = "9:16" if ratio_choice == "2" else "16:9"
    console.print(f"[dim]Selected: {aspect_ratio}[/dim]")
    console.print(f"[dim]Prompt mode: {args.prompt_mode}[/dim]\n")

    succeeded = run_with_retry(
        engine,
        raw_prompt,
        aspect_ratio,
        max_retries=args.max_retries,
        audio_mode=args.audio_mode,
        require_audio=args.require_audio,
        prompt_mode=args.prompt_mode,
    )
    # Report failure through the process exit status so callers can detect it.
    return 0 if succeeded else 1


if __name__ == "__main__":
    sys.exit(main())