adventure / app.py
oggata's picture
Update app.py
783ff05 verified
import gradio as gr
import json
import time
import random
import threading
from datetime import datetime, timedelta
from huggingface_hub import InferenceClient
from typing import Dict, List, Tuple, Optional
import asyncio
from dataclasses import dataclass, asdict
import uuid
# LLMクライアントの初期化
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
@dataclass
class Position:
"""3D空間での位置を表すクラス"""
x: float
y: float
z: float
@dataclass
class Location:
"""町の中の場所を表すクラス"""
name: str
position: Position
capacity: int
current_occupants: List[str]
description: str
@dataclass
class Resident:
"""住民を表すクラス"""
id: str
name: str
profession: str
personality: str
age: int
current_location: str
position: Position
target_position: Optional[Position]
friendships: Dict[str, float] # 他の住民との友好度 (0-1)
current_activity: str
mood: str
energy: float # 0-1
last_action_time: float
class CitySimulation:
"""都市シミュレーションのメインクラス"""
def __init__(self):
self.residents = {}
self.locations = {}
self.simulation_time = datetime(2024, 1, 1, 8, 0) # 朝8時からスタート
self.time_speed = 1 # 1秒 = 1分のシミュレーション時間
self.running = False
self.last_update = time.time()
self.action_log = []
self._initialize_locations()
self._initialize_residents()
def _initialize_locations(self):
"""町の場所を初期化"""
locations_data = [
{"name": "公園", "pos": (0, 0, 0), "capacity": 10, "desc": "緑豊かな憩いの場所"},
{"name": "図書館", "pos": (20, 0, 0), "capacity": 8, "desc": "静かな学習空間"},
{"name": "カフェ", "pos": (-20, 0, 0), "capacity": 6, "desc": "温かい飲み物と会話の場"},
{"name": "商店", "pos": (0, 0, 20), "capacity": 5, "desc": "生活必需品を購入できる店"},
{"name": "住宅A", "pos": (15, 0, 15), "capacity": 2, "desc": "太郎の家"},
{"name": "住宅B", "pos": (-15, 0, 15), "capacity": 2, "desc": "花子の家"},
{"name": "住宅C", "pos": (15, 0, -15), "capacity": 2, "desc": "健太の家"},
{"name": "住宅D", "pos": (-15, 0, -15), "capacity": 2, "desc": "美咲の家"},
{"name": "住宅E", "pos": (0, 0, -20), "capacity": 2, "desc": "雄二の家"},
]
for loc_data in locations_data:
loc = Location(
name=loc_data["name"],
position=Position(*loc_data["pos"]),
capacity=loc_data["capacity"],
current_occupants=[],
description=loc_data["desc"]
)
self.locations[loc.name] = loc
def _initialize_residents(self):
"""住民を初期化"""
residents_data = [
{
"name": "太郎", "profession": "教師", "age": 35,
"personality": "真面目で責任感が強く、読書が好き",
"home": "住宅A", "mood": "普通"
},
{
"name": "花子", "profession": "看護師", "age": 28,
"personality": "優しく思いやりがあり、人助けが好き",
"home": "住宅B", "mood": "元気"
},
{
"name": "健太", "profession": "エンジニア", "age": 30,
"personality": "論理的で技術に詳しく、コーヒーが好き",
"home": "住宅C", "mood": "集中"
},
{
"name": "美咲", "profession": "デザイナー", "age": 26,
"personality": "創造的で芸術的センスがあり、自然が好き",
"home": "住宅D", "mood": "インスピレーション"
},
{
"name": "雄二", "profession": "商店主", "age": 45,
"personality": "社交的で商売上手、地域のことをよく知っている",
"home": "住宅E", "mood": "活動的"
}
]
# 住民を作成
for i, data in enumerate(residents_data):
home_pos = self.locations[data["home"]].position
resident = Resident(
id=str(uuid.uuid4()),
name=data["name"],
profession=data["profession"],
personality=data["personality"],
age=data["age"],
current_location=data["home"],
position=Position(home_pos.x, home_pos.y, home_pos.z),
target_position=None,
friendships={},
current_activity="睡眠中",
mood=data["mood"],
energy=0.8,
last_action_time=time.time()
)
self.residents[resident.id] = resident
# 友好度を初期化(ランダムな値)
resident_ids = list(self.residents.keys())
for i, resident_id in enumerate(resident_ids):
for j, other_id in enumerate(resident_ids):
if i != j:
# 初期友好度は0.3-0.7の範囲でランダム
friendship = random.uniform(0.3, 0.7)
self.residents[resident_id].friendships[other_id] = friendship
def decide_action(self, resident: Resident) -> str:
"""住民の次の行動を決定(簡単版)"""
current_time_str = self.simulation_time.strftime("%H:%M")
# 時間帯による基本的な行動パターン
hour = self.simulation_time.hour
# 基本的な行動ルール
if 6 <= hour < 8:
if resident.current_location.startswith("住宅"):
actions = ["公園へ移動", "散歩をする"]
else:
actions = ["家に帰る"]
elif 8 <= hour < 12:
if resident.profession == "教師":
actions = ["図書館へ移動", "読書をする"]
elif resident.profession == "看護師":
actions = ["カフェへ移動", "コーヒーを飲む"]
elif resident.profession == "エンジニア":
actions = ["カフェへ移動", "プログラミング"]
elif resident.profession == "デザイナー":
actions = ["公園へ移動", "スケッチをする"]
else: # 商店主
actions = ["商店へ移動", "店番をする"]
elif 12 <= hour < 14:
actions = ["カフェへ移動", "昼食をとる", "休憩する"]
elif 14 <= hour < 18:
actions = ["図書館へ移動", "公園へ移動", "商店へ移動", "読書をする", "散歩をする"]
elif 18 <= hour < 20:
actions = ["カフェへ移動", "夕食をとる", "友人と話す"]
else:
actions = ["家に帰る", "休憩する"]
# エネルギーレベルによる調整
if resident.energy < 0.3:
actions = ["家に帰る", "休憩する"]
action = random.choice(actions)
reason = f"{current_time_str}の行動として適切だと判断"
return f"行動: {action}\n理由: {reason}"
def parse_action(self, action_text: str) -> Tuple[str, str]:
"""行動テキストから行動と理由を解析"""
lines = action_text.split('\n')
action = ""
reason = ""
for line in lines:
if line.startswith("行動:"):
action = line.replace("行動:", "").strip()
elif line.startswith("理由:"):
reason = line.replace("理由:", "").strip()
return action, reason
def execute_action(self, resident: Resident, action: str, reason: str):
"""住民の行動を実行"""
# 移動の場合
if "移動" in action:
target_location = None
for loc_name in self.locations.keys():
if loc_name in action:
target_location = loc_name
break
if target_location and target_location != resident.current_location:
# 現在の場所から退去
if resident.id in self.locations[resident.current_location].current_occupants:
self.locations[resident.current_location].current_occupants.remove(resident.id)
# 新しい場所へ移動
target_pos = self.locations[target_location].position
resident.target_position = Position(target_pos.x, target_pos.y, target_pos.z)
resident.current_location = target_location
self.locations[target_location].current_occupants.append(resident.id)
# 活動を更新
if target_location == "カフェ":
resident.current_activity = "コーヒーを飲んでいる"
elif target_location == "図書館":
resident.current_activity = "読書中"
elif target_location == "公園":
resident.current_activity = "散歩中"
elif target_location == "商店":
resident.current_activity = "買い物中"
elif target_location.startswith("住宅"):
resident.current_activity = "休憩中"
# その場での活動の場合
else:
resident.current_activity = action
# エネルギーと気分を更新
if "休憩" in action or "睡眠" in action:
resident.energy = min(1.0, resident.energy + 0.2)
else:
resident.energy = max(0.1, resident.energy - 0.1)
# ログに記録
log_entry = {
"time": self.simulation_time.strftime("%H:%M"),
"resident": resident.name,
"action": action,
"reason": reason,
"location": resident.current_location
}
self.action_log.append(log_entry)
# ログは最新100件のみ保持
if len(self.action_log) > 100:
self.action_log = self.action_log[-100:]
def update_positions(self):
"""住民の位置を更新(移動アニメーション用)"""
for resident in self.residents.values():
if resident.target_position:
# 現在位置から目標位置への移動
current = resident.position
target = resident.target_position
# 移動速度
speed = 2.0
# 距離を計算
dx = target.x - current.x
dy = target.y - current.y
dz = target.z - current.z
distance = (dx**2 + dy**2 + dz**2)**0.5
if distance < speed:
# 目標位置に到達
resident.position = target
resident.target_position = None
else:
# 目標に向かって移動
factor = speed / distance
resident.position.x += dx * factor
resident.position.y += dy * factor
resident.position.z += dz * factor
def simulation_step(self):
"""シミュレーションの1ステップを実行"""
current_time = time.time()
# 時間を進める
time_delta = (current_time - self.last_update) * self.time_speed
self.simulation_time += timedelta(minutes=time_delta)
self.last_update = current_time
# 位置を更新
self.update_positions()
# 各住民の行動を決定・実行(5分ごと)
for resident in self.residents.values():
if current_time - resident.last_action_time > 5: # 5秒ごとに行動判定
try:
action_text = self.decide_action(resident)
action, reason = self.parse_action(action_text)
self.execute_action(resident, action, reason)
resident.last_action_time = current_time
except Exception as e:
print(f"Action decision error for {resident.name}: {e}")
def get_state(self) -> Dict:
"""現在のシミュレーション状態を取得"""
return {
"time": self.simulation_time.strftime("%Y-%m-%d %H:%M"),
"residents": [
{
"id": r.id,
"name": r.name,
"profession": r.profession,
"position": {"x": r.position.x, "y": r.position.y, "z": r.position.z},
"location": r.current_location,
"activity": r.current_activity,
"mood": r.mood,
"energy": r.energy
}
for r in self.residents.values()
],
"locations": [
{
"name": loc.name,
"position": {"x": loc.position.x, "y": loc.position.y, "z": loc.position.z},
"occupants": len(loc.current_occupants),
"capacity": loc.capacity,
"description": loc.description
}
for loc in self.locations.values()
],
"recent_actions": self.action_log[-10:] # 最新10件のアクション
}
# グローバルシミュレーションインスタンス
simulation = CitySimulation()
simulation_thread = None
def run_simulation_thread():
"""シミュレーションを別スレッドで実行"""
while simulation.running:
try:
simulation.simulation_step()
time.sleep(1) # 1秒間隔で更新
except Exception as e:
print(f"Simulation error: {e}")
time.sleep(1)
def start_simulation():
"""シミュレーションを開始"""
global simulation_thread
if not simulation.running:
simulation.running = True
simulation_thread = threading.Thread(target=run_simulation_thread, daemon=True)
simulation_thread.start()
return "シミュレーションを開始しました", f"{simulation.time_speed}x"
return "シミュレーションは既に実行中です", f"{simulation.time_speed}x"
def stop_simulation():
"""シミュレーションを停止"""
if simulation.running:
simulation.running = False
return "シミュレーションを停止しました", f"{simulation.time_speed}x"
return "シミュレーションは停止中です", f"{simulation.time_speed}x"
def set_time_speed(speed: float):
"""シミュレーション時間の速度を設定"""
simulation.time_speed = max(0.1, min(speed, 8.0))
return f"時間速度を {simulation.time_speed}x に設定しました", f"{simulation.time_speed}x"
def get_simulation_state_json():
"""現在のシミュレーション状態をJSON文字列として返す"""
return json.dumps(simulation.get_state(), ensure_ascii=False, indent=2)
def get_friendship_matrix():
"""住民間の友好度マトリックスを取得"""
residents_list = list(simulation.residents.values())
matrix = []
header = ["住民"] + [r.name for r in residents_list]
matrix.append(header)
for resident in residents_list:
row = [resident.name]
for other_resident in residents_list:
if resident.id == other_resident.id:
row.append("自分")
else:
friendship = resident.friendships.get(other_resident.id, 0.5)
row.append(f"{friendship:.2f}")
matrix.append(row)
return matrix
def get_detailed_info():
"""詳細な住民情報を取得"""
info = []
for resident in simulation.residents.values():
friends = []
for other_id, friendship in resident.friendships.items():
other_name = next((r.name for r in simulation.residents.values() if r.id == other_id), "不明")
if friendship > 0.7:
friends.append(f"{other_name}(親友)")
elif friendship > 0.5:
friends.append(f"{other_name}(友人)")
info.append([
resident.name,
resident.profession,
f"{resident.age}歳",
resident.personality,
resident.current_location,
resident.current_activity,
resident.mood,
f"{resident.energy:.1f}/1.0",
", ".join(friends) if friends else "なし"
])
return info
def get_action_log():
"""アクションログを取得"""
log_data = []
for action in simulation.action_log[-20:]: # 最新20件
log_data.append([
action.get('time', ''),
action.get('resident', ''),
action.get('action', ''),
action.get('reason', ''),
action.get('location', '')
])
return log_data
# Gradioインターフェースの構築
def create_interface():
"""Gradioインターフェースを作成"""
with gr.Blocks(title="マルチエージェント都市シミュレーション", theme=gr.themes.Soft()) as demo:
gr.Markdown("# 🏙️ マルチエージェント都市シミュレーション")
gr.Markdown("5人の住民が暮らす町のリアルタイムシミュレーション。各住民は自律的に行動を決定します。")
with gr.Tabs():
# メインビューTab
with gr.TabItem("🎮 シミュレーション"):
with gr.Row():
with gr.Column(scale=2):
gr.Markdown("### シミュレーション状態")
state_display = gr.JSON(
label="リアルタイム状態",
value=simulation.get_state()
)
auto_update_btn = gr.Button("🔄 状態更新", variant="secondary")
with gr.Column(scale=1):
gr.Markdown("### コントロール")
with gr.Row():
start_btn = gr.Button("▶️ 開始", variant="primary")
stop_btn = gr.Button("⏸️ 停止", variant="secondary")
status_display = gr.Textbox(
label="状態",
value="停止中",
interactive=False
)
speed_display = gr.Textbox(
label="現在の時間速度",
value="1x",
interactive=False
)
gr.Markdown("### 時間制御")
with gr.Row():
speed_up_btn = gr.Button("⏩ 2倍速")
slow_down_btn = gr.Button("⏪ 半分速")
time_speed_slider = gr.Slider(
minimum=0.1,
maximum=8.0,
value=1.0,
step=0.1,
label="時間速度倍率"
)
apply_speed_btn = gr.Button("速度適用")
# 住民情報Tab
with gr.TabItem("👥 住民情報"):
gr.Markdown("### 住民の詳細情報")
residents_info = gr.Dataframe(
headers=["名前", "職業", "年齢", "性格", "現在地", "活動", "気分", "エネルギー", "友人関係"],
value=get_detailed_info(),
interactive=False
)
refresh_residents_btn = gr.Button("🔄 情報更新")
# 友好度Tab
with gr.TabItem("💝 友好度マトリックス"):
gr.Markdown("### 住民間の友好度(0.0-1.0)")
friendship_matrix = gr.Dataframe(
value=get_friendship_matrix(),
interactive=False
)
refresh_friendship_btn = gr.Button("🔄 友好度更新")
# アクションログTab
with gr.TabItem("📝 アクションログ"):
gr.Markdown("### 最近の住民の行動")
action_log = gr.Dataframe(
headers=["時刻", "住民", "行動", "理由", "場所"],
value=get_action_log(),
interactive=False
)
refresh_log_btn = gr.Button("🔄 ログ更新")
# 3D可視化Tab
with gr.TabItem("🏗️ 3D町マップ"):
gr.Markdown("### 町の3Dレイアウト")
# Three.jsを使用した3Dマップの表示
three_map = gr.HTML(
value="""
<div id="three-container" style="width: 100%; height: 600px;"></div>
<script src="https://cdnjs.cloudflare.com/ajax/libs/three.js/r128/three.min.js"></script>
<script>
// Three.jsを使用した3Dマップの実装
let scene, camera, renderer, buildings = {}, residents = {};
function init() {
// シーンの作成
scene = new THREE.Scene();
scene.background = new THREE.Color(0x87CEEB); // 空色の背景
// カメラの設定
camera = new THREE.PerspectiveCamera(75, window.innerWidth / window.innerHeight, 0.1, 1000);
camera.position.set(50, 50, 50);
camera.lookAt(0, 0, 0);
// レンダラーの設定
renderer = new THREE.WebGLRenderer({ antialias: true });
renderer.setSize(window.innerWidth, window.innerHeight);
document.getElementById('three-container').appendChild(renderer.domElement);
// 光源の追加
const ambientLight = new THREE.AmbientLight(0xffffff, 0.5);
scene.add(ambientLight);
const directionalLight = new THREE.DirectionalLight(0xffffff, 0.8);
directionalLight.position.set(50, 50, 50);
scene.add(directionalLight);
// 地面の作成
const groundGeometry = new THREE.PlaneGeometry(100, 100);
const groundMaterial = new THREE.MeshStandardMaterial({
color: 0x90EE90,
side: THREE.DoubleSide
});
const ground = new THREE.Mesh(groundGeometry, groundMaterial);
ground.rotation.x = -Math.PI / 2;
scene.add(ground);
// アニメーションループの開始
animate();
// ウィンドウリサイズのハンドリング
window.addEventListener('resize', onWindowResize, false);
}
function createBuilding(name, position, color = 0x808080) {
const geometry = new THREE.BoxGeometry(10, 10, 10);
const material = new THREE.MeshStandardMaterial({ color: color });
const building = new THREE.Mesh(geometry, material);
building.position.set(position.x, 5, position.z);
scene.add(building);
buildings[name] = building;
// 建物名のラベル
const canvas = document.createElement('canvas');
const context = canvas.getContext('2d');
canvas.width = 256;
canvas.height = 64;
context.fillStyle = '#ffffff';
context.font = '24px Arial';
context.fillText(name, 10, 32);
const texture = new THREE.CanvasTexture(canvas);
const labelMaterial = new THREE.SpriteMaterial({ map: texture });
const label = new THREE.Sprite(labelMaterial);
label.position.set(position.x, 15, position.z);
scene.add(label);
}
function createResident(name, position, color = 0xff0000) {
const geometry = new THREE.SphereGeometry(1, 32, 32);
const material = new THREE.MeshStandardMaterial({ color: color });
const resident = new THREE.Mesh(geometry, material);
resident.position.set(position.x, 1, position.z);
scene.add(resident);
residents[name] = resident;
// 住民名のラベル
const canvas = document.createElement('canvas');
const context = canvas.getContext('2d');
canvas.width = 256;
canvas.height = 64;
context.fillStyle = '#ffffff';
context.font = '24px Arial';
context.fillText(name, 10, 32);
const texture = new THREE.CanvasTexture(canvas);
const labelMaterial = new THREE.SpriteMaterial({ map: texture });
const label = new THREE.Sprite(labelMaterial);
label.position.set(position.x, 3, position.z);
scene.add(label);
}
function updatePositions(state) {
// 建物の位置を更新
state.locations.forEach(location => {
if (!buildings[location.name]) {
createBuilding(location.name, location.position);
}
});
// 住民の位置を更新
state.residents.forEach(resident => {
if (!residents[resident.name]) {
createResident(resident.name, resident.position);
} else {
residents[resident.name].position.set(
resident.position.x,
1,
resident.position.z
);
}
});
}
function onWindowResize() {
camera.aspect = window.innerWidth / window.innerHeight;
camera.updateProjectionMatrix();
renderer.setSize(window.innerWidth, window.innerHeight);
}
function animate() {
requestAnimationFrame(animate);
renderer.render(scene, camera);
}
// 初期化
init();
</script>
""",
label="3D町マップ"
)
update_map_btn = gr.Button("🗺️ マップ更新")
# イベントハンドラーの設定
def handle_start():
status, speed = start_simulation()
return status, speed, simulation.get_state()
def handle_stop():
status, speed = stop_simulation()
return status, speed, simulation.get_state()
def handle_speed_up():
current_speed = simulation.time_speed
new_speed = min(current_speed * 2, 8.0)
status, speed = set_time_speed(new_speed)
return status, speed, simulation.get_state()
def handle_slow_down():
current_speed = simulation.time_speed
new_speed = max(current_speed / 2, 0.1)
status, speed = set_time_speed(new_speed)
return status, speed, simulation.get_state()
def handle_apply_speed(speed):
status, speed_str = set_time_speed(speed)
return status, speed_str, simulation.get_state()
def update_state():
return simulation.get_state()
def update_map():
"""3Dマップの更新"""
state = simulation.get_state()
return f"""
<script>
if (typeof updatePositions === 'function') {{
updatePositions({json.dumps(state)});
}}
</script>
"""
# ボタンイベントの接続
start_btn.click(
handle_start,
outputs=[status_display, speed_display, state_display]
)
stop_btn.click(
handle_stop,
outputs=[status_display, speed_display, state_display]
)
speed_up_btn.click(
handle_speed_up,
outputs=[status_display, speed_display, state_display]
)
slow_down_btn.click(
handle_slow_down,
outputs=[status_display, speed_display, state_display]
)
apply_speed_btn.click(
handle_apply_speed,
inputs=[time_speed_slider],
outputs=[status_display, speed_display, state_display]
)
auto_update_btn.click(
update_state,
outputs=[state_display]
)
refresh_residents_btn.click(
get_detailed_info,
outputs=[residents_info]
)
refresh_friendship_btn.click(
get_friendship_matrix,
outputs=[friendship_matrix]
)
refresh_log_btn.click(
get_action_log,
outputs=[action_log]
)
update_map_btn.click(
update_map,
outputs=[three_map]
)
# 自動更新タイマー(10秒間隔)
timer = gr.Timer(10)
timer.tick(
lambda: (
simulation.get_state(),
get_detailed_info(),
get_action_log(),
"実行中" if simulation.running else "停止中",
f"{simulation.time_speed}x"
),
outputs=[state_display, residents_info, action_log, status_display, speed_display]
)
return demo
# メイン実行部分
if __name__ == "__main__":
# Gradioアプリケーションの作成
app = create_interface()
# 起動メッセージ
print("🏙️ マルチエージェント都市シミュレーションを起動中...")
print("📍 ブラウザでアプリケーションにアクセスしてください")
print("🎮 シミュレーション状態をリアルタイムで確認できます")
print("🤖 各住民は自律的に行動を決定します")
# アプリケーションの起動
app.launch(
server_name="0.0.0.0",
server_port=7860,
share=True,
show_error=True
)