Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import sys | |
| import time | |
| import json | |
| import random | |
| import argparse | |
| import subprocess | |
| import requests | |
| try: | |
| from rich.console import Console | |
| from rich.prompt import Prompt | |
| from rich.panel import Panel | |
| from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn | |
| from rich.table import Table | |
| from rich.text import Text | |
| except ImportError: | |
| import subprocess | |
| subprocess.check_call([sys.executable, "-m", "pip", "install", "rich", "-q"]) | |
| from rich.console import Console | |
| from rich.prompt import Prompt | |
| from rich.panel import Panel | |
| from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn | |
| from rich.table import Table | |
| from rich.text import Text | |
| console = Console() | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Proxy Rotator | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class ProxyRotator: | |
| def __init__(self, proxy_file): | |
| self.proxies = [] | |
| self.current_index = 0 | |
| self._load(proxy_file) | |
| def _load(self, filename): | |
| try: | |
| with open(filename, 'r') as f: | |
| for line in f: | |
| line = line.strip() | |
| if not line: | |
| continue | |
| parts = line.split(':') | |
| if len(parts) == 4: | |
| # IP:PORT:USER:PASS | |
| proxy_url = f"http://{parts[2]}:{parts[3]}@{parts[0]}:{parts[1]}" | |
| self.proxies.append(proxy_url) | |
| elif len(parts) == 2: | |
| self.proxies.append(f"http://{parts[0]}:{parts[1]}") | |
| random.shuffle(self.proxies) | |
| console.print(f"[bold green]β Loaded {len(self.proxies)} proxies![/bold green]") | |
| except Exception as e: | |
| console.print(f"[bold red]β Failed to load proxies: {e}[/bold red]") | |
| def next(self): | |
| if not self.proxies: | |
| return None | |
| proxy = self.proxies[self.current_index % len(self.proxies)] | |
| self.current_index += 1 | |
| return proxy | |
| def display_current(self, proxy): | |
| if proxy: | |
| safe = proxy.split('@')[-1] if '@' in proxy else proxy | |
| console.print(f"[dim cyan]π Proxy:[/dim cyan] [bold white]{safe}[/bold white]") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # VeoAI Engine | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class VeoEngine: | |
| AJAX_URL = "https://veoaifree.com/wp-admin/admin-ajax.php" | |
| VIDEO_BASE = "https://veoaifree.com/video/uploads/" | |
| MODEL_CONFIGS = { | |
| "veo": { | |
| "name": "Veo 3.1", | |
| "page_url": "https://veoaifree.com/veo-video-generator/", | |
| }, | |
| "grok": { | |
| "name": "Grok 4.5", | |
| "page_url": "https://veoaifree.com/grok-ai-video-generator/", | |
| }, | |
| } | |
| DEFAULT_MODEL = "veo" | |
| PAGE_URL = MODEL_CONFIGS[DEFAULT_MODEL]["page_url"] | |
| MODEL_NAME = MODEL_CONFIGS[DEFAULT_MODEL]["name"] | |
| HEADERS = { | |
| "accept": "*/*", | |
| "accept-language": "en-US,en;q=0.7", | |
| "content-type": "application/x-www-form-urlencoded; charset=UTF-8", | |
| "sec-ch-ua": '"Brave";v="147", "Not.A/Brand";v="8", "Chromium";v="147"', | |
| "sec-ch-ua-mobile": "?0", | |
| "sec-ch-ua-platform": '"Linux"', | |
| "sec-fetch-dest": "empty", | |
| "sec-fetch-mode": "cors", | |
| "sec-fetch-site": "same-origin", | |
| "sec-gpc": "1", | |
| "x-requested-with": "XMLHttpRequest", | |
| } | |
| def __init__(self, proxy_rotator, model_key=DEFAULT_MODEL): | |
| self.rotator = proxy_rotator | |
| self.model_key = self.normalize_model_key(model_key) | |
| self.model_config = self.MODEL_CONFIGS[self.model_key] | |
| self.MODEL_NAME = self.model_config["name"] | |
| self.PAGE_URL = self.model_config["page_url"] | |
| self.session = None | |
| self.nonce = None | |
| self._build_session() | |
| def normalize_model_key(cls, model_key): | |
| model_key = (model_key or cls.DEFAULT_MODEL).strip().lower() | |
| aliases = { | |
| "veo": "veo", | |
| "veo 3.1": "veo", | |
| "grok": "grok", | |
| "grok 4.5": "grok", | |
| } | |
| model_key = aliases.get(model_key, model_key) | |
| if model_key not in cls.MODEL_CONFIGS: | |
| raise ValueError(f"Unknown model '{model_key}'. Available models: {', '.join(cls.MODEL_CONFIGS)}") | |
| return model_key | |
| def model_choices(cls): | |
| return list(cls.MODEL_CONFIGS.keys()) | |
| def _build_session(self): | |
| self.session = requests.Session() | |
| self.session.headers.update(self.HEADERS) | |
| self.session.headers["referer"] = self.PAGE_URL | |
| self.session.proxies = {} # direct connection, no proxy | |
| def rotate(self): | |
| console.print("[bold dark_orange]β» Rotating to next proxy...[/bold dark_orange]") | |
| self._build_session() | |
| def _rotate_poll_proxy(self): | |
| """No-op β running direct, no proxies to rotate.""" | |
| pass | |
| # ββ Step 1: Grab nonce from the page ββ | |
| def fetch_nonce(self): | |
| with console.status("[bold green]β³ Fetching session nonce...", spinner="bouncingBar"): | |
| res = self.session.get(self.PAGE_URL, timeout=20) | |
| res.raise_for_status() | |
| # Primary pattern | |
| matches = re.findall(r'nonce["\']?\s*[:=]\s*["\']([a-f0-9]{10})["\']', res.text) | |
| if matches: | |
| self.nonce = matches[0] | |
| console.print(f"[bold green]β Nonce locked:[/bold green] [white]{self.nonce}[/white]") | |
| return | |
| # Fallback β grab any 10-char hex that looks like a WP nonce | |
| fallback = re.findall(r'"([a-f0-9]{10})"', res.text) | |
| if fallback: | |
| self.nonce = fallback[0] | |
| console.print(f"[bold yellow]β Guessed nonce:[/bold yellow] [white]{self.nonce}[/white]") | |
| return | |
| raise Exception("Nonce not found in page HTML β proxy might be blocked.") | |
| # ββ Step 2: Enhance prompt via their AI ββ | |
| def enhance_prompt(self, raw_prompt): | |
| with console.status("[bold magenta]β¦ Enhancing prompt...", spinner="dots12"): | |
| payload = { | |
| "action": "veo_video_generator", | |
| "nonce": self.nonce, | |
| "prompt": raw_prompt, | |
| "model": self.MODEL_NAME, | |
| "modelName": self.MODEL_NAME, | |
| "actionType": "main-prompt-generation", | |
| } | |
| res = self.session.post(self.AJAX_URL, data=payload, timeout=30) | |
| res.raise_for_status() | |
| console.print(f"[dim] Raw enhance response: {res.text[:200]}[/dim]") | |
| try: | |
| data = res.json() | |
| if isinstance(data, dict): | |
| enhanced = str(data.get("data", "")).strip() | |
| else: | |
| enhanced = res.text.strip() | |
| except: | |
| enhanced = res.text.strip() | |
| if not enhanced or len(enhanced) < 5: | |
| raise Exception(f"Enhancement failed. Response: {res.text[:150]}") | |
| console.print(Panel(enhanced, title="[bold magenta]β¦ Enhanced Prompt[/bold magenta]", border_style="magenta")) | |
| return enhanced | |
| # ββ Step 3: Queue video generation ββ | |
| def generate_video(self, enhanced_prompt, aspect_ratio="16:9", audio_mode="auto"): | |
| ratio_map = { | |
| "16:9": "VIDEO_ASPECT_RATIO_LANDSCAPE", | |
| "9:16": "VIDEO_ASPECT_RATIO_PORTRAIT", | |
| } | |
| api_ratio = ratio_map.get(aspect_ratio, "VIDEO_ASPECT_RATIO_LANDSCAPE") | |
| with console.status(f"[bold blue]β‘ Queueing video generation ({aspect_ratio})...", spinner="earth"): | |
| payload = { | |
| "action": "veo_video_generator", | |
| "nonce": self.nonce, | |
| "prompt": enhanced_prompt, | |
| "model": self.MODEL_NAME, | |
| "modelName": self.MODEL_NAME, | |
| "totalVariations": "1", | |
| "aspectRatio": api_ratio, | |
| "actionType": "full-video-generate", | |
| } | |
| if audio_mode == "try": | |
| payload.update({ | |
| "generateAudio": "true", | |
| "enableAudio": "true", | |
| "withAudio": "true", | |
| "audio": "true", | |
| }) | |
| res = self.session.post(self.AJAX_URL, data=payload, timeout=30) | |
| res.raise_for_status() | |
| console.print(f"[dim] Raw generate response: {res.text[:300]}[/dim]") | |
| scene_data = None | |
| try: | |
| data = res.json() | |
| if isinstance(data, dict): | |
| scene_data = data.get("data") or data.get("sceneData") | |
| if isinstance(scene_data, dict): | |
| scene_data = scene_data.get("sceneData") or scene_data.get("id") | |
| except: | |
| pass | |
| if not scene_data: | |
| # Brute-extract any 6-8 digit number | |
| nums = re.findall(r'(\d{6,8})', res.text) | |
| if nums: | |
| scene_data = nums[0] | |
| if not scene_data: | |
| raise Exception(f"Could not extract scene ID. Response: {res.text[:200]}") | |
| scene_data = str(scene_data) | |
| console.print(f"[bold blue]β Queued! Scene ID:[/bold blue] [white]{scene_data}[/white]") | |
| return scene_data | |
| # ββ Step 4: Wait for video ββ | |
| # Reverse-engineered from logic.js: | |
| # - Browser waits 85s after generate before first poll | |
| # - Empty response = still rendering, retry in 20s | |
| # - Non-empty response = raw video URL (plain text, not JSON) | |
| # - Site does: url.replace("videos/", "video/") on the result | |
| def wait_for_video(self, scene_data): | |
| payload = { | |
| "action": "veo_video_generator", | |
| "nonce": self.nonce, | |
| "sceneData": scene_data, | |
| "actionType": "final-video-results", | |
| } | |
| # ββ Phase 1: Initial wait (85s, matching the browser's setTimeout) ββ | |
| console.print(f"\n[bold cyan]β³ Server is rendering your video...[/bold cyan]") | |
| console.print(f"[dim] (Waiting 85s before first check β this matches how the site works)[/dim]\n") | |
| with Progress( | |
| SpinnerColumn(), | |
| TextColumn("[bold cyan]{task.description}"), | |
| BarColumn(bar_width=40), | |
| TextColumn("[bold]{task.percentage:>3.0f}%"), | |
| TimeElapsedColumn(), | |
| console=console, | |
| ) as progress: | |
| task = progress.add_task("Rendering", total=85) | |
| for i in range(85): | |
| time.sleep(1) | |
| progress.update(task, advance=1) | |
| # ββ Phase 2: Poll every 20s until we get a non-empty response ββ | |
| max_attempts = 15 # 15 Γ 20s = 5 minutes max polling | |
| for attempt in range(1, max_attempts + 1): | |
| try: | |
| console.print(f"[cyan] β Checking for video (attempt {attempt}/{max_attempts})...[/cyan]") | |
| res = self.session.post(self.AJAX_URL, data=payload, timeout=120) | |
| body = res.text.strip() | |
| console.print(f"[dim] β Status: {res.status_code} | Size: {len(body)} bytes[/dim]") | |
| if body == "": | |
| # Empty = still processing (confirmed from site JS) | |
| console.print(f"[yellow] βΈ Still rendering... checking again in 20s[/yellow]") | |
| time.sleep(20) | |
| continue | |
| # Non-empty response β extract the video URL | |
| console.print(f"[dim] β Body: {body[:500]}[/dim]") | |
| video_url = self._extract_video_url(body, scene_data) | |
| if video_url: | |
| return video_url | |
| # Got a non-empty response but couldn't parse a URL β log it and retry | |
| console.print(f"[yellow] β Got response but no video URL found. Retrying in 20s...[/yellow]") | |
| time.sleep(20) | |
| except requests.exceptions.Timeout: | |
| console.print(f"[yellow] βΈ Request timed out (>120s). Retrying...[/yellow]") | |
| time.sleep(5) | |
| except Exception as e: | |
| console.print(f"[red] β Poll error: {e}[/red]") | |
| time.sleep(10) | |
| raise Exception("Could not retrieve video after all poll attempts.") | |
| def _extract_video_url(self, response_text, scene_data): | |
| text = response_text.strip() | |
| # The site's JS does: url.replace("videos/", "video/") | |
| text = text.replace("videos/", "video/") | |
| # Method 1: Response IS a direct URL (most common from this site) | |
| if text.startswith("http") and (".mp4" in text or "video" in text): | |
| console.print(f"[bold bright_green]β Got video URL![/bold bright_green]") | |
| return text | |
| # Method 2: URL embedded in a longer response | |
| mp4_matches = re.findall(r'https?://[^\s<"\'\\\]]+\.mp4', text) | |
| if mp4_matches: | |
| url = mp4_matches[0].replace("videos/", "video/") | |
| console.print(f"[bold bright_green]β Extracted video URL from response![/bold bright_green]") | |
| return url | |
| # Method 3: Filename pattern β construct full URL | |
| filename_matches = re.findall(r'(video_\d+_\d+\.mp4)', text) | |
| if filename_matches: | |
| url = f"{self.VIDEO_BASE}{filename_matches[0]}" | |
| console.print(f"[bold bright_green]β Constructed video URL from filename![/bold bright_green]") | |
| return url | |
| # Method 4: Try JSON parsing for nested data | |
| try: | |
| data = json.loads(text) | |
| data_str = json.dumps(data) | |
| mp4_matches = re.findall(r'https?://[^\s<"\'\\\]]+\.mp4', data_str) | |
| if mp4_matches: | |
| return mp4_matches[0].replace("videos/", "video/") | |
| if isinstance(data, dict): | |
| inner = data.get("data", "") | |
| if isinstance(inner, str) and inner.strip(): | |
| inner = inner.strip().replace("videos/", "video/") | |
| if inner.startswith("http"): | |
| return inner | |
| if "mp4" in inner: | |
| return f"{self.VIDEO_BASE}{inner}" | |
| except (json.JSONDecodeError, ValueError): | |
| pass | |
| return None | |
| # ββ Step 5: Download the mp4 ββ | |
| def download(self, url, filename): | |
| with console.status(f"[bold yellow]β¬ Downloading video...", spinner="arrow3"): | |
| headers = { | |
| "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36", | |
| "Referer": self.PAGE_URL, | |
| "Range": "bytes=0-", | |
| } | |
| res = requests.get(url, stream=True, headers=headers, timeout=60) | |
| if res.status_code not in (200, 206): | |
| raise Exception(f"Download failed: HTTP {res.status_code}") | |
| total = int(res.headers.get("content-length", 0)) | |
| downloaded = 0 | |
| # Use progress bar for download | |
| with Progress( | |
| SpinnerColumn(), | |
| TextColumn("[bold yellow]{task.description}"), | |
| BarColumn(bar_width=40), | |
| TextColumn("[bold]{task.percentage:>3.0f}%"), | |
| TimeElapsedColumn(), | |
| console=console, | |
| ) as progress: | |
| task = progress.add_task("Downloading", total=total or None) | |
| with open(filename, "wb") as f: | |
| for chunk in res.iter_content(chunk_size=65536): | |
| if chunk: | |
| f.write(chunk) | |
| downloaded += len(chunk) | |
| progress.update(task, advance=len(chunk)) | |
| size_mb = os.path.getsize(filename) / (1024 * 1024) | |
| console.print(f"[bold yellow]β Saved:[/bold yellow] [green]{filename}[/green] ({size_mb:.1f} MB)") | |
| return filename | |
| def has_audio_track(self, filename): | |
| ffprobe_result = self._has_audio_track_with_ffprobe(filename) | |
| if ffprobe_result is not None: | |
| return ffprobe_result | |
| try: | |
| with open(filename, "rb") as f: | |
| data = f.read() | |
| return b"soun" in data and any(codec in data for codec in (b"mp4a", b"aac", b"ac-3", b"ec-3", b"Opus", b"alac")) | |
| except OSError: | |
| return False | |
| def _has_audio_track_with_ffprobe(self, filename): | |
| try: | |
| result = subprocess.run( | |
| [ | |
| "ffprobe", | |
| "-v", | |
| "error", | |
| "-select_streams", | |
| "a", | |
| "-show_entries", | |
| "stream=codec_type", | |
| "-of", | |
| "csv=p=0", | |
| filename, | |
| ], | |
| capture_output=True, | |
| text=True, | |
| timeout=20, | |
| ) | |
| except (FileNotFoundError, subprocess.SubprocessError): | |
| return None | |
| if result.returncode != 0: | |
| return None | |
| return "audio" in result.stdout.lower() | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Main CLI Flow | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def run_with_retry(engine, raw_prompt, aspect_ratio, max_retries=20, audio_mode="auto", require_audio=False, prompt_mode="raw"): | |
| for attempt in range(1, max_retries + 1): | |
| try: | |
| console.rule(f"[bold]Attempt {attempt}/{max_retries}", style="dim") | |
| engine.fetch_nonce() | |
| if prompt_mode == "enhanced": | |
| final_prompt = engine.enhance_prompt(raw_prompt) | |
| else: | |
| final_prompt = raw_prompt.strip() | |
| console.print(Panel(final_prompt, title="[bold cyan]β¦ Generation Prompt (Raw)[/bold cyan]", border_style="cyan")) | |
| scene_id = engine.generate_video(final_prompt, aspect_ratio, audio_mode=audio_mode) | |
| video_url = engine.wait_for_video(scene_id) | |
| if video_url: | |
| console.print(f"\n[bold bright_green]π¬ Video URL: {video_url}[/bold bright_green]\n") | |
| filename = f"veo_video_{scene_id}.mp4" | |
| saved_file = engine.download(video_url, filename) | |
| has_audio = engine.has_audio_track(saved_file) | |
| audio_status = "Audio track detected" if has_audio else "No audio track detected" | |
| audio_style = "bright_green" if has_audio else "yellow" | |
| console.print(f"[bold {audio_style}]π {audio_status}[/bold {audio_style}]") | |
| if require_audio and not has_audio: | |
| raise Exception("Audio track is required, but the downloaded video has no detectable audio stream.") | |
| console.print(Panel( | |
| f"[bold bright_green]ALL DONE![/bold bright_green]\n" | |
| f"Video saved as: [white]{filename}[/white]\n" | |
| f"Audio: [white]{audio_status}[/white]", | |
| border_style="green" | |
| )) | |
| return True | |
| except Exception as e: | |
| console.print(f"\n[bold red]β Failed:[/bold red] {e}") | |
| if attempt < max_retries: | |
| console.print(f"[dark_orange] Rotating proxy and retrying...[/dark_orange]\n") | |
| engine.rotate() | |
| time.sleep(2) | |
| else: | |
| console.print("[bold red]All retries exhausted.[/bold red]") | |
| return False | |
| def parse_args(): | |
| parser = argparse.ArgumentParser(description="Generate a video from a text prompt using the VeoAI CLI flow.") | |
| parser.add_argument("--prompt", help="Prompt to generate the video from. If omitted, interactive input is used.") | |
| parser.add_argument( | |
| "--aspect-ratio", | |
| choices=["16:9", "9:16"], | |
| default=None, | |
| help="Video aspect ratio. If omitted, interactive input is used.", | |
| ) | |
| parser.add_argument( | |
| "--model", | |
| choices=VeoEngine.model_choices(), | |
| default=VeoEngine.DEFAULT_MODEL, | |
| help="Model/page to use: 'veo' uses Veo 3.1; 'grok' uses Grok 4.5.", | |
| ) | |
| parser.add_argument( | |
| "--max-retries", | |
| type=int, | |
| default=20, | |
| help="Maximum number of generation attempts before giving up.", | |
| ) | |
| parser.add_argument( | |
| "--prompt-mode", | |
| choices=["raw", "enhanced"], | |
| default="raw", | |
| help="Prompt mode. 'raw' sends your exact prompt to generation; 'enhanced' rewrites it before generation.", | |
| ) | |
| parser.add_argument( | |
| "--audio-mode", | |
| choices=["auto", "try", "none"], | |
| default="auto", | |
| help="Audio handling mode. 'try' sends experimental audio flags to the generator; 'auto' uses default API behavior.", | |
| ) | |
| parser.add_argument( | |
| "--require-audio", | |
| action="store_true", | |
| help="Fail the run if the downloaded MP4 has no detectable audio track.", | |
| ) | |
| return parser.parse_args() | |
| def main(): | |
| args = parse_args() | |
| os.system("clear" if os.name == "posix" else "cls") | |
| console.print(Panel.fit( | |
| "[bold red]π₯ VEO AI VIDEO GENERATOR π₯[/bold red]\n" | |
| "[dim white]Proxy-Rotated Β· Auto-Retry Β· Fully Automated[/dim white]", | |
| border_style="bright_red", | |
| )) | |
| proxy_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "proxies-res-any-1777564385571.txt") | |
| rotator = ProxyRotator(proxy_file) | |
| engine = VeoEngine(rotator, model_key=args.model) | |
| console.print(f"[dim]Model: {engine.MODEL_NAME}[/dim]") | |
| console.print(f"[dim]Referrer: {engine.PAGE_URL}[/dim]") | |
| raw_prompt = args.prompt or Prompt.ask("\n[bold bright_white]Enter your prompt[/bold bright_white]") | |
| if args.aspect_ratio: | |
| aspect_ratio = args.aspect_ratio | |
| else: | |
| ratio_choice = Prompt.ask( | |
| "[bold bright_white]Aspect Ratio[/bold bright_white]", | |
| choices=["1", "2"], | |
| default="1", | |
| ) | |
| aspect_ratio = "9:16" if ratio_choice == "2" else "16:9" | |
| console.print(f"[dim]Selected: {aspect_ratio}[/dim]") | |
| console.print(f"[dim]Prompt mode: {args.prompt_mode}[/dim]\n") | |
| run_with_retry( | |
| engine, | |
| raw_prompt, | |
| aspect_ratio, | |
| max_retries=args.max_retries, | |
| audio_mode=args.audio_mode, | |
| require_audio=args.require_audio, | |
| prompt_mode=args.prompt_mode, | |
| ) | |
| if __name__ == "__main__": | |
| main() | |