Spaces:
Sleeping
Sleeping
File size: 5,995 Bytes
9d8a0cf | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 | from __future__ import annotations
import base64
import hashlib
import logging
from datetime import datetime
from typing import Optional
import httpx
from ingestion.models import RawDocument
from src.confluence_agent.config import confluence_config
# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)
# Comma-separated fields passed as the REST "expand" parameter so one request
# returns the page body (storage format), version, ancestor chain and space.
_EXPAND = ",".join(("body.storage", "version", "ancestors", "space"))
class ConfluenceAdapter:
    """Fetch Confluence content over the REST API and convert it to ``RawDocument``s.

    Constructor arguments left empty fall back to the matching values on
    ``confluence_config``. The public fetch methods are best-effort: failures
    are logged and ``None`` / an empty or partial list is returned instead of
    raising.
    """

    def __init__(
        self,
        base_url: str = "",
        token: str = "",
        email: str = "",
        team_id: str = "",
    ) -> None:
        # Trailing "/" is stripped so URLs can be built as f"{base}/wiki/...".
        self._base_url = (base_url or confluence_config.confluence_base_url).rstrip("/")
        self._token = token or confluence_config.confluence_token
        self._email = email or confluence_config.confluence_email
        self._team_id = team_id or confluence_config.team_id

    def _auth_headers(self) -> dict[str, str]:
        """Build the authorization and content-negotiation headers for every request.

        With an email configured, HTTP Basic ``email:token`` is used (the
        Confluence Cloud API-token scheme). Without one, a Basic credential of
        ``":token"`` cannot authenticate, so the token is sent as a Bearer
        token instead (the Confluence Data Center personal-access-token scheme).
        """
        if self._email:
            credentials = base64.b64encode(f"{self._email}:{self._token}".encode()).decode()
            authorization = f"Basic {credentials}"
        else:
            authorization = f"Bearer {self._token}"
        return {"Authorization": authorization, "Accept": "application/json"}

    def _client(self) -> httpx.AsyncClient:
        """Return a new authenticated async HTTP client (30 s timeout); use with ``async with``."""
        return httpx.AsyncClient(headers=self._auth_headers(), timeout=30)

    def _page_to_raw_document(self, page: dict) -> Optional[RawDocument]:
        """Convert one Confluence content JSON object into a ``RawDocument``.

        Args:
            page: A content object as returned by the REST API with the
                ``_EXPAND`` fields expanded. Only ``id`` is required; other
                fields may be missing or ``null``.

        Returns:
            The converted document, or ``None`` (after logging the exception)
            when the payload cannot be converted.
        """
        try:
            page_id = page["id"]
            title = page.get("title", "Untitled")
            # ``or {}`` / ``or []`` treat explicit JSON nulls the same as missing keys.
            space_key = (page.get("space") or {}).get("key", "")
            body = page.get("body") or {}
            html_body = (body.get("storage") or {}).get("value") or ""
            ancestors = [a.get("title", "") for a in (page.get("ancestors") or [])]
            version = (page.get("version") or {}).get("number", 0)
            # Derived only from the page id, so re-ingesting the same page
            # (at any version) yields the same doc_id.
            doc_id = hashlib.sha256(f"confluence:{page_id}".encode()).hexdigest()
            return RawDocument(
                doc_id=doc_id,
                title=title,
                content=html_body,  # raw storage HTML — chunker will parse it
                # NOTE(review): Cloud-style page URL; confirm if Data Center
                # instances (different URL layout) must be supported.
                source_url=f"{self._base_url}/wiki/spaces/{space_key}/pages/{page_id}",
                source_type="confluence",
                team_id=self._team_id,
                metadata={
                    "page_id": page_id,
                    "space_key": space_key,
                    "ancestors": ancestors,
                    "version": version,
                },
            )
        except Exception:
            logger.exception("confluence_adapter: failed to parse page payload")
            return None

    def _documents_from(self, data: dict) -> list[RawDocument]:
        """Convert every entry of a paged response's ``results`` list, skipping unconvertible ones."""
        docs: list[RawDocument] = []
        for page in data.get("results") or []:
            doc = self._page_to_raw_document(page)
            if doc:
                docs.append(doc)
        return docs

    @staticmethod
    def _next_start(data: dict, start: int) -> Optional[int]:
        """Return the ``start`` offset of the next result page, or ``None`` when done.

        The offset advances by the number of results actually returned, not by
        the requested ``limit``. The server may return fewer items than asked
        for, and advancing by ``limit`` would then silently skip results. An
        empty page also ends pagination, so a stray ``next`` link cannot cause
        an endless loop.
        """
        if not (data.get("_links") or {}).get("next"):
            return None
        returned = len(data.get("results") or [])
        if returned == 0:
            return None
        return start + returned

    async def fetch_page(self, page_id: str) -> Optional[RawDocument]:
        """Fetch a single page by id.

        Returns ``None`` when credentials are not configured, when the request
        fails (including non-2xx responses), or when the payload cannot be
        converted.
        """
        if not self._base_url or not self._token:
            logger.warning("confluence_adapter: credentials not configured")
            return None
        url = f"{self._base_url}/wiki/rest/api/content/{page_id}"
        async with self._client() as client:
            try:
                resp = await client.get(url, params={"expand": _EXPAND})
                resp.raise_for_status()
                return self._page_to_raw_document(resp.json())
            except Exception:
                logger.exception("confluence_adapter: failed to fetch page %s", page_id)
                return None

    async def fetch_space(self, space_key: str) -> list[RawDocument]:
        """Fetch every page in ``space_key``, following pagination.

        Best-effort: if a request fails part-way through, the error is logged
        and the pages collected so far are returned.
        """
        if not self._base_url or not self._token:
            logger.warning("confluence_adapter: credentials not configured")
            return []
        url = f"{self._base_url}/wiki/rest/api/content"
        params: dict = {
            "spaceKey": space_key,
            "type": "page",  # the endpoint's documented default, made explicit
            "expand": _EXPAND,
            "limit": 50,
            "start": 0,
        }
        docs: list[RawDocument] = []
        async with self._client() as client:
            while True:
                try:
                    resp = await client.get(url, params=params)
                    resp.raise_for_status()
                    data = resp.json()
                    docs.extend(self._documents_from(data))
                    next_start = self._next_start(data, params["start"])
                except Exception:
                    logger.exception("confluence_adapter: fetch_space failed at start=%d", params["start"])
                    break
                if next_start is None:
                    break
                params["start"] = next_start
        logger.info("confluence_adapter: fetched %d pages from space %s", len(docs), space_key)
        return docs

    async def fetch_incremental(self, space_key: str, since: datetime) -> list[RawDocument]:
        """Return pages in ``space_key`` modified at or after ``since``.

        Uses a CQL search. If the search endpoint answers 404 (taken to mean
        CQL search is unavailable), this falls back to a full ``fetch_space``.
        Other failures are logged and the pages collected so far are returned.

        ``since`` is truncated to the minute, so results may include pages
        modified up to a minute before ``since``.
        """
        if not self._base_url or not self._token:
            logger.warning("confluence_adapter: credentials not configured")
            return []
        # NOTE(review): ``since`` is formatted without timezone conversion;
        # presumably CQL reads it in the Confluence user's timezone — confirm
        # what callers pass.
        since_str = since.strftime("%Y-%m-%d %H:%M")
        # Escape backslashes and quotes so the key cannot break out of the CQL string literal.
        quoted_space = space_key.replace("\\", "\\\\").replace('"', '\\"')
        url = f"{self._base_url}/wiki/rest/api/content/search"
        params: dict = {
            # ``type = page`` keeps results consistent with fetch_space; without it
            # the search also returns comments, attachments and blog posts.
            "cql": f'type = page AND space = "{quoted_space}" AND lastModified >= "{since_str}"',
            "expand": _EXPAND,
            "limit": 50,
            "start": 0,
        }
        docs: list[RawDocument] = []
        cql_unsupported = False
        async with self._client() as client:
            while True:
                try:
                    resp = await client.get(url, params=params)
                    if resp.status_code == 404:
                        # CQL not supported — fall back to a full space fetch below.
                        cql_unsupported = True
                        break
                    resp.raise_for_status()
                    data = resp.json()
                    docs.extend(self._documents_from(data))
                    next_start = self._next_start(data, params["start"])
                except Exception:
                    logger.exception("confluence_adapter: incremental fetch failed")
                    break
                if next_start is None:
                    break
                params["start"] = next_start
        if cql_unsupported:
            # Runs after the search client is closed, so only one client is open at a time.
            return await self.fetch_space(space_key)
        return docs
|