import os import shutil import asyncio import uuid import time import logging from pathlib import Path from fastapi import FastAPI, UploadFile, File, WebSocket, WebSocketDisconnect, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse from patcher import PhantomPatcherV2, TaskCancelledError # ───────────────────────────────────────────────────────────────────────────── # LOGGING # ───────────────────────────────────────────────────────────────────────────── LOG_FORMAT = "%(asctime)s [%(levelname)s] [SERVER] %(message)s" logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) logger = logging.getLogger("phantom.server") logging.getLogger("multipart").setLevel(logging.WARNING) logging.getLogger("python_multipart").setLevel(logging.WARNING) # ───────────────────────────────────────────────────────────────────────────── # CONSTANTES # ───────────────────────────────────────────────────────────────────────────── UPLOAD_DIR = Path("uploads") DOWNLOAD_DIR = Path("downloads") WORKSPACE_DIR = Path("phantom_workspace") MAX_FILE_SIZE_MB = int(os.environ.get("MAX_FILE_SIZE_MB", "512")) MAX_CONCURRENT_TASKS = int(os.environ.get("MAX_CONCURRENT_TASKS", "2")) ALLOWED_EXTENSIONS = {".apk", ".xapk", ".apks", ".zip", ".obb"} JANITOR_INTERVAL_SECONDS = 600 FILE_EXPIRY_SECONDS = 3600 TASK_CLEANUP_GRACE_SECONDS = 300 # 5 min: download continua disponível após conclusão WS_PING_INTERVAL = 15.0 # Ping se nenhuma mensagem por 15s WS_POLL_INTERVAL = 0.05 # Verificar fila de logs a cada 50ms # ───────────────────────────────────────────────────────────────────────────── # APP E SEMÁFORO GLOBAL # ───────────────────────────────────────────────────────────────────────────── app = FastAPI(title="PHANTOM APK Automator", version="5.5.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) for _dir in [UPLOAD_DIR, DOWNLOAD_DIR, WORKSPACE_DIR]: _dir.mkdir(exist_ok=True) # Registra tasks ativas e recentemente concluídas (para reconexão dentro da grace window) active_tasks: dict[str, PhantomPatcherV2] = {} process_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS) # ───────────────────────────────────────────────────────────────────────────── # STARTUP # ───────────────────────────────────────────────────────────────────────────── @app.on_event("startup") async def startup_event(): asyncio.create_task(janitor_daemon()) logger.info(f"PHANTOM Engine v5.5 iniciada! Max Sync Jobs: {MAX_CONCURRENT_TASKS}") # ───────────────────────────────────────────────────────────────────────────── # JANITOR DAEMON # ───────────────────────────────────────────────────────────────────────────── async def janitor_daemon(): while True: try: await asyncio.sleep(JANITOR_INTERVAL_SECONDS) cutoff = time.time() - FILE_EXPIRY_SECONDS purged = 0 for target_dir in [UPLOAD_DIR, DOWNLOAD_DIR]: for f in target_dir.glob("*"): try: if f.is_file() and f.stat().st_mtime < cutoff: f.unlink(missing_ok=True) purged += 1 except OSError: pass for d in WORKSPACE_DIR.glob("*"): try: if d.is_dir() and d.stat().st_mtime < cutoff: shutil.rmtree(d, ignore_errors=True) purged += 1 except OSError: pass if purged: logger.info(f"[JANITOR] {purged} artefatos expirados removidos.") except Exception as e: logger.error(f"[JANITOR] Erro: {e}") # ───────────────────────────────────────────────────────────────────────────── # WEBSOCKET — Index-based streaming com suporte a reconexão # ───────────────────────────────────────────────────────────────────────────── def _is_terminal_message(msg: str) -> bool: """Retorna True se a mensagem indica fim de tarefa.""" return any(t in msg for t in ("[DOWNLOAD_READY]", "[FATAL]", "[CANCELLED]")) @app.websocket("/ws/status/{task_id}") async def websocket_status(websocket: WebSocket, task_id: str): await websocket.accept() patcher = active_tasks.get(task_id) if not patcher: # CORREÇÃO CRÍTICA: prefixo [SESSION_INVALID] → frontend para de reconectar try: await websocket.send_text("[SESSION_INVALID] Sessão inválida, expirada ou não encontrada.") await asyncio.sleep(1.0) # Garantir que a mensagem chegue pelo TCP antes de cortar await websocket.close() except Exception: pass return try: sent_idx = 0 # Índice no log_history — nunca perde histórico em reconexões last_msg_time = time.monotonic() while True: # ── Drena todos os logs novos disponíveis ── history = patcher.log_history # Referência local (list é thread-safe no CPython) current_len = len(history) while sent_idx < current_len: msg = history[sent_idx] sent_idx += 1 try: await websocket.send_text(msg) except Exception: return # Cliente desconectou last_msg_time = time.monotonic() if _is_terminal_message(msg): return # Missão concluída # ── Verifica se a tarefa terminou sem mensagem terminal (edge case) ── if patcher.is_done and sent_idx >= len(patcher.log_history): break # ── Ping para manter conexão viva durante operações longas ── now = time.monotonic() if now - last_msg_time > WS_PING_INTERVAL: try: await websocket.send_text("[PING]") except Exception: return last_msg_time = now # ── Yield para o event loop: WebSocket permanece responsivo ── await asyncio.sleep(WS_POLL_INTERVAL) except WebSocketDisconnect: pass except Exception: pass # ───────────────────────────────────────────────────────────────────────────── # UPLOAD — Core do processamento # ───────────────────────────────────────────────────────────────────────────── @app.post("/upload") async def upload_apk(file: UploadFile = File(...)): filename = file.filename or "upload.apk" suffix = Path(filename).suffix.lower() if suffix not in ALLOWED_EXTENSIONS: ct = (file.content_type or "").lower() if any(x in ct for x in ["zip", "apk", "octet-stream", "android"]): suffix = ".apk" else: raise HTTPException(status_code=415, detail=f"Formato '{suffix}' não suportado.") task_id = str(uuid.uuid4()) logger.info(f"[UPLOAD] Job {task_id[:8]} → {filename}") try: contents = await file.read() except Exception: raise HTTPException(status_code=400, detail="Falha ao ler o arquivo enviado.") size_mb = len(contents) / (1024 * 1024) if size_mb > MAX_FILE_SIZE_MB: raise HTTPException(status_code=413, detail=f"Arquivo muito grande ({size_mb:.1f}MB > {MAX_FILE_SIZE_MB}MB).") file_path = UPLOAD_DIR / f"{task_id}{suffix}" try: file_path.write_bytes(contents) except Exception: raise HTTPException(status_code=500, detail="Erro ao salvar arquivo em disco.") patcher = PhantomPatcherV2(work_dir=WORKSPACE_DIR / task_id, task_id=task_id) active_tasks[task_id] = patcher output_name = f"MOD_{task_id[:8]}.apk" output_path = DOWNLOAD_DIR / output_name async def async_worker(): try: patcher.log("info", f"[INFO] Posição na Fila: Aguardando GPU/CPU...") async with process_semaphore: patcher.log("info", f"[INFO] Job liberado para Processamento! ({size_mb:.1f}MB)") patcher.clean() decomp_dir = await patcher.decompile(file_path) await patcher.patch_with_rollbacks(decomp_dir) await patcher.rebuild_and_sign(decomp_dir, output_path) patcher.log_event(f"[DOWNLOAD_READY] /download/{output_name}") logger.info(f"[JOB OK] {task_id[:8]}") except TaskCancelledError: patcher.log("warning", "[CANCELLED] Operação abortada e destruída com sucesso.") logger.info(f"[CANCELLED] {task_id[:8]}") except Exception as e: patcher.log_event(f"[FATAL] Erro Interno: {type(e).__name__}: {e}") logger.error(f"[JOB FAIL] {task_id[:8]}: {e}", exc_info=True) finally: patcher.is_done = True # Cleanup com grace period para download ficar disponível await asyncio.sleep(TASK_CLEANUP_GRACE_SECONDS) active_tasks.pop(task_id, None) shutil.rmtree(patcher.work_dir, ignore_errors=True) file_path.unlink(missing_ok=True) asyncio.create_task(async_worker()) return {"task_id": task_id, "filename": filename} # ───────────────────────────────────────────────────────────────────────────── # CANCELAMENTO # ───────────────────────────────────────────────────────────────────────────── @app.post("/cancel/{task_id}") async def cancel_task(task_id: str): """ Dispara cancelamento da tarefa. Como o patcher agora roda em thread, o event loop processa este request IMEDIATAMENTE mesmo que o patching esteja em andamento. """ patcher = active_tasks.get(task_id) if not patcher: raise HTTPException(status_code=404, detail="Tarefa não encontrada ou já finalizada.") if patcher.is_cancelled: return {"status": "already_cancelling"} logger.warning(f"[CANCEL_REQ] {task_id[:8]} Recebeu comando de morte.") patcher.is_cancelled = True # Mata subprocess ativo imediatamente se houver if patcher.active_process and patcher.active_process.returncode is None: try: patcher.active_process.terminate() except Exception: pass return {"status": "cancel_signal_sent"} # ───────────────────────────────────────────────────────────────────────────── # DOWNLOAD / HEALTH # ───────────────────────────────────────────────────────────────────────────── @app.get("/download/{filename}") async def download_mod(filename: str): if ".." in filename or "/" in filename or "\\" in filename: raise HTTPException(status_code=400, detail="Nome de arquivo inválido.") file_path = DOWNLOAD_DIR / filename if not file_path.exists(): raise HTTPException(status_code=404, detail="Arquivo não encontrado ou expirado.") return FileResponse( path=file_path, filename=filename, media_type="application/vnd.android.package-archive", ) @app.get("/health") async def health_check(): return { "status": "operational", "active_tasks": len(active_tasks), "concurrency_limit": MAX_CONCURRENT_TASKS, } # ───────────────────────────────────────────────────────────────────────────── # FRONTEND ESTÁTICO # ───────────────────────────────────────────────────────────────────────────── if os.path.exists("frontend/dist"): app.mount("/", StaticFiles(directory="frontend/dist", html=True), name="static")