diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -34,7 +34,6 @@ import asyncio from datetime import datetime, timedelta from typing import Optional import dashscope -from dashscope.utils.oss_utils import check_and_upload_local # Gradio supported languages for syntax highlighting GRADIO_SUPPORTED_LANGUAGES = [ @@ -1500,37 +1499,6 @@ Generate complete, working HTML code that can be run immediately. IMPORTANT: Always include "Built with anycoder" as clickable text in the header/top section of your application that links to https://huggingface.co/spaces/akhaliq/anycoder""" -def validate_video_html(video_html: str) -> bool: - """Validate that the video HTML is well-formed and safe to insert.""" - try: - # Basic checks for video HTML structure - if not video_html or not video_html.strip(): - return False - - # Check for required video elements - if '' not in video_html: - return False - - # Check for proper source tag - if '') + 8 - if video_start == -1 or video_end == 7: # 7 means not found - return False - - return True - except Exception: - return False # Stricter prompt for GLM-4.5V to ensure a complete, runnable HTML document with no escaped characters @@ -3157,2312 +3125,201 @@ def inline_multipage_into_single_preview(files: Dict[str, str]) -> str: doc = doc[:i] + nav_script + doc[i:] else: doc = doc + nav_script - except Exception: - # Non-fatal in preview - pass - - return doc - -def extract_html_document(text: str) -> str: - """Return substring starting from the first or if present, else original text. - - This ignores prose or planning notes before the actual HTML so previews don't break. - """ - if not text: - return text - lower = text.lower() - idx = lower.find(" Dict[str, str]: - """Infer npm dependencies from Svelte/TS imports across generated files. - - Returns mapping of package name -> semver (string). Uses conservative defaults - when versions aren't known. Adds special-cased versions when known. - """ - import re as _re - deps: Dict[str, str] = {} - import_from = _re.compile(r"import\s+[^;]*?from\s+['\"]([^'\"]+)['\"]", _re.IGNORECASE) - bare_import = _re.compile(r"import\s+['\"]([^'\"]+)['\"]", _re.IGNORECASE) - - def maybe_add(pkg: str): - if not pkg or pkg.startswith('.') or pkg.startswith('/') or pkg.startswith('http'): - return - if pkg.startswith('svelte'): - return - if pkg not in deps: - # Default to wildcard; adjust known packages below - deps[pkg] = "*" - - for path, content in (files or {}).items(): - if not isinstance(content, str): - continue - for m in import_from.finditer(content): - maybe_add(m.group(1)) - for m in bare_import.finditer(content): - maybe_add(m.group(1)) - - # Pin known versions when sensible - if '@gradio/dataframe' in deps: - deps['@gradio/dataframe'] = '^0.19.1' - - return deps - -def build_svelte_package_json(existing_json_text: str | None, detected_dependencies: Dict[str, str]) -> str: - """Create or merge a package.json for Svelte spaces. - - - If existing_json_text is provided, merge detected deps into its dependencies. - - Otherwise, start from the template defaults provided by the user and add deps. - - Always preserve template scripts and devDependencies. - """ - import json as _json - # Template from the user's Svelte space scaffold - template = { - "name": "svelte", - "private": True, - "version": "0.0.0", - "type": "module", - "scripts": { - "dev": "vite", - "build": "vite build", - "preview": "vite preview", - "check": "svelte-check --tsconfig ./tsconfig.app.json && tsc -p tsconfig.node.json" - }, - "devDependencies": { - "@sveltejs/vite-plugin-svelte": "^5.0.3", - "@tsconfig/svelte": "^5.0.4", - "svelte": "^5.28.1", - "svelte-check": "^4.1.6", - "typescript": "~5.8.3", - "vite": "^6.3.5" - } - } - - result = template - if existing_json_text: - try: - parsed = _json.loads(existing_json_text) - # Merge with template as base, keeping template scripts/devDependencies if missing in parsed - result = { - **template, - **{k: v for k, v in parsed.items() if k not in ("scripts", "devDependencies")}, - } - # If parsed contains its own scripts/devDependencies, prefer parsed to respect user's file - if isinstance(parsed.get("scripts"), dict): - result["scripts"] = parsed["scripts"] - if isinstance(parsed.get("devDependencies"), dict): - result["devDependencies"] = parsed["devDependencies"] - except Exception: - # Fallback to template if parse fails - result = template - - # Merge dependencies - existing_deps = result.get("dependencies", {}) - if not isinstance(existing_deps, dict): - existing_deps = {} - merged = {**existing_deps, **(detected_dependencies or {})} - if merged: - result["dependencies"] = merged - else: - result.pop("dependencies", None) - - return _json.dumps(result, indent=2, ensure_ascii=False) + "\n" - -def history_render(history: History): - return gr.update(visible=True), history - -def clear_history(): - return [], [], None, "" # Empty lists for both tuple format and chatbot messages, None for file, empty string for website URL - -def update_image_input_visibility(model): - """Update image input visibility based on selected model""" - is_ernie_vl = model.get("id") == "baidu/ERNIE-4.5-VL-424B-A47B-Base-PT" - is_glm_vl = model.get("id") == "THUDM/GLM-4.1V-9B-Thinking" - is_glm_45v = model.get("id") == "zai-org/GLM-4.5V" - return gr.update(visible=is_ernie_vl or is_glm_vl or is_glm_45v) - -def process_image_for_model(image): - """Convert image to base64 for model input""" - if image is None: - return None - - # Convert numpy array to PIL Image if needed - import io - import base64 - import numpy as np - from PIL import Image - - # Handle numpy array from Gradio - if isinstance(image, np.ndarray): - image = Image.fromarray(image) - - buffer = io.BytesIO() - image.save(buffer, format='PNG') - img_str = base64.b64encode(buffer.getvalue()).decode('utf-8') - return f"data:image/png;base64,{img_str}" - -def compress_video_for_data_uri(video_bytes: bytes, max_size_mb: int = 8) -> bytes: - """Compress video bytes for data URI embedding with size limit""" - import subprocess - import tempfile - import os - - max_size = max_size_mb * 1024 * 1024 - - # If already small enough, return as-is - if len(video_bytes) <= max_size: - return video_bytes - - print(f"[VideoCompress] Video size {len(video_bytes)} bytes exceeds {max_size_mb}MB limit, attempting compression") - - try: - # Create temp files - with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_input: - temp_input.write(video_bytes) - temp_input_path = temp_input.name - - temp_output_path = temp_input_path.replace('.mp4', '_compressed.mp4') - - try: - # Compress with ffmpeg - extremely aggressive settings for tiny preview size - subprocess.run([ - 'ffmpeg', '-i', temp_input_path, - '-vcodec', 'libx264', '-crf', '40', '-preset', 'ultrafast', - '-vf', 'scale=320:-1', '-r', '10', # Very low resolution and frame rate - '-an', # Remove audio to save space - '-t', '10', # Limit to first 10 seconds for preview - '-y', temp_output_path - ], check=True, capture_output=True, stderr=subprocess.DEVNULL) - - # Read compressed video - with open(temp_output_path, 'rb') as f: - compressed_bytes = f.read() - - print(f"[VideoCompress] Compressed from {len(video_bytes)} to {len(compressed_bytes)} bytes") - return compressed_bytes - - except (subprocess.CalledProcessError, FileNotFoundError): - print("[VideoCompress] ffmpeg compression failed, using original video") - return video_bytes - finally: - # Clean up temp files - for path in [temp_input_path, temp_output_path]: - try: - if os.path.exists(path): - os.remove(path) - except Exception: - pass - - except Exception as e: - print(f"[VideoCompress] Compression failed: {e}, using original video") - return video_bytes - -def compress_audio_for_data_uri(audio_bytes: bytes, max_size_mb: int = 4) -> bytes: - """Compress audio bytes for data URI embedding with size limit""" - import subprocess - import tempfile - import os - - max_size = max_size_mb * 1024 * 1024 - - # If already small enough, return as-is - if len(audio_bytes) <= max_size: - return audio_bytes - - print(f"[AudioCompress] Audio size {len(audio_bytes)} bytes exceeds {max_size_mb}MB limit, attempting compression") - - try: - # Create temp files - with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_input: - temp_input.write(audio_bytes) - temp_input_path = temp_input.name - - temp_output_path = temp_input_path.replace('.wav', '_compressed.mp3') - - try: - # Compress with ffmpeg - convert to MP3 with lower bitrate - subprocess.run([ - 'ffmpeg', '-i', temp_input_path, - '-codec:a', 'libmp3lame', '-b:a', '64k', # Low bitrate MP3 - '-y', temp_output_path - ], check=True, capture_output=True, stderr=subprocess.DEVNULL) - - # Read compressed audio - with open(temp_output_path, 'rb') as f: - compressed_bytes = f.read() - - print(f"[AudioCompress] Compressed from {len(audio_bytes)} to {len(compressed_bytes)} bytes") - return compressed_bytes - - except (subprocess.CalledProcessError, FileNotFoundError): - print("[AudioCompress] ffmpeg compression failed, using original audio") - return audio_bytes - finally: - # Clean up temp files - for path in [temp_input_path, temp_output_path]: - try: - if os.path.exists(path): - os.remove(path) - except Exception: - pass - - except Exception as e: - print(f"[AudioCompress] Compression failed: {e}, using original audio") - return audio_bytes - -# --------------------------------------------------------------------------- -# General temp media file management (per-session tracking and cleanup) -# --------------------------------------------------------------------------- -MEDIA_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_media") -MEDIA_FILE_TTL_SECONDS = 6 * 60 * 60 # 6 hours -_SESSION_MEDIA_FILES: Dict[str, List[str]] = {} -_MEDIA_FILES_LOCK = threading.Lock() - -# Global dictionary to store temporary media files for the session -temp_media_files = {} - -def _ensure_media_dir_exists() -> None: - """Ensure the media temp directory exists.""" - try: - os.makedirs(MEDIA_TEMP_DIR, exist_ok=True) - except Exception: - pass - -def track_session_media_file(session_id: str | None, file_path: str) -> None: - """Track a media file for session-based cleanup.""" - if not session_id or not file_path: - return - with _MEDIA_FILES_LOCK: - if session_id not in _SESSION_MEDIA_FILES: - _SESSION_MEDIA_FILES[session_id] = [] - _SESSION_MEDIA_FILES[session_id].append(file_path) - -def cleanup_session_media(session_id: str | None) -> None: - """Clean up media files for a specific session.""" - if not session_id: - return - with _MEDIA_FILES_LOCK: - files_to_clean = _SESSION_MEDIA_FILES.pop(session_id, []) - - for path in files_to_clean: - try: - if path and os.path.exists(path): - os.unlink(path) - except Exception: - # Best-effort cleanup - pass - -def reap_old_media(ttl_seconds: int = MEDIA_FILE_TTL_SECONDS) -> None: - """Delete old media files in the temp directory based on modification time.""" - try: - _ensure_media_dir_exists() - now_ts = time.time() - for name in os.listdir(MEDIA_TEMP_DIR): - path = os.path.join(MEDIA_TEMP_DIR, name) - if os.path.isfile(path): - try: - mtime = os.path.getmtime(path) - if (now_ts - mtime) > ttl_seconds: - os.unlink(path) - except Exception: - pass - except Exception: - # Temp dir might not exist or be accessible; ignore - pass - -def cleanup_all_temp_media_on_startup() -> None: - """Clean up all temporary media files on app startup.""" - try: - # Clean up temp_media_files registry - temp_media_files.clear() - - # Clean up actual files from disk (assume all are orphaned on startup) - _ensure_media_dir_exists() - for name in os.listdir(MEDIA_TEMP_DIR): - path = os.path.join(MEDIA_TEMP_DIR, name) - if os.path.isfile(path): - try: - os.unlink(path) - except Exception: - pass - - # Clear session tracking - with _MEDIA_FILES_LOCK: - _SESSION_MEDIA_FILES.clear() - - print("[StartupCleanup] Cleaned up orphaned temporary media files") - except Exception as e: - print(f"[StartupCleanup] Error during media cleanup: {str(e)}") - -def cleanup_all_temp_media_on_shutdown() -> None: - """Clean up all temporary media files on app shutdown.""" - try: - print("[ShutdownCleanup] Cleaning up temporary media files...") - - # Clean up temp_media_files registry and remove files - for file_id, file_info in temp_media_files.items(): - try: - if os.path.exists(file_info['path']): - os.unlink(file_info['path']) - except Exception: - pass - temp_media_files.clear() - - # Clean up all session files - with _MEDIA_FILES_LOCK: - for session_id, file_paths in _SESSION_MEDIA_FILES.items(): - for path in file_paths: - try: - if path and os.path.exists(path): - os.unlink(path) - except Exception: - pass - _SESSION_MEDIA_FILES.clear() - - print("[ShutdownCleanup] Temporary media cleanup completed") - except Exception as e: - print(f"[ShutdownCleanup] Error during cleanup: {str(e)}") - -# Register shutdown cleanup handler -atexit.register(cleanup_all_temp_media_on_shutdown) - -def create_temp_media_url(media_bytes: bytes, filename: str, media_type: str = "image", session_id: str | None = None) -> str: - """Create a temporary file and return a local URL for preview. - - Args: - media_bytes: Raw bytes of the media file - filename: Name for the file (will be made unique) - media_type: Type of media ('image', 'video', 'audio') - session_id: Session ID for tracking cleanup - - Returns: - Temporary file URL for preview or error message - """ - try: - # Create unique filename with timestamp and UUID - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - unique_id = str(uuid.uuid4())[:8] - base_name, ext = os.path.splitext(filename) - unique_filename = f"{media_type}_{timestamp}_{unique_id}_{base_name}{ext}" - - # Create temporary file in the dedicated directory - _ensure_media_dir_exists() - temp_path = os.path.join(MEDIA_TEMP_DIR, unique_filename) - - # Write media bytes to temporary file - with open(temp_path, 'wb') as f: - f.write(media_bytes) - - # Track file for session-based cleanup - if session_id: - track_session_media_file(session_id, temp_path) - - # Store the file info for later upload - file_id = f"{media_type}_{unique_id}" - temp_media_files[file_id] = { - 'path': temp_path, - 'filename': filename, - 'media_type': media_type, - 'media_bytes': media_bytes - } - - # Return file:// URL for preview - file_url = f"file://{temp_path}" - print(f"[TempMedia] Created temporary {media_type} file: {file_url}") - return file_url - - except Exception as e: - print(f"[TempMedia] Failed to create temporary file: {str(e)}") - return f"Error creating temporary {media_type} file: {str(e)}" - -def upload_media_to_hf(media_bytes: bytes, filename: str, media_type: str = "image", token: gr.OAuthToken | None = None, use_temp: bool = True) -> str: - """Upload media file to user's Hugging Face account or create temporary file. - - Args: - media_bytes: Raw bytes of the media file - filename: Name for the file (will be made unique) - media_type: Type of media ('image', 'video', 'audio') - token: OAuth token from gr.login (takes priority over env var) - use_temp: If True, create temporary file for preview; if False, upload to HF - - Returns: - Permanent URL to the uploaded file, temporary URL, or error message - """ - try: - # If use_temp is True, create temporary file for preview - if use_temp: - return create_temp_media_url(media_bytes, filename, media_type) - - # Otherwise, upload to Hugging Face for permanent URL - # Try to get token from OAuth first, then fall back to environment variable - hf_token = None - if token and token.token: - hf_token = token.token - else: - hf_token = os.getenv('HF_TOKEN') - - if not hf_token: - return "Error: Please log in with your Hugging Face account to upload media, or set HF_TOKEN environment variable." - - # Initialize HF API - api = HfApi(token=hf_token) - - # Get current user info to determine username - try: - user_info = api.whoami() - username = user_info.get('name', 'unknown-user') - except Exception as e: - print(f"[HFUpload] Could not get user info: {e}") - username = 'anycoder-user' - - # Create repository name for media storage - repo_name = f"{username}/anycoder-media" - - # Try to create the repository if it doesn't exist - try: - api.create_repo( - repo_id=repo_name, - repo_type="dataset", - private=False, - exist_ok=True - ) - print(f"[HFUpload] Repository {repo_name} ready") - except Exception as e: - print(f"[HFUpload] Repository creation/access issue: {e}") - # Continue anyway, repo might already exist - - # Create unique filename with timestamp and UUID - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - unique_id = str(uuid.uuid4())[:8] - base_name, ext = os.path.splitext(filename) - unique_filename = f"{media_type}/{timestamp}_{unique_id}_{base_name}{ext}" - - # Create temporary file for upload - with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file: - temp_file.write(media_bytes) - temp_path = temp_file.name - - try: - # Upload file to HF repository - api.upload_file( - path_or_fileobj=temp_path, - path_in_repo=unique_filename, - repo_id=repo_name, - repo_type="dataset", - commit_message=f"Upload {media_type} generated by AnyCoder" - ) - - # Generate permanent URL - permanent_url = f"https://huggingface.co/datasets/{repo_name}/resolve/main/{unique_filename}" - print(f"[HFUpload] Successfully uploaded {media_type} to {permanent_url}") - return permanent_url - - finally: - # Clean up temporary file - try: - os.unlink(temp_path) - except Exception: - pass - - except Exception as e: - print(f"[HFUpload] Upload failed: {str(e)}") - return f"Error uploading {media_type} to Hugging Face: {str(e)}" - -def upload_temp_files_to_hf_and_replace_urls(html_content: str, token: gr.OAuthToken | None = None) -> str: - """Upload all temporary media files to HF and replace their URLs in HTML content. - - Args: - html_content: HTML content containing temporary file URLs - token: OAuth token for HF authentication - - Returns: - Updated HTML content with permanent HF URLs - """ - try: - if not temp_media_files: - print("[DeployUpload] No temporary media files to upload") - return html_content - - print(f"[DeployUpload] Uploading {len(temp_media_files)} temporary media files to HF") - updated_content = html_content - - for file_id, file_info in temp_media_files.items(): - try: - # Upload to HF with permanent URL - permanent_url = upload_media_to_hf( - file_info['media_bytes'], - file_info['filename'], - file_info['media_type'], - token, - use_temp=False # Force permanent upload - ) - - if not permanent_url.startswith("Error"): - # Replace the temporary file URL with permanent URL - temp_url = f"file://{file_info['path']}" - updated_content = updated_content.replace(temp_url, permanent_url) - print(f"[DeployUpload] Replaced {temp_url} with {permanent_url}") - else: - print(f"[DeployUpload] Failed to upload {file_id}: {permanent_url}") - - except Exception as e: - print(f"[DeployUpload] Error uploading {file_id}: {str(e)}") - continue - - # Clean up temporary files after upload - cleanup_temp_media_files() - - return updated_content - - except Exception as e: - print(f"[DeployUpload] Failed to upload temporary files: {str(e)}") - return html_content - -def cleanup_temp_media_files(): - """Clean up temporary media files from disk and memory.""" - try: - for file_id, file_info in temp_media_files.items(): - try: - if os.path.exists(file_info['path']): - os.remove(file_info['path']) - print(f"[TempCleanup] Removed {file_info['path']}") - except Exception as e: - print(f"[TempCleanup] Failed to remove {file_info['path']}: {str(e)}") - - # Clear the global dictionary - temp_media_files.clear() - print("[TempCleanup] Cleared temporary media files registry") - - except Exception as e: - print(f"[TempCleanup] Error during cleanup: {str(e)}") -def generate_image_to_image(input_image_data, prompt: str, token: gr.OAuthToken | None = None) -> str: - """Generate an image using image-to-image via OpenRouter. - - Uses Google Gemini 2.5 Flash Image Preview via OpenRouter chat completions API. - - Returns an HTML tag whose src is an uploaded temporary URL. - """ - try: - # Check for OpenRouter API key - openrouter_key = os.getenv('OPENROUTER_API_KEY') - if not openrouter_key: - return "Error: OPENROUTER_API_KEY environment variable is not set. Please set it to your OpenRouter API key." - - # Normalize input image to bytes - import io - from PIL import Image - import base64 - import requests - import json as _json - try: - import numpy as np - except Exception: - np = None - - if hasattr(input_image_data, 'read'): - raw = input_image_data.read() - pil_image = Image.open(io.BytesIO(raw)) - elif hasattr(input_image_data, 'mode') and hasattr(input_image_data, 'size'): - pil_image = input_image_data - elif np is not None and isinstance(input_image_data, np.ndarray): - pil_image = Image.fromarray(input_image_data) - elif isinstance(input_image_data, (bytes, bytearray)): - pil_image = Image.open(io.BytesIO(input_image_data)) - else: - pil_image = Image.open(io.BytesIO(bytes(input_image_data))) - - if pil_image.mode != 'RGB': - pil_image = pil_image.convert('RGB') - - # Resize input image to avoid request body size limits - max_input_size = 1024 - if pil_image.width > max_input_size or pil_image.height > max_input_size: - pil_image.thumbnail((max_input_size, max_input_size), Image.Resampling.LANCZOS) - - # Convert to base64 - import io as _io - buffered = _io.BytesIO() - pil_image.save(buffered, format='PNG') - img_b64 = base64.b64encode(buffered.getvalue()).decode('utf-8') - - # Call OpenRouter API - headers = { - "Authorization": f"Bearer {openrouter_key}", - "Content-Type": "application/json", - "HTTP-Referer": os.getenv("YOUR_SITE_URL", "https://example.com"), - "X-Title": os.getenv("YOUR_SITE_NAME", "AnyCoder Image I2I"), - } - payload = { - "model": "google/gemini-2.5-flash-image-preview:free", - "messages": [ - { - "role": "user", - "content": [ - {"type": "text", "text": prompt}, - {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img_b64}"}}, - ], - } - ], - "max_tokens": 2048, - } - - try: - resp = requests.post( - "https://openrouter.ai/api/v1/chat/completions", - headers=headers, - data=_json.dumps(payload), - timeout=60, - ) - resp.raise_for_status() - result_data = resp.json() - - # Corrected response parsing logic - message = result_data.get('choices', [{}])[0].get('message', {}) - - if message and 'images' in message and message['images']: - # Get the first image from the 'images' list - image_data = message['images'][0] - base64_string = image_data.get('image_url', {}).get('url', '') - - if base64_string and ',' in base64_string: - # Remove the "data:image/png;base64," prefix - base64_content = base64_string.split(',')[1] - - # Decode the base64 string and create a PIL image - img_bytes = base64.b64decode(base64_content) - edited_image = Image.open(_io.BytesIO(img_bytes)) - - # Convert PIL image to JPEG bytes for upload - out_buf = _io.BytesIO() - edited_image.convert('RGB').save(out_buf, format='JPEG', quality=90, optimize=True) - image_bytes = out_buf.getvalue() - else: - raise RuntimeError(f"API returned an invalid image format. Response: {_json.dumps(result_data, indent=2)}") - else: - raise RuntimeError(f"API did not return an image. Full Response: {_json.dumps(result_data, indent=2)}") - - except requests.exceptions.HTTPError as err: - error_body = err.response.text - if err.response.status_code == 401: - return "Error: Authentication failed. Check your OpenRouter API key." - elif err.response.status_code == 429: - return "Error: Rate limit exceeded or insufficient credits. Check your OpenRouter account." - else: - return f"Error: An API error occurred: {error_body}" - except Exception as e: - return f"Error: An unexpected error occurred: {str(e)}" - - # Upload and return HTML tag - filename = "image_to_image_result.jpg" - temp_url = upload_media_to_hf(image_bytes, filename, "image", token, use_temp=True) - if temp_url.startswith("Error"): - return temp_url - return f"\"{prompt}\"" - except Exception as e: - print(f"Image-to-image generation error: {str(e)}") - return f"Error generating image (image-to-image): {str(e)}" -def generate_video_from_image(input_image_data, prompt: str, session_id: str | None = None, token: gr.OAuthToken | None = None) -> str: - """Generate a video from an input image and prompt using Hugging Face InferenceClient. - - Returns an HTML