#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 진보당 크롤러 - 고성능 비동기 버전 + 허깅페이스 자동 업로드 - jinboparty.com 자체 CMS 사용 - 보도자료: 카드형 레이아웃 (div.img_list_item) - 논평/모두발언: 테이블형 레이아웃 (div#moTable) - js_board_view('ID') → /pages/?p=...&b=...&bn=ID&m=read 패턴 """ import os import json import re import asyncio from datetime import datetime, timedelta from typing import List, Dict, Optional import pandas as pd from tqdm.asyncio import tqdm as async_tqdm import aiohttp from bs4 import BeautifulSoup from dotenv import load_dotenv from huggingface_hub import HfApi, login from datasets import Dataset, load_dataset load_dotenv() class JinboAsyncCrawler: def __init__(self, config_path="crawler_config.json"): self.base_url = "https://jinboparty.com" self.party_name = "진보당" self.config_path = config_path self.state_path = "crawler_state.json" self.load_config() self.hf_token = os.getenv("HF_TOKEN") self.hf_repo_id = os.getenv("HF_REPO_ID_JINBO", "jinbo-press-releases") self.semaphore = asyncio.Semaphore(10) def load_config(self): # boards 값은 {"p": "...", "b": "..."} 형태의 dict default_config = { "boards": { "보도자료": {"p": "286", "b": "b_1_111"}, "논평": {"p": "15", "b": "b_1_2"}, "모두발언": {"p": "14", "b": "b_1_1"} }, "start_date": "2017-10-14", "max_pages": 10000, "concurrent_requests": 10, "request_delay": 0.3, "output_path": "./data" } if os.path.exists(self.config_path): with open(self.config_path, 'r', encoding='utf-8') as f: config = json.load(f) self.config = config.get('jinbo', default_config) else: self.config = default_config self.boards = self.config["boards"] self.start_date = self.config["start_date"] self.max_pages = self.config["max_pages"] self.output_path = self.config["output_path"] def load_state(self) -> Dict: if os.path.exists(self.state_path): with open(self.state_path, 'r', encoding='utf-8') as f: state = json.load(f) return state.get('jinbo', {}) return {} def save_state(self, state: Dict): all_state = {} if os.path.exists(self.state_path): with open(self.state_path, 'r', encoding='utf-8') as f: all_state = json.load(f) all_state['jinbo'] = state with open(self.state_path, 'w', encoding='utf-8') as f: json.dump(all_state, f, ensure_ascii=False, indent=2) @staticmethod def parse_date(date_str: str) -> Optional[datetime]: """YYYY.MM.DD 또는 YYYY-MM-DD 파싱""" date_str = date_str.strip() for fmt in ('%Y.%m.%d', '%Y-%m-%d'): try: return datetime.strptime(date_str[:10], fmt) except: continue return None @staticmethod def clean_text(text: str) -> str: text = text.replace('\xa0', '').replace('\u200b', '').replace('​', '') return text.strip() @staticmethod def extract_board_id(href: str) -> Optional[str]: """js_board_view('ID') 에서 ID 추출""" match = re.search(r"js_board_view\('(\d+)'\)", href) return match.group(1) if match else None async def fetch_with_retry(self, session: aiohttp.ClientSession, url: str, max_retries: int = 3) -> Optional[str]: async with self.semaphore: for attempt in range(max_retries): try: await asyncio.sleep(self.config.get("request_delay", 0.3)) async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response: if response.status == 200: return await response.text() except Exception: if attempt < max_retries - 1: await asyncio.sleep(1) else: return None return None async def fetch_list_page(self, session: aiohttp.ClientSession, board_name: str, board_cfg: Dict, page_num: int, start_date: datetime, end_date: datetime) -> tuple: p = board_cfg['p'] b = board_cfg['b'] url = f"{self.base_url}/pages/index.php?nPage={page_num}&p={p}&b={b}" html = await self.fetch_with_retry(session, url) if not html: return [], False soup = BeautifulSoup(html, 'html.parser') data = [] stop_flag = False # ── 카드형 레이아웃 (보도자료) ────────────────────────────── card_items = soup.select('div.img_list_item') if card_items: for item in card_items: try: link = item.select_one('a[href]') if not link: continue bn = self.extract_board_id(link.get('href', '')) if not bn: continue title_el = item.select_one('h4._tit span') title = title_el.get_text(strip=True) if title_el else "" # 날짜: icon_cal 다음 span date_str = "" for span in item.select('div.item_bottom span'): text = span.get_text(strip=True) if re.match(r'\d{4}\.\d{2}\.\d{2}', text): date_str = text[:10] break if not date_str: continue article_date = self.parse_date(date_str) if not article_date: continue if article_date < start_date: stop_flag = True break if article_date > end_date: continue detail_url = f"{self.base_url}/pages/?p={p}&b={b}&bn={bn}&m=read" data.append({ 'board_name': board_name, 'title': title, 'category': board_name, 'date': article_date.strftime('%Y-%m-%d'), 'url': detail_url }) except: continue return data, stop_flag # ── 테이블형 레이아웃 (논평·모두발언) ────────────────────── table_items = soup.select('div#moTable li:not(.t_head)') if table_items: for item in table_items: try: link = item.select_one('div.tb_title_area a') if not link: continue bn = self.extract_board_id(link.get('href', '')) if not bn: continue title_el = item.select_one('p.title') title = title_el.get_text(strip=True) if title_el else "" # 날짜: div.col.wid_140 ("등록일 YYYY.MM.DD") date_div = item.select_one('div.col.wid_140') date_str = "" if date_div: raw = re.sub(r'등록일\s*', '', date_div.get_text(strip=True)).strip() date_str = raw[:10] if not date_str: continue article_date = self.parse_date(date_str) if not article_date: continue if article_date < start_date: stop_flag = True break if article_date > end_date: continue detail_url = f"{self.base_url}/pages/?p={p}&b={b}&bn={bn}&m=read" data.append({ 'board_name': board_name, 'title': title, 'category': board_name, 'date': article_date.strftime('%Y-%m-%d'), 'url': detail_url }) except: continue return data, stop_flag # 둘 다 없으면 빈 페이지 return [], True async def fetch_article_detail(self, session: aiohttp.ClientSession, url: str) -> Dict: html = await self.fetch_with_retry(session, url) if not html: return {'text': "본문 조회 실패", 'writer': ""} soup = BeautifulSoup(html, 'html.parser') text_parts = [] writer = "" # 본문: div.content_box (class="td wid_full content_box") contents_div = soup.select_one('div.content_box') if contents_div: for p in contents_div.find_all('p'): cleaned = self.clean_text(p.get_text(strip=True)) if cleaned: text_parts.append(cleaned) # 작성자: ul.info_list li 중 "작성자" 항목 for li in soup.select('ul.info_list li'): b_tag = li.find('b') if b_tag and '작성자' in b_tag.get_text(): writer = li.get_text(strip=True).replace(b_tag.get_text(strip=True), '').strip() break return {'text': '\n'.join(text_parts), 'writer': writer} async def collect_board(self, board_name: str, board_cfg: Dict, start_date: str, end_date: str) -> List[Dict]: start_dt = datetime.strptime(start_date, '%Y-%m-%d') end_dt = datetime.strptime(end_date, '%Y-%m-%d') print(f"\n▶ [{board_name}] 목록 수집 시작...") headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'ko-KR,ko;q=0.9', } async with aiohttp.ClientSession(headers=headers) as session: all_items = [] page_num = 1 empty_pages = 0 max_empty_pages = 3 with async_tqdm(desc=f"[{board_name}] 목록", unit="페이지") as pbar: while page_num <= self.max_pages: items, stop_flag = await self.fetch_list_page( session, board_name, board_cfg, page_num, start_dt, end_dt ) if not items: empty_pages += 1 if empty_pages >= max_empty_pages or stop_flag: break else: empty_pages = 0 all_items.extend(items) pbar.update(1) pbar.set_postfix({"수집": len(all_items)}) if stop_flag: break page_num += 1 print(f" ✓ {len(all_items)}개 항목 발견") if all_items: print(f" ▶ 상세 페이지 수집 중...") tasks = [self.fetch_article_detail(session, item['url']) for item in all_items] details = [] for coro in async_tqdm(asyncio.as_completed(tasks), total=len(tasks), desc=f"[{board_name}] 상세"): detail = await coro details.append(detail) for item, detail in zip(all_items, details): item.update(detail) print(f"✓ [{board_name}] 완료: {len(all_items)}개") return all_items async def collect_all(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame: if not end_date: end_date = datetime.now().strftime('%Y-%m-%d') if not start_date: start_date = self.start_date print(f"\n{'='*60}") print(f"진보당 보도자료 수집 - 비동기 고성능 버전") print(f"기간: {start_date} ~ {end_date}") print(f"{'='*60}") tasks = [ self.collect_board(board_name, board_cfg, start_date, end_date) for board_name, board_cfg in self.boards.items() ] results = await asyncio.gather(*tasks) all_data = [] for items in results: all_data.extend(items) if not all_data: print("\n⚠️ 수집된 데이터 없음") return pd.DataFrame() df = pd.DataFrame(all_data) df = df[['board_name', 'title', 'category', 'date', 'writer', 'text', 'url']] df = df[(df['title'] != "") & (df['text'] != "")] df['date'] = pd.to_datetime(df['date'], errors='coerce') print(f"\n✓ 총 {len(df)}개 수집 완료") return df def save_local(self, df: pd.DataFrame): os.makedirs(self.output_path, exist_ok=True) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') csv_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.csv") xlsx_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.xlsx") df.to_csv(csv_path, index=False, encoding='utf-8-sig') df.to_excel(xlsx_path, index=False, engine='openpyxl') print(f"✓ CSV: {csv_path}") print(f"✓ Excel: {xlsx_path}") def upload_to_huggingface(self, df: pd.DataFrame): if not self.hf_token: print("\n⚠️ HF_TOKEN이 설정되지 않았습니다.") return print(f"\n▶ 허깅페이스 업로드 중... (repo: {self.hf_repo_id})") try: login(token=self.hf_token) new_dataset = Dataset.from_pandas(df) try: existing_dataset = load_dataset(self.hf_repo_id, split='train') existing_df = existing_dataset.to_pandas() combined_df = pd.concat([existing_df, df], ignore_index=True) combined_df = combined_df.drop_duplicates(subset=['url'], keep='last') combined_df = combined_df.sort_values('date', ascending=False).reset_index(drop=True) final_dataset = Dataset.from_pandas(combined_df) print(f" ✓ 병합 후: {len(final_dataset)}개") except: final_dataset = new_dataset print(f" ℹ️ 신규 데이터셋 생성") final_dataset.push_to_hub(self.hf_repo_id, token=self.hf_token) print(f"✓ 허깅페이스 업로드 완료!") except Exception as e: print(f"✗ 업로드 실패: {e}") async def run_incremental(self): state = self.load_state() last_date = state.get('last_crawl_date') if last_date: start_date = (datetime.strptime(last_date, '%Y-%m-%d') + timedelta(days=1)).strftime('%Y-%m-%d') print(f"📅 증분 업데이트: {start_date} 이후 데이터만 수집") else: start_date = self.start_date print(f"📅 전체 수집: {start_date}부터") end_date = datetime.now().strftime('%Y-%m-%d') df = await self.collect_all(start_date, end_date) if df.empty: print("✓ 새로운 데이터 없음") return self.save_local(df) self.upload_to_huggingface(df) state['last_crawl_date'] = end_date state['last_crawl_time'] = datetime.now().isoformat() state['last_count'] = len(df) self.save_state(state) print(f"\n{'='*60}\n✓ 완료!\n{'='*60}\n") async def main(): crawler = JinboAsyncCrawler() await crawler.run_incremental() if __name__ == "__main__": asyncio.run(main())