"""バッチ処理とプログレス管理""" import asyncio import json import logging from pathlib import Path from typing import List, Dict, Any, Optional from tqdm import tqdm from src.processing.document_processor import DocumentProcessor from src.scraping.exceptions import ArticleNotFoundError, FetchError logger = logging.getLogger(__name__) class BatchProcessor: """バッチ処理とプログレス管理""" def __init__(self, wait_time: float = 1.0): """ Args: wait_time: リクエスト間の待機時間(秒) """ self.wait_time = wait_time self.document_processor = DocumentProcessor() async def process_urls_batch( self, urls: List[str], start_id: int = 1, mode: str = "memory", show_progress: bool = True, save_dir: Optional[Path] = None, verbose: bool = False ) -> List[Dict[str, Any]]: """ 複数URLをバッチ処理してメタデータを生成 Args: urls: 処理するURLのリスト start_id: 開始ID mode: "memory" または "save" show_progress: プログレス表示の有無 save_dir: saveモード時のMarkdownファイル保存先ディレクトリ verbose: 詳細ログを表示するか Returns: 生成されたドキュメントメタデータのリスト """ documents = [] success_count = 0 skip_count = 0 fail_count = 0 total = len(urls) end_id = start_id + total - 1 # saveモードの場合、保存先ディレクトリを設定 if mode == "save": if save_dir is None: save_dir = Path("data/raw") save_dir.mkdir(parents=True, exist_ok=True) logger.info(f"保存先ディレクトリ: {save_dir}") logger.info(f"スクレイピング開始: ID {start_id} から {end_id} まで(計{total}件)") logger.info(f"モード: {'メモリー保管' if mode == 'memory' else 'ファイル保存'}") logger.info(f"待機時間: {self.wait_time}秒\n") # プログレスバーの作成(単一行で更新) pbar = None if show_progress: pbar = tqdm( total=total, desc="処理中", leave=True, ncols=80, bar_format='{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {postfix}]' ) try: for i, url in enumerate(urls): current_id = start_id + i try: # saveモードの場合はsave_dirを渡す if mode == "save": document = await self.document_processor.process_url(url, current_id, save_dir) else: document = await self.document_processor.process_url(url, current_id) documents.append(document) success_count += 1 # verboseモードの場合は詳細ログも表示 if verbose: # プログレスバーを一時的にクリアして詳細を表示 if pbar: pbar.clear() if mode == "save": logger.info(f" ✓ {url}: 保存完了 ({document['metadata']['file_name']})") else: logger.info(f" ✓ {url}: 処理完了 ({document['metadata']['file_name']})") if pbar: pbar.refresh() except ArticleNotFoundError: skip_count += 1 # verboseモードの場合は詳細ログも表示 if verbose: if pbar: pbar.clear() logger.warning(f" ⊘ {url}: 記事が見つかりません") if pbar: pbar.refresh() except FetchError as e: fail_count += 1 # verboseモードの場合は詳細ログも表示 if verbose: if pbar: pbar.clear() logger.error(f" ✗ {url}: 取得エラー: {str(e)}") if pbar: pbar.refresh() except Exception as e: fail_count += 1 # verboseモードの場合は詳細ログも表示 if verbose: if pbar: pbar.clear() logger.error(f" ✗ {url}: エラー: {str(e)}") if pbar: pbar.refresh() # プログレスバーを更新 if pbar: pbar.set_postfix({ '成功': success_count, 'スキップ': skip_count, '失敗': fail_count }) pbar.update(1) # 次のリクエストまで待機(最後のURLでは待機しない) if i < len(urls) - 1: await asyncio.sleep(self.wait_time) finally: if pbar: pbar.close() # サマリー表示 logger.info("\n" + "=" * 50) logger.info("処理結果サマリー") logger.info("=" * 50) logger.info(f"合計: {total}件") logger.info(f"成功: {success_count}件") logger.info(f"スキップ(記事なし): {skip_count}件") logger.info(f"失敗: {fail_count}件") if mode == "save" and success_count > 0: logger.info(f"\nMarkdownファイル保存先: {save_dir}") return documents def save_metadata(self, documents: List[Dict[str, Any]], output_path: Path): """メタデータをJSON形式で保存 Args: documents: 保存するドキュメントメタデータのリスト output_path: 出力ファイルパス """ output_path.parent.mkdir(parents=True, exist_ok=True) with open(output_path, 'w', encoding='utf-8') as f: json.dump(documents, f, ensure_ascii=False, indent=2) logger.info(f"\nメタデータを保存しました: {output_path}") logger.info(f"保存件数: {len(documents)}件")