Spaces:
Running
Running
File size: 8,662 Bytes
651accf 4749bad 651accf 4749bad 651accf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
import sys
import os
import requests
from bs4 import BeautifulSoup
import re
import urllib
import time
import shutil
def get_main_url(url):
return "/".join(url.split("/")[:3])
def save_pdf_from_url(pdf_url, directory, name, headers):
try:
response = requests.get(pdf_url, headers=headers, allow_redirects=True)
response.raise_for_status()
if not response.content.startswith(b'%PDF'):
content_str = response.content.decode('utf-8', errors='ignore')
if 'Preparing to download' in content_str:
pmc_match = re.search(r'PMC\d+', pdf_url)
if pmc_match:
pmc_id = pmc_match.group()
alt_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
print(f"** Trying alternative URL: {alt_url}")
response = requests.get(alt_url, headers=headers, allow_redirects=True)
response.raise_for_status()
with open(f'{directory}/{name}.pdf', 'wb') as f:
f.write(response.content)
print(f"** Successfully fetched and saved PDF for PMCID {name}. File size: {len(response.content)} bytes")
except requests.RequestException as e:
print(f"** Failed to download PDF from {pdf_url}: {e}")
def fetch(pmcid, finders, name, headers, error_pmids, args):
uri = f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmcid.strip()}"
success = False
if os.path.exists(f"{args['out']}/{pmcid}.pdf"):
print(f"** Reprint #{pmcid} already downloaded and in folder; skipping.")
return
try:
req = requests.get(uri, headers=headers)
req.raise_for_status()
soup = BeautifulSoup(req.content, 'lxml')
for finder in finders:
print(f"Trying {finder}")
pdf_url = eval(finder)(req, soup, headers)
if pdf_url:
save_pdf_from_url(pdf_url, args['out'], name, headers)
success = True
break
if not success:
print(f"** Reprint {pmcid} could not be fetched with the current finders.")
error_pmids.write(f"{pmcid}\t{name}\n")
except requests.RequestException as e:
print(f"** Request failed for PMCID {pmcid}: {e}")
error_pmids.write(f"{pmcid}\t{name}\n")
def acs_publications(req, soup, headers):
links = [x for x in soup.find_all('a') if x.get('title') and ('high-res pdf' in x.get('title').lower() or 'low-res pdf' in x.get('title').lower())]
if links:
print("** Using ACS Publications finder...")
return get_main_url(req.url) + links[0].get('href')
return None
def future_medicine(req, soup, headers):
links = soup.find_all('a', attrs={'href': re.compile("/doi/pdf")})
if links:
print("** Using Future Medicine finder...")
return get_main_url(req.url) + links[0].get('href')
return None
def generic_citation_labelled(req, soup, headers):
links = soup.find_all('meta', attrs={'name': 'citation_pdf_url'})
if links:
print("** Using Generic Citation Labelled finder...")
return links[0].get('content')
return None
def nejm(req, soup, headers):
links = [x for x in soup.find_all('a') if x.get('data-download-type') and x.get('data-download-type').lower() == 'article pdf']
if links:
print("** Using NEJM finder...")
return get_main_url(req.url) + links[0].get('href')
return None
def pubmed_central_v2(req, soup, headers):
links = soup.find_all('a', attrs={'href': re.compile('/pmc/articles')})
if links:
print("** Using PubMed Central V2 finder...")
return f"https://www.ncbi.nlm.nih.gov{links[0].get('href')}"
return None
def science_direct(req, soup, headers):
try:
new_uri = urllib.parse.unquote(soup.find_all('input')[0].get('value'))
req = requests.get(new_uri, allow_redirects=True, headers=headers)
req.raise_for_status()
soup = BeautifulSoup(req.content, 'lxml')
links = soup.find_all('meta', attrs={'name': 'citation_pdf_url'})
if links:
print("** Using Science Direct finder...")
return links[0].get('content')
except Exception as e:
print(f"** Science Direct finder error: {e}")
return None
def uchicago_press(req, soup, headers):
links = [x for x in soup.find_all('a') if x.get('href') and 'pdf' in x.get('href') and '.edu/doi/' in x.get('href')]
if links:
print("** Using UChicago Press finder...")
return get_main_url(req.url) + links[0].get('href')
return None
def europe_pmc_service(req, soup, headers):
pmc_match = re.search(r'PMC\d+', req.url)
if pmc_match:
pmc_id = pmc_match.group()
print(f"** Using Europe PMC Service finder for {pmc_id}...")
return f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
return None
def main(pcds):
args = {
'pmcids': f'{pcds}', # 替换为你要下载的 PMCIDs
'pmf': '%#$', # 如果使用文件则设置路径
'out': 'fetched_pdfs',
'errors': 'unfetched_pmcids.tsv',
'maxRetries': 3,
'batch': 10,
'delay': 5
}
if args['pmcids'] == '%#$' and args['pmf'] == '%#$':
print("Error: 必须提供 pmcids 或 pmf")
return
if args['pmcids'] != '%#$' and args['pmf'] != '%#$':
print("Warning: 同时提供了 pmcids 和 pmf,忽略 pmf")
args['pmf'] = '%#$'
if not os.path.exists(args['out']):
print(f"创建输出目录: {args['out']}")
os.mkdir(args['out'])
headers = requests.utils.default_headers()
headers['User-Agent'] = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36'
if args['pmcids'] != '%#$':
pmcids = args['pmcids'].split(",")
names = pmcids
else:
pmcids = [line.strip().split() for line in open(args['pmf'])]
if len(pmcids[0]) == 1:
pmcids = [x[0] for x in pmcids]
names = pmcids
else:
names = [x[1] for x in pmcids]
pmcids = [x[0] for x in pmcids]
finders = [
'europe_pmc_service',
'generic_citation_labelled',
'pubmed_central_v2',
'acs_publications',
'uchicago_press',
'nejm',
'future_medicine',
'science_direct'
]
batch_count = 0
with open(args['errors'], 'w+') as error_pmids:
for pmcid, name in zip(pmcids, names):
print(f"Trying to fetch PMCID {pmcid.strip()}")
retries_so_far = 0
while retries_so_far < args['maxRetries']:
try:
fetch(pmcid, finders, name, headers, error_pmids, args)
retries_so_far = args['maxRetries']
except requests.ConnectionError as e:
if '104' in str(e):
retries_so_far += 1
if retries_so_far < args['maxRetries']:
print(f"** Retry {retries_so_far}/{args['maxRetries']} for {pmcid} due to error {e}")
else:
print(f"** Max retries reached for {pmcid}")
error_pmids.write(f"{pmcid}\t{name}\n")
else:
print(f"** Connection error for {pmcid}: {e}")
retries_so_far = args['maxRetries']
error_pmids.write(f"{pmcid}\t{name}\n")
except Exception as e:
print(f"** General error for {pmcid}: {e}")
retries_so_far = args['maxRetries']
error_pmids.write(f"{pmcid}\t{name}\n")
batch_count += 1
if batch_count % args['batch'] == 0:
print(f"** Batch limit reached. Sleeping for {args['delay']} seconds...")
time.sleep(args['delay'])
# 下载完成后,打包输出目录
zip_path = args['out'] + ".zip"
shutil.make_archive(args['out'], 'zip', args['out'])
return zip_path # <== 返回 zip 文件路径
|