Spaces:
Running
Running
| """バッチ処理とプログレス管理""" | |
| import asyncio | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from typing import List, Dict, Any, Optional | |
| from tqdm import tqdm | |
| from src.processing.document_processor import DocumentProcessor | |
| from src.scraping.exceptions import ArticleNotFoundError, FetchError | |
| logger = logging.getLogger(__name__) | |
| class BatchProcessor: | |
| """バッチ処理とプログレス管理""" | |
| def __init__(self, wait_time: float = 1.0): | |
| """ | |
| Args: | |
| wait_time: リクエスト間の待機時間(秒) | |
| """ | |
| self.wait_time = wait_time | |
| self.document_processor = DocumentProcessor() | |
| async def process_urls_batch( | |
| self, | |
| urls: List[str], | |
| start_id: int = 1, | |
| mode: str = "memory", | |
| show_progress: bool = True, | |
| save_dir: Optional[Path] = None, | |
| verbose: bool = False | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| 複数URLをバッチ処理してメタデータを生成 | |
| Args: | |
| urls: 処理するURLのリスト | |
| start_id: 開始ID | |
| mode: "memory" または "save" | |
| show_progress: プログレス表示の有無 | |
| save_dir: saveモード時のMarkdownファイル保存先ディレクトリ | |
| verbose: 詳細ログを表示するか | |
| Returns: | |
| 生成されたドキュメントメタデータのリスト | |
| """ | |
| documents = [] | |
| success_count = 0 | |
| skip_count = 0 | |
| fail_count = 0 | |
| total = len(urls) | |
| end_id = start_id + total - 1 | |
| # saveモードの場合、保存先ディレクトリを設定 | |
| if mode == "save": | |
| if save_dir is None: | |
| save_dir = Path("data/raw") | |
| save_dir.mkdir(parents=True, exist_ok=True) | |
| logger.info(f"保存先ディレクトリ: {save_dir}") | |
| logger.info(f"スクレイピング開始: ID {start_id} から {end_id} まで(計{total}件)") | |
| logger.info(f"モード: {'メモリー保管' if mode == 'memory' else 'ファイル保存'}") | |
| logger.info(f"待機時間: {self.wait_time}秒\n") | |
| # プログレスバーの作成(単一行で更新) | |
| pbar = None | |
| if show_progress: | |
| pbar = tqdm( | |
| total=total, | |
| desc="処理中", | |
| leave=True, | |
| ncols=80, | |
| bar_format='{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {postfix}]' | |
| ) | |
| try: | |
| for i, url in enumerate(urls): | |
| current_id = start_id + i | |
| try: | |
| # saveモードの場合はsave_dirを渡す | |
| if mode == "save": | |
| document = await self.document_processor.process_url(url, current_id, save_dir) | |
| else: | |
| document = await self.document_processor.process_url(url, current_id) | |
| documents.append(document) | |
| success_count += 1 | |
| # verboseモードの場合は詳細ログも表示 | |
| if verbose: | |
| # プログレスバーを一時的にクリアして詳細を表示 | |
| if pbar: | |
| pbar.clear() | |
| if mode == "save": | |
| logger.info(f" ✓ {url}: 保存完了 ({document['metadata']['file_name']})") | |
| else: | |
| logger.info(f" ✓ {url}: 処理完了 ({document['metadata']['file_name']})") | |
| if pbar: | |
| pbar.refresh() | |
| except ArticleNotFoundError: | |
| skip_count += 1 | |
| # verboseモードの場合は詳細ログも表示 | |
| if verbose: | |
| if pbar: | |
| pbar.clear() | |
| logger.warning(f" ⊘ {url}: 記事が見つかりません") | |
| if pbar: | |
| pbar.refresh() | |
| except FetchError as e: | |
| fail_count += 1 | |
| # verboseモードの場合は詳細ログも表示 | |
| if verbose: | |
| if pbar: | |
| pbar.clear() | |
| logger.error(f" ✗ {url}: 取得エラー: {str(e)}") | |
| if pbar: | |
| pbar.refresh() | |
| except Exception as e: | |
| fail_count += 1 | |
| # verboseモードの場合は詳細ログも表示 | |
| if verbose: | |
| if pbar: | |
| pbar.clear() | |
| logger.error(f" ✗ {url}: エラー: {str(e)}") | |
| if pbar: | |
| pbar.refresh() | |
| # プログレスバーを更新 | |
| if pbar: | |
| pbar.set_postfix({ | |
| '成功': success_count, | |
| 'スキップ': skip_count, | |
| '失敗': fail_count | |
| }) | |
| pbar.update(1) | |
| # 次のリクエストまで待機(最後のURLでは待機しない) | |
| if i < len(urls) - 1: | |
| await asyncio.sleep(self.wait_time) | |
| finally: | |
| if pbar: | |
| pbar.close() | |
| # サマリー表示 | |
| logger.info("\n" + "=" * 50) | |
| logger.info("処理結果サマリー") | |
| logger.info("=" * 50) | |
| logger.info(f"合計: {total}件") | |
| logger.info(f"成功: {success_count}件") | |
| logger.info(f"スキップ(記事なし): {skip_count}件") | |
| logger.info(f"失敗: {fail_count}件") | |
| if mode == "save" and success_count > 0: | |
| logger.info(f"\nMarkdownファイル保存先: {save_dir}") | |
| return documents | |
| def save_metadata(self, documents: List[Dict[str, Any]], output_path: Path): | |
| """メタデータをJSON形式で保存 | |
| Args: | |
| documents: 保存するドキュメントメタデータのリスト | |
| output_path: 出力ファイルパス | |
| """ | |
| output_path.parent.mkdir(parents=True, exist_ok=True) | |
| with open(output_path, 'w', encoding='utf-8') as f: | |
| json.dump(documents, f, ensure_ascii=False, indent=2) | |
| logger.info(f"\nメタデータを保存しました: {output_path}") | |
| logger.info(f"保存件数: {len(documents)}件") | |