groxy / validators /code_verifier.py
Yasu777's picture
Create code_verifier.py
2709f1c verified
import json
import asyncio
import copy
import re
import ast
import tempfile
import subprocess
import os
from typing import Dict, Any, List, Optional, Union, Tuple, Set
from validators.base import BaseValidator, Validator
class CodeVerifier(BaseValidator, Validator):
"""コード検証を行うクラス"""
def __init__(self, client):
"""検証クライアントを初期化"""
super().__init__(client)
# 検証用のモデル
self.verify_model = "qwen-2.5-coder-32b" # コード検証に特化したモデル
self.verification_results = {}
async def validate(self, content, context=None):
"""コード検証を実行する(インターフェース実装)"""
result = await self.verify_code_blocks(content, context)
self.verification_results = result["verification_result"]
return result["verified_content"]
def get_result_summary(self):
"""検証結果の要約を返す(インターフェース実装)"""
if not self.verification_results:
return "検証はまだ実行されていません。"
return self.verification_results.get("verification_summary", "要約は利用できません。")
def _extract_problem_description(self, user_input: str) -> Optional[str]:
"""ユーザー入力から問題記述を抽出する"""
# 問題文を示す一般的なパターン
problem_indicators = [
"問題:", "問題文:", "**問題:**", "# 問題", "課題:", "要件:",
"Problem:", "Task:", "Question:", "Exercise:",
"以下の問題", "次の問題", "この問題"
]
# 制約条件を示す一般的なパターン
constraint_indicators = [
"制約:", "条件:", "ただし", "ここで", "なお",
"Constraints:", "Note:", "Where:", "Given:",
"評価ポイント", "注意点", "考慮点", "エッジケース"
]
# 例を示す一般的なパターン
example_indicators = [
"例:", "入力例:", "出力例:", "サンプル:", "テストケース:",
"Example:", "Input:", "Output:", "Sample:", "Test case:"
]
problem_desc = None
# 問題文のパターンマッチング
for indicator in problem_indicators:
if indicator in user_input:
# 問題文の開始位置を見つける
start_pos = user_input.find(indicator) + len(indicator)
# 次のセクション(例や制約)の開始位置を見つける
end_pos = len(user_input)
for end_indicator in example_indicators + constraint_indicators:
pos = user_input.find(end_indicator, start_pos)
if pos != -1 and pos < end_pos:
end_pos = pos
# 問題文を抽出
problem_desc = user_input[start_pos:end_pos].strip()
break
# 問題文が見つからない場合、ユーザー入力全体を使用
if not problem_desc:
return user_input
# 制約条件の抽出と追加
constraints = []
for indicator in constraint_indicators:
if indicator in user_input:
# 制約の開始位置を見つける
start_pos = user_input.find(indicator) + len(indicator)
# 次のセクションの開始位置を見つける
end_pos = len(user_input)
for end_indicator in problem_indicators + example_indicators:
pos = user_input.find(end_indicator, start_pos)
if pos != -1 and pos < end_pos:
end_pos = pos
# 制約を抽出
constraint = user_input[start_pos:end_pos].strip()
if constraint:
constraints.append(constraint)
# 例の抽出と追加
examples = []
for indicator in example_indicators:
if indicator in user_input:
# 例の開始位置を見つける
start_pos = user_input.find(indicator) + len(indicator)
# 次のセクションの開始位置を見つける
end_pos = len(user_input)
for end_indicator in problem_indicators + constraint_indicators:
pos = user_input.find(end_indicator, start_pos)
if pos != -1 and pos < end_pos:
end_pos = pos
# 例を抽出
example = user_input[start_pos:end_pos].strip()
if example:
examples.append(example)
# 問題記述を構築
full_description = problem_desc
if constraints:
full_description += "\n\n制約条件:\n" + "\n".join(constraints)
if examples:
full_description += "\n\n例:\n" + "\n".join(examples)
return full_description
def _extract_problem_constraints(self, problem_description: str) -> List[str]:
"""問題記述から特定の制約を抽出する"""
constraints = []
# 一般的な制約パターン
constraint_patterns = [
(r'i\s*!=\s*j', "インデックスが異なる必要がある"),
(r'i\s*≠\s*j', "インデックスが異なる必要がある"),
(r'i\s*<>\s*j', "インデックスが異なる必要がある"),
(r'0\s*<=\s*\w+\s*<\s*n', "インデックスは配列の範囲内"),
(r'O\(n\)', "線形時間複雑度が要求される"),
(r'O\(n\s*log\s*n\)', "n log n の時間複雑度が要求される"),
(r'O\(1\)', "定数時間/空間が要求される"),
(r'空間複雑度.*O\(\w+\)', "特定の空間複雑度制約がある"),
(r'時間複雑度.*O\(\w+\)', "特定の時間複雑度制約がある"),
(r'メモリ使用量.*\d+[MG]B', "メモリ使用量制約がある"),
(r'実行時間.*\d+m?s', "実行時間制約がある"),
(r'入力.*\d+\s*<=\s*\w+\s*<=\s*\d+', "入力値の範囲制約がある")
]
# 特定のアルゴリズム/データ構造に関する制約
algorithm_patterns = [
(r'(?:使用|利用)(?:して|する)?.*(?:トライ|Trie)', "トライ木の使用が推奨/要求される"),
(r'(?:使用|利用)(?:して|する)?.*(?:動的計画法|DP)', "動的計画法の使用が推奨/要求される"),
(r'(?:使用|利用)(?:して|する)?.*(?:二分探索|バイナリサーチ)', "二分探索の使用が推奨/要求される"),
(r'(?:使用|利用)(?:して|する)?.*(?:BFS|幅優先探索)', "幅優先探索の使用が推奨/要求される"),
(r'(?:使用|利用)(?:して|する)?.*(?:DFS|深さ優先探索)', "深さ優先探索の使用が推奨/要求される"),
(r'(?:使用|利用)(?:して|する)?.*(?:ヒープ|優先度付きキュー)', "ヒープ/優先度付きキューの使用が推奨/要求される")
]
# 特定のトピックに関する制約(ビット操作、数学など)
topic_patterns = [
(r'ビット.*操作', "ビット操作の知識が必要"),
(r'XOR|ビット排他的論理和', "XOR操作の理解が必要"),
(r'mod(?:ulo)?.*10\^9\s*\+\s*7', "大きな数値に対するモジュロ演算が必要")
]
# 物理シミュレーション関連の制約パターン
physics_patterns = [
(r'(?:物理|力学).*シミュレーション', "物理シミュレーションの実装が必要"),
(r'(?:流体|流れ).*力学', "流体力学の知識と実装が必要"),
(r'(?:衝突|干渉).*検出', "衝突検出アルゴリズムが必要"),
(r'(?:剛体|弾性体).*運動', "剛体/弾性体運動の実装が必要"),
(r'(?:ニュートン|運動).*法則', "運動法則の正確な実装が必要"),
(r'(?:保存|エネルギー).*法則', "エネルギー保存法則の考慮が必要"),
(r'数値安定性', "数値計算の安定性への考慮が必要")
]
# 全てのパターンをチェック
all_patterns = constraint_patterns + algorithm_patterns + topic_patterns + physics_patterns
for pattern, description in all_patterns:
if re.search(pattern, problem_description, re.IGNORECASE):
constraints.append(description)
return constraints
def _build_verification_prompt(self, language: str, code: str, user_input: str = None, design_doc: str = None) -> str:
"""コード検証用のプロンプトを構築する(問題文も含める、OOP原則、ベクトル効率、無限ループ検出を強化)"""
# 問題記述の抽出
problem_description = None
problem_constraints = []
if user_input:
problem_description = self._extract_problem_description(user_input)
if problem_description:
problem_constraints = self._extract_problem_constraints(problem_description)
# 制約条件の文字列化
constraints_str = ""
if problem_constraints:
constraints_str = "特に注意すべき制約条件:\n" + "\n".join([f"- {c}" for c in problem_constraints])
# OOP設計原則項目の追加
oop_principles = """
オブジェクト指向設計原則:
- 単一責任の原則 (SRP): 各クラスは明確で単一の責任を持つべき
- 開放/閉鎖原則 (OCP): クラスは拡張に対しては開かれ、修正に対しては閉じられるべき
- リスコフの置換原則 (LSP): サブクラスはスーパークラスの代わりに使えるべき
- インターフェース分離の原則 (ISP): 具体的なクライアント向けに最小限のインターフェースを設計
- 依存関係逆転の原則 (DIP): 高レベルモジュールは低レベルモジュールに依存すべきでない
- 継承よりコンポジションを優先する
- 過剰な継承階層を避ける(3-4階層以内に抑える)
- 適切なカプセル化とアクセス修飾子の使用
"""
# ベクトル計算効率項目の追加
vector_efficiency = """
ベクトル計算の効率:
- 大規模ベクトル操作はループの代わりにベクトル化された操作を使用
- 不必要なコピーを最小限に抑え、参照渡しや変更可能なベクトル操作を適切に使用
- キャッシュ効率を考慮した連続したメモリアクセスパターン
- 適切なBLAS/LAPACKなどの最適化ライブラリの使用(該当する場合)
- ゼロコピー操作とスライス操作の適切な使用
- 次元削減とブロードキャスト操作の効率的な利用
- 並列化可能なベクトル操作の特定と実装
"""
# 無限ループ検出の強化
infinite_loop_detection = """
無限ループの詳細な検出項目:
1. 終了条件の欠如または到達不能な終了条件
2. ループカウンタの更新漏れまたは不適切な更新
3. 無効または変更されないループ条件変数
4. 再帰呼び出しの無限連鎖の可能性
5. イテレータ/インデックスの増減が条件に影響しない場合
6. 浮動小数点比較による終了条件の見落とし
7. コレクションを反復処理中にその構造を変更するケース
8. 複数の終了条件の論理的矛盾
9. 外部依存による終了条件(ファイル終端、ユーザー入力など)の信頼性
10. 入力値に依存した無限ループの可能性(入力範囲の検証)
"""
# 物理シミュレーション固有の検証項目
physics_simulation_validation = """
物理シミュレーションの検証項目:
1. 物理法則の正確な適用(エネルギー保存、運動量保存など)
2. 単位の一貫性(SI単位系など)
3. 数値積分法の安定性(オイラー法、ルンゲクッタ法など)
4. 境界条件の適切な処理
5. 外力と拘束条件の正確な実装
6. エッジケースでの安定性(極端な初期条件、衝突など)
7. 計算効率と精度のバランス
8. 確率的要素の適切な実装(乱数生成など)
"""
# エラーハンドリングの検証項目
error_handling_validation = """
エラーハンドリングの検証項目:
1. 例外の種類ごとの適切な捕捉と処理
2. エラーメッセージの具体性と有用性
3. リソースの適切な解放(ファイルハンドル、ネットワーク接続など)
4. エラー発生時のシステム状態の一貫性維持
5. 致命的でないエラーからの回復メカニズム
6. エラーログの詳細度と使いやすさ
7. ユーザーに表示するエラーメッセージの適切性
8. セキュリティに関連するエラー情報の適切な隠蔽
"""
# コードの完全性検証項目
code_completeness_validation = """
コードの完全性検証項目:
1. 未実装メソッド(pass文や空の実装)の検出
2. インポートの漏れや未使用インポートの検出
3. 宣言されているが未使用の変数やメソッドの識別
4. 不完全なエラーハンドリング(try-exceptブロックなど)
5. 未完成のコメント(TODO、FIXME、XXXなど)の検出
6. テストコードの網羅性(全機能をカバーしているか)
7. ドキュメンテーションの完全性(すべての公開APIにドキュメントがあるか)
8. 設定ファイルや環境変数の完全性
"""
# 設計書の情報を追加
design_spec_str = ""
if design_doc:
design_spec_str = f"""
このコードは以下の設計書に基づいています。コードが設計書の要件と一致しているか確認してください:
{design_doc}
"""
prompt = {
"role": "Code_Verifier_And_Algorithm_Expert",
"instruction": f"""あなたは{language}の専門家で、アルゴリズムとデータ構造に精通しています。
提供されたコードを厳密に検証し、問題固有の制約条件を考慮して評価してください。
以下の観点でコードを検証し、JSON形式で結果を返してください:
1. 構文エラーやコンパイルエラー
2. 論理的な誤りやアルゴリズムの欠陥
3. 問題の制約条件との整合性
4. エッジケース(空の入力、最小/最大値、重複など)の処理
5. 時間/空間複雑度の要件への適合
6. データ構造の実装不備(例: トライ木の不完全な接続、グラフの誤った表現)
7. 指定されたアルゴリズムの正しい実装(該当する場合)
8. メモリリークやリソース管理の問題
特に重点的に検証すべき項目:
1. 無限ループの可能性
{infinite_loop_detection}
2. オブジェクト指向設計の原則(OOPコードの場合)
{oop_principles}
3. ベクトル計算の効率性(ベクトル/行列操作を含む場合)
{vector_efficiency}
4. 物理シミュレーションの正確性(物理シミュレーションの場合)
{physics_simulation_validation}
5. エラーハンドリングの完全性
{error_handling_validation}
6. コードの完全性と実行可能性
{code_completeness_validation}
{design_spec_str}
問題の説明:
{problem_description or "具体的な問題説明は提供されていません。一般的なコード品質を検証します。"}
{constraints_str}
応答は以下のJSON形式で返してください:
{{
"is_valid": boolean, // コードが有効かどうか
"issues": [string], // 検出された問題点のリスト(重要度順)
"design_compliance": boolean, // 設計書の要件に準拠しているか(設計書がある場合)
"missing_requirements": [string], // 設計書で定義されているが実装されていない要件(設計書がある場合)
"algorithm_analysis": string, // アルゴリズム分析(時間/空間複雑度など)
"edge_cases": [string], // テストすべきエッジケースのリスト
"oop_analysis": string, // OOP原則の遵守状況分析(該当する場合のみ)
"vector_efficiency_analysis": string, // ベクトル計算効率の分析(該当する場合のみ)
"physics_simulation_analysis": string, // 物理シミュレーションの分析(該当する場合のみ)
"error_handling_analysis": string, // エラーハンドリングの分析
"code_completeness_analysis": string, // コードの完全性分析
"infinite_loop_risk": {{
"risk_level": string, // "none", "low", "medium", "high"のいずれか
"potential_causes": [string], // 潜在的な無限ループの原因(リスクがある場合)
"recommended_safeguards": [string] // 推奨される安全対策(リスクがある場合)
}},
"fixed_code": string, // 修正されたコード(問題がある場合のみ)
"explanation": string // 修正の詳細な説明(問題がある場合のみ)
}}
""",
"code_to_verify": code,
"language": language
}
return json.dumps(prompt, ensure_ascii=False)
def _extract_design_doc(self, content: str) -> Optional[str]:
"""コンテンツから設計書部分を抽出する"""
# 設計書の開始と終了を示す一般的なパターン
design_doc_indicators = [
"設計書", "要件の整理と分析", "アーキテクチャ設計", "詳細設計",
"クラス/モジュール構造", "コンポーネント構成", "データモデル"
]
# コンテンツに設計書のパターンがあるか確認
for indicator in design_doc_indicators:
if indicator in content:
# 設計書全体を抽出
start_index = content.find(indicator)
design_doc = content[start_index:]
return design_doc
return None
def _extract_class_definitions_from_design(self, design_doc: str) -> Dict[str, Dict[str, Any]]:
"""設計書からクラス定義と要件を抽出する"""
class_definitions = {}
# クラス定義の抽出
class_pattern = r'class\s+(\w+):'
class_matches = re.finditer(class_pattern, design_doc)
for match in class_matches:
class_name = match.group(1)
# クラスの説明を抽出
start_pos = match.end()
next_class_match = re.search(r'class\s+\w+:', design_doc[start_pos:])
end_pos = start_pos + next_class_match.start() if next_class_match else len(design_doc)
class_body = design_doc[start_pos:end_pos].strip()
# メソッド定義の抽出
method_pattern = r'def\s+(\w+)\s*\((.*?)\)'
method_matches = re.finditer(method_pattern, class_body)
methods = {}
for method_match in method_matches:
method_name = method_match.group(1)
parameters = [p.strip() for p in method_match.group(2).split(',') if p.strip()]
methods[method_name] = parameters
class_definitions[class_name] = {
"methods": methods,
"description": class_body
}
return class_definitions
def _extract_requirements_from_design(self, design_doc: str) -> List[str]:
"""設計書から機能要件を抽出する"""
requirements = []
# 機能要件セクションの抽出
if "機能要件" in design_doc:
func_req_section = re.search(r'機能要件([\s\S]*?)(?=1\.2|非機能要件|$)', design_doc)
if func_req_section:
section_text = func_req_section.group(1)
# 箇条書きの抽出
bullet_pattern = r'[•*-]\s*(.*?)(?=$|\n)'
bullet_points = re.findall(bullet_pattern, section_text)
if bullet_points:
requirements.extend(bullet_points)
else:
# 番号付きリストの抽出
numbered_pattern = r'\d+\.\s*(.*?)(?=$|\n)'
numbered_points = re.findall(numbered_pattern, section_text)
if numbered_points:
requirements.extend(numbered_points)
return requirements
def _check_implementation_against_design(self, code: str, design_doc: str) -> Dict[str, Any]:
"""実装コードが設計書の要件に合致しているか確認する"""
result = {
"design_compliance": True,
"missing_requirements": [],
"missing_classes": [],
"missing_methods": []
}
# クラス定義の抽出
class_definitions = self._extract_class_definitions_from_design(design_doc)
# 要件の抽出
requirements = self._extract_requirements_from_design(design_doc)
# 実装されたクラスの抽出
implemented_classes = re.findall(r'class\s+(\w+)', code)
# クラスの確認
for class_name, class_info in class_definitions.items():
if class_name not in implemented_classes:
result["design_compliance"] = False
result["missing_classes"].append(class_name)
continue
# メソッドの確認
class_code_match = re.search(fr'class\s+{class_name}[\s\S]*?(?:class|\Z)', code)
if class_code_match:
class_code = class_code_match.group(0)
for method_name in class_info["methods"]:
method_pattern = fr'def\s+{method_name}\s*\('
if not re.search(method_pattern, class_code):
result["design_compliance"] = False
result["missing_methods"].append(f"{class_name}.{method_name}")
# 要件の確認
for req in requirements:
# キーワードベースの簡易チェック
keywords = self._extract_keywords(req)
if keywords and not all(re.search(fr'\b{re.escape(kw.lower())}\b', code.lower()) for kw in keywords if len(kw) > 3):
result["design_compliance"] = False
result["missing_requirements"].append(req)
return result
def _extract_keywords(self, text: str) -> List[str]:
"""テキストから重要なキーワードを抽出する"""
# ストップワードを定義
stop_words = {
"a", "an", "the", "this", "that", "these", "those", "and", "or", "but", "for", "nor", "so", "yet",
"を", "に", "は", "が", "で", "と", "から", "まで", "より", "して", "する", "ます", "です"
}
# 単語に分割
words = re.findall(r'\b\w+\b', text.lower())
# ストップワードと短すぎる単語を除外
keywords = [w for w in words if w not in stop_words and len(w) > 1]
return keywords
async def _verify_single_code_block(self, index: int, code_block: Dict[str, Any], user_input: str = None, design_doc: str = None) -> Dict[str, Any]:
"""単一のコードブロックを検証する(ユーザー入力コンテキスト付き)"""
print(f"[Code Verification] Started verification of code block {index+1}")
language = code_block.get("language", "unknown")
code = code_block.get("code", "")
# 空のコードブロックや意味のない内容を検出
if not code.strip() or code.strip() in ["pass", "#", "# コード", "# Code"]:
return {
"block_info": {
"block_index": index,
"language": language,
"is_valid": False,
"issues": ["コードが提供されていないか、空です"],
"algorithm_analysis": "コードが不足しています",
"edge_cases": [],
"was_fixed": False
},
"needs_update": False,
"fixed_code": code,
"explanation": "コードが提供されていません。適切なコードを実装してください。"
}
# 検証プロンプトの構築(ユーザー入力と設計書を含める)
verification_prompt = self._build_verification_prompt(language, code, user_input, design_doc)
messages = [
{
"role": "system",
"content": verification_prompt
}
]
response = await self._make_async_api_call(
self.verify_model,
messages,
temperature=0.1,
response_format={"type": "json_object"}
)
# 検証結果の解析
verification_result = json.loads(response.choices[0].message.content)
# 結果をまとめる
needs_update = False
fixed_code = code
if not verification_result.get("is_valid", True):
print(f"[Code Verification] Issues found in code block {index+1}")
if "fixed_code" in verification_result and verification_result["fixed_code"].strip():
fixed_code = verification_result["fixed_code"]
needs_update = code != fixed_code
# 設計書の要件確認
design_compliance = verification_result.get("design_compliance", True)
missing_requirements = verification_result.get("missing_requirements", [])
return {
"block_info": {
"block_index": index,
"language": language,
"is_valid": verification_result.get("is_valid", True),
"issues": verification_result.get("issues", []),
"algorithm_analysis": verification_result.get("algorithm_analysis", ""),
"edge_cases": verification_result.get("edge_cases", []),
"was_fixed": needs_update,
"design_compliance": design_compliance,
"missing_requirements": missing_requirements,
"physics_simulation_analysis": verification_result.get("physics_simulation_analysis", ""),
"error_handling_analysis": verification_result.get("error_handling_analysis", ""),
"code_completeness_analysis": verification_result.get("code_completeness_analysis", "")
},
"needs_update": needs_update,
"fixed_code": fixed_code,
"explanation": verification_result.get("explanation", "")
}
async def verify_code_blocks(self, content: str, user_input: str = None) -> Dict[str, Any]:
"""コンテンツから抽出したコードブロックを並列検証する(設計書認識機能付き)"""
print("\n[Code Verification] Verifying code blocks in content...")
# 設計書の検出と抽出
design_doc = self._extract_design_doc(content)
if design_doc:
print("[Code Verification] Design document detected")
# コンテンツからコードブロックを抽出
code_blocks = self._extract_code_blocks(content)
if not code_blocks:
print("[Code Verification] No code blocks found to verify")
# 設計書があるが実装がない場合
if design_doc:
return {
"verified_content": content,
"verification_result": {
"verified_code_blocks": [],
"issues_found": True,
"verification_summary": "設計書は見つかりましたが、実装コードがありません。設計書に基づいた実装が必要です。",
"design_requirements": self._extract_requirements_from_design(design_doc)
}
}
return {
"verified_content": content,
"verification_result": {
"verified_code_blocks": [],
"issues_found": False,
"verification_summary": "コードブロックが見つかりませんでした。"
}
}
# 検証結果を格納する辞書
verification_results = {
"verified_code_blocks": [],
"issues_found": False,
"verification_summary": "",
"verification_time": 0,
"algorithm_issues": [],
"critical_edge_cases": [],
"design_compliance": True,
"missing_requirements": [],
"physics_simulation_issues": [],
"error_handling_issues": [],
"code_completeness_issues": []
}
# 並列検証の開始時間を記録
start_time = asyncio.get_event_loop().time()
# 各コードブロックの検証タスクを作成
verification_tasks = []
for i, code_block in enumerate(code_blocks):
task = self._verify_single_code_block(i, code_block, user_input, design_doc)
verification_tasks.append(task)
# すべての検証タスクを並列実行
print(f"[Code Verification] Parallel verification of {len(verification_tasks)} code blocks started")
block_results = await asyncio.gather(*verification_tasks, return_exceptions=True)
# 更新されたコンテンツ
verified_content = content
# 設計書関連の問題を収集
design_issues = set()
# 検証結果の処理
for i, result in enumerate(block_results):
if isinstance(result, Exception):
print(f"[Error] Verification of block {i} failed: {str(result)}")
verification_results["verified_code_blocks"].append({
"block_index": i,
"language": code_blocks[i].get("language", "unknown"),
"verification_error": str(result)
})
continue
# 結果を格納
verification_results["verified_code_blocks"].append(result["block_info"])
# 設計書の遵守状況を確認
if not result["block_info"].get("design_compliance", True):
verification_results["design_compliance"] = False
missing_reqs = result["block_info"].get("missing_requirements", [])
for req in missing_reqs:
design_issues.add(req)
# アルゴリズム分析とエッジケースの収集
if result["block_info"].get("algorithm_analysis"):
verification_results["algorithm_issues"].append({
"block_index": i,
"analysis": result["block_info"]["algorithm_analysis"]
})
if result["block_info"].get("edge_cases"):
verification_results["critical_edge_cases"].extend(result["block_info"]["edge_cases"])
# 物理シミュレーション分析の収集
if result["block_info"].get("physics_simulation_analysis"):
verification_results["physics_simulation_issues"].append({
"block_index": i,
"analysis": result["block_info"]["physics_simulation_analysis"]
})
# エラーハンドリング分析の収集
if result["block_info"].get("error_handling_analysis"):
verification_results["error_handling_issues"].append({
"block_index": i,
"analysis": result["block_info"]["error_handling_analysis"]
})
# コード完全性分析の収集
if result["block_info"].get("code_completeness_analysis"):
verification_results["code_completeness_issues"].append({
"block_index": i,
"analysis": result["block_info"]["code_completeness_analysis"]
})
# 問題が見つかった場合はコンテンツを更新
if result["needs_update"]:
verification_results["issues_found"] = True
verified_content = self._update_code_in_content(
verified_content,
i,
result["fixed_code"]
)
# 修正の説明も保存
if "explanation" in result and result["explanation"]:
if "code_fixes_explanations" not in verification_results:
verification_results["code_fixes_explanations"] = []
verification_results["code_fixes_explanations"].append({
"block_index": i,
"explanation": result["explanation"]
})
# 検証時間を計算
end_time = asyncio.get_event_loop().time()
verification_time = end_time - start_time
verification_results["verification_time"] = round(verification_time, 2)
# 重複するエッジケースを削除
verification_results["critical_edge_cases"] = list(set(verification_results["critical_edge_cases"]))
# 設計書の要件を追加
if design_issues:
verification_results["missing_requirements"] = list(design_issues)
# 検証サマリーの生成
if verification_results["issues_found"]:
verification_results["verification_summary"] = f"コードの問題が検出され修正されました(検証時間: {verification_results['verification_time']}秒)。\n\n"
# 問題と修正内容のサマリーを対比形式で表示
for block in verification_results["verified_code_blocks"]:
if block.get("was_fixed", False):
issues_list = block.get("issues", [])
if issues_list:
verification_results["verification_summary"] += f"ブロック{block['block_index']+1}({block['language']}):\n"
for issue in issues_list[:3]:
verification_results["verification_summary"] += f"❌ 問題: {issue}\n"
verification_results["verification_summary"] += f"✅ 修正完了: コードが自動修正されました\n\n"
# 設計書の要件未実装がある場合
if not verification_results["design_compliance"] and verification_results["missing_requirements"]:
verification_results["verification_summary"] += "\n設計書の要件に対する問題:\n"
for req in verification_results["missing_requirements"][:5]:
verification_results["verification_summary"] += f"❌ 未実装: {req}\n"
if len(verification_results["missing_requirements"]) > 5:
verification_results["verification_summary"] += "...他にも未実装の要件があります\n"
# 物理シミュレーション関連の問題がある場合
if verification_results["physics_simulation_issues"]:
verification_results["verification_summary"] += "\n物理シミュレーションの問題:\n"
for issue in verification_results["physics_simulation_issues"][:3]:
verification_results["verification_summary"] += f"⚠️ {issue['analysis']}\n"
# エラーハンドリング関連の問題がある場合
if verification_results["error_handling_issues"]:
verification_results["verification_summary"] += "\nエラーハンドリングの問題:\n"
for issue in verification_results["error_handling_issues"][:3]:
verification_results["verification_summary"] += f"⚠️ {issue['analysis']}\n"
# コード完全性関連の問題がある場合
if verification_results["code_completeness_issues"]:
verification_results["verification_summary"] += "\nコードの完全性に関する問題:\n"
for issue in verification_results["code_completeness_issues"][:3]:
verification_results["verification_summary"] += f"⚠️ {issue['analysis']}\n"
else:
verification_results["verification_summary"] = f"✅ すべてのコードが検証を通過しました(検証時間: {verification_results['verification_time']}秒)。\n"
# エッジケースの提案がある場合
if verification_results["critical_edge_cases"]:
verification_results["verification_summary"] += f"\n\nテスト推奨エッジケース: {', '.join(verification_results['critical_edge_cases'][:5])}"
return {
"verified_content": verified_content,
"verification_result": verification_results
}
async def verify_implementation(self, technical_analysis, user_input: str = None) -> Dict[str, Any]:
"""実装データを検証する(JSON形式または純粋なコンテンツ)"""
if isinstance(technical_analysis, dict):
# 辞書/JSON形式のデータが渡された場合
code_blocks = self._extract_nested_code_blocks(technical_analysis)
if not code_blocks:
print("[Code Verification] No code blocks found in analysis")
return technical_analysis
# 辞書の場合の検証コードはこちらに実装する
# 現在はコードの抽出のみ実装
return technical_analysis
elif isinstance(technical_analysis, str):
# 文字列(マークダウンなど)の場合
result = await self.verify_code_blocks(technical_analysis, user_input)
return result["verified_content"]
else:
# その他の型の場合
print(f"[Error] Unsupported data type for verification: {type(technical_analysis)}")
return technical_analysis
async def verify_implementation_completeness(self, code: str) -> Dict[str, Any]:
"""コードの完全性を検証し、不足している部分を特定する"""
# 結果を格納する辞書
result = {
"is_complete": True,
"issues": [],
"unimplemented_methods": [],
"missing_imports": [],
"missing_error_handling": [],
"todos": []
}
# 空のコード、空のメソッド、パスのみのメソッドを検出
if not code.strip():
result["is_complete"] = False
result["issues"].append("コードが空です")
return result
try:
# ASTを使用してコードを解析
tree = ast.parse(code)
# 未実装メソッドの検出
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
# 関数本体の確認
function_body = node.body
# 空の関数や単なる'pass'のみの関数を検出
if not function_body:
result["is_complete"] = False
result["unimplemented_methods"].append(node.name)
result["issues"].append(f"未実装のメソッド: {node.name}")
elif len(function_body) == 1 and isinstance(function_body[0], ast.Pass):
result["is_complete"] = False
result["unimplemented_methods"].append(node.name)
result["issues"].append(f"未実装のメソッド (pass文のみ): {node.name}")
# ToDo コメントの検出
if isinstance(node, ast.Expr) and isinstance(node.value, ast.Str):
comment = node.value.s
if any(marker in comment for marker in ["TODO", "FIXME", "XXX", "todo", "fixme"]):
result["todos"].append(comment)
result["issues"].append(f"TODOコメント: {comment}")
# インポートの確認
imported_modules = set()
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for name in node.names:
imported_modules.add(name.name)
elif isinstance(node, ast.ImportFrom):
imported_modules.add(node.module)
# 使用されているが明示的にインポートされていないモジュールを検出
for node in ast.walk(tree):
if isinstance(node, ast.Name) and isinstance(node.ctx, ast.Load):
module_name = node.id
# 標準ライブラリや組み込み関数は除外
if (module_name not in imported_modules and
module_name not in dir(__builtins__) and
not any(module_name.startswith(imp) for imp in imported_modules)):
result["missing_imports"].append(module_name)
result["issues"].append(f"使用されているが明示的にインポートされていないモジュール: {module_name}")
# エラーハンドリングの確認
try_blocks = sum(1 for node in ast.walk(tree) if isinstance(node, ast.Try))
if try_blocks == 0:
result["missing_error_handling"].append("try-exceptブロックが見つかりません")
result["issues"].append("エラーハンドリングが不足しています (try-exceptブロックなし)")
except SyntaxError as e:
result["is_complete"] = False
result["issues"].append(f"構文エラー: {str(e)}")
# 実行可能性のテスト
try:
# 一時ファイルにコードを書き込む
with tempfile.NamedTemporaryFile(suffix='.py', delete=False) as temp_file:
temp_file_path = temp_file.name
temp_file.write(code.encode('utf-8'))
# Pythonのコンパイルチェック(構文チェック)を実行
compile_result = subprocess.run(
['python', '-m', 'py_compile', temp_file_path],
capture_output=True,
text=True
)
# エラーがあれば記録
if compile_result.returncode != 0:
result["is_complete"] = False
result["issues"].append(f"コンパイルエラー: {compile_result.stderr}")
except Exception as e:
result["issues"].append(f"コードの実行可能性テスト中にエラー: {str(e)}")
finally:
# 一時ファイルの削除
if 'temp_file_path' in locals():
try:
os.remove(temp_file_path)
except:
pass
# 完全性の最終判定
result["is_complete"] = result["is_complete"] and not result["issues"]
return result
async def apply_verification_feedback(self, original_prompt: str, verification_result: Dict[str, Any]) -> str:
"""検証結果を基に改善されたプロンプトを生成する"""
# 検証結果から重要な問題点を抽出
issues = []
if "verified_code_blocks" in verification_result:
for block in verification_result["verified_code_blocks"]:
if not block.get("is_valid", True):
issues.extend(block.get("issues", []))
# 設計要件との不一致
if not verification_result.get("design_compliance", True):
missing_reqs = verification_result.get("missing_requirements", [])
for req in missing_reqs:
issues.append(f"設計要件未実装: {req}")
# エッジケース対応の不足
edge_cases = verification_result.get("critical_edge_cases", [])
if edge_cases:
issues.append(f"対応すべきエッジケース: {', '.join(edge_cases[:5])}")
# 物理シミュレーション関連の問題
if "physics_simulation_issues" in verification_result and verification_result["physics_simulation_issues"]:
for issue in verification_result["physics_simulation_issues"][:3]:
issues.append(f"物理シミュレーションの問題: {issue.get('analysis', '')}")
# エラーハンドリング関連の問題
if "error_handling_issues" in verification_result and verification_result["error_handling_issues"]:
for issue in verification_result["error_handling_issues"][:3]:
issues.append(f"エラーハンドリングの問題: {issue.get('analysis', '')}")
# コード完全性関連の問題
if "code_completeness_issues" in verification_result and verification_result["code_completeness_issues"]:
for issue in verification_result["code_completeness_issues"][:3]:
issues.append(f"コード完全性の問題: {issue.get('analysis', '')}")
# 問題点が見つからない場合は元のプロンプトを返す
if not issues:
return original_prompt
# 問題点を含めた改善プロンプトを生成
improved_prompt = f"{original_prompt}\n\n前回の実装では以下の問題点がありました。これらを解決した実装を提供してください:\n"
for i, issue in enumerate(issues[:10]): # 最大10個の問題を表示
improved_prompt += f"{i+1}. {issue}\n"
if len(issues) > 10:
improved_prompt += f"他にも{len(issues)-10}個の問題があります。\n"
improved_prompt += "\n特に以下の点に注意して実装してください:\n"
improved_prompt += "1. 全てのコードが実行可能であること\n"
improved_prompt += "2. エッジケース(極端な入力値、境界条件)に対応すること\n"
improved_prompt += "3. エラーハンドリングを徹底すること\n"
improved_prompt += "4. 設計書の全ての要件を実装すること\n"
return improved_prompt