Spaces:
Sleeping
Sleeping
Ananth Shyam
feat: Implement file and JIRA agents with parsing, processing, and ingestion capabilities
9d8a0cf | from __future__ import annotations | |
| import base64 | |
| import hashlib | |
| import logging | |
| from datetime import datetime | |
| from typing import Optional | |
| import httpx | |
| from ingestion.models import RawDocument | |
| from src.confluence_agent.config import confluence_config | |
| logger = logging.getLogger(__name__) | |
| _EXPAND = "body.storage,version,ancestors,space" | |
| class ConfluenceAdapter: | |
| def __init__( | |
| self, | |
| base_url: str = "", | |
| token: str = "", | |
| email: str = "", | |
| team_id: str = "", | |
| ) -> None: | |
| self._base_url = (base_url or confluence_config.confluence_base_url).rstrip("/") | |
| self._token = token or confluence_config.confluence_token | |
| self._email = email or confluence_config.confluence_email | |
| self._team_id = team_id or confluence_config.team_id | |
| def _auth_headers(self) -> dict[str, str]: | |
| credentials = base64.b64encode(f"{self._email}:{self._token}".encode()).decode() | |
| return {"Authorization": f"Basic {credentials}", "Accept": "application/json"} | |
| def _client(self) -> httpx.AsyncClient: | |
| return httpx.AsyncClient(headers=self._auth_headers(), timeout=30) | |
| def _page_to_raw_document(self, page: dict) -> Optional[RawDocument]: | |
| try: | |
| page_id = page["id"] | |
| title = page.get("title", "Untitled") | |
| space_key = (page.get("space") or {}).get("key", "") | |
| html_body = page.get("body", {}).get("storage", {}).get("value", "") | |
| ancestors = [a.get("title", "") for a in page.get("ancestors", [])] | |
| version = (page.get("version") or {}).get("number", 0) | |
| doc_id = hashlib.sha256(f"confluence:{page_id}".encode()).hexdigest() | |
| return RawDocument( | |
| doc_id=doc_id, | |
| title=title, | |
| content=html_body, # raw storage HTML — chunker will parse it | |
| source_url=f"{self._base_url}/wiki/spaces/{space_key}/pages/{page_id}", | |
| source_type="confluence", | |
| team_id=self._team_id, | |
| metadata={ | |
| "page_id": page_id, | |
| "space_key": space_key, | |
| "ancestors": ancestors, | |
| "version": version, | |
| }, | |
| ) | |
| except Exception: | |
| logger.exception("confluence_adapter: failed to parse page payload") | |
| return None | |
| async def fetch_page(self, page_id: str) -> Optional[RawDocument]: | |
| if not self._base_url or not self._token: | |
| logger.warning("confluence_adapter: credentials not configured") | |
| return None | |
| url = f"{self._base_url}/wiki/rest/api/content/{page_id}" | |
| async with self._client() as client: | |
| try: | |
| resp = await client.get(url, params={"expand": _EXPAND}) | |
| resp.raise_for_status() | |
| return self._page_to_raw_document(resp.json()) | |
| except Exception: | |
| logger.exception("confluence_adapter: failed to fetch page %s", page_id) | |
| return None | |
| async def fetch_space(self, space_key: str) -> list[RawDocument]: | |
| if not self._base_url or not self._token: | |
| logger.warning("confluence_adapter: credentials not configured") | |
| return [] | |
| url = f"{self._base_url}/wiki/rest/api/content" | |
| params: dict = { | |
| "spaceKey": space_key, | |
| "expand": _EXPAND, | |
| "limit": 50, | |
| "start": 0, | |
| } | |
| docs: list[RawDocument] = [] | |
| async with self._client() as client: | |
| while True: | |
| try: | |
| resp = await client.get(url, params=params) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| for page in data.get("results", []): | |
| doc = self._page_to_raw_document(page) | |
| if doc: | |
| docs.append(doc) | |
| if data.get("_links", {}).get("next"): | |
| params["start"] += params["limit"] | |
| else: | |
| break | |
| except Exception: | |
| logger.exception("confluence_adapter: fetch_space failed at start=%d", params["start"]) | |
| break | |
| logger.info("confluence_adapter: fetched %d pages from space %s", len(docs), space_key) | |
| return docs | |
| async def fetch_incremental(self, space_key: str, since: datetime) -> list[RawDocument]: | |
| """Return pages modified since `since`. Uses CQL if available, falls back to full fetch.""" | |
| if not self._base_url or not self._token: | |
| return [] | |
| since_str = since.strftime("%Y-%m-%d %H:%M") | |
| url = f"{self._base_url}/wiki/rest/api/content/search" | |
| params = { | |
| "cql": f'space = "{space_key}" AND lastModified >= "{since_str}"', | |
| "expand": _EXPAND, | |
| "limit": 50, | |
| "start": 0, | |
| } | |
| docs: list[RawDocument] = [] | |
| async with self._client() as client: | |
| while True: | |
| try: | |
| resp = await client.get(url, params=params) | |
| if resp.status_code == 404: | |
| # CQL not supported — fall back to full space fetch | |
| return await self.fetch_space(space_key) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| for page in data.get("results", []): | |
| doc = self._page_to_raw_document(page) | |
| if doc: | |
| docs.append(doc) | |
| if data.get("_links", {}).get("next"): | |
| params["start"] += params["limit"] | |
| else: | |
| break | |
| except Exception: | |
| logger.exception("confluence_adapter: incremental fetch failed") | |
| break | |
| return docs | |