# regulens/scripts/source_ingest.py
# Uploaded by Maximilian Amougou ("Upload source_ingest.py", commit 753d95d, verified).
from __future__ import annotations

import base64
import html
import json
import os
import re
import shutil
import subprocess
import tempfile
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlparse

import requests
from dotenv import load_dotenv
from requests.auth import HTTPBasicAuth

# Load variables from a .env file (if one is found) into the process environment;
# the Confluence request helper in this module reads its credentials via os.getenv.
load_dotenv()
# Example Confluence page URL:
#   https://scilifelab.atlassian.net/wiki/spaces/demo/pages/20381827/Template+pages
# Example GitLab repository URL:
#   https://gitlab.com/gitlab-examples/cpp-example
@dataclass(frozen=True)
class DownloadResult:
    """Immutable description of a source that was downloaded to local disk."""

    # Directory that holds the downloaded content (the git clone target or the
    # folder the Confluence pages were written to).
    local_path: Path
    # Short name of the source: the inferred git repository name, or the
    # generated "confluence_<page id>" folder name.
    repository_name: str
    # Web URL associated with the download: the repository (blob) URL for git,
    # or the URL of the root page for Confluence.
    base_url: str
# Name of the JSON file, written into a Confluence download folder, that maps each
# saved HTML filename to the URL of the page it was generated from.
_CONFLUENCE_URL_MAP_FILENAME = ".confluence_url_map.json"
def _sanitize_filename(value: str) -> str:
    """Turn arbitrary text into a conservative, filesystem-friendly name.

    Every run of characters other than ASCII letters, digits, ``.``, ``_`` and
    ``-`` collapses into a single underscore, and leading/trailing dots and
    underscores are trimmed.  If nothing is left, ``"page"`` is returned.
    """
    collapsed = re.sub(r"[^A-Za-z0-9._-]+", "_", value)
    trimmed = collapsed.strip("._")
    if not trimmed:
        return "page"
    return trimmed
def _confluence_headers() -> dict[str, str]:
    """Return a fresh dict of HTTP headers asking Confluence for JSON responses."""
    return {"Accept": "application/json"}
def _confluence_request(url: str, params: dict | None = None) -> dict:
    """Send an authenticated GET request to Confluence and return the decoded JSON.

    HTTP basic auth is built from two environment variables:
    ``CONFLUENCE_USER_EMAIL`` (user name) and ``ATLASSIAN_API_KEY`` (API token).

    Args:
        url: Full URL of the REST endpoint.
        params: Optional query-string parameters.

    Returns:
        The JSON document returned by the server.

    Raises:
        RuntimeError: If the server answers with anything other than HTTP 200.
            For 401/403 the message names the credentials that are read here.
    """
    user_email = os.getenv("CONFLUENCE_USER_EMAIL", "").strip()
    api_token = os.getenv("ATLASSIAN_API_KEY", "").strip()
    response = requests.get(
        url,
        params=params,
        headers=_confluence_headers(),
        auth=HTTPBasicAuth(user_email, api_token),
        timeout=30,
    )
    # Authentication problems get a dedicated hint.  The hint must name the
    # variables that are actually used above, and must not depend on whether
    # some other environment variable happens to be set.
    if response.status_code in {401, 403}:
        raise RuntimeError(
            f"Confluence auth failed ({response.status_code}). If using an "
            "Atlassian API token, set CONFLUENCE_USER_EMAIL and "
            f"ATLASSIAN_API_KEY. Response: {response.text}"
        )
    if response.status_code != 200:
        raise RuntimeError(
            f"Confluence request failed ({response.status_code}): {response.text}"
        )
    return response.json()
def _resolve_confluence_page_url(base_url: str, page: dict) -> str:
    """Work out the browser URL of a Confluence page from its REST payload.

    Resolution order:

    1. ``page["_links"]["webui"]`` when present. An absolute http(s) link is
       returned unchanged; a relative one is joined onto ``_links.base`` (or
       ``base_url`` when the payload has no base link).
    2. ``<base_url>/pages/<id>`` when the payload carries an ``id``.
    3. ``base_url`` itself as a last resort.

    Args:
        base_url: Base URL of the Confluence site, used for the fallbacks.
        page: Page payload.  Anything that is not a dict is treated as an
            empty payload instead of raising.

    Returns:
        The resolved page URL.
    """
    if not isinstance(page, dict):
        # Nothing usable in the payload; keep the same fallback as an empty dict.
        return base_url

    links = page.get("_links")
    if not isinstance(links, dict):
        # A missing or null "_links" entry must not break the id fallback below.
        links = {}

    webui = str(links.get("webui") or "").strip()
    if webui:
        if webui.startswith(("http://", "https://")):
            return webui
        base = str(links.get("base") or base_url).strip()
        return f"{base.rstrip('/')}/{webui.lstrip('/')}"

    page_id = str(page.get("id") or "").strip()
    if page_id:
        return f"{base_url.rstrip('/')}/pages/{page_id}"
    return base_url
def _fetch_confluence_homepage_id(base_url: str, space_key: str) -> str:
    """Look up the ID of a space's homepage through the REST API.

    Requests ``<base_url>/rest/api/space/<space_key>`` with the ``homepage``
    field expanded and returns the homepage's ``id`` as a stripped string, or
    an empty string when the response carries no homepage ID.
    """
    endpoint = f"{base_url}/rest/api/space/{space_key}"
    space = _confluence_request(endpoint, params={"expand": "homepage"})
    homepage_id = space.get("homepage", {}).get("id")
    return str(homepage_id or "").strip()
def _write_confluence_url_map(dest_path: Path, url_map: dict[str, str]) -> None:
    """Persist the filename-to-URL mapping as pretty-printed JSON.

    The file is named by ``_CONFLUENCE_URL_MAP_FILENAME`` and is created inside
    ``dest_path``.  An empty mapping writes nothing.
    """
    if url_map:
        serialized = json.dumps(url_map, ensure_ascii=True, indent=2)
        target = dest_path / _CONFLUENCE_URL_MAP_FILENAME
        target.write_text(serialized, encoding="utf-8")
def _build_base_url(url: str, branch: str = "") -> str:
    """Build the web URL under which files of a repository branch can be browsed.

    For GitLab the result is ``<repo>/-/blob/<branch>``, for GitHub
    ``<repo>/blob/<branch>``.  A trailing ``/`` or ``.git`` on the repository
    URL is dropped first, because clone URLs commonly carry them and they would
    otherwise end up in the middle of the blob URL.

    Args:
        url: Repository URL (typically the URL that was cloned).
        branch: Branch name; when empty, ``url`` is returned unchanged.

    Returns:
        The blob base URL, or ``url`` unchanged when no branch is given or the
        host is not recognised.
    """
    if not branch:
        return url
    if "gitlab.com" in url:
        blob_segment = "/-/blob/"
    elif "github.com" in url:
        blob_segment = "/blob/"
    else:
        return url  # Fallback to original URL if we don't recognize the host

    repo_url = url.rstrip("/")
    if repo_url.endswith(".git"):
        repo_url = repo_url[: -len(".git")]
    return f"{repo_url}{blob_segment}{branch}"
def _get_default_branch(url: str) -> str:
    """Ask the remote which branch its ``HEAD`` points to.

    Runs ``git ls-remote --symref <url> HEAD`` and reads the branch name from
    the first ``ref: refs/heads/<branch> HEAD`` line of its output.

    Raises:
        RuntimeError: If git exits with a non-zero status (the message is git's
            stderr) or its output contains no ``ref:`` line.
    """
    completed = subprocess.run(
        ["git", "ls-remote", "--symref", url, "HEAD"],
        capture_output=True,
        text=True,
    )
    if completed.returncode != 0:
        raise RuntimeError(completed.stderr.strip())

    symref_lines = [
        line for line in completed.stdout.splitlines() if line.startswith("ref:")
    ]
    if not symref_lines:
        raise RuntimeError("Could not determine default branch.")
    # Example line: "ref: refs/heads/main	HEAD"
    after_prefix = symref_lines[0].split("refs/heads/")[1]
    return after_prefix.split()[0]
def _infer_git_repo_name(url: str) -> str:
    """Derive the repository name from a git URL.

    Handles regular URLs (``scheme://host/org/repo.git``), scp-style addresses
    (``git@host:org/repo.git``) and bare paths.  The name is the last path
    component with any ``.git`` suffix removed.

    Raises:
        ValueError: If no name can be extracted.
    """
    if "://" in url:
        repo_path = urlparse(url).path
    elif ":" in url:
        # Support scp-style URLs: git@host:org/repo.git
        repo_path = url.split(":", 1)[1]
    else:
        repo_path = url

    repo_name = Path(repo_path.rstrip("/")).name
    if repo_name.endswith(".git"):
        repo_name = repo_name[: -len(".git")]
    if repo_name:
        return repo_name
    raise ValueError("Could not determine repository name from URL.")
def download_git_repo(url: str, dest_root: Path) -> DownloadResult:
    """Shallow-clone a git repository into ``dest_root/<repository name>``.

    Args:
        url: Clone URL of the repository.
        dest_root: Directory under which the clone directory is created.

    Returns:
        A ``DownloadResult`` holding the clone path, the inferred repository
        name and the browsable base URL for the remote's default branch.

    Raises:
        RuntimeError: If git is not installed, the clone fails, or the default
            branch cannot be determined.
        ValueError: If no repository name can be inferred from ``url``.
    """
    if shutil.which("git") is None:
        raise RuntimeError("git is not available on this system.")

    repo_name = _infer_git_repo_name(url)
    dest_path = dest_root / repo_name
    # "--" ends option parsing, so a caller-supplied URL that starts with "-"
    # is treated as a repository argument and never as a git option.
    clone = subprocess.run(
        ["git", "clone", "--depth", "1", "--", url, str(dest_path)],
        capture_output=True,
        text=True,
    )
    if clone.returncode != 0:
        details = (clone.stderr or clone.stdout or "").strip()
        raise RuntimeError(f"git clone failed: {details}")

    branch = _get_default_branch(url)
    # Compute the base URL once and reuse it for both the log line and the result.
    base_url = _build_base_url(url, branch)
    print("Baseurl: " + base_url)
    return DownloadResult(
        local_path=dest_path,
        repository_name=repo_name,
        base_url=base_url,
    )
def _parse_confluence_base_and_page(url: str) -> tuple[str, str]:
    """Split a Confluence link into the site base URL and a page ID.

    The page ID is taken, in order of preference, from a ``pageId=`` query
    parameter, from a ``/pages/<digits>`` path segment, or, for space links
    (``/spaces/<KEY>`` or ``/display/<KEY>``), from the space's homepage as
    reported by the REST API.  The base URL is ``scheme://host`` plus ``/wiki``
    when the path contains that prefix.

    Raises:
        ValueError: If ``url`` lacks a scheme or host, or no page ID can be found.
    """
    parts = urlparse(url)
    if not (parts.scheme and parts.netloc):
        raise ValueError("Invalid Confluence URL.")

    url_path = parts.path or ""
    uses_wiki_prefix = url_path.startswith("/wiki") or "/wiki/" in url_path
    wiki_prefix = "/wiki" if uses_wiki_prefix else ""
    base_url = f"{parts.scheme}://{parts.netloc}{wiki_prefix}"

    # First "pageId=..." query entry wins, even if its value is empty.
    query_entries = parts.query.split("&") if parts.query else []
    page_id = next(
        (
            entry.split("=", 1)[1]
            for entry in query_entries
            if entry.startswith("pageId=")
        ),
        "",
    )
    if not page_id:
        path_match = re.search(r"/pages/(\d+)", url_path)
        if path_match:
            page_id = path_match.group(1)
    if page_id:
        return base_url, page_id

    # No explicit page: fall back to the homepage of the space named in the path.
    space_match = re.search(r"/spaces/([^/]+)", url_path) or re.search(
        r"/display/([^/]+)", url_path
    )
    if space_match:
        homepage_id = _fetch_confluence_homepage_id(base_url, space_match.group(1))
        if homepage_id:
            return base_url, homepage_id

    raise ValueError(
        "Could not find a Confluence page ID in the URL. "
        "Use a link with '/pages/<PAGEID>' or '?pageId=<PAGEID>', "
        "or a space URL like '/spaces/<SPACEKEY>' or '/display/<SPACEKEY>'."
    )
def _fetch_confluence_page(base_url: str, page_id: str) -> dict:
    """Fetch one content item by ID with its storage-format body and title expanded."""
    endpoint = f"{base_url}/rest/api/content/{page_id}"
    expand = {"expand": "body.storage,title"}
    return _confluence_request(endpoint, params=expand)
def _fetch_confluence_child_pages(
    base_url: str,
    page_id: str,
    limit: int,
) -> list[dict]:
    """Collect all direct child pages of a page, following pagination.

    Pages are requested ``limit`` at a time from
    ``<base_url>/rest/api/content/<page_id>/child/page``.  Fetching stops when a
    batch is empty, or when the response has no ``next`` link and the batch is
    shorter than ``limit``.

    Returns:
        The child page payloads in the order the API returned them.
    """
    endpoint = f"{base_url}/rest/api/content/{page_id}/child/page"
    collected: list[dict] = []
    offset = 0
    while True:
        payload = _confluence_request(
            endpoint,
            params={"limit": limit, "start": offset, "expand": "body.storage,title"},
        )
        batch = payload.get("results", [])
        collected.extend(batch)
        if not batch:
            break
        has_next = bool(payload.get("_links", {}).get("next"))
        if not has_next and len(batch) < limit:
            # A short final batch without a "next" link means nothing is left.
            break
        offset += len(batch)
    return collected
def download_confluence_space(
    url: str,
    dest_root: Path,
    limit: int = 50,
) -> DownloadResult:
    """Download a Confluence page and all of its descendants as HTML files.

    Starting from the page identified by ``url``, the page tree is walked
    breadth-first.  Each page is written to
    ``dest_root/confluence_<root id>/<sanitized title>_<page id>.html`` as an
    ``<h1>`` holding the title followed by the page's storage-format body.  A
    JSON map from filename to page URL is written next to the pages.

    Args:
        url: Link to a Confluence page or space.
        dest_root: Directory under which the download folder is created.
        limit: Page size used when listing child pages.

    Returns:
        A ``DownloadResult`` with the download folder, its name and the URL of
        the root page.

    Raises:
        ValueError: If no page ID can be derived from ``url``.
        RuntimeError: If a Confluence request fails or no page was written.
    """
    base_url, root_page_id = _parse_confluence_base_and_page(url)
    folder_name = _sanitize_filename(f"confluence_{root_page_id}")
    dest_path = dest_root / folder_name
    dest_path.mkdir(parents=True, exist_ok=True)

    url_map: dict[str, str] = {}
    root_page_url = ""
    # deque gives O(1) pops from the front of the breadth-first queue.
    pending = deque([root_page_id])
    seen: set[str] = set()
    while pending:
        page_id = pending.popleft()
        if page_id in seen:
            continue
        seen.add(page_id)

        page = _fetch_confluence_page(base_url, page_id)
        title = str(page.get("title") or "untitled")
        body = page.get("body", {}).get("storage", {}).get("value", "")
        # The title is plain text, so it is escaped before being placed in
        # markup; the storage body is already markup and is written verbatim.
        document = f"<h1>{html.escape(title, quote=False)}</h1>\n{body}"
        filename = f"{_sanitize_filename(title)}_{page_id}.html"
        (dest_path / filename).write_text(document, encoding="utf-8")

        page_url = _resolve_confluence_page_url(base_url, page)
        url_map[Path(filename).as_posix()] = page_url
        if page_id == root_page_id:
            root_page_url = page_url

        for child in _fetch_confluence_child_pages(base_url, page_id, limit):
            child_id = str(child.get("id") or "")
            if child_id and child_id not in seen:
                pending.append(child_id)

    # Filenames embed the unique page ID, so url_map has one entry per written page.
    if not url_map:
        raise RuntimeError("No pages were downloaded.")
    _write_confluence_url_map(dest_path, url_map)
    return DownloadResult(
        local_path=dest_path,
        repository_name=folder_name,
        base_url=root_page_url or f"{base_url.rstrip('/')}/pages/{root_page_id}",
    )
def download_source(url: str, source_type: str, dest_root: Path) -> DownloadResult:
    """Download ``url`` with the downloader that matches ``source_type``.

    ``source_type`` is compared case-insensitively and ignoring surrounding
    whitespace.  Accepted values are ``git`` / ``git repository`` / ``git repo``
    and ``confluence`` / ``confluence space``.

    Raises:
        ValueError: If ``source_type`` is none of the accepted values.
    """
    kind = source_type.strip().lower()
    git_aliases = ("git", "git repository", "git repo")
    confluence_aliases = ("confluence", "confluence space")

    if kind in git_aliases:
        return download_git_repo(url, dest_root)
    if kind in confluence_aliases:
        return download_confluence_space(url, dest_root)
    raise ValueError(f"Unsupported source type: {source_type}")
if __name__ == "__main__":
    # Manual smoke test: download a Confluence space into a throw-away directory
    # and print the result plus every path that was created.
    with tempfile.TemporaryDirectory() as scratch_dir:
        scratch_root = Path(scratch_dir)
        outcome = download_source(
            "https://fortissgmbh.atlassian.net/wiki/spaces/MA/overview?homepageId=486113542",
            "confluence",
            scratch_root,
        )
        # for debugging print filetree
        print(outcome)
        for entry in scratch_root.rglob("*"):
            print(entry)
    # Debugging aid: a single page can also be inspected directly by issuing a GET
    # with requests + HTTPBasicAuth against <site>/wiki/api/v2/pages/<page id>
    # (Accept: application/json) and printing response.text.