RegexApps / main.py
itzraissc
refactor: update main entry point logic to improve regex processing efficiency
68e5cda
import os
import shutil
import asyncio
import uuid
import time
import logging
from pathlib import Path
from fastapi import FastAPI, UploadFile, File, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from patcher import PhantomPatcherV2, TaskCancelledError
# ─────────────────────────────────────────────────────────────────────────────
# LOGGING
# ─────────────────────────────────────────────────────────────────────────────
LOG_FORMAT = "%(asctime)s [%(levelname)s] [SERVER] %(message)s"
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger("phantom.server")
logging.getLogger("multipart").setLevel(logging.WARNING)
logging.getLogger("python_multipart").setLevel(logging.WARNING)
# ─────────────────────────────────────────────────────────────────────────────
# CONSTANTES
# ─────────────────────────────────────────────────────────────────────────────
UPLOAD_DIR = Path("uploads")
DOWNLOAD_DIR = Path("downloads")
WORKSPACE_DIR = Path("phantom_workspace")
MAX_FILE_SIZE_MB = int(os.environ.get("MAX_FILE_SIZE_MB", "512"))
MAX_CONCURRENT_TASKS = int(os.environ.get("MAX_CONCURRENT_TASKS", "2"))
ALLOWED_EXTENSIONS = {".apk", ".xapk", ".apks", ".zip", ".obb"}
JANITOR_INTERVAL_SECONDS = 600
FILE_EXPIRY_SECONDS = 3600
TASK_CLEANUP_GRACE_SECONDS = 300 # 5 min: download continua disponível após conclusão
WS_PING_INTERVAL = 15.0 # Ping se nenhuma mensagem por 15s
WS_POLL_INTERVAL = 0.05 # Verificar fila de logs a cada 50ms
# ─────────────────────────────────────────────────────────────────────────────
# APP E SEMÁFORO GLOBAL
# ─────────────────────────────────────────────────────────────────────────────
app = FastAPI(title="PHANTOM APK Automator", version="5.5.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
for _dir in [UPLOAD_DIR, DOWNLOAD_DIR, WORKSPACE_DIR]:
_dir.mkdir(exist_ok=True)
# Registra tasks ativas e recentemente concluídas (para reconexão dentro da grace window)
active_tasks: dict[str, PhantomPatcherV2] = {}
process_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
# ─────────────────────────────────────────────────────────────────────────────
# STARTUP
# ─────────────────────────────────────────────────────────────────────────────
@app.on_event("startup")
async def startup_event():
asyncio.create_task(janitor_daemon())
logger.info(f"PHANTOM Engine v5.5 iniciada! Max Sync Jobs: {MAX_CONCURRENT_TASKS}")
# ─────────────────────────────────────────────────────────────────────────────
# JANITOR DAEMON
# ─────────────────────────────────────────────────────────────────────────────
async def janitor_daemon():
while True:
try:
await asyncio.sleep(JANITOR_INTERVAL_SECONDS)
cutoff = time.time() - FILE_EXPIRY_SECONDS
purged = 0
for target_dir in [UPLOAD_DIR, DOWNLOAD_DIR]:
for f in target_dir.glob("*"):
try:
if f.is_file() and f.stat().st_mtime < cutoff:
f.unlink(missing_ok=True)
purged += 1
except OSError:
pass
for d in WORKSPACE_DIR.glob("*"):
try:
if d.is_dir() and d.stat().st_mtime < cutoff:
shutil.rmtree(d, ignore_errors=True)
purged += 1
except OSError:
pass
if purged:
logger.info(f"[JANITOR] {purged} artefatos expirados removidos.")
except Exception as e:
logger.error(f"[JANITOR] Erro: {e}")
# ─────────────────────────────────────────────────────────────────────────────
# WEBSOCKET — Index-based streaming com suporte a reconexão
# ─────────────────────────────────────────────────────────────────────────────
def _is_terminal_message(msg: str) -> bool:
"""Retorna True se a mensagem indica fim de tarefa."""
return any(t in msg for t in ("[DOWNLOAD_READY]", "[FATAL]", "[CANCELLED]"))
@app.websocket("/ws/status/{task_id}")
async def websocket_status(websocket: WebSocket, task_id: str):
await websocket.accept()
patcher = active_tasks.get(task_id)
if not patcher:
# CORREÇÃO CRÍTICA: prefixo [SESSION_INVALID] → frontend para de reconectar
try:
await websocket.send_text("[SESSION_INVALID] Sessão inválida, expirada ou não encontrada.")
await asyncio.sleep(1.0) # Garantir que a mensagem chegue pelo TCP antes de cortar
await websocket.close()
except Exception:
pass
return
try:
sent_idx = 0 # Índice no log_history — nunca perde histórico em reconexões
last_msg_time = time.monotonic()
while True:
# ── Drena todos os logs novos disponíveis ──
history = patcher.log_history # Referência local (list é thread-safe no CPython)
current_len = len(history)
while sent_idx < current_len:
msg = history[sent_idx]
sent_idx += 1
try:
await websocket.send_text(msg)
except Exception:
return # Cliente desconectou
last_msg_time = time.monotonic()
if _is_terminal_message(msg):
return # Missão concluída
# ── Verifica se a tarefa terminou sem mensagem terminal (edge case) ──
if patcher.is_done and sent_idx >= len(patcher.log_history):
break
# ── Ping para manter conexão viva durante operações longas ──
now = time.monotonic()
if now - last_msg_time > WS_PING_INTERVAL:
try:
await websocket.send_text("[PING]")
except Exception:
return
last_msg_time = now
# ── Yield para o event loop: WebSocket permanece responsivo ──
await asyncio.sleep(WS_POLL_INTERVAL)
except WebSocketDisconnect:
pass
except Exception:
pass
# ─────────────────────────────────────────────────────────────────────────────
# UPLOAD — Core do processamento
# ─────────────────────────────────────────────────────────────────────────────
@app.post("/upload")
async def upload_apk(file: UploadFile = File(...)):
filename = file.filename or "upload.apk"
suffix = Path(filename).suffix.lower()
if suffix not in ALLOWED_EXTENSIONS:
ct = (file.content_type or "").lower()
if any(x in ct for x in ["zip", "apk", "octet-stream", "android"]):
suffix = ".apk"
else:
raise HTTPException(status_code=415, detail=f"Formato '{suffix}' não suportado.")
task_id = str(uuid.uuid4())
logger.info(f"[UPLOAD] Job {task_id[:8]}{filename}")
try:
contents = await file.read()
except Exception:
raise HTTPException(status_code=400, detail="Falha ao ler o arquivo enviado.")
size_mb = len(contents) / (1024 * 1024)
if size_mb > MAX_FILE_SIZE_MB:
raise HTTPException(status_code=413, detail=f"Arquivo muito grande ({size_mb:.1f}MB > {MAX_FILE_SIZE_MB}MB).")
file_path = UPLOAD_DIR / f"{task_id}{suffix}"
try:
file_path.write_bytes(contents)
except Exception:
raise HTTPException(status_code=500, detail="Erro ao salvar arquivo em disco.")
patcher = PhantomPatcherV2(work_dir=WORKSPACE_DIR / task_id, task_id=task_id)
active_tasks[task_id] = patcher
output_name = f"MOD_{task_id[:8]}.apk"
output_path = DOWNLOAD_DIR / output_name
async def async_worker():
try:
patcher.log("info", f"[INFO] Posição na Fila: Aguardando GPU/CPU...")
async with process_semaphore:
patcher.log("info", f"[INFO] Job liberado para Processamento! ({size_mb:.1f}MB)")
patcher.clean()
decomp_dir = await patcher.decompile(file_path)
await patcher.patch_with_rollbacks(decomp_dir)
await patcher.rebuild_and_sign(decomp_dir, output_path)
patcher.log_event(f"[DOWNLOAD_READY] /download/{output_name}")
logger.info(f"[JOB OK] {task_id[:8]}")
except TaskCancelledError:
patcher.log("warning", "[CANCELLED] Operação abortada e destruída com sucesso.")
logger.info(f"[CANCELLED] {task_id[:8]}")
except Exception as e:
patcher.log_event(f"[FATAL] Erro Interno: {type(e).__name__}: {e}")
logger.error(f"[JOB FAIL] {task_id[:8]}: {e}", exc_info=True)
finally:
patcher.is_done = True
# Cleanup com grace period para download ficar disponível
await asyncio.sleep(TASK_CLEANUP_GRACE_SECONDS)
active_tasks.pop(task_id, None)
shutil.rmtree(patcher.work_dir, ignore_errors=True)
file_path.unlink(missing_ok=True)
asyncio.create_task(async_worker())
return {"task_id": task_id, "filename": filename}
# ─────────────────────────────────────────────────────────────────────────────
# CANCELAMENTO
# ─────────────────────────────────────────────────────────────────────────────
@app.post("/cancel/{task_id}")
async def cancel_task(task_id: str):
"""
Dispara cancelamento da tarefa.
Como o patcher agora roda em thread, o event loop processa este request
IMEDIATAMENTE mesmo que o patching esteja em andamento.
"""
patcher = active_tasks.get(task_id)
if not patcher:
raise HTTPException(status_code=404, detail="Tarefa não encontrada ou já finalizada.")
if patcher.is_cancelled:
return {"status": "already_cancelling"}
logger.warning(f"[CANCEL_REQ] {task_id[:8]} Recebeu comando de morte.")
patcher.is_cancelled = True
# Mata subprocess ativo imediatamente se houver
if patcher.active_process and patcher.active_process.returncode is None:
try:
patcher.active_process.terminate()
except Exception:
pass
return {"status": "cancel_signal_sent"}
# ─────────────────────────────────────────────────────────────────────────────
# DOWNLOAD / HEALTH
# ─────────────────────────────────────────────────────────────────────────────
@app.get("/download/{filename}")
async def download_mod(filename: str):
if ".." in filename or "/" in filename or "\\" in filename:
raise HTTPException(status_code=400, detail="Nome de arquivo inválido.")
file_path = DOWNLOAD_DIR / filename
if not file_path.exists():
raise HTTPException(status_code=404, detail="Arquivo não encontrado ou expirado.")
return FileResponse(
path=file_path,
filename=filename,
media_type="application/vnd.android.package-archive",
)
@app.get("/health")
async def health_check():
return {
"status": "operational",
"active_tasks": len(active_tasks),
"concurrency_limit": MAX_CONCURRENT_TASKS,
}
# ─────────────────────────────────────────────────────────────────────────────
# FRONTEND ESTÁTICO
# ─────────────────────────────────────────────────────────────────────────────
if os.path.exists("frontend/dist"):
app.mount("/", StaticFiles(directory="frontend/dist", html=True), name="static")