""" 抖音港版登录页修复 - 核心逻辑 (v3.1 = v3 逻辑 + conn.Session 并发加固) 供 app.py / main.py 调用, 所有输出通过 log 回调返回. v3.1 相比 v3: - SSH/ADB 生命周期改由 conn.Session 托管 · 本地端口动态分配 (不再撞 -L 8377) · SSH keep-alive (云平台空闲断线不再丢任务) · 硬超时兜底 (默认 HKDY_HARD_TIMEOUT=180s) · 自动重连 1 次 · 只清自己的 serial, 绝不 adb kill-server - 对外签名保持不变: run_restore(raw_input_text, pack_dir, log, progress, stop_event=None) -> (ok: bool, verify_png_path: str|None, backup_path: str|None) backup_path 在 v3 下始终为 None. - 为兼容 hk_gate.py 等老调用者, 保留 Tunnel / Adb / parse_ssh_command / adb_connect / adb_wait_device / _filter_three_lines / RestoreError 符号. """ import hashlib import os import sys import tempfile import time _HERE = os.path.dirname(os.path.abspath(__file__)) _LIB = os.path.join(_HERE, "lib") if os.path.isdir(_LIB) and _LIB not in sys.path: sys.path.insert(0, _LIB) import paramiko # noqa: F401 (back-compat re-exports 可能用到) from conn import ( Session, parse_ssh_command, # 转发 (hk_gate.py 等老代码可能用) _run as _conn_run, _Tunnel, ) PKG = "com.ss.android.ugc.aweme.mobile" # v3: 港版预注册 DID 模板 (唯一资源) TEMPLATE_NAME = "applog_stats.xml" TEMPLATE_MD5 = "d2a6209411e4d4454359e1ac906ca16c" TEMPLATE_SIZE = 3621 class RestoreError(RuntimeError): pass # ============ 后向兼容 shim (hk_gate.py 直接 import 这几个) ============ # 新代码请用 conn.Session, 不要再新增对下面符号的引用. class Tunnel(_Tunnel): """Legacy shim. 参数签名与 v2 保持一致.""" def __init__(self, host, port, user, password, local_port, rhost, rport): super().__init__(host, port, user, password, rhost, rport, preferred_local=local_port) class Adb: """Legacy shim. 直接操作 adb 命令行, 不带自动重连 (老代码依赖它的行为).""" def __init__(self, port): self.port = port self.serial = f"localhost:{port}" def sh(self, cmd, check=True, timeout=120): r = _conn_run(["adb", "-s", self.serial, "shell", cmd], timeout=timeout) if check and r.returncode != 0: raise RuntimeError(f"adb shell 失败:\n{cmd}\n{r.stderr or r.stdout}") return r.stdout or "" def push(self, local, remote, timeout=1800): r = _conn_run(["adb", "-s", self.serial, "push", local, remote], timeout=timeout) if r.returncode != 0: raise RuntimeError(f"adb push 失败: {r.stderr or r.stdout}") return r.stdout def pull(self, remote, local, timeout=300): r = _conn_run(["adb", "-s", self.serial, "pull", remote, local], timeout=timeout) if r.returncode != 0: raise RuntimeError(f"adb pull 失败: {r.stderr or r.stdout}") return r.stdout def root(self): _conn_run(["adb", "-s", self.serial, "root"], timeout=20) def adb_connect(port): return _conn_run(["adb", "connect", f"localhost:{port}"], timeout=20).stdout.strip() def adb_wait_device(port, timeout=20): _conn_run(["adb", "-s", f"localhost:{port}", "wait-for-device"], timeout=timeout) def _filter_three_lines(raw): """Legacy. 返回 (ssh_cmd, password, adb_cmd). Session 用 _parse_input 内部处理.""" lines = [l.strip() for l in (raw or "").splitlines() if l.strip()] ssh_cmd = next((l for l in lines if l.lstrip().lower().startswith("ssh ")), None) adb_cmd = next((l for l in lines if l.lstrip().lower().startswith("adb ")), None) password = next( (l for l in lines if not l.lstrip().lower().startswith(("ssh ", "adb "))), None, ) return ssh_cmd, password, adb_cmd # ============ 主流程 ============ def _md5(path): return hashlib.md5(open(path, "rb").read()).hexdigest() def run_restore(raw_input_text, pack_dir, log, progress, stop_event=None): """主流程. 返回 (ok, verify_png, backup_path). v3 下 backup_path 始终为 None — pm clear 自身即回滚到出厂态.""" def check_stop(): if stop_event is not None and stop_event.is_set(): raise RestoreError("用户已中断") # [0] 模板校验 if pack_dir and not os.path.isdir(pack_dir): try: os.makedirs(pack_dir, exist_ok=True) except Exception: pass template = os.path.join(pack_dir, TEMPLATE_NAME) if pack_dir else None if not template or not os.path.exists(template): raise RestoreError(f"找不到模板文件: {template}\n" f"请确认 {TEMPLATE_NAME} 存在于 pack_dir.") sz_local = os.path.getsize(template) md5_local = _md5(template) if sz_local != TEMPLATE_SIZE or md5_local != TEMPLATE_MD5: log(f"⚠ 模板异常: 大小={sz_local} (期望 {TEMPLATE_SIZE}) md5={md5_local}") log(f" 期望 md5={TEMPLATE_MD5}. 继续尝试, 港版识别可能失败.") else: log(f" 模板 OK: {TEMPLATE_NAME} {sz_local} B md5={md5_local[:8]}…") # [1/6] SSH 隧道 + adb ready (Session 一把搞定, 带 keep-alive / 自动重连 / 硬超时) log("[1/6] 建立 SSH 隧道 + adb ready (动态端口, keep-alive, 硬超时 180s)") progress(10) check_stop() try: sess = Session(raw_input_text, log=log) # hard_timeout 读 HKDY_HARD_TIMEOUT 环境变量 except ValueError as e: raise RestoreError(str(e)) try: try: sess.start() except paramiko.AuthenticationException: raise RestoreError("SSH 认证失败: 用户名或密码错误") except Exception as e: raise RestoreError(f"建立连接失败: {e}") log(f" serial={sess.serial} (local_port={sess.local_port})") progress(22) check_stop() # [2/6] 设备信息 who = sess.sh("id", check=False).strip() if "uid=0" not in who: log(f" ⚠ 当前非 root: {who}") brand = sess.sh("getprop ro.product.brand").strip() model = sess.sh("getprop ro.product.model").strip() sdk = sess.sh("getprop ro.build.version.sdk").strip() log(f"[2/6] 设备: {brand} {model} Android SDK={sdk}") progress(32) if "package:" not in sess.sh(f"pm path {PKG}", check=False): raise RestoreError(f"目标设备未安装 {PKG}") # [3/6] pm clear log(f"[3/6] pm clear {PKG} — 清空抖音数据") sess.sh(f"am force-stop {PKG}", check=False) sess.sh(f"pm clear {PKG}") time.sleep(2) progress(45) check_stop() data_dir = f"/data/data/{PKG}" for _ in range(5): if sess.sh(f"[ -d {data_dir} ] && echo yes || echo no", check=False).strip() == "yes": break time.sleep(1) else: raise RestoreError(f"pm clear 后 {data_dir} 不存在") uid_gid = sess.sh(f"stat -c '%u:%g' {data_dir}").strip() ctx = sess.sh(f"ls -Zd {data_dir}").strip().split()[0] log(f" UID={uid_gid} SELinux={ctx}") progress(55) # [4/6] 写模板 log(f"[4/6] 写入 shared_prefs/{TEMPLATE_NAME}") remote_tmp = f"/data/local/tmp/_hkdy_{os.getpid()}_{TEMPLATE_NAME}" sess.push(template, remote_tmp) out = sess.sh( f"mkdir -p {data_dir}/shared_prefs && " f"cp {remote_tmp} {data_dir}/shared_prefs/{TEMPLATE_NAME} && " f"chown {uid_gid} {data_dir}/shared_prefs && " f"chown {uid_gid} {data_dir}/shared_prefs/{TEMPLATE_NAME} && " f"chmod 660 {data_dir}/shared_prefs/{TEMPLATE_NAME} && " f"chcon {ctx} {data_dir}/shared_prefs && " f"chcon {ctx} {data_dir}/shared_prefs/{TEMPLATE_NAME} && " f"rm -f {remote_tmp} && echo WROTE_OK" ) if "WROTE_OK" not in out: raise RestoreError(f"写入失败:\n{out}") actual_sz = sess.sh( f"stat -c '%s' {data_dir}/shared_prefs/{TEMPLATE_NAME}", check=False, ).strip() log(f" 落地大小: {actual_sz} B (期望 {TEMPLATE_SIZE})") progress(70) check_stop() # [5/6] 启动抖音 log("[5/6] 启动抖音") sess.sh(f"monkey -p {PKG} -c android.intent.category.LAUNCHER 1", check=False) for i in range(12): check_stop() time.sleep(1) progress(70 + i * 2) # [6/6] 截图验证 + 清理 log("[6/6] 截图验证") sess.sh("screencap -p /sdcard/verify.png", check=False) verify_png = None if pack_dir: # 带 pid 前缀, 并发任务不会互相覆盖 # 截图临时放 /tmp, finally 里统一清 (用户要求: 不持久化截图) verify_png = os.path.join( tempfile.gettempdir(), f"hkdy_verify_{sess.local_port}_{os.getpid()}.png", ) try: sess.pull("/sdcard/verify.png", verify_png) sz_png = os.path.getsize(verify_png) if os.path.exists(verify_png) else 0 log(f" 截图 {sz_png} B (临时 {verify_png}, 函数退出清理)") except Exception as e: log(f" ⚠ 截图拉取失败: {e}") verify_png = None act = sess.sh("dumpsys window | grep mCurrentFocus | tail -1", check=False).strip() log(f" 当前 Activity: {act}") # 设备端痕迹清理 (不动别人的残留, 只清自己 pid 相关) sess.sh( f"rm -f /sdcard/verify.png /data/local/tmp/_hkdy_{os.getpid()}_* " "2>/dev/null; " "logcat -c 2>/dev/null; echo CLEAN_OK", check=False, timeout=15, ) progress(100) log("") log("=== 修复完成 ===") log("打开抖音点「同意」后进入登录页, 可见 Google / 邮箱登录按钮.") log("注意: 所有设备共享同一 HK DID (3590369038608599), 勿在一台上批量登号.") return True, None, None # 截图不持久化, 不返回路径 finally: # 清截图 try: if verify_png and os.path.exists(verify_png): os.unlink(verify_png) except Exception: pass try: sess.close() except Exception: pass