Ruhi_hosting / core /zip_deployer.py
Ruhivig65's picture
Upload 11 files
9684770 verified
"""
============================================
🔥 RUHI-CORE - ZIP Deployer Engine
============================================
Upload ZIP -> Extract -> Auto-Detect -> Deploy!
Supports: .zip, .tar.gz, .tar, .7z, .rar
"""
import os
import re
import uuid
import shutil
import zipfile
import tarfile
import asyncio
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, List, Tuple
import aiofiles
import aiosqlite
from loguru import logger
from core.config import settings
from core.process_manager import process_manager
class ZipDeployer:
"""
Handles ZIP/archive upload and automatic deployment.
Flow:
1. Upload ZIP file -> /data/uploads/
2. Extract to temp dir -> /data/temp/
3. Auto-detect entry point (main.py, index.js, etc.)
4. Move to /data/apps/{service_name}/
5. Create service in ProcessManager
6. Optionally auto-start
"""
# Entry point detection priority
ENTRY_POINTS = {
"python": [
"main.py", "app.py", "bot.py", "run.py", "server.py",
"index.py", "start.py", "manage.py", "wsgi.py", "asgi.py",
"__main__.py"
],
"node": [
"index.js", "app.js", "server.js", "bot.js", "main.js",
"start.js", "index.ts", "app.ts", "server.ts"
],
"shell": [
"start.sh", "run.sh", "entrypoint.sh", "boot.sh", "init.sh"
]
}
# Package file detection
PACKAGE_FILES = {
"python": ["requirements.txt", "Pipfile", "pyproject.toml", "setup.py", "setup.cfg"],
"node": ["package.json", "yarn.lock", "pnpm-lock.yaml", "package-lock.json"],
}
def __init__(self):
self.uploads_dir = settings.UPLOADS_DIR
self.temp_dir = settings.TEMP_DIR
self.apps_dir = settings.APPS_DIR
# Ensure directories exist
self.uploads_dir.mkdir(parents=True, exist_ok=True)
self.temp_dir.mkdir(parents=True, exist_ok=True)
self.apps_dir.mkdir(parents=True, exist_ok=True)
logger.info("📦 ZipDeployer initialized")
async def save_upload(self, filename: str, content: bytes) -> Dict:
"""
Save an uploaded file to the uploads directory.
Returns file info and generates a deployment ID.
"""
# Validate filename
safe_name = self._sanitize_filename(filename)
if not safe_name:
raise ValueError("Invalid filename")
# Check extension
ext = Path(safe_name).suffix.lower()
allowed = ['.zip', '.tar', '.gz', '.tgz', '.7z', '.rar', '.tar.gz']
if not any(safe_name.endswith(e) for e in allowed):
raise ValueError(f"Unsupported file type: {ext}. Allowed: {', '.join(allowed)}")
# Check size
if len(content) > settings.MAX_UPLOAD_SIZE:
max_mb = settings.MAX_UPLOAD_SIZE / (1024 * 1024)
raise ValueError(f"File too large. Max: {max_mb}MB")
# Generate deployment ID
deploy_id = str(uuid.uuid4())[:12]
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Save file
save_name = f"{timestamp}_{deploy_id}_{safe_name}"
save_path = self.uploads_dir / save_name
async with aiofiles.open(save_path, 'wb') as f:
await f.write(content)
file_size = len(content)
logger.info(f"📥 Upload saved: {save_name} ({self._format_size(file_size)})")
return {
"deploy_id": deploy_id,
"filename": safe_name,
"saved_as": save_name,
"path": str(save_path),
"size": file_size,
"size_formatted": self._format_size(file_size),
"timestamp": timestamp,
}
async def extract_archive(self, file_path: str, deploy_id: str) -> Dict:
"""
Extract an archive to a temporary directory.
Supports ZIP, TAR, TAR.GZ, 7Z.
"""
src = Path(file_path)
if not src.exists():
raise FileNotFoundError(f"Archive not found: {file_path}")
# Create extraction directory
extract_dir = self.temp_dir / deploy_id
if extract_dir.exists():
shutil.rmtree(extract_dir)
extract_dir.mkdir(parents=True)
filename_lower = src.name.lower()
try:
if filename_lower.endswith('.zip'):
await self._extract_zip(src, extract_dir)
elif filename_lower.endswith(('.tar.gz', '.tgz')):
await self._extract_tar(src, extract_dir, 'gz')
elif filename_lower.endswith('.tar.bz2'):
await self._extract_tar(src, extract_dir, 'bz2')
elif filename_lower.endswith('.tar'):
await self._extract_tar(src, extract_dir, '')
elif filename_lower.endswith('.7z'):
await self._extract_7z(src, extract_dir)
elif filename_lower.endswith('.rar'):
await self._extract_rar(src, extract_dir)
else:
raise ValueError(f"Unsupported archive format: {src.suffix}")
except Exception as e:
# Cleanup on error
if extract_dir.exists():
shutil.rmtree(extract_dir)
raise RuntimeError(f"Extraction failed: {str(e)}")
# Handle single-directory archives (unwrap)
extract_dir = self._unwrap_single_dir(extract_dir)
# Count extracted files
files = list(extract_dir.rglob('*'))
file_count = sum(1 for f in files if f.is_file())
dir_count = sum(1 for f in files if f.is_dir())
total_size = sum(f.stat().st_size for f in files if f.is_file())
# List top-level contents
top_level = [
{
"name": item.name,
"is_dir": item.is_dir(),
"size": item.stat().st_size if item.is_file() else 0,
}
for item in sorted(extract_dir.iterdir(), key=lambda x: (not x.is_dir(), x.name))
]
logger.info(f"📦 Extracted: {file_count} files, {dir_count} dirs, {self._format_size(total_size)}")
return {
"deploy_id": deploy_id,
"extract_dir": str(extract_dir),
"file_count": file_count,
"dir_count": dir_count,
"total_size": total_size,
"total_size_formatted": self._format_size(total_size),
"top_level": top_level[:30],
}
async def auto_detect(self, extract_dir: str) -> Dict:
"""
Auto-detect project type, language, and entry point.
"""
base = Path(extract_dir)
if not base.exists():
raise FileNotFoundError(f"Directory not found: {extract_dir}")
detected = {
"language": None,
"entry_file": None,
"type": "worker", # Default to worker
"package_manager": None,
"dependencies_file": None,
"framework": None,
"has_dockerfile": False,
"has_env": False,
"suggestions": []
}
# List all files
all_files = {f.name: f for f in base.iterdir() if f.is_file()}
all_names = set(all_files.keys())
# Check for Dockerfile
detected["has_dockerfile"] = "Dockerfile" in all_names or "dockerfile" in all_names
# Check for .env
detected["has_env"] = ".env" in all_names or ".env.example" in all_names
# 1. Check for Python project
for entry in self.ENTRY_POINTS["python"]:
if entry in all_names:
detected["language"] = "python"
detected["entry_file"] = entry
break
# Check Python package files
for pkg_file in self.PACKAGE_FILES["python"]:
if pkg_file in all_names:
detected["language"] = "python"
detected["dependencies_file"] = pkg_file
if pkg_file == "requirements.txt":
detected["package_manager"] = "pip"
elif pkg_file == "Pipfile":
detected["package_manager"] = "pipenv"
elif pkg_file == "pyproject.toml":
detected["package_manager"] = "poetry/pip"
break
# 2. Check for Node.js project
if not detected["language"]:
for entry in self.ENTRY_POINTS["node"]:
if entry in all_names:
detected["language"] = "node"
detected["entry_file"] = entry
break
if "package.json" in all_names:
if not detected["language"]:
detected["language"] = "node"
detected["dependencies_file"] = "package.json"
detected["package_manager"] = "npm"
# Read package.json for more info
try:
import json
with open(base / "package.json", 'r') as f:
pkg = json.load(f)
# Check for start script
scripts = pkg.get("scripts", {})
if "start" in scripts:
detected["suggestions"].append(f"Found start script: {scripts['start']}")
# Detect entry from main
if not detected["entry_file"] and "main" in pkg:
detected["entry_file"] = pkg["main"]
# Detect framework
deps = {**pkg.get("dependencies", {}), **pkg.get("devDependencies", {})}
if "express" in deps:
detected["framework"] = "Express.js"
detected["type"] = "web"
elif "next" in deps:
detected["framework"] = "Next.js"
detected["type"] = "web"
elif "discord.js" in deps:
detected["framework"] = "Discord.js"
detected["type"] = "bot"
elif "telegraf" in deps:
detected["framework"] = "Telegraf (Telegram)"
detected["type"] = "bot"
except Exception:
pass
# 3. Check for Shell scripts
if not detected["language"]:
for entry in self.ENTRY_POINTS["shell"]:
if entry in all_names:
detected["language"] = "shell"
detected["entry_file"] = entry
break
# 4. Try to detect web service vs worker
if detected["entry_file"]:
try:
content = (base / detected["entry_file"]).read_text(errors='replace').lower()
web_indicators = [
'flask', 'fastapi', 'django', 'uvicorn', 'gunicorn',
'express', 'http.createserver', 'app.listen',
'http.server', 'socketserver', 'tornado',
'aiohttp', 'sanic', 'starlette', 'bottle',
'bind', 'port', 'listen'
]
if any(indicator in content for indicator in web_indicators):
detected["type"] = "web"
# Detect bot
bot_indicators = [
'discord', 'telegram', 'bot.run', 'client.run',
'bot.start', 'pyrogram', 'telethon', 'aiogram',
'slack_sdk', 'tweepy', 'praw'
]
if any(indicator in content for indicator in bot_indicators):
detected["type"] = "bot"
# Detect Python framework
if detected["language"] == "python":
if 'flask' in content:
detected["framework"] = "Flask"
elif 'fastapi' in content:
detected["framework"] = "FastAPI"
elif 'django' in content:
detected["framework"] = "Django"
elif 'discord' in content:
detected["framework"] = "Discord.py"
elif 'pyrogram' in content or 'telethon' in content or 'aiogram' in content:
detected["framework"] = "Telegram Bot"
except Exception:
pass
# Generate suggestions
if not detected["entry_file"]:
# Look for any Python/JS files
py_files = [f for f in all_names if f.endswith('.py')]
js_files = [f for f in all_names if f.endswith('.js')]
if py_files:
detected["language"] = "python"
detected["entry_file"] = py_files[0]
detected["suggestions"].append(f"No standard entry point found. Using: {py_files[0]}")
elif js_files:
detected["language"] = "node"
detected["entry_file"] = js_files[0]
detected["suggestions"].append(f"No standard entry point found. Using: {js_files[0]}")
else:
detected["suggestions"].append("No recognizable entry point found. Please specify manually.")
if detected["dependencies_file"]:
detected["suggestions"].append(f"Dependencies file found: {detected['dependencies_file']}")
logger.info(f"🔍 Auto-detected: {detected['language']}/{detected['type']} -> {detected['entry_file']}")
return detected
async def deploy(
self,
deploy_id: str,
extract_dir: str,
service_name: str,
language: str = "python",
entry_file: str = "main.py",
service_type: str = "web",
auto_start: bool = True,
install_deps: bool = True,
env_vars: dict = None,
command: str = "",
description: str = ""
) -> Dict:
"""
Deploy extracted files as a new service.
1. Move files to /data/apps/{name}/
2. Install dependencies if needed
3. Create service in ProcessManager
4. Optionally start it
"""
src_dir = Path(extract_dir)
if not src_dir.exists():
raise FileNotFoundError(f"Source directory not found: {extract_dir}")
# Sanitize service name
safe_name = re.sub(r'[^a-zA-Z0-9_-]', '-', service_name.strip()).lower()
if not safe_name:
raise ValueError("Invalid service name")
# Target app directory
app_dir = self.apps_dir / safe_name
# If app dir already exists, back it up
if app_dir.exists():
backup_name = f"{safe_name}_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
backup_path = settings.BACKUPS_DIR / backup_name
shutil.move(str(app_dir), str(backup_path))
logger.info(f"📦 Existing app backed up to: {backup_name}")
# Move extracted files to app directory
shutil.copytree(str(src_dir), str(app_dir))
# Install dependencies
install_log = ""
if install_deps:
install_log = await self._install_dependencies(app_dir, language)
# Create service
try:
# Check if service with this name exists
existing = process_manager.get_service_by_name(safe_name)
if existing:
# Update existing service
existing.app_dir = str(app_dir)
existing.entry_file = entry_file
existing.language = language
existing.type = service_type
existing.command = command
existing.description = description
if env_vars:
existing.env_vars = env_vars
await existing._save_to_db()
service = existing
logger.info(f"🔄 Updated existing service: {safe_name}")
else:
service = await process_manager.create_service(
name=safe_name,
type=service_type,
language=language,
entry_file=entry_file,
command=command,
auto_restart=True,
env_vars=env_vars or {},
description=description or f"Deployed from ZIP ({deploy_id})"
)
except Exception as e:
raise RuntimeError(f"Failed to create service: {str(e)}")
# Auto-start if requested
started = False
if auto_start:
started = await service.start()
# Cleanup temp directory
try:
temp_deploy = self.temp_dir / deploy_id
if temp_deploy.exists():
shutil.rmtree(temp_deploy)
except Exception:
pass
# Save deployment record
await self._save_deployment(deploy_id, service.id, safe_name)
logger.info(f"🚀 Deployed: {safe_name} (ID: {service.id}, Started: {started})")
return {
"deploy_id": deploy_id,
"service_id": service.id,
"service_name": safe_name,
"app_dir": str(app_dir),
"language": language,
"entry_file": entry_file,
"type": service_type,
"dependencies_installed": bool(install_log),
"install_log": install_log,
"auto_started": started,
"status": service.status,
"message": f"Successfully deployed '{safe_name}'! {'Service is running.' if started else 'Service created but not started.'}",
"service": service.get_info()
}
async def quick_deploy(
self,
filename: str,
content: bytes,
service_name: Optional[str] = None,
auto_start: bool = True
) -> Dict:
"""
One-click deployment: Upload ZIP -> Extract -> Detect -> Deploy
All in one step!
"""
deploy_log = []
# Step 1: Save upload
deploy_log.append("📥 Saving uploaded file...")
upload_info = await self.save_upload(filename, content)
deploy_id = upload_info["deploy_id"]
deploy_log.append(f"✅ Saved: {upload_info['filename']} ({upload_info['size_formatted']})")
# Step 2: Extract
deploy_log.append("📦 Extracting archive...")
extract_info = await self.extract_archive(upload_info["path"], deploy_id)
deploy_log.append(f"✅ Extracted: {extract_info['file_count']} files")
# Step 3: Auto-detect
deploy_log.append("🔍 Auto-detecting project type...")
detection = await self.auto_detect(extract_info["extract_dir"])
deploy_log.append(f"✅ Detected: {detection['language']}/{detection['type']} -> {detection['entry_file']}")
if detection.get("framework"):
deploy_log.append(f"🎯 Framework: {detection['framework']}")
# Generate service name from filename if not provided
if not service_name:
base_name = Path(filename).stem
# Remove common archive suffixes
for suffix in ['.tar', '.min', '.dist', '.build']:
base_name = base_name.replace(suffix, '')
service_name = re.sub(r'[^a-zA-Z0-9]', '-', base_name).strip('-').lower()
if not service_name:
service_name = f"app-{deploy_id[:6]}"
# Step 4: Deploy
deploy_log.append(f"🚀 Deploying as '{service_name}'...")
if not detection["entry_file"]:
deploy_log.append("⚠️ No entry point detected. Creating default main.py...")
# Create a default entry file
default_content = '''# Auto-generated by RUHI-CORE
# Please edit this file with your application code
print("Hello from RUHI-CORE! 🚀")
print("Edit this file in the File Manager to add your code.")
import time
while True:
print("Service is running... ⚡")
time.sleep(60)
'''
extract_path = Path(extract_info["extract_dir"])
with open(extract_path / "main.py", 'w') as f:
f.write(default_content)
detection["entry_file"] = "main.py"
detection["language"] = "python"
deploy_result = await self.deploy(
deploy_id=deploy_id,
extract_dir=extract_info["extract_dir"],
service_name=service_name,
language=detection.get("language", "python"),
entry_file=detection.get("entry_file", "main.py"),
service_type=detection.get("type", "worker"),
auto_start=auto_start,
install_deps=True,
description=f"Quick deployed from {filename}"
)
deploy_log.append(f"✅ Deployment complete!")
return {
**deploy_result,
"deploy_log": deploy_log,
"detection": detection,
}
async def _install_dependencies(self, app_dir: Path, language: str) -> str:
"""Install dependencies for the project"""
log_lines = []
try:
if language == "python":
req_file = app_dir / "requirements.txt"
if req_file.exists():
logger.info(f"📦 Installing Python dependencies for {app_dir.name}...")
log_lines.append("Installing Python dependencies...")
result = await asyncio.get_event_loop().run_in_executor(
None,
lambda: __import__('subprocess').run(
[
'pip', 'install', '--no-cache-dir', '-q',
'-r', str(req_file),
'--target', str(app_dir / '.deps')
],
capture_output=True, text=True, timeout=300
)
)
if result.returncode == 0:
log_lines.append("✅ Dependencies installed successfully")
else:
log_lines.append(f"⚠️ Some dependencies may have failed: {result.stderr[:500]}")
if result.stdout:
log_lines.append(result.stdout[:1000])
elif language == "node":
pkg_file = app_dir / "package.json"
if pkg_file.exists():
logger.info(f"📦 Installing Node.js dependencies for {app_dir.name}...")
log_lines.append("Installing Node.js dependencies...")
result = await asyncio.get_event_loop().run_in_executor(
None,
lambda: __import__('subprocess').run(
['npm', 'install', '--production'],
cwd=str(app_dir),
capture_output=True, text=True, timeout=300
)
)
if result.returncode == 0:
log_lines.append("✅ Dependencies installed successfully")
else:
log_lines.append(f"⚠️ npm install issues: {result.stderr[:500]}")
except Exception as e:
log_lines.append(f"❌ Dependency installation error: {str(e)}")
logger.error(f"Dependency installation error: {str(e)}")
return "\n".join(log_lines)
async def _save_deployment(self, deploy_id: str, service_id: str, name: str):
"""Save deployment record to database"""
try:
async with aiosqlite.connect(str(settings.DB_PATH)) as db:
await db.execute("""
INSERT INTO deployments (id, service_id, status, completed_at, log)
VALUES (?, ?, 'completed', ?, ?)
""", (deploy_id, service_id, datetime.now().isoformat(), f"Deployed {name}"))
await db.commit()
except Exception as e:
logger.error(f"Deploy record save error: {str(e)}")
async def get_deployment_history(self) -> List[Dict]:
"""Get deployment history"""
try:
async with aiosqlite.connect(str(settings.DB_PATH)) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"SELECT * FROM deployments ORDER BY started_at DESC LIMIT 50"
)
rows = await cursor.fetchall()
return [dict(row) for row in rows]
except Exception:
return []
def _unwrap_single_dir(self, extract_dir: Path) -> Path:
"""
If archive extracts to a single directory, use that as the root.
Example: archive.zip -> archive/ -> {files} -> use archive/ as root
"""
items = list(extract_dir.iterdir())
if len(items) == 1 and items[0].is_dir():
# Move all contents up one level
single_dir = items[0]
temp_name = extract_dir / f"_unwrap_temp_{uuid.uuid4().hex[:8]}"
single_dir.rename(temp_name)
for item in temp_name.iterdir():
shutil.move(str(item), str(extract_dir / item.name))
temp_name.rmdir()
logger.info(f"📦 Unwrapped single-directory archive")
return extract_dir
async def _extract_zip(self, src: Path, dest: Path):
"""Extract ZIP archive"""
def _do_extract():
with zipfile.ZipFile(str(src), 'r') as zf:
# Security: check for path traversal
for info in zf.infolist():
if info.filename.startswith('/') or '..' in info.filename:
raise ValueError(f"Dangerous path in archive: {info.filename}")
zf.extractall(str(dest))
await asyncio.get_event_loop().run_in_executor(None, _do_extract)
async def _extract_tar(self, src: Path, dest: Path, compression: str):
"""Extract TAR archive"""
mode = f"r:{compression}" if compression else "r"
def _do_extract():
with tarfile.open(str(src), mode) as tf:
# Security check
for member in tf.getmembers():
if member.name.startswith('/') or '..' in member.name:
raise ValueError(f"Dangerous path in archive: {member.name}")
tf.extractall(str(dest))
await asyncio.get_event_loop().run_in_executor(None, _do_extract)
async def _extract_7z(self, src: Path, dest: Path):
"""Extract 7z archive"""
try:
import py7zr
def _do_extract():
with py7zr.SevenZipFile(str(src), mode='r') as z:
z.extractall(path=str(dest))
await asyncio.get_event_loop().run_in_executor(None, _do_extract)
except ImportError:
raise RuntimeError("7z support not available. Install py7zr.")
async def _extract_rar(self, src: Path, dest: Path):
"""Extract RAR archive"""
try:
import rarfile
def _do_extract():
with rarfile.RarFile(str(src), 'r') as rf:
rf.extractall(str(dest))
await asyncio.get_event_loop().run_in_executor(None, _do_extract)
except ImportError:
raise RuntimeError("RAR support not available. Install rarfile.")
def _sanitize_filename(self, filename: str) -> str:
"""Sanitize a filename"""
# Remove path separators
name = filename.replace('\\', '/').split('/')[-1]
# Remove dangerous characters
name = re.sub(r'[^\w\s\-\.]', '', name)
# Remove leading dots (hidden files)
name = name.lstrip('.')
return name.strip()
@staticmethod
def _format_size(size_bytes: int) -> str:
"""Format file size"""
if size_bytes < 1024:
return f"{size_bytes} B"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes / (1024 * 1024):.1f} MB"
else:
return f"{size_bytes / (1024 * 1024 * 1024):.2f} GB"
# Global instance
zip_deployer = ZipDeployer()