File size: 8,817 Bytes
0493349
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
"""Web scraper adapter for URL-based content ingestion."""

import asyncio
import hashlib
import json
import logging
import os
import tempfile
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from urllib.parse import urlparse

import aiohttp
from bs4 import BeautifulSoup
from docling import DocumentConverter

from src.config import settings

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


@dataclass
class RawDocument:
    """Normalized document model for all sources.

    The optional collection fields (``parent_ids``, ``tags``, ``raw_metadata``,
    ``source_config``) accept ``None``. ``__post_init__`` replaces ``None``
    with a fresh empty container, so instances never share a mutable default.
    """

    uri: str
    source_type: str
    source_subtype: str
    title: str
    content: str
    content_hash: str
    created_at: datetime
    updated_at: datetime
    author_ids: list[str]
    space_id: str
    # None is replaced by [] / {} in __post_init__.
    parent_ids: Optional[list[str]] = None
    tags: Optional[list[str]] = None
    raw_metadata: Optional[dict] = None
    content_type: str = "text"
    priority: int = 3
    ttl_seconds: Optional[int] = None
    source_config: Optional[dict] = None

    def __post_init__(self) -> None:
        # Give each instance its own containers instead of a shared default.
        if self.parent_ids is None:
            self.parent_ids = []
        if self.tags is None:
            self.tags = []
        if self.raw_metadata is None:
            self.raw_metadata = {}
        if self.source_config is None:
            self.source_config = {}


class WebScraperAdapter:
    """Adapter for scraping and indexing web content.

    Pages are fetched with aiohttp and normalized into ``RawDocument``
    instances. HTML is reduced to visible text with BeautifulSoup. PDFs
    (detected by Content-Type) are converted to Markdown with docling.
    """

    # Bytes requested per read while streaming a body under the size cap.
    _READ_CHUNK_SIZE = 64 * 1024

    def __init__(self, *, verify_ssl: bool = True):
        """
        Load scraper limits from application settings.

        Args:
            verify_ssl: Verify TLS certificates of scraped hosts. Disable only
                for trusted hosts with broken certificates; turning it off
                exposes requests to man-in-the-middle tampering.
        """
        self.timeout = aiohttp.ClientTimeout(total=settings.integrations.web_scraper_timeout)
        self.max_content_size = settings.integrations.web_scraper_max_content_size
        self.user_agent = settings.integrations.user_agent
        self.verify_ssl = verify_ssl

    async def connect(self, credentials: dict) -> None:
        """No authentication needed for public web scraping."""

    async def fetch_url(self, url: str) -> Optional[RawDocument]:
        """
        Fetch and parse a single URL.

        Args:
            url: URL to scrape

        Returns:
            RawDocument if successful. Returns None on a non-200 status, a
            body larger than ``max_content_size``, a timeout, or any other
            error; each of these cases is logged.
        """
        headers = {"User-Agent": self.user_agent}
        # aiohttp verifies certificates by default; only pass ssl=False on opt-out.
        request_kwargs = {} if self.verify_ssl else {"ssl": False}

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    url, headers=headers, timeout=self.timeout, **request_kwargs
                ) as response:
                    if response.status != 200:
                        logger.warning("Failed to fetch %s: status %s", url, response.status)
                        return None

                    content = await self._read_limited_body(response)
                    content_type = response.headers.get("content-type", "text/html")

            if content is None:
                logger.warning("Content too large for %s", url)
                return None

            # Parse after the connection is released; parsing may be slow (PDF).
            return await self._parse_content(url, content, content_type)

        except asyncio.TimeoutError:
            logger.error("Timeout fetching %s", url)
            return None
        except Exception as e:
            logger.error("Error fetching %s: %s", url, e)
            return None

    async def fetch_all(self, space_id: str) -> list[RawDocument]:
        """Fetch all URLs in a space (not typically used for web scraping)."""
        return []

    async def fetch_incremental(self, space_id: str, last_sync_at: datetime) -> list[RawDocument]:
        """Fetch only changed pages (requires sitemap or incremental tracking)."""
        return []

    async def fetch_by_query(self, query: str) -> list[RawDocument]:
        """Search for URLs containing keywords (via stored URLs)."""
        return []

    async def _read_limited_body(self, response: "aiohttp.ClientResponse") -> Optional[bytes]:
        """
        Read a response body without buffering more than the size cap.

        Args:
            response: An open aiohttp response.

        Returns:
            The body bytes. Returns None if the declared Content-Length, or
            the number of bytes actually received, exceeds ``max_content_size``.
        """
        declared = response.content_length
        if declared is not None and declared > self.max_content_size:
            return None

        body = bytearray()
        async for chunk in response.content.iter_chunked(self._READ_CHUNK_SIZE):
            body.extend(chunk)
            # Stop as soon as the cap is crossed instead of reading the rest.
            if len(body) > self.max_content_size:
                return None
        return bytes(body)

    async def _parse_content(
        self, url: str, content: bytes, content_type: str
    ) -> RawDocument:
        """
        Convert a fetched body into a RawDocument.

        Args:
            url: Source URL. It is hashed into the document URI and stored in
                the metadata.
            content: Raw response body.
            content_type: Value of the Content-Type header.

        Returns:
            RawDocument with the extracted text and title, plus a SHA-256 hash
            of the raw body.
        """
        domain = urlparse(url).netloc

        # Header values are case-insensitive ("Application/PDF" is valid).
        if "application/pdf" in content_type.lower():
            text = await self._extract_pdf_text(content)
            doc_type = "pdf"
            # PDF bytes are not HTML, so don't search them for a <title> tag.
            title = domain
        else:
            text = self._extract_html_text(content)
            doc_type = "webpage"
            title = self._extract_html_title(content) or domain

        # Single naive-UTC timestamp so created_at and updated_at agree.
        now = datetime.utcnow()

        return RawDocument(
            uri=f"web://{hashlib.sha256(url.encode()).hexdigest()}",
            source_type="web",
            source_subtype=doc_type,
            title=title,
            content=text,
            content_hash=hashlib.sha256(content).hexdigest(),
            created_at=now,
            updated_at=now,
            author_ids=["scraper"],
            space_id="web_content",
            tags=["scraped", doc_type],
            priority=2,
            ttl_seconds=None,
            raw_metadata={"url": url, "domain": domain},
        )

    def _extract_html_title(self, content: bytes) -> str:
        """Return the stripped ``<title>`` text of an HTML body, or "" if absent."""
        soup = BeautifulSoup(content, "html.parser")
        title_tag = soup.find("title")
        return title_tag.get_text().strip() if title_tag else ""

    def _extract_html_text(self, content: bytes) -> str:
        """Extract visible text from HTML, one non-empty phrase per line."""
        soup = BeautifulSoup(content, "html.parser")

        # Remove script and style elements; their contents are not page text.
        for element in soup(["script", "style"]):
            element.decompose()

        text = soup.get_text()

        # Clean up whitespace: strip each line and split on runs of two spaces.
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        return "\n".join(chunk for chunk in chunks if chunk)

    async def _extract_pdf_text(self, content: bytes) -> str:
        """
        Extract text from PDF bytes as Markdown using docling.

        docling converts from a file path, so the bytes are written to a
        temporary file. The file is always removed afterwards, even if
        conversion fails.

        Returns:
            The Markdown text. On any failure, returns the placeholder
            "[PDF content could not be extracted]".
        """
        temp_path: Optional[str] = None
        try:
            with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
                # Record the path first so cleanup runs even if write() fails.
                temp_path = tmp.name
                tmp.write(content)

            # Conversion is synchronous; run it off the event loop.
            return await asyncio.to_thread(self._convert_pdf_file, temp_path)
        except Exception as e:
            logger.warning("Failed to extract PDF text: %s", e)
            return "[PDF content could not be extracted]"
        finally:
            if temp_path is not None:
                try:
                    os.remove(temp_path)
                except OSError as e:
                    logger.warning("Failed to remove temp file %s: %s", temp_path, e)

    @staticmethod
    def _convert_pdf_file(path: str) -> str:
        """Run docling on the PDF at ``path`` and return its Markdown export."""
        converter = DocumentConverter()
        doc_result = converter.convert(path)
        return doc_result.document.export_to_markdown()


class SitemapAdapter(WebScraperAdapter):
    """Adapter for scraping sitemap.xml and crawling all URLs."""

    # How many levels of nested <sitemapindex> files are followed.
    MAX_SITEMAP_DEPTH = 3

    async def fetch_all(self, space_id: str) -> list[RawDocument]:
        """
        Fetch all URLs listed in the site's sitemap.

        If ``sitemap.xml`` is a sitemap index, its child sitemaps are expanded
        up to ``MAX_SITEMAP_DEPTH`` levels. Duplicate page URLs are fetched
        only once.

        Args:
            space_id: Base URL of the site (e.g., "https://example.com");
                "/sitemap.xml" is appended to it.

        Returns:
            RawDocuments for the pages that were fetched successfully. Returns
            an empty list if the root sitemap cannot be fetched or parsed.
        """
        sitemap_url = f"{space_id.rstrip('/')}/sitemap.xml"
        headers = {"User-Agent": self.user_agent}

        try:
            async with aiohttp.ClientSession(headers=headers) as session:
                urls = await self._collect_page_urls(session, sitemap_url)
        except Exception as e:
            logger.error("Error fetching sitemap: %s", e)
            return []

        docs: list[RawDocument] = []
        # dict.fromkeys drops duplicate entries while keeping sitemap order.
        for index, url in enumerate(dict.fromkeys(urls)):
            if index:
                # Rate limiting between page requests
                await asyncio.sleep(0.1)
            try:
                doc = await self.fetch_url(url)
            except Exception as e:
                logger.error("Error fetching %s: %s", url, e)
                continue
            if doc:
                docs.append(doc)

        return docs

    async def _collect_page_urls(
        self,
        session: "aiohttp.ClientSession",
        sitemap_url: str,
        depth: int = 0,
        visited: Optional[set[str]] = None,
    ) -> list[str]:
        """
        Return the page URLs in ``sitemap_url``, expanding sitemap indexes.

        Exceptions raised while fetching or parsing ``sitemap_url`` itself
        propagate to the caller. Failures on nested sitemaps are logged and
        those sitemaps are skipped.
        """
        visited = set() if visited is None else visited
        visited.add(sitemap_url)

        async with session.get(sitemap_url, timeout=self.timeout) as response:
            if response.status != 200:
                logger.warning("Sitemap not found at %s", sitemap_url)
                return []
            sitemap_content = await response.text()

        soup = BeautifulSoup(sitemap_content, "xml")
        # <loc> values are often padded with whitespace/newlines in real sitemaps.
        locs = [loc.get_text().strip() for loc in soup.find_all("loc")]
        locs = [loc for loc in locs if loc]

        if soup.find("sitemapindex") is None:
            return locs

        # A sitemap index lists further sitemaps, not pages.
        if depth >= self.MAX_SITEMAP_DEPTH:
            logger.warning("Sitemap index nesting too deep at %s", sitemap_url)
            return []

        page_urls: list[str] = []
        for child_url in locs:
            if child_url in visited:
                continue
            try:
                page_urls.extend(
                    await self._collect_page_urls(session, child_url, depth + 1, visited)
                )
            except Exception as e:
                logger.error("Error fetching sitemap %s: %s", child_url, e)
        return page_urls


class UrlListAdapter(WebScraperAdapter):
    """Adapter for scraping a predefined list of URLs."""

    async def fetch_all(self, space_id: str) -> list[RawDocument]:
        """
        Fetch all URLs from a JSON-encoded list passed as ``space_id``.

        Args:
            space_id: JSON array of URL strings, e.g. '["https://example.com/a"]'.

        Returns:
            RawDocuments for the URLs that were fetched successfully.
            Malformed input (invalid JSON, a non-string argument, or JSON
            that is not an array) yields an empty list. Non-string or blank
            entries are skipped with a warning.
        """
        try:
            urls = json.loads(space_id)
        except (json.JSONDecodeError, TypeError):
            # TypeError: space_id was not str/bytes (e.g. None).
            logger.error("Invalid URL list: %s", space_id)
            return []

        # Without this check, a JSON string or object would be iterated
        # character by character or key by key.
        if not isinstance(urls, list):
            logger.error("Invalid URL list: %s", space_id)
            return []

        docs: list[RawDocument] = []
        requested = False
        for entry in urls:
            if not isinstance(entry, str) or not entry.strip():
                logger.warning("Skipping invalid URL entry: %r", entry)
                continue

            if requested:
                # Rate limiting between requests
                await asyncio.sleep(0.1)
            requested = True

            url = entry.strip()
            try:
                doc = await self.fetch_url(url)
            except Exception as e:
                logger.error("Error fetching %s: %s", url, e)
                continue
            if doc:
                docs.append(doc)

        return docs