File size: 11,209 Bytes
09c17cd
 
95e6a87
09c17cd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
17b871a
09c17cd
17b871a
09c17cd
03fb695
 
 
 
 
 
 
 
09c17cd
adb0f98
09c17cd
adb0f98
 
95e6a87
 
adb0f98
 
 
 
95e6a87
 
adb0f98
 
95e6a87
17b871a
 
 
 
 
 
 
 
 
 
adb0f98
 
17b871a
09c17cd
adb0f98
 
 
 
17b871a
adb0f98
 
 
 
 
 
 
 
09c17cd
95e6a87
adb0f98
95e6a87
09c17cd
 
 
95e6a87
 
 
09c17cd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
adb0f98
 
 
 
09c17cd
adb0f98
 
 
 
09c17cd
adb0f98
 
 
 
09c17cd
adb0f98
 
 
 
 
 
 
 
5cc4cfa
 
 
 
 
 
 
 
 
 
 
04574d1
5cc4cfa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
adb0f98
 
c7f69f6
 
5cc4cfa
 
 
 
adb0f98
 
09c17cd
 
 
 
 
 
 
 
 
 
 
 
 
adb0f98
09c17cd
 
 
 
adb0f98
 
09c17cd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
from typing import List, Tuple, Any, Optional
import os
import subprocess
from config import Config


class AI:
    """AIクラス - モデルをロードして文章とkを引数にトークンと確率のリストを返す(常駐版)"""
    
    _instances = {}  # モデルパスごとのインスタンスをキャッシュ(常駐)
    
    def __new__(cls, model_path: str = None):
        """シングルトンパターンでモデルを常駐"""
        path = model_path or Config.get_default_model_path()
        
        if path not in cls._instances:
            cls._instances[path] = super().__new__(cls)
            cls._instances[path]._initialized = False
        
        return cls._instances[path]
    
    def __init__(self, model_path: str = None):
        """
        モデルをロードして初期化(一度だけ実行、常駐)
        
        Args:
            model_path: モデルファイルのパス(Noneの場合はデフォルトパスを使用)
        """
        if hasattr(self, '_initialized') and self._initialized:
            return
        
        self.model_path = model_path or Config.get_default_model_path()
        self.model = self._load_model(self.model_path)
        self._initialized = True
        
        if self.model is None:
            raise ValueError(f"モデルのロードに失敗しました: {self.model_path}")
    
    @classmethod
    def get_model(cls, model_path: str = None) -> 'AI':
        """モデルインスタンスを取得(常駐キャッシュから)"""
        return cls(model_path)
    
    @classmethod
    def clear_cache(cls):
        """キャッシュをクリア(開発・テスト用)"""
        cls._instances.clear()
    
    def _load_model(self, model_path: str) -> Optional[Any]:
        """モデルをロード(Transformers使用、Hubから直接読み込み)"""
        try:
            if not model_path:
                return None
            
            # モデルパスがリポジトリID("user/repo"形式)か、ローカルパスかを判定
            is_repo_id = "/" in model_path and not os.path.exists(model_path)
            
            # リポジトリIDの場合は os.path.exists() チェックをスキップ
            if not is_repo_id and not os.path.exists(model_path):
                print(f"[AI] モデルパスが存在しません: {model_path}")
                return None
                
            # transformersを使用してモデルをロード
            try:
                from transformers import AutoModelForCausalLM, AutoTokenizer
                import torch
                
                # GPUが利用可能かチェック
                device = "cuda" if torch.cuda.is_available() else "cpu"
                if device == "cuda":
                    print("[AI] GPU検出: CUDAを使用します")
                else:
                    print("[AI] GPU未検出: CPUモードで実行します")
                
                print(f"[AI] モデルをロード中: {model_path}")
                print(f"[AI] デバイス: {device}")
                
                # モデルパスがリポジトリID("user/repo"形式)か、ローカルパスかを判定
                hf_token = os.getenv("HF_TOKEN")
                is_repo_id = "/" in model_path and not os.path.exists(model_path)
                
                if is_repo_id:
                    print(f"[AI] Hugging Face Hub から直接読み込み: {model_path}")
                else:
                    print(f"[AI] ローカルパスから読み込み: {model_path}")
                
                # トークナイザーとモデルをロード(Hubから直接読み込む)
                tokenizer = AutoTokenizer.from_pretrained(
                    model_path,
                    token=hf_token,
                )
                model = AutoModelForCausalLM.from_pretrained(
                    model_path,
                    torch_dtype=torch.float16 if device == "cuda" else torch.float32,
                    device_map="auto" if device == "cuda" else None,
                    token=hf_token,
                )
                
                if device == "cpu":
                    model = model.to(device)
                
                # モデルとトークナイザーをタプルで返す
                print(f"[AI] モデルロード成功 ({device}モード)")
                return (model, tokenizer)
            except Exception as e:
                import traceback
                print(f"[AI] transformersでのロードに失敗: {e}")
                traceback.print_exc()
                return None
                
        except Exception as e:
            import traceback
            print(f"[AI] モデルロードエラー: {e}")
            traceback.print_exc()
            return None
    
    def get_token_probabilities(self, text: str, k: int = 5) -> List[Tuple[str, float]]:
        """
        文章とkを引数に、{token, 確率}のリストを返す
        
        Args:
            text: 入力文章
            k: 取得するトークン数
            
        Returns:
            List[Tuple[str, float]]: (トークン, 確率)のリスト
        """
        if self.model is None:
            return []
            
        try:
            # transformers モデルの場合
            if isinstance(self.model, tuple) and len(self.model) == 2:
                model, tokenizer = self.model
                import torch
                
                # テキストをトークン化
                inputs = tokenizer(text, return_tensors="pt")
                device = next(model.parameters()).device
                inputs = {k: v.to(device) for k, v in inputs.items()}
                
                # モデルで推論(勾配計算なし)
                with torch.no_grad():
                    outputs = model(**inputs)
                    logits = outputs.logits[0, -1, :]  # 最後のトークンのlogits
                
                # logitsを確率に変換(softmax)
                probs = torch.softmax(logits, dim=-1)
                
                # 上位k個のトークンを取得
                top_probs, top_indices = torch.topk(probs, k)
                
                # トークンIDを文字列に変換
                items: List[Tuple[str, float]] = []
                
                # Llama 3.2の特殊トークンを定義
                LLAMA_SPECIAL_TOKENS = [
                    "<|begin_of_text|>",
                    "<|end_of_text|>",
                    "<|eot_id|>",
                    "<|start_header_id|>",
                    "<|end_header_id|>",
                ]
                
                def _clean_text(text: str) -> str:
                    """制御文字・不可視文字・置換文字を厳密に取り除く(正規タグは保持)"""
                    if not text:
                        return ""
                    
                    # 制御文字(0x00-0x1F、0x7F-0x9F)を除去
                    # ただし、改行・タブ・復帰は許可
                    cleaned = []
                    for ch in text:
                        code = ord(ch)
                        # 許可する制御文字: 改行(0x0A), タブ(0x09), 復帰(0x0D)
                        if code in [0x09, 0x0A, 0x0D]:
                            cleaned.append(ch)
                        # 通常の印刷可能文字(0x20-0x7E、およびその他のUnicode印刷可能文字)
                        elif ch.isprintable():
                            # 置換文字(U+FFFD)を除去
                            if ch != "\uFFFD":
                                cleaned.append(ch)
                        # その他の制御文字や不可視文字は除去
                    
                    result = "".join(cleaned)
                    # ゼロ幅文字を除去
                    result = result.replace("\u200B", "")  # Zero-width space
                    result = result.replace("\u200C", "")  # Zero-width non-joiner
                    result = result.replace("\u200D", "")  # Zero-width joiner
                    result = result.replace("\uFEFF", "")  # Zero-width no-break space
                    # その他の不可視文字(結合文字など)を除去
                    result = result.replace("\u200E", "")  # Left-to-right mark
                    result = result.replace("\u200F", "")  # Right-to-left mark
                    result = result.replace("\u202A", "")  # Left-to-right embedding
                    result = result.replace("\u202B", "")  # Right-to-left embedding
                    result = result.replace("\u202C", "")  # Pop directional formatting
                    result = result.replace("\u202D", "")  # Left-to-right override
                    result = result.replace("\u202E", "")  # Right-to-left override
                    return result.strip()

                for idx, prob in zip(top_indices, top_probs):
                    token_id = idx.item()
                    # 正規タグ(<|eot_id|>など)を保持するため、skip_special_tokens=False
                    token = tokenizer.decode([token_id], skip_special_tokens=False, clean_up_tokenization_spaces=False)
                    token = _clean_text(token)
                    # 空文字列のトークンは除外
                    if not token:
                        continue
                    prob_value = prob.item()
                    items.append((token, float(prob_value)))
                
                # 確率を正規化
                if items:
                    total_prob = sum(prob for _, prob in items)
                    if total_prob > 0:
                        normalized_items: List[Tuple[str, float]] = []
                        for token, prob in items:
                            normalized_prob = prob / total_prob
                            normalized_items.append((token, normalized_prob))
                        return normalized_items
                
                return items
            else:
                print("モデルがサポートされていません")
                return []
                
        except Exception as e:
            print(f"トークン確率取得エラー: {e}")
            import traceback
            traceback.print_exc()
            return []
    
    def _softmax_from_logprobs(self, logprobs: List[float]) -> List[float]:
        """logprobsをsoftmaxで確率に変換"""
        if not logprobs:
            return []
            
        # 数値安定性のため最大値を引く
        max_logprob = max(logprobs)
        exp_logprobs = [exp(logprob - max_logprob) for logprob in logprobs]
        sum_exp = sum(exp_logprobs)
        
        if sum_exp == 0:
            return [0.0] * len(logprobs)
            
        return [exp_logprob / sum_exp for exp_logprob in exp_logprobs]


def exp(x: float) -> float:
    """指数関数の近似実装(math.expの代替)"""
    import math
    return math.exp(x)