Spaces:
Paused
Paused
import hashlib
import json
import os
import pathlib
from typing import List, Tuple, Optional, Dict
from urllib.parse import quote

import requests
import gradio as gr
import bencodepy
import py7zr
from py7zr.exceptions import CrcError
# NEW: HTML parsing
from bs4 import BeautifulSoup
| # ========================= | |
| # Helpers | |
| # ========================= | |
def human_bytes(n: int) -> str:
    """Format a byte count using binary (1024-based) unit suffixes."""
    value = float(n)
    for unit in ("B", "KiB", "MiB", "GiB", "TiB", "PiB"):
        if value < 1024.0:
            return f"{value:.2f} {unit}"
        value /= 1024.0
    # Anything past PiB collapses into exbibytes.
    return f"{value:.2f} EiB"
def ensure_dir(p: str):
    """Create directory *p* (including parents); a no-op if it already exists."""
    pathlib.Path(p).mkdir(parents=True, exist_ok=True)
def fetch_bytes(url: str, timeout: int = 45) -> bytes:
    """GET *url* and return the raw response body; raises on HTTP errors."""
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.content
def parse_torrent(raw: bytes) -> Dict:
    """Decode a .torrent blob into infohash, display name, files, and web seeds.

    Returns {'infohash', 'name', 'files', 'web_seeds'}; raises ValueError when
    the bencoded payload has no 'info' dictionary.
    """
    meta = bencodepy.decode(raw)
    if not isinstance(meta, dict) or b"info" not in meta:
        raise ValueError("Invalid .torrent (missing 'info').")
    info = meta[b"info"]
    # v1 infohash = SHA-1 over the re-encoded 'info' dictionary.
    infohash_v1 = hashlib.sha1(bencodepy.encode(info)).hexdigest()

    name = info.get(b"name")
    if isinstance(name, (bytes, bytearray)):
        name = name.decode("utf-8", errors="replace")

    files = []
    if b"files" in info:
        # Multi-file torrent: each entry carries a path component list + length.
        for entry in info[b"files"]:
            segments = [
                seg.decode("utf-8", "replace") if isinstance(seg, (bytes, bytearray)) else str(seg)
                for seg in entry.get(b"path", [])
            ]
            files.append({
                "path": "/".join(segments) if segments else "(unknown)",
                "length": int(entry.get(b"length", 0)),
            })
    else:
        # Single-file torrent: the torrent name doubles as the relative path.
        files.append({
            "path": name or "(unnamed)",
            "length": int(info.get(b"length", 0)),
        })

    web_seeds = []
    raw_seeds = meta.get(b"url-list")
    if isinstance(raw_seeds, (bytes, bytearray)):
        web_seeds = [raw_seeds.decode("utf-8", "replace")]
    elif isinstance(raw_seeds, list):
        web_seeds = [
            u.decode("utf-8", "replace")
            for u in raw_seeds
            if isinstance(u, (bytes, bytearray))
        ]

    return {
        "infohash": infohash_v1,
        "name": name or "(unknown)",
        "files": files,
        "web_seeds": [s.rstrip("/") for s in web_seeds if isinstance(s, str) and s.strip()],
    }
def join_url(base: str, *segs: str) -> str:
    """Join path segments onto *base*, percent-encoding each path component.

    A '/' embedded in a segment is preserved as a separator: the segment is
    split and each component is quoted individually.
    """
    parts = [base.rstrip("/")]
    for seg in segs:
        # Use the public stdlib quote instead of the requests.utils re-export;
        # behavior is identical.
        parts.append("/".join(quote(component) for component in seg.split("/")))
    return "/".join(parts)
def _head_or_peek(url: str, timeout: int = 20) -> Tuple[bool, Optional[int]]:
    """Probe *url* for existence: HEAD first, then a streamed GET that peeks
    one chunk. Returns (reachable, content_length_or_None)."""
    def _declared_size(resp) -> Optional[int]:
        raw = resp.headers.get("Content-Length")
        return int(raw) if raw and raw.isdigit() else None

    try:
        head = requests.head(url, timeout=timeout, allow_redirects=True)
        if head.status_code < 400:
            return True, _declared_size(head)
    except Exception:
        pass
    try:
        resp = requests.get(url, stream=True, timeout=timeout, allow_redirects=True)
        if resp.status_code < 400:
            size = _declared_size(resp)
            # Pull one chunk to confirm the body is actually readable.
            try:
                next(resp.iter_content(chunk_size=1024))
            except Exception:
                pass
            try:
                resp.close()
            except Exception:
                pass
            return True, size
    except Exception:
        pass
    return False, None
def supports_range_and_size(url: str, timeout: int = 30) -> Tuple[bool, Optional[int]]:
    """Report whether *url* advertises byte-range support (or at least a size),
    plus the Content-Length when known. Returns (supported, size_or_None)."""
    try:
        head = requests.head(url, timeout=timeout, allow_redirects=True)
        if head.status_code < 400:
            size = int(head.headers.get("Content-Length", "0") or 0)
            ranged = "bytes" in head.headers.get("Accept-Ranges", "").lower()
            return (ranged or size > 0, size if size > 0 else None)
    except Exception:
        pass
    try:
        resp = requests.get(url, stream=True, timeout=timeout, allow_redirects=True)
        resp.raise_for_status()
        size = int(resp.headers.get("Content-Length", "0") or 0)
        try:
            resp.close()
        except Exception:
            pass
        ranged = "bytes" in resp.headers.get("Accept-Ranges", "").lower()
        return (ranged or size > 0, size if size > 0 else None)
    except Exception:
        return False, None
def download_file_exact(url: str, dest_path: pathlib.Path, expected_size: Optional[int],
                        timeout: int = 120, max_attempts: int = 2):
    """Download *url* to *dest_path*, verifying the final byte count.

    Attempt 1 resumes a leftover ``.part`` file via an HTTP Range request;
    subsequent attempts restart from scratch. When *expected_size* is given
    and no attempt produces a file of that exact size, raises gr.Error.
    """
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    tmp = dest_path.with_suffix(dest_path.suffix + ".part")

    def _stream_to(handle, response):
        # 1 MiB chunks; skip keep-alive empty chunks.
        for chunk in response.iter_content(chunk_size=1024 * 1024):
            if chunk:
                handle.write(chunk)

    def _resume_once():
        existing = tmp.stat().st_size if tmp.exists() else 0
        can_range, _ = supports_range_and_size(url)
        headers = {"Range": f"bytes={existing}-"} if (can_range and existing > 0) else {}
        with requests.get(url, stream=True, timeout=timeout, headers=headers) as r:
            r.raise_for_status()
            # BUGFIX: a server may ignore Range and answer 200 with the full
            # body; appending that would corrupt the file. Only append when
            # the server actually honored the range (206 Partial Content).
            mode = "ab" if (headers and r.status_code == 206) else "wb"
            with open(tmp, mode) as f:
                _stream_to(f, r)
        tmp.rename(dest_path)

    def _fresh_once():
        if tmp.exists():
            tmp.unlink()
        if dest_path.exists():
            dest_path.unlink()
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(tmp, "wb") as f:
                _stream_to(f, r)
        tmp.rename(dest_path)

    attempts = 0
    while attempts < max_attempts:
        attempts += 1
        if attempts == 1:
            _resume_once()
        else:
            _fresh_once()
        if expected_size is None:
            return
        if dest_path.exists() and dest_path.stat().st_size == expected_size:
            return
    got = dest_path.stat().st_size if dest_path.exists() else 0
    raise gr.Error(f"Downloaded size mismatch for {dest_path.name}: got {got} bytes, expected {expected_size}.")
def list_files_recursive(root: pathlib.Path) -> List[str]:
    """Return string paths of every regular file under *root*, recursively."""
    return [str(entry) for entry in root.rglob("*") if entry.is_file()]
def preview_path(path_str: str, max_bytes: int = 250_000) -> Tuple[str, Optional[str]]:
    """Render a markdown preview for *path_str*.

    Text-like files show their first *max_bytes* bytes in a fenced code block;
    anything else gets a name/size/path summary. Returns (markdown, None);
    errors are reported in the markdown rather than raised.
    """
    if not path_str or not os.path.exists(path_str):
        return "File not found.", None
    p = pathlib.Path(path_str)
    suffix = p.suffix.lower()
    try:
        if suffix in [".csv", ".tsv", ".json", ".ndjson", ".txt", ".log", ".md", ".eml", ".html", ".htm", ".meta"]:
            # BUGFIX: close the handle (was a bare open().read() leak).
            with open(p, "rb") as fh:
                raw = fh.read(max_bytes)
            text = raw.decode("utf-8", errors="replace")
            return f"Previewing {p.name} (truncated):\n\n```\n{text}\n```", None
        else:
            st = p.stat()
            return (f"File: **{p.name}**\nSize: {human_bytes(st.st_size)}\nPath: `{p}`", None)
    except Exception as e:
        return f"Error previewing file: {type(e).__name__}: {e}", None
def infer_bases_from_torrent_url(torrent_url: str) -> List[str]:
    """Guess the HTTP base directory: the parent path of the .torrent URL."""
    url = torrent_url.strip()
    if "/" not in url:
        return []
    parent, _, _ = url.rpartition("/")
    return [parent]
def resolve_download_url(bases: List[str], root_name: str, rel_path: str) -> Optional[str]:
    """Return the first reachable URL for *rel_path*, probing each base with
    the torrent root prefix first, then without it. None when nothing answers."""
    for base in bases:
        for candidate in (join_url(base, root_name, rel_path), join_url(base, rel_path)):
            reachable, _ = _head_or_peek(candidate)
            if reachable:
                return candidate
    return None
| def test_7z_integrity(archive_path: str) -> bool: | |
| try: | |
| with py7zr.SevenZipFile(archive_path, mode="r") as z: | |
| z.test() | |
| return True | |
| except Exception: | |
| return False | |
def safe_extract_7z(archive_path: str, dest_dir: str) -> Tuple[int, List[str]]:
    """Extract a 7z archive, tolerating CRC-corrupt members.

    Tries a full extract first; on CrcError falls back to member-by-member
    extraction, skipping corrupt entries. Returns (-1, []) for a clean full
    extract, otherwise (extracted_count, skipped_member_names).
    """
    extracted_count = 0
    skipped: List[str] = []
    dest = pathlib.Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    try:
        with py7zr.SevenZipFile(archive_path, mode="r") as z:
            z.extract(path=str(dest))
        return -1, skipped
    except CrcError:
        # Fall through to the selective per-member pass below.
        pass
    with py7zr.SevenZipFile(archive_path, mode="r") as z:
        members = [info.filename for info in z.list() if not info.is_directory]
        for name in members:
            try:
                z.extract(targets=[name], path=str(dest))
                extracted_count += 1
            except CrcError:
                skipped.append(name)
            except Exception:
                skipped.append(name)
            finally:
                # BUGFIX: py7zr requires reset() before reusing the same
                # handle for another extract(); without it, members after
                # the first fail or extract nothing.
                try:
                    z.reset()
                except Exception:
                    pass
    return extracted_count, skipped
| # ========================= | |
| # NEW: HTML/.meta → JSONL exporter | |
| # ========================= | |
| def _parse_meta_file(path: pathlib.Path) -> Dict: | |
| """ | |
| Try JSON parse; else parse simple 'key: value' lines; else return raw text. | |
| """ | |
| raw = path.read_text(encoding="utf-8", errors="replace") | |
| # try JSON | |
| try: | |
| obj = json.loads(raw) | |
| return {"type": "meta", "path": str(path), "content": obj} | |
| except Exception: | |
| pass | |
| # key: value lines | |
| data: Dict[str, str] = {} | |
| lines = [ln.strip() for ln in raw.splitlines() if ln.strip()] | |
| for ln in lines: | |
| if ":" in ln: | |
| k, v = ln.split(":", 1) | |
| data[k.strip()] = v.strip() | |
| if data: | |
| return {"type": "meta", "path": str(path), "content": data} | |
| # fallback raw | |
| return {"type": "meta", "path": str(path), "content_raw": raw} | |
def _parse_html_file(path: pathlib.Path) -> Dict:
    """Extract title, meta name/property→content pairs, and visible text
    from an HTML file."""
    raw = path.read_text(encoding="utf-8", errors="replace")
    # Prefer the lxml backend when installed; html.parser always works.
    try:
        soup = BeautifulSoup(raw, "lxml")
    except Exception:
        soup = BeautifulSoup(raw, "html.parser")
    title = ""
    if soup.title and soup.title.string:
        title = soup.title.string.strip()
    meta: Dict[str, str] = {}
    for tag in soup.find_all("meta"):
        key = tag.get("name") or tag.get("property")
        value = tag.get("content")
        if key and value:
            meta[str(key)] = str(value)
    return {
        "type": "html",
        "path": str(path),
        "title": title,
        "meta": meta,
        "text": soup.get_text(separator="\n", strip=True),
    }
def build_jsonl_from_extracted(ex_dir: str, out_dir: str, max_records: Optional[int] = None) -> Tuple[str, int, int]:
    """Convert every .html/.htm and .meta file under *ex_dir* into one JSONL.

    Unreadable files become {'type': 'error', ...} records instead of
    aborting the walk. Returns (output_path, html_count, meta_count).
    """
    out_root = pathlib.Path(out_dir)
    out_root.mkdir(parents=True, exist_ok=True)
    out_path = out_root / "converted.jsonl"
    html_count = 0
    meta_count = 0
    written = 0
    with open(out_path, "w", encoding="utf-8") as fout:
        for entry in pathlib.Path(ex_dir).rglob("*"):
            if not entry.is_file():
                continue
            suffix = entry.suffix.lower()
            try:
                if suffix in (".html", ".htm"):
                    record = _parse_html_file(entry)
                    html_count += 1
                elif suffix == ".meta":
                    record = _parse_meta_file(entry)
                    meta_count += 1
                else:
                    continue
                fout.write(json.dumps(record, ensure_ascii=False) + "\n")
                written += 1
                if max_records and written >= max_records:
                    break
            except Exception as e:
                # Log the failure as a record and keep walking.
                failure = {"type": "error", "path": str(entry), "error": f"{type(e).__name__}: {e}"}
                fout.write(json.dumps(failure, ensure_ascii=False) + "\n")
    return str(out_path), html_count, meta_count
| # ========================= | |
| # Pipeline | |
| # ========================= | |
def run_pipeline(torrent_url: str):
    """Fetch a .torrent over HTTP, download its .7z payload(s), verify,
    extract, and list the results.

    Returns (log_markdown, extracted_paths, first_path, extract_dir, base_dir).
    Raises gr.Error on any unrecoverable step.
    """
    if not torrent_url.strip().lower().endswith(".torrent"):
        raise gr.Error("Please provide a direct .torrent URL.")
    raw = fetch_bytes(torrent_url.strip())
    meta = parse_torrent(raw)
    # Prefer the torrent's declared web seeds; otherwise assume the payload
    # sits next to the .torrent on the same HTTP server.
    seeds = list(meta["web_seeds"]) or infer_bases_from_torrent_url(torrent_url)
    infohash = meta["infohash"]
    root_name = meta["name"]
    sevenz_files = [f for f in meta["files"] if f["path"].lower().endswith(".7z")]
    if not sevenz_files:
        raise gr.Error("No .7z files listed in the torrent.")
    if not seeds:
        raise gr.Error("No HTTP source found to fetch files. "
                       "If this is DDoSecrets, ensure the .torrent sits with the files over HTTPS.")
    # Work area keyed by infohash so re-runs of the same torrent reuse it.
    base_dir = pathlib.Path("/mnt/data/work") / infohash
    dl_dir = base_dir / "downloads"
    ex_dir = base_dir / "extracted"
    ensure_dir(str(dl_dir))
    ensure_dir(str(ex_dir))
    logs = []
    saved_archives = []
    expected_map = {f["path"]: int(f.get("length", 0)) for f in meta["files"]}
    for f in sevenz_files:
        rel = f["path"]
        final_url = None
        # Probe seeds one at a time; first reachable URL wins.
        for seed in seeds:
            final_url = resolve_download_url([seed], root_name, rel)
            if final_url:
                break
        if not final_url:
            raise gr.Error(f"Could not resolve an HTTP URL for {rel} from bases {seeds}.")
        dest = dl_dir / rel
        # A zero length in the torrent is treated as "size unknown" (None).
        expected_size = expected_map.get(rel) or None
        logs.append(f"Downloading: {final_url}")
        download_file_exact(final_url, dest, expected_size)
        if not dest.exists():
            raise gr.Error(f"Download failed: {final_url}")
        logs.append(f"Saved: {dest} ({human_bytes(dest.stat().st_size)})")
        # One fresh re-download on CRC failure before falling back to the
        # per-member extraction path in safe_extract_7z.
        if not test_7z_integrity(str(dest)):
            logs.append(f"CRC test failed for {dest.name}, retrying download fresh…")
            download_file_exact(final_url, dest, expected_size, max_attempts=2)
            if not test_7z_integrity(str(dest)):
                logs.append(f"Archive still reports CRC problems: {dest.name}. Will try per-file extraction and skip corrupt members.")
        saved_archives.append(str(dest))
    for apath in saved_archives:
        logs.append(f"Extracting: {apath}")
        count, skipped = safe_extract_7z(apath, str(ex_dir))
        # -1 is safe_extract_7z's sentinel for "full extract succeeded".
        if count == -1:
            logs.append(f"Extracted OK → {ex_dir}")
        else:
            logs.append(f"Extracted {count} members to {ex_dir}")
        if skipped:
            logs.append(f"Skipped {len(skipped)} corrupted member(s):")
            show = skipped[:10]
            logs += [f" - {s}" for s in show]
            if len(skipped) > 10:
                logs.append(f" … and {len(skipped) - 10} more")
    extracted = list_files_recursive(ex_dir)
    if not extracted:
        logs.append("No files extracted (archive may be empty).")
    else:
        logs.append(f"Extracted files: {len(extracted)}")
    log_md = "### Run log\n" + "\n".join(f"- {l}" for l in logs)
    # RETURN the extracted dir so we can build JSON later
    return log_md, extracted, (extracted[0] if extracted else ""), str(ex_dir), str(base_dir)
def do_preview(path: str):
    """UI hook: render the markdown preview for the selected file path."""
    markdown, _unused = preview_path(path)
    return markdown
| # NEW: hook to build JSONL and return a downloadable file | |
def do_build_jsonl(ex_dir: str, base_dir: str):
    """UI hook: export extracted .html/.meta files into one downloadable JSONL."""
    if not ex_dir or not os.path.isdir(ex_dir):
        raise gr.Error("Extraction folder not found. Run the download/extract step first.")
    export_dir = str(pathlib.Path(base_dir) / "exports")
    out_path, html_count, meta_count = build_jsonl_from_extracted(ex_dir, export_dir)
    summary = f"Built JSONL at: `{out_path}`\n- HTML files: {html_count}\n- META files: {meta_count}\n"
    return summary, out_path
| # ========================= | |
| # UI | |
| # ========================= | |
# Gradio UI: one column of controls wired to the pipeline functions above.
with gr.Blocks(title="Torrent → 7z → View (HTTP only)") as demo:
    gr.Markdown(
        """
# Torrent → 7z → View (HTTP only)
Paste a **.torrent URL** (with web seeds or DDoSecrets-style layout).
The app downloads `.7z` file(s), verifies size & CRC, extracts them, lets you preview text/csv/json, **and exports all `.html` + `.meta` to a single JSONL**.
"""
    )
    url_in = gr.Textbox(label=".torrent URL", placeholder="https://data.ddosecrets.com/Collection/Collection.torrent")
    go_btn = gr.Button("Download, Extract & List")
    log_out = gr.Markdown()
    files_dd = gr.Dropdown(label="Extracted files", choices=[], interactive=True)
    preview_btn = gr.Button("Preview selected")
    preview_md = gr.Markdown()
    # NEW: export controls
    gr.Markdown("### Export `.html` and `.meta` → combined JSONL")
    build_btn = gr.Button("Build JSONL from extracted")
    build_log = gr.Markdown()
    dl_file = gr.File(label="Download combined JSONL", interactive=False)
    # internal state: extracted dir & base dir for exports
    ex_dir_state = gr.State()
    base_dir_state = gr.State()

    def _go(url):
        # Run the full pipeline, then feed both the dropdown's choices and
        # its selected value (files_dd appears twice in `outputs` below).
        log, files, first, ex_dir, base_dir = run_pipeline(url)
        return (
            log,
            gr.update(choices=files, value=(first if first else None)),
            (first if first else ""),
            ex_dir,
            base_dir
        )

    go_btn.click(fn=_go, inputs=[url_in], outputs=[log_out, files_dd, files_dd, ex_dir_state, base_dir_state])
    preview_btn.click(fn=do_preview, inputs=[files_dd], outputs=[preview_md])
    build_btn.click(fn=do_build_jsonl, inputs=[ex_dir_state, base_dir_state], outputs=[build_log, dl_file])
if __name__ == "__main__":
    # Bind on all interfaces; PORT is presumably injected by the hosting
    # platform (e.g. HF Spaces) — defaults to 7860 locally.
    demo.launch(
        server_name="0.0.0.0",
        server_port=int(os.environ.get("PORT", 7860)),
        allowed_paths=["/mnt/data"]  # allow returning files from /mnt/data
    )