import aiohttp
import aiosqlite
import asyncio
import time
from typing import List, Tuple, Dict, Set, Optional
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
from server.app.utils.url_helper import is_same_domain, normalize_url
from server.app.utils.diskcache_lock import diskcache_lock
from server.constant.constants import (SQLITE_DB_DIR, SQLITE_DB_NAME,
                                       MAX_CRAWL_PARALLEL_REQUEST,
                                       SITEMAP_URL_RECORDED,
                                       SITEMAP_URL_EXPIRED,
                                       DOMAIN_STATISTICS_GATHERING_COLLECTED)
from server.logger.logger_config import my_logger as logger


class AsyncCrawlerSiteLink:

    def __init__(self, base_url: str, version: int) -> None:
        logger.info(
            f"[CRAWL_LINK] init, base_url: '{base_url}', version: {version}")
        self.base_url = normalize_url(base_url)
        self.sqlite_db_path = f"{SQLITE_DB_DIR}/{SQLITE_DB_NAME}"
        self.visited_urls: Set[str] = set()
        self.semaphore = asyncio.Semaphore(MAX_CRAWL_PARALLEL_REQUEST)
        self.domain = urlparse(self.base_url).netloc
        self.version = version
        self.distributed_lock = diskcache_lock
        self.count = 0
        self.batch_urls_queue = asyncio.Queue()
        self.queue_lock = asyncio.Lock()
        self.batch_size = MAX_CRAWL_PARALLEL_REQUEST * 4

    async def fetch_page(self, session: aiohttp.ClientSession,
                         url: str) -> Optional[str]:
        logger.info(f"[CRAWL_LINK] fetch_page, url: '{url}'")
        # The semaphore caps the number of in-flight HTTP requests
        async with self.semaphore:
            try:
                async with session.get(url) as response:
                    return await response.text()
            except Exception as e:
                logger.error(
                    f"[CRAWL_LINK] fetch_page, Error fetching {url}: {e}")
                return None

    async def add_url_to_queue(self, url: str) -> None:
        logger.info(f"[CRAWL_LINK] add_url_to_queue, url: '{url}'")
        batch_urls = []
        async with self.queue_lock:
            await self.batch_urls_queue.put(url)
            if self.batch_urls_queue.qsize() >= self.batch_size:
                batch_urls = await self.process_batch_urls()

        if batch_urls:
            # Process batch_urls to separate existing and new URLs
            existing_urls, new_urls = await self.check_urls_existence(
                batch_urls)
            # Update existing URLs and insert new URLs
            await self.update_and_insert_urls(existing_urls, new_urls)

    async def process_batch_urls(self) -> List[str]:
        logger.info("[CRAWL_LINK] process_batch_urls")
        # Lock is already acquired in add_url_to_queue
        batch_urls = []
        while not self.batch_urls_queue.empty():
            batch_urls.append(await self.batch_urls_queue.get())
        return batch_urls

    async def check_urls_existence(
            self, batch_urls: List[str]) -> Tuple[Dict[str, int], List[str]]:
        """
        Check which URLs exist in the database and separate them from new URLs.
        """
        logger.info(
            f"[CRAWL_LINK] check_urls_existence, batch_urls: {batch_urls}")

        # Prepare SQL to check existence; only URLs already stored with
        # doc_status = 4 are treated as existing records
        placeholders = ', '.join(['?'] * len(batch_urls))
        sql = f"SELECT url, id FROM t_sitemap_url_tab WHERE doc_status = 4 AND url IN ({placeholders})"
        async with aiosqlite.connect(self.sqlite_db_path) as db:
            await db.execute("PRAGMA journal_mode=WAL;")
            cursor = await db.execute(sql, batch_urls)
            existing_records = await cursor.fetchall()

        # Convert the list of existing records into a dict for easy lookup
        existing_urls = {record[0]: record[1] for record in existing_records}
        # Identify new URLs
        new_urls = [url for url in batch_urls if url not in existing_urls]
        return existing_urls, new_urls

    async def update_and_insert_urls(self, existing_urls: Dict[str, int],
                                     new_urls: List[str]) -> None:
        """
        Update existing URLs and insert new URLs into the database in a single operation.
""" logger.info( f"[CRAWL_LINK] update_and_insert_urls, existing_urls: {existing_urls}, new_urls: {new_urls}" ) timestamp = int(time.time()) update_sql = "UPDATE t_sitemap_url_tab SET doc_status = ?, version = ?, mtime = ? WHERE id = ?" insert_sql = "INSERT INTO t_sitemap_url_tab (domain, url, content, content_length, content_md5, doc_status, version, ctime, mtime) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" # Prepare data for updating existing URLs updates: List[Tuple[int, int, int, int]] = [ (SITEMAP_URL_RECORDED, self.version, timestamp, url_id) for url_id in existing_urls.values() ] # Prepare data for inserting new URLs inserts: List[Tuple[str, str, str, int, str, int, int, int, int]] = [ (self.domain, url, '[]', 0, '', SITEMAP_URL_RECORDED, self.version, timestamp, timestamp) for url in new_urls ] async with aiosqlite.connect(self.sqlite_db_path) as db: await db.execute("PRAGMA journal_mode=WAL;") try: with self.distributed_lock.lock(): if updates: await db.executemany(update_sql, updates) if inserts: await db.executemany(insert_sql, inserts) await db.commit() except Exception as e: logger.error(f"process distributed_lock exception:{e}") async def save_link_to_db(self, url: str) -> None: logger.info(f"[CRAWL_LINK] save_link_to_db, url: '{url}'") await self.add_url_to_queue(url) async def parse_link(self, session: aiohttp.ClientSession, html_text: str, url: str) -> None: logger.info(f"[CRAWL_LINK] parse_link, url: '{url}'") link_set = await self.extract_link(html_text, url) await self.save_link_to_db(url) for full_link in link_set: normalized_link = normalize_url(full_link) if normalized_link not in self.visited_urls and is_same_domain( self.base_url, normalized_link): self.visited_urls.add(normalized_link) await self.crawl_link(session, normalized_link) async def extract_link(self, html_text: str, url: str) -> Set[str]: logger.info(f"[CRAWL_LINK] extrack_link, url: '{url}'") try: soup = BeautifulSoup(html_text, 'html.parser') # Extract links within the body link_set = { urljoin(url, a['href']) for a in soup.find_all('a', href=True) } if soup.body else set() return link_set except Exception as e: logger.error(f"Error processing content from {url}: {str(e)}") return set() async def crawl_link(self, session: aiohttp.ClientSession, url: str) -> None: self.count += 1 logger.info( f"[CRAWL_LINK] craw_link, url: '{url}', count: {self.count}") html_text = await self.fetch_page(session, url) if html_text: await self.parse_link(session, html_text, url) async def update_site_domain_status(self, domain_status: int) -> None: logger.info( f"[CRAWL_LINK] update_site_domain_status, domain_status: {domain_status}" ) timestamp = int(time.time()) async with aiosqlite.connect(self.sqlite_db_path) as db: await db.execute("PRAGMA journal_mode=WAL;") try: with self.distributed_lock.lock(): await db.execute( "UPDATE t_sitemap_domain_tab SET domain_status = ?, mtime = ? WHERE domain = ?", (domain_status, timestamp, self.domain)) await db.commit() except Exception as e: logger.error(f"process distributed_lock exception:{e}") async def mark_expired_link(self) -> None: logger.info(f"[CRAWL_LINK] mark_expired_link") timestamp = int(time.time()) async with aiosqlite.connect(self.sqlite_db_path) as db: # Enable WAL mode for better concurrency await db.execute("PRAGMA journal_mode=WAL;") try: with self.distributed_lock.lock(): # Update doc_status to `SITEMAP_URL_EXPIRED` for URLs that are not currently marked as status `SITEMAP_URL_RECORDED` await db.execute( "UPDATE t_sitemap_url_tab SET doc_status = ?, mtime = ? 
WHERE domain = ? AND doc_status != ?", (SITEMAP_URL_EXPIRED, timestamp, self.domain, SITEMAP_URL_RECORDED)) await db.commit() except Exception as e: logger.error(f"process distributed_lock exception:{e}") async def run(self) -> None: begin_time = int(time.time()) logger.info( f"[CRAWL_LINK] run begin! base_url: '{self.base_url}', begin_time: {begin_time}" ) headers = { "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36", "Accept": "*/*", "Accept-Encoding": "gzip, deflate", "Connection": "keep-alive" } async with aiohttp.ClientSession(headers=headers) as session: self.visited_urls.add(self.base_url) await self.crawl_link(session, self.base_url) if self.batch_urls_queue.qsize() > 0: # Process the remaining urls in batch_urls_queue batch_urls = [] while not self.batch_urls_queue.empty(): batch_urls.append(await self.batch_urls_queue.get()) logger.info( f"[CRAW_LINK process the remaining urls:{batch_urls} in batch_urls_queue" ) existing_urls, new_urls = await self.check_urls_existence( batch_urls) await self.update_and_insert_urls(existing_urls, new_urls) await self.mark_expired_link() await self.update_site_domain_status( DOMAIN_STATISTICS_GATHERING_COLLECTED) end_time = int(time.time()) timecost = end_time - begin_time logger.warning( f"[CRAWL_LINK] run end! base_url: '{self.base_url}', end_time: {end_time}, timecost: {timecost}" )
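

# Usage sketch: a minimal, hypothetical entry point for driving the crawler.
# It assumes the `server` package imported above is on the path and that the
# t_sitemap_url_tab / t_sitemap_domain_tab tables already exist in the SQLite
# database; the base_url and version below are placeholder values.
if __name__ == "__main__":
    crawler = AsyncCrawlerSiteLink(base_url="https://example.com", version=1)
    asyncio.run(crawler.run())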