""" ============================================ 🔥 RUHI-CORE - ZIP Deployer Engine ============================================ Upload ZIP -> Extract -> Auto-Detect -> Deploy! Supports: .zip, .tar.gz, .tar, .7z, .rar """ import os import re import uuid import shutil import zipfile import tarfile import asyncio from datetime import datetime from pathlib import Path from typing import Optional, Dict, List, Tuple import aiofiles import aiosqlite from loguru import logger from core.config import settings from core.process_manager import process_manager class ZipDeployer: """ Handles ZIP/archive upload and automatic deployment. Flow: 1. Upload ZIP file -> /data/uploads/ 2. Extract to temp dir -> /data/temp/ 3. Auto-detect entry point (main.py, index.js, etc.) 4. Move to /data/apps/{service_name}/ 5. Create service in ProcessManager 6. Optionally auto-start """ # Entry point detection priority ENTRY_POINTS = { "python": [ "main.py", "app.py", "bot.py", "run.py", "server.py", "index.py", "start.py", "manage.py", "wsgi.py", "asgi.py", "__main__.py" ], "node": [ "index.js", "app.js", "server.js", "bot.js", "main.js", "start.js", "index.ts", "app.ts", "server.ts" ], "shell": [ "start.sh", "run.sh", "entrypoint.sh", "boot.sh", "init.sh" ] } # Package file detection PACKAGE_FILES = { "python": ["requirements.txt", "Pipfile", "pyproject.toml", "setup.py", "setup.cfg"], "node": ["package.json", "yarn.lock", "pnpm-lock.yaml", "package-lock.json"], } def __init__(self): self.uploads_dir = settings.UPLOADS_DIR self.temp_dir = settings.TEMP_DIR self.apps_dir = settings.APPS_DIR # Ensure directories exist self.uploads_dir.mkdir(parents=True, exist_ok=True) self.temp_dir.mkdir(parents=True, exist_ok=True) self.apps_dir.mkdir(parents=True, exist_ok=True) logger.info("📦 ZipDeployer initialized") async def save_upload(self, filename: str, content: bytes) -> Dict: """ Save an uploaded file to the uploads directory. Returns file info and generates a deployment ID. """ # Validate filename safe_name = self._sanitize_filename(filename) if not safe_name: raise ValueError("Invalid filename") # Check extension ext = Path(safe_name).suffix.lower() allowed = ['.zip', '.tar', '.gz', '.tgz', '.7z', '.rar', '.tar.gz'] if not any(safe_name.endswith(e) for e in allowed): raise ValueError(f"Unsupported file type: {ext}. Allowed: {', '.join(allowed)}") # Check size if len(content) > settings.MAX_UPLOAD_SIZE: max_mb = settings.MAX_UPLOAD_SIZE / (1024 * 1024) raise ValueError(f"File too large. Max: {max_mb}MB") # Generate deployment ID deploy_id = str(uuid.uuid4())[:12] timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Save file save_name = f"{timestamp}_{deploy_id}_{safe_name}" save_path = self.uploads_dir / save_name async with aiofiles.open(save_path, 'wb') as f: await f.write(content) file_size = len(content) logger.info(f"📥 Upload saved: {save_name} ({self._format_size(file_size)})") return { "deploy_id": deploy_id, "filename": safe_name, "saved_as": save_name, "path": str(save_path), "size": file_size, "size_formatted": self._format_size(file_size), "timestamp": timestamp, } async def extract_archive(self, file_path: str, deploy_id: str) -> Dict: """ Extract an archive to a temporary directory. Supports ZIP, TAR, TAR.GZ, 7Z. """ src = Path(file_path) if not src.exists(): raise FileNotFoundError(f"Archive not found: {file_path}") # Create extraction directory extract_dir = self.temp_dir / deploy_id if extract_dir.exists(): shutil.rmtree(extract_dir) extract_dir.mkdir(parents=True) filename_lower = src.name.lower() try: if filename_lower.endswith('.zip'): await self._extract_zip(src, extract_dir) elif filename_lower.endswith(('.tar.gz', '.tgz')): await self._extract_tar(src, extract_dir, 'gz') elif filename_lower.endswith('.tar.bz2'): await self._extract_tar(src, extract_dir, 'bz2') elif filename_lower.endswith('.tar'): await self._extract_tar(src, extract_dir, '') elif filename_lower.endswith('.7z'): await self._extract_7z(src, extract_dir) elif filename_lower.endswith('.rar'): await self._extract_rar(src, extract_dir) else: raise ValueError(f"Unsupported archive format: {src.suffix}") except Exception as e: # Cleanup on error if extract_dir.exists(): shutil.rmtree(extract_dir) raise RuntimeError(f"Extraction failed: {str(e)}") # Handle single-directory archives (unwrap) extract_dir = self._unwrap_single_dir(extract_dir) # Count extracted files files = list(extract_dir.rglob('*')) file_count = sum(1 for f in files if f.is_file()) dir_count = sum(1 for f in files if f.is_dir()) total_size = sum(f.stat().st_size for f in files if f.is_file()) # List top-level contents top_level = [ { "name": item.name, "is_dir": item.is_dir(), "size": item.stat().st_size if item.is_file() else 0, } for item in sorted(extract_dir.iterdir(), key=lambda x: (not x.is_dir(), x.name)) ] logger.info(f"📦 Extracted: {file_count} files, {dir_count} dirs, {self._format_size(total_size)}") return { "deploy_id": deploy_id, "extract_dir": str(extract_dir), "file_count": file_count, "dir_count": dir_count, "total_size": total_size, "total_size_formatted": self._format_size(total_size), "top_level": top_level[:30], } async def auto_detect(self, extract_dir: str) -> Dict: """ Auto-detect project type, language, and entry point. """ base = Path(extract_dir) if not base.exists(): raise FileNotFoundError(f"Directory not found: {extract_dir}") detected = { "language": None, "entry_file": None, "type": "worker", # Default to worker "package_manager": None, "dependencies_file": None, "framework": None, "has_dockerfile": False, "has_env": False, "suggestions": [] } # List all files all_files = {f.name: f for f in base.iterdir() if f.is_file()} all_names = set(all_files.keys()) # Check for Dockerfile detected["has_dockerfile"] = "Dockerfile" in all_names or "dockerfile" in all_names # Check for .env detected["has_env"] = ".env" in all_names or ".env.example" in all_names # 1. Check for Python project for entry in self.ENTRY_POINTS["python"]: if entry in all_names: detected["language"] = "python" detected["entry_file"] = entry break # Check Python package files for pkg_file in self.PACKAGE_FILES["python"]: if pkg_file in all_names: detected["language"] = "python" detected["dependencies_file"] = pkg_file if pkg_file == "requirements.txt": detected["package_manager"] = "pip" elif pkg_file == "Pipfile": detected["package_manager"] = "pipenv" elif pkg_file == "pyproject.toml": detected["package_manager"] = "poetry/pip" break # 2. Check for Node.js project if not detected["language"]: for entry in self.ENTRY_POINTS["node"]: if entry in all_names: detected["language"] = "node" detected["entry_file"] = entry break if "package.json" in all_names: if not detected["language"]: detected["language"] = "node" detected["dependencies_file"] = "package.json" detected["package_manager"] = "npm" # Read package.json for more info try: import json with open(base / "package.json", 'r') as f: pkg = json.load(f) # Check for start script scripts = pkg.get("scripts", {}) if "start" in scripts: detected["suggestions"].append(f"Found start script: {scripts['start']}") # Detect entry from main if not detected["entry_file"] and "main" in pkg: detected["entry_file"] = pkg["main"] # Detect framework deps = {**pkg.get("dependencies", {}), **pkg.get("devDependencies", {})} if "express" in deps: detected["framework"] = "Express.js" detected["type"] = "web" elif "next" in deps: detected["framework"] = "Next.js" detected["type"] = "web" elif "discord.js" in deps: detected["framework"] = "Discord.js" detected["type"] = "bot" elif "telegraf" in deps: detected["framework"] = "Telegraf (Telegram)" detected["type"] = "bot" except Exception: pass # 3. Check for Shell scripts if not detected["language"]: for entry in self.ENTRY_POINTS["shell"]: if entry in all_names: detected["language"] = "shell" detected["entry_file"] = entry break # 4. Try to detect web service vs worker if detected["entry_file"]: try: content = (base / detected["entry_file"]).read_text(errors='replace').lower() web_indicators = [ 'flask', 'fastapi', 'django', 'uvicorn', 'gunicorn', 'express', 'http.createserver', 'app.listen', 'http.server', 'socketserver', 'tornado', 'aiohttp', 'sanic', 'starlette', 'bottle', 'bind', 'port', 'listen' ] if any(indicator in content for indicator in web_indicators): detected["type"] = "web" # Detect bot bot_indicators = [ 'discord', 'telegram', 'bot.run', 'client.run', 'bot.start', 'pyrogram', 'telethon', 'aiogram', 'slack_sdk', 'tweepy', 'praw' ] if any(indicator in content for indicator in bot_indicators): detected["type"] = "bot" # Detect Python framework if detected["language"] == "python": if 'flask' in content: detected["framework"] = "Flask" elif 'fastapi' in content: detected["framework"] = "FastAPI" elif 'django' in content: detected["framework"] = "Django" elif 'discord' in content: detected["framework"] = "Discord.py" elif 'pyrogram' in content or 'telethon' in content or 'aiogram' in content: detected["framework"] = "Telegram Bot" except Exception: pass # Generate suggestions if not detected["entry_file"]: # Look for any Python/JS files py_files = [f for f in all_names if f.endswith('.py')] js_files = [f for f in all_names if f.endswith('.js')] if py_files: detected["language"] = "python" detected["entry_file"] = py_files[0] detected["suggestions"].append(f"No standard entry point found. Using: {py_files[0]}") elif js_files: detected["language"] = "node" detected["entry_file"] = js_files[0] detected["suggestions"].append(f"No standard entry point found. Using: {js_files[0]}") else: detected["suggestions"].append("No recognizable entry point found. Please specify manually.") if detected["dependencies_file"]: detected["suggestions"].append(f"Dependencies file found: {detected['dependencies_file']}") logger.info(f"🔍 Auto-detected: {detected['language']}/{detected['type']} -> {detected['entry_file']}") return detected async def deploy( self, deploy_id: str, extract_dir: str, service_name: str, language: str = "python", entry_file: str = "main.py", service_type: str = "web", auto_start: bool = True, install_deps: bool = True, env_vars: dict = None, command: str = "", description: str = "" ) -> Dict: """ Deploy extracted files as a new service. 1. Move files to /data/apps/{name}/ 2. Install dependencies if needed 3. Create service in ProcessManager 4. Optionally start it """ src_dir = Path(extract_dir) if not src_dir.exists(): raise FileNotFoundError(f"Source directory not found: {extract_dir}") # Sanitize service name safe_name = re.sub(r'[^a-zA-Z0-9_-]', '-', service_name.strip()).lower() if not safe_name: raise ValueError("Invalid service name") # Target app directory app_dir = self.apps_dir / safe_name # If app dir already exists, back it up if app_dir.exists(): backup_name = f"{safe_name}_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}" backup_path = settings.BACKUPS_DIR / backup_name shutil.move(str(app_dir), str(backup_path)) logger.info(f"📦 Existing app backed up to: {backup_name}") # Move extracted files to app directory shutil.copytree(str(src_dir), str(app_dir)) # Install dependencies install_log = "" if install_deps: install_log = await self._install_dependencies(app_dir, language) # Create service try: # Check if service with this name exists existing = process_manager.get_service_by_name(safe_name) if existing: # Update existing service existing.app_dir = str(app_dir) existing.entry_file = entry_file existing.language = language existing.type = service_type existing.command = command existing.description = description if env_vars: existing.env_vars = env_vars await existing._save_to_db() service = existing logger.info(f"🔄 Updated existing service: {safe_name}") else: service = await process_manager.create_service( name=safe_name, type=service_type, language=language, entry_file=entry_file, command=command, auto_restart=True, env_vars=env_vars or {}, description=description or f"Deployed from ZIP ({deploy_id})" ) except Exception as e: raise RuntimeError(f"Failed to create service: {str(e)}") # Auto-start if requested started = False if auto_start: started = await service.start() # Cleanup temp directory try: temp_deploy = self.temp_dir / deploy_id if temp_deploy.exists(): shutil.rmtree(temp_deploy) except Exception: pass # Save deployment record await self._save_deployment(deploy_id, service.id, safe_name) logger.info(f"🚀 Deployed: {safe_name} (ID: {service.id}, Started: {started})") return { "deploy_id": deploy_id, "service_id": service.id, "service_name": safe_name, "app_dir": str(app_dir), "language": language, "entry_file": entry_file, "type": service_type, "dependencies_installed": bool(install_log), "install_log": install_log, "auto_started": started, "status": service.status, "message": f"Successfully deployed '{safe_name}'! {'Service is running.' if started else 'Service created but not started.'}", "service": service.get_info() } async def quick_deploy( self, filename: str, content: bytes, service_name: Optional[str] = None, auto_start: bool = True ) -> Dict: """ One-click deployment: Upload ZIP -> Extract -> Detect -> Deploy All in one step! """ deploy_log = [] # Step 1: Save upload deploy_log.append("📥 Saving uploaded file...") upload_info = await self.save_upload(filename, content) deploy_id = upload_info["deploy_id"] deploy_log.append(f"✅ Saved: {upload_info['filename']} ({upload_info['size_formatted']})") # Step 2: Extract deploy_log.append("📦 Extracting archive...") extract_info = await self.extract_archive(upload_info["path"], deploy_id) deploy_log.append(f"✅ Extracted: {extract_info['file_count']} files") # Step 3: Auto-detect deploy_log.append("🔍 Auto-detecting project type...") detection = await self.auto_detect(extract_info["extract_dir"]) deploy_log.append(f"✅ Detected: {detection['language']}/{detection['type']} -> {detection['entry_file']}") if detection.get("framework"): deploy_log.append(f"🎯 Framework: {detection['framework']}") # Generate service name from filename if not provided if not service_name: base_name = Path(filename).stem # Remove common archive suffixes for suffix in ['.tar', '.min', '.dist', '.build']: base_name = base_name.replace(suffix, '') service_name = re.sub(r'[^a-zA-Z0-9]', '-', base_name).strip('-').lower() if not service_name: service_name = f"app-{deploy_id[:6]}" # Step 4: Deploy deploy_log.append(f"🚀 Deploying as '{service_name}'...") if not detection["entry_file"]: deploy_log.append("⚠️ No entry point detected. Creating default main.py...") # Create a default entry file default_content = '''# Auto-generated by RUHI-CORE # Please edit this file with your application code print("Hello from RUHI-CORE! 🚀") print("Edit this file in the File Manager to add your code.") import time while True: print("Service is running... ⚡") time.sleep(60) ''' extract_path = Path(extract_info["extract_dir"]) with open(extract_path / "main.py", 'w') as f: f.write(default_content) detection["entry_file"] = "main.py" detection["language"] = "python" deploy_result = await self.deploy( deploy_id=deploy_id, extract_dir=extract_info["extract_dir"], service_name=service_name, language=detection.get("language", "python"), entry_file=detection.get("entry_file", "main.py"), service_type=detection.get("type", "worker"), auto_start=auto_start, install_deps=True, description=f"Quick deployed from {filename}" ) deploy_log.append(f"✅ Deployment complete!") return { **deploy_result, "deploy_log": deploy_log, "detection": detection, } async def _install_dependencies(self, app_dir: Path, language: str) -> str: """Install dependencies for the project""" log_lines = [] try: if language == "python": req_file = app_dir / "requirements.txt" if req_file.exists(): logger.info(f"📦 Installing Python dependencies for {app_dir.name}...") log_lines.append("Installing Python dependencies...") result = await asyncio.get_event_loop().run_in_executor( None, lambda: __import__('subprocess').run( [ 'pip', 'install', '--no-cache-dir', '-q', '-r', str(req_file), '--target', str(app_dir / '.deps') ], capture_output=True, text=True, timeout=300 ) ) if result.returncode == 0: log_lines.append("✅ Dependencies installed successfully") else: log_lines.append(f"⚠️ Some dependencies may have failed: {result.stderr[:500]}") if result.stdout: log_lines.append(result.stdout[:1000]) elif language == "node": pkg_file = app_dir / "package.json" if pkg_file.exists(): logger.info(f"📦 Installing Node.js dependencies for {app_dir.name}...") log_lines.append("Installing Node.js dependencies...") result = await asyncio.get_event_loop().run_in_executor( None, lambda: __import__('subprocess').run( ['npm', 'install', '--production'], cwd=str(app_dir), capture_output=True, text=True, timeout=300 ) ) if result.returncode == 0: log_lines.append("✅ Dependencies installed successfully") else: log_lines.append(f"⚠️ npm install issues: {result.stderr[:500]}") except Exception as e: log_lines.append(f"❌ Dependency installation error: {str(e)}") logger.error(f"Dependency installation error: {str(e)}") return "\n".join(log_lines) async def _save_deployment(self, deploy_id: str, service_id: str, name: str): """Save deployment record to database""" try: async with aiosqlite.connect(str(settings.DB_PATH)) as db: await db.execute(""" INSERT INTO deployments (id, service_id, status, completed_at, log) VALUES (?, ?, 'completed', ?, ?) """, (deploy_id, service_id, datetime.now().isoformat(), f"Deployed {name}")) await db.commit() except Exception as e: logger.error(f"Deploy record save error: {str(e)}") async def get_deployment_history(self) -> List[Dict]: """Get deployment history""" try: async with aiosqlite.connect(str(settings.DB_PATH)) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM deployments ORDER BY started_at DESC LIMIT 50" ) rows = await cursor.fetchall() return [dict(row) for row in rows] except Exception: return [] def _unwrap_single_dir(self, extract_dir: Path) -> Path: """ If archive extracts to a single directory, use that as the root. Example: archive.zip -> archive/ -> {files} -> use archive/ as root """ items = list(extract_dir.iterdir()) if len(items) == 1 and items[0].is_dir(): # Move all contents up one level single_dir = items[0] temp_name = extract_dir / f"_unwrap_temp_{uuid.uuid4().hex[:8]}" single_dir.rename(temp_name) for item in temp_name.iterdir(): shutil.move(str(item), str(extract_dir / item.name)) temp_name.rmdir() logger.info(f"📦 Unwrapped single-directory archive") return extract_dir async def _extract_zip(self, src: Path, dest: Path): """Extract ZIP archive""" def _do_extract(): with zipfile.ZipFile(str(src), 'r') as zf: # Security: check for path traversal for info in zf.infolist(): if info.filename.startswith('/') or '..' in info.filename: raise ValueError(f"Dangerous path in archive: {info.filename}") zf.extractall(str(dest)) await asyncio.get_event_loop().run_in_executor(None, _do_extract) async def _extract_tar(self, src: Path, dest: Path, compression: str): """Extract TAR archive""" mode = f"r:{compression}" if compression else "r" def _do_extract(): with tarfile.open(str(src), mode) as tf: # Security check for member in tf.getmembers(): if member.name.startswith('/') or '..' in member.name: raise ValueError(f"Dangerous path in archive: {member.name}") tf.extractall(str(dest)) await asyncio.get_event_loop().run_in_executor(None, _do_extract) async def _extract_7z(self, src: Path, dest: Path): """Extract 7z archive""" try: import py7zr def _do_extract(): with py7zr.SevenZipFile(str(src), mode='r') as z: z.extractall(path=str(dest)) await asyncio.get_event_loop().run_in_executor(None, _do_extract) except ImportError: raise RuntimeError("7z support not available. Install py7zr.") async def _extract_rar(self, src: Path, dest: Path): """Extract RAR archive""" try: import rarfile def _do_extract(): with rarfile.RarFile(str(src), 'r') as rf: rf.extractall(str(dest)) await asyncio.get_event_loop().run_in_executor(None, _do_extract) except ImportError: raise RuntimeError("RAR support not available. Install rarfile.") def _sanitize_filename(self, filename: str) -> str: """Sanitize a filename""" # Remove path separators name = filename.replace('\\', '/').split('/')[-1] # Remove dangerous characters name = re.sub(r'[^\w\s\-\.]', '', name) # Remove leading dots (hidden files) name = name.lstrip('.') return name.strip() @staticmethod def _format_size(size_bytes: int) -> str: """Format file size""" if size_bytes < 1024: return f"{size_bytes} B" elif size_bytes < 1024 * 1024: return f"{size_bytes / 1024:.1f} KB" elif size_bytes < 1024 * 1024 * 1024: return f"{size_bytes / (1024 * 1024):.1f} MB" else: return f"{size_bytes / (1024 * 1024 * 1024):.2f} GB" # Global instance zip_deployer = ZipDeployer()