File size: 5,995 Bytes
9d8a0cf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
from __future__ import annotations

import base64
import hashlib
import logging
from datetime import datetime
from typing import Optional

import httpx

from ingestion.models import RawDocument
from src.confluence_agent.config import confluence_config

logger = logging.getLogger(__name__)

_EXPAND = "body.storage,version,ancestors,space"


class ConfluenceAdapter:
    def __init__(
        self,
        base_url: str = "",
        token: str = "",
        email: str = "",
        team_id: str = "",
    ) -> None:
        self._base_url = (base_url or confluence_config.confluence_base_url).rstrip("/")
        self._token = token or confluence_config.confluence_token
        self._email = email or confluence_config.confluence_email
        self._team_id = team_id or confluence_config.team_id

    def _auth_headers(self) -> dict[str, str]:
        credentials = base64.b64encode(f"{self._email}:{self._token}".encode()).decode()
        return {"Authorization": f"Basic {credentials}", "Accept": "application/json"}

    def _client(self) -> httpx.AsyncClient:
        return httpx.AsyncClient(headers=self._auth_headers(), timeout=30)

    def _page_to_raw_document(self, page: dict) -> Optional[RawDocument]:
        try:
            page_id = page["id"]
            title = page.get("title", "Untitled")
            space_key = (page.get("space") or {}).get("key", "")
            html_body = page.get("body", {}).get("storage", {}).get("value", "")
            ancestors = [a.get("title", "") for a in page.get("ancestors", [])]
            version = (page.get("version") or {}).get("number", 0)

            doc_id = hashlib.sha256(f"confluence:{page_id}".encode()).hexdigest()
            return RawDocument(
                doc_id=doc_id,
                title=title,
                content=html_body,  # raw storage HTML — chunker will parse it
                source_url=f"{self._base_url}/wiki/spaces/{space_key}/pages/{page_id}",
                source_type="confluence",
                team_id=self._team_id,
                metadata={
                    "page_id": page_id,
                    "space_key": space_key,
                    "ancestors": ancestors,
                    "version": version,
                },
            )
        except Exception:
            logger.exception("confluence_adapter: failed to parse page payload")
            return None

    async def fetch_page(self, page_id: str) -> Optional[RawDocument]:
        if not self._base_url or not self._token:
            logger.warning("confluence_adapter: credentials not configured")
            return None
        url = f"{self._base_url}/wiki/rest/api/content/{page_id}"
        async with self._client() as client:
            try:
                resp = await client.get(url, params={"expand": _EXPAND})
                resp.raise_for_status()
                return self._page_to_raw_document(resp.json())
            except Exception:
                logger.exception("confluence_adapter: failed to fetch page %s", page_id)
                return None

    async def fetch_space(self, space_key: str) -> list[RawDocument]:
        if not self._base_url or not self._token:
            logger.warning("confluence_adapter: credentials not configured")
            return []
        url = f"{self._base_url}/wiki/rest/api/content"
        params: dict = {
            "spaceKey": space_key,
            "expand": _EXPAND,
            "limit": 50,
            "start": 0,
        }
        docs: list[RawDocument] = []
        async with self._client() as client:
            while True:
                try:
                    resp = await client.get(url, params=params)
                    resp.raise_for_status()
                    data = resp.json()
                    for page in data.get("results", []):
                        doc = self._page_to_raw_document(page)
                        if doc:
                            docs.append(doc)
                    if data.get("_links", {}).get("next"):
                        params["start"] += params["limit"]
                    else:
                        break
                except Exception:
                    logger.exception("confluence_adapter: fetch_space failed at start=%d", params["start"])
                    break

        logger.info("confluence_adapter: fetched %d pages from space %s", len(docs), space_key)
        return docs

    async def fetch_incremental(self, space_key: str, since: datetime) -> list[RawDocument]:
        """Return pages modified since `since`. Uses CQL if available, falls back to full fetch."""
        if not self._base_url or not self._token:
            return []
        since_str = since.strftime("%Y-%m-%d %H:%M")
        url = f"{self._base_url}/wiki/rest/api/content/search"
        params = {
            "cql": f'space = "{space_key}" AND lastModified >= "{since_str}"',
            "expand": _EXPAND,
            "limit": 50,
            "start": 0,
        }
        docs: list[RawDocument] = []
        async with self._client() as client:
            while True:
                try:
                    resp = await client.get(url, params=params)
                    if resp.status_code == 404:
                        # CQL not supported — fall back to full space fetch
                        return await self.fetch_space(space_key)
                    resp.raise_for_status()
                    data = resp.json()
                    for page in data.get("results", []):
                        doc = self._page_to_raw_document(page)
                        if doc:
                            docs.append(doc)
                    if data.get("_links", {}).get("next"):
                        params["start"] += params["limit"]
                    else:
                        break
                except Exception:
                    logger.exception("confluence_adapter: incremental fetch failed")
                    break

        return docs