Spaces:
Sleeping
Sleeping
| """ | |
| ============================================ | |
| 🔥 RUHI-CORE - ZIP Deployer Engine | |
| ============================================ | |
| Upload ZIP -> Extract -> Auto-Detect -> Deploy! | |
| Supports: .zip, .tar.gz, .tar, .7z, .rar | |
| """ | |
| import os | |
| import re | |
| import uuid | |
| import shutil | |
| import zipfile | |
| import tarfile | |
| import asyncio | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Optional, Dict, List, Tuple | |
| import aiofiles | |
| import aiosqlite | |
| from loguru import logger | |
| from core.config import settings | |
| from core.process_manager import process_manager | |
| class ZipDeployer: | |
| """ | |
| Handles ZIP/archive upload and automatic deployment. | |
| Flow: | |
| 1. Upload ZIP file -> /data/uploads/ | |
| 2. Extract to temp dir -> /data/temp/ | |
| 3. Auto-detect entry point (main.py, index.js, etc.) | |
| 4. Move to /data/apps/{service_name}/ | |
| 5. Create service in ProcessManager | |
| 6. Optionally auto-start | |
| """ | |
# Candidate entry-point file names per language. auto_detect() walks each
# list in order and stops at the first name present in the project root,
# so earlier names take priority.
ENTRY_POINTS = {
    "python": [
        "main.py", "app.py", "bot.py", "run.py", "server.py",
        "index.py", "start.py", "manage.py", "wsgi.py", "asgi.py",
        "__main__.py"
    ],
    "node": [
        "index.js", "app.js", "server.js", "bot.js", "main.js",
        "start.js", "index.ts", "app.ts", "server.ts"
    ],
    "shell": [
        "start.sh", "run.sh", "entrypoint.sh", "boot.sh", "init.sh"
    ]
}
# Dependency manifest file names per language, used to recognise a project
# and to report which dependencies file was found.
PACKAGE_FILES = {
    "python": ["requirements.txt", "Pipfile", "pyproject.toml", "setup.py", "setup.cfg"],
    "node": ["package.json", "yarn.lock", "pnpm-lock.yaml", "package-lock.json"],
}
def __init__(self):
    """Bind the working directories from settings and create any that are missing."""
    self.uploads_dir = settings.UPLOADS_DIR
    self.temp_dir = settings.TEMP_DIR
    self.apps_dir = settings.APPS_DIR

    # Every working directory must exist before the first upload arrives.
    for directory in (self.uploads_dir, self.temp_dir, self.apps_dir):
        directory.mkdir(parents=True, exist_ok=True)

    logger.info("📦 ZipDeployer initialized")
async def save_upload(self, filename: str, content: bytes) -> Dict:
    """
    Save an uploaded file to the uploads directory.
    Returns file info and generates a deployment ID.

    Args:
        filename: Client-supplied file name (sanitized before use).
        content: Raw bytes of the uploaded archive.

    Returns:
        Dict with deploy_id, filename, saved_as, path, size,
        size_formatted and timestamp.

    Raises:
        ValueError: If the name is empty after sanitizing, the extension
            is not a supported archive type, or the upload is too large.
    """
    # Validate filename
    safe_name = self._sanitize_filename(filename)
    if not safe_name:
        raise ValueError("Invalid filename")

    # Check extension. The list mirrors exactly what extract_archive() can
    # unpack, and the comparison is case-insensitive so "APP.ZIP" is accepted
    # (extract_archive lower-cases the name as well).
    allowed = ('.zip', '.tar.gz', '.tgz', '.tar.bz2', '.tar', '.7z', '.rar')
    if not safe_name.lower().endswith(allowed):
        ext = Path(safe_name).suffix.lower()
        raise ValueError(f"Unsupported file type: {ext}. Allowed: {', '.join(allowed)}")

    # Check size
    file_size = len(content)
    if file_size > settings.MAX_UPLOAD_SIZE:
        max_mb = settings.MAX_UPLOAD_SIZE / (1024 * 1024)
        raise ValueError(f"File too large. Max: {max_mb:g}MB")

    # Generate deployment ID
    deploy_id = str(uuid.uuid4())[:12]
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Save file; the timestamp + deploy ID prefix keeps repeated uploads of
    # the same file name from overwriting each other.
    save_name = f"{timestamp}_{deploy_id}_{safe_name}"
    save_path = self.uploads_dir / save_name
    async with aiofiles.open(save_path, 'wb') as f:
        await f.write(content)

    size_formatted = self._format_size(file_size)
    logger.info(f"📥 Upload saved: {save_name} ({size_formatted})")
    return {
        "deploy_id": deploy_id,
        "filename": safe_name,
        "saved_as": save_name,
        "path": str(save_path),
        "size": file_size,
        "size_formatted": size_formatted,
        "timestamp": timestamp,
    }
async def extract_archive(self, file_path: str, deploy_id: str) -> Dict:
    """
    Unpack an uploaded archive into a per-deployment temp directory.

    The extractor is chosen from the (lower-cased) file name. A failed
    extraction removes the partially filled directory and is re-raised as
    RuntimeError. Returns the extraction directory plus file/dir counts,
    total size and up to 30 top-level entries.
    """
    archive = Path(file_path)
    if not archive.exists():
        raise FileNotFoundError(f"Archive not found: {file_path}")

    # Always start from an empty extraction directory
    target = self.temp_dir / deploy_id
    if target.exists():
        shutil.rmtree(target)
    target.mkdir(parents=True)

    lowered = archive.name.lower()
    # (suffixes, extractor) pairs, tried in order
    extractors = (
        (('.zip',), lambda: self._extract_zip(archive, target)),
        (('.tar.gz', '.tgz'), lambda: self._extract_tar(archive, target, 'gz')),
        (('.tar.bz2',), lambda: self._extract_tar(archive, target, 'bz2')),
        (('.tar',), lambda: self._extract_tar(archive, target, '')),
        (('.7z',), lambda: self._extract_7z(archive, target)),
        (('.rar',), lambda: self._extract_rar(archive, target)),
    )
    try:
        for suffixes, run_extractor in extractors:
            if lowered.endswith(suffixes):
                await run_extractor()
                break
        else:
            raise ValueError(f"Unsupported archive format: {archive.suffix}")
    except Exception as e:
        # Cleanup on error
        if target.exists():
            shutil.rmtree(target)
        raise RuntimeError(f"Extraction failed: {str(e)}")

    # Archives that contain one wrapping folder are flattened
    target = self._unwrap_single_dir(target)

    # Tally the extracted tree
    file_count = 0
    dir_count = 0
    total_size = 0
    for entry in list(target.rglob('*')):
        if entry.is_file():
            file_count += 1
            total_size += entry.stat().st_size
        elif entry.is_dir():
            dir_count += 1

    # Top-level listing: directories first, then files, each by name
    top_level = []
    for item in sorted(target.iterdir(), key=lambda x: (not x.is_dir(), x.name)):
        top_level.append({
            "name": item.name,
            "is_dir": item.is_dir(),
            "size": item.stat().st_size if item.is_file() else 0,
        })

    size_text = self._format_size(total_size)
    logger.info(f"📦 Extracted: {file_count} files, {dir_count} dirs, {size_text}")
    return {
        "deploy_id": deploy_id,
        "extract_dir": str(target),
        "file_count": file_count,
        "dir_count": dir_count,
        "total_size": total_size,
        "total_size_formatted": self._format_size(total_size),
        "top_level": top_level[:30],
    }
async def auto_detect(self, extract_dir: str) -> Dict:
    """
    Auto-detect project type, language, and entry point.

    Only regular files directly inside ``extract_dir`` are inspected.

    Args:
        extract_dir: Root directory of the extracted project.

    Returns:
        Dict with keys language, entry_file, type, package_manager,
        dependencies_file, framework, has_dockerfile, has_env and
        suggestions.

    Raises:
        FileNotFoundError: If ``extract_dir`` does not exist.
    """
    import json

    base = Path(extract_dir)
    if not base.exists():
        raise FileNotFoundError(f"Directory not found: {extract_dir}")

    detected = {
        "language": None,
        "entry_file": None,
        "type": "worker",  # Default to worker
        "package_manager": None,
        "dependencies_file": None,
        "framework": None,
        "has_dockerfile": False,
        "has_env": False,
        "suggestions": []
    }

    # Names of the top-level files
    all_names = {f.name for f in base.iterdir() if f.is_file()}

    detected["has_dockerfile"] = "Dockerfile" in all_names or "dockerfile" in all_names
    detected["has_env"] = ".env" in all_names or ".env.example" in all_names

    # 1. Check for Python project
    for entry in self.ENTRY_POINTS["python"]:
        if entry in all_names:
            detected["language"] = "python"
            detected["entry_file"] = entry
            break

    # Check Python package files (first match wins)
    python_managers = {
        "requirements.txt": "pip",
        "Pipfile": "pipenv",
        "pyproject.toml": "poetry/pip",
    }
    for pkg_file in self.PACKAGE_FILES["python"]:
        if pkg_file in all_names:
            detected["language"] = "python"
            detected["dependencies_file"] = pkg_file
            detected["package_manager"] = python_managers.get(pkg_file)
            break

    # 2. Check for Node.js project. package.json is only consulted when no
    # Python project was detected, so it cannot overwrite a Python manifest.
    if not detected["language"]:
        for entry in self.ENTRY_POINTS["node"]:
            if entry in all_names:
                detected["language"] = "node"
                detected["entry_file"] = entry
                break

        if "package.json" in all_names:
            detected["language"] = "node"
            detected["dependencies_file"] = "package.json"
            detected["package_manager"] = "npm"
            # Read package.json for more info (best effort)
            try:
                with open(base / "package.json", 'r', encoding='utf-8') as f:
                    pkg = json.load(f)

                # Check for start script
                scripts = pkg.get("scripts") or {}
                if "start" in scripts:
                    detected["suggestions"].append(f"Found start script: {scripts['start']}")

                # Detect entry from main
                main_entry = pkg.get("main")
                if not detected["entry_file"] and isinstance(main_entry, str) and main_entry:
                    detected["entry_file"] = main_entry

                # Detect framework; the first dependency found wins
                deps = {
                    **(pkg.get("dependencies") or {}),
                    **(pkg.get("devDependencies") or {}),
                }
                node_frameworks = (
                    ("express", "Express.js", "web"),
                    ("next", "Next.js", "web"),
                    ("discord.js", "Discord.js", "bot"),
                    ("telegraf", "Telegraf (Telegram)", "bot"),
                )
                for dep_name, framework, service_type in node_frameworks:
                    if dep_name in deps:
                        detected["framework"] = framework
                        detected["type"] = service_type
                        break
            except Exception as e:
                # A malformed package.json must not abort detection
                logger.warning(f"Could not inspect package.json: {str(e)}")

    # 3. Check for Shell scripts
    if not detected["language"]:
        for entry in self.ENTRY_POINTS["shell"]:
            if entry in all_names:
                detected["language"] = "shell"
                detected["entry_file"] = entry
                break

    # 4. Try to detect web service vs worker from the entry file's source
    if detected["entry_file"]:
        try:
            content = (
                (base / detected["entry_file"])
                .read_text(encoding='utf-8', errors='replace')
                .lower()
            )

            web_indicators = (
                'flask', 'fastapi', 'django', 'uvicorn', 'gunicorn',
                'express', 'http.createserver', 'app.listen',
                'http.server', 'socketserver', 'tornado',
                'aiohttp', 'sanic', 'starlette', 'bottle',
            )
            # The generic words must not be embedded in a longer word:
            # a plain substring test for "port" matches every "import".
            generic_web_word = re.compile(r'(?<![a-z])(?:bind|port|listen)(?![a-z])')
            if (
                any(indicator in content for indicator in web_indicators)
                or generic_web_word.search(content)
            ):
                detected["type"] = "web"

            # Detect bot (takes precedence over "web")
            bot_indicators = (
                'discord', 'telegram', 'bot.run', 'client.run',
                'bot.start', 'pyrogram', 'telethon', 'aiogram',
                'slack_sdk', 'tweepy', 'praw',
            )
            if any(indicator in content for indicator in bot_indicators):
                detected["type"] = "bot"

            # Detect Python framework
            if detected["language"] == "python":
                if 'flask' in content:
                    detected["framework"] = "Flask"
                elif 'fastapi' in content:
                    detected["framework"] = "FastAPI"
                elif 'django' in content:
                    detected["framework"] = "Django"
                elif 'discord' in content:
                    detected["framework"] = "Discord.py"
                elif 'pyrogram' in content or 'telethon' in content or 'aiogram' in content:
                    detected["framework"] = "Telegram Bot"
        except Exception as e:
            # Entry file missing or unreadable: keep the defaults
            logger.warning(f"Could not inspect entry file {detected['entry_file']}: {str(e)}")

    # Generate suggestions
    if not detected["entry_file"]:
        # Look for any Python/JS files; sorted so the pick is deterministic
        py_files = sorted(f for f in all_names if f.endswith('.py'))
        js_files = sorted(f for f in all_names if f.endswith('.js'))
        if py_files:
            detected["language"] = "python"
            detected["entry_file"] = py_files[0]
            detected["suggestions"].append(f"No standard entry point found. Using: {py_files[0]}")
        elif js_files:
            detected["language"] = "node"
            detected["entry_file"] = js_files[0]
            detected["suggestions"].append(f"No standard entry point found. Using: {js_files[0]}")
        else:
            detected["suggestions"].append("No recognizable entry point found. Please specify manually.")

    if detected["dependencies_file"]:
        detected["suggestions"].append(f"Dependencies file found: {detected['dependencies_file']}")

    logger.info(f"🔍 Auto-detected: {detected['language']}/{detected['type']} -> {detected['entry_file']}")
    return detected
async def deploy(
    self,
    deploy_id: str,
    extract_dir: str,
    service_name: str,
    language: str = "python",
    entry_file: str = "main.py",
    service_type: str = "web",
    auto_start: bool = True,
    install_deps: bool = True,
    env_vars: dict = None,
    command: str = "",
    description: str = ""
) -> Dict:
    """
    Turn an extracted project into a managed service.

    Steps: copy the files to the apps directory (moving any existing app
    of the same name to the backups directory first), optionally install
    dependencies, create or update the service in the process manager,
    optionally start it, remove the temp directory and record the
    deployment.

    Raises:
        FileNotFoundError: If ``extract_dir`` does not exist.
        ValueError: If the service name is empty after sanitizing.
        RuntimeError: If the service cannot be created or updated.
    """
    source_root = Path(extract_dir)
    if not source_root.exists():
        raise FileNotFoundError(f"Source directory not found: {extract_dir}")

    # Service names are limited to lower-case letters, digits, "_" and "-"
    safe_name = re.sub(r'[^a-zA-Z0-9_-]', '-', service_name.strip()).lower()
    if not safe_name:
        raise ValueError("Invalid service name")

    app_dir = self.apps_dir / safe_name

    # Keep the previous version of the app as a timestamped backup
    if app_dir.exists():
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f"{safe_name}_backup_{stamp}"
        shutil.move(str(app_dir), str(settings.BACKUPS_DIR / backup_name))
        logger.info(f"📦 Existing app backed up to: {backup_name}")

    # Copy the extracted tree into place
    shutil.copytree(str(source_root), str(app_dir))

    install_log = ""
    if install_deps:
        install_log = await self._install_dependencies(app_dir, language)

    # Create the service, or refresh an existing one with the same name
    try:
        service = process_manager.get_service_by_name(safe_name)
        if service:
            service.app_dir = str(app_dir)
            service.entry_file = entry_file
            service.language = language
            service.type = service_type
            service.command = command
            service.description = description
            if env_vars:
                service.env_vars = env_vars
            await service._save_to_db()
            logger.info(f"🔄 Updated existing service: {safe_name}")
        else:
            service = await process_manager.create_service(
                name=safe_name,
                type=service_type,
                language=language,
                entry_file=entry_file,
                command=command,
                auto_restart=True,
                env_vars=env_vars or {},
                description=description or f"Deployed from ZIP ({deploy_id})"
            )
    except Exception as e:
        raise RuntimeError(f"Failed to create service: {str(e)}")

    started = await service.start() if auto_start else False

    # Temp cleanup is best effort and must not fail the deployment
    try:
        leftover = self.temp_dir / deploy_id
        if leftover.exists():
            shutil.rmtree(leftover)
    except Exception:
        pass

    await self._save_deployment(deploy_id, service.id, safe_name)
    logger.info(f"🚀 Deployed: {safe_name} (ID: {service.id}, Started: {started})")

    outcome = 'Service is running.' if started else 'Service created but not started.'
    return {
        "deploy_id": deploy_id,
        "service_id": service.id,
        "service_name": safe_name,
        "app_dir": str(app_dir),
        "language": language,
        "entry_file": entry_file,
        "type": service_type,
        "dependencies_installed": bool(install_log),
        "install_log": install_log,
        "auto_started": started,
        "status": service.status,
        "message": f"Successfully deployed '{safe_name}'! {outcome}",
        "service": service.get_info()
    }
async def quick_deploy(
    self,
    filename: str,
    content: bytes,
    service_name: Optional[str] = None,
    auto_start: bool = True
) -> Dict:
    """
    One-click deployment: Upload ZIP -> Extract -> Detect -> Deploy
    All in one step!

    Args:
        filename: Name of the uploaded archive.
        content: Raw archive bytes.
        service_name: Service name; derived from ``filename`` when omitted.
        auto_start: Whether to start the service after deployment.

    Returns:
        The dict returned by deploy(), extended with ``deploy_log`` (list
        of progress messages) and ``detection`` (auto_detect() result).
    """
    deploy_log = []

    # Step 1: Save upload
    deploy_log.append("📥 Saving uploaded file...")
    upload_info = await self.save_upload(filename, content)
    deploy_id = upload_info["deploy_id"]
    deploy_log.append(f"✅ Saved: {upload_info['filename']} ({upload_info['size_formatted']})")

    # Step 2: Extract
    deploy_log.append("📦 Extracting archive...")
    extract_info = await self.extract_archive(upload_info["path"], deploy_id)
    deploy_log.append(f"✅ Extracted: {extract_info['file_count']} files")

    # Step 3: Auto-detect
    deploy_log.append("🔍 Auto-detecting project type...")
    detection = await self.auto_detect(extract_info["extract_dir"])
    deploy_log.append(f"✅ Detected: {detection['language']}/{detection['type']} -> {detection['entry_file']}")
    if detection.get("framework"):
        deploy_log.append(f"🎯 Framework: {detection['framework']}")

    # Generate service name from filename if not provided
    if not service_name:
        base_name = Path(filename).stem
        # Strip common trailing suffixes only ("app.tar" -> "app"); a plain
        # str.replace would also eat ".min" out of "my.minimal".
        stripped = True
        while stripped:
            stripped = False
            for suffix in ('.tar', '.min', '.dist', '.build'):
                if base_name.endswith(suffix):
                    base_name = base_name[:-len(suffix)]
                    stripped = True
        service_name = re.sub(r'[^a-zA-Z0-9]', '-', base_name).strip('-').lower()
    if not service_name:
        service_name = f"app-{deploy_id[:6]}"

    # Step 4: Deploy
    deploy_log.append(f"🚀 Deploying as '{service_name}'...")
    if not detection["entry_file"]:
        deploy_log.append("⚠️ No entry point detected. Creating default main.py...")
        # Create a default entry file
        default_content = "\n".join((
            '# Auto-generated by RUHI-CORE',
            '# Please edit this file with your application code',
            'print("Hello from RUHI-CORE! 🚀")',
            'print("Edit this file in the File Manager to add your code.")',
            'import time',
            'while True:',
            '    print("Service is running... ⚡")',
            '    time.sleep(60)',
            '',
        ))
        extract_path = Path(extract_info["extract_dir"])
        # The template contains emoji, so the encoding must be explicit;
        # the locale default may not be able to encode them.
        with open(extract_path / "main.py", 'w', encoding='utf-8') as f:
            f.write(default_content)
        detection["entry_file"] = "main.py"
        detection["language"] = "python"

    deploy_result = await self.deploy(
        deploy_id=deploy_id,
        extract_dir=extract_info["extract_dir"],
        service_name=service_name,
        # The keys always exist, so fall back on falsy values, not on absence
        language=detection.get("language") or "python",
        entry_file=detection.get("entry_file") or "main.py",
        service_type=detection.get("type") or "worker",
        auto_start=auto_start,
        install_deps=True,
        description=f"Quick deployed from {filename}"
    )
    deploy_log.append("✅ Deployment complete!")
    return {
        **deploy_result,
        "deploy_log": deploy_log,
        "detection": detection,
    }
| async def _install_dependencies(self, app_dir: Path, language: str) -> str: | |
| """Install dependencies for the project""" | |
| log_lines = [] | |
| try: | |
| if language == "python": | |
| req_file = app_dir / "requirements.txt" | |
| if req_file.exists(): | |
| logger.info(f"📦 Installing Python dependencies for {app_dir.name}...") | |
| log_lines.append("Installing Python dependencies...") | |
| result = await asyncio.get_event_loop().run_in_executor( | |
| None, | |
| lambda: __import__('subprocess').run( | |
| [ | |
| 'pip', 'install', '--no-cache-dir', '-q', | |
| '-r', str(req_file), | |
| '--target', str(app_dir / '.deps') | |
| ], | |
| capture_output=True, text=True, timeout=300 | |
| ) | |
| ) | |
| if result.returncode == 0: | |
| log_lines.append("✅ Dependencies installed successfully") | |
| else: | |
| log_lines.append(f"⚠️ Some dependencies may have failed: {result.stderr[:500]}") | |
| if result.stdout: | |
| log_lines.append(result.stdout[:1000]) | |
| elif language == "node": | |
| pkg_file = app_dir / "package.json" | |
| if pkg_file.exists(): | |
| logger.info(f"📦 Installing Node.js dependencies for {app_dir.name}...") | |
| log_lines.append("Installing Node.js dependencies...") | |
| result = await asyncio.get_event_loop().run_in_executor( | |
| None, | |
| lambda: __import__('subprocess').run( | |
| ['npm', 'install', '--production'], | |
| cwd=str(app_dir), | |
| capture_output=True, text=True, timeout=300 | |
| ) | |
| ) | |
| if result.returncode == 0: | |
| log_lines.append("✅ Dependencies installed successfully") | |
| else: | |
| log_lines.append(f"⚠️ npm install issues: {result.stderr[:500]}") | |
| except Exception as e: | |
| log_lines.append(f"❌ Dependency installation error: {str(e)}") | |
| logger.error(f"Dependency installation error: {str(e)}") | |
| return "\n".join(log_lines) | |
async def _save_deployment(self, deploy_id: str, service_id: str, name: str):
    """Record a completed deployment in the database; failures are logged, not raised."""
    finished_at = datetime.now().isoformat()
    row = (deploy_id, service_id, finished_at, f"Deployed {name}")
    try:
        async with aiosqlite.connect(str(settings.DB_PATH)) as db:
            await db.execute(
                """
                INSERT INTO deployments (id, service_id, status, completed_at, log)
                VALUES (?, ?, 'completed', ?, ?)
                """,
                row,
            )
            await db.commit()
    except Exception as e:
        logger.error(f"Deploy record save error: {str(e)}")
async def get_deployment_history(self) -> List[Dict]:
    """
    Get deployment history.

    Returns:
        Up to 50 rows of the ``deployments`` table as dicts, ordered by
        ``started_at`` descending. An empty list is returned when the
        query fails; the failure is logged.
    """
    try:
        async with aiosqlite.connect(str(settings.DB_PATH)) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute(
                "SELECT * FROM deployments ORDER BY started_at DESC LIMIT 50"
            )
            rows = await cursor.fetchall()
            return [dict(row) for row in rows]
    except Exception as e:
        # Still best effort, but leave a trace like _save_deployment does
        logger.error(f"Deployment history load error: {str(e)}")
        return []
| def _unwrap_single_dir(self, extract_dir: Path) -> Path: | |
| """ | |
| If archive extracts to a single directory, use that as the root. | |
| Example: archive.zip -> archive/ -> {files} -> use archive/ as root | |
| """ | |
| items = list(extract_dir.iterdir()) | |
| if len(items) == 1 and items[0].is_dir(): | |
| # Move all contents up one level | |
| single_dir = items[0] | |
| temp_name = extract_dir / f"_unwrap_temp_{uuid.uuid4().hex[:8]}" | |
| single_dir.rename(temp_name) | |
| for item in temp_name.iterdir(): | |
| shutil.move(str(item), str(extract_dir / item.name)) | |
| temp_name.rmdir() | |
| logger.info(f"📦 Unwrapped single-directory archive") | |
| return extract_dir | |
| async def _extract_zip(self, src: Path, dest: Path): | |
| """Extract ZIP archive""" | |
| def _do_extract(): | |
| with zipfile.ZipFile(str(src), 'r') as zf: | |
| # Security: check for path traversal | |
| for info in zf.infolist(): | |
| if info.filename.startswith('/') or '..' in info.filename: | |
| raise ValueError(f"Dangerous path in archive: {info.filename}") | |
| zf.extractall(str(dest)) | |
| await asyncio.get_event_loop().run_in_executor(None, _do_extract) | |
| async def _extract_tar(self, src: Path, dest: Path, compression: str): | |
| """Extract TAR archive""" | |
| mode = f"r:{compression}" if compression else "r" | |
| def _do_extract(): | |
| with tarfile.open(str(src), mode) as tf: | |
| # Security check | |
| for member in tf.getmembers(): | |
| if member.name.startswith('/') or '..' in member.name: | |
| raise ValueError(f"Dangerous path in archive: {member.name}") | |
| tf.extractall(str(dest)) | |
| await asyncio.get_event_loop().run_in_executor(None, _do_extract) | |
| async def _extract_7z(self, src: Path, dest: Path): | |
| """Extract 7z archive""" | |
| try: | |
| import py7zr | |
| def _do_extract(): | |
| with py7zr.SevenZipFile(str(src), mode='r') as z: | |
| z.extractall(path=str(dest)) | |
| await asyncio.get_event_loop().run_in_executor(None, _do_extract) | |
| except ImportError: | |
| raise RuntimeError("7z support not available. Install py7zr.") | |
| async def _extract_rar(self, src: Path, dest: Path): | |
| """Extract RAR archive""" | |
| try: | |
| import rarfile | |
| def _do_extract(): | |
| with rarfile.RarFile(str(src), 'r') as rf: | |
| rf.extractall(str(dest)) | |
| await asyncio.get_event_loop().run_in_executor(None, _do_extract) | |
| except ImportError: | |
| raise RuntimeError("RAR support not available. Install rarfile.") | |
| def _sanitize_filename(self, filename: str) -> str: | |
| """Sanitize a filename""" | |
| # Remove path separators | |
| name = filename.replace('\\', '/').split('/')[-1] | |
| # Remove dangerous characters | |
| name = re.sub(r'[^\w\s\-\.]', '', name) | |
| # Remove leading dots (hidden files) | |
| name = name.lstrip('.') | |
| return name.strip() | |
| def _format_size(size_bytes: int) -> str: | |
| """Format file size""" | |
| if size_bytes < 1024: | |
| return f"{size_bytes} B" | |
| elif size_bytes < 1024 * 1024: | |
| return f"{size_bytes / 1024:.1f} KB" | |
| elif size_bytes < 1024 * 1024 * 1024: | |
| return f"{size_bytes / (1024 * 1024):.1f} MB" | |
| else: | |
| return f"{size_bytes / (1024 * 1024 * 1024):.2f} GB" | |
# Global instance shared by importers of this module. Constructing it runs
# ZipDeployer.__init__, which creates the uploads/temp/apps directories, so
# importing this module has that filesystem side effect.
zip_deployer = ZipDeployer()