File size: 11,209 Bytes
09c17cd 95e6a87 09c17cd 17b871a 09c17cd 17b871a 09c17cd 03fb695 09c17cd adb0f98 09c17cd adb0f98 95e6a87 adb0f98 95e6a87 adb0f98 95e6a87 17b871a adb0f98 17b871a 09c17cd adb0f98 17b871a adb0f98 09c17cd 95e6a87 adb0f98 95e6a87 09c17cd 95e6a87 09c17cd adb0f98 09c17cd adb0f98 09c17cd adb0f98 09c17cd adb0f98 5cc4cfa 04574d1 5cc4cfa adb0f98 c7f69f6 5cc4cfa adb0f98 09c17cd adb0f98 09c17cd adb0f98 09c17cd | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 | from typing import List, Tuple, Any, Optional
import os
import subprocess
from config import Config
class AI:
"""AIクラス - モデルをロードして文章とkを引数にトークンと確率のリストを返す(常駐版)"""
_instances = {} # モデルパスごとのインスタンスをキャッシュ(常駐)
def __new__(cls, model_path: str = None):
"""シングルトンパターンでモデルを常駐"""
path = model_path or Config.get_default_model_path()
if path not in cls._instances:
cls._instances[path] = super().__new__(cls)
cls._instances[path]._initialized = False
return cls._instances[path]
def __init__(self, model_path: str = None):
"""
モデルをロードして初期化(一度だけ実行、常駐)
Args:
model_path: モデルファイルのパス(Noneの場合はデフォルトパスを使用)
"""
if hasattr(self, '_initialized') and self._initialized:
return
self.model_path = model_path or Config.get_default_model_path()
self.model = self._load_model(self.model_path)
self._initialized = True
if self.model is None:
raise ValueError(f"モデルのロードに失敗しました: {self.model_path}")
@classmethod
def get_model(cls, model_path: str = None) -> 'AI':
"""モデルインスタンスを取得(常駐キャッシュから)"""
return cls(model_path)
@classmethod
def clear_cache(cls):
"""キャッシュをクリア(開発・テスト用)"""
cls._instances.clear()
def _load_model(self, model_path: str) -> Optional[Any]:
"""モデルをロード(Transformers使用、Hubから直接読み込み)"""
try:
if not model_path:
return None
# モデルパスがリポジトリID("user/repo"形式)か、ローカルパスかを判定
is_repo_id = "/" in model_path and not os.path.exists(model_path)
# リポジトリIDの場合は os.path.exists() チェックをスキップ
if not is_repo_id and not os.path.exists(model_path):
print(f"[AI] モデルパスが存在しません: {model_path}")
return None
# transformersを使用してモデルをロード
try:
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
# GPUが利用可能かチェック
device = "cuda" if torch.cuda.is_available() else "cpu"
if device == "cuda":
print("[AI] GPU検出: CUDAを使用します")
else:
print("[AI] GPU未検出: CPUモードで実行します")
print(f"[AI] モデルをロード中: {model_path}")
print(f"[AI] デバイス: {device}")
# モデルパスがリポジトリID("user/repo"形式)か、ローカルパスかを判定
hf_token = os.getenv("HF_TOKEN")
is_repo_id = "/" in model_path and not os.path.exists(model_path)
if is_repo_id:
print(f"[AI] Hugging Face Hub から直接読み込み: {model_path}")
else:
print(f"[AI] ローカルパスから読み込み: {model_path}")
# トークナイザーとモデルをロード(Hubから直接読み込む)
tokenizer = AutoTokenizer.from_pretrained(
model_path,
token=hf_token,
)
model = AutoModelForCausalLM.from_pretrained(
model_path,
torch_dtype=torch.float16 if device == "cuda" else torch.float32,
device_map="auto" if device == "cuda" else None,
token=hf_token,
)
if device == "cpu":
model = model.to(device)
# モデルとトークナイザーをタプルで返す
print(f"[AI] モデルロード成功 ({device}モード)")
return (model, tokenizer)
except Exception as e:
import traceback
print(f"[AI] transformersでのロードに失敗: {e}")
traceback.print_exc()
return None
except Exception as e:
import traceback
print(f"[AI] モデルロードエラー: {e}")
traceback.print_exc()
return None
def get_token_probabilities(self, text: str, k: int = 5) -> List[Tuple[str, float]]:
"""
文章とkを引数に、{token, 確率}のリストを返す
Args:
text: 入力文章
k: 取得するトークン数
Returns:
List[Tuple[str, float]]: (トークン, 確率)のリスト
"""
if self.model is None:
return []
try:
# transformers モデルの場合
if isinstance(self.model, tuple) and len(self.model) == 2:
model, tokenizer = self.model
import torch
# テキストをトークン化
inputs = tokenizer(text, return_tensors="pt")
device = next(model.parameters()).device
inputs = {k: v.to(device) for k, v in inputs.items()}
# モデルで推論(勾配計算なし)
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits[0, -1, :] # 最後のトークンのlogits
# logitsを確率に変換(softmax)
probs = torch.softmax(logits, dim=-1)
# 上位k個のトークンを取得
top_probs, top_indices = torch.topk(probs, k)
# トークンIDを文字列に変換
items: List[Tuple[str, float]] = []
# Llama 3.2の特殊トークンを定義
LLAMA_SPECIAL_TOKENS = [
"<|begin_of_text|>",
"<|end_of_text|>",
"<|eot_id|>",
"<|start_header_id|>",
"<|end_header_id|>",
]
def _clean_text(text: str) -> str:
"""制御文字・不可視文字・置換文字を厳密に取り除く(正規タグは保持)"""
if not text:
return ""
# 制御文字(0x00-0x1F、0x7F-0x9F)を除去
# ただし、改行・タブ・復帰は許可
cleaned = []
for ch in text:
code = ord(ch)
# 許可する制御文字: 改行(0x0A), タブ(0x09), 復帰(0x0D)
if code in [0x09, 0x0A, 0x0D]:
cleaned.append(ch)
# 通常の印刷可能文字(0x20-0x7E、およびその他のUnicode印刷可能文字)
elif ch.isprintable():
# 置換文字(U+FFFD)を除去
if ch != "\uFFFD":
cleaned.append(ch)
# その他の制御文字や不可視文字は除去
result = "".join(cleaned)
# ゼロ幅文字を除去
result = result.replace("\u200B", "") # Zero-width space
result = result.replace("\u200C", "") # Zero-width non-joiner
result = result.replace("\u200D", "") # Zero-width joiner
result = result.replace("\uFEFF", "") # Zero-width no-break space
# その他の不可視文字(結合文字など)を除去
result = result.replace("\u200E", "") # Left-to-right mark
result = result.replace("\u200F", "") # Right-to-left mark
result = result.replace("\u202A", "") # Left-to-right embedding
result = result.replace("\u202B", "") # Right-to-left embedding
result = result.replace("\u202C", "") # Pop directional formatting
result = result.replace("\u202D", "") # Left-to-right override
result = result.replace("\u202E", "") # Right-to-left override
return result.strip()
for idx, prob in zip(top_indices, top_probs):
token_id = idx.item()
# 正規タグ(<|eot_id|>など)を保持するため、skip_special_tokens=False
token = tokenizer.decode([token_id], skip_special_tokens=False, clean_up_tokenization_spaces=False)
token = _clean_text(token)
# 空文字列のトークンは除外
if not token:
continue
prob_value = prob.item()
items.append((token, float(prob_value)))
# 確率を正規化
if items:
total_prob = sum(prob for _, prob in items)
if total_prob > 0:
normalized_items: List[Tuple[str, float]] = []
for token, prob in items:
normalized_prob = prob / total_prob
normalized_items.append((token, normalized_prob))
return normalized_items
return items
else:
print("モデルがサポートされていません")
return []
except Exception as e:
print(f"トークン確率取得エラー: {e}")
import traceback
traceback.print_exc()
return []
def _softmax_from_logprobs(self, logprobs: List[float]) -> List[float]:
"""logprobsをsoftmaxで確率に変換"""
if not logprobs:
return []
# 数値安定性のため最大値を引く
max_logprob = max(logprobs)
exp_logprobs = [exp(logprob - max_logprob) for logprob in logprobs]
sum_exp = sum(exp_logprobs)
if sum_exp == 0:
return [0.0] * len(logprobs)
return [exp_logprob / sum_exp for exp_logprob in exp_logprobs]
def exp(x: float) -> float:
"""指数関数の近似実装(math.expの代替)"""
import math
return math.exp(x)
|