DevodG's picture
deploy: Janus full system stabilization
24f95f0
"""
Core crawler logic for Janus.
Navigate, wait, capture, retry — async-first.
"""
import asyncio
import logging
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from app.services.crawler.browser import BrowserManager
from app.services.crawler.processor import ContentProcessor
from app.config import CRAWLER_TIMEOUT
logger = logging.getLogger(__name__)
@dataclass
class CrawlResult:
url: str
success: bool
markdown: str = ""
title: str = ""
links: List[str] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
error: Optional[str] = None
class JanusCrawler:
"""
Self-hosted web crawler — no API keys, no rate limits.
Uses Playwright for browser automation with stealth.
"""
def __init__(self, timeout: int = CRAWLER_TIMEOUT):
self.timeout = timeout
self._processor = ContentProcessor()
async def crawl(self, url: str) -> CrawlResult:
"""Crawl a single URL and return clean content."""
async with BrowserManager() as browser:
page = await browser.get_page()
try:
response = await page.goto(
url,
wait_until="domcontentloaded",
timeout=self.timeout * 1000,
)
if response and response.status >= 400:
return CrawlResult(
url=url,
success=False,
error=f"HTTP {response.status}",
)
try:
await page.wait_for_load_state("networkidle", timeout=10000)
except Exception:
logger.debug(
"Crawler networkidle timeout for %s; proceeding with loaded DOM",
url,
)
html = await page.content()
title = await page.title()
markdown = self._processor.process(html)
links = self._processor.extract_links(html, url)
metadata = self._processor.extract_metadata(html, title)
return CrawlResult(
url=url,
success=True,
markdown=markdown,
title=title,
links=links,
metadata=metadata,
)
except asyncio.TimeoutError:
logger.warning(f"Crawl timeout: {url}")
return CrawlResult(url=url, success=False, error="Timeout")
except Exception as e:
logger.warning(f"Crawl failed: {url}{e}")
return CrawlResult(url=url, success=False, error=str(e)[:200])
async def crawl_batch(
self, urls: List[str], max_concurrent: int = 3
) -> List[CrawlResult]:
"""Crawl multiple URLs concurrently."""
semaphore = asyncio.Semaphore(max_concurrent)
async def _crawl_one(url: str) -> CrawlResult:
async with semaphore:
return await self.crawl(url)
tasks = [_crawl_one(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [
r
if isinstance(r, CrawlResult)
else CrawlResult(url="", success=False, error=str(r))
for r in results
]
def crawl_sync(self, url: str) -> CrawlResult:
"""Synchronous wrapper for use in non-async contexts."""
try:
loop = asyncio.get_event_loop()
if loop.is_running():
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(asyncio.run, self.crawl(url))
return future.result()
return loop.run_until_complete(self.crawl(url))
except RuntimeError:
return asyncio.run(self.crawl(url))