Spaces:
Sleeping
Sleeping
| from langgraph.graph import StateGraph, START, END | |
| from langgraph.graph.message import MessagesState | |
| from langchain_core.messages import AIMessage, HumanMessage, SystemMessage | |
| from services.llm_service import LLMService | |
| from services.file_service import save_response, create_file_element | |
| from models.chat_state import AttackState, get_initial_state | |
| import chainlit as cl | |
| import json | |
| import logging | |
| import os | |
| from urllib.parse import quote | |
| # Initialize services | |
| llm_service = LLMService() | |
| logger = logging.getLogger(__name__) | |
| CHAINLIT_URL = os.environ.get("SPACE_HOST") | |
| if not CHAINLIT_URL: | |
| CHAINLIT_URL = "http://localhost:8080" | |
| if not CHAINLIT_URL.startswith("https://"): | |
| CHAINLIT_URL = "https://" + CHAINLIT_URL | |
| async def evaluate_context_node(state: AttackState) -> AttackState: | |
| """Node for evaluating if the user input is valid for ATT&CK context""" | |
| msg = cl.Message(content="") | |
| # Get the last user message | |
| user_messages = [msg for msg in state['messages'] if isinstance(msg, HumanMessage)] | |
| user_message = user_messages[-1].content if user_messages else "" | |
| try: | |
| evaluation_result = await llm_service.evaluate_context(user_message) | |
| state['is_valid_context'] = evaluation_result.is_valid | |
| state['extracted_user_scenario'] = evaluation_result.extracted_scenario | |
| state['extracted_user_layer_operation'] = evaluation_result.extracted_layer_operation | |
| if state['is_valid_context']: | |
| response_text = "入力はATT&CKフレームワークのコンテキストに合致します。シナリオの評価を続けます。" | |
| else: | |
| response_text = "申し訳ありませんが、この入力はサイバー攻撃の分析やATT&CKフレームワークのレイヤーに関する指示として認識できませんでした。適切な指示を入力してください。" | |
| await msg.stream_token(response_text) | |
| await msg.send() | |
| state['messages'].append(AIMessage(content=response_text)) | |
| except Exception as e: | |
| error_msg = f"コンテキスト評価中にエラーが発生しました: {str(e)}" | |
| await msg.stream_token(error_msg) | |
| await msg.send() | |
| state['messages'].append(AIMessage(content=error_msg)) | |
| state['is_valid_context'] = False | |
| return state | |
| async def update_scenario_node(state: AttackState) -> AttackState: | |
| """Node for updating the scenario based on user input""" | |
| msg = cl.Message(content="") | |
| # Get the last user message | |
| user_message = state.get('extracted_user_scenario', None) | |
| current_scenario = state.get('scenario', None) | |
| if not user_message and not current_scenario: | |
| raise ValueError("シナリオの更新に必要な情報がありません。") | |
| try: | |
| updated_scenario = await llm_service.generate_scenario(user_message, current_scenario) | |
| state['scenario'] = updated_scenario | |
| message = "新しいシナリオを作成しました。" if not current_scenario else "シナリオを更新しました。" | |
| await msg.stream_token(message) | |
| await msg.send() | |
| state['messages'].append(AIMessage(content=message)) | |
| except Exception as e: | |
| error_msg = f"シナリオの{'作成' if not current_scenario else '更新'}中にエラーが発生しました: {str(e)}" | |
| await msg.stream_token(error_msg) | |
| await msg.send() | |
| state['messages'].append(AIMessage(content=error_msg)) | |
| return state | |
| async def generate_json_node(state: AttackState) -> AttackState: | |
| """Node for generating or updating ATT&CK Navigator JSON""" | |
| user_message = state.get('extracted_user_layer_operation') | |
| current_scenario = state.get('scenario') | |
| existing_json = state.get('attack_json') | |
| try: | |
| json_content = await llm_service.generate_attack_json(user_message, current_scenario, existing_json) | |
| # Save JSON to file | |
| filename, filepath = save_response(json_content) | |
| file_element = create_file_element(filename, filepath) | |
| json_url = CHAINLIT_URL + "/" + filepath | |
| json_url = quote(json_url) | |
| # Prepare and send the response message | |
| response = "MITRE ATT&CK Navigatorレイヤーを更新しました。" if existing_json else "MITRE ATT&CK Navigatorレイヤーを生成しました。" | |
| response += " ファイルをダウンロードしてインポートできます。" | |
| response += f"ATT&CK Navigator : https://mitre-attack.github.io/attack-navigator//#layerURL={json_url}" | |
| msg = cl.Message(content=response, elements=[file_element]) | |
| await msg.send() | |
| # Update state | |
| state['messages'].append(AIMessage(content=response)) | |
| state['attack_json'] = json.loads(json_content) | |
| except Exception as e: | |
| error_msg = f"ATT&CK Navigatorレイヤーの生成中にエラーが発生しました: {str(e)}" | |
| msg = cl.Message(content=error_msg) | |
| await msg.send() | |
| state['messages'].append(AIMessage(content=error_msg)) | |
| return state | |
| async def display_state_node(state: AttackState) -> AttackState: | |
| """Node for displaying the current state before ending""" | |
| async with cl.Step(name="状態表示", type="display") as step: | |
| # コンテキストの評価結果 | |
| if state.get('is_valid_context') is not None: | |
| status = "有効" if state['is_valid_context'] else "無効" | |
| step.input = "コンテキスト評価" | |
| step.output = f"コンテキストの評価結果: {status}" | |
| # 現在のシナリオ | |
| if state.get('scenario'): | |
| async with cl.Step(name="シナリオ情報", type="display") as scenario_step: | |
| scenario_step.input = "現在のシナリオ" | |
| scenario_step.output = state['scenario'] | |
| # JSONの状態 | |
| if state.get('attack_json'): | |
| async with cl.Step(name="ATT&CK情報", type="display") as attack_step: | |
| techniques = state['attack_json'].get('techniques', []) | |
| technique_count = len(techniques) | |
| attack_step.input = "登録済みテクニック" | |
| if technique_count > 0: | |
| technique_list = "\n".join([ | |
| f"- {t.get('techniqueID', 'Unknown')}: {t.get('comment', '説明なし')}" | |
| for t in techniques[:5] | |
| ]) | |
| if technique_count > 5: | |
| technique_list += f"\n... 他 {technique_count - 5} 件" | |
| attack_step.output = f"登録済みテクニック数: {technique_count}\n\n{technique_list}" | |
| else: | |
| attack_step.output = "登録済みテクニックはありません" | |
| msg = cl.Message(content="処理が完了しました。") | |
| await msg.send() | |
| return state | |
| # Create the graph | |
| workflow = StateGraph(AttackState) | |
| # Add nodes | |
| workflow.add_node("evaluate_context", evaluate_context_node) | |
| workflow.add_node("update_scenario", update_scenario_node) | |
| workflow.add_node("generate_json", generate_json_node) | |
| workflow.add_node("display_state", display_state_node) | |
| # Add edges | |
| workflow.add_edge(START, "evaluate_context") | |
| workflow.add_conditional_edges( | |
| "evaluate_context", | |
| lambda state: state.get('is_valid_context', False), | |
| { | |
| True: "update_scenario", | |
| False: "display_state" | |
| } | |
| ) | |
| workflow.add_edge("update_scenario", "generate_json") | |
| workflow.add_edge("generate_json", "display_state") | |
| workflow.add_edge("display_state", END) | |
| # Compile the graph | |
| chainlit_app = workflow.compile() | |
| async def test_evaluate_context_node(state: AttackState) -> AttackState: | |
| """テスト用のコンテキスト評価ノード""" | |
| user_messages = [msg for msg in state['messages'] if isinstance(msg, HumanMessage)] | |
| user_message = user_messages[-1].content if user_messages else "" | |
| try: | |
| evaluation_result = await llm_service.evaluate_context(user_message) | |
| state['is_valid_context'] = evaluation_result.is_valid | |
| state['extracted_user_scenario'] = evaluation_result.extracted_scenario | |
| state['extracted_user_layer_operation'] = evaluation_result.extracted_layer_operation | |
| response_text = "入力はATT&CKフレームワークのコンテキストに合致します。シナリオの評価を続けます。" if state['is_valid_context'] else "申し訳ありませんが、この入力はサイバー攻撃の分析やATT&CKフレームワークのレイヤーに関する指示として認識できませんでした。適切な指示を入力してください。" | |
| state['messages'].append(AIMessage(content=response_text)) | |
| except Exception as e: | |
| error_msg = f"コンテキスト評価中にエラーが発生しました: {str(e)}" | |
| state['messages'].append(AIMessage(content=error_msg)) | |
| state['is_valid_context'] = False | |
| return state | |
| async def test_update_scenario_node(state: AttackState) -> AttackState: | |
| """テスト用のシナリオ更新ノード""" | |
| # Get the last user message | |
| user_message = state.get('extracted_user_scenario') | |
| current_scenario = state.get('scenario') | |
| try: | |
| updated_scenario = await llm_service.generate_scenario(user_message, current_scenario) | |
| state['scenario'] = updated_scenario | |
| message = "新しいシナリオを作成しました。" if not current_scenario else "シナリオを更新しました。" | |
| state['messages'].append(AIMessage(content=message)) | |
| except Exception as e: | |
| error_msg = f"シナリオの{'作成' if not current_scenario else '更新'}中にエラーが発生しました: {str(e)}" | |
| state['messages'].append(AIMessage(content=error_msg)) | |
| return state | |
| async def test_generate_json_node(state: AttackState) -> AttackState: | |
| """テスト用のJSON生成ノード""" | |
| user_message = state.get('extracted_user_layer_operation') | |
| current_scenario = state.get('scenario') | |
| existing_json = state.get('attack_json') | |
| try: | |
| json_content = await llm_service.generate_attack_json(user_message, current_scenario, existing_json) | |
| response = "MITRE ATT&CK Navigatorレイヤーを更新しました。" if existing_json else "MITRE ATT&CK Navigatorレイヤーを生成しました。" | |
| response += " ファイルをダウンロードしてインポートできます。" | |
| state['messages'].append(AIMessage(content=response)) | |
| state['attack_json'] = json.loads(json_content) | |
| except Exception as e: | |
| error_msg = f"ATT&CK Navigatorレイヤーの生成中にエラーが発生しました: {str(e)}" | |
| state['messages'].append(AIMessage(content=error_msg)) | |
| return state | |
| async def test_display_state_node(state: AttackState) -> AttackState: | |
| """テスト用の状態表示ノード""" | |
| summary = [] | |
| if state.get('is_valid_context') is not None: | |
| status = "有効" if state['is_valid_context'] else "無効" | |
| summary.append(f"コンテキストの評価結果: {status}") | |
| if state.get('scenario'): | |
| summary.append(f"現在のシナリオ:\n{state['scenario']}") | |
| if state.get('attack_json'): | |
| techniques = state['attack_json'].get('techniques', []) | |
| technique_count = len(techniques) | |
| summary.append(f"登録済みテクニック数: {technique_count}") | |
| if technique_count > 0: | |
| technique_list = "\n".join([ | |
| f"- {t.get('techniqueID', 'Unknown')}: {t.get('comment', '説明なし')}" | |
| for t in techniques[:5] | |
| ]) | |
| if technique_count > 5: | |
| technique_list += f"\n... 他 {technique_count - 5} 件" | |
| summary.append(f"\n登録済みテクニック:\n{technique_list}") | |
| if summary: | |
| state_summary = "\n\n".join(summary) | |
| state['messages'].append(AIMessage(content=f"現在の状態:\n{state_summary}")) | |
| return state | |
| async def main(): | |
| """テスト用のメイン関数""" | |
| try: | |
| # 初期状態の作成 | |
| initial_state = get_initial_state() | |
| # テスト用の既存シナリオ | |
| existing_scenario = """ | |
| 標的システムへの不正アクセスシナリオ | |
| 概要: | |
| 攻撃者は、標的のシステムに不正アクセスを試み、機密情報を窃取します。 | |
| 攻撃フェーズ: | |
| 1. 初期アクセス | |
| - パスワードスプレー攻撃による認証情報の取得 | |
| - 有効なアカウントの特定 | |
| 2. 実行 | |
| - 取得した認証情報を使用してシステムにログイン | |
| - 不正なコマンドの実行 | |
| 3. 権限昇格 | |
| - 管理者権限の取得 | |
| - システム設定の変更 | |
| 4. 防御回避 | |
| - ログの削除 | |
| - 攻撃痕跡の隠蔽 | |
| """ | |
| # テスト用の既存JSON | |
| existing_json = { | |
| "name": "Test Layer", | |
| "versions": { | |
| "attack": "16.0", | |
| "navigator": "4.9.0", | |
| "layer": "4.5" | |
| }, | |
| "domain": "enterprise-attack", | |
| "description": "Test layer for development", | |
| "filters": { | |
| "platforms": ["Windows", "Linux", "macOS"] | |
| }, | |
| "gradient": { | |
| "colors": ["#ffffff", "#ff6666"], | |
| "minValue": 0, | |
| "maxValue": 100 | |
| }, | |
| "techniques": [ | |
| { | |
| "techniqueID": "T1110", | |
| "score": 50, | |
| "color": "#ff6666", | |
| "comment": "パスワードスプレー攻撃による認証情報の取得", | |
| "enabled": True | |
| }, | |
| { | |
| "techniqueID": "T1078", | |
| "score": 50, | |
| "color": "#ff6666", | |
| "comment": "有効なアカウントを使用した不正アクセス", | |
| "enabled": True | |
| } | |
| ] | |
| } | |
| # 初期状態に既存データを設定 | |
| initial_state['scenario'] = existing_scenario | |
| initial_state['attack_json'] = existing_json | |
| # テスト用のユーザーメッセージ | |
| test_message = """ | |
| 以下の攻撃シナリオを分析してください: | |
| 攻撃者は、標的のシステムに不正アクセスを試みます。 | |
| まず、パスワードスプレー攻撃を実行して、有効なアカウントの認証情報を取得します。 | |
| 取得した認証情報を使用して、システムにログインし、機密情報を窃取します。 | |
| 最後に、攻撃の痕跡を隠蔽するために、ログを削除します。 | |
| """ | |
| test_message = "ATTACKのバージョンを16にして、テクニックは青にして。" | |
| # ユーザーメッセージを状態に追加 | |
| initial_state['messages'].append(HumanMessage(content=test_message)) | |
| # テスト用ワークフローの実行 | |
| state = await test_evaluate_context_node(initial_state) | |
| if state.get('is_valid_context', False): | |
| # シナリオ更新 | |
| state = await test_update_scenario_node(state) | |
| # JSON生成 | |
| state = await test_generate_json_node(state) | |
| # 状態表示 | |
| state = await test_display_state_node(state) | |
| # 結果の表示 | |
| for msg in state['messages']: | |
| role = "User" if isinstance(msg, HumanMessage) else "Assistant" | |
| print(f"\n{role}:") | |
| print(msg.content) | |
| except Exception as e: | |
| print(f"エラーが発生しました: {str(e)}") | |
| raise | |
| if __name__ == "__main__": | |
| import asyncio | |
| asyncio.run(main()) |