tktm8's picture
Upload 59 files
fb05e78 verified
"""
バッチスクレイピング処理モジュール
"""
import asyncio
import logging
import sys
from enum import Enum
from pathlib import Path
from typing import List, Tuple, Literal, Optional
from tqdm import tqdm
from src.scraping.exceptions import ArticleNotFoundError, FetchError
from src.scraping.pipeline import run as run_pipeline
# ロガーの設定
logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger(__name__)
class ScrapeStatus(Enum):
"""スクレイピング結果のステータス"""
SUCCESS = "success"
SKIPPED = "skipped" # 記事が存在しない
FAILED = "failed" # その他のエラー
async def scrape_single_page(url: str, out_dir: Path) -> Tuple[str, ScrapeStatus, str]:
"""
単一ページのスクレイピング
Returns:
(url, status, message) のタプル
"""
try:
path = await run_pipeline(url, out_dir)
return (url, ScrapeStatus.SUCCESS, f"保存完了: {path}")
except ArticleNotFoundError:
return (url, ScrapeStatus.SKIPPED, "記事が見つかりません")
except FetchError as e:
return (url, ScrapeStatus.FAILED, f"取得エラー: {str(e)}")
except Exception as e:
return (url, ScrapeStatus.FAILED, f"エラー: {str(e)}")
async def batch_scrape(
start_id: int,
end_id: int,
out_dir: Path,
delay: float = 1.0,
base_url: str = "https://ja.empatheme.org/potion",
verbose: bool = False
) -> List[Tuple[str, ScrapeStatus, str]]:
"""
指定範囲のIDでバッチスクレイピング実行
Args:
start_id: 開始ID
end_id: 終了ID(含む)
out_dir: 出力ディレクトリ
delay: 各リクエスト間の待機時間(秒)
base_url: ベースURL
verbose: 詳細ログを表示するか
Returns:
各URLの処理結果のリスト
"""
results = []
total = end_id - start_id + 1
logger.info(f"スクレイピング開始: ID {start_id} から {end_id} まで(計{total}件)")
logger.info(f"出力先: {out_dir}")
logger.info(f"待機時間: {delay}秒\n")
# カウンター初期化
success_count = 0
skipped_count = 0
failed_count = 0
# プログレスバーの作成(単一行で更新)
pbar = tqdm(
total=total,
desc="処理中",
leave=True,
ncols=80,
bar_format='{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {postfix}]'
)
try:
for page_id in range(start_id, end_id + 1):
url = f"{base_url}/{page_id:03d}/"
# スクレイピング実行
result = await scrape_single_page(url, out_dir)
results.append(result)
# カウンター更新
url, status, message = result
if status == ScrapeStatus.SUCCESS:
success_count += 1
elif status == ScrapeStatus.SKIPPED:
skipped_count += 1
else: # FAILED
failed_count += 1
# プログレスバーの説明を更新
pbar.set_postfix({
'成功': success_count,
'スキップ': skipped_count,
'失敗': failed_count
})
# verboseモードの場合は詳細ログも表示
if verbose:
# プログレスバーを一時的にクリアして詳細を表示
pbar.clear()
if status == ScrapeStatus.SUCCESS:
print(f" ✓ {url}: {message}")
elif status == ScrapeStatus.SKIPPED:
print(f" ⊘ {url}: {message}")
else: # FAILED
print(f" ✗ {url}: {message}")
pbar.refresh()
# プログレスバーを進める
pbar.update(1)
# 最後のページでなければ待機
if page_id < end_id:
await asyncio.sleep(delay)
finally:
pbar.close()
return results
def print_summary(results: List[Tuple[str, ScrapeStatus, str]]) -> None:
"""処理結果のサマリーを表示"""
total = len(results)
success_count = sum(1 for _, status, _ in results if status == ScrapeStatus.SUCCESS)
skipped_count = sum(1 for _, status, _ in results if status == ScrapeStatus.SKIPPED)
failed_count = sum(1 for _, status, _ in results if status == ScrapeStatus.FAILED)
logger.info("\n" + "="*50)
logger.info("処理結果サマリー")
logger.info("="*50)
logger.info(f"合計: {total}件")
logger.info(f"成功: {success_count}件")
logger.info(f"スキップ(記事なし): {skipped_count}件")
logger.info(f"失敗: {failed_count}件")
# スキップしたURL(記事が存在しない)の表示
if skipped_count > 0:
logger.info("\nスキップしたURL(記事が存在しない):")
for url, status, message in results:
if status == ScrapeStatus.SKIPPED:
logger.info(f" ⊘ {url}")
# 失敗したURLの詳細表示
if failed_count > 0:
logger.info("\n失敗したURL:")
for url, status, message in results:
if status == ScrapeStatus.FAILED:
logger.info(f" ✗ {url}: {message}")