Spaces:
Paused
Paused
| # from crawl4ai import AsyncWebCrawler | |
| # from urllib.parse import urlparse | |
| import aiohttp | |
| import asyncio | |
| # from asyncio.exceptions import TimeoutError as async_timeout | |
| from fast_async import make_async | |
| from bs4 import BeautifulSoup, NavigableString | |
| # import secrets | |
| # from datetime import datetime | |
| # import random | |
| import os | |
| import re | |
| import uuid | |
| from typing import List, Dict, Optional #, Tuple | |
| from io import BytesIO | |
| import PyPDF2 | |
| from fake_useragent import FakeUserAgent | |
| from htmlrag import clean_html, build_block_tree, EmbedHTMLPruner, BM25HTMLPruner | |
| from transformers import AutoTokenizer, AutoConfig | |
| import torch | |
| import time | |
| # class Crawler: | |
| # def __init__(self, user_dir=None, rate_limit=1, headless=True, verbose=False): | |
| # self.session_pool = {} # Track active sessions | |
| # self.verbose = verbose | |
| # self.rate_limit = rate_limit | |
| # self.user_dir = user_dir | |
| # self.headless = headless | |
| # self.crawler = AsyncWebCrawler( | |
| # context_options={"userDataDir": self.user_dir}, | |
| # headless=self.headless, | |
| # verbose=self.verbose | |
| # ) | |
| # # Browser context management | |
| # self._browser_contexts = {} | |
| # self._context_locks = {} | |
| # async def get_browser_context(self, session_id): | |
| # """Get or create a browser context with proper locking""" | |
| # if session_id not in self._context_locks: | |
| # self._context_locks[session_id] = asyncio.Lock() | |
| # async with self._context_locks[session_id]: | |
| # if session_id not in self._browser_contexts: | |
| # context = await self.crawler.new_context() | |
| # self._browser_contexts[session_id] = context | |
| # return self._browser_contexts[session_id] | |
| # async def cleanup_browser_context(self, session_id): | |
| # """Safely cleanup browser context""" | |
| # if session_id in self._context_locks: | |
| # async with self._context_locks[session_id]: | |
| # if session_id in self._browser_contexts: | |
| # try: | |
| # await asyncio.shield( | |
| # self._browser_contexts[session_id].close() | |
| # ) | |
| # except Exception as e: | |
| # print(f"Error cleaning up browser context: {e}") | |
| # finally: | |
| # del self._browser_contexts[session_id] | |
| # def create_session(self): | |
| # """Create a new session with secure ID""" | |
| # session_id = secrets.token_urlsafe(32) # Secure session ID | |
| # self.session_pool[session_id] = { | |
| # 'created_at': datetime.now(), | |
| # 'last_used': datetime.now(), | |
| # 'requests_count': 0 | |
| # } | |
| # return session_id | |
| # def rotate_session(self, session_id): | |
| # """Implement session rotation logic""" | |
| # if self.session_pool[session_id]['requests_count'] > 100: | |
| # self.cleanup_session(session_id) | |
| # return self.create_session() | |
| # return session_id | |
| # def is_dynamic_page(self, html_content: str) -> Tuple[bool, Optional[str]]: | |
| # """Analyzes HTML content to determine if a webpage is dynamically loaded""" | |
| # def _check_structural_indicators(soup: BeautifulSoup) -> Dict[str, int]: | |
| # """Check structural indicators of dynamic content loading.""" | |
| # scores = { | |
| # 'empty_containers': 0, | |
| # 'repeated_structures': 0, | |
| # 'api_endpoints': 0, | |
| # 'state_management': 0 | |
| # } | |
| # # 1. Check for empty content containers | |
| # main_containers = soup.find_all(['main', 'div', 'section'], | |
| # class_=lambda x: x and any(term in str(x).lower() | |
| # for term in ['content', 'main', 'feed', 'list', 'container'])) | |
| # for container in main_containers: | |
| # # Check if container is empty or has minimal content | |
| # if len(container.find_all()) < 3: | |
| # scores['empty_containers'] += 1 | |
| # # Check for repeated similar structures (common in dynamic lists) | |
| # children = container.find_all(recursive=False) | |
| # if children: | |
| # first_child_class = children[0].get('class', []) | |
| # similar_siblings = [c for c in children[1:] | |
| # if c.get('class', []) == first_child_class] | |
| # if len(similar_siblings) > 0: | |
| # scores['repeated_structures'] += 1 | |
| # # 2. Check for API endpoints in scripts | |
| # scripts = soup.find_all('script', {'src': True}) | |
| # api_patterns = ['/api/', '/graphql', '/rest/', '/v1/', '/v2/'] | |
| # for script in scripts: | |
| # if any(pattern in script['src'] for pattern in api_patterns): | |
| # scores['api_endpoints'] += 1 | |
| # # 3. Look for state management setup | |
| # state_patterns = [ | |
| # r'window\.__INITIAL_STATE__', | |
| # r'window\.__PRELOADED_STATE__', | |
| # r'__REDUX_STATE__', | |
| # r'__NUXT__', | |
| # r'__NEXT_DATA__', | |
| # r'window\.__data' | |
| # ] | |
| # inline_scripts = soup.find_all('script') | |
| # for script in inline_scripts: | |
| # if script.string: | |
| # for pattern in state_patterns: | |
| # if re.search(pattern, script.string): | |
| # scores['state_management'] += 1 | |
| # return scores | |
| # def _check_modern_framework_indicators(soup: BeautifulSoup) -> Dict[str, int]: | |
| # """Check for indicators of modern web frameworks and dynamic loading patterns.""" | |
| # scores = { | |
| # 'framework_roots': 0, | |
| # 'hydration': 0, | |
| # 'routing': 0 | |
| # } | |
| # # 1. Framework-specific root elements | |
| # framework_roots = { | |
| # 'react': ['react-root', 'react-app', 'root', '__next'], | |
| # 'angular': ['ng-version', 'ng-app'], | |
| # 'vue': ['v-app', '#app', 'nuxt-app'], | |
| # 'modern': ['app-root', 'application', 'spa-root'] | |
| # } | |
| # for framework, identifiers in framework_roots.items(): | |
| # for id_value in identifiers: | |
| # if (soup.find(attrs={'id': re.compile(id_value, re.I)}) or | |
| # soup.find(attrs={'class': re.compile(id_value, re.I)}) or | |
| # soup.find(attrs={'data-': re.compile(id_value, re.I)})): | |
| # scores['framework_roots'] += 1 | |
| # # 2. Check for hydration indicators | |
| # hydration_patterns = [ | |
| # r'hydrate', | |
| # r'createRoot', | |
| # r'reactive', | |
| # r'observable' | |
| # ] | |
| # scripts = soup.find_all('script') | |
| # for script in scripts: | |
| # if script.string: | |
| # for pattern in hydration_patterns: | |
| # if re.search(pattern, script.string): | |
| # scores['hydration'] += 1 | |
| # # 3. Check for dynamic routing setup | |
| # router_patterns = [ | |
| # 'router-view', | |
| # 'router-link', | |
| # 'route-link', | |
| # 'history.push', | |
| # 'navigation' | |
| # ] | |
| # for pattern in router_patterns: | |
| # if soup.find(class_=re.compile(pattern, re.I)) or \ | |
| # soup.find(id=re.compile(pattern, re.I)): | |
| # scores['routing'] += 1 | |
| # return scores | |
| # def _check_dynamic_loading_patterns(soup: BeautifulSoup) -> Dict[str, int]: | |
| # """Check for various dynamic content loading patterns.""" | |
| # scores = { | |
| # 'infinite_scroll': 0, | |
| # 'load_more_buttons': 0, | |
| # 'pagination': 0, | |
| # 'lazy_loading': 0, | |
| # 'loading_indicators': 0 | |
| # } | |
| # # 1. Check for infinite scroll indicators | |
| # scroll_indicators = [ | |
| # 'infinite-scroll', | |
| # 'data-infinite', | |
| # 'data-virtualized', | |
| # 'virtual-scroll', | |
| # 'scroll-container', | |
| # 'scroll-viewport' | |
| # ] | |
| # for indicator in scroll_indicators: | |
| # elements = soup.find_all( | |
| # lambda tag: any(indicator.lower() in str(v).lower() | |
| # for v in tag.attrs.values()) | |
| # ) | |
| # if elements: | |
| # scores['infinite_scroll'] += len(elements) | |
| # # 2. Check for load more buttons | |
| # button_patterns = [ | |
| # r'load[_-]?more', | |
| # r'show[_-]?more', | |
| # r'view[_-]?more', | |
| # r'see[_-]?more', | |
| # r'more[_-]?posts', | |
| # r'more[_-]?results' | |
| # ] | |
| # for pattern in button_patterns: | |
| # elements = soup.find_all( | |
| # ['button', 'a', 'div', 'span'], | |
| # text=re.compile(pattern, re.I) | |
| # ) | |
| # if elements: | |
| # scores['load_more_buttons'] += len(elements) | |
| # # 3. Check for pagination | |
| # pagination_patterns = [ | |
| # 'pagination', | |
| # 'page-numbers', | |
| # 'page-nav', | |
| # 'page-links' | |
| # ] | |
| # for pattern in pagination_patterns: | |
| # elements = soup.find_all(class_=re.compile(pattern, re.I)) | |
| # if elements: | |
| # scores['pagination'] += len(elements) | |
| # # 4. Check for lazy loading | |
| # lazy_patterns = ['lazy', 'data-src', 'data-lazy'] | |
| # for pattern in lazy_patterns: | |
| # elements = soup.find_all( | |
| # lambda tag: any(pattern.lower() in str(v).lower() | |
| # for v in tag.attrs.values()) | |
| # ) | |
| # if elements: | |
| # scores['lazy_loading'] += len(elements) | |
| # # 5. Check for loading indicators | |
| # loading_patterns = [ | |
| # 'loading', | |
| # 'spinner', | |
| # 'skeleton', | |
| # 'placeholder', | |
| # 'shimmer' | |
| # ] | |
| # for pattern in loading_patterns: | |
| # elements = soup.find_all(class_=re.compile(pattern, re.I)) | |
| # if elements: | |
| # scores['loading_indicators'] += len(elements) | |
| # return scores | |
| # def _evaluate_dynamic_indicators( | |
| # structural: Dict[str, int], | |
| # framework: Dict[str, int], | |
| # loading: Dict[str, int] | |
| # ) -> Tuple[bool, Optional[str]]: | |
| # """Evaluate dynamic indicators and return JavaScript instructions.""" | |
| # methods = [] | |
| # js_snippets = [] | |
| # # Infinite Scroll | |
| # if loading['infinite_scroll'] > 0: | |
| # methods.append("scroll") | |
| # js_snippets.append( | |
| # """ | |
| # window.scrollTo(0, document.body.scrollHeight); | |
| # await new Promise(resolve => setTimeout(resolve, 1000)); | |
| # """.strip().replace('\n', '') | |
| # ) | |
| # # Load More Buttons | |
| # if loading['load_more_buttons'] > 0: | |
| # methods.append("button") | |
| # js_snippets.append( | |
| # """ | |
| # const button = Array.from(document.querySelectorAll('button, a, div, span')).find( | |
| # el => /load[_-]?more|show[_-]?more/i.test(el.textContent) | |
| # ); | |
| # if (button) { | |
| # button.click(); | |
| # await new Promise(resolve => setTimeout(resolve, 1000)); | |
| # } else { | |
| # console.warn("No 'Load More' button found."); | |
| # } | |
| # """.strip().replace('\n', '') | |
| # ) | |
| # # Paginated Interfaces | |
| # if loading.get('pagination', 0) > 0: | |
| # methods.append("pagination") | |
| # js_snippets.append( | |
| # """ | |
| # const nextPage = document.querySelector('a[rel="next"], .pagination-next, .page-next'); | |
| # if (nextPage) { | |
| # nextPage.click(); | |
| # await new Promise(resolve => setTimeout(resolve, 1000)); | |
| # } else { | |
| # console.warn("No pagination link found."); | |
| # } | |
| # """.strip().replace('\n', '') | |
| # ) | |
| # # Lazy Loading | |
| # if loading.get('lazy_loading', 0) > 0: | |
| # methods.append("lazy") | |
| # js_snippets.append( | |
| # """ | |
| # if (window.__INITIAL_STATE__ || window.__REDUX_STATE__ || window.__NUXT__ || window.__NEXT_DATA__) { | |
| # console.log('Framework state detected. Consider monitoring network requests for further actions.'); | |
| # } | |
| # """.strip().replace('\n', '') | |
| # ) | |
| # # Framework and State Management Indicators | |
| # if framework['framework_roots'] > 0 or structural['state_management'] > 0: | |
| # methods.append("stateful") | |
| # js_snippets.append( | |
| # """ | |
| # if (window.__INITIAL_STATE__ || window.__REDUX_STATE__ || window.__NUXT__ || window.__NEXT_DATA__) { | |
| # console.log('Detected stateful framework data loading.'); | |
| # } | |
| # """.strip().replace('\n', '') | |
| # ) | |
| # # API-Driven Content | |
| # if structural['api_endpoints'] > 0: | |
| # methods.append("api") | |
| # js_snippets.append( | |
| # """ | |
| # console.log('API requests detected. Use browser devtools to inspect network activity for specific endpoints.'); | |
| # """.strip().replace('\n', '') | |
| # ) | |
| # # Aggregate and finalize | |
| # if methods: | |
| # js_code = "\n".join(js_snippets) | |
| # return True, js_code | |
| # return False, None | |
| # # Main execution | |
| # soup = BeautifulSoup(html_content, 'html.parser') | |
| # # Run all checks | |
| # structural_scores = _check_structural_indicators(soup) | |
| # framework_scores = _check_modern_framework_indicators(soup) | |
| # loading_scores = _check_dynamic_loading_patterns(soup) | |
| # # Evaluate results | |
| # return _evaluate_dynamic_indicators(structural_scores, framework_scores, loading_scores) | |
| # async def crawl( | |
| # self, | |
| # url, | |
| # depth=2, | |
| # max_pages=5, | |
| # session_id=None, | |
| # human_simulation=True, | |
| # rotate_user_agent=True, | |
| # rotate_proxy=True, | |
| # return_html=False | |
| # ): | |
| # if not session_id: | |
| # session_id = self.create_session() | |
| # session_id = self.rotate_session(session_id) | |
| # # List of rotating user agents | |
| # user_agents = [ | |
| # 'Chrome/115.0.0.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36', | |
| # 'Chrome/115.0.0.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36', | |
| # 'Chrome/115.0.0.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36', | |
| # 'Chrome/115.0.0.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36', | |
| # 'Chrome/115.0.0.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36' | |
| # ] | |
| # # List of rotating proxies | |
| # proxies = [ | |
| # "http://50.62.183.123:80", | |
| # "http://104.129.60.84:6516", | |
| # "http://156.228.118.163:3128", | |
| # "http://142.111.104.97:6107", | |
| # "http://156.228.99.99:3128" | |
| # ] | |
| # try: | |
| # async with self.crawler as crawler: | |
| # # Rotate user agent and optimize headers for each attempt | |
| # headers = { | |
| # "User-Agent": random.choice(user_agents) if rotate_user_agent else user_agents[0], | |
| # "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", | |
| # "Accept-Language": "en-US,en;q=0.5", | |
| # "Accept-Encoding": "gzip, deflate", | |
| # "Connection": "keep-alive", | |
| # "Upgrade-Insecure-Requests": "1", | |
| # "Sec-Fetch-Dest": "document", | |
| # "Sec-Fetch-Mode": "navigate", | |
| # "Sec-Fetch-Site": "none", | |
| # "Sec-Fetch-User": "?1", | |
| # "Cache-Control": "max-age=0" | |
| # } | |
| # # Update crawler headers for rotation | |
| # crawler.crawler_strategy.headers = headers | |
| # if rotate_proxy: | |
| # # Update crawler proxy for rotation | |
| # crawler.crawler_strategy.proxy = random.choice(proxies) | |
| # result_1 = await crawler.arun( | |
| # session_id=session_id, | |
| # url=url, | |
| # magic=True if human_simulation else False, | |
| # simulate_user=True if human_simulation else False, | |
| # override_navigator=True if human_simulation else False, | |
| # depth=depth, | |
| # max_pages=max_pages, | |
| # bypass_cache=True, | |
| # remove_overlay_elements=True, | |
| # delay_before_retrieve_html=1.0, | |
| # verbose=self.verbose | |
| # ) | |
| # # Update session metrics | |
| # self.session_pool[session_id]['requests_count'] += 1 | |
| # self.session_pool[session_id]['last_used'] = datetime.now() | |
| # if result_1.success: | |
| # if hasattr(result_1, 'html'): | |
| # success, js_code = self.is_dynamic_page(result_1.html) | |
| # if success: | |
| # async with crawler as crawler: | |
| # # Update crawler headers for rotation | |
| # crawler.crawler_strategy.headers = headers | |
| # if rotate_proxy: | |
| # # Update crawler proxy for rotation | |
| # crawler.crawler_strategy.proxy = random.choice(proxies) | |
| # print(f"Executing JS code: {js_code}") | |
| # result_2 = await crawler.arun( | |
| # session_id=session_id, | |
| # url=url, | |
| # magic=True if human_simulation else False, | |
| # simulate_user=True if human_simulation else False, | |
| # override_navigator=True if human_simulation else False, | |
| # depth=depth, | |
| # max_pages=max_pages, | |
| # js_code=js_code, | |
| # bypass_cache=True, | |
| # remove_overlay_elements=True, | |
| # delay_before_retrieve_html=1.0, | |
| # verbose=self.verbose | |
| # ) | |
| # if result_2.success: | |
| # result = result_2 | |
| # else: | |
| # result = result_1 | |
| # # Update session metrics | |
| # self.session_pool[session_id]['requests_count'] += 1 | |
| # self.session_pool[session_id]['last_used'] = datetime.now() | |
| # else: | |
| # result = result_1 | |
| # if return_html and hasattr(result, 'html'): | |
| # return result.html | |
| # elif hasattr(result, 'fit_markdown'): | |
| # return result.fit_markdown | |
| # elif hasattr(result, 'markdown'): | |
| # return self.extract_content(result.markdown) | |
| # except Exception as e: | |
| # print(f"Error crawling {url}: {str(e)}") | |
| # return None | |
| # async def crawl_with_retry( | |
| # self, | |
| # url, | |
| # depth=2, | |
| # max_pages=5, | |
| # max_retries=3, | |
| # backoff_factor=1, | |
| # session_id=None, | |
| # human_simulation=True, | |
| # rotate_user_agent=True, | |
| # rotate_proxy=True, | |
| # return_html=False, | |
| # timeout=10.0 | |
| # ): | |
| # """Crawl with retry logic and anti-blocking measures""" | |
| # async def attempt_crawl(attempt): | |
| # try: | |
| # async with async_timeout.timeout(timeout): | |
| # context = await self.get_browser_context(session_id) | |
| # return await self.crawl( | |
| # context, | |
| # url, | |
| # depth, | |
| # max_pages, | |
| # session_id, | |
| # human_simulation, | |
| # rotate_user_agent, | |
| # rotate_proxy, | |
| # return_html | |
| # ) | |
| # except asyncio.TimeoutError: | |
| # print(f"Timeout on attempt {attempt} for {url}") | |
| # raise | |
| # except Exception as e: | |
| # print(f"Error on attempt {attempt} for {url}: {e}") | |
| # raise | |
| # if not self.is_valid_url(url) and not self.is_html_url(url): | |
| # print(f"Invalid URL: {url}") | |
| # return f"No web results found for query: {url}" | |
| # for attempt in range(max_retries): | |
| # try: | |
| # if attempt > 0: | |
| # # Add delay between retries with exponential backoff | |
| # delay = backoff_factor * (2 ** (attempt - 1)) | |
| # await asyncio.sleep(delay) | |
| # return await attempt_crawl(attempt + 1) | |
| # except Exception as e: | |
| # if attempt == max_retries - 1: | |
| # print(f"Max retries ({max_retries}) reached for {url}") | |
| # return f"Failed to crawl after {max_retries} attempts: {url}" | |
| # continue | |
| # return f"No content found after {max_retries} attempts for: {url}" | |
| # def extract_content(self, html_content): | |
| # soup = BeautifulSoup(html_content, 'html.parser') | |
| # for script in soup(["script", "style"]): | |
| # script.decompose() | |
| # text = soup.get_text() | |
| # lines = (line.strip() for line in text.splitlines()) | |
| # chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) | |
| # text = '\n'.join(chunk for chunk in chunks if chunk) | |
| # return text | |
| # def cleanup_session(self, session_id): | |
| # """Clean up a session""" | |
| # print(f"Cleaning up session {session_id}") | |
| # if session_id in self.session_pool: | |
| # self.crawler.crawler_strategy.kill_session(session_id) | |
| # del self.session_pool[session_id] | |
| # def cleanup_expired_sessions(self): | |
| # """Regular cleanup of expired sessions using proper time calculation""" | |
| # try: | |
| # current_time = datetime.now() | |
| # expired_sessions = [] | |
| # for sid, data in self.session_pool.items(): | |
| # # Calculate time difference in seconds | |
| # time_diff = (current_time - data['last_used']).total_seconds() | |
| # # Check if more than 1 hour (3600 seconds) | |
| # if time_diff > 3600: | |
| # expired_sessions.append(sid) | |
| # # Cleanup expired sessions | |
| # for session_id in expired_sessions: | |
| # self.cleanup_session(session_id) | |
| # except Exception as e: | |
| # if self.verbose: | |
| # print(f"Error during session cleanup: {str(e)}") | |
| # @staticmethod | |
| # def is_valid_url(url): | |
| # try: | |
| # result = urlparse(url) | |
| # return all([result.scheme, result.netloc]) | |
| # except ValueError: | |
| # return False | |
| # @staticmethod | |
| # def is_html_url(url): | |
| # return url.endswith(".html") or url.endswith(".htm") | |
| # | |
class CustomCrawler:
    """Fetch web pages and PDFs over aiohttp and convert them to text.

    HTML responses are converted to a simple Markdown rendering; PDF responses
    are converted to plain text with PyPDF2. When ``return_type`` is
    ``"fit_markdown"`` the HTML is first pruned against a query with the
    htmlrag BM25 and embedding pruners before conversion.

    Attributes:
        embed_model: Name of the embedding model used by the pruner/tokenizer.
        max_concurrent_requests: Size of ``semaphore`` and of the connection
            pool of sessions made by ``create_session``.
        verbose: When true, progress messages are printed.
        sessions: Mapping of session id to open ``aiohttp.ClientSession``.
    """

    # Round-robin position used by get_proxy(); a class-level default so the
    # first call works on every instance.
    _proxy_cursor = 0

    def __init__(
        self,
        embed_model: str = "HIT-TMG/KaLM-embedding-multilingual-mini-instruct-v1",
        max_concurrent_requests: int = 10,
        verbose: bool = True
    ):
        """Load the HTML pruners, tokenizer and model config for ``embed_model``.

        Args:
            embed_model: Model identifier handed to the htmlrag embedding
                pruner, ``AutoTokenizer`` and ``AutoConfig``.
            max_concurrent_requests: Upper bound on simultaneous requests.
            verbose: Print progress messages when true.
        """
        self.verbose = verbose
        self._log("π¦ Initializing the crawler")
        # NOTE(review): this one-second pause has no functional purpose visible
        # in this file; kept to preserve start-up behavior.
        time.sleep(1)
        self.embed_model = embed_model
        self.max_concurrent_requests = max_concurrent_requests
        self.ua = FakeUserAgent()
        self.semaphore = asyncio.Semaphore(self.max_concurrent_requests)
        self.sessions = {}
        # Creation timestamps (time.time()) keyed by session id. Kept beside
        # ``sessions`` so that ``sessions`` values stay plain ClientSession
        # objects for fetch_page_contents().
        self._session_created_at = {}

        # Initializing HTML pruners and tokenizer
        self._log(f"π Loading HTML Pruners and Tokenizer with {self.embed_model}")
        self.bm25_html_pruner = BM25HTMLPruner()
        self.embed_html_pruner = EmbedHTMLPruner(
            embed_model=self.embed_model,
            local_inference=True
        )
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.embed_model,
            use_fast=True,
            trust_remote_code=True,
            device="cuda" if torch.cuda.is_available() else "cpu"
        )

        # Get the model config and set the max context length for the model
        self._log(f"π οΈ Getting model configuration for {self.embed_model}")
        self.config = AutoConfig.from_pretrained(self.embed_model)
        self.tokenizer.max_seq_length = self.config.max_position_embeddings
        self._log(f"π Setting max context length to {self.tokenizer.max_seq_length}")

    def _log(self, message: str) -> None:
        """Print ``message`` when the crawler is in verbose mode."""
        if self.verbose:
            print(message)

    async def create_session(self) -> str:
        """Open a pooled aiohttp session and return its generated id."""
        session_id = str(uuid.uuid4())
        timeout = aiohttp.ClientTimeout(total=600)  # 10-minute total timeout
        connector = aiohttp.TCPConnector(limit=self.max_concurrent_requests)  # Connection pool
        self.sessions[session_id] = aiohttp.ClientSession(timeout=timeout, connector=connector)
        # Recorded so cleanup_expired_sessions() can compute the session age.
        self._session_created_at[session_id] = time.time()
        self._log(f"π Created session: {session_id}")
        return session_id

    async def close_session(self, session_id):
        """Close and forget the session ``session_id``; unknown ids are ignored."""
        session = self.sessions.pop(session_id, None)
        self._session_created_at.pop(session_id, None)
        if session is not None:
            await session.close()
            self._log(f"π Closed session: {session_id}")

    async def cleanup_expired_sessions(self, expiration_time: int = 600):  # Default 10 minutes
        """Close every session created more than ``expiration_time`` seconds ago.

        Sessions without a recorded creation time are left open because their
        age cannot be determined.
        """
        current_time = time.time()
        self._log("π Checking for expired sessions")
        # Collect ids first: close_session() mutates both dictionaries.
        expired_sessions = [
            session_id
            for session_id, creation_time in self._session_created_at.items()
            if session_id in self.sessions
            and current_time - creation_time > expiration_time
        ]
        for session_id in expired_sessions:
            await self.close_session(session_id)
        self._log("ποΈ Successfully cleaned up all expired sessions")

    def html_rag(
        self,
        query: str,
        html: str,
        max_context_length: int = 32000,
        buffer: int = 2000
    ) -> str:
        """Prune ``html`` down to the blocks most relevant to ``query``.

        Args:
            query: Question used to rank the HTML blocks.
            html: Raw HTML document.
            max_context_length: Token budget for the consumer of the result.
            buffer: Tokens held back from that budget; the pruner is given
                ``max_context_length - buffer``.

        Returns:
            The pruned HTML produced by the embedding pruner.

        Raises:
            ValueError: If ``html`` is empty or cannot be parsed.
        """
        if not html:
            raise ValueError("No HTML contents provided.")

        # Validate HTML structure
        try:
            BeautifulSoup(html, 'html.parser')
        except Exception as e:
            raise ValueError(f"Invalid HTML content: {e}") from e

        prompt_for_retrieval = (
            "Given a query, your task is to retrieve the most relevant passages "
            "that answers and/or is relevant to the query.\n"
            "Query:"
        )
        self.embed_html_pruner.query_instruction_for_retrieval = prompt_for_retrieval

        self._log(f"π§Ή Pruning HTML for query: {query}")
        cleaned_html = clean_html(html)
        block_tree, cleaned_html = build_block_tree(cleaned_html, max_node_words=10)
        block_rankings = self.bm25_html_pruner.calculate_block_rankings(query, cleaned_html, block_tree)
        max_context_window = max_context_length - buffer
        pruned_html = self.embed_html_pruner.prune_HTML(
            cleaned_html,
            block_tree,
            block_rankings,
            self.tokenizer,
            max_context_window
        )
        self._log(f"π Successfully pruned HTML for query: {query}")
        return pruned_html

    async def fetch_page_contents(
        self,
        urls: List[str],
        query: Optional[str] = None,
        session_id: Optional[str] = None,
        max_attempts: int = 3,
        delay: float = 1.0,
        timeout: float = 10.0,
        return_type: str = "markdown",
        rotate_headers: bool = True,
    ) -> List[str]:
        """Fetch several URLs concurrently and return the content that loaded.

        Args:
            urls: Pages to fetch.
            query: Query for pruning; required for ``"fit_markdown"``.
            session_id: Id returned by ``create_session``. When omitted, each
                request uses its own short-lived session.
            max_attempts: Attempts per URL before giving up.
            delay: Seconds to wait between attempts.
            timeout: Total timeout in seconds for each request.
            return_type: ``"markdown"``, ``"fit_markdown"`` or ``"html"``.
            rotate_headers: Send a random User-Agent with each request.

        Returns:
            Content of the URLs that succeeded, in input order. Failed URLs
            are dropped, so positions do not necessarily match ``urls``.

        Raises:
            ValueError: If ``urls`` is empty, if ``query`` is missing for
                ``"fit_markdown"``, or if ``session_id`` is unknown.
        """
        if not urls:
            raise ValueError("No URLs provided!")
        if return_type == "fit_markdown" and query is None:
            raise ValueError("Query must be provided when return_type is 'fit_markdown'!")

        session = None
        if session_id:  # Use existing session if provided
            if session_id not in self.sessions:
                raise ValueError(f"Invalid session ID: {session_id}")
            session = self.sessions[session_id]

        proxies = self.load_proxies()  # None when no PROXY_<n> variables exist

        async def fetch_single_page(url: str) -> Optional[str]:
            for attempt in range(1, max_attempts + 1):
                self._log(f"π Attempt {attempt}/{max_attempts}: Fetching content from {url}")
                # Hold a slot only while the request is in flight, not while
                # waiting to retry.
                async with self.semaphore:
                    content = await self._fetch_page_contents(
                        url=url,
                        query=query,
                        timeout=timeout,
                        return_type=return_type,
                        rotate_headers=rotate_headers,
                        proxies=proxies,
                        session=session
                    )
                if content:
                    self._log(f"β Successfully fetched content from {url}")
                    return content
                # Wait only when another attempt will actually follow.
                if attempt < max_attempts:
                    self._log(f"π« Failed to fetch content from {url}. Retrying in {delay} seconds...")
                    await asyncio.sleep(delay)
            self._log(f"π« Failed to fetch content from {url} after {max_attempts} attempts.")
            return None

        results = await asyncio.gather(*(fetch_single_page(url) for url in urls))
        return [result for result in results if result is not None]

    async def _read_response(
        self,
        response,
        url: str,
        query: Optional[str],
        return_type: str
    ) -> Optional[str]:
        """Convert an aiohttp response into the requested text form."""
        self._log(f"π Getting content from {url}")
        # NOTE(review): raw HTML is returned before the status check, so error
        # pages are returned too when return_type is "html" - confirm intent.
        if return_type == "html":
            return await response.text()

        response.raise_for_status()
        content_type = response.headers.get('Content-Type', '').lower()

        if 'application/pdf' in content_type:
            return self.extract_text_from_pdf(await response.read())

        if 'text/html' in content_type:
            html_content = await response.text()
            if return_type == "fit_markdown":
                # html_rag() is a plain synchronous method returning a str.
                html_content = self.html_rag(query, html_content)
            soup = BeautifulSoup(html_content, "html.parser")
            for script_or_style in soup(["script", "style"]):
                script_or_style.decompose()
            return self.html_to_markdown(soup).strip()

        self._log(f"π« Unsupported content type {content_type} for URL {url}")
        return None

    async def _fetch_page_contents(
        self,
        url: str,
        query: Optional[str] = None,
        timeout: float = 5.0,
        return_type: str = "markdown",
        rotate_headers: bool = True,
        proxies: Optional[List[str]] = None,
        session: "Optional[aiohttp.ClientSession]" = None
    ) -> Optional[str]:
        """Fetch one URL and return its content, or ``None`` on any failure.

        Args:
            url: Page to fetch.
            query: Query used for pruning when ``return_type`` is
                ``"fit_markdown"``.
            timeout: Total timeout in seconds for the request.
            return_type: ``"markdown"``, ``"fit_markdown"`` or ``"html"``.
            rotate_headers: Send a random User-Agent when true.
            proxies: Candidate proxy URLs; one is chosen per request.
            session: Session to reuse. A temporary one is created when omitted.
        """
        headers = self.get_headers() if rotate_headers else {}
        proxy = self.get_proxy(proxies) if proxies else None
        # Total connection timeout
        timeout_config = aiohttp.ClientTimeout(total=timeout)

        try:
            # Use provided session if available
            if session is not None:
                async with session.get(url, proxy=proxy, timeout=timeout_config, headers=headers) as response:
                    return await self._read_response(response, url, query, return_type)
            # Otherwise, create a new session for this request
            async with aiohttp.ClientSession() as new_session:
                async with new_session.get(url, proxy=proxy, timeout=timeout_config, headers=headers) as response:
                    return await self._read_response(response, url, query, return_type)
        except aiohttp.ClientError as e:
            self._log(f"π« Request Exception for {url}: {e}")
            return None
        except asyncio.TimeoutError:
            self._log(f"π« Timeout error for {url}")
            return None
        except Exception as e:
            # Best effort by design: one bad page must not fail the batch.
            self._log(f"π« Unexpected error fetching {url}: {e}")
            return None

    def load_proxies(self) -> Optional[List[str]]:
        """Return proxy URLs from ``PROXY_<n>`` environment variables.

        The result is ordered by ``<n>`` so it does not depend on the order of
        the environment. Returns ``None`` when no such variable is set.
        """
        proxy_pattern = re.compile(r"PROXY_(\d+)")
        numbered = []
        for key, value in os.environ.items():
            matched = proxy_pattern.match(key)
            if matched:
                numbered.append((int(matched.group(1)), key, value))
        if not numbered:
            return None
        numbered.sort()
        proxies = [value for _, _, value in numbered]
        self._log(f"π Loaded {len(proxies)} proxies from environment variables")
        return proxies

    def get_proxy(self, proxies: List[str]) -> Optional[str]:
        """Return the next proxy in round-robin order, or ``None`` if empty."""
        if not proxies:
            return None
        index = self._proxy_cursor % len(proxies)
        self._proxy_cursor += 1
        return proxies[index]

    def get_headers(self) -> Dict[str, str]:
        """Return request headers carrying a random User-Agent."""
        return {'User-Agent': self.ua.random}

    def extract_text_from_pdf(self, pdf_content: bytes) -> str:
        """Return the concatenated text of all pages, or ``""`` on failure."""
        try:
            self._log("π Extracting text from PDF")
            pdf_reader = PyPDF2.PdfReader(BytesIO(pdf_content))
            # A page may yield no text; treat that as empty rather than
            # failing the whole document.
            text = "".join(page.extract_text() or "" for page in pdf_reader.pages)
            self._log("πͺ Successfully extracted text from PDF")
            return text
        except Exception as e:
            self._log(f"π« Error extracting text from PDF: {e}")
            return ""

    def html_to_markdown(self, soup) -> str:
        """Render a parsed HTML tree as simple Markdown text.

        Headings, paragraphs, lists, tables, block quotes, inline emphasis,
        code and horizontal rules are converted; any other tag is traversed
        and its children rendered in order.
        """
        self._log("π Converting HTML to Markdown")
        parts: List[str] = []
        emit = parts.append
        heading_prefix = {f"h{level}": "#" * level + " " for level in range(1, 7)}
        inline_wrapper = {"strong": "**", "b": "**", "em": "*", "i": "*", "code": "`"}

        def process_element(element, indent=0):
            if isinstance(element, NavigableString):
                text = str(element).strip()
                if text:
                    emit(text + " ")
                return

            tag = element.name
            if tag in heading_prefix:
                emit(heading_prefix[tag] + element.text.strip() + "\n\n")
            elif tag == "p":
                emit(element.text.strip() + "\n\n")
            elif tag == "br":
                emit("\n")
            elif tag in ("ul", "ol"):
                items = element.find_all("li", recursive=False)
                for number, li in enumerate(items, 1):
                    marker = "- " if tag == "ul" else f"{number}. "
                    emit(" " * indent + marker)
                    process_element(li, indent + 1)
                    emit("\n")
                emit("\n")
            elif tag == "table":
                for row_index, row in enumerate(element.find_all("tr")):
                    cells = row.find_all(["td", "th"])
                    emit("| " + " | ".join(cell.text.strip() for cell in cells) + " |\n")
                    # Separator after the first row only, decided by position
                    # so a body row equal to the header does not repeat it.
                    if row_index == 0:
                        emit("| " + " | ".join(["---"] * len(cells)) + " |\n")
                emit("\n")
            elif tag == "blockquote":
                emit("> " + element.text.strip().replace("\n", "\n> ") + "\n\n")
            elif tag in inline_wrapper:
                wrapper = inline_wrapper[tag]
                emit(wrapper + element.text.strip() + wrapper)
            elif tag == "pre":
                emit("```\n" + element.text + "\n```\n\n")
            elif tag == "hr":
                emit("---\n\n")
            else:
                for child in element.children:
                    process_element(child, indent)

        process_element(soup)
        self._log("π Successfully converted HTML to Markdown")
        return "".join(parts)
if __name__ == "__main__":
    # Demo: fetch a few pages about one topic, prune them against a query,
    # and print the resulting documents with the time the fetch took.
    import time

    try:
        import winloop
    except ImportError:
        # Optional accelerator; fall back to the default asyncio event loop.
        winloop = None

    URLS = [
        "https://en.wikipedia.org/wiki/Treaty_Principles_Bill#:~:text=The%20Treaty%20Principles%20Bill%2C%20or,of%20the%20Treaty%20of%20Waitangi.",
        "https://www.parliament.nz/en/pb/sc/make-a-submission/document/54SCJUST_SCF_227E6D0B-E632-42EB-CFFE-08DCFEB826C6/principles-of-the-treaty-of-waitangi-bill",
        "https://en.wikipedia.org/wiki/Waitangi_Tribunal",
        "https://aljazeera.com/news/2024/11/19/why-are-new-zealands-maori-protesting-over-colonial-era-treaty-bill",
        "https://downiewenjack.ca/treaty-of-waitangi-treaty-principles-bill/"
    ]  # * 10 # Make 50 requests
    query = "What is the Treaty of Waitangi Bill?"

    async def _run_demo():
        """Fetch URLS through one shared session; return (documents, seconds)."""
        custom_crawler = CustomCrawler(max_concurrent_requests=1000)
        session_id = await custom_crawler.create_session()
        try:
            start = time.perf_counter()
            documents = await custom_crawler.fetch_page_contents(
                URLS,
                query,
                session_id=session_id,
                timeout=20,
                max_attempts=1,
                return_type="fit_markdown",
            )
            elapsed = time.perf_counter() - start
        finally:
            # Always release the session, even when the fetch raises.
            await custom_crawler.close_session(session_id)
            await custom_crawler.cleanup_expired_sessions()
        return documents, elapsed

    # Install the loop policy BEFORE any event loop is created; installing it
    # afterwards has no effect on a loop that already exists.
    if winloop is not None:
        winloop.install()

    result, elapsed = asyncio.run(_run_demo())
    print("\n\n".join(f"Document {i}:\n\n{doc}" for i, doc in enumerate(result, 1)))
    print(f"\n\nTime taken: {elapsed} seconds")