dataset / v13 /my_solution /breaking_rsa.py
david22guy's picture
Upload folder using huggingface_hub
8d1e50f verified
Raw
History Blame Contribute Delete
7.86 kB
#!/usr/bin/env python3
import glob
import json
import os
import re
import signal
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import *
import gmpy2
from gmpy2 import mpz
from enigma_challenges.breaking_rsa import Problem, Solution
from enigma_challenges.solution_output import build_solution_zip, write_solution_output
# Location of the JSON challenge description. _load_problem() reads the
# "difficulty", "num" and "num_bits" keys from it when the file exists.
CHALLENGE_INPUT_FILE = "/challenge_input/challenge_input.json"
def _env_int(name: str, default: int) -> int:
    """Read the environment variable *name* as an integer.

    Returns *default* when the variable is unset or when its value cannot
    be converted with ``int()``.
    """
    raw = os.environ.get(name, default)
    try:
        value = int(raw)
    except (TypeError, ValueError):
        value = default
    return value
def _cpu_count() -> int:
    """Return how many CPUs this process can actually use (always >= 1).

    The starting point is the scheduler affinity mask of the current
    process (falling back to ``os.cpu_count()``, then to 8), so cpuset /
    taskset restrictions are respected.  That value is then capped by the
    cgroup CPU quota when one is configured:

    * cgroup v2: ``/sys/fs/cgroup/cpu.max`` ("<quota> <period>" or "max ...")
    * cgroup v1: ``cpu.cfs_quota_us`` / ``cpu.cfs_period_us``

    Unreadable or malformed cgroup files are ignored.
    """
    try:
        # os.cpu_count() reports every CPU of the host; the affinity mask
        # reflects what this process is really allowed to run on.
        n = len(os.sched_getaffinity(0))
    except (AttributeError, OSError):
        # sched_getaffinity is not available on every platform.
        n = 0
    n = n or os.cpu_count() or 8

    # cgroup v2 quota.
    try:
        parts = Path("/sys/fs/cgroup/cpu.max").read_text().split()
        if parts and parts[0] != "max":
            return max(1, min(n, int(parts[0]) // int(parts[1])))
    except (OSError, ValueError, IndexError, ZeroDivisionError):
        pass

    # cgroup v1 quota.
    try:
        quota = int(Path("/sys/fs/cgroup/cpu/cpu.cfs_quota_us").read_text())
        period = int(Path("/sys/fs/cgroup/cpu/cpu.cfs_period_us").read_text())
        if quota > 0 and period > 0:
            return max(1, min(n, quota // period))
    except (OSError, ValueError):
        pass

    return n
def _find_bin(env_key: str, candidates: List[str]) -> Optional[str]:
    """Locate a file, preferring an environment override.

    The stripped value of environment variable *env_key* wins when it names
    an existing regular file.  Otherwise the first entry of *candidates*
    that is an existing regular file is returned, or ``None`` if there is
    no such entry.
    """
    override = os.environ.get(env_key, "").strip()
    if override and os.path.isfile(override):
        return override
    return next((path for path in candidates if os.path.isfile(path)), None)
def _find_cado_script() -> Optional[str]:
    """Return the path of ``cado-nfs.py`` or ``None`` when it is not found.

    Lookup order:
      1. the ``CADO_NFS`` environment variable, if it names an existing file;
      2. a short list of fixed install locations;
      3. glob patterns for build/versioned directories, taking the
         lexicographically last match of the first pattern that matches.
    """
    override = os.environ.get("CADO_NFS", "").strip()
    if override and os.path.isfile(override):
        return override

    fixed_locations = (
        "/opt/cado-nfs/build/release/cado-nfs.py",
        "/usr/local/bin/cado-nfs.py",
        "/usr/bin/cado-nfs.py",
    )
    found = next((loc for loc in fixed_locations if os.path.isfile(loc)), None)
    if found is not None:
        return found

    wildcard_locations = (
        "/opt/cado-nfs/build/*/cado-nfs.py",
        "/usr/local/lib/cado-nfs-*/cado-nfs.py",
    )
    for pattern in wildcard_locations:
        hits = glob.glob(pattern)
        if hits:
            # Same element as sorted(hits)[-1].
            return max(hits)
    return None
def _start_broker(sock_path: str) -> Optional[subprocess.Popen]:
    """Launch the RAMNFS broker and wait for its socket to appear.

    The broker binary is located through ``RAMNFS_BROKER`` or two default
    paths and started as ``<broker> <sock_path>`` with its output
    discarded.  Any stale file at *sock_path* is removed first.

    Returns the ``Popen`` handle once *sock_path* exists, or ``None`` when
    the binary is missing, cannot be started, or the socket has not shown
    up after roughly five seconds (50 polls of 0.1 s).
    """
    broker = _find_bin("RAMNFS_BROKER", ["/opt/ramnfs/broker", "/app/ramnfs/broker"])
    if not broker:
        return None

    # Remove a leftover socket so the existence check below cannot be
    # satisfied by a previous run.
    try:
        os.unlink(sock_path)
    except OSError:
        pass

    try:
        proc = subprocess.Popen(
            [broker, sock_path],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            # Own session/process group, like the CADO process: cleanup in
            # this file signals whole process groups, and the broker must
            # not share the group of this script.
            start_new_session=True,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        return None

    for _ in range(50):
        if os.path.exists(sock_path):
            return proc
        time.sleep(0.1)

    # The socket never appeared: stop the broker and reap it so that no
    # zombie process is left behind.
    proc.kill()
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        pass
    return None
def _factors_from_line(line: str, n: mpz) -> Optional[Tuple[int, int]]:
    """Interpret *line* as a whitespace-separated factorisation of *n*.

    The line is accepted only when it holds at least two tokens, every
    token consists solely of digits, the product of all tokens equals *n*,
    and every token passes ``gmpy2.is_prime``.  On success the first two
    values are returned in ascending order; otherwise ``None``.
    """
    tokens = line.split()
    if len(tokens) < 2:
        return None
    if any(re.fullmatch(r"\d+", tok) is None for tok in tokens):
        return None

    values = [int(tok) for tok in tokens]
    product = 1
    for value in values:
        product *= value
    if not product == n:
        return None
    if not all(gmpy2.is_prime(mpz(value)) for value in values):
        return None

    first, second = values[0], values[1]
    if first <= second:
        return (first, second)
    return (second, first)
def _kill(proc: Optional[subprocess.Popen]) -> None:
    """Terminate *proc*, escalating from SIGTERM to SIGKILL.

    Does nothing when *proc* is ``None`` or has already exited.  If the
    child leads (or belongs to) a process group different from ours, the
    whole group is signalled so that helper processes die with it.  If the
    child shares our own process group, only the child itself is
    signalled: signalling the group would also terminate this script.

    After each signal the child is given up to 8 seconds to exit.
    """
    if proc is None or proc.poll() is not None:
        return

    try:
        pgid = os.getpgid(proc.pid)
        # A child started without its own session inherits our group.
        separate_group = pgid != os.getpgrp()
    except OSError:
        pgid, separate_group = None, False

    for sig in (signal.SIGTERM, signal.SIGKILL):
        try:
            if separate_group:
                os.killpg(pgid, sig)
            else:
                proc.send_signal(sig)
        except OSError:
            # Fall back to a direct kill of the child.
            try:
                proc.kill()
            except OSError:
                pass
        try:
            proc.wait(timeout=8)
            return
        except subprocess.TimeoutExpired:
            continue
def _run_cado(n: mpz) -> Optional[Tuple[int, int]]:
    """Factor *n* by running ``cado-nfs.py`` and scanning its output.

    Configuration comes from the environment:

    * ``CADO_NFS``      - script location (see ``_find_cado_script``)
    * ``CADO_THREADS``  - thread count; 0/unset means ``_cpu_count()``
    * ``CADO_ADMAX`` / ``CADO_DEGREE`` - optional polyselect overrides
    * ``RAMNFS_SHIM`` / ``RAMNFS_SOCK`` / ``RAMNFS_WORKDIR`` - optional
      LD_PRELOAD shim plus broker.  NOTE(review): presumably these serve
      the work directory from memory under ``/ramwork`` - confirm.

    When the shim or its broker is unavailable, the work directory falls
    back to ``$TMPDIR/cado_run`` (``/tmp`` if ``TMPDIR`` is unset).

    Returns the two factors in ascending order as soon as a matching
    output line is seen, or ``None`` when CADO is missing, cannot be
    started, or finishes without printing a valid factorisation.  The CADO
    process and the broker are always stopped before returning.
    """
    cado = _find_cado_script()
    if not cado:
        return None

    shim = _find_bin("RAMNFS_SHIM", ["/opt/ramnfs/shim.so", "/app/ramnfs/shim.so"])
    sock = os.environ.get("RAMNFS_SOCK", "/tmp/ramnfs.sock")
    workdir = os.environ.get("RAMNFS_WORKDIR", "/ramwork/factor.work")
    threads = _env_int("CADO_THREADS", 0) or _cpu_count()

    broker: Optional[subprocess.Popen] = None
    if shim:
        broker = _start_broker(sock)
        if not broker:
            # The shim is useless without its broker.
            shim = None
    if not shim:
        workdir = os.path.join(os.environ.get("TMPDIR", "/tmp"), "cado_run")

    proc: Optional[subprocess.Popen] = None
    factors: Optional[Tuple[int, int]] = None
    # Everything from here on runs under try/finally so that a failure to
    # create the work directory or to start CADO cannot leak the broker.
    try:
        os.makedirs(workdir, exist_ok=True)

        env = dict(os.environ)
        env["HOME"] = env["TMPDIR"] = "/tmp"
        if shim:
            env["LD_PRELOAD"] = shim
            env["RAMNFS_SOCK"] = sock
            env["RAMNFS_PREFIX"] = "/ramwork"

        cado_build = str(Path(cado).parent)
        cmd = [
            sys.executable, cado, str(int(n)),
            f"tasks.workdir={workdir}",
            f"tasks.threads={threads}",
            "server.address=localhost", "server.port=0", "server.threaded=1",
            f"slaves.nrclients={threads}",
            f"slaves.cado_nfs_client.bindir={cado_build}",
            f"tasks.linalg.bwc.threads={threads}",
            "tasks.sieve.las.threads=1",
        ]
        if os.environ.get("CADO_ADMAX"):
            cmd.append(f"tasks.polyselect.admax={os.environ['CADO_ADMAX']}")
        if os.environ.get("CADO_DEGREE"):
            cmd.append(f"tasks.polyselect.degree={os.environ['CADO_DEGREE']}")

        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            # A stray undecodable byte in the log must not abort parsing.
            errors="replace",
            env=env,
            # Own process group so _kill() can take down all CADO helpers.
            start_new_session=True,
        )

        if proc.stdout is not None:
            for raw in proc.stdout:
                # Progress output may use bare carriage returns.
                for line in raw.replace("\r", "\n").splitlines():
                    s = line.strip()
                    if not s:
                        continue
                    factors = _factors_from_line(s, n)
                    if factors:
                        break
                if factors:
                    break
    except (OSError, ValueError, subprocess.SubprocessError):
        # Best effort: a launch or read failure is reported as "no result".
        pass
    finally:
        if proc is not None:
            _kill(proc)
            if proc.stdout is not None:
                try:
                    proc.stdout.close()
                except OSError:
                    pass
        if broker:
            _kill(broker)
    return factors
def factor_semiprime(n_int: int) -> Tuple[Optional[int], Optional[int], str]:
    """Factor *n_int* and report which method produced the result.

    Returns ``(p, q, "cado_gnfs")`` when ``_run_cado`` yields a pair of
    factors, and ``(None, None, "failed")`` otherwise.
    """
    pair = _run_cado(mpz(n_int))
    if not pair:
        return None, None, "failed"
    first, second = pair[0], pair[1]
    return first, second, "cado_gnfs"
def _load_problem() -> Tuple[str, "Problem"]:
    """Obtain the challenge id and the problem to solve.

    Sources, in order of preference:

    1. ``CHALLENGE_INPUT_FILE``: a JSON object with ``difficulty``, ``num``
       and ``num_bits``.  The id is ``sys.argv[1]`` (stripped) when given
       and non-empty, else ``"challenge"``.  Any error while reading or
       parsing the file falls through to the next source.
    2. The command line, when exactly two arguments are given:
       ``<challenge_id> <problem_json>``.

    Raises:
        SystemExit: when neither source provides a problem.
    """
    if os.path.isfile(CHALLENGE_INPUT_FILE):
        try:
            payload = json.loads(Path(CHALLENGE_INPUT_FILE).read_text())
            problem = Problem(
                int(payload["difficulty"]),
                int(payload["num"]),
                int(payload["num_bits"]),
            )
            cli_id = sys.argv[1].strip() if len(sys.argv) > 1 else ""
            return (cli_id or "challenge"), problem
        except Exception:
            # Fall back to the command-line form below.
            pass

    if len(sys.argv) != 3:
        raise SystemExit("No problem input")

    problem = Problem.from_json(sys.argv[2].strip())
    return sys.argv[1].strip(), problem
def main() -> None:
    """Load the problem, factor it, and emit the result files.

    Steps:
      1. Load the problem via ``_load_problem``; numbers below 6 make the
         process exit with status 1 before any output is produced.
      2. Factor ``problem.num`` and independently re-check the result
         (product equals the number, both factors pass ``is_prime``).
      3. Serialise ``result.json`` and ``solve_info.json``; write them to
         ``$OUTPUT_DIR`` when that variable is set (best effort) and hand
         them to ``write_solution_output`` as a zip bundle.
      4. Exit with status 0 on success, 1 otherwise.
    """
    timestamp_start = datetime.now(timezone.utc).isoformat()
    start = time.time()

    challenge_id, problem = _load_problem()
    if problem.num < 6:
        sys.exit(1)

    p, q, method = factor_semiprime(problem.num)
    solve_time = time.time() - start

    # Never trust the solver output blindly: verify it here.
    ok = (p is not None and q is not None
          and mpz(p) * mpz(q) == problem.num
          and gmpy2.is_prime(mpz(p)) and gmpy2.is_prime(mpz(q)))
    solution = Solution("success", int(p), int(q)) if ok else Solution("failed", None, None)

    result_json = json.dumps(solution.to_dict(), indent=2)
    solve_info_json = json.dumps({
        "solution_status": solution.status,
        "challenge_id": challenge_id,
        "timestamp_utc": timestamp_start,
        "solve_time_seconds": solve_time,
        "method": method,
        "num_bits": problem.num_bits,
    })

    output_dir = os.environ.get("OUTPUT_DIR")
    if output_dir:
        try:
            out_path = Path(output_dir)
            # parents=True: a nested OUTPUT_DIR must not silently lose the
            # result files.
            out_path.mkdir(parents=True, exist_ok=True)
            (out_path / "result.json").write_text(result_json)
            (out_path / "solve_info.json").write_text(solve_info_json)
        except OSError:
            # Writing to OUTPUT_DIR is best effort; the zip below is still
            # produced.
            pass

    write_solution_output(build_solution_zip({
        "result.json": result_json,
        "solve_info.json": solve_info_json,
    }))

    # os._exit() terminates without flushing stdio buffers, so flush
    # explicitly to avoid losing anything written so far.
    for stream in (sys.stdout, sys.stderr):
        try:
            stream.flush()
        except (OSError, ValueError):
            pass
    os._exit(0 if solution.status == "success" else 1)
# Run the solver only when this file is executed as a script.
if __name__ == "__main__":
    main()