Spaces:
Running
Running
File size: 7,026 Bytes
fb05e78 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
"""バッチ処理とプログレス管理"""
import asyncio
import json
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional
from tqdm import tqdm
from src.processing.document_processor import DocumentProcessor
from src.scraping.exceptions import ArticleNotFoundError, FetchError
logger = logging.getLogger(__name__)
class BatchProcessor:
"""バッチ処理とプログレス管理"""
def __init__(self, wait_time: float = 1.0):
"""
Args:
wait_time: リクエスト間の待機時間(秒)
"""
self.wait_time = wait_time
self.document_processor = DocumentProcessor()
async def process_urls_batch(
self,
urls: List[str],
start_id: int = 1,
mode: str = "memory",
show_progress: bool = True,
save_dir: Optional[Path] = None,
verbose: bool = False
) -> List[Dict[str, Any]]:
"""
複数URLをバッチ処理してメタデータを生成
Args:
urls: 処理するURLのリスト
start_id: 開始ID
mode: "memory" または "save"
show_progress: プログレス表示の有無
save_dir: saveモード時のMarkdownファイル保存先ディレクトリ
verbose: 詳細ログを表示するか
Returns:
生成されたドキュメントメタデータのリスト
"""
documents = []
success_count = 0
skip_count = 0
fail_count = 0
total = len(urls)
end_id = start_id + total - 1
# saveモードの場合、保存先ディレクトリを設定
if mode == "save":
if save_dir is None:
save_dir = Path("data/raw")
save_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"保存先ディレクトリ: {save_dir}")
logger.info(f"スクレイピング開始: ID {start_id} から {end_id} まで(計{total}件)")
logger.info(f"モード: {'メモリー保管' if mode == 'memory' else 'ファイル保存'}")
logger.info(f"待機時間: {self.wait_time}秒\n")
# プログレスバーの作成(単一行で更新)
pbar = None
if show_progress:
pbar = tqdm(
total=total,
desc="処理中",
leave=True,
ncols=80,
bar_format='{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {postfix}]'
)
try:
for i, url in enumerate(urls):
current_id = start_id + i
try:
# saveモードの場合はsave_dirを渡す
if mode == "save":
document = await self.document_processor.process_url(url, current_id, save_dir)
else:
document = await self.document_processor.process_url(url, current_id)
documents.append(document)
success_count += 1
# verboseモードの場合は詳細ログも表示
if verbose:
# プログレスバーを一時的にクリアして詳細を表示
if pbar:
pbar.clear()
if mode == "save":
logger.info(f" ✓ {url}: 保存完了 ({document['metadata']['file_name']})")
else:
logger.info(f" ✓ {url}: 処理完了 ({document['metadata']['file_name']})")
if pbar:
pbar.refresh()
except ArticleNotFoundError:
skip_count += 1
# verboseモードの場合は詳細ログも表示
if verbose:
if pbar:
pbar.clear()
logger.warning(f" ⊘ {url}: 記事が見つかりません")
if pbar:
pbar.refresh()
except FetchError as e:
fail_count += 1
# verboseモードの場合は詳細ログも表示
if verbose:
if pbar:
pbar.clear()
logger.error(f" ✗ {url}: 取得エラー: {str(e)}")
if pbar:
pbar.refresh()
except Exception as e:
fail_count += 1
# verboseモードの場合は詳細ログも表示
if verbose:
if pbar:
pbar.clear()
logger.error(f" ✗ {url}: エラー: {str(e)}")
if pbar:
pbar.refresh()
# プログレスバーを更新
if pbar:
pbar.set_postfix({
'成功': success_count,
'スキップ': skip_count,
'失敗': fail_count
})
pbar.update(1)
# 次のリクエストまで待機(最後のURLでは待機しない)
if i < len(urls) - 1:
await asyncio.sleep(self.wait_time)
finally:
if pbar:
pbar.close()
# サマリー表示
logger.info("\n" + "=" * 50)
logger.info("処理結果サマリー")
logger.info("=" * 50)
logger.info(f"合計: {total}件")
logger.info(f"成功: {success_count}件")
logger.info(f"スキップ(記事なし): {skip_count}件")
logger.info(f"失敗: {fail_count}件")
if mode == "save" and success_count > 0:
logger.info(f"\nMarkdownファイル保存先: {save_dir}")
return documents
def save_metadata(self, documents: List[Dict[str, Any]], output_path: Path):
"""メタデータをJSON形式で保存
Args:
documents: 保存するドキュメントメタデータのリスト
output_path: 出力ファイルパス
"""
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(documents, f, ensure_ascii=False, indent=2)
logger.info(f"\nメタデータを保存しました: {output_path}")
logger.info(f"保存件数: {len(documents)}件")
|