GodSpeed / src /confluence_agent /adapter.py
Ananth Shyam
feat: Implement file and JIRA agents with parsing, processing, and ingestion capabilities
9d8a0cf
from __future__ import annotations
import base64
import hashlib
import logging
from datetime import datetime
from typing import Optional
import httpx
from ingestion.models import RawDocument
from src.confluence_agent.config import confluence_config
# Module logger; every message from this adapter is prefixed "confluence_adapter:".
logger: logging.Logger = logging.getLogger(__name__)

# Fields requested via `expand` on every content call. ConfluenceAdapter reads
# body.storage (page HTML), version (number), ancestors (titles) and space (key).
_EXPAND: str = "body.storage,version,ancestors,space"
class ConfluenceAdapter:
    """Fetch Confluence content over the v1 REST API and convert it to ``RawDocument``s.

    All fetch methods are best-effort: HTTP and parsing failures are logged and
    produce ``None`` or a (possibly partial) list instead of raising.
    """

    def __init__(
        self,
        base_url: str = "",
        token: str = "",
        email: str = "",
        team_id: str = "",
    ) -> None:
        """Store connection settings; empty arguments fall back to ``confluence_config``."""
        # Trailing slash stripped so the f-string URL joins below never produce "//".
        self._base_url = (base_url or confluence_config.confluence_base_url).rstrip("/")
        self._token = token or confluence_config.confluence_token
        self._email = email or confluence_config.confluence_email
        self._team_id = team_id or confluence_config.team_id

    def _is_configured(self) -> bool:
        """Return True when both a base URL and an API token are available."""
        return bool(self._base_url and self._token)

    def _auth_headers(self) -> dict[str, str]:
        """Return HTTP Basic auth headers built from ``email:token``."""
        credentials = base64.b64encode(f"{self._email}:{self._token}".encode()).decode()
        return {"Authorization": f"Basic {credentials}", "Accept": "application/json"}

    def _client(self) -> httpx.AsyncClient:
        """Create a new authenticated client (30 s timeout); use it as an async context manager."""
        return httpx.AsyncClient(headers=self._auth_headers(), timeout=30)

    @staticmethod
    def _cql_quote(value: str) -> str:
        """Escape *value* for use inside a double-quoted CQL string literal."""
        return value.replace("\\", "\\\\").replace('"', '\\"')

    def _next_page_url(self, data: dict) -> Optional[str]:
        """Return the absolute URL of the next result page, or None on the last page.

        Confluence reports ``_links.next`` as a path relative to the ``/wiki``
        context. Following it (rather than computing ``start`` offsets) stays
        correct when the server returns fewer rows than the requested ``limit``.
        """
        links = data.get("_links") or {}
        next_link = links.get("next")
        if not next_link:
            return None
        if next_link.startswith(("http://", "https://")):
            return next_link
        # Resolve against the configured host, the same way every other URL here is built.
        return f"{self._base_url}/wiki{next_link}"

    def _page_to_raw_document(self, page: dict) -> Optional[RawDocument]:
        """Convert one Confluence content payload into a ``RawDocument``.

        Returns None (and logs the traceback) if the payload cannot be converted,
        e.g. when it has no ``id``.
        """
        try:
            page_id = page["id"]
            title = page.get("title", "Untitled")
            # `or {}` / `or []` also cover keys that are present but null.
            space_key = (page.get("space") or {}).get("key", "")
            html_body = ((page.get("body") or {}).get("storage") or {}).get("value", "")
            ancestors = [a.get("title", "") for a in page.get("ancestors") or []]
            version = (page.get("version") or {}).get("number", 0)
            # Deterministic id derived from the page id, so re-fetching a page yields the same doc_id.
            doc_id = hashlib.sha256(f"confluence:{page_id}".encode()).hexdigest()
            return RawDocument(
                doc_id=doc_id,
                title=title,
                content=html_body,  # raw storage HTML — chunker will parse it
                source_url=f"{self._base_url}/wiki/spaces/{space_key}/pages/{page_id}",
                source_type="confluence",
                team_id=self._team_id,
                metadata={
                    "page_id": page_id,
                    "space_key": space_key,
                    "ancestors": ancestors,
                    "version": version,
                },
            )
        except Exception:
            # Best-effort: one malformed page must not abort a whole space fetch.
            logger.exception("confluence_adapter: failed to parse page payload")
            return None

    def _documents_from(self, pages: list) -> list[RawDocument]:
        """Convert a ``results`` list, skipping pages that fail to parse."""
        docs: list[RawDocument] = []
        for page in pages:
            doc = self._page_to_raw_document(page)
            if doc is not None:
                docs.append(doc)
        return docs

    async def fetch_page(self, page_id: str) -> Optional[RawDocument]:
        """Fetch a single page by id; return None if unconfigured or on any failure."""
        if not self._is_configured():
            logger.warning("confluence_adapter: credentials not configured")
            return None
        url = f"{self._base_url}/wiki/rest/api/content/{page_id}"
        async with self._client() as client:
            try:
                resp = await client.get(url, params={"expand": _EXPAND})
                resp.raise_for_status()
                return self._page_to_raw_document(resp.json())
            except Exception:
                logger.exception("confluence_adapter: failed to fetch page %s", page_id)
                return None

    async def fetch_space(self, space_key: str) -> list[RawDocument]:
        """Return every page in *space_key*.

        Follows ``_links.next`` until the final page. On an HTTP or decoding
        error the documents collected so far are returned and the error is logged.
        """
        if not self._is_configured():
            logger.warning("confluence_adapter: credentials not configured")
            return []
        url: Optional[str] = f"{self._base_url}/wiki/rest/api/content"
        params: Optional[dict] = {
            "spaceKey": space_key,
            "expand": _EXPAND,
            "limit": 50,
            "start": 0,
        }
        docs: list[RawDocument] = []
        async with self._client() as client:
            while url:
                try:
                    resp = await client.get(url, params=params)
                    resp.raise_for_status()
                    data = resp.json()
                except Exception:
                    logger.exception(
                        "confluence_adapter: fetch_space failed for space %s after %d documents",
                        space_key,
                        len(docs),
                    )
                    break
                results = data.get("results") or []
                docs.extend(self._documents_from(results))
                # Stop on an empty page even if a next link is present, so a
                # misbehaving server cannot keep this loop spinning.
                url = self._next_page_url(data) if results else None
                # The next link already encodes the query (cursor/start, limit, expand).
                params = None
        logger.info("confluence_adapter: fetched %d pages from space %s", len(docs), space_key)
        return docs

    async def fetch_incremental(self, space_key: str, since: datetime) -> list[RawDocument]:
        """Return pages in *space_key* modified at or after *since*, using CQL search.

        If the search endpoint answers 404 (CQL unavailable), falls back to a full
        :meth:`fetch_space`. Other errors are logged and the documents collected
        so far are returned.
        """
        if not self._is_configured():
            logger.warning("confluence_adapter: credentials not configured")
            return []
        # Minute precision: dropping the seconds can only widen the window, never miss edits.
        # NOTE(review): `since` is formatted without its tzinfo; confirm it matches the
        # timezone Confluence uses to evaluate CQL dates for the API user.
        since_str = since.strftime("%Y-%m-%d %H:%M")
        # `type = page` keeps results consistent with fetch_space (the /content endpoint
        # returns pages only); without it comments/attachments would be ingested as pages.
        cql = (
            f'space = "{self._cql_quote(space_key)}" AND type = page '
            f'AND lastModified >= "{since_str}"'
        )
        url: Optional[str] = f"{self._base_url}/wiki/rest/api/content/search"
        params: Optional[dict] = {
            "cql": cql,
            "expand": _EXPAND,
            "limit": 50,
            "start": 0,
        }
        docs: list[RawDocument] = []
        cql_unavailable = False
        async with self._client() as client:
            while url:
                try:
                    resp = await client.get(url, params=params)
                    if resp.status_code == 404:
                        cql_unavailable = True
                        break
                    resp.raise_for_status()
                    data = resp.json()
                except Exception:
                    logger.exception(
                        "confluence_adapter: incremental fetch failed for space %s", space_key
                    )
                    break
                results = data.get("results") or []
                docs.extend(self._documents_from(results))
                url = self._next_page_url(data) if results else None
                params = None  # the next link already encodes the query
        if cql_unavailable:
            # CQL not supported — fall back to a full space fetch, after closing the
            # search client so two clients are not held open at once.
            return await self.fetch_space(space_key)
        logger.info(
            "confluence_adapter: fetched %d changed pages from space %s", len(docs), space_key
        )
        return docs