EmpathemeBot_dev / src /processing /batch_processor.py
tktm8's picture
Upload 59 files
fb05e78 verified
raw
history blame
7.03 kB
"""バッチ処理とプログレス管理"""
import asyncio
import json
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional
from tqdm import tqdm
from src.processing.document_processor import DocumentProcessor
from src.scraping.exceptions import ArticleNotFoundError, FetchError
logger = logging.getLogger(__name__)
class BatchProcessor:
"""バッチ処理とプログレス管理"""
def __init__(self, wait_time: float = 1.0):
"""
Args:
wait_time: リクエスト間の待機時間(秒)
"""
self.wait_time = wait_time
self.document_processor = DocumentProcessor()
async def process_urls_batch(
self,
urls: List[str],
start_id: int = 1,
mode: str = "memory",
show_progress: bool = True,
save_dir: Optional[Path] = None,
verbose: bool = False
) -> List[Dict[str, Any]]:
"""
複数URLをバッチ処理してメタデータを生成
Args:
urls: 処理するURLのリスト
start_id: 開始ID
mode: "memory" または "save"
show_progress: プログレス表示の有無
save_dir: saveモード時のMarkdownファイル保存先ディレクトリ
verbose: 詳細ログを表示するか
Returns:
生成されたドキュメントメタデータのリスト
"""
documents = []
success_count = 0
skip_count = 0
fail_count = 0
total = len(urls)
end_id = start_id + total - 1
# saveモードの場合、保存先ディレクトリを設定
if mode == "save":
if save_dir is None:
save_dir = Path("data/raw")
save_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"保存先ディレクトリ: {save_dir}")
logger.info(f"スクレイピング開始: ID {start_id} から {end_id} まで(計{total}件)")
logger.info(f"モード: {'メモリー保管' if mode == 'memory' else 'ファイル保存'}")
logger.info(f"待機時間: {self.wait_time}秒\n")
# プログレスバーの作成(単一行で更新)
pbar = None
if show_progress:
pbar = tqdm(
total=total,
desc="処理中",
leave=True,
ncols=80,
bar_format='{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {postfix}]'
)
try:
for i, url in enumerate(urls):
current_id = start_id + i
try:
# saveモードの場合はsave_dirを渡す
if mode == "save":
document = await self.document_processor.process_url(url, current_id, save_dir)
else:
document = await self.document_processor.process_url(url, current_id)
documents.append(document)
success_count += 1
# verboseモードの場合は詳細ログも表示
if verbose:
# プログレスバーを一時的にクリアして詳細を表示
if pbar:
pbar.clear()
if mode == "save":
logger.info(f" ✓ {url}: 保存完了 ({document['metadata']['file_name']})")
else:
logger.info(f" ✓ {url}: 処理完了 ({document['metadata']['file_name']})")
if pbar:
pbar.refresh()
except ArticleNotFoundError:
skip_count += 1
# verboseモードの場合は詳細ログも表示
if verbose:
if pbar:
pbar.clear()
logger.warning(f" ⊘ {url}: 記事が見つかりません")
if pbar:
pbar.refresh()
except FetchError as e:
fail_count += 1
# verboseモードの場合は詳細ログも表示
if verbose:
if pbar:
pbar.clear()
logger.error(f" ✗ {url}: 取得エラー: {str(e)}")
if pbar:
pbar.refresh()
except Exception as e:
fail_count += 1
# verboseモードの場合は詳細ログも表示
if verbose:
if pbar:
pbar.clear()
logger.error(f" ✗ {url}: エラー: {str(e)}")
if pbar:
pbar.refresh()
# プログレスバーを更新
if pbar:
pbar.set_postfix({
'成功': success_count,
'スキップ': skip_count,
'失敗': fail_count
})
pbar.update(1)
# 次のリクエストまで待機(最後のURLでは待機しない)
if i < len(urls) - 1:
await asyncio.sleep(self.wait_time)
finally:
if pbar:
pbar.close()
# サマリー表示
logger.info("\n" + "=" * 50)
logger.info("処理結果サマリー")
logger.info("=" * 50)
logger.info(f"合計: {total}件")
logger.info(f"成功: {success_count}件")
logger.info(f"スキップ(記事なし): {skip_count}件")
logger.info(f"失敗: {fail_count}件")
if mode == "save" and success_count > 0:
logger.info(f"\nMarkdownファイル保存先: {save_dir}")
return documents
def save_metadata(self, documents: List[Dict[str, Any]], output_path: Path):
"""メタデータをJSON形式で保存
Args:
documents: 保存するドキュメントメタデータのリスト
output_path: 出力ファイルパス
"""
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(documents, f, ensure_ascii=False, indent=2)
logger.info(f"\nメタデータを保存しました: {output_path}")
logger.info(f"保存件数: {len(documents)}件")