#!/usr/bin/env python3
"""Factor an RSA challenge modulus with CADO-NFS and emit the solution.

The problem is read from ``CHALLENGE_INPUT_FILE`` (or from the command line),
``cado-nfs.py`` is run as a subprocess, its output is scanned for a line
holding the two prime factors, and the result is written out through
``enigma_challenges.solution_output``.
"""
import glob
import json
import os
import re
import signal
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import *

import gmpy2
from gmpy2 import mpz

from enigma_challenges.breaking_rsa import Problem, Solution
from enigma_challenges.solution_output import build_solution_zip, write_solution_output

CHALLENGE_INPUT_FILE = "/challenge_input/challenge_input.json"


def _env_int(name: str, default: int) -> int:
    """Return environment variable *name* as an int, or *default* if unset/invalid."""
    try:
        return int(os.environ.get(name, default))
    except (TypeError, ValueError):
        return default


def _cpu_count() -> int:
    """Return the usable CPU count, capped by a cgroup CPU quota when one is set.

    Tries the cgroup v2 ``cpu.max`` file first, then the cgroup v1
    ``cfs_quota_us`` / ``cfs_period_us`` pair, and falls back to
    ``os.cpu_count()`` (or 8 when that is unknown). Never returns less than 1.
    """
    n = os.cpu_count() or 8
    try:
        parts = Path("/sys/fs/cgroup/cpu.max").read_text().split()
        if parts and parts[0] != "max":
            return max(1, min(n, int(parts[0]) // int(parts[1])))
    except (OSError, ValueError, IndexError, ZeroDivisionError):
        # Missing or malformed file (including a zero period): try cgroup v1.
        pass
    try:
        quota = int(Path("/sys/fs/cgroup/cpu/cpu.cfs_quota_us").read_text())
        period = int(Path("/sys/fs/cgroup/cpu/cpu.cfs_period_us").read_text())
        if quota > 0 and period > 0:
            return max(1, min(n, quota // period))
    except (OSError, ValueError):
        pass
    return n


def _find_bin(env_key: str, candidates: List[str]) -> Optional[str]:
    """Return the first existing file among ``$env_key`` and *candidates*, else None."""
    override = os.environ.get(env_key, "").strip()
    if override and os.path.isfile(override):
        return override
    for candidate in candidates:
        if os.path.isfile(candidate):
            return candidate
    return None


def _find_cado_script() -> Optional[str]:
    """Locate ``cado-nfs.py``.

    Order: the ``CADO_NFS`` environment variable, a list of fixed paths, then
    glob patterns (the lexicographically last match wins). Returns None when
    nothing is found.
    """
    override = os.environ.get("CADO_NFS", "").strip()
    if override and os.path.isfile(override):
        return override
    for candidate in [
        "/opt/cado-nfs/build/release/cado-nfs.py",
        "/usr/local/bin/cado-nfs.py",
        "/usr/bin/cado-nfs.py",
    ]:
        if os.path.isfile(candidate):
            return candidate
    for pattern in [
        "/opt/cado-nfs/build/*/cado-nfs.py",
        "/usr/local/lib/cado-nfs-*/cado-nfs.py",
    ]:
        matches = sorted(glob.glob(pattern))
        if matches:
            return matches[-1]
    return None


def _start_broker(sock_path: str) -> Optional[subprocess.Popen]:
    """Start the ramnfs broker and wait (up to ~5 s) for its socket to appear.

    Returns the running process, or None when the broker binary is missing,
    cannot be spawned, or never creates *sock_path*.
    """
    broker = _find_bin("RAMNFS_BROKER", ["/opt/ramnfs/broker", "/app/ramnfs/broker"])
    if not broker:
        return None
    # Remove a stale socket so the existence check below reflects this run.
    try:
        os.unlink(sock_path)
    except OSError:
        pass
    try:
        proc = subprocess.Popen(
            [broker, sock_path],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            # Own session/process group: _kill() signals the child's whole
            # group, which must not be the group this script runs in.
            start_new_session=True,
        )
    except Exception:
        return None
    for _ in range(50):
        if os.path.exists(sock_path):
            return proc
        time.sleep(0.1)
    proc.kill()
    # Reap the killed broker so it does not linger as a zombie.
    try:
        proc.wait(timeout=5)
    except Exception:
        pass
    return None


def _factors_from_line(line: str, n: mpz) -> Optional[Tuple[int, int]]:
    """Parse *line* as the two prime factors of *n*.

    The line must consist of exactly two decimal tokens, both prime, whose
    product equals *n*. Returns ``(p, q)`` with ``p <= q``, otherwise None.
    """
    parts = line.split()
    # Exactly two tokens: with more, the first two alone would not multiply
    # back to n, so no correct pair could be returned.
    if len(parts) != 2 or not all(re.fullmatch(r"\d+", p) for p in parts):
        return None
    a, b = int(parts[0]), int(parts[1])
    if a * b != n:
        return None
    if not (gmpy2.is_prime(mpz(a)) and gmpy2.is_prime(mpz(b))):
        return None
    return (a, b) if a <= b else (b, a)


def _kill(proc: subprocess.Popen) -> None:
    """Stop *proc*: SIGTERM, wait up to 8 s, then SIGKILL and wait again.

    The signal goes to the child's whole process group so its descendants die
    too, unless the child shares this script's group, in which case only the
    child is signalled so that the script does not kill itself. No-op when
    *proc* is None or has already exited.
    """
    if proc is None or proc.poll() is not None:
        return
    for sig in (signal.SIGTERM, signal.SIGKILL):
        try:
            pgid = os.getpgid(proc.pid)
            if pgid == os.getpgrp():
                proc.send_signal(sig)
            else:
                os.killpg(pgid, sig)
        except Exception:
            try:
                proc.kill()
            except Exception:
                pass
        try:
            proc.wait(timeout=8)
            return
        except Exception:
            continue


def _run_cado(n: mpz) -> Optional[Tuple[int, int]]:
    """Run CADO-NFS on *n* and return its two prime factors, or None on failure.

    When the ramnfs shim and broker are both available, the run is launched
    with the shim in ``LD_PRELOAD`` and ``RAMNFS_WORKDIR`` as its work
    directory; otherwise a ``cado_run`` directory under ``$TMPDIR`` is used.
    The subprocess (and the broker, if started) is always terminated before
    returning.
    """
    cado = _find_cado_script()
    if not cado:
        return None

    shim = _find_bin("RAMNFS_SHIM", ["/opt/ramnfs/shim.so", "/app/ramnfs/shim.so"])
    sock = os.environ.get("RAMNFS_SOCK", "/tmp/ramnfs.sock")
    workdir = os.environ.get("RAMNFS_WORKDIR", "/ramwork/factor.work")
    threads = _env_int("CADO_THREADS", 0) or _cpu_count()

    broker = None
    if shim:
        broker = _start_broker(sock)
        if not broker:
            # The shim is unusable without its broker; fall back to plain files.
            shim = None
    if not shim:
        workdir = os.path.join(os.environ.get("TMPDIR", "/tmp"), "cado_run")
        os.makedirs(workdir, exist_ok=True)

    env = dict(os.environ)
    env["HOME"] = env["TMPDIR"] = "/tmp"
    if shim:
        env["LD_PRELOAD"] = shim
        env["RAMNFS_SOCK"] = sock
        env["RAMNFS_PREFIX"] = "/ramwork"

    cado_build = str(Path(cado).parent)
    cmd = [
        sys.executable,
        cado,
        str(int(n)),
        f"tasks.workdir={workdir}",
        f"tasks.threads={threads}",
        "server.address=localhost",
        "server.port=0",
        "server.threaded=1",
        f"slaves.nrclients={threads}",
        f"slaves.cado_nfs_client.bindir={cado_build}",
        f"tasks.linalg.bwc.threads={threads}",
        "tasks.sieve.las.threads=1",
    ]
    if os.environ.get("CADO_ADMAX"):
        cmd.append(f"tasks.polyselect.admax={os.environ['CADO_ADMAX']}")
    if os.environ.get("CADO_DEGREE"):
        cmd.append(f"tasks.polyselect.degree={os.environ['CADO_DEGREE']}")

    proc: Optional[subprocess.Popen] = None
    factors: Optional[Tuple[int, int]] = None
    try:
        # Spawned inside the try so a launch failure still tears down the broker.
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            env=env,
            start_new_session=True,
        )
        if proc.stdout is not None:
            for raw in proc.stdout:
                # Progress output may use bare carriage returns; split on them too.
                for line in raw.replace("\r", "\n").splitlines():
                    stripped = line.strip()
                    if not stripped:
                        continue
                    factors = _factors_from_line(stripped, n)
                    if factors:
                        break
                if factors:
                    break
    except Exception:
        # Best effort: any spawn or read failure is reported as "no factors".
        pass
    finally:
        if proc is not None:
            _kill(proc)
            if proc.stdout is not None:
                try:
                    proc.stdout.close()
                except Exception:
                    pass
        if broker:
            _kill(broker)
    return factors


def factor_semiprime(n_int: int) -> Tuple[Optional[int], Optional[int], str]:
    """Factor *n_int* and return ``(p, q, method)``.

    *method* is ``"cado_gnfs"`` on success; on failure the result is
    ``(None, None, "failed")``.
    """
    res = _run_cado(mpz(n_int))
    if res:
        return res[0], res[1], "cado_gnfs"
    return None, None, "failed"


def _load_problem() -> Tuple[str, "Problem"]:
    """Load the challenge id and problem.

    Prefers ``CHALLENGE_INPUT_FILE`` (the id is then ``argv[1]`` or
    ``"challenge"``); if that file is absent or unusable, falls back to
    ``argv[1]`` as the id and ``argv[2]`` as the problem JSON.

    Raises:
        SystemExit: when neither source yields a problem.
    """
    if os.path.isfile(CHALLENGE_INPUT_FILE):
        try:
            data = json.loads(Path(CHALLENGE_INPUT_FILE).read_text())
            prob = Problem(int(data["difficulty"]), int(data["num"]), int(data["num_bits"]))
            cid = (sys.argv[1].strip() if len(sys.argv) > 1 else "") or "challenge"
            return cid, prob
        except Exception:
            # Deliberate fallback: an unreadable or malformed input file must
            # not block the command-line path below.
            pass
    if len(sys.argv) == 3:
        prob = Problem.from_json(sys.argv[2].strip())
        return sys.argv[1].strip(), prob
    raise SystemExit("No problem input")


def main() -> None:
    """Solve the challenge, write the result files, and exit.

    Exits with status 0 when a verified factorization was found, 1 otherwise
    (including when the modulus is smaller than 6).
    """
    timestamp_start = datetime.now(timezone.utc).isoformat()
    start = time.time()
    challenge_id, problem = _load_problem()
    if problem.num < 6:
        sys.exit(1)

    p, q, method = factor_semiprime(problem.num)
    solve_time = time.time() - start

    # Re-verify independently of the solver before reporting success.
    ok = (
        p is not None
        and q is not None
        and mpz(p) * mpz(q) == problem.num
        and gmpy2.is_prime(mpz(p))
        and gmpy2.is_prime(mpz(q))
    )
    solution = Solution("success", int(p), int(q)) if ok else Solution("failed", None, None)

    result_json = json.dumps(solution.to_dict(), indent=2)
    solve_info_json = json.dumps({
        "solution_status": solution.status,
        "challenge_id": challenge_id,
        "timestamp_utc": timestamp_start,
        "solve_time_seconds": solve_time,
        "method": method,
        "num_bits": problem.num_bits,
    })

    output_dir = os.environ.get("OUTPUT_DIR")
    if output_dir:
        # Optional on-disk copy; failures here must not block the main output.
        try:
            Path(output_dir).mkdir(parents=True, exist_ok=True)
            Path(output_dir, "result.json").write_text(result_json)
            Path(output_dir, "solve_info.json").write_text(solve_info_json)
        except OSError:
            pass

    write_solution_output(build_solution_zip({
        "result.json": result_json,
        "solve_info.json": solve_info_json,
    }))

    # os._exit() skips interpreter cleanup, so flush buffered output first.
    for stream in (sys.stdout, sys.stderr):
        try:
            stream.flush()
        except Exception:
            pass
    os._exit(0 if solution.status == "success" else 1)


if __name__ == "__main__":
    main()