Spaces:
Running on Zero
Running on Zero
| #!/usr/bin/env python3 | |
| """Her · हेर — bulk session uploader (scan → scrub → upload, with your approval). | |
| Brings your Claude Code sessions into the private Her Space so you get a full Projects | |
| view. It NEVER touches your originals: it COPIES the sessions you pick into a local | |
| staging folder, SCRUBS likely secrets from the copies, then UPLOADS them — pausing for | |
| your approval at each of the three steps. | |
| Pure standard library — no pip installs. Run: | |
| python her_upload.py | |
| python her_upload.py --space build-small-hackathon/her # override the Space | |
| python her_upload.py --projects-dir ~/.claude/projects # override the source | |
| Auth: uses your Hugging Face token (HF_TOKEN env, else ~/.cache/huggingface/token — | |
| created by `hf auth login`). Required because the Space is private. | |
| PRIVACY: the scrubber is best-effort (you review the redaction summary before upload), | |
| and your uploads auto-delete from the Space after 24h (or when you click "clear my data" | |
| / close the tab). Nothing here ever modifies ~/.claude. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import glob | |
| import json | |
| import os | |
| import re | |
| import shutil | |
| import sys | |
| import uuid | |
| import urllib.request | |
| import urllib.error | |
| from pathlib import Path | |
# Space id (owner/name) used when neither --space nor the HER_SPACE
# environment variable supplies one.
DEFAULT_SPACE = "build-small-hackathon/her"
| # --------------------------------------------------------------------------- # | |
| # small console helpers | |
| # --------------------------------------------------------------------------- # | |
def c(txt, color="orange"):
    """Wrap *txt* in an ANSI SGR colour sequence and a trailing reset.

    Unknown colour names fall back to SGR code ``0`` (no styling).
    """
    palette = {
        "orange": "38;5;208",
        "red": "31",
        "green": "32",
        "cyan": "36",
        "dim": "2",
        "bold": "1",
    }
    sgr = palette[color] if color in palette else "0"
    return f"\033[{sgr}m{txt}\033[0m"
def hr():
    """Print a dim, 64-character horizontal rule to separate steps."""
    rule = "─" * 64
    print(c(rule, "dim"))
def ask(prompt: str) -> str:
    """Prompt on stdin and return the stripped reply.

    Ctrl-C or end-of-input prints "aborted." and exits with status 1.
    """
    try:
        reply = input(prompt)
    except (EOFError, KeyboardInterrupt):
        print("\naborted.")
        sys.exit(1)
    return reply.strip()
def confirm(prompt: str) -> bool:
    """Ask a yes/no question; only "y" or "yes" (any case) counts as yes."""
    reply = ask(prompt + " [y/N] ").lower()
    return reply == "y" or reply == "yes"
| # --------------------------------------------------------------------------- # | |
| # auth + host | |
| # --------------------------------------------------------------------------- # | |
def hf_token() -> str:
    """Return the Hugging Face access token, or exit(1) if none can be found.

    Lookup order:
      1. ``HF_TOKEN`` then ``HUGGING_FACE_HUB_TOKEN`` environment variables
         (a blank or whitespace-only value is treated as unset);
      2. the file named by ``HF_TOKEN_PATH``;
      3. ``$HF_HOME/token``;
      4. ``~/.cache/huggingface/token`` then ``~/.huggingface/token``.

    The returned token is always stripped of surrounding whitespace.
    """
    for var in ("HF_TOKEN", "HUGGING_FACE_HUB_TOKEN"):
        # Strip BEFORE the truthiness check so "   " never becomes an empty token.
        tok = os.environ.get(var, "").strip()
        if tok:
            return tok

    candidates = []
    token_path = os.environ.get("HF_TOKEN_PATH", "").strip()
    if token_path:
        candidates.append(Path(token_path).expanduser())
    hf_home = os.environ.get("HF_HOME", "").strip()
    if hf_home:
        candidates.append(Path(hf_home).expanduser() / "token")
    candidates.append(Path.home() / ".cache/huggingface/token")
    candidates.append(Path.home() / ".huggingface/token")

    for p in candidates:
        try:
            t = p.read_text(encoding="utf-8").strip()
        except (OSError, UnicodeDecodeError):
            # Missing, unreadable or non-text file: try the next location.
            continue
        if t:
            return t

    print(c("No Hugging Face token found.", "red"))
    print("Run `hf auth login` (or set HF_TOKEN) so the script can reach your private Space.")
    sys.exit(1)
def space_host(space_id: str) -> str:
    """Derive the ``*.hf.space`` hostname from an ``owner/name`` Space id.

    Slashes become dashes and the result is lower-cased.
    NOTE(review): other characters (e.g. "_" or ".") are passed through
    unchanged; confirm that matches how HF builds the subdomain, or use --host.
    """
    dashed = "-".join(space_id.split("/"))
    return dashed.lower() + ".hf.space"
| # --------------------------------------------------------------------------- # | |
| # scan projects (read the REAL cwd from inside each file — like the engine does) | |
| # --------------------------------------------------------------------------- # | |
def read_cwd(path: str):
    """Return the working directory recorded in a session ``.jsonl`` file.

    Scans the file line by line and returns the ``cwd`` of the first JSON
    object whose ``type`` is ``"user"`` or ``"assistant"`` and whose ``cwd``
    is a non-empty string. Blank lines and lines that are not valid JSON are
    skipped. Returns ``None`` when no such record exists or the file cannot
    be opened/read.
    """
    try:
        # errors="replace": a stray invalid byte must not abort the whole scan
        # with UnicodeDecodeError (which is not an OSError).
        with open(path, "r", encoding="utf-8", errors="replace") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    r = json.loads(line)
                except ValueError:
                    continue
                if not isinstance(r, dict) or r.get("type") not in ("user", "assistant"):
                    continue
                cwd = r.get("cwd")
                # Callers lower-case and print this value, so only accept strings.
                if cwd and isinstance(cwd, str):
                    return cwd
    except OSError:
        return None
    return None
def scan(projects_dir: str):
    """Return [{encoded, cwd, files:[paths]}] grouped by the encoded project folder.

    Looks for ``<projects_dir>/<encoded>/*.jsonl``. Each group's ``cwd`` is
    taken from the first of its (sorted) files for which ``read_cwd`` finds
    one, else ``None``. Groups are sorted case-insensitively by ``cwd``,
    falling back to the encoded folder name.
    """
    groups = {}
    # Escape the user-supplied directory so "[", "?" or "*" in its path are
    # matched literally instead of being treated as glob wildcards.
    pattern = os.path.join(glob.escape(projects_dir), "*", "*.jsonl")
    for fp in glob.glob(pattern):
        enc = os.path.basename(os.path.dirname(fp))
        groups.setdefault(enc, {"encoded": enc, "cwd": None, "files": []})
        groups[enc]["files"].append(os.path.abspath(fp))
    for g in groups.values():
        g["files"].sort()
        for f in g["files"]:
            cwd = read_cwd(f)
            if cwd:
                g["cwd"] = cwd
                break
    out = list(groups.values())
    out.sort(key=lambda g: (g["cwd"] or g["encoded"]).lower())
    return out
def parse_selection(sel: str, n: int):
    """Parse a 1-based selection string into sorted, unique 0-based indices.

    Accepts ``"all"``, ``"*"`` or ``"a"`` for everything, or a comma-separated
    list of numbers and inclusive ranges (``"1,3,5"``, ``"2-6"``). Spaces are
    ignored; malformed parts and numbers outside ``1..n`` are dropped.
    """
    sel = sel.strip().lower()
    if sel in ("all", "*", "a"):
        return list(range(n))
    picked = set()
    for part in sel.replace(" ", "").split(","):
        if not part:
            continue
        lo_txt, dash, hi_txt = part.partition("-")
        try:
            if dash:
                lo, hi = int(lo_txt), int(hi_txt)
            elif part.isdigit():
                # isdigit() also accepts characters such as "²" that int()
                # rejects; the except below keeps that from crashing.
                lo = hi = int(part)
            else:
                continue
        except ValueError:
            continue
        # Clamp to 1..n BEFORE iterating so "1-999999999999" costs O(n),
        # not O(range size). Result is identical to filtering afterwards.
        picked.update(range(max(lo, 1) - 1, min(hi, n)))
    return sorted(picked)
| # --------------------------------------------------------------------------- # | |
| # scrubber — best-effort secret redaction (you review the summary before upload) | |
| # --------------------------------------------------------------------------- # | |
_REPL = "[REDACTED]"

# Patterns whose ENTIRE match is replaced by _REPL: (summary label, regex).
_WHOLE = [
    ("private key block", re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----.*?-----END [A-Z ]*PRIVATE KEY-----", re.S)),
    ("openai/anthropic key", re.compile(r"\b(?:sk|sk-ant|sk-proj)-[A-Za-z0-9_\-]{20,}\b")),
    ("hf token", re.compile(r"\bhf_[A-Za-z0-9]{20,}\b")),
    ("github token", re.compile(r"\bgh[posru]_[A-Za-z0-9]{30,}\b")),
    ("aws access key id", re.compile(r"\b(?:AKIA|ASIA)[0-9A-Z]{16}\b")),
    ("google api key", re.compile(r"\bAIza[0-9A-Za-z_\-]{35}\b")),
    ("slack token", re.compile(r"\bxox[baprs]-[A-Za-z0-9-]{10,}\b")),
    ("bearer token", re.compile(r"(?i)\bBearer\s+[A-Za-z0-9._\-]{16,}")),
    ("jwt", re.compile(r"\beyJ[A-Za-z0-9_\-]{10,}\.[A-Za-z0-9_\-]{10,}\.[A-Za-z0-9_\-]{10,}\b")),
]

# group1 = the key + separator (+ an optional opening quote, possibly JSON-escaped as \");
# group2 = the secret value (stops at a quote, backslash, whitespace, or JSON delimiter,
# so it works whether the value is bare or wrapped in escaped quotes inside the JSONL).
# The quote that CLOSES the key may be JSON-escaped too (\"password\": \"...\"), which is
# how nested JSON appears inside a JSONL string, so that quote also allows a backslash.
_KV = re.compile(
    r"(?i)(\"?(?:password|passwd|secret|token|api[_-]?key|access[_-]?key|client[_-]?secret|auth[_-]?token)(?:\\?\")?\s*[:=]\s*(?:\\?\")?)"
    r"([^\"\\\s,}{]{6,})"
)


def scrub_text(text: str):
    """Redact likely secrets from *text* (best-effort).

    Returns ``(scrubbed_text, counts)`` where ``counts`` maps a human-readable
    category name to the number of redactions made in that category. Whole
    token patterns (``_WHOLE``) are applied first, then ``key=value`` /
    ``"key": "value"`` style secrets (``_KV``), whose value alone is replaced.
    """
    counts = {}
    for name, pat in _WHOLE:
        text, n = pat.subn(_REPL, text)
        if n:
            counts[name] = counts.get(name, 0) + n

    kv_hits = 0

    def _kv(m):
        nonlocal kv_hits
        # A value that is already exactly the placeholder was redacted (and
        # counted) by a _WHOLE pattern above; don't count it a second time.
        if m.group(2) == _REPL:
            return m.group(0)
        kv_hits += 1
        return m.group(1) + _REPL

    text = _KV.sub(_kv, text)
    if kv_hits:
        counts["key=value secret"] = counts.get("key=value secret", 0) + kv_hits
    return text, counts
| # --------------------------------------------------------------------------- # | |
| # upload (stdlib multipart) | |
| # --------------------------------------------------------------------------- # | |
def upload_file(host: str, token: str, client: str, project: str, filename: str, data: bytes):
    """POST one session file to ``https://{host}/api/upload`` as multipart/form-data.

    The form carries a ``project`` text field and a ``file`` part holding
    *data*. The request is authenticated with a Bearer *token* and tagged
    with the ``X-Her-Client`` header. Returns the decoded JSON response;
    ``urllib`` errors propagate to the caller.
    """
    boundary = "----her" + uuid.uuid4().hex
    delimiter = f"--{boundary}\r\n"

    project_part = (
        f"{delimiter}Content-Disposition: form-data; name=\"project\"\r\n\r\n{project}\r\n"
    )
    file_header = (
        f"{delimiter}Content-Disposition: form-data; name=\"file\"; filename=\"{filename}\"\r\n"
        f"Content-Type: application/jsonl\r\n\r\n"
    )
    closing = f"\r\n--{boundary}--\r\n"

    payload = b"".join(
        (project_part.encode(), file_header.encode(), data, closing.encode())
    )
    headers = {
        "Content-Type": f"multipart/form-data; boundary={boundary}",
        "Authorization": f"Bearer {token}",
        "X-Her-Client": client,
    }
    request = urllib.request.Request(
        f"https://{host}/api/upload", data=payload, method="POST", headers=headers
    )
    with urllib.request.urlopen(request, timeout=120) as resp:
        raw = resp.read()
    return json.loads(raw.decode("utf-8"))
| # --------------------------------------------------------------------------- # | |
| # main | |
| # --------------------------------------------------------------------------- # | |
# Marker file dropped into a staging folder this script created, so a later run
# can tell "our old staging" apart from an unrelated directory before wiping it.
_STAGING_MARKER = ".her-staging"


def _prepare_staging(staging: Path, projects_dir: str) -> None:
    """Create an empty staging folder without ever endangering other data.

    Exits if *staging* overlaps *projects_dir* (same path, an ancestor, or
    inside it) or is not a directory. An existing non-empty folder that was
    not created by this script is only wiped after explicit confirmation.
    """
    target = staging.resolve()
    source = Path(projects_dir).resolve()
    if target == source or target in source.parents or source in target.parents:
        print(c(f"Refusing to use {staging} as staging: it overlaps the source {projects_dir}.", "red"))
        sys.exit(1)
    if staging.exists():
        if not staging.is_dir():
            print(c(f"Staging path {staging} exists and is not a folder.", "red"))
            sys.exit(1)
        ours = (staging / _STAGING_MARKER).exists()
        if not ours and any(staging.iterdir()):
            print(c(f"{staging} already exists and was not created by this script.", "orange"))
            if not confirm("Delete everything inside it and use it as the staging folder?"):
                sys.exit(0)
        shutil.rmtree(staging, ignore_errors=True)
    staging.mkdir(parents=True, exist_ok=True)
    (staging / _STAGING_MARKER).write_text("created by her_upload.py\n", encoding="utf-8")


def main():
    """Run the interactive three-step flow: select → copy + scrub → upload.

    Step 1 lists the projects found under ``--projects-dir`` and asks which to
    bring in. Step 2 copies the chosen sessions into ``--staging``, scrubbing
    likely secrets from the copies, and prints a redaction summary. Step 3
    uploads the scrubbed copies to the Space. The user confirms before steps
    2 and 3; declining exits with status 0. Exits with status 1 when no
    sessions are found, the staging path is unsafe, or nothing was uploaded.
    """
    ap = argparse.ArgumentParser(description="Bulk-upload Claude Code sessions to your Her Space.")
    ap.add_argument("--space", default=os.environ.get("HER_SPACE", DEFAULT_SPACE), help="HF Space id (owner/name)")
    ap.add_argument("--host", default=os.environ.get("HER_HOST"), help="override the *.hf.space host")
    ap.add_argument("--projects-dir", default=os.path.expanduser("~/.claude/projects"))
    ap.add_argument("--staging", default=os.path.abspath("./her-staging"))
    args = ap.parse_args()
    host = args.host or space_host(args.space)
    token = hf_token()
    client = uuid.uuid4().hex  # this upload's private namespace; the open-URL carries it
    print(c("\nHer · हेर — bring your sessions in", "bold"))
    print(c(f"Space: {args.space} ({host})", "dim"))
    print(c(f"Source: {args.projects_dir}", "dim"))

    # ---- STEP 1: SELECT ---------------------------------------------------- #
    hr()
    print(c("STEP 1 / 3 · choose projects", "cyan"))
    groups = scan(args.projects_dir)
    if not groups:
        print(c(f"No .jsonl sessions found under {args.projects_dir}", "red"))
        sys.exit(1)
    for i, g in enumerate(groups, 1):
        print(f" {i:>2}. {c(g['cwd'] or g['encoded'], 'orange')} "
              + c(f"({len(g['files'])} session{'s' if len(g['files'])!=1 else ''})", "dim"))
    print(c("\nEnter numbers (e.g. 1,3,5 or 2-6), or 'all'.", "dim"))
    picks = parse_selection(ask("Select projects: "), len(groups))
    if not picks:
        print("Nothing selected.")
        sys.exit(0)
    chosen = [groups[i] for i in picks]
    total_files = sum(len(g["files"]) for g in chosen)
    print(c(f"\n→ {len(chosen)} project(s), {total_files} session(s) selected.", "green"))
    if not confirm("Copy these into the staging folder and continue?"):
        sys.exit(0)

    # ---- STEP 2: COPY + SCRUB --------------------------------------------- #
    hr()
    print(c("STEP 2 / 3 · copy to staging + scrub secrets", "cyan"))
    staging = Path(args.staging)
    # Never blindly rmtree a user-supplied path: it could be the source itself,
    # one of its ancestors (e.g. the home directory), or an unrelated folder.
    _prepare_staging(staging, args.projects_dir)
    staged = []  # (project_encoded, staged_path, original_name)
    redaction_totals = {}
    files_with_redactions = 0
    for g in chosen:
        outdir = staging / g["encoded"]
        outdir.mkdir(parents=True, exist_ok=True)
        for src in g["files"]:
            try:
                raw = Path(src).read_text(encoding="utf-8", errors="replace")
            except OSError as e:
                # Tell the user instead of silently dropping the session.
                print(c(f" skipped (unreadable) {src}: {e}", "red"))
                continue
            cleaned, counts = scrub_text(raw)
            if counts:
                files_with_redactions += 1
                for k, v in counts.items():
                    redaction_totals[k] = redaction_totals.get(k, 0) + v
            dst = outdir / os.path.basename(src)
            dst.write_text(cleaned, encoding="utf-8")
            staged.append((g["encoded"], dst, os.path.basename(src)))
    print(c(f"Staged {len(staged)} scrubbed session(s) → {staging}", "green"))
    if redaction_totals:
        print(c(f"Redacted likely secrets in {files_with_redactions} file(s):", "orange"))
        for k, v in sorted(redaction_totals.items(), key=lambda x: -x[1]):
            print(f" · {k}: {v}")
    else:
        print(c("No obvious secrets matched (the scrubber is best-effort — review if unsure).", "dim"))
    print(c(f"\nYou can inspect the scrubbed copies in {staging} before uploading.", "dim"))
    if not confirm("Upload these scrubbed sessions to your private Space?"):
        print("Stopped before upload. Staging kept for your review.")
        sys.exit(0)

    # ---- STEP 3: UPLOAD ---------------------------------------------------- #
    hr()
    print(c("STEP 3 / 3 · upload", "cyan"))
    ok = 0
    for idx, (enc, path, name) in enumerate(staged, 1):
        try:
            data = path.read_bytes()
            upload_file(host, token, client, enc, name, data)
            ok += 1
            print(f" [{idx}/{len(staged)}] {c('uploaded', 'green')} {enc}/{name}")
        except urllib.error.HTTPError as e:
            print(f" [{idx}/{len(staged)}] {c('FAILED', 'red')} {name}: HTTP {e.code} {e.reason}")
        except Exception as e:  # noqa: BLE001  (one failed file must not stop the batch)
            print(f" [{idx}/{len(staged)}] {c('FAILED', 'red')} {name}: {e}")
    hr()
    if ok == 0:
        print(c("No sessions uploaded.", "red"))
        sys.exit(1)
    print(c(f"✅ Uploaded {ok}/{len(staged)} session(s).", "green"))
    spaces_url = f"https://huggingface.co/spaces/{args.space}?client={client}"
    print("\nOpen your Projects view (bound to this upload):")
    print(" " + c(spaces_url, "cyan"))
    print(c("\n⏳ Give it a few seconds on first open — the Space analyzes the sessions and", "orange"))
    print(c(" the local model writes the cross-session summary. If a project briefly shows", "orange"))
    print(c(" “no sessions found”, just wait a moment and refresh; it’s still generating.", "orange"))
    print(c("\nIf your projects don't appear, open the Space, then in the browser console run:", "dim"))
    print(c(f" localStorage.setItem('her.clientId','{client}'); location.reload()", "dim"))
    print(c("\nReminder: your uploads auto-delete after 24h, or instantly via “clear my data”.", "dim"))


if __name__ == "__main__":
    main()