"""Torrent → 7z → View (HTTP only).

Gradio app that takes a direct ``.torrent`` URL, downloads the listed
``.7z`` archive(s) over plain HTTP(S) web seeds (or a DDoSecrets-style
layout next to the torrent), verifies size and CRC, extracts them
(skipping corrupt members when needed), previews text-like files, and
exports all extracted ``.html``/``.meta`` files into one JSONL.
"""

import os
import json
import hashlib
import pathlib
from typing import List, Tuple, Optional, Dict

import requests
import gradio as gr
import bencodepy
import py7zr
from py7zr.exceptions import CrcError

# HTML parsing
from bs4 import BeautifulSoup


# =========================
# Helpers
# =========================

def human_bytes(n: int) -> str:
    """Format a byte count using binary (KiB/MiB/...) units."""
    f = float(n)
    for unit in ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]:
        if f < 1024.0:
            return f"{f:.2f} {unit}"
        f /= 1024.0
    return f"{f:.2f} EiB"


def ensure_dir(p: str):
    """Create directory *p* (and parents) if it does not exist."""
    os.makedirs(p, exist_ok=True)


def fetch_bytes(url: str, timeout: int = 45) -> bytes:
    """GET *url* and return the raw body; raises on HTTP errors."""
    r = requests.get(url, timeout=timeout)
    r.raise_for_status()
    return r.content


def parse_torrent(raw: bytes) -> Dict:
    """Decode a bencoded .torrent blob.

    Returns a dict with:
      - ``infohash``: v1 SHA-1 of the bencoded ``info`` dict (hex)
      - ``name``: torrent root name
      - ``files``: list of ``{"path", "length"}`` (single- or multi-file)
      - ``web_seeds``: cleaned ``url-list`` entries (trailing '/' removed)

    Raises ValueError when the blob is not a valid torrent.
    """
    data = bencodepy.decode(raw)
    if not isinstance(data, dict) or b"info" not in data:
        raise ValueError("Invalid .torrent (missing 'info').")
    info = data[b"info"]
    infohash_v1 = hashlib.sha1(bencodepy.encode(info)).hexdigest()

    name = info.get(b"name")
    if isinstance(name, (bytes, bytearray)):
        name = name.decode("utf-8", errors="replace")

    files = []
    if b"files" in info:
        # Multi-file torrent: each entry has a path list + length.
        for f in info[b"files"]:
            length = int(f.get(b"length", 0))
            parts = []
            for pe in f.get(b"path", []):
                parts.append(pe.decode("utf-8", "replace") if isinstance(pe, (bytes, bytearray)) else str(pe))
            rel = "/".join(parts) if parts else "(unknown)"
            files.append({"path": rel, "length": length})
    else:
        # Single-file torrent.
        length = int(info.get(b"length", 0))
        rel = name or "(unnamed)"
        files.append({"path": rel, "length": length})

    web_seeds = []
    if b"url-list" in data:
        v = data[b"url-list"]
        if isinstance(v, (bytes, bytearray)):
            web_seeds = [v.decode("utf-8", "replace")]
        elif isinstance(v, list):
            for u in v:
                if isinstance(u, (bytes, bytearray)):
                    web_seeds.append(u.decode("utf-8", "replace"))

    return {
        "infohash": infohash_v1,
        "name": name or "(unknown)",
        "files": files,
        "web_seeds": [s.rstrip("/") for s in web_seeds if isinstance(s, str) and s.strip()],
    }


def join_url(base: str, *segs: str) -> str:
    """Join *base* with path segments, percent-encoding each path component."""
    parts = [base.rstrip("/")]
    for s in segs:
        enc = "/".join([requests.utils.quote(p) for p in s.split("/")])
        parts.append(enc)
    return "/".join(parts)


def _head_or_peek(url: str, timeout: int = 20) -> Tuple[bool, Optional[int]]:
    """Probe *url*: HEAD first, else a streamed GET that reads one chunk.

    Returns (reachable, content_length_or_None). Never raises.
    """
    try:
        r = requests.head(url, timeout=timeout, allow_redirects=True)
        if r.status_code < 400:
            size = r.headers.get("Content-Length")
            return True, (int(size) if size and size.isdigit() else None)
    except Exception:
        pass
    try:
        r = requests.get(url, stream=True, timeout=timeout, allow_redirects=True)
        if r.status_code < 400:
            size = r.headers.get("Content-Length")
            # Pull one chunk so servers that lie on HEAD still get exercised.
            try:
                next(r.iter_content(chunk_size=1024))
            except Exception:
                pass
            try:
                r.close()
            except Exception:
                pass
            return True, (int(size) if size and size.isdigit() else None)
    except Exception:
        pass
    return False, None


def supports_range_and_size(url: str, timeout: int = 30) -> Tuple[bool, Optional[int]]:
    """Best-effort check whether *url* supports Range requests.

    Returns (range_ok, size_or_None). "Range ok" is approximated as
    Accept-Ranges: bytes OR a known Content-Length. Never raises.
    """
    try:
        r = requests.head(url, timeout=timeout, allow_redirects=True)
        if r.status_code < 400:
            size = int(r.headers.get("Content-Length", "0") or 0)
            return (("bytes" in r.headers.get("Accept-Ranges", "").lower()) or size > 0,
                    size if size > 0 else None)
    except Exception:
        pass
    try:
        r = requests.get(url, stream=True, timeout=timeout, allow_redirects=True)
        r.raise_for_status()
        size = int(r.headers.get("Content-Length", "0") or 0)
        try:
            r.close()
        except Exception:
            pass
        return ("bytes" in r.headers.get("Accept-Ranges", "").lower() or size > 0,
                size if size > 0 else None)
    except Exception:
        return False, None


def download_file_exact(url: str, dest_path: pathlib.Path,
                        expected_size: Optional[int],
                        timeout: int = 120, max_attempts: int = 2):
    """Download *url* to *dest_path*, verifying the final size.

    First attempt resumes a leftover ``.part`` file via a Range request when
    the server supports it; later attempts start fresh. Raises gr.Error when
    the size never matches *expected_size* (skipped if expected_size is None).
    """
    dest_path.parent.mkdir(parents=True, exist_ok=True)

    def _resume_once():
        # Resume into <name>.part, then atomically move into place.
        tmp = dest_path.with_suffix(dest_path.suffix + ".part")
        existing = tmp.stat().st_size if tmp.exists() else 0
        can_range, _ = supports_range_and_size(url)
        headers = {"Range": f"bytes={existing}-"} if (can_range and existing > 0) else {}
        mode = "ab" if headers else "wb"
        with requests.get(url, stream=True, timeout=timeout, headers=headers) as r:
            r.raise_for_status()
            with open(tmp, mode) as f:
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)
        tmp.rename(dest_path)

    def _fresh_once():
        # Discard any partial/previous data and re-download from byte 0.
        tmp = dest_path.with_suffix(dest_path.suffix + ".part")
        if tmp.exists():
            tmp.unlink()
        if dest_path.exists():
            dest_path.unlink()
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(tmp, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)
        tmp.rename(dest_path)

    attempts = 0
    while attempts < max_attempts:
        attempts += 1
        if attempts == 1:
            _resume_once()
        else:
            _fresh_once()
        if expected_size is None:
            return
        if dest_path.exists() and dest_path.stat().st_size == expected_size:
            return

    got = dest_path.stat().st_size if dest_path.exists() else 0
    raise gr.Error(f"Downloaded size mismatch for {dest_path.name}: got {got} bytes, expected {expected_size}.")


def list_files_recursive(root: pathlib.Path) -> List[str]:
    """Return all file paths (as strings) under *root*, recursively."""
    out = []
    for p in root.rglob("*"):
        if p.is_file():
            out.append(str(p))
    return out


def preview_path(path_str: str, max_bytes: int = 250_000) -> Tuple[str, Optional[str]]:
    """Return (markdown_preview, None) for *path_str*.

    Text-like extensions get a truncated UTF-8 dump; other files get a
    size/path summary. Errors are reported in the markdown, not raised.
    """
    if not path_str or not os.path.exists(path_str):
        return "File not found.", None
    p = pathlib.Path(path_str)
    suffix = p.suffix.lower()
    try:
        if suffix in [".csv", ".tsv", ".json", ".ndjson", ".txt", ".log",
                      ".md", ".eml", ".html", ".htm", ".meta"]:
            # FIX: use a context manager — the original leaked the file handle.
            with open(p, "rb") as fh:
                raw = fh.read(max_bytes)
            text = raw.decode("utf-8", errors="replace")
            return f"Previewing {p.name} (truncated):\n\n```\n{text}\n```", None
        else:
            st = p.stat()
            return (f"File: **{p.name}**\nSize: {human_bytes(st.st_size)}\nPath: `{p}`", None)
    except Exception as e:
        return f"Error previewing file: {type(e).__name__}: {e}", None


def infer_bases_from_torrent_url(torrent_url: str) -> List[str]:
    """Guess an HTTP base directory: the .torrent's own parent URL."""
    u = torrent_url.strip()
    if "/" not in u:
        return []
    base = u.rsplit("/", 1)[0]
    return [base]


def resolve_download_url(bases: List[str], root_name: str, rel_path: str) -> Optional[str]:
    """Find a reachable URL for *rel_path* under any base.

    Tries ``base/root_name/rel_path`` then ``base/rel_path`` for each base;
    returns the first URL that answers, or None.
    """
    candidates = []
    for b in bases:
        candidates.append(join_url(b, root_name, rel_path))
        candidates.append(join_url(b, rel_path))
    for c in candidates:
        ok, _ = _head_or_peek(c)
        if ok:
            return c
    return None


def test_7z_integrity(archive_path: str) -> bool:
    """Run py7zr's CRC test; True when the whole archive checks out."""
    try:
        with py7zr.SevenZipFile(archive_path, mode="r") as z:
            z.test()
        return True
    except Exception:
        return False


def safe_extract_7z(archive_path: str, dest_dir: str) -> Tuple[int, List[str]]:
    """Extract a .7z archive, tolerating corrupt members.

    Fast path: full extraction; returns (-1, []). On CrcError, falls back to
    per-member extraction and returns (extracted_count, skipped_names).
    """
    extracted_count = 0
    skipped: List[str] = []
    dest = pathlib.Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)

    try:
        with py7zr.SevenZipFile(archive_path, mode="r") as z:
            z.extract(path=str(dest))
        return -1, skipped
    except CrcError:
        pass  # fall through to member-by-member salvage

    with py7zr.SevenZipFile(archive_path, mode="r") as z:
        members = [info.filename for info in z.list() if not info.is_directory]
        for name in members:
            try:
                # FIX: py7zr requires reset() before re-extracting from the
                # same handle; without it every extract after the first one
                # silently produces nothing.
                z.reset()
                z.extract(targets=[name], path=str(dest))
                extracted_count += 1
            except CrcError:
                skipped.append(name)
            except Exception:
                skipped.append(name)
    return extracted_count, skipped


# =========================
# NEW: HTML/.meta → JSONL exporter
# =========================

def _parse_meta_file(path: pathlib.Path) -> Dict:
    """
    Try JSON parse; else parse simple 'key: value' lines; else return raw text.
    """
    raw = path.read_text(encoding="utf-8", errors="replace")
    # try JSON
    try:
        obj = json.loads(raw)
        return {"type": "meta", "path": str(path), "content": obj}
    except Exception:
        pass
    # key: value lines
    data: Dict[str, str] = {}
    lines = [ln.strip() for ln in raw.splitlines() if ln.strip()]
    for ln in lines:
        if ":" in ln:
            k, v = ln.split(":", 1)
            data[k.strip()] = v.strip()
    if data:
        return {"type": "meta", "path": str(path), "content": data}
    # fallback raw
    return {"type": "meta", "path": str(path), "content_raw": raw}


def _parse_html_file(path: pathlib.Path) -> Dict:
    """
    Extract title, meta[name/content], and plain text.
    """
    raw = path.read_text(encoding="utf-8", errors="replace")
    # Prefer lxml if present; fallback to built-in parser
    try:
        soup = BeautifulSoup(raw, "lxml")
    except Exception:
        soup = BeautifulSoup(raw, "html.parser")
    title = (soup.title.string.strip() if soup.title and soup.title.string else "")
    meta = {}
    for tag in soup.find_all("meta"):
        name = tag.get("name") or tag.get("property")
        content = tag.get("content")
        if name and content:
            meta[str(name)] = str(content)
    text = soup.get_text(separator="\n", strip=True)
    return {"type": "html", "path": str(path), "title": title, "meta": meta, "text": text}


def build_jsonl_from_extracted(ex_dir: str, out_dir: str,
                               max_records: Optional[int] = None) -> Tuple[str, int, int]:
    """
    Walk extracted dir, convert all .html/.htm and .meta files to JSONL.
    Returns (output_path, html_count, meta_count).
    """
    ex_root = pathlib.Path(ex_dir)
    out_root = pathlib.Path(out_dir)
    out_root.mkdir(parents=True, exist_ok=True)
    out_path = out_root / "converted.jsonl"

    html_count = 0
    meta_count = 0
    written = 0
    with open(out_path, "w", encoding="utf-8") as fout:
        for p in ex_root.rglob("*"):
            if not p.is_file():
                continue
            suf = p.suffix.lower()
            try:
                if suf in (".html", ".htm"):
                    rec = _parse_html_file(p)
                    html_count += 1
                elif suf == ".meta":
                    rec = _parse_meta_file(p)
                    meta_count += 1
                else:
                    continue
                fout.write(json.dumps(rec, ensure_ascii=False) + "\n")
                written += 1
                if max_records and written >= max_records:
                    break
            except Exception as e:
                # Skip unreadable files but carry on
                err = {"type": "error", "path": str(p), "error": f"{type(e).__name__}: {e}"}
                fout.write(json.dumps(err, ensure_ascii=False) + "\n")
    return str(out_path), html_count, meta_count


# =========================
# Pipeline
# =========================

def run_pipeline(torrent_url: str):
    """Full pipeline: fetch torrent, download .7z files, verify, extract.

    Returns (log_markdown, extracted_paths, first_path_or_empty,
    extracted_dir, base_dir). Raises gr.Error on unrecoverable problems.
    """
    if not torrent_url.strip().lower().endswith(".torrent"):
        raise gr.Error("Please provide a direct .torrent URL.")

    raw = fetch_bytes(torrent_url.strip())
    meta = parse_torrent(raw)
    # Prefer declared web seeds; fall back to the torrent's own directory.
    seeds = list(meta["web_seeds"]) or infer_bases_from_torrent_url(torrent_url)
    infohash = meta["infohash"]
    root_name = meta["name"]

    sevenz_files = [f for f in meta["files"] if f["path"].lower().endswith(".7z")]
    if not sevenz_files:
        raise gr.Error("No .7z files listed in the torrent.")
    if not seeds:
        raise gr.Error("No HTTP source found to fetch files. "
                       "If this is DDoSecrets, ensure the .torrent sits with the files over HTTPS.")

    base_dir = pathlib.Path("/mnt/data/work") / infohash
    dl_dir = base_dir / "downloads"
    ex_dir = base_dir / "extracted"
    ensure_dir(str(dl_dir))
    ensure_dir(str(ex_dir))

    logs = []
    saved_archives = []
    expected_map = {f["path"]: int(f.get("length", 0)) for f in meta["files"]}

    for f in sevenz_files:
        rel = f["path"]
        final_url = None
        for seed in seeds:
            final_url = resolve_download_url([seed], root_name, rel)
            if final_url:
                break
        if not final_url:
            raise gr.Error(f"Could not resolve an HTTP URL for {rel} from bases {seeds}.")

        dest = dl_dir / rel
        expected_size = expected_map.get(rel) or None
        logs.append(f"Downloading: {final_url}")
        download_file_exact(final_url, dest, expected_size)
        if not dest.exists():
            raise gr.Error(f"Download failed: {final_url}")
        logs.append(f"Saved: {dest} ({human_bytes(dest.stat().st_size)})")

        if not test_7z_integrity(str(dest)):
            logs.append(f"CRC test failed for {dest.name}, retrying download fresh…")
            download_file_exact(final_url, dest, expected_size, max_attempts=2)
            if not test_7z_integrity(str(dest)):
                # FIX: reassembled the f-string that was split mid-literal.
                logs.append(f"Archive still reports CRC problems: {dest.name}. "
                            "Will try per-file extraction and skip corrupt members.")
        saved_archives.append(str(dest))

    for apath in saved_archives:
        logs.append(f"Extracting: {apath}")
        count, skipped = safe_extract_7z(apath, str(ex_dir))
        if count == -1:
            logs.append(f"Extracted OK → {ex_dir}")
        else:
            logs.append(f"Extracted {count} members to {ex_dir}")
            if skipped:
                logs.append(f"Skipped {len(skipped)} corrupted member(s):")
                show = skipped[:10]
                logs += [f" - {s}" for s in show]
                if len(skipped) > 10:
                    logs.append(f" … and {len(skipped) - 10} more")

    extracted = list_files_recursive(ex_dir)
    if not extracted:
        logs.append("No files extracted (archive may be empty).")
    else:
        logs.append(f"Extracted files: {len(extracted)}")

    log_md = "### Run log\n" + "\n".join(f"- {l}" for l in logs)
    # RETURN the extracted dir so we can build JSON later
    return log_md, extracted, (extracted[0] if extracted else ""), str(ex_dir), str(base_dir)


def do_preview(path: str):
    """Preview handler: markdown for the selected extracted file."""
    md, _ = preview_path(path)
    return md


# NEW: hook to build JSONL and return a downloadable file
def do_build_jsonl(ex_dir: str, base_dir: str):
    """Build the combined JSONL export and return (summary_md, file_path)."""
    if not ex_dir or not os.path.isdir(ex_dir):
        raise gr.Error("Extraction folder not found. Run the download/extract step first.")
    out_dir = str(pathlib.Path(base_dir) / "exports")
    out_path, html_count, meta_count = build_jsonl_from_extracted(ex_dir, out_dir)
    summary = f"Built JSONL at: `{out_path}`\n- HTML files: {html_count}\n- META files: {meta_count}\n"
    return summary, out_path


# =========================
# UI
# =========================

with gr.Blocks(title="Torrent → 7z → View (HTTP only)") as demo:
    gr.Markdown(
        """
        # Torrent → 7z → View (HTTP only)
        Paste a **.torrent URL** (with web seeds or DDoSecrets-style layout).
        The app downloads `.7z` file(s), verifies size & CRC, extracts them,
        lets you preview text/csv/json, **and exports all `.html` + `.meta` to a single JSONL**.
        """
    )
    url_in = gr.Textbox(label=".torrent URL",
                        placeholder="https://data.ddosecrets.com/Collection/Collection.torrent")
    go_btn = gr.Button("Download, Extract & List")
    log_out = gr.Markdown()
    files_dd = gr.Dropdown(label="Extracted files", choices=[], interactive=True)
    preview_btn = gr.Button("Preview selected")
    preview_md = gr.Markdown()

    # NEW: export controls
    gr.Markdown("### Export `.html` and `.meta` → combined JSONL")
    build_btn = gr.Button("Build JSONL from extracted")
    build_log = gr.Markdown()
    dl_file = gr.File(label="Download combined JSONL", interactive=False)

    # internal state: extracted dir & base dir for exports
    ex_dir_state = gr.State()
    base_dir_state = gr.State()

    def _go(url):
        log, files, first, ex_dir, base_dir = run_pipeline(url)
        # FIX: the original listed files_dd twice in outputs, so a plain
        # string overwrote the gr.update(choices=...) — one update suffices.
        return (
            log,
            gr.update(choices=files, value=(first if first else None)),
            ex_dir,
            base_dir,
        )

    go_btn.click(fn=_go, inputs=[url_in],
                 outputs=[log_out, files_dd, ex_dir_state, base_dir_state])
    preview_btn.click(fn=do_preview, inputs=[files_dd], outputs=[preview_md])
    build_btn.click(fn=do_build_jsonl, inputs=[ex_dir_state, base_dir_state],
                    outputs=[build_log, dl_file])

if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",
        server_port=int(os.environ.get("PORT", 7860)),
        allowed_paths=["/mnt/data"]  # allow returning files from /mnt/data
    )