import math
import os
import subprocess
from typing import Any, List, Optional, Tuple

from config import Config
|
|
|
|
class AI:
    """Resident language-model wrapper.

    Loads a causal LM once per model path (singleton-per-path pattern) and
    returns (token, probability) lists for a given text and ``k``. The model
    stays loaded between calls.
    """

    # Resident instance cache, keyed by the resolved model path.
    _instances = {}

    def __new__(cls, model_path: Optional[str] = None):
        """Return the cached instance for *model_path*, creating it on first use."""
        path = model_path or Config.get_default_model_path()

        if path not in cls._instances:
            instance = super().__new__(cls)
            # Mark as not yet initialized so __init__ runs exactly once.
            instance._initialized = False
            cls._instances[path] = instance

        return cls._instances[path]

    def __init__(self, model_path: Optional[str] = None):
        """
        Load the model and initialize (runs only once; the instance stays resident).

        Args:
            model_path: Path to the model (None uses the default path).

        Raises:
            ValueError: If the model could not be loaded.
        """
        if getattr(self, '_initialized', False):
            return

        self.model_path = model_path or Config.get_default_model_path()
        self.model = self._load_model(self.model_path)

        if self.model is None:
            # BUGFIX: the original set _initialized = True before raising, so a
            # failed load left a broken instance (model=None) in the cache and
            # every later AI() call silently returned it without raising.
            # Evict the failed instance so a subsequent call can retry.
            type(self)._instances.pop(self.model_path, None)
            raise ValueError(f"モデルのロードに失敗しました: {self.model_path}")

        self._initialized = True

    @classmethod
    def get_model(cls, model_path: Optional[str] = None) -> 'AI':
        """Get a model instance (from the resident cache)."""
        return cls(model_path)

    @classmethod
    def clear_cache(cls):
        """Clear the instance cache (for development / testing)."""
        cls._instances.clear()

    def _load_model(self, model_path: str) -> Optional[Any]:
        """Load the model with transformers (local path or Hugging Face Hub repo id).

        Returns:
            ``(model, tokenizer)`` tuple on success, ``None`` on any failure
            (failures are printed, never raised).
        """
        try:
            if not model_path:
                return None

            # A path containing "/" that does not exist locally is treated as
            # a Hugging Face Hub repo id (e.g. "org/model-name").
            is_repo_id = "/" in model_path and not os.path.exists(model_path)

            if not is_repo_id and not os.path.exists(model_path):
                print(f"[AI] モデルパスが存在しません: {model_path}")
                return None

            try:
                from transformers import AutoModelForCausalLM, AutoTokenizer
                import torch

                device = "cuda" if torch.cuda.is_available() else "cpu"
                if device == "cuda":
                    print("[AI] GPU検出: CUDAを使用します")
                else:
                    print("[AI] GPU未検出: CPUモードで実行します")

                print(f"[AI] モデルをロード中: {model_path}")
                print(f"[AI] デバイス: {device}")

                # Optional auth token for gated/private Hub repositories.
                hf_token = os.getenv("HF_TOKEN")

                if is_repo_id:
                    print(f"[AI] Hugging Face Hub から直接読み込み: {model_path}")
                else:
                    print(f"[AI] ローカルパスから読み込み: {model_path}")

                tokenizer = AutoTokenizer.from_pretrained(
                    model_path,
                    token=hf_token,
                )
                model = AutoModelForCausalLM.from_pretrained(
                    model_path,
                    # fp16 + automatic device placement on GPU; fp32 on CPU.
                    torch_dtype=torch.float16 if device == "cuda" else torch.float32,
                    device_map="auto" if device == "cuda" else None,
                    token=hf_token,
                )

                if device == "cpu":
                    model = model.to(device)

                print(f"[AI] モデルロード成功 ({device}モード)")
                return (model, tokenizer)
            except Exception as e:
                import traceback
                print(f"[AI] transformersでのロードに失敗: {e}")
                traceback.print_exc()
                return None

        except Exception as e:
            import traceback
            print(f"[AI] モデルロードエラー: {e}")
            traceback.print_exc()
            return None

    @staticmethod
    def _clean_token_text(raw: str) -> str:
        """Strictly remove control, invisible, and replacement characters.

        Keeps tab/LF/CR and printable characters (except U+FFFD), strips
        zero-width and bidi-control characters, and trims surrounding
        whitespace. Regular visible text (including special-token tags) is
        preserved.
        """
        if not raw:
            return ""

        kept = []
        for ch in raw:
            code = ord(ch)
            if code in (0x09, 0x0A, 0x0D):
                kept.append(ch)
            elif ch.isprintable() and ch != "\uFFFD":
                kept.append(ch)

        result = "".join(kept)
        # Zero-width / BOM / bidi-control characters (defense in depth: most
        # of these are non-printable and already dropped above).
        for invisible in ("\u200B", "\u200C", "\u200D", "\uFEFF",
                          "\u200E", "\u200F", "\u202A", "\u202B",
                          "\u202C", "\u202D", "\u202E"):
            result = result.replace(invisible, "")
        return result.strip()

    def get_token_probabilities(self, text: str, k: int = 5) -> List[Tuple[str, float]]:
        """
        Return a (token, probability) list for the next token after *text*.

        Args:
            text: Input text.
            k: Number of top tokens to retrieve.

        Returns:
            List[Tuple[str, float]]: (token, probability) pairs. Tokens that
            become empty after cleaning are dropped, and the remaining
            probabilities are renormalized to sum to 1.
        """
        if self.model is None:
            return []

        try:
            if isinstance(self.model, tuple) and len(self.model) == 2:
                model, tokenizer = self.model
                import torch

                inputs = tokenizer(text, return_tensors="pt")
                # Move input tensors to the same device as the model weights.
                device = next(model.parameters()).device
                inputs = {name: tensor.to(device) for name, tensor in inputs.items()}

                with torch.no_grad():
                    outputs = model(**inputs)
                    # Logits for the position following the last input token.
                    logits = outputs.logits[0, -1, :]

                probs = torch.softmax(logits, dim=-1)
                top_probs, top_indices = torch.topk(probs, k)

                items: List[Tuple[str, float]] = []
                for idx, prob in zip(top_indices, top_probs):
                    token_id = idx.item()
                    token = tokenizer.decode(
                        [token_id],
                        skip_special_tokens=False,
                        clean_up_tokenization_spaces=False,
                    )
                    token = self._clean_token_text(token)
                    if not token:
                        # Token vanished after cleaning (pure control chars etc.).
                        continue
                    items.append((token, float(prob.item())))

                # Renormalize so the surviving top-k probabilities sum to 1.
                if items:
                    total_prob = sum(p for _, p in items)
                    if total_prob > 0:
                        return [(tok, p / total_prob) for tok, p in items]

                return items
            else:
                print("モデルがサポートされていません")
                return []

        except Exception as e:
            print(f"トークン確率取得エラー: {e}")
            import traceback
            traceback.print_exc()
            return []

    def _softmax_from_logprobs(self, logprobs: List[float]) -> List[float]:
        """Convert log-probabilities to probabilities via a numerically
        stable softmax (subtracts the max before exponentiating)."""
        if not logprobs:
            return []

        max_logprob = max(logprobs)
        exp_values = [math.exp(lp - max_logprob) for lp in logprobs]
        total = sum(exp_values)

        if total == 0:
            return [0.0] * len(logprobs)

        return [v / total for v in exp_values]
|
|
|
|
def exp(x: float) -> float:
    """Exponential function — thin wrapper around :func:`math.exp`.

    Kept for backward compatibility with callers of this module-level helper.
    Note: contrary to the old docstring, this is not an approximation; it
    delegates directly to the standard library.
    """
    return math.exp(x)
|
|