Agent_StudioDocker / modules /rag_indexer.py
Corin1998's picture
Update modules/rag_indexer.py
5b7266b verified
import os
import json
import requests
from pathlib import Path
from typing import List, Dict, Any, Optional
from bs4 import BeautifulSoup
# utils から書き込み先と分割関数を取得
from .utils import ensure_dirs, data_dir, chunk_text
# 依存は遅延ロード(モデル初期化は重いので)
_model = None
def _embedder():
"""
SentenceTransformer を遅延初期化。
- すべてのキャッシュは utils.ensure_dirs() 側で /tmp 等の書き込み可パスへ固定済み
- 環境によってはネット/モデルDLが禁止のことがあるため、呼び出し側での強制ウォームアップはしない
"""
global _model
if _model is not None:
return _model
ensure_dirs()
cache_base = data_dir() / "hf_cache"
# ここで環境変数も最終確認(多重防御)
os.environ.setdefault("HF_HOME", str(data_dir() / "hf_home"))
os.environ.setdefault("HUGGINGFACE_HUB_CACHE", str(cache_base))
os.environ.setdefault("TRANSFORMERS_CACHE", str(cache_base))
os.environ.setdefault("SENTENCE_TRANSFORMERS_HOME", str(cache_base))
os.environ.setdefault("HF_HUB_DISABLE_TELEMETRY", "1")
os.environ.setdefault("HF_TOKEN", "")
from sentence_transformers import SentenceTransformer
# ローカル同梱モデルがあれば優先(ネット不可時の対策)
local_model_dir = data_dir() / "models" / "all-MiniLM-L6-v2"
model_name = str(local_model_dir) if local_model_dir.exists() else os.getenv(
"EMBEDDING_MODEL", "sentence-transformers/all-MiniLM-L6-v2"
)
_model = SentenceTransformer(model_name, cache_folder=str(cache_base))
return _model
def _write_chunks(rows: List[Dict[str, Any]]) -> int:
"""
chunks.jsonl に追記(既存は破棄せず、簡易に追加)
1行: {"text": "...", "source": "path_or_url", "meta": {...}}
"""
ensure_dirs()
out = data_dir() / "chunks.jsonl"
with open(out, "a", encoding="utf-8") as f:
for r in rows:
f.write(json.dumps(r, ensure_ascii=False) + "\n")
return len(rows)
def _load_text_from_url(url: str) -> str:
try:
r = requests.get(url, timeout=15)
r.raise_for_status()
html = r.text
soup = BeautifulSoup(html, "html.parser")
# タイトル + 本文テキスト(簡易)
title = (soup.title.string.strip() if soup.title and soup.title.string else "")
text = soup.get_text("\n", strip=True)
return (title + "\n\n" + text).strip()
except Exception:
return ""
def _load_text_from_file(path: Path) -> str:
# テキスト/Markdown想定(PDF等は最小構成では未対応)
try:
with open(path, "r", encoding="utf-8", errors="ignore") as f:
return f.read()
except Exception:
return ""
def index_files_and_urls(file_paths: Optional[List[str]] = None, urls: Optional[List[str]] = None) -> str:
"""
- 受け取ったファイルとURLからテキストを抽出し、チャンク化して chunks.jsonl に追記
- 依存を最小化(PDF/Officeは最小構成では対象外)
- モデルのウォームアップは実施しない(ネット不可環境で失敗するため)
"""
ensure_dirs()
file_paths = file_paths or []
urls = urls or []
added = 0
rows: List[Dict[str, Any]] = []
# ファイル
for p in file_paths:
try:
path = Path(p)
txt = _load_text_from_file(path)
for ch in chunk_text(txt):
rows.append({"text": ch, "source": str(path), "meta": {"kind": "file"}})
except Exception:
continue
# URL
for u in urls:
txt = _load_text_from_url(u)
for ch in chunk_text(txt):
rows.append({"text": ch, "source": u, "meta": {"kind": "url"}})
if rows:
added = _write_chunks(rows)
# ここでの warmup を削除(または状態だけ返す)
warmed = False # UIに表示するためのダミー値
return f"indexed_chunks={added}, warmed_up={warmed}"