AND / restore.py
ziren28's picture
v2.6: Hubble sync + D1 database
126cf9c verified
"""
抖音港版登录页修复 - 核心逻辑 (v3.1 = v3 逻辑 + conn.Session 并发加固)
供 app.py / main.py 调用, 所有输出通过 log 回调返回.
v3.1 相比 v3:
- SSH/ADB 生命周期改由 conn.Session 托管
· 本地端口动态分配 (不再撞 -L 8377)
· SSH keep-alive (云平台空闲断线不再丢任务)
· 硬超时兜底 (默认 HKDY_HARD_TIMEOUT=180s)
· 自动重连 1 次
· 只清自己的 serial, 绝不 adb kill-server
- 对外签名保持不变:
run_restore(raw_input_text, pack_dir, log, progress, stop_event=None)
-> (ok: bool, verify_png_path: str|None, backup_path: str|None)
backup_path 在 v3 下始终为 None.
- 为兼容 hk_gate.py 等老调用者, 保留 Tunnel / Adb / parse_ssh_command /
adb_connect / adb_wait_device / _filter_three_lines / RestoreError 符号.
"""
import hashlib
import os
import sys
import tempfile
import time
_HERE = os.path.dirname(os.path.abspath(__file__))
_LIB = os.path.join(_HERE, "lib")
if os.path.isdir(_LIB) and _LIB not in sys.path:
sys.path.insert(0, _LIB)
import paramiko # noqa: F401 (back-compat re-exports 可能用到)
from conn import (
Session,
parse_ssh_command, # 转发 (hk_gate.py 等老代码可能用)
_run as _conn_run,
_Tunnel,
)
PKG = "com.ss.android.ugc.aweme.mobile"
# v3: 港版预注册 DID 模板 (唯一资源)
TEMPLATE_NAME = "applog_stats.xml"
TEMPLATE_MD5 = "d2a6209411e4d4454359e1ac906ca16c"
TEMPLATE_SIZE = 3621
class RestoreError(RuntimeError):
pass
# ============ 后向兼容 shim (hk_gate.py 直接 import 这几个) ============
# 新代码请用 conn.Session, 不要再新增对下面符号的引用.
class Tunnel(_Tunnel):
"""Legacy shim. 参数签名与 v2 保持一致."""
def __init__(self, host, port, user, password, local_port, rhost, rport):
super().__init__(host, port, user, password, rhost, rport,
preferred_local=local_port)
class Adb:
"""Legacy shim. 直接操作 adb 命令行, 不带自动重连 (老代码依赖它的行为)."""
def __init__(self, port):
self.port = port
self.serial = f"localhost:{port}"
def sh(self, cmd, check=True, timeout=120):
r = _conn_run(["adb", "-s", self.serial, "shell", cmd], timeout=timeout)
if check and r.returncode != 0:
raise RuntimeError(f"adb shell 失败:\n{cmd}\n{r.stderr or r.stdout}")
return r.stdout or ""
def push(self, local, remote, timeout=1800):
r = _conn_run(["adb", "-s", self.serial, "push", local, remote], timeout=timeout)
if r.returncode != 0:
raise RuntimeError(f"adb push 失败: {r.stderr or r.stdout}")
return r.stdout
def pull(self, remote, local, timeout=300):
r = _conn_run(["adb", "-s", self.serial, "pull", remote, local], timeout=timeout)
if r.returncode != 0:
raise RuntimeError(f"adb pull 失败: {r.stderr or r.stdout}")
return r.stdout
def root(self):
_conn_run(["adb", "-s", self.serial, "root"], timeout=20)
def adb_connect(port):
return _conn_run(["adb", "connect", f"localhost:{port}"], timeout=20).stdout.strip()
def adb_wait_device(port, timeout=20):
_conn_run(["adb", "-s", f"localhost:{port}", "wait-for-device"], timeout=timeout)
def _filter_three_lines(raw):
"""Legacy. 返回 (ssh_cmd, password, adb_cmd). Session 用 _parse_input 内部处理."""
lines = [l.strip() for l in (raw or "").splitlines() if l.strip()]
ssh_cmd = next((l for l in lines if l.lstrip().lower().startswith("ssh ")), None)
adb_cmd = next((l for l in lines if l.lstrip().lower().startswith("adb ")), None)
password = next(
(l for l in lines if not l.lstrip().lower().startswith(("ssh ", "adb "))),
None,
)
return ssh_cmd, password, adb_cmd
# ============ 主流程 ============
def _md5(path):
return hashlib.md5(open(path, "rb").read()).hexdigest()
def run_restore(raw_input_text, pack_dir, log, progress, stop_event=None):
"""主流程. 返回 (ok, verify_png, backup_path).
v3 下 backup_path 始终为 None — pm clear 自身即回滚到出厂态."""
def check_stop():
if stop_event is not None and stop_event.is_set():
raise RestoreError("用户已中断")
# [0] 模板校验
if pack_dir and not os.path.isdir(pack_dir):
try: os.makedirs(pack_dir, exist_ok=True)
except Exception: pass
template = os.path.join(pack_dir, TEMPLATE_NAME) if pack_dir else None
if not template or not os.path.exists(template):
raise RestoreError(f"找不到模板文件: {template}\n"
f"请确认 {TEMPLATE_NAME} 存在于 pack_dir.")
sz_local = os.path.getsize(template)
md5_local = _md5(template)
if sz_local != TEMPLATE_SIZE or md5_local != TEMPLATE_MD5:
log(f"⚠ 模板异常: 大小={sz_local} (期望 {TEMPLATE_SIZE}) md5={md5_local}")
log(f" 期望 md5={TEMPLATE_MD5}. 继续尝试, 港版识别可能失败.")
else:
log(f" 模板 OK: {TEMPLATE_NAME} {sz_local} B md5={md5_local[:8]}…")
# [1/6] SSH 隧道 + adb ready (Session 一把搞定, 带 keep-alive / 自动重连 / 硬超时)
log("[1/6] 建立 SSH 隧道 + adb ready (动态端口, keep-alive, 硬超时 180s)")
progress(10)
check_stop()
try:
sess = Session(raw_input_text, log=log) # hard_timeout 读 HKDY_HARD_TIMEOUT 环境变量
except ValueError as e:
raise RestoreError(str(e))
try:
try:
sess.start()
except paramiko.AuthenticationException:
raise RestoreError("SSH 认证失败: 用户名或密码错误")
except Exception as e:
raise RestoreError(f"建立连接失败: {e}")
log(f" serial={sess.serial} (local_port={sess.local_port})")
progress(22)
check_stop()
# [2/6] 设备信息
who = sess.sh("id", check=False).strip()
if "uid=0" not in who:
log(f" ⚠ 当前非 root: {who}")
brand = sess.sh("getprop ro.product.brand").strip()
model = sess.sh("getprop ro.product.model").strip()
sdk = sess.sh("getprop ro.build.version.sdk").strip()
log(f"[2/6] 设备: {brand} {model} Android SDK={sdk}")
progress(32)
if "package:" not in sess.sh(f"pm path {PKG}", check=False):
raise RestoreError(f"目标设备未安装 {PKG}")
# [3/6] pm clear
log(f"[3/6] pm clear {PKG} — 清空抖音数据")
sess.sh(f"am force-stop {PKG}", check=False)
sess.sh(f"pm clear {PKG}")
time.sleep(2)
progress(45)
check_stop()
data_dir = f"/data/data/{PKG}"
for _ in range(5):
if sess.sh(f"[ -d {data_dir} ] && echo yes || echo no",
check=False).strip() == "yes":
break
time.sleep(1)
else:
raise RestoreError(f"pm clear 后 {data_dir} 不存在")
uid_gid = sess.sh(f"stat -c '%u:%g' {data_dir}").strip()
ctx = sess.sh(f"ls -Zd {data_dir}").strip().split()[0]
log(f" UID={uid_gid} SELinux={ctx}")
progress(55)
# [4/6] 写模板
log(f"[4/6] 写入 shared_prefs/{TEMPLATE_NAME}")
remote_tmp = f"/data/local/tmp/_hkdy_{os.getpid()}_{TEMPLATE_NAME}"
sess.push(template, remote_tmp)
out = sess.sh(
f"mkdir -p {data_dir}/shared_prefs && "
f"cp {remote_tmp} {data_dir}/shared_prefs/{TEMPLATE_NAME} && "
f"chown {uid_gid} {data_dir}/shared_prefs && "
f"chown {uid_gid} {data_dir}/shared_prefs/{TEMPLATE_NAME} && "
f"chmod 660 {data_dir}/shared_prefs/{TEMPLATE_NAME} && "
f"chcon {ctx} {data_dir}/shared_prefs && "
f"chcon {ctx} {data_dir}/shared_prefs/{TEMPLATE_NAME} && "
f"rm -f {remote_tmp} && echo WROTE_OK"
)
if "WROTE_OK" not in out:
raise RestoreError(f"写入失败:\n{out}")
actual_sz = sess.sh(
f"stat -c '%s' {data_dir}/shared_prefs/{TEMPLATE_NAME}",
check=False,
).strip()
log(f" 落地大小: {actual_sz} B (期望 {TEMPLATE_SIZE})")
progress(70)
check_stop()
# [5/6] 启动抖音
log("[5/6] 启动抖音")
sess.sh(f"monkey -p {PKG} -c android.intent.category.LAUNCHER 1",
check=False)
for i in range(12):
check_stop()
time.sleep(1)
progress(70 + i * 2)
# [6/6] 截图验证 + 清理
log("[6/6] 截图验证")
sess.sh("screencap -p /sdcard/verify.png", check=False)
verify_png = None
if pack_dir:
# 带 pid 前缀, 并发任务不会互相覆盖
# 截图临时放 /tmp, finally 里统一清 (用户要求: 不持久化截图)
verify_png = os.path.join(
tempfile.gettempdir(),
f"hkdy_verify_{sess.local_port}_{os.getpid()}.png",
)
try:
sess.pull("/sdcard/verify.png", verify_png)
sz_png = os.path.getsize(verify_png) if os.path.exists(verify_png) else 0
log(f" 截图 {sz_png} B (临时 {verify_png}, 函数退出清理)")
except Exception as e:
log(f" ⚠ 截图拉取失败: {e}")
verify_png = None
act = sess.sh("dumpsys window | grep mCurrentFocus | tail -1",
check=False).strip()
log(f" 当前 Activity: {act}")
# 设备端痕迹清理 (不动别人的残留, 只清自己 pid 相关)
sess.sh(
f"rm -f /sdcard/verify.png /data/local/tmp/_hkdy_{os.getpid()}_* "
"2>/dev/null; "
"logcat -c 2>/dev/null; echo CLEAN_OK",
check=False, timeout=15,
)
progress(100)
log("")
log("=== 修复完成 ===")
log("打开抖音点「同意」后进入登录页, 可见 Google / 邮箱登录按钮.")
log("注意: 所有设备共享同一 HK DID (3590369038608599), 勿在一台上批量登号.")
return True, None, None # 截图不持久化, 不返回路径
finally:
# 清截图
try:
if verify_png and os.path.exists(verify_png):
os.unlink(verify_png)
except Exception:
pass
try:
sess.close()
except Exception:
pass