EmpathemePotionBot / src /processing /batch_processor.py
tktm8's picture
Upload 59 files
fb05e78 verified
"""バッチ処理とプログレス管理"""
import asyncio
import json
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional
from tqdm import tqdm
from src.processing.document_processor import DocumentProcessor
from src.scraping.exceptions import ArticleNotFoundError, FetchError
logger = logging.getLogger(__name__)
class BatchProcessor:
"""バッチ処理とプログレス管理"""
def __init__(self, wait_time: float = 1.0):
"""
Args:
wait_time: リクエスト間の待機時間(秒)
"""
self.wait_time = wait_time
self.document_processor = DocumentProcessor()
async def process_urls_batch(
self,
urls: List[str],
start_id: int = 1,
mode: str = "memory",
show_progress: bool = True,
save_dir: Optional[Path] = None,
verbose: bool = False
) -> List[Dict[str, Any]]:
"""
複数URLをバッチ処理してメタデータを生成
Args:
urls: 処理するURLのリスト
start_id: 開始ID
mode: "memory" または "save"
show_progress: プログレス表示の有無
save_dir: saveモード時のMarkdownファイル保存先ディレクトリ
verbose: 詳細ログを表示するか
Returns:
生成されたドキュメントメタデータのリスト
"""
documents = []
success_count = 0
skip_count = 0
fail_count = 0
total = len(urls)
end_id = start_id + total - 1
# saveモードの場合、保存先ディレクトリを設定
if mode == "save":
if save_dir is None:
save_dir = Path("data/raw")
save_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"保存先ディレクトリ: {save_dir}")
logger.info(f"スクレイピング開始: ID {start_id} から {end_id} まで(計{total}件)")
logger.info(f"モード: {'メモリー保管' if mode == 'memory' else 'ファイル保存'}")
logger.info(f"待機時間: {self.wait_time}秒\n")
# プログレスバーの作成(単一行で更新)
pbar = None
if show_progress:
pbar = tqdm(
total=total,
desc="処理中",
leave=True,
ncols=80,
bar_format='{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {postfix}]'
)
try:
for i, url in enumerate(urls):
current_id = start_id + i
try:
# saveモードの場合はsave_dirを渡す
if mode == "save":
document = await self.document_processor.process_url(url, current_id, save_dir)
else:
document = await self.document_processor.process_url(url, current_id)
documents.append(document)
success_count += 1
# verboseモードの場合は詳細ログも表示
if verbose:
# プログレスバーを一時的にクリアして詳細を表示
if pbar:
pbar.clear()
if mode == "save":
logger.info(f" ✓ {url}: 保存完了 ({document['metadata']['file_name']})")
else:
logger.info(f" ✓ {url}: 処理完了 ({document['metadata']['file_name']})")
if pbar:
pbar.refresh()
except ArticleNotFoundError:
skip_count += 1
# verboseモードの場合は詳細ログも表示
if verbose:
if pbar:
pbar.clear()
logger.warning(f" ⊘ {url}: 記事が見つかりません")
if pbar:
pbar.refresh()
except FetchError as e:
fail_count += 1
# verboseモードの場合は詳細ログも表示
if verbose:
if pbar:
pbar.clear()
logger.error(f" ✗ {url}: 取得エラー: {str(e)}")
if pbar:
pbar.refresh()
except Exception as e:
fail_count += 1
# verboseモードの場合は詳細ログも表示
if verbose:
if pbar:
pbar.clear()
logger.error(f" ✗ {url}: エラー: {str(e)}")
if pbar:
pbar.refresh()
# プログレスバーを更新
if pbar:
pbar.set_postfix({
'成功': success_count,
'スキップ': skip_count,
'失敗': fail_count
})
pbar.update(1)
# 次のリクエストまで待機(最後のURLでは待機しない)
if i < len(urls) - 1:
await asyncio.sleep(self.wait_time)
finally:
if pbar:
pbar.close()
# サマリー表示
logger.info("\n" + "=" * 50)
logger.info("処理結果サマリー")
logger.info("=" * 50)
logger.info(f"合計: {total}件")
logger.info(f"成功: {success_count}件")
logger.info(f"スキップ(記事なし): {skip_count}件")
logger.info(f"失敗: {fail_count}件")
if mode == "save" and success_count > 0:
logger.info(f"\nMarkdownファイル保存先: {save_dir}")
return documents
def save_metadata(self, documents: List[Dict[str, Any]], output_path: Path):
"""メタデータをJSON形式で保存
Args:
documents: 保存するドキュメントメタデータのリスト
output_path: 出力ファイルパス
"""
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(documents, f, ensure_ascii=False, indent=2)
logger.info(f"\nメタデータを保存しました: {output_path}")
logger.info(f"保存件数: {len(documents)}件")