from __future__ import annotations from dataclasses import dataclass from pathlib import Path import tempfile from urllib.parse import urlparse from dotenv import load_dotenv import base64 import json import os import re import shutil import subprocess import requests from requests.auth import HTTPBasicAuth load_dotenv() # examples: https://scilifelab.atlassian.net/wiki/spaces/demo/pages/20381827/Template+pages # and for gitlab: https://gitlab.com/gitlab-examples/cpp-example @dataclass(frozen=True) class DownloadResult: local_path: Path repository_name: str base_url: str _CONFLUENCE_URL_MAP_FILENAME = ".confluence_url_map.json" def _sanitize_filename(value: str) -> str: cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", value).strip("._") return cleaned or "page" def _confluence_headers() -> dict[str, str]: headers = {"Accept": "application/json"} return headers def _confluence_request(url: str, params: dict | None = None) -> dict: response = requests.get( url, params=params, headers=_confluence_headers(), auth=HTTPBasicAuth( os.getenv("CONFLUENCE_USER_EMAIL", "").strip(), os.getenv("ATLASSIAN_API_KEY", "").strip(), ), timeout=30, ) if response.status_code in {401, 403} and os.getenv("CONFLUENCE_READ_ONLY_KEY"): print(response.text) raise RuntimeError( "Confluence auth failed (401/403). If using an Atlassian API token, set " "CONFLUENCE_USER_EMAIL and CONFLUENCE_READ_ONLY_KEY." ) if response.status_code != 200: raise RuntimeError( f"Confluence request failed ({response.status_code}): {response.text}" ) return response.json() def _resolve_confluence_page_url(base_url: str, page: dict) -> str: links = page.get("_links", {}) if isinstance(page, dict) else {} webui = str(links.get("webui") or "").strip() base = str(links.get("base") or base_url).strip() if webui: if webui.startswith("http://") or webui.startswith("https://"): return webui return f"{base.rstrip('/')}/{webui.lstrip('/')}" page_id = str(page.get("id") or "").strip() if page_id: return f"{base_url.rstrip('/')}/pages/{page_id}" return base_url def _fetch_confluence_homepage_id(base_url: str, space_key: str) -> str: payload = _confluence_request( f"{base_url}/rest/api/space/{space_key}", params={"expand": "homepage"}, ) homepage = payload.get("homepage", {}) return str(homepage.get("id") or "").strip() def _write_confluence_url_map(dest_path: Path, url_map: dict[str, str]) -> None: if not url_map: return map_path = dest_path / _CONFLUENCE_URL_MAP_FILENAME map_path.write_text( json.dumps(url_map, ensure_ascii=True, indent=2), encoding="utf-8", ) def _build_base_url(url: str, branch: str = "") -> str: if "gitlab.com" in url: return f"{url}/-/blob/{branch}" if branch else url elif "github.com" in url: return f"{url}/blob/{branch}" if branch else url else: return url # Fallback to original URL if we don't recognize the host def _get_default_branch(url: str) -> str: result = subprocess.run( ["git", "ls-remote", "--symref", url, "HEAD"], capture_output=True, text=True, ) if result.returncode != 0: raise RuntimeError(result.stderr.strip()) for line in result.stdout.splitlines(): if line.startswith("ref:"): # ref: refs/heads/main HEAD return line.split("refs/heads/")[1].split()[0] raise RuntimeError("Could not determine default branch.") def _infer_git_repo_name(url: str) -> str: if "://" in url: path = urlparse(url).path else: # Support scp-style URLs: git@host:org/repo.git if ":" in url: path = url.split(":", 1)[1] else: path = url path = path.rstrip("/") name = Path(path).name if name.endswith(".git"): name = name[:-4] if not name: raise ValueError("Could not determine repository name from URL.") return name def download_git_repo(url: str, dest_root: Path) -> DownloadResult: if shutil.which("git") is None: raise RuntimeError("git is not available on this system.") repo_name = _infer_git_repo_name(url) dest_path = dest_root / repo_name result = subprocess.run( ["git", "clone", "--depth", "1", url, str(dest_path)], capture_output=True, text=True, ) if result.returncode != 0: stderr = (result.stderr or result.stdout or "").strip() raise RuntimeError(f"git clone failed: {stderr}") branch = _get_default_branch(url) print("Baseurl: " + _build_base_url(url, branch)) return DownloadResult( local_path=dest_path, repository_name=repo_name, base_url=_build_base_url(url, branch), ) def _parse_confluence_base_and_page(url: str) -> tuple[str, str]: parsed = urlparse(url) if not parsed.scheme or not parsed.netloc: raise ValueError("Invalid Confluence URL.") page_id = "" query_params = parsed.query.split("&") if parsed.query else [] for entry in query_params: if entry.startswith("pageId="): page_id = entry.split("=", 1)[1] break path = parsed.path or "" if not page_id: match = re.search(r"/pages/(\d+)", path) if match: page_id = match.group(1) base_path = "/wiki" if "/wiki/" in path or path.startswith("/wiki") else "" base_url = f"{parsed.scheme}://{parsed.netloc}{base_path}" if page_id: return base_url, page_id space_key = "" match = re.search(r"/spaces/([^/]+)", path) if not match: match = re.search(r"/display/([^/]+)", path) if match: space_key = match.group(1) if space_key: homepage_id = _fetch_confluence_homepage_id(base_url, space_key) if homepage_id: return base_url, homepage_id raise ValueError( "Could not find a Confluence page ID in the URL. " "Use a link with '/pages/' or '?pageId=', " "or a space URL like '/spaces/' or '/display/'." ) def _fetch_confluence_page( base_url: str, page_id: str, ) -> dict: return _confluence_request( f"{base_url}/rest/api/content/{page_id}", params={"expand": "body.storage,title"}, ) def _fetch_confluence_child_pages( base_url: str, page_id: str, limit: int, ) -> list[dict]: children: list[dict] = [] start = 0 while True: payload = _confluence_request( f"{base_url}/rest/api/content/{page_id}/child/page", params={"limit": limit, "start": start, "expand": "body.storage,title"}, ) results = payload.get("results", []) children.extend(results) if not results: break next_link = payload.get("_links", {}).get("next") if next_link: start += len(results) continue if len(results) < limit: break start += len(results) return children def download_confluence_space( url: str, dest_root: Path, limit: int = 50, ) -> DownloadResult: base_url, root_page_id = _parse_confluence_base_and_page(url) folder_name = _sanitize_filename(f"confluence_{root_page_id}") dest_path = dest_root / folder_name dest_path.mkdir(parents=True, exist_ok=True) total_written = 0 url_map: dict[str, str] = {} root_page_url = "" queue = [root_page_id] seen: set[str] = set() while queue: page_id = queue.pop(0) if page_id in seen: continue seen.add(page_id) page = _fetch_confluence_page(base_url, page_id) title = str(page.get("title") or "untitled") body = page.get("body", {}).get("storage", {}).get("value", "") html = f"

{title}

\n{body}" filename = f"{_sanitize_filename(title)}_{page_id}.html" (dest_path / filename).write_text(html, encoding="utf-8") total_written += 1 page_url = _resolve_confluence_page_url(base_url, page) url_map[Path(filename).as_posix()] = page_url if page_id == root_page_id: root_page_url = page_url children = _fetch_confluence_child_pages(base_url, page_id, limit) for child in children: child_id = str(child.get("id") or "") if child_id: queue.append(child_id) if total_written == 0: raise RuntimeError("No pages were downloaded.") _write_confluence_url_map(dest_path, url_map) return DownloadResult( local_path=dest_path, repository_name=folder_name, base_url=root_page_url or f"{base_url.rstrip('/')}/pages/{root_page_id}", ) def download_source(url: str, source_type: str, dest_root: Path) -> DownloadResult: normalized = source_type.strip().lower() if normalized in {"git", "git repository", "git repo"}: return download_git_repo(url, dest_root) if normalized in {"confluence", "confluence space"}: return download_confluence_space(url, dest_root) raise ValueError(f"Unsupported source type: {source_type}") if __name__ == "__main__": with tempfile.TemporaryDirectory() as temp_dir: temp_dir_path = Path(temp_dir) download_result = download_source( "https://fortissgmbh.atlassian.net/wiki/spaces/MA/overview?homepageId=486113542", "confluence", temp_dir_path, ) # for debugging print filetree print(download_result) for path in Path(temp_dir).rglob("*"): print(path) # url = "https://fortissgmbh.atlassian.net/wiki/api/v2/pages/620199938" # auth = HTTPBasicAuth("amougou@fortiss.org", os.getenv("CONFLUENCE_READ_2", "").strip()) # headers = { # "Accept": "application/json" # } # response = requests.request( # "GET", # url, # headers=headers, # auth=auth # ) # print(response.text)