import os
import requests
from bs4 import BeautifulSoup
import re
import urllib.parse
import time
import shutil


def get_main_url(url):
    # Reduce a full URL to scheme + host, e.g. "https://pubs.acs.org".
    return "/".join(url.split("/")[:3])


def save_pdf_from_url(pdf_url, directory, name, headers):
    # Download pdf_url to <directory>/<name>.pdf; return True on success.
    try:
        response = requests.get(pdf_url, headers=headers, allow_redirects=True)
        response.raise_for_status()
        if not response.content.startswith(b'%PDF'):
            # Some mirrors return an interstitial HTML page instead of the PDF;
            # fall back to the Europe PMC render service when possible.
            content_str = response.content.decode('utf-8', errors='ignore')
            if 'Preparing to download' in content_str:
                pmc_match = re.search(r'PMC\d+', pdf_url)
                if pmc_match:
                    pmc_id = pmc_match.group()
                    alt_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
                    print(f"** Trying alternative URL: {alt_url}")
                    response = requests.get(alt_url, headers=headers, allow_redirects=True)
                    response.raise_for_status()
        with open(f'{directory}/{name}.pdf', 'wb') as f:
            f.write(response.content)
        print(f"** Successfully fetched and saved PDF for PMCID {name}. "
              f"File size: {len(response.content)} bytes")
        return True
    except requests.RequestException as e:
        print(f"** Failed to download PDF from {pdf_url}: {e}")
        return False


def fetch(pmcid, finders, name, headers, error_pmids, args):
    # Fetch the PMC article page and try each finder until one yields a PDF.
    uri = f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmcid.strip()}"
    success = False
    if os.path.exists(f"{args['out']}/{name}.pdf"):
        print(f"** Reprint #{pmcid} already downloaded and in folder; skipping.")
        return
    try:
        req = requests.get(uri, headers=headers)
        req.raise_for_status()
        soup = BeautifulSoup(req.content, 'lxml')
        for finder in finders:
            print(f"Trying {finder.__name__}")
            pdf_url = finder(req, soup, headers)
            if pdf_url:
                success = save_pdf_from_url(pdf_url, args['out'], name, headers)
                if success:
                    break
        if not success:
            print(f"** Reprint {pmcid} could not be fetched with the current finders.")
            error_pmids.write(f"{pmcid}\t{name}\n")
    except requests.RequestException as e:
        print(f"** Request failed for PMCID {pmcid}: {e}")
        error_pmids.write(f"{pmcid}\t{name}\n")


def acs_publications(req, soup, headers):
    links = [x for x in soup.find_all('a')
             if x.get('title') and ('high-res pdf' in x.get('title').lower()
                                    or 'low-res pdf' in x.get('title').lower())]
    if links:
        print("** Using ACS Publications finder...")
        return get_main_url(req.url) + links[0].get('href')
    return None


def future_medicine(req, soup, headers):
    links = soup.find_all('a', attrs={'href': re.compile("/doi/pdf")})
    if links:
        print("** Using Future Medicine finder...")
        return get_main_url(req.url) + links[0].get('href')
    return None


def generic_citation_labelled(req, soup, headers):
    links = soup.find_all('meta', attrs={'name': 'citation_pdf_url'})
    if links:
        print("** Using Generic Citation Labelled finder...")
        return links[0].get('content')
    return None


def nejm(req, soup, headers):
    links = [x for x in soup.find_all('a')
             if x.get('data-download-type') and x.get('data-download-type').lower() == 'article pdf']
    if links:
        print("** Using NEJM finder...")
        return get_main_url(req.url) + links[0].get('href')
    return None


def pubmed_central_v2(req, soup, headers):
    links = soup.find_all('a', attrs={'href': re.compile('/pmc/articles')})
    if links:
        print("** Using PubMed Central V2 finder...")
        return f"https://www.ncbi.nlm.nih.gov{links[0].get('href')}"
    return None


def science_direct(req, soup, headers):
    try:
        # Science Direct pages go through an intermediate redirect form.
        new_uri = urllib.parse.unquote(soup.find_all('input')[0].get('value'))
        req = requests.get(new_uri, allow_redirects=True, headers=headers)
        req.raise_for_status()
        soup = BeautifulSoup(req.content, 'lxml')
        links = soup.find_all('meta', attrs={'name': 'citation_pdf_url'})
        if links:
            print("** Using Science Direct finder...")
            return links[0].get('content')
    except Exception as e:
        print(f"** Science Direct finder error: {e}")
    return None


def uchicago_press(req, soup, headers):
    links = [x for x in soup.find_all('a')
             if x.get('href') and 'pdf' in x.get('href') and '.edu/doi/' in x.get('href')]
    if links:
        print("** Using UChicago Press finder...")
        return get_main_url(req.url) + links[0].get('href')
    return None


def europe_pmc_service(req, soup, headers):
    pmc_match = re.search(r'PMC\d+', req.url)
    if pmc_match:
        pmc_id = pmc_match.group()
        print(f"** Using Europe PMC Service finder for {pmc_id}...")
        return f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
    return None


def main(pcds):
    args = {
        'pmcids': f'{pcds}',               # comma-separated PMCIDs to download
        'pmf': '%#$',                      # set to a file path to read IDs from a file ('%#$' = unset)
        'out': 'fetched_pdfs',
        'errors': 'unfetched_pmcids.tsv',
        'maxRetries': 3,
        'batch': 10,
        'delay': 5
    }
    if args['pmcids'] == '%#$' and args['pmf'] == '%#$':
        print("Error: either pmcids or pmf must be provided")
        return
    if args['pmcids'] != '%#$' and args['pmf'] != '%#$':
        print("Warning: both pmcids and pmf provided; ignoring pmf")
        args['pmf'] = '%#$'
    if not os.path.exists(args['out']):
        print(f"Creating output directory: {args['out']}")
        os.mkdir(args['out'])

    headers = requests.utils.default_headers()
    headers['User-Agent'] = ('Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
                             '(KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36')

    if args['pmcids'] != '%#$':
        pmcids = args['pmcids'].split(",")
        names = pmcids
    else:
        # Each line of the file is either "PMCID" or "PMCID name".
        with open(args['pmf']) as pmf_file:
            pmcids = [line.strip().split() for line in pmf_file]
        if len(pmcids[0]) == 1:
            pmcids = [x[0] for x in pmcids]
            names = pmcids
        else:
            names = [x[1] for x in pmcids]
            pmcids = [x[0] for x in pmcids]

    # Finders are tried in order until one returns a usable PDF URL.
    finders = [
        europe_pmc_service,
        generic_citation_labelled,
        pubmed_central_v2,
        acs_publications,
        uchicago_press,
        nejm,
        future_medicine,
        science_direct,
    ]

    batch_count = 0
    with open(args['errors'], 'w+') as error_pmids:
        for pmcid, name in zip(pmcids, names):
            print(f"Trying to fetch PMCID {pmcid.strip()}")
            retries_so_far = 0
            while retries_so_far < args['maxRetries']:
                try:
                    fetch(pmcid, finders, name, headers, error_pmids, args)
                    retries_so_far = args['maxRetries']
                except requests.ConnectionError as e:
                    # Retry only on "connection reset by peer" (errno 104).
                    if '104' in str(e):
                        retries_so_far += 1
                        if retries_so_far < args['maxRetries']:
                            print(f"** Retry {retries_so_far}/{args['maxRetries']} for {pmcid} due to error {e}")
                        else:
                            print(f"** Max retries reached for {pmcid}")
                            error_pmids.write(f"{pmcid}\t{name}\n")
                    else:
                        print(f"** Connection error for {pmcid}: {e}")
                        retries_so_far = args['maxRetries']
                        error_pmids.write(f"{pmcid}\t{name}\n")
                except Exception as e:
                    print(f"** General error for {pmcid}: {e}")
                    retries_so_far = args['maxRetries']
                    error_pmids.write(f"{pmcid}\t{name}\n")
            batch_count += 1
            if batch_count % args['batch'] == 0:
                print(f"** Batch limit reached. Sleeping for {args['delay']} seconds...")
                time.sleep(args['delay'])

    # After downloads complete, zip the output directory.
    zip_path = args['out'] + ".zip"
    shutil.make_archive(args['out'], 'zip', args['out'])
    return zip_path  # <== return the path to the zip archive
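

# Minimal usage sketch (an addition, not part of the original script): invoke
# main() with a comma-separated PMCID string and report where the zip landed.
# "PMC1790863" is a hypothetical placeholder ID.
if __name__ == "__main__":
    archive = main("PMC1790863")
    if archive:
        print(f"PDF archive written to {archive}")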