import asyncio
import time
from typing import List, Tuple, Dict, Set, Optional
from urllib.parse import urljoin, urlparse

import aiohttp
import aiosqlite
from bs4 import BeautifulSoup

from server.app.utils.url_helper import is_same_domain, normalize_url
from server.app.utils.diskcache_lock import diskcache_lock
from server.constant.constants import (SQLITE_DB_DIR, SQLITE_DB_NAME,
                                       MAX_CRAWL_PARALLEL_REQUEST,
                                       SITEMAP_URL_RECORDED,
                                       SITEMAP_URL_EXPIRED,
                                       DOMAIN_STATISTICS_GATHERING_COLLECTED)
from server.logger.logger_config import my_logger as logger


class AsyncCrawlerSiteLink:
def __init__(self, base_url: str, version: int) -> None:
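        """
        Set up crawl state for a single site.

        `base_url` is normalized and its netloc becomes the crawl domain;
        `version` is stamped on every sitemap-URL row this crawl writes.
        """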
logger.info(
f"[CRAWL_LINK] init, base_url: '{base_url}', version: {version}")
self.base_url = normalize_url(base_url)
self.sqlite_db_path = f"{SQLITE_DB_DIR}/{SQLITE_DB_NAME}"
self.visited_urls = set()
self.semaphore = asyncio.Semaphore(MAX_CRAWL_PARALLEL_REQUEST)
self.domain = urlparse(self.base_url).netloc
self.version = version
self.distributed_lock = diskcache_lock
self.count = 0
self.batch_urls_queue = asyncio.Queue()
self.queue_lock = asyncio.Lock()
self.batch_size = MAX_CRAWL_PARALLEL_REQUEST * 4

    async def fetch_page(self, session: aiohttp.ClientSession,
url: str) -> Optional[str]:
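        """
        Fetch the raw HTML of `url`, bounded by the crawl semaphore.

        Returns None on any request error. HTTP error statuses are not
        checked; whatever body the server returns is passed along.
        """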
logger.info(f"[CRAWL_LINK] fetch_page, url: '{url}'")
async with self.semaphore:
try:
async with session.get(url) as response:
return await response.text()
except Exception as e:
logger.error(
f"[CRAWL_LINK] fetch_page, Error fetching {url}: {e}")
return None

    async def add_url_to_queue(self, url: str) -> None:
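        """
        Queue a discovered URL and flush once a full batch accumulates.

        Batching amortizes SQLite round-trips: flushed URLs are split into
        already-known rows (updated) and brand-new rows (inserted).
        """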
logger.info(f"[CRAWL_LINK] add_url_to_queue, url: '{url}'")
batch_urls = []
async with self.queue_lock:
await self.batch_urls_queue.put(url)
if self.batch_urls_queue.qsize() >= self.batch_size:
batch_urls = await self.process_batch_urls()
if batch_urls:
# Process batch_urls to separate existing and new URLs
existing_urls, new_urls = await self.check_urls_existence(
batch_urls)
# Update existing URLs and insert new URLs
await self.update_and_insert_urls(existing_urls, new_urls)

    async def process_batch_urls(self) -> List[str]:
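        """
        Drain the queue into a list; the caller must hold queue_lock.
        """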
logger.info(f"[CRAWL_LINK] process_batch_urls")
# Lock is already acquired in add_url_to_queue
batch_urls = []
while not self.batch_urls_queue.empty():
batch_urls.append(await self.batch_urls_queue.get())
return batch_urls

    async def check_urls_existence(
self, batch_urls: List[str]) -> Tuple[Dict[str, int], List[str]]:
"""
Check which URLs exist in the database and separate them from new URLs.
"""
logger.info(
f"[CRAWL_LINK] check_urls_existence, batch_urls: {batch_urls}")
        # Prepare the SQL to check existence.
        # NOTE: doc_status = 4 is a hard-coded status value; its named
        # constant is not imported in this module.
        placeholders = ', '.join(['?'] * len(batch_urls))
        sql = f"SELECT url, id FROM t_sitemap_url_tab WHERE doc_status = 4 AND url IN ({placeholders})"
async with aiosqlite.connect(self.sqlite_db_path) as db:
await db.execute("PRAGMA journal_mode=WAL;")
cursor = await db.execute(sql, batch_urls)
existing_records = await cursor.fetchall()
# Convert list of existing records into a dict for easy lookup
existing_urls = {record[0]: record[1] for record in existing_records}
# Identify new URLs
new_urls = [url for url in batch_urls if url not in existing_urls]
return existing_urls, new_urls

    async def update_and_insert_urls(self, existing_urls: Dict[str, int],
new_urls: List[str]) -> None:
"""
Update existing URLs and insert new URLs into the database in a single operation.
"""
logger.info(
f"[CRAWL_LINK] update_and_insert_urls, existing_urls: {existing_urls}, new_urls: {new_urls}"
)
timestamp = int(time.time())
update_sql = "UPDATE t_sitemap_url_tab SET doc_status = ?, version = ?, mtime = ? WHERE id = ?"
insert_sql = "INSERT INTO t_sitemap_url_tab (domain, url, content, content_length, content_md5, doc_status, version, ctime, mtime) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
# Prepare data for updating existing URLs
updates: List[Tuple[int, int, int, int]] = [
(SITEMAP_URL_RECORDED, self.version, timestamp, url_id)
for url_id in existing_urls.values()
]
# Prepare data for inserting new URLs
inserts: List[Tuple[str, str, str, int, str, int, int, int, int]] = [
(self.domain, url, '[]', 0, '', SITEMAP_URL_RECORDED, self.version,
timestamp, timestamp) for url in new_urls
]
async with aiosqlite.connect(self.sqlite_db_path) as db:
await db.execute("PRAGMA journal_mode=WAL;")
try:
with self.distributed_lock.lock():
if updates:
await db.executemany(update_sql, updates)
if inserts:
await db.executemany(insert_sql, inserts)
await db.commit()
except Exception as e:
logger.error(f"process distributed_lock exception:{e}")

    async def save_link_to_db(self, url: str) -> None:
logger.info(f"[CRAWL_LINK] save_link_to_db, url: '{url}'")
await self.add_url_to_queue(url)

    async def parse_link(self, session: aiohttp.ClientSession, html_text: str,
url: str) -> None:
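        """
        Record `url` in the database, then recurse into unvisited
        same-domain links extracted from its HTML.
        """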
logger.info(f"[CRAWL_LINK] parse_link, url: '{url}'")
link_set = await self.extract_link(html_text, url)
await self.save_link_to_db(url)
for full_link in link_set:
normalized_link = normalize_url(full_link)
if normalized_link not in self.visited_urls and is_same_domain(
self.base_url, normalized_link):
self.visited_urls.add(normalized_link)
await self.crawl_link(session, normalized_link)

    async def extract_link(self, html_text: str, url: str) -> Set[str]:
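        """
        Return the absolute form of every anchor link on the page
        (an empty set if there is no <body> or parsing fails).
        """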
logger.info(f"[CRAWL_LINK] extrack_link, url: '{url}'")
try:
soup = BeautifulSoup(html_text, 'html.parser')
# Extract links within the body
link_set = {
urljoin(url, a['href'])
for a in soup.find_all('a', href=True)
} if soup.body else set()
return link_set
except Exception as e:
logger.error(f"Error processing content from {url}: {str(e)}")
return set()

    async def crawl_link(self, session: aiohttp.ClientSession,
url: str) -> None:
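        """
        Fetch a single page and, if it loads, parse it for more links.
        """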
self.count += 1
        logger.info(
            f"[CRAWL_LINK] crawl_link, url: '{url}', count: {self.count}")
html_text = await self.fetch_page(session, url)
if html_text:
await self.parse_link(session, html_text, url)

    async def update_site_domain_status(self, domain_status: int) -> None:
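        """
        Set this domain's status in t_sitemap_domain_tab under the
        distributed lock.
        """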
logger.info(
f"[CRAWL_LINK] update_site_domain_status, domain_status: {domain_status}"
)
timestamp = int(time.time())
async with aiosqlite.connect(self.sqlite_db_path) as db:
await db.execute("PRAGMA journal_mode=WAL;")
try:
with self.distributed_lock.lock():
await db.execute(
"UPDATE t_sitemap_domain_tab SET domain_status = ?, mtime = ? WHERE domain = ?",
(domain_status, timestamp, self.domain))
await db.commit()
except Exception as e:
logger.error(f"process distributed_lock exception:{e}")

    async def mark_expired_link(self) -> None:
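        """
        Expire every row of this domain whose doc_status was not refreshed
        to SITEMAP_URL_RECORDED during this crawl.
        """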
logger.info(f"[CRAWL_LINK] mark_expired_link")
timestamp = int(time.time())
async with aiosqlite.connect(self.sqlite_db_path) as db:
# Enable WAL mode for better concurrency
await db.execute("PRAGMA journal_mode=WAL;")
try:
with self.distributed_lock.lock():
# Update doc_status to `SITEMAP_URL_EXPIRED` for URLs that are not currently marked as status `SITEMAP_URL_RECORDED`
await db.execute(
"UPDATE t_sitemap_url_tab SET doc_status = ?, mtime = ? WHERE domain = ? AND doc_status != ?",
(SITEMAP_URL_EXPIRED, timestamp, self.domain,
SITEMAP_URL_RECORDED))
await db.commit()
except Exception as e:
logger.error(f"process distributed_lock exception:{e}")

    async def run(self) -> None:
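        """
        Crawl the whole site: walk links from base_url, flush any URLs
        still queued, expire stale rows, and mark the domain as collected.
        """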
begin_time = int(time.time())
logger.info(
f"[CRAWL_LINK] run begin! base_url: '{self.base_url}', begin_time: {begin_time}"
)
headers = {
"User-Agent":
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36",
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Connection": "keep-alive"
}
async with aiohttp.ClientSession(headers=headers) as session:
self.visited_urls.add(self.base_url)
await self.crawl_link(session, self.base_url)
if self.batch_urls_queue.qsize() > 0:
# Process the remaining urls in batch_urls_queue
batch_urls = []
while not self.batch_urls_queue.empty():
batch_urls.append(await self.batch_urls_queue.get())
                logger.info(
                    f"[CRAWL_LINK] process the remaining urls: {batch_urls} in batch_urls_queue"
                )
existing_urls, new_urls = await self.check_urls_existence(
batch_urls)
await self.update_and_insert_urls(existing_urls, new_urls)
await self.mark_expired_link()
await self.update_site_domain_status(
DOMAIN_STATISTICS_GATHERING_COLLECTED)
end_time = int(time.time())
timecost = end_time - begin_time
logger.warning(
f"[CRAWL_LINK] run end! base_url: '{self.base_url}', end_time: {end_time}, timecost: {timecost}"
)
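

# --- Usage sketch (illustrative; not part of the original module) ---
# A minimal way to drive the crawler, assuming the project's SQLite schema
# and diskcache lock are already initialized. The base URL and version
# below are hypothetical placeholders.
if __name__ == "__main__":

    async def _demo() -> None:
        crawler = AsyncCrawlerSiteLink(base_url="https://example.com",
                                       version=1)
        await crawler.run()

    asyncio.run(_demo())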