# pmc-worker / scraper_template.py
# Nottybro's picture
# Upload scraper_template.py with huggingface_hub
# f91f1fd verified
# Raw
# History Blame Contribute Delete
# 23.2 kB
# PMC Sniper v13.0 — injected by HF Worker Space
# The __NAME__ tokens below look like template placeholders that the injector
# replaces with literal values before execution; run unmodified, the unquoted
# ones raise NameError. (assumption — confirm against the injector)
SESSION_ID = __SESSION_ID__  # 0-based index of this session's page slice
RESUME_FROM_PAGE = __RESUME_FROM_PAGE__  # listing page to resume at; <= 0 means "start of slice"
RUN_ID = '__RUN_ID__'  # run label used in log lines and in run_state.json
TOTAL_SESSIONS = __TOTAL_SESSIONS__  # number of sessions the listing pages are split across
import subprocess, sys
def _install():
    """Install system libraries, Python packages and Playwright's Chromium.

    apt-get steps run with check=False and silenced output, so their failure
    is ignored (presumably the host may lack apt permission or already have the
    libraries). The pip install and the Chromium download must succeed
    (check=True) or CalledProcessError is raised.
    """
    subprocess.run(['apt-get', 'install', '-y', '-q', 'xvfb', 'xauth',
                    'x11-utils', 'libnss3', 'libatk1.0-0', 'libatk-bridge2.0-0',
                    'libcups2', 'libxcomposite1', 'libxdamage1', 'libxfixes3',
                    'libxrandr2', 'libgbm1', 'libpango-1.0-0', 'libcairo2',
                    'libasound2', 'libx11-xcb1', 'libxcb-dri3-0'],
                   check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    subprocess.run([sys.executable, '-m', 'pip', 'install', '-q',
                    'playwright', 'pyvirtualdisplay', 'beautifulsoup4'], check=True)
    # Run Playwright's CLI through this interpreter: the `playwright` console
    # script that pip just installed is not guaranteed to be on PATH (e.g. a
    # user-site install), whereas `python -m playwright` always resolves to the
    # package installed for sys.executable.
    subprocess.run([sys.executable, '-m', 'playwright', 'install', 'chromium'],
                   check=True, stdout=subprocess.DEVNULL)
    subprocess.run([sys.executable, '-m', 'playwright', 'install-deps', 'chromium'],
                   check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# Runs at import time: the third-party imports below (playwright,
# pyvirtualdisplay, bs4) need the packages this installs.
_install()
import csv, json, random, asyncio, re, time, logging
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Optional, Set
from collections import deque
try:
import nest_asyncio; nest_asyncio.apply()
except ImportError: pass
from playwright.async_api import async_playwright
from pyvirtualdisplay import Display
from bs4 import BeautifulSoup
# Root logging: INFO and above, "HH:MM:SS | message" lines.
logging.basicConfig(
    format='%(asctime)s | %(message)s',
    datefmt='%H:%M:%S',
    level=logging.INFO,
)
# Module logger; run_worker's per-worker log helper writes through it.
logger = logging.getLogger(__name__)
# Split listing pages 1..TOTAL_PMC_PAGES into TOTAL_SESSIONS contiguous slices.
# This session owns SLICE_START..SLICE_END (inclusive). The last session also
# takes the pages left over by the integer division.
TOTAL_PMC_PAGES = 6435  # listing page count assumed for the split — TODO confirm it is current
PAGES_PER_SESSION = TOTAL_PMC_PAGES // TOTAL_SESSIONS
SLICE_START = 1 + SESSION_ID * PAGES_PER_SESSION
SLICE_END = SLICE_START + PAGES_PER_SESSION - 1
if SESSION_ID == TOTAL_SESSIONS - 1:
    SLICE_END = TOTAL_PMC_PAGES
# Resume inside this slice only. RESUME_FROM_PAGE <= 0 means "start of slice".
# A value below SLICE_START (stale or global page number) is clamped, so this
# session can never re-scrape pages that belong to the previous session.
START_PAGE = max(RESUME_FROM_PAGE, SLICE_START)
END_PAGE = SLICE_END
# Per-session output folder: log.csv, run_state.json, diagnostics and one
# sub-folder per build are written here.
OUTPUT_DIR = Path(f'pmc_s{SESSION_ID}')
DL_WORKERS = 2  # number of concurrent download workers (one browser tab each)
MAX_RATE_PER_HR = 300  # RateTracker threshold (events/hour) above which workers pause
# 11 h 40 min. Presumably a 12 h host session limit minus a 20 min safety
# margin — confirm against the host's limit.
TIME_LIMIT_SECS = (12 * 3600) - (20 * 60)
BASE_URL = 'https://www.planetminecraft.com'
# Listing filtered to schematic shares and sorted by popularity. The
# collector appends "&p=<page>".
LISTING_URL = f'{BASE_URL}/projects/?share=schematic&order=order_popularity'
# Fixed desktop Chrome user-agent string used for the browser context.
UA = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
      '(KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36')
# Ad/tracker domains whose requests block_ads aborts.
BLOCKED = {
    'googlesyndication.com','googletagmanager.com','google-analytics.com',
    'doubleclick.net','amazon-adsystem.com','adnxs.com','criteo.com',
    'taboola.com','outbrain.com','facebook.net','scorecardresearch.com',
    'pubmatic.com','rubiconproject.com','openx.net','liveramp.com',
    'adsrvr.org','bidswitch.net','indexexchange.com',
}
# Inline cookie JSON. While it still contains the "PASTE" placeholder,
# get_cookies ignores it and falls back to COOKIE_PATHS.
COOKIES_JSON = 'PASTE_PMC_COOKIES_HERE'
# Cookie files checked in order by get_cookies (Kaggle dataset input paths).
COOKIE_PATHS = [
    Path('/kaggle/input/pmc-cookies/cookies.json'),
    Path('/kaggle/input/cookies/cookies.json'),
]
class RateTracker:
    """Sliding-window estimate of events per hour, with an async back-off.

    Only timestamps newer than ``win`` seconds before the latest record() are
    kept. throttle() sleeps once when the estimate exceeds ``max_hr``.
    """

    def __init__(self, max_hr, win=300):
        self.max_hr = max_hr  # events/hour above which throttle() pauses
        self.win = win        # window length in seconds
        self.ts = deque()     # event timestamps from time.time(), oldest first

    def record(self):
        """Record one event now and drop timestamps older than the window."""
        now = time.time()
        self.ts.append(now)
        oldest_allowed = now - self.win
        while self.ts and self.ts[0] < oldest_allowed:
            self.ts.popleft()

    def rate(self):
        """Return the estimated rate in events/hour.

        Returns 0.0 with fewer than two events or a span under one second.
        Uses count / span (n events over the first-to-last interval), which
        reads slightly higher than the (n-1)/span interval rate, i.e. it errs
        on the side of throttling.
        """
        count = len(self.ts)
        if count < 2:
            return 0.0
        span = self.ts[-1] - self.ts[0]
        if span < 1:
            return 0.0
        return count / (span / 3600)

    async def throttle(self, log=None):
        """Sleep 10-20 s once if rate() exceeds max_hr, reporting via *log*."""
        if self.rate() <= self.max_hr:
            return
        pause = random.uniform(10, 20)
        if log:
            log(f'Throttle {pause:.0f}s')
        await asyncio.sleep(pause)
@dataclass
class BuildEntry:
    """One project found on a listing page plus its processing outcome.

    Field order matches BuildTracker.FIELDS, which is the CSV column order.
    """
    id: str                           # project slug taken from the /project/<id>/ link
    name: str                         # link text from the listing page
    project_url: str                  # absolute URL of the project page
    build_path: Optional[str] = None  # saved file path once a download succeeds
    status: str = 'pending'           # outcome, e.g. SUCCESS / FAILED / BLOCKED
class BuildTracker:
    """Append-only CSV log of processed builds plus an in-memory set of done ids.

    Every processed entry is appended to the CSV at ``path``. Ids whose status
    is in RETRY_STATUSES are logged but not counted as done, so a resumed run
    tries them again.
    """
    FIELDS=['id','name','project_url','build_path','status']
    # Outcomes caused by the session (challenge page / logged out), not by the
    # build itself. Treating them as done would skip those builds forever.
    RETRY_STATUSES=frozenset({'BLOCKED','LOGIN_REQUIRED'})

    def __init__(self, path):
        self.path=path; self.ids:Set[str]=set(); self._load()
        # A zero-byte file (e.g. a crash right after creation) has no header.
        # Without one, DictReader would take the first data row as the header
        # on the next load.
        if not path.exists() or path.stat().st_size==0:
            with open(path,'w',newline='',encoding='utf-8') as f:
                csv.DictWriter(f,fieldnames=self.FIELDS).writeheader()

    def _load(self):
        """Populate self.ids from an existing log, skipping retryable rows."""
        if not self.path.exists(): return
        try:
            with open(self.path,encoding='utf-8',newline='') as f:
                for row in csv.DictReader(f):
                    # Short rows give None for missing columns, hence "or ''".
                    bid=(row.get('id') or '').strip()
                    status=(row.get('status') or '').strip()
                    if bid and status not in self.RETRY_STATUSES:
                        self.ids.add(bid)
        except (OSError, csv.Error, ValueError) as e:
            # Keep whatever was read so far. Say so, instead of silently
            # re-scraping everything after a corrupt log.
            print(f'log.csv unreadable ({e}); continuing with {len(self.ids)} known ids')

    def is_scraped(self,bid): return bid in self.ids

    def add(self, e):
        """Append *e* (a BuildEntry) to the CSV; mark it done unless retryable."""
        if e.status not in self.RETRY_STATUSES:
            self.ids.add(e.id)
        with open(self.path,'a',newline='',encoding='utf-8') as f:
            csv.DictWriter(f,fieldnames=self.FIELDS).writerow(asdict(e))

    @property
    def total(self): return len(self.ids)
def write_state(last_page, reason, stats):
    """Persist and announce this session's resume state; return the state dict.

    ``last_page`` is the last listing page treated as completed. remaining is
    END_PAGE - last_page (never negative), and next_resume_page is None once
    nothing remains. The state is written to OUTPUT_DIR/run_state.json and
    echoed on stdout between ``##RUN_STATE##`` markers.
    """
    remaining=max(0, END_PAGE-last_page)
    state={
        'run_id':RUN_ID,'session_id':SESSION_ID,
        'slice_start':SLICE_START,'slice_end':SLICE_END,
        'last_completed_page':last_page,
        'next_resume_page':(last_page+1) if remaining>0 else None,
        'remaining_pages':remaining,'stop_reason':reason,
        'success':stats.get('success',0),'skip':stats.get('skip',0),
        'fail':stats.get('fail',0),
        'timestamp':time.strftime('%Y-%m-%d %H:%M:%S UTC',time.gmtime()),
        'complete':remaining==0,
    }
    OUTPUT_DIR.mkdir(parents=True,exist_ok=True)
    target=OUTPUT_DIR/'run_state.json'
    # Write-then-rename, so a kill mid-write never leaves a truncated state
    # file for the next resume to choke on.
    tmp=target.with_name(target.name+'.tmp')
    tmp.write_text(json.dumps(state,indent=2))
    tmp.replace(target)
    # Machine-readable marker (presumably parsed by the orchestrator from stdout).
    sys.stdout.write(f'\n##RUN_STATE##{json.dumps(state)}##RUN_STATE##\n')
    sys.stdout.flush()
    print(f'DONE {RUN_ID} | page {last_page} | {remaining} left | {reason}')
    return state
def _parse_cookies(raw):
    """Convert exported cookie dicts into entries for ctx.add_cookies().

    Keeps only name, value, domain, path (default "/") and sameSite. Entries
    missing a name, value or domain are dropped. sameSite is kept when it is
    exactly 'Strict', 'Lax' or 'None'; any other value becomes 'Lax'.
    """
    allowed_same_site = ('Strict', 'Lax', 'None')
    cookies = []
    for src in raw:
        name = src.get('name', '')
        value = src.get('value', '')
        domain = src.get('domain', '')
        if not (name and value and domain):
            continue
        same_site = src.get('sameSite', 'Lax')
        cookies.append({
            'name': name,
            'value': value,
            'domain': domain,
            'path': src.get('path', '/'),
            'sameSite': same_site if same_site in allowed_same_site else 'Lax',
        })
    return cookies
def get_cookies():
    """Return cookies for ctx.add_cookies(), or [] if none are available.

    Tries, in order: the inline COOKIES_JSON (ignored while it still holds the
    "PASTE" placeholder), then each existing file in COOKIE_PATHS. The first
    source that parses wins. A source that fails to parse is reported and
    skipped.
    """
    s=COOKIES_JSON.strip()
    if s and 'PASTE' not in s:
        try: return _parse_cookies(json.loads(s))
        # Exception, not a bare except: don't swallow KeyboardInterrupt/SystemExit.
        except Exception as e: print(f'Inline cookies unusable: {e}')
    for p in COOKIE_PATHS:
        if p.exists():
            try: return _parse_cookies(json.loads(p.read_text()))
            except Exception as e: print(f'Cookie file {p} unusable: {e}')
    return []
# JavaScript registered via ctx.add_init_script() in setup_browser, so it is
# installed for every page of the context. It redefines navigator.webdriver
# (-> undefined), adds a stub window.chrome object, and fixes
# navigator.languages, screen.width/height (1920x1080, matching the
# viewport), navigator.hardwareConcurrency (4) and navigator.platform
# ("Win32", matching the Windows UA string).
STEALTH = (
    'Object.defineProperty(navigator,"webdriver",{get:()=>undefined});'
    'window.chrome={runtime:{},loadTimes:function(){},csi:function(){},app:{}};'
    'Object.defineProperty(navigator,"languages",{get:()=>["en-US","en"]});'
    'Object.defineProperty(screen,"width",{get:()=>1920});'
    'Object.defineProperty(screen,"height",{get:()=>1080});'
    'Object.defineProperty(navigator,"hardwareConcurrency",{get:()=>4});'
    'Object.defineProperty(navigator,"platform",{get:()=>"Win32"});'
)
def _is_blocked_url(url, blocked=None):
    """Return True if *url*'s host is, or is a subdomain of, a blocked domain.

    *blocked* defaults to the module-level BLOCKED set. Only the hostname is
    compared, so a legitimate URL that merely mentions an ad domain in its
    path or query string is not blocked.
    """
    from urllib.parse import urlsplit
    domains=BLOCKED if blocked is None else blocked
    host=(urlsplit(url).hostname or '').lower()
    return any(host==d or host.endswith('.'+d) for d in domains)


async def block_ads(route):
    """Playwright route handler: abort ad/tracker requests, continue the rest."""
    try:
        if _is_blocked_url(route.request.url):
            await route.abort(); return
    except Exception:
        # If classification or abort fails, fall back to letting the request
        # through. Exception (not bare except) keeps task cancellation working.
        pass
    await route.continue_()
async def new_tab(ctx):
    """Open a new page in *ctx* with the block_ads route handler on all URLs."""
    tab = await ctx.new_page()
    await tab.route('**/*', block_ads)
    return tab
async def setup_browser(pw):
    """Launch Chromium and create the shared browser context; return (browser, context).

    The browser is launched with headless=False (in this file it runs inside the
    pyvirtualdisplay Display started by run()) and the flags listed below.
    The context uses the UA string, a 1920x1080 viewport, en-US locale, the
    America/New_York timezone, accepts downloads (needed by expect_download
    in download_build) and registers the STEALTH init script.
    """
    br=await pw.chromium.launch(headless=False,
        args=['--disable-blink-features=AutomationControlled','--no-sandbox',
              '--disable-dev-shm-usage','--start-maximized'])
    ctx=await br.new_context(user_agent=UA,
        viewport={'width':1920,'height':1080},locale='en-US',
        timezone_id='America/New_York',accept_downloads=True)
    await ctx.add_init_script(STEALTH)
    return br, ctx
# Lower-case substrings of Cloudflare interstitial titles ("Just a moment...",
# "Attention Required! | Cloudflare", ...). The bare word "attention" is not
# used: it also matches ordinary project titles, and a false positive stops the
# whole session.
_CF_TITLE_MARKERS=('attention required','just a moment','cloudflare',
                   'checking your','please wait')


def _is_cf(t):
    """Return True if page title *t* looks like a Cloudflare challenge/block page."""
    t=(t or '').lower()
    return any(k in t for k in _CF_TITLE_MARKERS)
async def dismiss_consent(page):
    """Click the first matching cookie-consent accept button, if any.

    Selectors are tried in order. After one click it waits 300 ms and
    returns. Lookup/click errors just move on to the next selector.
    """
    for sel in ('button.fc-cta-consent','button#accept-cookies','.cc-accept-all'):
        try:
            el=await page.query_selector(sel)
            if el:
                await el.click(); await page.wait_for_timeout(300); return
        except Exception:
            # Not a bare except: since Python 3.8 asyncio.CancelledError is a
            # BaseException, and swallowing it would make this task uncancellable.
            pass
# Button words that mean "dismiss"; such buttons are never clicked by dismiss_popup.
_DISMISS_WORDS=frozenset({'cancel','close','no','back'})


def _is_dismiss_label(txt):
    """Return True if *txt* contains a dismiss word as a whole word.

    Whole-word matching matters: a substring test for "no" would also match
    "Download Now", and "back" would match "Feedback".
    """
    return not _DISMISS_WORDS.isdisjoint(re.findall(r'[a-z]+', (txt or '').lower()))


async def dismiss_popup(page):
    """Confirm a visible ``div.popup_wrap`` dialog by clicking its affirmative button.

    Clicks the first non-``.cancel`` link/button whose text is not a dismiss
    label, then returns. If none is clickable, presses Escape. Does nothing
    when no popup is visible. All errors are swallowed (best effort).
    """
    try:
        wrap=await page.query_selector('div.popup_wrap')
        if not wrap or not await wrap.is_visible(): return
        for sel in ('div.popup_wrap a.btn:not(.cancel)',
                    'div.popup_wrap button:not(.cancel)'):
            for el in await page.query_selector_all(sel):
                try:
                    if _is_dismiss_label((await el.inner_text()).strip()): continue
                    await el.click(force=True); await page.wait_for_timeout(400); return
                except Exception:
                    continue
        await page.keyboard.press('Escape'); await page.wait_for_timeout(300)
    except Exception:
        # Best effort, but let asyncio.CancelledError (a BaseException) propagate.
        pass
async def clear_cf(ctx, page):
    """Load saved cookies, open the home page and wait for a non-Cloudflare title.

    Returns True as soon as the page title is not matched by _is_cf(). Before
    returning it dismisses the consent banner and saves the context cookies.
    Returns False after 24 polls, each followed by a 5 s wait (so roughly two
    minutes or more).
    """
    cookies=get_cookies()
    if cookies: await ctx.add_cookies(cookies); print(f'Loaded {len(cookies)} cookies')
    else: print('No cookies — guest mode')
    t0=time.time()
    # A failed first navigation is only reported; polling below still runs.
    try: await page.goto(BASE_URL+'/',wait_until='domcontentloaded',timeout=60000)
    except Exception as e: print(f'Nav error: {e}')
    for i in range(24):
        title=await page.title()
        if not _is_cf(title):
            print(f'CF cleared in {time.time()-t0:.1f}s')
            await dismiss_consent(page)
            # NOTE(review): this writes every cookie in the context — including
            # any session cookies loaded above — to fresh_cookies.json in plain
            # text. Make sure OUTPUT_DIR is never published with the results.
            try:
                fresh=await ctx.cookies()
                OUTPUT_DIR.mkdir(parents=True,exist_ok=True)
                (OUTPUT_DIR/'fresh_cookies.json').write_text(json.dumps(fresh,indent=2))
            # NOTE(review): bare except also swallows asyncio.CancelledError
            # (a BaseException since Python 3.8).
            except: pass
            return True
        # Still on the interstitial: click a challenge checkbox if one is present.
        for sel in ["#challenge-stage input[type='checkbox']",' .cf-turnstile input']:
            try:
                el=await page.query_selector(sel)
                if el: await el.click(); await page.wait_for_timeout(1000)
            except: pass
        # Progress line every 6th poll (~30 s).
        if i%6==0 and i>0: print(f'[{time.time()-t0:.0f}s] CF: {title[:50]}')
        await page.wait_for_timeout(5000)
    print('CF clear failed'); return False
def _parse_count(raw):
    """Parse a displayed counter such as '1,234', '1.2k' or '3M' into an int.

    Commas and spaces are ignored. A trailing k/m scales by 1e3/1e6. Returns
    None when *raw* is not a number, so callers can tell a real 0 from a label.
    """
    s=raw.strip().replace(',','').replace(' ','').lower()
    scale=1
    if s.endswith('k'): s, scale = s[:-1], 1000
    elif s.endswith('m'): s, scale = s[:-1], 1000000
    try:
        if scale==1: return int(s)
        # round(), not int(): int(float('1.005')*1e6) truncates to 1004999.
        return round(float(s)*scale)
    except (ValueError, OverflowError):
        return None


def extract_meta(html, entry, schem_path=None):
    """Build the metadata dict for *entry* from its project-page *html*.

    Each field tries its selectors in order and keeps the first usable match.
    Unmatched fields keep their defaults: the listing name as title, empty
    strings/lists, and 0 for downloads/views.
    """
    soup=BeautifulSoup(html,'html.parser')
    meta={'id':entry.id,'title':entry.name,'description':'','tags':[],
          'author':'','downloads':0,'views':0,'project_url':entry.project_url,
          'schem_path':str(schem_path) if schem_path else '',
          'session_id':SESSION_ID,'run_id':RUN_ID}
    for sel in ('h1.title-block','h1.project-title','h1.r-title','h1'):
        el=soup.select_one(sel)
        if el:
            t=el.get_text(strip=True)
            if len(t)>1: meta['title']=t[:300]; break
    for sel in ('div.content-block','div#description','.project-body','.description-text'):
        el=soup.select_one(sel)
        if el:
            d=el.get_text(separator=' ',strip=True)
            if len(d)>10: meta['description']=d[:5000]; break
    for sel in ('.tags-box .tag','.tag-item','.tags a'):
        tags=[t.get_text(strip=True) for t in soup.select(sel) if t.get_text(strip=True)]
        if tags: meta['tags']=tags[:30]; break
    for sel in ('a.info-username','.username a','a[href*="/member/"]'):
        el=soup.select_one(sel)
        if el:
            a=el.get_text(strip=True)
            if a: meta['author']=a; break
    for sel in ('span.stat-num','.info-stat span'):
        # Parse each element once. Keep genuine zeros: dropping them would
        # shift the positional downloads/views assignment below.
        counts=[c for c in (_parse_count(el.get_text()) for el in soup.select(sel))
                if c is not None and c>=0]
        if counts:
            # NOTE(review): assumes the first counter is downloads and the
            # second is views on the project page — TODO confirm against markup.
            meta['downloads']=counts[0]
            if len(counts)>=2: meta['views']=counts[1]
            break
    return meta
def save_meta(meta, folder):
    """Write *meta* as indented UTF-8 JSON to folder/metadata.json, creating folder."""
    folder.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(meta, indent=2, ensure_ascii=False)
    target = folder / 'metadata.json'
    target.write_text(payload, encoding='utf-8')
# Download-button selectors tried in order by download_build: branded
# schematic/world buttons first, then progressively more generic fallbacks.
DL_SELS = [
    'a.branded-download[href*="/download/schematic/"]',
    'a.branded-download[href*="/download/worldmap/"]',
    'a.branded-download[href*="/download/worldsave/"]',
    'a.branded-download',
    'a.site_btn[href*="/download/schematic/"]',
    'a[href*="/download/schematic/"]',
    'a[href*="/download/worldmap/"]',
    '#download_btn',
    'a.btn-download',
    '.download-section a[href]',
]
async def _save_download(dl, folder, entry, meta, log):
    """Save Playwright download *dl* into *folder*; return True if it is a real file.

    Files of 100 bytes or less are treated as error stubs: they are deleted
    and False is returned. On success *entry* becomes SUCCESS with build_path
    set, and metadata.json is rewritten with the saved path.
    """
    # Keep only the final path component, so a server-suggested name can
    # never place the file outside the build folder.
    fn=Path(dl.suggested_filename or '').name
    if fn in ('','.','..'): fn='build.schematic'
    sp=folder/fn
    await dl.save_as(str(sp))
    size=sp.stat().st_size
    if size<=100:
        sp.unlink(missing_ok=True)
        return False
    entry.build_path=str(sp); entry.status='SUCCESS'
    meta['schem_path']=str(sp); save_meta(meta,folder)
    log(f'OK: {entry.name} ({size:,}B)')
    return True


async def _direct_download(page, href, folder, entry, meta, log):
    """Fallback: navigate straight to site-relative *href* and capture the download."""
    try:
        async with page.expect_download(timeout=15000) as info:
            try:
                await page.goto(BASE_URL+href,wait_until='domcontentloaded',timeout=20000)
            except Exception:
                # A navigation that turns into a download is aborted and
                # page.goto raises ("Download is starting"). The download
                # event still reaches expect_download, so keep waiting for it.
                pass
        return await _save_download(await info.value, folder, entry, meta, log)
    except Exception as e:
        log(f'Direct fail: {str(e)[:50]}')
        return False


async def _return_to_project(page, entry):
    """Best-effort navigation back to the project page after a failed attempt."""
    try:
        await page.goto(entry.project_url,wait_until='domcontentloaded',timeout=20000)
        await page.wait_for_timeout(600)
    except Exception:
        pass


async def download_build(page, entry, out, log, diag):
    """Open *entry*'s project page, save its metadata and try to download the build.

    Updates and returns *entry*. The final status is one of: SUCCESS
    (build_path set), BLOCKED (Cloudflare title), LOGIN_REQUIRED,
    SKIPPED_MIRROR, NO_BTN, FAILED or ERROR. Files go to
    out/<id>_<sanitised name>/. The first call of a session (diag[0] False)
    also saves a full-page screenshot to out/diag.png.

    Each DL_SELS match is tried once per distinct href. The attempt clicks it
    and waits for a download. On failure it navigates straight to a
    site-relative href, then returns to the project page before trying the
    next selector.
    """
    safe=re.sub(r'[^\w\-]','_',entry.name)[:40]
    folder=out/f'{entry.id[:40]}_{safe}'
    already=False
    if not diag[0]:
        diag[0]=True
        try:
            await page.goto(entry.project_url,wait_until='domcontentloaded',timeout=25000)
            await page.wait_for_timeout(1500)
            folder.mkdir(parents=True,exist_ok=True)
            await page.screenshot(path=str(out/'diag.png'),full_page=True)
            already=True
        except Exception:
            already=False
    try:
        if not already:
            await page.goto(entry.project_url,wait_until='domcontentloaded',timeout=25000)
            await page.wait_for_timeout(random.randint(600,1200))
        if _is_cf(await page.title()):
            entry.status='BLOCKED'; log(f'CF: {entry.name}'); return entry
        html=await page.content()
        if '/account/sign_in/' in html and 'branded-download' not in html:
            entry.status='LOGIN_REQUIRED'; return entry
        await dismiss_consent(page); await dismiss_popup(page)
        meta=extract_meta(html,entry)
        save_meta(meta,folder)
        found=False
        # Several DL_SELS overlap (e.g. 'a.branded-download' re-matches the
        # first three). Without this set the same href would be re-clicked,
        # each time with an 18 s + 15 s wait.
        tried=set()
        for sel in DL_SELS:
            await dismiss_popup(page)
            btn=await page.query_selector(sel)
            if not btn: continue
            href=(await btn.get_attribute('href') or '').strip()
            if '/download/mirror/' in href:
                entry.status='SKIPPED_MIRROR'; found=True; continue
            found=True
            if href:
                if href in tried: continue
                tried.add(href)
            try:
                async with page.expect_download(timeout=18000) as info:
                    await btn.click(force=True)
                    await page.wait_for_timeout(600)
                    await dismiss_popup(page)
                if await _save_download(await info.value,folder,entry,meta,log):
                    return entry
            except Exception as e:
                log(f'Fail: {str(e)[:50]}')
                # Only site-relative hrefs; '//host/...' is protocol-relative.
                if (href.startswith('/') and not href.startswith('//')
                        and await _direct_download(page,href,folder,entry,meta,log)):
                    return entry
                await _return_to_project(page,entry)
        if not found: entry.status='NO_BTN'
        elif entry.status not in ('SUCCESS','SKIPPED_MIRROR'): entry.status='FAILED'
        return entry
    except Exception as e:
        entry.status='ERROR'; log(f'ERR {entry.name}: {str(e)[:60]}'); return entry
async def run_worker(wid, page, tracker, queue, out, stop_ev, col_done, stats, diag, rate):
    """Download worker: take BuildEntry items from *queue* and process them on *page*.

    Each entry not already in *tracker* is passed to download_build. The
    result is logged to *tracker* and tallied in the shared *stats*. The
    worker exits when *stop_ev* is set, the session time limit passes
    (stop_reason TIME_LIMIT), a None sentinel arrives, or the queue stays
    empty for 15 s after the collector finished. A BLOCKED result
    (stop_reason CF_BLOCK) or LOGIN_REQUIRED result also sets *stop_ev*,
    which stops every worker. Pacing: rate.throttle() before each download,
    then 5-12 s between items and 30-60 s after every 50 processed. The tab
    is closed on exit.

    NOTE(review): BLOCKED counts towards stats['fail'] but LOGIN_REQUIRED is
    not counted in any tally. Both are written to *tracker*; if the tracker
    treats every logged id as done, those builds are skipped on resume.
    """
    log=lambda m: logger.info(f'[{RUN_ID}|W{wid}] {m}')
    log('Started'); n=0
    while not stop_ev.is_set():
        if time.time()-stats['t0']>TIME_LIMIT_SECS:
            stats['stop_reason']='TIME_LIMIT'; stop_ev.set(); break
        # Poll with a timeout so the stop/time checks above run regularly.
        try: entry=await asyncio.wait_for(queue.get(),timeout=15)
        except asyncio.TimeoutError:
            if col_done.is_set() and queue.empty(): break
            continue
        # None is the shutdown sentinel. It is not task_done()'d — harmless
        # unless something joins the queue (nothing in this file does).
        if entry is None: break
        if tracker.is_scraped(entry.id): queue.task_done(); continue
        await rate.throttle(log)
        result=await download_build(page,entry,out,log,diag)
        if result.status=='BLOCKED':
            tracker.add(result); stats['fail']+=1; queue.task_done()
            stats['stop_reason']='CF_BLOCK'; stop_ev.set(); break
        if result.status=='LOGIN_REQUIRED':
            tracker.add(result); queue.task_done()
            stats['stop_reason']='LOGIN_REQUIRED'; stop_ev.set(); break
        tracker.add(result); rate.record(); n+=1
        if result.status=='SUCCESS': stats['success']+=1
        elif 'SKIP' in result.status or 'MIRROR' in result.status: stats['skip']+=1
        else: stats['fail']+=1
        # Progress line: counts, successes/hour, time left in the session budget, queue depth.
        elapsed=time.time()-stats['t0']
        hr=stats['success']/(elapsed/3600) if elapsed>0 else 0
        rem=max(0,TIME_LIMIT_SECS-elapsed)
        tot=stats['success']+stats['skip']+stats['fail']
        print(f'[{RUN_ID}|W{wid}] OK:{stats["success"]} SK:{stats["skip"]} '
              f'FL:{stats["fail"]} | {tot}/{stats["queued"]} | '
              f'{hr:.0f}/hr | {int(rem//3600)}h{int(rem%3600//60):02d}m | Q:{queue.qsize()}')
        queue.task_done()
        # Longer pause every 50 processed builds, otherwise a short random gap.
        if n>0 and n%50==0:
            p=random.randint(30,60); log(f'Think {p}s'); await asyncio.sleep(p)
        else: await asyncio.sleep(random.uniform(5.0,12.0))
    log(f'Done ok={stats["success"]}'); await page.close()
# Project slug: the first path segment after "/project/".
_PROJECT_ID_RE=re.compile(r'/project/([^/?#]+)')


def parse_links(html):
    """Extract (id, name, url) tuples for projects linked from a listing page.

    Candidate anchors come from 'a.r-title'. If there are none, the first
    container selector with project links is used. As a last resort, any
    project link with more than two characters of text is taken. Results are
    deduplicated by id in first-seen order. If the first anchor for an id has
    no text (e.g. a thumbnail link), a later anchor's text supplies the name.
    URLs are canonical ``BASE_URL/project/<id>/``: query strings, fragments
    and protocol-relative hrefs no longer produce malformed URLs.
    """
    soup=BeautifulSoup(html,'html.parser')
    cands=soup.select('a.r-title')
    if not cands:
        for cont in ('.r-search-result','.project-listing','article'):
            cands=soup.select(f'{cont} a[href*="/project/"]')
            if cands: break
    if not cands:
        cands=[a for a in soup.select('a[href*="/project/"]')
               if len(a.get_text(strip=True))>2]
    found={}  # id -> (name, url); dicts keep insertion order
    for a in cands:
        href=(a.get('href') or '').strip()
        m=_PROJECT_ID_RE.search(href)
        if not m: continue
        bid=m.group(1)
        name=a.get_text(strip=True)
        if bid in found:
            if name and not found[bid][0]:
                found[bid]=(name,found[bid][1])
            continue
        found[bid]=(name,f'{BASE_URL}/project/{bid}/')
    return [(bid,name,url) for bid,(name,url) in found.items()]
async def run_collector(ctx, tracker, queue, stop_ev, col_done, stats, out):
    """Walk listing pages START_PAGE..END_PAGE and enqueue builds not yet in *tracker*.

    Stops on any of:
      * the session time limit;
      * a Cloudflare title on a listing page;
      * three consecutive empty pages;
      * five consecutive page errors;
      * *stop_ev* being set by a worker.

    Applies back-pressure, so the queue holds no more than about 120 entries.
    Whatever the exit path, it writes the run state exactly once, sets
    *col_done* and closes its tab.

    NOTE(review): "last completed page" means "links enqueued", not "builds
    downloaded". If the caller discards queued entries on stop, a resume that
    starts after that page never revisits them — confirm this is acceptable.
    """
    page=await new_tab(ctx)
    pn=START_PAGE; consec=0; ddiag=False
    # Last page whose links were all enqueued. START_PAGE-1 means "none yet",
    # so a run that completes no page resumes at START_PAGE instead of skipping it.
    last_done=START_PAGE-1
    print(f'Collector: {pn}→{END_PAGE} (slice {SLICE_START}→{SLICE_END})')
    try:
        while not stop_ev.is_set() and pn<=END_PAGE:
            if time.time()-stats['t0']>TIME_LIMIT_SECS:
                stats['stop_reason']='TIME_LIMIT'; stop_ev.set(); break
            # Back-pressure: don't run far ahead of the download workers.
            while queue.qsize()>120 and not stop_ev.is_set():
                await asyncio.sleep(2)
            if stop_ev.is_set(): break
            url=f'{LISTING_URL}&p={pn}'
            try:
                await page.goto(url,wait_until='domcontentloaded',timeout=30000)
                if _is_cf(await page.title()):
                    print(f'CF on listing p{pn}')
                    # Leave through the loop so `finally` writes state once
                    # (previously state was written here and again in finally).
                    stats['stop_reason']=f'CF_LISTING_{pn}'; stop_ev.set()
                    break
                await dismiss_consent(page)
                html=await page.content()
                if not ddiag:
                    # Keep one raw listing page for selector debugging.
                    (out/'diag_listing.html').write_text(html,encoding='utf-8')
                    ddiag=True
                links=parse_links(html)
                if not links:
                    consec+=1
                    if consec>=3: stats['stop_reason']='EMPTY'; break
                else:
                    consec=0; new_n=0
                    for bid,name,purl in links:
                        if tracker.is_scraped(bid): continue
                        await queue.put(BuildEntry(id=bid,name=name,project_url=purl))
                        stats['queued']+=1; new_n+=1
                    last_done=pn; stats['last_page']=pn
                    pct=(pn-START_PAGE+1)/max(END_PAGE-START_PAGE+1,1)*100
                    print(f'p{pn}/{END_PAGE} ({pct:.1f}%) | {len(links)} found '
                          f'{new_n} new | Q:{queue.qsize()} | tot:{stats["queued"]}')
            except Exception as e:
                print(f'p{pn} err: {e}'); consec+=1
                if consec>=5: stats['stop_reason']='PAGE_ERRORS'; break
            # Delay between listing pages, empty pages included.
            pn+=1; await asyncio.sleep(random.uniform(2.0,4.0))
    finally:
        if pn>END_PAGE:
            # Every page in range was visited (enqueued, empty or errored), so
            # the slice is done. Otherwise trailing empty pages would leave it
            # "incomplete" on every resume.
            last_done=max(last_done,END_PAGE)
            if stats.get('stop_reason') in (None,'RUNNING'):
                stats['stop_reason']='COMPLETE'
        reason=stats.get('stop_reason') or 'COMPLETE'
        write_state(last_done,reason,stats)
        print(f'Collector done. last={last_done} queued={stats["queued"]}')
        col_done.set()
        try:
            await page.close()
        except Exception as e:
            print(f'Collector tab close failed: {e}')
async def main():
    """Run one scraping session end to end.

    Clears Cloudflare on a scratch tab, then starts the listing collector and
    DL_WORKERS staggered download workers sharing one queue. Once the
    collector finishes, the workers are shut down with sentinels and the
    browser is closed. If Cloudflare cannot be cleared, a CF_FAILED state is
    written and nothing is scraped.
    """
    OUTPUT_DIR.mkdir(parents=True,exist_ok=True)
    tracker=BuildTracker(OUTPUT_DIR/'log.csv')
    # last_page starts *before* the first page, so nothing is reported as
    # completed until the collector actually finishes a page.
    stats={'queued':0,'success':0,'skip':0,'fail':0,
           'last_page':START_PAGE-1,'stop_reason':'RUNNING','t0':time.time()}
    if tracker.total>0: print(f'Resuming — {tracker.total} in log.csv')
    rate=RateTracker(max_hr=MAX_RATE_PER_HR); diag=[False]
    print(f'PMC v13.0 | {RUN_ID} | s{SESSION_ID}/{TOTAL_SESSIONS-1}')
    async with async_playwright() as pw:
        br,ctx=await setup_browser(pw)
        cfp=await ctx.new_page()
        if not await clear_cf(ctx,cfp):
            write_state(START_PAGE-1,'CF_FAILED',stats); await br.close(); return
        await cfp.close()
        wps=[]
        for _ in range(DL_WORKERS):
            wp=await new_tab(ctx)
            try:
                await wp.goto(BASE_URL+'/',wait_until='domcontentloaded',timeout=12000)
                await dismiss_consent(wp)
            except Exception:
                pass  # warm-up only; download_build navigates per build anyway
            wps.append(wp)
        queue=asyncio.Queue(); stop_ev=asyncio.Event(); col_done=asyncio.Event()
        col_task=asyncio.create_task(
            run_collector(ctx,tracker,queue,stop_ev,col_done,stats,OUTPUT_DIR))

        async def _staggered(wid, wpage):
            # Offset worker start-up by 2 s per worker.
            await asyncio.sleep(wid*2.0)
            await run_worker(wid,wpage,tracker,queue,OUTPUT_DIR,
                             stop_ev,col_done,stats,diag,rate)

        wk_tasks=[asyncio.create_task(_staggered(wid,wp)) for wid,wp in enumerate(wps)]
        await col_task
        stopping=stop_ev.is_set()
        if stopping:
            # Abandon queued work; workers only need to finish their current item.
            while not queue.empty():
                try: queue.get_nowait(); queue.task_done()
                except (asyncio.QueueEmpty, ValueError): break
        # One sentinel per worker. asyncio.Queue is FIFO, so sentinels come
        # after any entries still queued.
        for _ in range(DL_WORKERS): await queue.put(None)
        if stopping:
            grace=120
        else:
            # The collector finished normally, but up to ~120 builds may still
            # be queued. A fixed 120 s would cancel the workers mid-queue, so
            # allow the rest of the session budget (workers stop themselves at
            # TIME_LIMIT_SECS) plus a margin.
            grace=max(0.0,TIME_LIMIT_SECS-(time.time()-stats['t0']))+120
        try:
            await asyncio.wait_for(
                asyncio.gather(*wk_tasks,return_exceptions=True),timeout=grace)
        except asyncio.TimeoutError:
            print(f'Workers did not finish within {grace:.0f}s; closing browser')
        await br.close()
    e=time.time()-stats['t0']
    print(f'DONE {RUN_ID} | {e/60:.1f}min | {stats["success"]/(e/3600):.0f}/hr')
def run():
    """Start a virtual X display, run main() to completion, then stop the display.

    If an event loop is already running (e.g. a notebook kernel), main() is
    driven on that loop. This needs the loop to allow re-entry, which the
    import-time nest_asyncio.apply() attempt provides when available.
    Otherwise asyncio.run() is used, because asyncio.get_event_loop() without
    a running loop is deprecated and raises on newer Python versions.
    """
    d=Display(visible=0,size=(1920,1080)); d.start()
    try:
        try:
            loop=asyncio.get_running_loop()
        except RuntimeError:
            loop=None
        if loop is None:
            asyncio.run(main())
        else:
            loop.run_until_complete(main())
    finally:
        d.stop()
# Module-level entry point: executing this file starts the scrape immediately
# (there is no __main__ guard).
run()