File size: 5,156 Bytes
d18d4fc 6d7f180 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 | import re
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional, Union, Tuple, Set
class BaseValidator:
"""すべての検証クラスの基底クラス"""
def __init__(self, client=None):
"""検証クラスの初期化"""
self.client = client
async def _make_async_api_call(self, model: str, messages: List[Dict[str, str]], **kwargs) -> Any:
"""非同期APIコールを行う共通メソッド"""
try:
response = self.client.chat.completions.create(
model=model,
messages=messages,
**kwargs
)
return response
except Exception as e:
print(f"[Error] API call failed: {str(e)}")
raise
def _extract_code_blocks(self, content: str) -> List[Dict[str, Any]]:
"""マークダウン形式のコードブロックを抽出する共通メソッド"""
code_blocks = []
# 通常のマークダウンコードブロックを検出
pattern = r'```(\w+)?\s*\n([\s\S]*?)\n```'
matches = list(re.finditer(pattern, content))
# マークダウンコードブロックが見つかった場合の処理
if matches:
for i, match in enumerate(matches):
language = match.group(1) or "python" # デフォルトはpython
language = language.strip().lower()
code = match.group(2)
if code.strip(): # 空のコードブロックを除外
code_blocks.append({
"language": language,
"code": code,
"block_index": i
})
# マークダウンコードブロックが見つからない場合は、インデントされたコードブロックやクラス定義を探す
else:
# Pythonのクラス定義やインポート文を探す
python_patterns = [
r'import\s+\w+', # import文
r'from\s+\w+\s+import', # from import文
r'class\s+\w+\s*\(?\w*\)?:', # クラス定義
r'def\s+\w+\s*\(.*\):', # 関数定義
]
python_code = False
for pattern in python_patterns:
if re.search(pattern, content):
python_code = True
break
if python_code:
# コンテンツ全体をPythonコードとして扱う
code_blocks.append({
"language": "python",
"code": content,
"block_index": 0
})
return code_blocks
def _update_code_in_content(self, content: str, block_index: int, new_code: str) -> str:
"""マークダウン形式のコンテンツ内のコードブロックを更新する"""
pattern = r'```(\w+)?\s*\n([\s\S]*?)\n```'
matches = list(re.finditer(pattern, content))
if 0 <= block_index < len(matches):
match = matches[block_index]
language = match.group(1) or "unknown"
start, end = match.span()
# 元のコードブロックを新しいものに置き換え
new_block = f"```{language}\n{new_code}\n```"
content = content[:start] + new_block + content[end:]
return content
def _extract_nested_code_blocks(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""辞書/JSONから入れ子になったコードブロックを抽出する"""
code_blocks = []
# 再帰的に辞書内のコードブロックを探索する関数
def extract_from_dict(data, current_path=[]):
if isinstance(data, dict):
# 'code', 'language' キーがあればコードブロックとして抽出
if "code" in data and isinstance(data["code"], str) and len(data["code"].strip()) > 0:
language = data.get("language", "unknown")
code_blocks.append({
"language": language,
"code": data["code"],
"location": current_path + ["code"]
})
# 再帰的に全てのキーを探索
for key, value in data.items():
extract_from_dict(value, current_path + [key])
elif isinstance(data, list):
for i, item in enumerate(data):
extract_from_dict(item, current_path + [i])
# 抽出を開始
extract_from_dict(data)
return code_blocks
class Validator(ABC):
"""検証クラスのインターフェース"""
@abstractmethod
async def validate(self, content, context=None):
"""検証を実行する抽象メソッド"""
pass
@abstractmethod
def get_result_summary(self):
"""検証結果の要約を返す抽象メソッド"""
pass |